After_Chido_Api/services/person_report_service.py

85 lines
3.9 KiB
Python

from sqlalchemy import delete, select, update
from fastapi import HTTPException, UploadFile
from models.schemas import PersonReportCreate, PersonReportUpdate, PersonReportResponse
from config.database import get_db
from models.db import person_reports_table
from typing import Optional
from fastapi import Depends
from fastapi.security import OAuth2PasswordBearer
from services.auth_service import AuthService
from services.upload_service import UploadService
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/token")
class PersonReportService:
@staticmethod
async def create_report(report: PersonReportCreate, db, token, file: Optional[UploadFile] = None ):
user = await AuthService.get_current_user(token, db)if file else None
image_url = await UploadService.upload_image_to_s3( file, user["id"])
#image_url = file.filename if file else None
query = person_reports_table.insert().values(full_name=report.full_name,
date_of_birth=report.date_of_birth,
status=report.status,
location=report.location,
gps_coordinates=report.gps_coordinates,
photo_url=image_url["path_with_name"] if image_url else None,
reporter_email=user["email"])
try:
result = await db.execute(query)
await db.commit()
report_id = result.inserted_primary_key[0]
return await PersonReportService.get_report(report_id, db)
except Exception as e:
await db.rollback()
raise HTTPException(status_code=500, detail=f"Could not create report: {str(e)}")
@staticmethod
async def update_report(report_id: int, report: PersonReportUpdate, db):
query = (
person_reports_table.update()
.where(person_reports_table.c.id == report_id)
.values(**report.model_dump(exclude_unset=True))
)
try:
await db.execute(query)
await db.commit()
return await PersonReportService.get_report(report_id, db)
except Exception as e:
await db.rollback()
raise HTTPException(status_code=500, detail=f"Could not update report: {str(e)}")
@staticmethod
async def get_report(report_id: int, db):
query = select(person_reports_table).where(person_reports_table.c.id == report_id)
result = await db.execute(query)
report = result.mappings().first()
if not report:
raise HTTPException(status_code=404, detail="Report not found")
return PersonReportResponse(**report)
@staticmethod
async def list_reports(status: Optional[str] = None, db=Depends(get_db)):
query = select(person_reports_table)
if status:
query = query.where(person_reports_table.c.status == status)
result = await db.execute(query)
reports = result.mappings().all()
return [PersonReportResponse(**report) for report in reports]
@staticmethod
async def delete_report(report_id: int, db, token):
# Vérifier les droits de l'utilisateur
await AuthService.check_permissions( token , db, ["admin","collectivité","état"])
query = delete(person_reports_table).where(person_reports_table.c.id == report_id)
try:
result = await db.execute(query)
if result.rowcount == 0:
raise HTTPException(status_code=404, detail="Report not found")
await db.commit()
return {"detail": "Report deleted successfully"}
except Exception as e:
await db.rollback()
raise HTTPException(status_code=500, detail=f"Could not delete report: {str(e)}")