After_Chido_Api/services/auth_service.py

305 lines
12 KiB
Python

from datetime import datetime, timedelta, timezone
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from config.settings import settings
from models.schemas import TokenData, UserCreate, UserResponse
from config.database import get_db
from models.db import users_table, roles_table, role_permissions_table, permissions_table
from sqlalchemy import select, update, insert
from sqlalchemy.ext.asyncio import AsyncSession
from utils.logging import logger
# NOTE(review): this emits a log record every time the module is imported;
# it looks like a debugging leftover — confirm before removing.
logger.info("Test log message")
# Password-hashing configuration: a passlib context using the bcrypt scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# OAuth2 configuration
# NOTE(review): no OAuth2 object is configured at this point in the visible
# source even though OAuth2PasswordBearer is imported — confirm whether one
# is expected here.
class AuthService:
@staticmethod
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plain-text password against a stored hash.

    Args:
        plain_password: The password as typed by the caller.
        hashed_password: The hash to verify against.

    Returns:
        The result of ``pwd_context.verify(plain_password, hashed_password)``.
    """
    # Never log the password or any hash here: log files are not a safe place
    # for credentials. Also avoid re-hashing the input for diagnostics, since
    # every bcrypt hash is deliberately expensive.
    return pwd_context.verify(plain_password, hashed_password)
@staticmethod
def get_password_hash(password: str) -> str:
    """Hash *password* with the module-level ``pwd_context``.

    Args:
        password: The plain-text password to hash.

    Returns:
        The value produced by ``pwd_context.hash``.
    """
    hashed = pwd_context.hash(password)
    return hashed
@staticmethod
async def authenticate_user(email: str, password: str, db):
    """Look up a user by e-mail and verify the supplied password.

    Args:
        email: E-mail address used to find the row in ``users_table``.
        password: Plain-text password to check against the row's
            ``hashed_password`` value.
        db: Object whose awaitable ``execute()`` runs the query
            (presumably an ``AsyncSession`` — confirm against callers).

    Returns:
        The user row as a mapping when the password matches, otherwise
        ``None`` (no such user, or wrong password).
    """
    query = select(users_table).where(users_table.c.email == email)
    result = await db.execute(query)
    # mappings() yields dict-like rows instead of tuples.
    user = result.mappings().fetchone()

    if not user or not AuthService.verify_password(password, user["hashed_password"]):
        # Same outcome for "unknown e-mail" and "wrong password" so callers
        # cannot tell the two apart from the return value.
        logger.warning("authentication failed", extra={"email": email})
        return None

    # Only report success once the credentials have actually been verified.
    logger.info("user authenticated", extra={"email": email})
    return user
@staticmethod
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Encode *data* as a signed JWT carrying an ``exp`` claim.

    Args:
        data: Claims to embed; the input dict is not modified.
        expires_delta: Lifetime of the token. When falsy (``None`` or a zero
            ``timedelta``), ``settings.access_token_expire_minutes`` is used.

    Returns:
        The token produced by ``jwt.encode`` with ``settings.secret_key`` and
        ``settings.algorithm``.
    """
    lifetime = expires_delta or timedelta(minutes=settings.access_token_expire_minutes)
    expiry = datetime.now(timezone.utc) + lifetime
    # Build a new dict so the caller's mapping stays untouched.
    claims = {**data, "exp": expiry}
    return jwt.encode(claims, settings.secret_key, algorithm=settings.algorithm)
@staticmethod
def verify_token(token: str) -> dict:
    """Decode *token* and return its payload.

    Args:
        token: The encoded JWT.

    Returns:
        The decoded claims.

    Raises:
        HTTPException: 401 "Token has expired" when the token is expired,
            401 "Invalid token" for any other ``JWTError``.
    """

    def unauthorized(reason: str) -> HTTPException:
        # All failures of this method share the same status code.
        return HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=reason,
        )

    # Decode the token.
    try:
        claims = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm])
    except jwt.ExpiredSignatureError:
        raise unauthorized("Token has expired")
    except JWTError:
        raise unauthorized("Invalid token")

    # Explicit expiry check on top of the one performed during decoding.
    expires_at = claims.get("exp")
    if expires_at:
        current_ts = datetime.now(timezone.utc).timestamp()
        if current_ts > expires_at:
            raise unauthorized("Token has expired")
    return claims
