deleted unused file DManagement_api.py

main
Anaz 2025-01-10 09:43:40 +04:00
parent 3ab2f0884a
commit 1a7d66b09d
1 changed files with 0 additions and 237 deletions

View File

@ -1,237 +0,0 @@
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel, EmailStr
from typing import List, Optional
from datetime import datetime, timedelta
from jose import JWTError, jwt
from passlib.context import CryptContext
import mysql.connector
from mysql.connector import Error
import logging
import os
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Configuration du logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Configuration de l'application FastAPI
app = FastAPI(title="Disaster Management API", description="API for managing disaster response and communication.")
# Configuration pour la sécurité
SECRET_KEY = os.getenv("SECRET_KEY")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")
# Database configuration
DB_CONFIG = {
'host': os.getenv('DB_HOST'),
'user': os.getenv('DB_USER'),
'password': os.getenv('DB_PASSWORD'),
'database': os.getenv('DB_NAME'),
}
# Database connection function
def get_db_connection():
try:
connection = mysql.connector.connect(**DB_CONFIG)
return connection
except Error as e:
logger.error(f"Error connecting to MySQL database: {e}")
raise HTTPException(status_code=500, detail="Database connection error")
# Modèles de données
class UserBase(BaseModel):
email: EmailStr
full_name: str
phone: str
date_of_birth: datetime
organization: Optional[str] = None
role: Optional[str] = "citizen"
class UserCreate(UserBase):
password: str
class UserResponse(UserBase):
id: int
role: str
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
email: Optional[str] = None
class PersonReport(BaseModel):
full_name: str
date_of_birth: datetime
status: str
location: Optional[str] = None
gps_coordinates: Optional[str] = None
photo_url: Optional[str] = None
reporter_email: str
class NeedRequest(BaseModel):
category: str
description: str
adults: int
children: int
vulnerable: int
location: str
gps_coordinates: Optional[str] = None
requester_email: str
class PointOfInterest(BaseModel):
label: str
description: str
icon: str
organization: str
gps_coordinates: str
class Shelter(BaseModel):
label: str
description: str
status: str
contact_person: str
gps_coordinates: str
added_by: str
# Utilitaires
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def get_user(email: str):
connection = get_db_connection()
cursor = connection.cursor(dictionary=True)
try:
cursor.execute("SELECT * FROM users WHERE email = %s", (email,))
user = cursor.fetchone()
return user
finally:
cursor.close()
connection.close()
# Endpoints de l'API
@app.post("/signup", response_model=UserResponse)
async def signup(user: UserCreate):
connection = get_db_connection()
cursor = connection.cursor()
try:
# Check if user already exists
cursor.execute("SELECT * FROM users WHERE email = %s", (user.email,))
if cursor.fetchone():
raise HTTPException(status_code=400, detail="Email already registered")
hashed_password = get_password_hash(user.password)
query = """INSERT INTO users (email, full_name, phone, date_of_birth, hashed_password, organization, role)
VALUES (%s, %s, %s, %s, %s, %s, %s)"""
values = (user.email, user.full_name, user.phone, user.date_of_birth, hashed_password, user.organization, user.role)
cursor.execute(query, values)
connection.commit()
user_id = cursor.lastrowid
logger.info(f"User created successfully: {user.email}")
return UserResponse(id=user_id, **user.dict(exclude={'password'}))
except mysql.connector.IntegrityError as e:
logger.error(f"IntegrityError creating user: {e}")
raise HTTPException(status_code=400, detail=f"Database integrity error: {str(e)}")
except Error as e:
logger.error(f"Error creating user: {e}")
raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
finally:
cursor.close()
connection.close()
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = get_user(form_data.username)
if not user or not verify_password(form_data.password, user["hashed_password"]):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token = create_access_token(data={"sub": user["email"]})
logger.info(f"User logged in: {user['email']}")
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/users/me", response_model=UserResponse)
async def read_users_me(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
email: str = payload.get("sub")
if email is None:
raise credentials_exception
token_data = TokenData(email=email)
except JWTError:
raise credentials_exception
user = get_user(token_data.email)
if user is None:
raise credentials_exception
return UserResponse(**user)
@app.post("/report_person", status_code=201)
async def report_person(person: PersonReport):
connection = get_db_connection()
cursor = connection.cursor()
try:
query = """INSERT INTO person_reports (full_name, date_of_birth, status, location, gps_coordinates, photo_url, reporter_email)
VALUES (%s, %s, %s, %s, %s, %s, %s)"""
values = (person.full_name, person.date_of_birth, person.status, person.location, person.gps_coordinates, person.photo_url, person.reporter_email)
cursor.execute(query, values)
connection.commit()
logger.info(f"Person report added: {person.full_name}")
return {"message": "Person report added successfully"}
except Error as e:
logger.error(f"Error adding person report: {e}")
raise HTTPException(status_code=500, detail="Could not add person report")
finally:
cursor.close()
connection.close()
@app.get("/persons", response_model=List[PersonReport])
async def get_persons(status: Optional[str] = None):
connection = get_db_connection()
cursor = connection.cursor(dictionary=True)
try:
if status:
query = "SELECT * FROM person_reports WHERE status = %s"
cursor.execute(query, (status,))
else:
query = "SELECT * FROM person_reports"
cursor.execute(query)
persons = cursor.fetchall()
return [PersonReport(**person) for person in persons]
except Error as e:
logger.error(f"Error fetching persons: {e}")
raise HTTPException(status_code=500, detail="Could not fetch persons")
finally:
cursor.close()
connection.close()
# Similar modifications for other endpoints (request_need, add_poi, add_shelter, etc.)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)