# After_Chido_Api/services/role_service.py
#
# 105 lines
# 4.0 KiB
# Python

from sqlalchemy import insert, select, update, delete
from fastapi import HTTPException, Depends, status
from fastapi.security import OAuth2PasswordBearer
from models.schemas import Role, TokenData
from models.db import roles_table, users_table
from config.database import get_db
from config.settings import settings
from jose import jwt, JWTError
from sqlalchemy.ext.asyncio import AsyncSession
# OAuth2 bearer-token scheme whose token URL is "/token"; used as the default
# dependency for the ``token`` parameter of ``RoleService.admin_required``.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")
class RoleService:
    """Administrator-only create/update/delete operations on roles.

    Each public operation first awaits :meth:`admin_required`, so it only
    proceeds when the supplied JWT resolves to a user whose ``role`` column
    equals ``"admin"``.
    """

    @staticmethod
    async def create_role(role: Role, db: AsyncSession, token: str):
        """Create a new role (administrators only).

        Args:
            role: Role payload; ``role.name`` and ``role.permissions`` are stored.
            db: Async database session used to run and commit the insert.
            token: JWT bearer token identifying the caller.

        Returns:
            A dict holding the new row's ``id`` merged with ``role.dict()``.

        Raises:
            HTTPException: 403 when the caller is not an administrator,
                500 when the insert or commit fails.
        """
        await RoleService.admin_required(token, db)
        query = insert(roles_table).values(
            name=role.name,
            # Permissions are persisted as one comma-separated string.
            permissions=",".join(role.permissions),
        )
        try:
            result = await db.execute(query)
            await db.commit()
            role_id = result.inserted_primary_key[0]
            return {"id": role_id, **role.dict()}
        except Exception as e:
            await db.rollback()
            raise HTTPException(
                status_code=500, detail=f"Could not create role: {str(e)}"
            ) from e

    @staticmethod
    async def update_role(role_id: int, data: dict, db: AsyncSession, token: str):
        """Update a role by its ID (administrators only).

        Args:
            role_id: Primary key of the role to update.
            data: Column/value pairs to write. If it contains ``"permissions"``,
                that value is joined with commas before being stored.
                The dict passed in is not modified.
            db: Async database session used to run and commit the update.
            token: JWT bearer token identifying the caller.

        Returns:
            ``{"message": "Role updated successfully"}`` on success.

        Raises:
            HTTPException: 403 when the caller is not an administrator,
                404 when no row matches ``role_id``,
                500 when the update or commit fails.
        """
        await RoleService.admin_required(token, db)
        # Work on a shallow copy so the caller's dict is left untouched.
        values = dict(data)
        if "permissions" in values:
            # Permissions are persisted as one comma-separated string.
            values["permissions"] = ",".join(values["permissions"])
        query = (
            update(roles_table)
            .where(roles_table.c.id == role_id)
            .values(**values)
        )
        try:
            result = await db.execute(query)
            if result.rowcount == 0:
                raise HTTPException(status_code=404, detail="Role not found")
            await db.commit()
            return {"message": "Role updated successfully"}
        except HTTPException:
            # Propagate the 404 as-is; the generic handler below would
            # otherwise rewrap it as a 500.
            await db.rollback()
            raise
        except Exception as e:
            await db.rollback()
            raise HTTPException(
                status_code=500, detail=f"Could not update role: {str(e)}"
            ) from e

    @staticmethod
    async def delete_role(role_id: int, db: AsyncSession, token: str):
        """Delete a role by its ID (administrators only).

        Args:
            role_id: Primary key of the role to delete.
            db: Async database session used to run and commit the delete.
            token: JWT bearer token identifying the caller.

        Returns:
            ``{"message": "Role deleted successfully"}`` on success.

        Raises:
            HTTPException: 403 when the caller is not an administrator,
                404 when no row matches ``role_id``,
                500 when the delete or commit fails.
        """
        await RoleService.admin_required(token, db)
        query = delete(roles_table).where(roles_table.c.id == role_id)
        try:
            result = await db.execute(query)
            if result.rowcount == 0:
                raise HTTPException(status_code=404, detail="Role not found")
            await db.commit()
            return {"message": "Role deleted successfully"}
        except HTTPException:
            # Propagate the 404 as-is; the generic handler below would
            # otherwise rewrap it as a 500.
            await db.rollback()
            raise
        except Exception as e:
            await db.rollback()
            raise HTTPException(
                status_code=500, detail=f"Could not delete role: {str(e)}"
            ) from e

    @staticmethod
    async def admin_required(token: str = Depends(oauth2_scheme), db: AsyncSession = Depends(get_db)):
        """Check that the current user is an administrator.

        Args:
            token: JWT bearer token; its ``sub`` claim is read as the user's email.
            db: Async database session used to look the user up.

        Returns:
            The user row fetched from ``users_table``.

        Raises:
            HTTPException: 403 when the token cannot be decoded, has no ``sub``
                claim, matches no user, or the user's role is not ``"admin"``.
        """
        credentials_exception = HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You do not have permission to perform this action.",
        )
        try:
            # Decode the JWT and extract the subject (the user's email).
            payload = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm])
            email: str = payload.get("sub")
            if email is None:
                raise credentials_exception
            token_data = TokenData(email=email)
        except JWTError as exc:
            raise credentials_exception from exc

        # Fetch the user from the database.
        user_query = select(users_table).where(users_table.c.email == token_data.email)
        result = await db.execute(user_query)
        user = result.fetchone()
        if user is None:
            raise credentials_exception

        # Rows are tuple-like, so string-key lookups go through the
        # ``_mapping`` view rather than ``user["role"]``.
        if user._mapping["role"] != "admin":
            raise credentials_exception
        return user