Téléverser les fichiers vers "/"

main
Anaz 2025-01-09 09:08:52 +01:00
parent e6dd7b7e9f
commit c8c12372ef
5 changed files with 309 additions and 0 deletions

30
.env Normal file
View File

@ -0,0 +1,30 @@
# Configuration de la base de données
DATABASE_URL=mysql+aiomysql://sywmtnsg_admin:EEy_>2JJS0@localhost:6033/sywmtnsg_dm_management
# Configuration pour JWT
SECRET_KEY=LAGs7G8Sis9aQHcipROxpjYRxFZKjr4wNm-_O0pBTkjNYv1rgPUR87VcNswH_VYGpIrsyGdqnNa3vcVSH0f5Tg
ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=30
# Configuration pour AWS S3 (à remplir si nécessaire)
AWS_ACCESS_KEY_ID=your-aws-access-key
AWS_SECRET_ACCESS_KEY=your-aws-secret-key
AWS_BUCKET_NAME=your-bucket-name
# Configuration pour Celery (à remplir si nécessaire)CELERY_BROKER_URL=redis://venguard.m-a-i.tech/
CELERY_BROKER_URL=redis://localhost:6379/0
# Configuration pour les logs
LOG_LEVEL=INFO
# Configuration pour l'envoi d'emails (à remplir si nécessaire)
EMAIL_HOST=smtp.example.com
EMAIL_PORT=587
EMAIL_USERNAME=your-email@example.com
EMAIL_PASSWORD=your-email-password
# Configuration pour le RGPD
GDPR_DELETION_DELAY_DAYS=7
# Configuration pour les tests
TESTING=False

237
DManagement_api.py Normal file
View File

