from datetime import datetime, timedelta, timezone from typing import Optional from jose import JWTError, jwt from passlib.context import CryptContext from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer from config.settings import settings from models.schemas import TokenData, UserCreate, UserResponse from config.database import get_db from models.db import users_table, roles_table, role_permissions_table, permissions_table from sqlalchemy import select, update, insert from sqlalchemy.ext.asyncio import AsyncSession from utils.logging import logger logger.info("Test log message") # Configuration pour le hachage des mots de passe pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") # Configuration pour OAuth2 class AuthService: @staticmethod def verify_password(plain_password: str, hashed_password: str) -> bool: passEncr = pwd_context.hash(plain_password) passEncrTest123Azerty = pwd_context.hash("123Azerty+@") logger.info("password 123Azerty+@ encrypted : " +passEncrTest123Azerty) logger.info("password input : " +plain_password) logger.info("password input and encrypted : " +passEncr) logger.info("password hashed (in BDD) : "+ hashed_password) logger.info("verification plain_password & hashed_password (in BDD) : "+ str(pwd_context.verify(plain_password, hashed_password))) logger.info("verification plain_password & 123Azerty+@ encrypted : "+ str(pwd_context.verify(plain_password, passEncrTest123Azerty))) return pwd_context.verify(plain_password, hashed_password) @staticmethod def get_password_hash(password: str) -> str: return pwd_context.hash(password) @staticmethod async def authenticate_user(email: str, password: str, db): query = select(users_table).where(users_table.c.email == email) result = await db.execute(query) user = result.mappings().fetchone() # Récupère un dictionnaire plutôt qu'un tuple logger.info("user authenticated", extra={"email": email}) if not user: return None if not AuthService.verify_password(password, user["hashed_password"]): return None return user @staticmethod def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: to_encode = data.copy() if expires_delta: expire = datetime.now(timezone.utc) + expires_delta else: expire = datetime.now(timezone.utc) + timedelta(minutes=settings.access_token_expire_minutes) to_encode.update({"exp": expire}) return jwt.encode(to_encode, settings.secret_key, algorithm=settings.algorithm) @staticmethod def verify_token(token: str) -> dict: try: # Décoder le token payload = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm]) # Vérifier si le token est expiré expiration_time = payload.get("exp") if expiration_time and datetime.now(timezone.utc).timestamp() > expiration_time: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Token has expired", ) return payload except jwt.ExpiredSignatureError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Token has expired", ) except JWTError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token", ) @staticmethod async def get_current_user(token: str, db): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm]) email: str = payload.get("sub") if email is None: raise credentials_exception except JWTError: raise credentials_exception query = select(users_table).where(users_table.c.email == email) result = await db.execute(query) user = result.mappings().fetchone() if user is None: raise credentials_exception # Récupérer le rôle et ses permissions role_query = select(roles_table.c.id, roles_table.c.name, permissions_table.c.name.label("permission")).join( role_permissions_table, role_permissions_table.c.role_id == roles_table.c.id ).join( permissions_table, role_permissions_table.c.permission_id == permissions_table.c.id ).where(roles_table.c.name == user["role"]) role_result = await db.execute(role_query) role_data = role_result.mappings().all() role = { "id": role_data[0]["id"], "name": role_data[0]["name"], "permissions": [r["permission"] for r in role_data] } # Préparez la réponse avec tous les champs requis return { "id": user["id"], "email": user["email"], "full_name": user["full_name"], "phone": user["phone"], "date_of_birth": user["date_of_birth"].isoformat(), "role": role, "is_banned": user["is_banned"], "is_deleted": user["is_deleted"], } # @staticmethod # async def create_user(user: UserCreate, db): # result = await db.execute(select(users_table)) # users = result.fetchall() # role = "admin" if len(users) == 0 else user.role # hashed_password = AuthService.get_password_hash(user.password) # query = insert(users_table).values( # email=user.email, # full_name=user.full_name, # phone=user.phone, # date_of_birth=user.date_of_birth, # organization=user.organization, # hashed_password=hashed_password, # role=role # ) # try: # result = await db.execute(query) # await db.commit() # user_id = result.inserted_primary_key[0] # logger.info("user created", extra={"user_id": user_id, "email": user.email, "role": role}) # return {"id": user_id, "email": user.email, "role": role} # except Exception as e: # await db.rollback() # logger.error("could not create user", extra={"error": str(e)}) # raise HTTPException(status_code=500, detail=f"Could not create user: {str(e)}") @staticmethod async def get_role_by_name(role_name: str, db: AsyncSession = Depends(get_db)): query = select(roles_table).where(roles_table.c.name == role_name) result = await db.execute(query) role = result.mappings().first() return role @staticmethod async def admin_required(token: str, db): credentials_exception = HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="You do not have permission to perform this action.", ) try: payload = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm]) email: str = payload.get("sub") if email is None: raise credentials_exception token_data = TokenData(email=email) except JWTError: logger.error("could not decode token", extra={"token": token}) raise credentials_exception query = users_table.select().where(users_table.c.email == token_data.email) result = await db.execute(query) user = result.mappings().first() if user is None: raise credentials_exception if user["role"] != "admin": logger.error("admin required", extra={"email": token_data.email}) raise credentials_exception return user @staticmethod async def update_user(user_id: int, updates: dict, token: str, db): """ Met à jour les informations d'un utilisateur. - Un utilisateur peut mettre à jour ses propres informations. - Un administrateur peut mettre à jour les informations de n'importe quel utilisateur. """ # Décodage et vérification des permissions credentials_exception = HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="You do not have permission to perform this action.", ) try: payload = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm]) email: str = payload.get("sub") if email is None: raise credentials_exception token_data = TokenData(email=email) except JWTError: raise credentials_exception query = select(users_table).where(users_table.c.email == token_data.email) result = await db.execute(query) current_user = result.mappings().first() if current_user is None: raise credentials_exception # Vérifier si l'utilisateur est l'admin ou lui-même if current_user["id"] != user_id and current_user["role"] != "admin": raise credentials_exception # Mise à jour des champs autorisés allowed_updates = {"full_name", "phone", "date_of_birth", "organization", "email"} updates = {key: value for key, value in updates.items() if key in allowed_updates} if "email" in updates: # Vérifie si l'email existe déjà existing_email_query = select(users_table).where(users_table.c.email == updates["email"]) result = await db.execute(existing_email_query) existing_user = result.fetchone() if existing_user and existing_user["id"] != user_id: raise HTTPException(status_code=400, detail="Email already registered by another user.") query = ( update(users_table) .where(users_table.c.id == user_id) .values(**updates) .execution_options(synchronize_session="fetch") ) try: await db.execute(query) await db.commit() except Exception as e: await db.rollback() raise HTTPException(status_code=500, detail=f"Could not update user: {str(e)}") # Récupérer les informations mises à jour updated_user_query = select(users_table).where(users_table.c.id == user_id) result = await db.execute(updated_user_query) updated_user = result.mappings().first() return UserResponse(**updated_user) @staticmethod async def get_user_by_email(email: str, db): query = select(users_table).where(users_table.c.email == email) result = await db.execute(query) return result.mappings().first() @staticmethod async def check_permissions(token: str, db, required_permissions: list): credentials_exception = HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="You do not have permission to perform this action.", ) try: payload = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm]) email: str = payload.get("sub") if email is None: raise credentials_exception token_data = TokenData(email=email) except JWTError: logger.error("could not decode token", extra={"token": token}) raise credentials_exception query = users_table.select().where(users_table.c.email == token_data.email) result = await db.execute(query) user = result.mappings().first() if user is None: raise credentials_exception if user["role"] not in required_permissions: logger.error("permission denied", extra={"email": token_data.email, "required_permissions": required_permissions}) raise credentials_exception logger.info("permission granted", extra={"email": token_data.email, "role": user["role"]})