from collections import defaultdict

from fastapi import HTTPException, Depends, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
from sqlalchemy import insert, select, update, delete
from sqlalchemy.ext.asyncio import AsyncSession

from config.database import get_db
from config.settings import settings
from models.db import roles_table, permissions_table, role_permissions_table, users_table
from models.schemas import TokenData

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/token")


class RoleService:
    """Role and permission operations on top of an ``AsyncSession``.

    Every mutating method first calls :meth:`admin_required` and then runs
    all of its writes in a single transaction: one commit on success, a
    rollback on any failure.
    """

    @staticmethod
    async def _permission_ids_by_name(db: AsyncSession, names) -> dict:
        """Return ``{permission name: permission id}`` for the given names.

        Names without a row in ``permissions_table`` are absent from the
        result. Duplicated names are looked up once.
        """
        wanted = list(dict.fromkeys(names))
        if not wanted:
            # Avoid issuing an ``IN ()`` query for an empty list.
            return {}
        query = select(permissions_table).where(permissions_table.c.name.in_(wanted))
        rows = (await db.execute(query)).mappings().all()
        return {row["name"]: row["id"] for row in rows}

    @staticmethod
    async def get_all_roles(db: AsyncSession):
        """Return every role as a dict with an extra ``"permissions"`` key.

        ``"permissions"`` holds the list of permission names linked to the
        role (an empty list when the role has none).
        """
        roles = (await db.execute(select(roles_table))).mappings().all()

        # Load every (role_id, permission name) pair in one query instead of
        # running one permissions query per role.
        links_query = select(
            role_permissions_table.c.role_id,
            permissions_table.c.name,
        ).select_from(
            role_permissions_table.join(
                permissions_table,
                permissions_table.c.id == role_permissions_table.c.permission_id,
            )
        )
        names_by_role = defaultdict(list)
        for link in (await db.execute(links_query)).mappings().all():
            names_by_role[link["role_id"]].append(link["name"])

        return [
            dict(role, permissions=names_by_role.get(role["id"], []))
            for role in roles
        ]

    @staticmethod
    async def create_role(name: str, permissions: list, db: AsyncSession, token: str):
        """Create a role with the given permissions (administrators only).

        Permissions not listed in ``settings.available_permissions`` are
        ignored, as are names that have no row in ``permissions_table``.

        Returns:
            ``{"id", "name", "permissions"}`` where ``permissions`` lists the
            names that were actually linked to the new role.

        Raises:
            HTTPException: 403 if the caller is not an administrator, 400 if
                no supplied permission is valid, 500 if the insert fails.
        """
        await RoleService.admin_required(token, db)

        valid_permissions = settings.available_permissions
        filtered_permissions = [p for p in permissions if p in valid_permissions]
        if not filtered_permissions:
            raise HTTPException(status_code=400, detail="No valid permissions provided.")

        try:
            result = await db.execute(insert(roles_table).values(name=name))
            role_id = result.inserted_primary_key[0]

            permission_ids = await RoleService._permission_ids_by_name(db, filtered_permissions)
            linked_permissions = [
                p for p in dict.fromkeys(filtered_permissions) if p in permission_ids
            ]
            for permission in linked_permissions:
                await db.execute(
                    insert(role_permissions_table).values(
                        role_id=role_id,
                        permission_id=permission_ids[permission],
                    )
                )

            # Single commit: the role and its links are stored together, so a
            # failure while linking cannot leave an orphan role behind.
            await db.commit()
        except Exception as e:
            await db.rollback()
            raise HTTPException(status_code=500, detail=f"Could not create role: {e}") from e

        return {"id": role_id, "name": name, "permissions": linked_permissions}

    @staticmethod
    async def update_role(role_id: int, data: dict, db: AsyncSession, token: str):
        """Update a role by ID (administrators only).

        If ``data`` contains ``"permissions"``, the role's permission links
        are replaced by that list of permission names. Every other key of
        ``data`` is written to the role row as a column value.

        Raises:
            HTTPException: 403 if the caller is not an administrator, 400 if
                a permission name is unknown, 404 if the role row update
                matches nothing, 500 on any other failure.
        """
        await RoleService.admin_required(token, db)

        role_update_data = {key: value for key, value in data.items() if key != "permissions"}

        try:
            if "permissions" in data:
                requested = list(dict.fromkeys(data["permissions"]))
                permission_ids = await RoleService._permission_ids_by_name(db, requested)

                # Validate every name before touching the existing links.
                for permission in requested:
                    if permission not in permission_ids:
                        raise HTTPException(
                            status_code=400,
                            detail=f"Permission '{permission}' not found",
                        )

                # Replace the old links with the requested ones.
                await db.execute(
                    delete(role_permissions_table).where(
                        role_permissions_table.c.role_id == role_id
                    )
                )
                for permission in requested:
                    await db.execute(
                        insert(role_permissions_table).values(
                            role_id=role_id,
                            permission_id=permission_ids[permission],
                        )
                    )

            if role_update_data:
                result = await db.execute(
                    update(roles_table)
                    .where(roles_table.c.id == role_id)
                    .values(**role_update_data)
                )
                if result.rowcount == 0:
                    raise HTTPException(status_code=404, detail="Role not found")

            await db.commit()
        except HTTPException:
            # Keep the intended status code (400/404) instead of letting the
            # generic handler below turn it into a 500.
            await db.rollback()
            raise
        except Exception as e:
            await db.rollback()
            raise HTTPException(status_code=500, detail=f"Could not update role: {e}") from e

        return {"message": "Role updated successfully"}

    @staticmethod
    async def delete_role(role_id: int, db: AsyncSession, token: str):
        """Delete a role by ID (administrators only).

        The role's rows in ``role_permissions_table`` are removed in the same
        transaction.

        Raises:
            HTTPException: 403 if the caller is not an administrator, 404 if
                no role has this ID, 500 on any other failure.
        """
        await RoleService.admin_required(token, db)

        try:
            # Remove the permission links first so no rows are left pointing
            # at the deleted role.
            await db.execute(
                delete(role_permissions_table).where(
                    role_permissions_table.c.role_id == role_id
                )
            )
            result = await db.execute(delete(roles_table).where(roles_table.c.id == role_id))
            if result.rowcount == 0:
                raise HTTPException(status_code=404, detail="Role not found")
            await db.commit()
        except HTTPException:
            # Propagate the 404 unchanged rather than wrapping it in a 500.
            await db.rollback()
            raise
        except Exception as e:
            await db.rollback()
            raise HTTPException(status_code=500, detail=f"Could not delete role: {e}") from e

        return {"message": "Role deleted successfully"}

    @staticmethod
    async def admin_required(
        token: str = Depends(oauth2_scheme),
        db: AsyncSession = Depends(get_db),
    ):
        """Check that the bearer of ``token`` is an administrator.

        The token is decoded with ``settings.secret_key`` and
        ``settings.algorithm``; its ``sub`` claim is used as the e-mail to
        look up in ``users_table``.

        Returns:
            The user row, when its ``role`` column equals ``"admin"``.

        Raises:
            HTTPException: 403 if the token is invalid, has no ``sub`` claim,
                matches no user, or the user is not an administrator.
        """
        credentials_exception = HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You do not have permission to perform this action.",
        )

        try:
            payload = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm])
        except JWTError as err:
            raise credentials_exception from err

        email = payload.get("sub")
        if email is None:
            raise credentials_exception
        token_data = TokenData(email=email)

        user_query = select(users_table).where(users_table.c.email == token_data.email)
        result = await db.execute(user_query)
        user = result.fetchone()
        if user is None:
            raise credentials_exception

        # Rows returned through AsyncSession are tuple-like; column access by
        # name goes through the row's mapping view.
        if user._mapping["role"] != "admin":
            raise credentials_exception

        return user

    @staticmethod
    async def get_all_permissions(db: AsyncSession):
        """Return every row of ``permissions_table`` as a list of mappings."""
        result = await db.execute(select(permissions_table))
        return result.mappings().all()

    @staticmethod
    async def get_role(role_id: int, db: AsyncSession):
        """Return one role with the names of its linked permissions.

        Returns:
            ``{"id", "name", "permissions"}`` for the role.

        Raises:
            HTTPException: 404 if no role has this ID.
        """
        role_query = select(roles_table).where(roles_table.c.id == role_id)
        role = (await db.execute(role_query)).mappings().fetchone()
        if not role:
            raise HTTPException(status_code=404, detail="Role not found")

        permissions_query = (
            select(permissions_table.c.name)
            .join(
                role_permissions_table,
                role_permissions_table.c.permission_id == permissions_table.c.id,
            )
            .where(role_permissions_table.c.role_id == role_id)
        )
        permission_rows = (await db.execute(permissions_query)).mappings().all()

        return {
            "id": role["id"],
            "name": role["name"],
            "permissions": [row["name"] for row in permission_rows],
        }