# After_Chido_Api/services/user_service.py
#
# 118 lines
# 4.3 KiB
# Python

import logging
from typing import Optional

from fastapi import Depends, HTTPException, status
from sqlalchemy import select, update, insert
from sqlalchemy.ext.asyncio import AsyncSession

from config.database import get_db
from models.db import users_table
from models.schemas import UserCreate, UserResponse, UserUpdateRole, UserBlockBan
from services.auth_service import AuthService
from utils.security import get_password_hash
class UserService:
    """User-management operations executed against ``users_table``.

    Every method is a static coroutine. All public methods except
    ``create_user`` start by awaiting an ``AuthService`` check with the
    caller's token before touching the database.
    """

    @staticmethod
    async def _apply_user_update(email, db: AsyncSession, **values) -> None:
        """Set *values* on the row whose ``email`` column equals *email*.

        The transaction is committed only when a row was actually modified;
        otherwise it is rolled back so no empty transaction is committed.

        Args:
            email: Value compared against ``users_table.c.email``.
            db: Session used to execute and commit the statement.
            **values: Column/value pairs forwarded to ``.values()``.

        Raises:
            HTTPException: 404 ("User not found") when no row matched.
        """
        query = (
            update(users_table)
            .where(users_table.c.email == email)
            .values(**values)
        )
        try:
            result = await db.execute(query)
            missing = result.rowcount == 0
            if missing:
                await db.rollback()
            else:
                await db.commit()
        except Exception:
            # Mirror create_user: never leave a failed transaction open.
            await db.rollback()
            raise
        if missing:
            raise HTTPException(status_code=404, detail="User not found")

    @staticmethod
    async def list_users(
        token: str,
        status: Optional[str] = None,
        db: AsyncSession = Depends(get_db),
    ):
        """Return all users as ``UserResponse`` objects.

        Args:
            token: Passed to ``AuthService.check_permissions`` together with
                the role list ``["admin"]``.
            status: When truthy, only rows whose ``status`` column equals
                this value are returned.
            db: Database session; defaults to the ``get_db`` dependency.

        Returns:
            A list with one ``UserResponse`` per selected row.
        """
        # NOTE: the ``status`` parameter shadows the imported ``fastapi.status``
        # module inside this function; the name is kept for caller compatibility.
        await AuthService.check_permissions(token, ["admin"], db)
        query = select(users_table)
        if status:
            query = query.where(users_table.c.status == status)
        result = await db.execute(query)
        return [UserResponse(**row) for row in result.mappings().all()]

    @staticmethod
    async def create_user(user: UserCreate, db: AsyncSession) -> dict:
        """Insert a new user row with a hashed password.

        The row is created with ``is_active=True`` and ``is_banned=False``.

        Args:
            user: Payload supplying email, full name, phone, date of birth,
                organization, plain-text password and role.
            db: Session used to execute and commit the insert.

        Returns:
            ``{"message": "User created successfully"}``.

        Raises:
            HTTPException: 500 when the insert or commit fails. The
                underlying error is logged, not returned to the client.
        """
        # NOTE(review): no permission check happens here and the role comes
        # straight from the payload -- confirm callers restrict it.
        hashed_password = get_password_hash(user.password)
        query = users_table.insert().values(
            email=user.email,
            full_name=user.full_name,
            phone=user.phone,
            date_of_birth=user.date_of_birth,
            organization=user.organization,
            hashed_password=hashed_password,
            # Per the original note this defaults to the "user" role
            # (presumably set by the UserCreate schema -- TODO confirm).
            role=user.role,
            is_active=True,
            is_banned=False,
        )
        try:
            await db.execute(query)
            await db.commit()
        except Exception as exc:
            # Database error strings can embed the SQL statement and its bound
            # parameters (including the password hash), so the details stay in
            # the server log and the client only gets a generic message.
            logging.getLogger(__name__).exception("Error creating user")
            await db.rollback()
            raise HTTPException(
                status_code=500, detail="Error creating user"
            ) from exc
        return {"message": "User created successfully"}

    @staticmethod
    async def change_user_role(
        user_update: UserUpdateRole, db: AsyncSession, token: str
    ) -> dict:
        """Set the role of the user identified by ``user_update.email``.

        Awaits ``AuthService.check_permissions`` with ``["admin"]`` first.

        Raises:
            HTTPException: 404 when no user has that email.
        """
        await AuthService.check_permissions(token, ["admin"], db)
        await UserService._apply_user_update(
            user_update.email, db, role=user_update.new_role
        )
        return {"message": f"Role updated to {user_update.new_role}"}

    @staticmethod
    async def block_user(
        user_action: UserBlockBan, db: AsyncSession, token: str
    ) -> dict:
        """Set ``is_active=False`` for the user with ``user_action.email``.

        Awaits ``AuthService.admin_required`` first.

        Raises:
            HTTPException: 404 when no user has that email.
        """
        await AuthService.admin_required(token, db)
        await UserService._apply_user_update(
            user_action.email, db, is_active=False
        )
        return {"message": f"User {user_action.email} blocked"}

    @staticmethod
    async def ban_user(
        user_action: UserBlockBan, db: AsyncSession, token: str
    ) -> dict:
        """Set ``is_banned=True`` for the user with ``user_action.email``.

        Awaits ``AuthService.admin_required`` first.

        Raises:
            HTTPException: 404 when no user has that email.
        """
        await AuthService.admin_required(token, db)
        await UserService._apply_user_update(
            user_action.email, db, is_banned=True
        )
        return {"message": f"User {user_action.email} banned"}

    @staticmethod
    async def unblock_user(
        user_action: UserBlockBan, db: AsyncSession, token: str
    ) -> dict:
        """Set ``is_active=True`` for the user with ``user_action.email``.

        Awaits ``AuthService.admin_required`` first.

        Raises:
            HTTPException: 404 when no user has that email.
        """
        await AuthService.admin_required(token, db)
        await UserService._apply_user_update(
            user_action.email, db, is_active=True
        )
        return {"message": f"User {user_action.email} unblocked"}

    @staticmethod
    async def unban_user(
        user_action: UserBlockBan, db: AsyncSession, token: str
    ) -> dict:
        """Set ``is_banned=False`` for the user with ``user_action.email``.

        Awaits ``AuthService.admin_required`` first.

        Raises:
            HTTPException: 404 when no user has that email.
        """
        await AuthService.admin_required(token, db)
        await UserService._apply_user_update(
            user_action.email, db, is_banned=False
        )
        return {"message": f"User {user_action.email} unbanned"}