from sqlalchemy import insert, select, update, delete from fastapi import HTTPException, Depends, status from fastapi.security import OAuth2PasswordBearer from models.schemas import Role, TokenData from models.db import roles_table, users_table from config.database import get_db from config.settings import settings from jose import jwt, JWTError from sqlalchemy.ext.asyncio import AsyncSession oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token") class RoleService: @staticmethod async def create_role(role: Role, db: AsyncSession, token: str): """ Crée un nouveau rôle (réservé aux administrateurs). """ await RoleService.admin_required(token, db) query = insert(roles_table).values( name=role.name, permissions=",".join(role.permissions), # Stocke les permissions sous forme de chaîne ) try: result = await db.execute(query) await db.commit() role_id = result.inserted_primary_key[0] return {"id": role_id, **role.dict()} except Exception as e: await db.rollback() raise HTTPException(status_code=500, detail=f"Could not create role: {str(e)}") @staticmethod async def update_role(role_id: int, data: dict, db: AsyncSession, token: str): """ Met à jour un rôle par son ID (réservé aux administrateurs). """ await RoleService.admin_required(token, db) if "permissions" in data: data["permissions"] = ",".join(data["permissions"]) # Convertit les permissions en chaîne query = ( update(roles_table) .where(roles_table.c.id == role_id) .values(**data) ) try: result = await db.execute(query) if result.rowcount == 0: raise HTTPException(status_code=404, detail="Role not found") await db.commit() return {"message": "Role updated successfully"} except Exception as e: await db.rollback() raise HTTPException(status_code=500, detail=f"Could not update role: {str(e)}") @staticmethod async def delete_role(role_id: int, db: AsyncSession, token: str): """ Supprime un rôle par son ID (réservé aux administrateurs). """ await RoleService.admin_required(token, db) query = delete(roles_table).where(roles_table.c.id == role_id) try: result = await db.execute(query) if result.rowcount == 0: raise HTTPException(status_code=404, detail="Role not found") await db.commit() return {"message": "Role deleted successfully"} except Exception as e: await db.rollback() raise HTTPException(status_code=500, detail=f"Could not delete role: {str(e)}") @staticmethod async def admin_required(token: str = Depends(oauth2_scheme), db: AsyncSession = Depends(get_db)): """ Vérifie si l'utilisateur actuel est un administrateur. """ credentials_exception = HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="You do not have permission to perform this action.", ) try: # Décodage du token JWT payload = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm]) email: str = payload.get("sub") if email is None: raise credentials_exception token_data = TokenData(email=email) except JWTError: raise credentials_exception # Récupère l'utilisateur depuis la base de données user_query = select(users_table).where(users_table.c.email == token_data.email) result = await db.execute(user_query) user = result.fetchone() if user is None: raise credentials_exception # Vérifie si l'utilisateur a le rôle d'administrateur if user["role"] != "admin": raise credentials_exception return user