From 1a7d66b09dd6ac9a938546a155e8a3c4b0919c7e Mon Sep 17 00:00:00 2001 From: Anaz Date: Fri, 10 Jan 2025 09:43:40 +0400 Subject: [PATCH] deleted unused file DManagement_api.py --- DManagement_api.py | 237 --------------------------------------------- 1 file changed, 237 deletions(-) delete mode 100644 DManagement_api.py diff --git a/DManagement_api.py b/DManagement_api.py deleted file mode 100644 index a96fe14..0000000 --- a/DManagement_api.py +++ /dev/null @@ -1,237 +0,0 @@ - -from fastapi import FastAPI, Depends, HTTPException, status -from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm -from pydantic import BaseModel, EmailStr -from typing import List, Optional -from datetime import datetime, timedelta -from jose import JWTError, jwt -from passlib.context import CryptContext -import mysql.connector -from mysql.connector import Error -import logging -import os -from dotenv import load_dotenv - -# Load environment variables -load_dotenv() - -# Configuration du logging -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - -# Configuration de l'application FastAPI -app = FastAPI(title="Disaster Management API", description="API for managing disaster response and communication.") - -# Configuration pour la sécurité -SECRET_KEY = os.getenv("SECRET_KEY") -ALGORITHM = "HS256" -ACCESS_TOKEN_EXPIRE_MINUTES = 30 - -pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") -oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token") - -# Database configuration -DB_CONFIG = { - 'host': os.getenv('DB_HOST'), - 'user': os.getenv('DB_USER'), - 'password': os.getenv('DB_PASSWORD'), - 'database': os.getenv('DB_NAME'), -} - -# Database connection function -def get_db_connection(): - try: - connection = mysql.connector.connect(**DB_CONFIG) - return connection - except Error as e: - logger.error(f"Error connecting to MySQL database: {e}") - raise HTTPException(status_code=500, detail="Database connection error") - -# Modèles de données -class UserBase(BaseModel): - email: EmailStr - full_name: str - phone: str - date_of_birth: datetime - organization: Optional[str] = None - role: Optional[str] = "citizen" - -class UserCreate(UserBase): - password: str - -class UserResponse(UserBase): - id: int - role: str - -class Token(BaseModel): - access_token: str - token_type: str - -class TokenData(BaseModel): - email: Optional[str] = None - -class PersonReport(BaseModel): - full_name: str - date_of_birth: datetime - status: str - location: Optional[str] = None - gps_coordinates: Optional[str] = None - photo_url: Optional[str] = None - reporter_email: str - -class NeedRequest(BaseModel): - category: str - description: str - adults: int - children: int - vulnerable: int - location: str - gps_coordinates: Optional[str] = None - requester_email: str - -class PointOfInterest(BaseModel): - label: str - description: str - icon: str - organization: str - gps_coordinates: str - -class Shelter(BaseModel): - label: str - description: str - status: str - contact_person: str - gps_coordinates: str - added_by: str - -# Utilitaires -def verify_password(plain_password, hashed_password): - return pwd_context.verify(plain_password, hashed_password) - -def get_password_hash(password): - return pwd_context.hash(password) - -def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): - to_encode = data.copy() - expire = datetime.utcnow() + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)) - to_encode.update({"exp": expire}) - return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) - -def get_user(email: str): - connection = get_db_connection() - cursor = connection.cursor(dictionary=True) - try: - cursor.execute("SELECT * FROM users WHERE email = %s", (email,)) - user = cursor.fetchone() - return user - finally: - cursor.close() - connection.close() - -# Endpoints de l'API -@app.post("/signup", response_model=UserResponse) -async def signup(user: UserCreate): - connection = get_db_connection() - cursor = connection.cursor() - try: - # Check if user already exists - cursor.execute("SELECT * FROM users WHERE email = %s", (user.email,)) - if cursor.fetchone(): - raise HTTPException(status_code=400, detail="Email already registered") - - hashed_password = get_password_hash(user.password) - query = """INSERT INTO users (email, full_name, phone, date_of_birth, hashed_password, organization, role) - VALUES (%s, %s, %s, %s, %s, %s, %s)""" - values = (user.email, user.full_name, user.phone, user.date_of_birth, hashed_password, user.organization, user.role) - cursor.execute(query, values) - connection.commit() - user_id = cursor.lastrowid - logger.info(f"User created successfully: {user.email}") - return UserResponse(id=user_id, **user.dict(exclude={'password'})) - except mysql.connector.IntegrityError as e: - logger.error(f"IntegrityError creating user: {e}") - raise HTTPException(status_code=400, detail=f"Database integrity error: {str(e)}") - except Error as e: - logger.error(f"Error creating user: {e}") - raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") - finally: - cursor.close() - connection.close() - -@app.post("/token", response_model=Token) -async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()): - user = get_user(form_data.username) - if not user or not verify_password(form_data.password, user["hashed_password"]): - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Incorrect username or password", - headers={"WWW-Authenticate": "Bearer"}, - ) - access_token = create_access_token(data={"sub": user["email"]}) - logger.info(f"User logged in: {user['email']}") - return {"access_token": access_token, "token_type": "bearer"} - -@app.get("/users/me", response_model=UserResponse) -async def read_users_me(token: str = Depends(oauth2_scheme)): - credentials_exception = HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Could not validate credentials", - headers={"WWW-Authenticate": "Bearer"}, - ) - try: - payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) - email: str = payload.get("sub") - if email is None: - raise credentials_exception - token_data = TokenData(email=email) - except JWTError: - raise credentials_exception - user = get_user(token_data.email) - if user is None: - raise credentials_exception - return UserResponse(**user) - -@app.post("/report_person", status_code=201) -async def report_person(person: PersonReport): - connection = get_db_connection() - cursor = connection.cursor() - try: - query = """INSERT INTO person_reports (full_name, date_of_birth, status, location, gps_coordinates, photo_url, reporter_email) - VALUES (%s, %s, %s, %s, %s, %s, %s)""" - values = (person.full_name, person.date_of_birth, person.status, person.location, person.gps_coordinates, person.photo_url, person.reporter_email) - cursor.execute(query, values) - connection.commit() - logger.info(f"Person report added: {person.full_name}") - return {"message": "Person report added successfully"} - except Error as e: - logger.error(f"Error adding person report: {e}") - raise HTTPException(status_code=500, detail="Could not add person report") - finally: - cursor.close() - connection.close() - -@app.get("/persons", response_model=List[PersonReport]) -async def get_persons(status: Optional[str] = None): - connection = get_db_connection() - cursor = connection.cursor(dictionary=True) - try: - if status: - query = "SELECT * FROM person_reports WHERE status = %s" - cursor.execute(query, (status,)) - else: - query = "SELECT * FROM person_reports" - cursor.execute(query) - persons = cursor.fetchall() - return [PersonReport(**person) for person in persons] - except Error as e: - logger.error(f"Error fetching persons: {e}") - raise HTTPException(status_code=500, detail="Could not fetch persons") - finally: - cursor.close() - connection.close() - -# Similar modifications for other endpoints (request_need, add_poi, add_shelter, etc.) - -if __name__ == "__main__": - import uvicorn - uvicorn.run(app, host="0.0.0.0", port=8000)