Compare commits

...

2 Commits

Author SHA1 Message Date
Anaz 8239270278 improvment role and permissions 2025-01-10 22:37:26 +04:00
Anaz f9c7b0d8e5 improvement role and permissions 2025-01-10 22:36:41 +04:00
5 changed files with 232 additions and 65 deletions

View File

@ -1,33 +1,58 @@
from fastapi import APIRouter, Depends from fastapi import APIRouter, Depends, HTTPException
from services.role_service import RoleService from sqlalchemy.ext.asyncio import AsyncSession
from models.schemas import Role
from config.database import get_db from config.database import get_db
from services.role_service import RoleService
from models.schemas import Role, PermissionResponse
from fastapi.security import OAuth2PasswordBearer from fastapi.security import OAuth2PasswordBearer
from typing import List
router = APIRouter() router = APIRouter()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token") oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")
@router.post("/", status_code=201) @router.post("/", status_code=201)
async def create_role(role: Role, db=Depends(get_db), token: str = Depends(oauth2_scheme)): async def create_role(name: str, permissions: List[str], db: AsyncSession = Depends(get_db), token: str = Depends(oauth2_scheme)):
return await RoleService.create_role(role, db, token) """
Créer un nouveau rôle avec des permissions associées (réservé aux administrateurs).
"""
return await RoleService.create_role(name, permissions, db, token)
@router.get("/{role_id}") @router.get("/{role_id}", response_model=Role)
async def get_role(role_id: int, db=Depends(get_db)): async def get_role(role_id: int, db: AsyncSession = Depends(get_db)):
"""
Récupérer les détails d'un rôle spécifique.
"""
return await RoleService.get_role(role_id, db) return await RoleService.get_role(role_id, db)
@router.get("/") @router.get("/", response_model=List[Role])
async def get_all_roles(db=Depends(get_db)): async def get_all_roles(db: AsyncSession = Depends(get_db)):
"""
Récupérer tous les rôles avec leurs permissions associées.
"""
return await RoleService.get_all_roles(db) return await RoleService.get_all_roles(db)
@router.put("/{role_id}") @router.put("/{role_id}")
async def update_role(role_id: int, data: dict, db=Depends(get_db), token: str = Depends(oauth2_scheme)): async def update_role(role_id: int, role: Role, db: AsyncSession = Depends(get_db), token: str = Depends(oauth2_scheme)):
return await RoleService.update_role(role_id, data, db, token) """
Mettre à jour un rôle existant (réservé aux administrateurs).
"""
return await RoleService.update_role(role_id, role.dict(), db, token)
@router.delete("/{role_id}") @router.delete("/{role_id}")
async def delete_role(role_id: int, db=Depends(get_db), token: str = Depends(oauth2_scheme)): async def delete_role(role_id: int, db: AsyncSession = Depends(get_db), token: str = Depends(oauth2_scheme)):
"""
Supprimer un rôle (réservé aux administrateurs).
"""
return await RoleService.delete_role(role_id, db, token) return await RoleService.delete_role(role_id, db, token)
@router.get("/permissions", response_model=List[PermissionResponse])
async def get_all_permissions():
"""
Récupérer toutes les permissions disponibles.
"""
return RoleService.get_all_permissions()

View File

@ -16,7 +16,24 @@ class Settings(BaseSettings):
email_password: str = "Bp@U3VgzrZ@" email_password: str = "Bp@U3VgzrZ@"
gdpr_deletion_delay_days: int = 7 gdpr_deletion_delay_days: int = 7
testing: bool = False testing: bool = False
resetpass_url : str = "https://resetpass.mayotte-urgence.com" resetpass_url : str = "https://resetpass.mayotte-urgence.com"
available_permissions = [
"create_user",
"update_user",
"delete_user",
"view_users",
"create_role",
"update_role",
"delete_role",
"view_roles",
"assign_permission",
"create_resource",
"update_resource",
"delete_resource",
"view_resources",
"generate_reports",
"access_admin_dashboard",
]
class Config: class Config:
env_file = ".env" env_file = ".env"

View File

