149 lines
6.1 KiB
Python
149 lines
6.1 KiB
Python
from sqlalchemy import insert, select, update, delete
|
|
from fastapi import HTTPException, Depends, status
|
|
from fastapi.security import OAuth2PasswordBearer
|
|
from models.schemas import TokenData
|
|
from models.db import roles_table, permissions_table, role_permissions_table, users_table
|
|
from config.database import get_db
|
|
from config.settings import settings
|
|
from jose import jwt, JWTError
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from services.auth_service import AuthService
|
|
|
|
class RoleService:
|
|
@staticmethod
|
|
async def get_all_roles(db: AsyncSession):
|
|
query = select(roles_table)
|
|
result = await db.execute(query)
|
|
roles = result.mappings().all()
|
|
|
|
# Inclure les permissions pour chaque rôle
|
|
roles_with_permissions = []
|
|
for role in roles:
|
|
permissions_query = (
|
|
select(permissions_table.c.name)
|
|
.join(role_permissions_table, permissions_table.c.id == role_permissions_table.c.permission_id)
|
|
.where(role_permissions_table.c.role_id == role['id'])
|
|
)
|
|
result = await db.execute(permissions_query)
|
|
permissions_result = result.mappings().all()
|
|
permissions = [p["name"] for p in permissions_result]
|
|
role_data = dict(role)
|
|
role_data["permissions"] = permissions
|
|
roles_with_permissions.append(role_data)
|
|
|
|
return roles_with_permissions
|
|
|
|
@staticmethod
|
|
async def create_role(name: str, db: AsyncSession, token: str):
|
|
"""
|
|
Crée un nouveau rôle avec les permissions spécifiées (réservé aux administrateurs).
|
|
"""
|
|
await AuthService.admin_required(token, db)
|
|
# Insérer le rôle
|
|
query = insert(roles_table).values(name=name)
|
|
try:
|
|
result = await db.execute(query)
|
|
await db.commit()
|
|
role_id = result.inserted_primary_key[0]
|
|
return {"id": role_id, "name": name}
|
|
except Exception as e:
|
|
await db.rollback()
|
|
raise HTTPException(status_code=500, detail=f"Could not create role: {str(e)}")
|
|
|
|
@staticmethod
|
|
async def update_role(role_id: int, data: dict, db: AsyncSession, token: str):
|
|
"""
|
|
Met à jour un rôle par son ID (réservé aux administrateurs).
|
|
"""
|
|
await AuthService.admin_required(token, db)
|
|
if "permissions" in data:
|
|
# Supprimer les anciennes permissions
|
|
delete_query = delete(role_permissions_table).where(role_permissions_table.c.role_id == role_id)
|
|
await db.execute(delete_query)
|
|
|
|
# Ajouter les nouvelles permissions
|
|
for permission in data["permissions"]:
|
|
permission_query = select(permissions_table).where(permissions_table.c.name == permission)
|
|
permission_result = await db.execute(permission_query)
|
|
permission_record = permission_result.mappings().fetchone()
|
|
if not permission_record:
|
|
raise HTTPException(status_code=400, detail=f"Permission '{permission}' not found")
|
|
|
|
insert_query = insert(role_permissions_table).values(
|
|
role_id=role_id, permission_id=permission_record['id']
|
|
)
|
|
await db.execute(insert_query)
|
|
await db.commit()
|
|
|
|
# Mettre à jour les autres champs du rôle
|
|
role_update_data = {key: value for key, value in data.items() if key != "permissions"}
|
|
if role_update_data:
|
|
query = (
|
|
update(roles_table)
|
|
.where(roles_table.c.id == role_id)
|
|
.values(**role_update_data)
|
|
)
|
|
try:
|
|
result = await db.execute(query)
|
|
if result.rowcount == 0:
|
|
raise HTTPException(status_code=404, detail="Role not found")
|
|
await db.commit()
|
|
return {"message": "Role updated successfully"}
|
|
except Exception as e:
|
|
await db.rollback()
|
|
raise HTTPException(status_code=500, detail=f"Could not update role: {str(e)}")
|
|
|
|
return {"message": "Role updated successfully"}
|
|
|
|
@staticmethod
|
|
async def delete_role(role_id: int, db: AsyncSession, token: str):
|
|
"""
|
|
Supprime un rôle par son ID (réservé aux administrateurs).
|
|
"""
|
|
await AuthService.admin_required(token, db)
|
|
query = delete(roles_table).where(roles_table.c.id == role_id)
|
|
try:
|
|
result = await db.execute(query)
|
|
if result.rowcount == 0:
|
|
raise HTTPException(status_code=404, detail="Role not found")
|
|
await db.commit()
|
|
return {"message": "Role deleted successfully"}
|
|
except Exception as e:
|
|
await db.rollback()
|
|
raise HTTPException(status_code=500, detail=f"Could not delete role: {str(e)}")
|
|
|
|
@staticmethod
|
|
async def get_all_permissions(db: AsyncSession):
|
|
"""
|
|
Récupère toutes les permissions en BDD.
|
|
"""
|
|
query = select(permissions_table)
|
|
result = await db.execute(query)
|
|
permissions_result = result.mappings().all()
|
|
return permissions_result
|
|
|
|
@staticmethod
|
|
async def get_role(role_id: int, db: AsyncSession):
|
|
"""
|
|
Récupère les détails d'un rôle spécifique, y compris ses permissions associées.
|
|
"""
|
|
query = select(roles_table).where(roles_table.c.id == role_id)
|
|
result = await db.execute(query)
|
|
role = result.mappings().fetchone()
|
|
|
|
if not role:
|
|
raise HTTPException(status_code=404, detail="Role not found")
|
|
|
|
# Récupérer les permissions associées au rôle
|
|
permissions_query = (
|
|
select(permissions_table.c.name)
|
|
.join(role_permissions_table, role_permissions_table.c.permission_id == permissions_table.c.id)
|
|
.where(role_permissions_table.c.role_id == role_id)
|
|
)
|
|
result = await db.execute(permissions_query)
|
|
permissions_result = result.mappings().all()
|
|
permissions = [row["name"] for row in permissions_result]
|
|
|
|
return {"id": role["id"], "name": role["name"], "permissions": permissions}
|
|
|