"""Service layer for creating, reading, updating and soft-deleting need requests."""

import logging
from datetime import datetime, timezone

from sqlalchemy import insert, select, update
# NOTE: `logger` imported here is the `fastapi.logger` submodule, not a
# `logging.Logger` instance; it is kept only so existing importers keep working.
from fastapi import HTTPException, Depends, logger, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError

from models.schemas import NeedRequestCreate, TokenData
from config.database import get_db
from models.db import need_requests_table, users_table
from config.settings import settings

# Module-level logger used by this service (deliberately not named `logger`
# so it does not shadow the fastapi import above).
_log = logging.getLogger(__name__)

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/token")


class NeedRequestService:
    """Database operations on `need_requests_table`, exposed as static coroutines.

    Every method takes ``db``, an object exposing awaitable ``execute``,
    ``commit`` and ``rollback`` (presumably a SQLAlchemy ``AsyncSession`` --
    confirm against ``config.database.get_db``).
    """

    @staticmethod
    async def create_need(need: NeedRequestCreate, db) -> dict:
        """Insert a new need request and return it together with its new id.

        Raises:
            HTTPException: 500 if the insert or commit fails (the transaction
                is rolled back first).
        """
        query = insert(need_requests_table).values(
            category=need.category,
            description=need.description,
            adults=need.adults,
            children=need.children,
            vulnerable=need.vulnerable,
            location=need.location,
            gps_coordinates=need.gps_coordinates,
            requester_email=need.requester_email,
        )
        try:
            result = await db.execute(query)
            await db.commit()
            need_id = result.inserted_primary_key[0]
            return {"id": need_id, **need.model_dump()}
        except Exception as e:
            await db.rollback()
            raise HTTPException(
                status_code=500,
                detail=f"Could not create need request: {str(e)}",
            ) from e

    @staticmethod
    async def get_need(need_id: int, db) -> dict:
        """Return the need request with the given id as a plain dict.

        Raises:
            HTTPException: 404 if no row has that id.
        """
        query = select(need_requests_table).where(need_requests_table.c.id == need_id)
        result = await db.execute(query)
        need = result.mappings().fetchone()
        if need is None:
            raise HTTPException(status_code=404, detail="Need request not found")
        return dict(need)

    @staticmethod
    async def get_all_needs(db) -> list:
        """Return every need request whose `deleted` column is NULL, as dicts."""
        # `.is_(None)` renders `IS NULL`, same as the `== None` comparison,
        # without tripping identity-comparison linters.
        query = select(need_requests_table).where(
            need_requests_table.c.deleted.is_(None)
        )
        result = await db.execute(query)
        return [dict(need) for need in result.mappings().all()]

    @staticmethod
    async def _require_requester_or_admin(need_id: int, token: str, db) -> None:
        """Raise 403 unless the token's user is the need's requester or an admin."""
        allowed = await NeedRequestService.verify_requester_or_admin(need_id, token, db)
        if not allowed:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="You do not have permission to perform this action.",
            )

    @staticmethod
    async def update_need(need_id: int, data: dict, db, token: str) -> dict:
        """Update columns of a need request on behalf of its requester or an admin.

        Args:
            need_id: Primary key of the need request.
            data: Column name -> new value mapping; must not contain ``deleted``.
            db: Database session.
            token: JWT identifying the caller.

        Raises:
            HTTPException: 403 if the caller is not allowed, 400 if ``data`` is
                empty or tries to set ``deleted``, 404 if the row does not
                exist, 500 on any database failure.
        """
        await NeedRequestService._require_requester_or_admin(need_id, token, db)

        if "deleted" in data:
            # Prevent direct updates to the `deleted` field; soft deletion
            # must go through `delete_need`.
            raise HTTPException(status_code=400, detail="Invalid update field")
        if not data:
            # An UPDATE without any values is never what the caller wants.
            raise HTTPException(status_code=400, detail="No fields to update")

        query = (
            update(need_requests_table)
            .where(need_requests_table.c.id == need_id)
            .values(**data)
        )
        try:
            result = await db.execute(query)
            if result.rowcount == 0:
                raise HTTPException(status_code=404, detail="Need request not found")
            await db.commit()
        except HTTPException:
            # Keep the intended status code (404) instead of masking it as 500.
            await db.rollback()
            raise
        except Exception as e:
            await db.rollback()
            raise HTTPException(
                status_code=500,
                detail=f"Could not update need request: {str(e)}",
            ) from e
        return {"message": "Need request updated successfully"}

    @staticmethod
    async def delete_need(need_id: int, db, token: str) -> dict:
        """Soft-delete a need request by stamping `deleted` with the current UTC time.

        Raises:
            HTTPException: 403 if the caller is not allowed, 404 if the row
                does not exist, 500 on any database failure.
        """
        await NeedRequestService._require_requester_or_admin(need_id, token, db)

        query = (
            update(need_requests_table)
            .where(need_requests_table.c.id == need_id)
            .values(deleted=datetime.now(timezone.utc))
        )
        try:
            result = await db.execute(query)
            if result.rowcount == 0:
                raise HTTPException(status_code=404, detail="Need request not found")
            await db.commit()
        except HTTPException:
            # Keep the intended status code (404) instead of masking it as 500.
            await db.rollback()
            raise
        except Exception as e:
            await db.rollback()
            raise HTTPException(
                status_code=500,
                detail=f"Could not delete need request: {str(e)}",
            ) from e
        return {"message": "Need request deleted successfully"}

    @staticmethod
    async def verify_requester_or_admin(need_id: int, token: str, db) -> bool:
        """Tell whether the token's user authored the need request or is an admin.

        Returns:
            True if the user's email equals the need's ``requester_email`` or
            the user's ``role`` is ``"admin"``; False otherwise.

        Raises:
            HTTPException: 403 if the token cannot be decoded, has no ``sub``
                claim, or names an unknown user; 404 if the need request does
                not exist.
        """
        credentials_exception = HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You do not have permission to perform this action.",
        )

        # Decode the JWT; any decoding failure is reported as a 403.
        try:
            payload = jwt.decode(
                token, settings.secret_key, algorithms=[settings.algorithm]
            )
        except JWTError:
            raise credentials_exception
        email = payload.get("sub")
        if email is None:
            raise credentials_exception
        token_data = TokenData(email=email)

        # Fetch the user from the database.
        user_query = select(users_table).where(users_table.c.email == token_data.email)
        result = await db.execute(user_query)
        user = result.mappings().fetchone()
        # Log only whether a user was found; never dump the row itself.
        _log.debug("User lookup for need %s: found=%s", need_id, user is not None)
        if user is None:
            raise credentials_exception

        # Fetch the need request.
        need_query = select(need_requests_table).where(
            need_requests_table.c.id == need_id
        )
        result = await db.execute(need_query)
        need = result.mappings().fetchone()
        if need is None:
            raise HTTPException(status_code=404, detail="Need request not found")

        # Allowed when the user is the author or an administrator.
        return need["requester_email"] == user["email"] or user["role"] == "admin"