@ -4,27 +4,44 @@ from datetime import datetime
metadata = MetaData() metadata = MetaData()
# Table definitions # Table definitions
# Table des utilisateurs
users_table = Table( users_table = Table(
"users", 'users',
metadata, metadata,
Column("id", Integer, primary_key=True), Column('id', Integer, primary_key=True, autoincrement=True),
Column("email", String(255), unique=True, nullable=False), Column('email', String(255), unique=True, nullable=False),
Column("full_name", String(255), nullable=False), Column('full_name', String(255), nullable=False),
Column("phone", String(20), nullable=False), Column('phone', String(20)),
Column("date_of_birth", DateTime, nullable=False), Column('date_of_birth', String(20)),
Column("organization", String(255)), Column('organization', String(255)),
Column("hashed_password", String(255), nullable=False), Column('hashed_password', String(255), nullable=False),
Column("role", String(50), default="user"), Column('role', String(50), nullable=False),
Column("is_blocked", Boolean, default=False), Column('is_active', Boolean, default=True),
Column("is_deleted", Boolean, default=False) Column('is_banned', Boolean, default=False)
) )
# Table des rôles
roles_table = Table( roles_table = Table(
"roles", 'roles',
metadata, metadata,
Column("id", Integer, primary_key=True), Column('id', Integer, primary_key=True, autoincrement=True),
Column("name", String(50), unique=True, nullable=False), Column('name', String(50), unique=True, nullable=False)
Column("permissions", Text) )
# Table des permissions
permissions_table = Table(
'permissions',
metadata,
Column('id', Integer, primary_key=True, autoincrement=True),
Column('name', String(100), unique=True, nullable=False)
)
# Table d'association entre rôles et permissions
role_permissions_table = Table(
'role_permissions',
metadata,
Column('role_id', Integer, ForeignKey('roles.id'), primary_key=True),
Column('permission_id', Integer, ForeignKey('permissions.id'), primary_key=True)
) )
need_requests_table = Table( need_requests_table = Table(

View File

@ -2,6 +2,19 @@ from pydantic import BaseModel, EmailStr
from datetime import datetime from datetime import datetime
from typing import List, Optional from typing import List, Optional
class RoleBase(BaseModel):
name: str
class RoleCreate(RoleBase):
permissions: List[str]
class RoleResponse(RoleBase):
id: int
permissions: List[str]
class Config:
orm_mode = True
class UserBase(BaseModel): class UserBase(BaseModel):
email: EmailStr email: EmailStr
full_name: str full_name: str
@ -19,7 +32,7 @@ class UserResponse(BaseModel):
phone: str phone: str
date_of_birth: str date_of_birth: str
organization: Optional[str] = None organization: Optional[str] = None
role: str role: RoleResponse
is_active: bool is_active: bool
is_banned: bool is_banned: bool
@ -30,10 +43,14 @@ class UserUpdateRole(BaseModel):
class UserBlockBan(BaseModel): class UserBlockBan(BaseModel):
email: EmailStr email: EmailStr
class Role(BaseModel): class PermissionBase(BaseModel):
id: int
name: str name: str
permissions: List[str]
class PermissionResponse(PermissionBase):
id: int
class Config:
orm_mode = True
# Demande de besoin (NeedRequest) # Demande de besoin (NeedRequest)
class NeedRequestBase(BaseModel): class NeedRequestBase(BaseModel):
@ -44,12 +61,10 @@ class NeedRequestBase(BaseModel):
vulnerable: int vulnerable: int
location: str location: str
gps_coordinates: Optional[str] = None gps_coordinates: Optional[str] = None
class NeedRequestCreate(NeedRequestBase): class NeedRequestCreate(NeedRequestBase):
requester_email: EmailStr requester_email: EmailStr
class NeedRequestUpdate(BaseModel): class NeedRequestUpdate(BaseModel):
category: Optional[str] = None category: Optional[str] = None
description: Optional[str] = None description: Optional[str] = None
@ -60,7 +75,6 @@ class NeedRequestUpdate(BaseModel):
gps_coordinates: Optional[str] = None gps_coordinates: Optional[str] = None
status: Optional[str] = None status: Optional[str] = None
class NeedRequestResponse(NeedRequestBase): class NeedRequestResponse(NeedRequestBase):
id: int id: int
requester_email: EmailStr requester_email: EmailStr
@ -72,7 +86,7 @@ class UserReport(BaseModel):
reported_user_id: int reported_user_id: int
reason: str reason: str
status: Optional[str] = "pending" status: Optional[str] = "pending"
class UserReportUpdate(BaseModel): class UserReportUpdate(BaseModel):
reason: Optional[str] = None reason: Optional[str] = None
status: Optional[str] = None # Exemples : "pending", "resolved" status: Optional[str] = None # Exemples : "pending", "resolved"
@ -81,11 +95,11 @@ class TechnicalIssue(BaseModel):
user_id: int user_id: int
description: str description: str
status: str status: str
class UpdateTechnicalIssue(BaseModel): class UpdateTechnicalIssue(BaseModel):
status: Optional[str] = None status: Optional[str] = None
description: Optional[str] = None description: Optional[str] = None
class PersonReportBase(BaseModel): class PersonReportBase(BaseModel):
full_name: str full_name: str
date_of_birth: datetime date_of_birth: datetime
@ -118,16 +132,16 @@ class PointOfInterest(BaseModel):
organization: Optional[str] = None organization: Optional[str] = None
gps_coordinates: str gps_coordinates: str
added_by: int # ID de l'utilisateur qui a ajouté ce point added_by: int # ID de l'utilisateur qui a ajouté ce point
class Shelter(BaseModel): class Shelter(BaseModel):
label: str label: str
description: Optional[str] = None description: Optional[str] = None
status: str # "available", "full", "closed" status: str # "available", "full", "closed"
contact_person: Optional[str] = None contact_person: Optional[str] = None
gps_coordinates: str gps_coordinates: str
added_by: int # ID de l'utilisateur qui a ajouté cet abri added_by: int # ID de l'utilisateur qui a ajouté cet abri
class PersonReportResponse(PersonReportBase): class PersonReportResponse(PersonReportBase):
id: int id: int
created_at: datetime created_at: datetime
updated_at: datetime updated_at: datetime

View File

@ -1,8 +1,8 @@
from sqlalchemy import insert, select, update, delete from sqlalchemy import insert, select, update, delete
from fastapi import HTTPException, Depends, status from fastapi import HTTPException, Depends, status
from fastapi.security import OAuth2PasswordBearer from fastapi.security import OAuth2PasswordBearer
from models.schemas import Role, TokenData from models.schemas import Role, TokenData, Permission
from models.db import roles_table, users_table from models.db import roles_table, permissions_table, role_permissions_table, users_table
from config.database import get_db from config.database import get_db
from config.settings import settings from config.settings import settings
from jose import jwt, JWTError from jose import jwt, JWTError
@ -13,20 +13,61 @@ oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")
class RoleService: class RoleService:
@staticmethod @staticmethod
async def create_role(role: Role, db: AsyncSession, token: str): async def get_all_roles(db: AsyncSession):
query = select(roles_table)
result = await db.execute(query)
roles = result.mappings().all()
# Inclure les permissions pour chaque rôle
roles_with_permissions = []
for role in roles:
permissions_query = (
select(permissions_table.c.name)
.join(role_permissions_table, permissions_table.c.id == role_permissions_table.c.permission_id)
.where(role_permissions_table.c.role_id == role['id'])
)
permissions_result = await db.execute(permissions_query)
permissions = [p["name"] for p in permissions_result]
role_data = dict(role)
role_data["permissions"] = permissions
roles_with_permissions.append(role_data)
return roles_with_permissions
@staticmethod
async def create_role(name: str, permissions: list, db, token: str):
""" """
Crée un nouveau rôle (réservé aux administrateurs). Crée un nouveau rôle avec les permissions spécifiées (réservé aux administrateurs).
""" """
await RoleService.admin_required(token, db) await RoleService.admin_required(token, db)
query = insert(roles_table).values(
name=role.name, # Vérifier que les permissions sont valides
permissions=",".join(role.permissions), # Stocke les permissions sous forme de chaîne valid_permissions = settings.available_permissions
) filtered_permissions = [p for p in permissions if p in valid_permissions]
if not filtered_permissions:
raise HTTPException(status_code=400, detail="No valid permissions provided.")
# Insérer le rôle
query = insert(roles_table).values(name=name)
try: try:
result = await db.execute(query) result = await db.execute(query)
await db.commit() await db.commit()
role_id = result.inserted_primary_key[0] role_id = result.inserted_primary_key[0]
return {"id": role_id, **role.dict()}
# Associer les permissions valides au rôle
for permission in filtered_permissions:
permission_query = select(permissions_table).where(permissions_table.c.name == permission)
permission_result = await db.execute(permission_query)
permission_record = permission_result.fetchone()
if permission_record:
insert_query = insert(role_permissions_table).values(
role_id=role_id, permission_id=permission_record['id']
)
await db.execute(insert_query)
await db.commit()
return {"id": role_id, "name": name, "permissions": filtered_permissions}
except Exception as e: except Exception as e:
await db.rollback() await db.rollback()
raise HTTPException(status_code=500, detail=f"Could not create role: {str(e)}") raise HTTPException(status_code=500, detail=f"Could not create role: {str(e)}")
@ -38,21 +79,43 @@ class RoleService:
""" """
await RoleService.admin_required(token, db) await RoleService.admin_required(token, db)
if "permissions" in data: if "permissions" in data:
data["permissions"] = ",".join(data["permissions"]) # Convertit les permissions en chaîne # Supprimer les anciennes permissions
query = ( delete_query = delete(role_permissions_table).where(role_permissions_table.c.role_id == role_id)
update(roles_table) await db.execute(delete_query)
.where(roles_table.c.id == role_id)
.values(**data) # Ajouter les nouvelles permissions
) for permission in data["permissions"]:
try: permission_query = select(permissions_table).where(permissions_table.c.name == permission)
result = await db.execute(query) permission_result = await db.execute(permission_query)
if result.rowcount == 0: permission_record = permission_result.fetchone()
raise HTTPException(status_code=404, detail="Role not found") if not permission_record:
raise HTTPException(status_code=400, detail=f"Permission '{permission}' not found")
insert_query = insert(role_permissions_table).values(
role_id=role_id, permission_id=permission_record['id']
)
await db.execute(insert_query)
await db.commit() await db.commit()
return {"message": "Role updated successfully"}
except Exception as e: # Mettre à jour les autres champs du rôle
await db.rollback() role_update_data = {key: value for key, value in data.items() if key != "permissions"}
raise HTTPException(status_code=500, detail=f"Could not update role: {str(e)}") if role_update_data:
query = (
update(roles_table)
.where(roles_table.c.id == role_id)
.values(**role_update_data)
)
try:
result = await db.execute(query)
if result.rowcount == 0:
raise HTTPException(status_code=404, detail="Role not found")
await db.commit()
return {"message": "Role updated successfully"}
except Exception as e:
await db.rollback()
raise HTTPException(status_code=500, detail=f"Could not update role: {str(e)}")
return {"message": "Role updated successfully"}
@staticmethod @staticmethod
async def delete_role(role_id: int, db: AsyncSession, token: str): async def delete_role(role_id: int, db: AsyncSession, token: str):
@ -102,3 +165,34 @@ class RoleService:
raise credentials_exception raise credentials_exception
return user return user
@staticmethod
async def get_all_permissions():
"""
Récupère toutes les permissions définies dans les paramètres (settings).
"""
return settings.available_permissions
@staticmethod
async def get_role(role_id: int, db):
"""
Récupère les détails d'un rôle spécifique, y compris ses permissions associées.
"""
query = select(roles_table).where(roles_table.c.id == role_id)
result = await db.execute(query)
role = result.fetchone()
if not role:
raise HTTPException(status_code=404, detail="Role not found")
# Récupérer les permissions associées au rôle
permissions_query = (
select(permissions_table.c.name)
.join(role_permissions_table, role_permissions_table.c.permission_id == permissions_table.c.id)
.where(role_permissions_table.c.role_id == role_id)
)
permissions_result = await db.execute(permissions_query)
permissions = [row["name"] for row in permissions_result]
return {"id": role["id"], "name": role["name"], "permissions": permissions}