@staticmethod
async def get_current_user(token: str, db):
    """Resolve the user identified by *token* together with role and permissions.

    Args:
        token: Encoded JWT whose ``sub`` claim holds the user's e-mail.
        db: Object whose awaitable ``execute()`` runs the queries
            (presumably an ``AsyncSession`` — confirm against callers).

    Returns:
        A dict with ``id``, ``email``, ``full_name``, ``phone``,
        ``date_of_birth`` (ISO string, or ``None`` when the stored value is
        ``None``), ``role`` (``id``, ``name``, ``permissions`` list),
        ``is_banned`` and ``is_deleted``.

    Raises:
        HTTPException: 401 when the token cannot be decoded, has no ``sub``
            claim, the user does not exist, or the role query returns no rows.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm])
    except JWTError as exc:
        raise credentials_exception from exc
    email = payload.get("sub")
    if email is None:
        raise credentials_exception

    query = select(users_table).where(users_table.c.email == email)
    result = await db.execute(query)
    user = result.mappings().fetchone()
    if user is None:
        raise credentials_exception

    # Fetch the role and its permissions: one row per (role, permission) pair.
    # Both joins are inner joins, so a role with no permission rows yields no
    # rows at all and is rejected below.
    role_query = (
        select(
            roles_table.c.id,
            roles_table.c.name,
            permissions_table.c.name.label("permission"),
        )
        .join(role_permissions_table, role_permissions_table.c.role_id == roles_table.c.id)
        .join(permissions_table, role_permissions_table.c.permission_id == permissions_table.c.id)
        .where(roles_table.c.name == user["role"])
    )
    role_result = await db.execute(role_query)
    role_data = role_result.mappings().all()
    if not role_data:
        raise credentials_exception

    first_row = role_data[0]
    role = {
        "id": first_row["id"],
        "name": first_row["name"],
        "permissions": [row["permission"] for row in role_data],
    }

    # A missing birth date must not turn a valid login into an AttributeError.
    date_of_birth = user["date_of_birth"]

    # Build the response with all required fields.
    return {
        "id": user["id"],
        "email": user["email"],
        "full_name": user["full_name"],
        "phone": user["phone"],
        "date_of_birth": date_of_birth.isoformat() if date_of_birth is not None else None,
        "role": role,
        "is_banned": user["is_banned"],
        "is_deleted": user["is_deleted"],
    }
# @staticmethod
# async def create_user(user: UserCreate, db):
# result = await db.execute(select(users_table))
# users = result.fetchall()
# role = "admin" if len(users) == 0 else user.role
# hashed_password = AuthService.get_password_hash(user.password)
# query = insert(users_table).values(
# email=user.email,
# full_name=user.full_name,
# phone=user.phone,
# date_of_birth=user.date_of_birth,
# organization=user.organization,
# hashed_password=hashed_password,
# role=role
# )
# try:
# result = await db.execute(query)
# await db.commit()
# user_id = result.inserted_primary_key[0]
# logger.info("user created", extra={"user_id": user_id, "email": user.email, "role": role})
# return {"id": user_id, "email": user.email, "role": role}
# except Exception as e:
# await db.rollback()
# logger.error("could not create user", extra={"error": str(e)})
# raise HTTPException(status_code=500, detail=f"Could not create user: {str(e)}")
@staticmethod
async def get_role_by_name(role_name: str, db: AsyncSession = Depends(get_db)):
    """Fetch the first ``roles_table`` row whose name equals *role_name*.

    Args:
        role_name: Value compared against ``roles_table.c.name``.
        db: Session used to run the query.

    Returns:
        The first matching row as a mapping (``mappings().first()``).
    """
    name_matches = roles_table.c.name == role_name
    rows = await db.execute(select(roles_table).where(name_matches))
    return rows.mappings().first()
@staticmethod
async def admin_required(token: str, db):
    """Ensure the bearer of *token* is a user whose role is ``"admin"``.

    Args:
        token: Encoded JWT whose ``sub`` claim holds the user's e-mail.
        db: Object whose awaitable ``execute()`` runs the query
            (presumably an ``AsyncSession`` — confirm against callers).

    Returns:
        The user row as a mapping.

    Raises:
        HTTPException: 403 when the token cannot be decoded, has no ``sub``
            claim, the user does not exist, or the user's role is not
            ``"admin"``.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="You do not have permission to perform this action.",
    )
    try:
        payload = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm])
    except JWTError as exc:
        # Never log the token itself: a bearer token is a credential.
        logger.error("could not decode token", extra={"error": str(exc)})
        raise credentials_exception from exc
    email = payload.get("sub")
    if email is None:
        raise credentials_exception
    token_data = TokenData(email=email)

    query = users_table.select().where(users_table.c.email == token_data.email)
    result = await db.execute(query)
    user = result.mappings().first()
    if user is None:
        raise credentials_exception
    if user["role"] != "admin":
        logger.error("admin required", extra={"email": token_data.email})
        raise credentials_exception
    return user
