# After_Chido_Api/services/role_service.py
#
# Repository viewer metadata: 199 lines, 8.1 KiB, Python

from sqlalchemy import insert, select, update, delete
from fastapi import HTTPException, Depends, status
from fastapi.security import OAuth2PasswordBearer
from models.schemas import Role, TokenData, Permission
from models.db import roles_table, permissions_table, role_permissions_table, users_table
from config.database import get_db
from config.settings import settings
from jose import jwt, JWTError
from sqlalchemy.ext.asyncio import AsyncSession
# OAuth2 password-bearer scheme whose token URL is "/token". It is the default
# dependency supplying the `token` argument of RoleService.admin_required.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")
class RoleService:
    """Role management operations and the admin-only authorization check.

    Every method is a static coroutine. Database access goes through the
    SQLAlchemy session handed in by the caller; mutating operations first
    call :meth:`admin_required` and then run inside a single transaction
    (one commit on success, rollback on any failure).
    """

    @staticmethod
    async def _fetch_permission_names(role_id: int, db: AsyncSession) -> list:
        """Return the names of the permissions linked to ``role_id``."""
        query = (
            select(permissions_table.c.name)
            .join(
                role_permissions_table,
                permissions_table.c.id == role_permissions_table.c.permission_id,
            )
            .where(role_permissions_table.c.role_id == role_id)
        )
        result = await db.execute(query)
        # scalars() yields the single selected column directly, which avoids
        # string-indexing Row objects (unsupported since SQLAlchemy 2.0).
        return list(result.scalars().all())

    @staticmethod
    async def get_all_roles(db: AsyncSession):
        """Return every role as a dict with an added ``"permissions"`` list.

        Args:
            db: Async session used for the queries.

        Returns:
            A list with one dict per row of ``roles_table``; each dict holds
            the role's columns plus ``"permissions"``, the list of linked
            permission names (empty when the role has none).
        """
        roles_result = await db.execute(select(roles_table))
        roles = roles_result.mappings().all()

        # Load all role/permission links in one query instead of issuing one
        # query per role, then group the names by role id.
        links_query = select(
            role_permissions_table.c.role_id,
            permissions_table.c.name,
        ).select_from(
            role_permissions_table.join(
                permissions_table,
                permissions_table.c.id == role_permissions_table.c.permission_id,
            )
        )
        links_result = await db.execute(links_query)
        permissions_by_role = {}
        for linked_role_id, permission_name in links_result:
            permissions_by_role.setdefault(linked_role_id, []).append(permission_name)

        roles_with_permissions = []
        for role in roles:
            role_data = dict(role)
            role_data["permissions"] = permissions_by_role.get(role["id"], [])
            roles_with_permissions.append(role_data)
        return roles_with_permissions

    @staticmethod
    async def create_role(name: str, permissions: list, db: AsyncSession, token: str):
        """Create a role with the given permissions (administrators only).

        Args:
            name: Name of the new role.
            permissions: Requested permission names. Names that are not in
                ``settings.available_permissions`` are ignored.
            db: Async session used for the queries.
            token: JWT of the caller, checked by :meth:`admin_required`.

        Returns:
            ``{"id": ..., "name": ..., "permissions": [...]}`` where
            ``permissions`` lists the names that were actually linked to the
            new role.

        Raises:
            HTTPException: 403 if the caller is not an administrator, 400 if
                no requested permission is valid, 500 if the database
                operation fails.
        """
        await RoleService.admin_required(token, db)

        # Keep only permissions known to the application settings, dropping
        # duplicates while preserving the caller's order.
        valid_permissions = settings.available_permissions
        filtered_permissions = list(
            dict.fromkeys(p for p in permissions if p in valid_permissions)
        )
        if not filtered_permissions:
            raise HTTPException(status_code=400, detail="No valid permissions provided.")

        try:
            # No intermediate commit: the role and its permission links are
            # committed together so a failure cannot leave a half-built role.
            result = await db.execute(insert(roles_table).values(name=name))
            role_id = result.inserted_primary_key[0]

            lookup = await db.execute(
                select(permissions_table.c.id, permissions_table.c.name).where(
                    permissions_table.c.name.in_(filtered_permissions)
                )
            )
            permission_ids = {
                permission_name: permission_id
                for permission_id, permission_name in lookup
            }

            # Only permissions that exist in the permissions table can be
            # linked; report exactly those back to the caller.
            assigned_permissions = [p for p in filtered_permissions if p in permission_ids]
            if assigned_permissions:
                await db.execute(
                    insert(role_permissions_table),
                    [
                        {"role_id": role_id, "permission_id": permission_ids[p]}
                        for p in assigned_permissions
                    ],
                )
            await db.commit()
        except Exception as e:
            await db.rollback()
            raise HTTPException(
                status_code=500, detail=f"Could not create role: {str(e)}"
            ) from e
        return {"id": role_id, "name": name, "permissions": assigned_permissions}

    @staticmethod
    async def update_role(role_id: int, data: dict, db: AsyncSession, token: str):
        """Update a role by id (administrators only).

        Args:
            role_id: Id of the role to update.
            data: Fields to change. The optional ``"permissions"`` key holds
                the complete new list of permission names for the role; all
                other keys are written to ``roles_table`` as column values.
            db: Async session used for the queries.
            token: JWT of the caller, checked by :meth:`admin_required`.

        Returns:
            ``{"message": "Role updated successfully"}``.

        Raises:
            HTTPException: 403 if the caller is not an administrator, 404 if
                the role does not exist, 400 if a permission name is unknown,
                500 if the database operation fails.
        """
        await RoleService.admin_required(token, db)

        role_update_data = {key: value for key, value in data.items() if key != "permissions"}
        try:
            # Check existence up front so a missing role yields 404 no matter
            # which fields are being updated.
            existing = await db.execute(
                select(roles_table.c.id).where(roles_table.c.id == role_id)
            )
            if existing.first() is None:
                raise HTTPException(status_code=404, detail="Role not found")

            if "permissions" in data:
                # Resolve every requested permission before touching the
                # existing links, so an unknown name changes nothing.
                permission_ids = []
                for permission in data["permissions"]:
                    lookup = await db.execute(
                        select(permissions_table.c.id).where(
                            permissions_table.c.name == permission
                        )
                    )
                    permission_id = lookup.scalars().first()
                    if permission_id is None:
                        raise HTTPException(
                            status_code=400, detail=f"Permission '{permission}' not found"
                        )
                    if permission_id not in permission_ids:
                        permission_ids.append(permission_id)

                # Replace the old permission set with the new one.
                await db.execute(
                    delete(role_permissions_table).where(
                        role_permissions_table.c.role_id == role_id
                    )
                )
                if permission_ids:
                    await db.execute(
                        insert(role_permissions_table),
                        [
                            {"role_id": role_id, "permission_id": permission_id}
                            for permission_id in permission_ids
                        ],
                    )

            if role_update_data:
                await db.execute(
                    update(roles_table)
                    .where(roles_table.c.id == role_id)
                    .values(**role_update_data)
                )

            # Single commit: permission changes and field changes succeed or
            # fail together.
            await db.commit()
        except HTTPException:
            # Deliberate 4xx responses must reach the client unchanged rather
            # than being rewrapped as a 500 by the generic handler below.
            await db.rollback()
            raise
        except Exception as e:
            await db.rollback()
            raise HTTPException(
                status_code=500, detail=f"Could not update role: {str(e)}"
            ) from e
        return {"message": "Role updated successfully"}

    @staticmethod
    async def delete_role(role_id: int, db: AsyncSession, token: str):
        """Delete a role by id (administrators only).

        Args:
            role_id: Id of the role to delete.
            db: Async session used for the queries.
            token: JWT of the caller, checked by :meth:`admin_required`.

        Returns:
            ``{"message": "Role deleted successfully"}``.

        Raises:
            HTTPException: 403 if the caller is not an administrator, 404 if
                the role does not exist, 500 if the database operation fails.
        """
        await RoleService.admin_required(token, db)
        try:
            # Remove the role's permission links first so no association rows
            # are left pointing at a deleted role.
            await db.execute(
                delete(role_permissions_table).where(
                    role_permissions_table.c.role_id == role_id
                )
            )
            result = await db.execute(
                delete(roles_table).where(roles_table.c.id == role_id)
            )
            if result.rowcount == 0:
                raise HTTPException(status_code=404, detail="Role not found")
            await db.commit()
        except HTTPException:
            # Keep the 404 intact instead of converting it into a 500.
            await db.rollback()
            raise
        except Exception as e:
            await db.rollback()
            raise HTTPException(
                status_code=500, detail=f"Could not delete role: {str(e)}"
            ) from e
        return {"message": "Role deleted successfully"}

    @staticmethod
    async def admin_required(token: str = Depends(oauth2_scheme), db: AsyncSession = Depends(get_db)):
        """Check that the bearer of ``token`` is an administrator.

        Args:
            token: JWT whose ``sub`` claim carries the user's email.
            db: Async session used to load the user.

        Returns:
            The matching row of ``users_table``.

        Raises:
            HTTPException: 403 if the token cannot be decoded, has no ``sub``
                claim, matches no user, or the user's role is not ``"admin"``.
        """
        credentials_exception = HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You do not have permission to perform this action.",
        )
        try:
            # Decode the JWT and extract the subject (the user's email).
            payload = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm])
            email = payload.get("sub")
            if email is None:
                raise credentials_exception
            token_data = TokenData(email=email)
        except JWTError:
            raise credentials_exception

        # Load the user referenced by the token.
        user_query = select(users_table).where(users_table.c.email == token_data.email)
        result = await db.execute(user_query)
        user = result.first()
        if user is None:
            raise credentials_exception

        # Row._mapping gives name-based access on both SQLAlchemy 1.4 and 2.0,
        # while the Row itself is still what gets returned to the caller.
        if user._mapping["role"] != "admin":
            raise credentials_exception
        return user

    @staticmethod
    async def get_all_permissions():
        """Return the permissions defined in the application settings."""
        return settings.available_permissions

    @staticmethod
    async def get_role(role_id: int, db: AsyncSession):
        """Return one role together with the names of its permissions.

        Args:
            role_id: Id of the role to load.
            db: Async session used for the queries.

        Returns:
            ``{"id": ..., "name": ..., "permissions": [...]}``.

        Raises:
            HTTPException: 404 if no role has the given id.
        """
        result = await db.execute(select(roles_table).where(roles_table.c.id == role_id))
        role = result.mappings().first()
        if role is None:
            raise HTTPException(status_code=404, detail="Role not found")

        permissions = await RoleService._fetch_permission_names(role_id, db)
        return {"id": role["id"], "name": role["name"], "permissions": permissions}