@ -0,0 +1,237 @@
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel, EmailStr
from typing import List, Optional
from datetime import datetime, timedelta
from jose import JWTError, jwt
from passlib.context import CryptContext
import mysql.connector
from mysql.connector import Error
import logging
import os
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Configuration du logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Configuration de l'application FastAPI
app = FastAPI(title="Disaster Management API", description="API for managing disaster response and communication.")
# Configuration pour la sécurité
SECRET_KEY = os.getenv("SECRET_KEY")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")
# Database configuration
DB_CONFIG = {
'host': os.getenv('DB_HOST'),
'user': os.getenv('DB_USER'),
'password': os.getenv('DB_PASSWORD'),
'database': os.getenv('DB_NAME'),
}
# Database connection function
def get_db_connection():
try:
connection = mysql.connector.connect(**DB_CONFIG)
return connection
except Error as e:
logger.error(f"Error connecting to MySQL database: {e}")
raise HTTPException(status_code=500, detail="Database connection error")
# Modèles de données
class UserBase(BaseModel):
email: EmailStr
full_name: str
phone: str
date_of_birth: datetime
organization: Optional[str] = None
role: Optional[str] = "citizen"
class UserCreate(UserBase):
password: str
class UserResponse(UserBase):
id: int
role: str
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
email: Optional[str] = None
class PersonReport(BaseModel):
full_name: str
date_of_birth: datetime
status: str
location: Optional[str] = None
gps_coordinates: Optional[str] = None
photo_url: Optional[str] = None
reporter_email: str
class NeedRequest(BaseModel):
category: str
description: str
adults: int
children: int
vulnerable: int
location: str
gps_coordinates: Optional[str] = None
requester_email: str
class PointOfInterest(BaseModel):
label: str
description: str
icon: str
organization: str
gps_coordinates: str
class Shelter(BaseModel):
label: str
description: str
status: str
contact_person: str
gps_coordinates: str
added_by: str
# Utilitaires
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def get_user(email: str):
connection = get_db_connection()
cursor = connection.cursor(dictionary=True)
try:
cursor.execute("SELECT * FROM users WHERE email = %s", (email,))
user = cursor.fetchone()
return user
finally:
cursor.close()
connection.close()
# Endpoints de l'API
@app.post("/signup", response_model=UserResponse)
async def signup(user: UserCreate):
connection = get_db_connection()
cursor = connection.cursor()
try:
# Check if user already exists
cursor.execute("SELECT * FROM users WHERE email = %s", (user.email,))
if cursor.fetchone():
raise HTTPException(status_code=400, detail="Email already registered")
hashed_password = get_password_hash(user.password)
query = """INSERT INTO users (email, full_name, phone, date_of_birth, hashed_password, organization, role)
VALUES (%s, %s, %s, %s, %s, %s, %s)"""
values = (user.email, user.full_name, user.phone, user.date_of_birth, hashed_password, user.organization, user.role)
cursor.execute(query, values)
connection.commit()
user_id = cursor.lastrowid
logger.info(f"User created successfully: {user.email}")
return UserResponse(id=user_id, **user.dict(exclude={'password'}))
except mysql.connector.IntegrityError as e:
logger.error(f"IntegrityError creating user: {e}")
raise HTTPException(status_code=400, detail=f"Database integrity error: {str(e)}")
except Error as e:
logger.error(f"Error creating user: {e}")
raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
finally:
cursor.close()
connection.close()
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = get_user(form_data.username)
if not user or not verify_password(form_data.password, user["hashed_password"]):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token = create_access_token(data={"sub": user["email"]})
logger.info(f"User logged in: {user['email']}")
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/users/me", response_model=UserResponse)
async def read_users_me(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
email: str = payload.get("sub")
if email is None:
raise credentials_exception
token_data = TokenData(email=email)
except JWTError:
raise credentials_exception
user = get_user(token_data.email)
if user is None:
raise credentials_exception
return UserResponse(**user)
@app.post("/report_person", status_code=201)
async def report_person(person: PersonReport):
connection = get_db_connection()
cursor = connection.cursor()
try:
query = """INSERT INTO person_reports (full_name, date_of_birth, status, location, gps_coordinates, photo_url, reporter_email)
VALUES (%s, %s, %s, %s, %s, %s, %s)"""
values = (person.full_name, person.date_of_birth, person.status, person.location, person.gps_coordinates, person.photo_url, person.reporter_email)
cursor.execute(query, values)
connection.commit()
logger.info(f"Person report added: {person.full_name}")
return {"message": "Person report added successfully"}
except Error as e:
logger.error(f"Error adding person report: {e}")
raise HTTPException(status_code=500, detail="Could not add person report")
finally:
cursor.close()
connection.close()
@app.get("/persons", response_model=List[PersonReport])
async def get_persons(status: Optional[str] = None):
connection = get_db_connection()
cursor = connection.cursor(dictionary=True)
try:
if status:
query = "SELECT * FROM person_reports WHERE status = %s"
cursor.execute(query, (status,))
else:
query = "SELECT * FROM person_reports"
cursor.execute(query)
persons = cursor.fetchall()
return [PersonReport(**person) for person in persons]
except Error as e:
logger.error(f"Error fetching persons: {e}")
raise HTTPException(status_code=500, detail="Could not fetch persons")
finally:
cursor.close()
connection.close()
# Similar modifications for other endpoints (request_need, add_poi, add_shelter, etc.)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)

10
htaccess Normal file
View File

@ -0,0 +1,10 @@
# DO NOT REMOVE. CLOUDLINUX PASSENGER CONFIGURATION BEGIN
PassengerAppRoot "/home/sywmtnsg/api"
PassengerBaseURI "/"
PassengerPython "/home/sywmtnsg/virtualenv/api/3.11/bin/python"
# DO NOT REMOVE. CLOUDLINUX PASSENGER CONFIGURATION END
PassengerFriendlyErrorPages on
# DO NOT REMOVE OR MODIFY. CLOUDLINUX ENV VARS CONFIGURATION BEGIN
<IfModule Litespeed>
</IfModule>
# DO NOT REMOVE OR MODIFY. CLOUDLINUX ENV VARS CONFIGURATION END

25
main.py Normal file
View File

@ -0,0 +1,25 @@
from fastapi import FastAPI
from api.v1 import users, roles, need_requests, reports, uploads, person_reports, technical_issues, points_of_interest, shelters, auth #, messages
from config.database import init_db
app = FastAPI(title="Disaster Management API", description="API for managing disaster during these days of ending world.")
# Include routers
app.include_router(auth.router, prefix="/api/v1/auth", tags=["auth"])
app.include_router(person_reports.router, prefix="/api/v1/person_reports", tags=["person_reports"])
app.include_router(users.router, prefix="/api/v1/users", tags=["users"])
app.include_router(roles.router, prefix="/api/v1/roles", tags=["roles"])
app.include_router(need_requests.router, prefix="/api/v1/needs", tags=["needs"])
app.include_router(reports.router, prefix="/api/v1/reports", tags=["reports"])
#app.include_router(messages.router, prefix="/api/v1/messages", tags=["messages"])
app.include_router(uploads.router, prefix="/api/v1/uploads", tags=["uploads"])
app.include_router(technical_issues.router, prefix="/api/v1/technical_issues", tags=["technical_issues"])
app.include_router(points_of_interest.router, prefix="/api/v1/points_of_interest", tags=["points_of_interest"])
app.include_router(shelters.router, prefix="/api/v1/shelters", tags=["shelters"])
# Initialize database
@app.on_event("startup")
async def startup():
await init_db()

7
passenger_wsgi.py Normal file
View File

@ -0,0 +1,7 @@
import imp
import os
import sys
from a2wsgi import ASGIMiddleware
from main import app
application = ASGIMiddleware(app)