@staticmethod
async def update_user(user_id: int, updates: dict, token: str, db):
    """Update a user's profile information.

    - A user may update their own information.
    - An administrator may update any user's information.

    Args:
        user_id: Id of the user row to update.
        updates: Requested changes; only ``full_name``, ``phone``,
            ``date_of_birth``, ``organization`` and ``email`` are applied,
            every other key is ignored.
        token: Encoded JWT whose ``sub`` claim holds the caller's e-mail.
        db: Object providing awaitable ``execute()``, ``commit()`` and
            ``rollback()`` (presumably an ``AsyncSession`` — confirm).

    Returns:
        A ``UserResponse`` built from the row as re-read after the update.

    Raises:
        HTTPException: 403 when the token is invalid or the caller is neither
            the target user nor an admin; 400 when no allowed field is
            supplied or the new e-mail belongs to another user; 404 when the
            target user does not exist; 500 when the update itself fails.
    """
    # Decode the token and check permissions.
    credentials_exception = HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="You do not have permission to perform this action.",
    )
    try:
        payload = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm])
    except JWTError as exc:
        raise credentials_exception from exc
    email = payload.get("sub")
    if email is None:
        raise credentials_exception
    token_data = TokenData(email=email)

    query = select(users_table).where(users_table.c.email == token_data.email)
    result = await db.execute(query)
    current_user = result.mappings().first()
    if current_user is None:
        raise credentials_exception

    # The caller must be the target user or an admin.
    if current_user["id"] != user_id and current_user["role"] != "admin":
        raise credentials_exception

    # Keep only the fields that may be changed through this method.
    allowed_updates = {"full_name", "phone", "date_of_birth", "organization", "email"}
    updates = {key: value for key, value in updates.items() if key in allowed_updates}
    if not updates:
        # An UPDATE without any SET value is not a meaningful statement;
        # reject the request instead of sending it to the database.
        raise HTTPException(status_code=400, detail="No valid fields to update.")

    if "email" in updates:  # Check whether the e-mail is already taken.
        existing_email_query = select(users_table).where(users_table.c.email == updates["email"])
        result = await db.execute(existing_email_query)
        # mappings() is required to read the row by column name.
        existing_user = result.mappings().first()
        if existing_user is not None and existing_user["id"] != user_id:
            raise HTTPException(status_code=400, detail="Email already registered by another user.")

    query = (
        update(users_table)
        .where(users_table.c.id == user_id)
        .values(**updates)
        .execution_options(synchronize_session="fetch")
    )
    try:
        await db.execute(query)
        await db.commit()
    except Exception as e:
        # Boundary handler: undo the transaction, then surface a 500.
        await db.rollback()
        raise HTTPException(status_code=500, detail=f"Could not update user: {str(e)}") from e

    # Re-read the row so the response reflects what is stored.
    updated_user_query = select(users_table).where(users_table.c.id == user_id)
    result = await db.execute(updated_user_query)
    updated_user = result.mappings().first()
    if updated_user is None:
        # The UPDATE matched nothing (e.g. an admin targeting an unknown id).
        raise HTTPException(status_code=404, detail="User not found.")
    return UserResponse(**updated_user)
@staticmethod
async def get_user_by_email(email: str, db):
    """Fetch the first ``users_table`` row whose e-mail equals *email*.

    Args:
        email: Value compared against ``users_table.c.email``.
        db: Object whose awaitable ``execute()`` runs the query.

    Returns:
        The first matching row as a mapping (``mappings().first()``).
    """
    lookup = select(users_table).where(users_table.c.email == email)
    rows = await db.execute(lookup)
    first_match = rows.mappings().first()
    return first_match
@staticmethod
async def check_permissions(token: str, db, required_permissions: list):
    """Check that the bearer of *token* has one of the accepted roles.

    The user's ``role`` value is tested for membership in
    *required_permissions*, so that list is compared against role names.

    Args:
        token: Encoded JWT whose ``sub`` claim holds the user's e-mail.
        db: Object whose awaitable ``execute()`` runs the query
            (presumably an ``AsyncSession`` — confirm against callers).
        required_permissions: Values the user's ``role`` must be one of.

    Raises:
        HTTPException: 403 when the token cannot be decoded, has no ``sub``
            claim, the user does not exist, or the user's role is not in
            *required_permissions*.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="You do not have permission to perform this action.",
    )
    try:
        payload = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm])
    except JWTError as exc:
        # Never log the token itself: a bearer token is a credential.
        logger.error("could not decode token", extra={"error": str(exc)})
        raise credentials_exception from exc
    email = payload.get("sub")
    if email is None:
        raise credentials_exception
    token_data = TokenData(email=email)

    query = users_table.select().where(users_table.c.email == token_data.email)
    result = await db.execute(query)
    user = result.mappings().first()
    if user is None:
        raise credentials_exception
    if user["role"] not in required_permissions:
        logger.error("permission denied", extra={"email": token_data.email, "required_permissions": required_permissions})
        raise credentials_exception
    logger.info("permission granted", extra={"email": token_data.email, "role": user["role"]})