# After_Chido_Api/services/role_service.py
#
# 199 lines
# 8.1 KiB
# Python

from sqlalchemy import insert, select, update, delete
from fastapi import HTTPException, Depends, status
from fastapi.security import OAuth2PasswordBearer
from models.schemas import TokenData
from models.db import roles_table, permissions_table, role_permissions_table, users_table
from config.database import get_db
from config.settings import settings
from jose import jwt, JWTError
from sqlalchemy.ext.asyncio import AsyncSession
# OAuth2 bearer-token scheme. It is the default FastAPI dependency that supplies
# the `token` argument of RoleService.admin_required; `tokenUrl` is the path of
# the token-issuing endpoint ("/api/v1/auth/token").
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/token")
class RoleService:
    """Service layer for managing roles and their associated permissions.

    All methods are static and operate on an ``AsyncSession``. Mutating
    operations (create / update / delete) first verify that the bearer token
    belongs to an administrator via :meth:`admin_required`, and run inside a
    single transaction that is rolled back on any failure.
    """

    # ------------------------------------------------------------------ #
    # Private helpers
    # ------------------------------------------------------------------ #
    @staticmethod
    async def _fetch_permission_names(role_id: int, db: AsyncSession) -> list:
        """Return the names of every permission linked to ``role_id``."""
        names_query = (
            select(permissions_table.c.name)
            .join(
                role_permissions_table,
                permissions_table.c.id == role_permissions_table.c.permission_id,
            )
            .where(role_permissions_table.c.role_id == role_id)
        )
        names_result = await db.execute(names_query)
        # Single-column select: scalars() yields the bare values. Rows returned
        # by AsyncSession are tuple-like and cannot be indexed by column name.
        return list(names_result.scalars().all())

    @staticmethod
    async def _find_permission_id(permission_name, db: AsyncSession):
        """Return the id of the permission called ``permission_name``, or None."""
        id_query = select(permissions_table.c.id).where(
            permissions_table.c.name == permission_name
        )
        id_result = await db.execute(id_query)
        return id_result.scalars().first()

    @staticmethod
    async def _attach_permissions(role_id, permission_names, db: AsyncSession, *, strict: bool):
        """Insert role/permission association rows (without committing).

        Args:
            role_id: Id of the role to link.
            permission_names: Iterable of permission names to attach.
            db: Active session; the caller owns commit/rollback.
            strict: When True, an unknown permission name raises a 400
                ``HTTPException``; when False it is silently skipped.
        """
        linked_ids = set()
        for permission_name in permission_names:
            permission_id = await RoleService._find_permission_id(permission_name, db)
            if permission_id is None:
                if strict:
                    raise HTTPException(
                        status_code=400,
                        detail=f"Permission '{permission_name}' not found",
                    )
                continue
            # Skip repeats so a duplicated name cannot insert the same
            # association row twice.
            if permission_id in linked_ids:
                continue
            linked_ids.add(permission_id)
            await db.execute(
                insert(role_permissions_table).values(
                    role_id=role_id, permission_id=permission_id
                )
            )

    # ------------------------------------------------------------------ #
    # Public API
    # ------------------------------------------------------------------ #
    @staticmethod
    async def get_all_roles(db: AsyncSession):
        """Return every role as a dict, each with a ``"permissions"`` name list."""
        roles_result = await db.execute(select(roles_table))
        roles = roles_result.mappings().all()
        if not roles:
            return []

        # Fetch all role -> permission-name pairs in one query instead of
        # issuing one query per role.
        links_query = select(
            role_permissions_table.c.role_id, permissions_table.c.name
        ).select_from(
            role_permissions_table.join(
                permissions_table,
                permissions_table.c.id == role_permissions_table.c.permission_id,
            )
        )
        links_result = await db.execute(links_query)
        names_by_role = {}
        for owner_id, permission_name in links_result:
            names_by_role.setdefault(owner_id, []).append(permission_name)

        roles_with_permissions = []
        for role in roles:
            role_data = dict(role)
            role_data["permissions"] = names_by_role.get(role["id"], [])
            roles_with_permissions.append(role_data)
        return roles_with_permissions

    @staticmethod
    async def create_role(name: str, permissions: list, db: AsyncSession, token: str):
        """Create a role with the given permissions (administrators only).

        Permissions not listed in ``settings.available_permissions`` are
        dropped; names that pass that filter but have no row in the
        permissions table are skipped when linking.

        Returns:
            ``{"id": ..., "name": ..., "permissions": [...]}`` where
            ``permissions`` is the filtered input list.

        Raises:
            HTTPException: 403 if the caller is not an admin, 400 if no valid
                permission was supplied, 500 if the database operation fails.
        """
        await RoleService.admin_required(token, db)

        valid_permissions = settings.available_permissions
        filtered_permissions = [p for p in permissions if p in valid_permissions]
        if not filtered_permissions:
            raise HTTPException(status_code=400, detail="No valid permissions provided.")

        try:
            result = await db.execute(insert(roles_table).values(name=name))
            role_id = result.inserted_primary_key[0]
            await RoleService._attach_permissions(
                role_id, filtered_permissions, db, strict=False
            )
            # Commit once, after the role and all its links are written, so a
            # failure cannot leave a role persisted without its permissions.
            await db.commit()
        except Exception as e:
            await db.rollback()
            raise HTTPException(
                status_code=500, detail=f"Could not create role: {str(e)}"
            ) from e
        return {"id": role_id, "name": name, "permissions": filtered_permissions}

    @staticmethod
    async def update_role(role_id: int, data: dict, db: AsyncSession, token: str):
        """Update a role by id (administrators only).

        If ``data`` contains ``"permissions"``, the role's permission set is
        replaced by that list; every other key is applied as a column update
        on the role row. Everything happens in one transaction.

        Raises:
            HTTPException: 403 if the caller is not an admin, 404 if the role
                does not exist, 400 if a permission name is unknown, 500 on
                any other database failure.
        """
        await RoleService.admin_required(token, db)

        try:
            # Check existence up front so nothing is modified for a missing
            # role, and so the 404 does not depend on UPDATE's rowcount.
            existing = await db.execute(
                select(roles_table.c.id).where(roles_table.c.id == role_id)
            )
            if existing.first() is None:
                raise HTTPException(status_code=404, detail="Role not found")

            if "permissions" in data:
                # Replace the whole permission set: drop old links, add new.
                await db.execute(
                    delete(role_permissions_table).where(
                        role_permissions_table.c.role_id == role_id
                    )
                )
                await RoleService._attach_permissions(
                    role_id, data["permissions"], db, strict=True
                )

            role_update_data = {
                key: value for key, value in data.items() if key != "permissions"
            }
            if role_update_data:
                await db.execute(
                    update(roles_table)
                    .where(roles_table.c.id == role_id)
                    .values(**role_update_data)
                )
            await db.commit()
        except HTTPException:
            # Preserve the intended status code (404 / 400) instead of letting
            # the generic handler below turn it into a 500.
            await db.rollback()
            raise
        except Exception as e:
            await db.rollback()
            raise HTTPException(
                status_code=500, detail=f"Could not update role: {str(e)}"
            ) from e
        return {"message": "Role updated successfully"}

    @staticmethod
    async def delete_role(role_id: int, db: AsyncSession, token: str):
        """Delete a role by id (administrators only).

        The role's permission links are removed in the same transaction.

        Raises:
            HTTPException: 403 if the caller is not an admin, 404 if the role
                does not exist, 500 on any other database failure.
        """
        await RoleService.admin_required(token, db)

        try:
            # Remove association rows first so they are neither orphaned nor
            # able to block the delete through a foreign-key constraint.
            await db.execute(
                delete(role_permissions_table).where(
                    role_permissions_table.c.role_id == role_id
                )
            )
            result = await db.execute(
                delete(roles_table).where(roles_table.c.id == role_id)
            )
            if result.rowcount == 0:
                raise HTTPException(status_code=404, detail="Role not found")
            await db.commit()
        except HTTPException:
            # Keep the 404 as a 404 rather than re-wrapping it as a 500.
            await db.rollback()
            raise
        except Exception as e:
            await db.rollback()
            raise HTTPException(
                status_code=500, detail=f"Could not delete role: {str(e)}"
            ) from e
        return {"message": "Role deleted successfully"}

    @staticmethod
    async def admin_required(token: str = Depends(oauth2_scheme), db: AsyncSession = Depends(get_db)):
        """Check that the bearer of ``token`` is an administrator.

        Decodes the JWT, looks the user up by the e-mail stored in the ``sub``
        claim and checks that the user's ``role`` column equals ``"admin"``.

        Returns:
            The user row fetched from the database.

        Raises:
            HTTPException: 403 if the token is invalid, has no ``sub`` claim,
                matches no user, or the user is not an admin.
        """
        credentials_exception = HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You do not have permission to perform this action.",
        )
        try:
            payload = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm])
            email: str = payload.get("sub")
            if email is None:
                raise credentials_exception
            token_data = TokenData(email=email)
        except JWTError:
            raise credentials_exception

        user_query = select(users_table).where(users_table.c.email == token_data.email)
        result = await db.execute(user_query)
        user = result.fetchone()
        if user is None:
            raise credentials_exception

        # Rows are tuple-like; name-based access goes through ``_mapping``.
        if user._mapping["role"] != "admin":
            raise credentials_exception
        return user

    @staticmethod
    async def get_all_permissions():
        """Return the permissions defined in the application settings."""
        return settings.available_permissions

    @staticmethod
    async def get_role(role_id: int, db: AsyncSession):
        """Return one role's ``id``, ``name`` and associated permission names.

        Raises:
            HTTPException: 404 if no role has the given id.
        """
        role_result = await db.execute(
            select(roles_table).where(roles_table.c.id == role_id)
        )
        role = role_result.mappings().first()
        if not role:
            raise HTTPException(status_code=404, detail="Role not found")

        permissions = await RoleService._fetch_permission_names(role_id, db)
        return {"id": role["id"], "name": role["name"], "permissions": permissions}