# After_Chido_Api/services/points_of_interest_service.py
#
# 84 lines
# 3.4 KiB
# Python

from sqlalchemy import insert, select, update, delete
from fastapi import HTTPException
from models.db import points_of_interest_table
from models.schemas import CreatePointOfInterest, PointOfInterest, UpdatePointOfInterest
from sqlalchemy.ext.asyncio import AsyncSession
from services.auth_service import AuthService
class PointsOfInterestService:
    """CRUD operations on ``points_of_interest_table``.

    Every method is a static coroutine that receives the database session
    ``db`` explicitly and awaits ``db.execute`` (plus ``db.commit`` /
    ``db.rollback`` for writes). ``db`` is presumably a SQLAlchemy
    ``AsyncSession`` -- TODO confirm against the callers. Mutating methods
    also receive the caller's ``token`` and resolve it via ``AuthService``.
    """

    @staticmethod
    async def create_point_of_interest(point: CreatePointOfInterest, db, token) -> dict:
        """Insert a new point of interest attributed to the authenticated user.

        Args:
            point: Payload providing ``label``, ``description``, ``icon``,
                ``organization`` and ``gps_coordinates``.
            db: Database session used to execute and commit the insert.
            token: Credential passed to ``AuthService.get_current_user``.

        Returns:
            A dict holding the new row's ``id`` merged with the dumped
            fields of ``point``.

        Raises:
            HTTPException: 500 if the insert or commit fails; the
                transaction is rolled back first.
        """
        user = await AuthService.get_current_user(token, db)
        query = insert(points_of_interest_table).values(
            label=point.label,
            description=point.description,
            icon=point.icon,
            organization=point.organization,
            gps_coordinates=point.gps_coordinates,
            added_by=user["id"],
        )
        try:
            result = await db.execute(query)
            await db.commit()
            point_id = result.inserted_primary_key[0]
            # model_dump() matches the API already used by
            # update_point_of_interest (dict() is the deprecated spelling).
            return {"id": point_id, **point.model_dump()}
        except Exception as e:
            await db.rollback()
            raise HTTPException(
                status_code=500,
                detail=f"Could not create point of interest: {str(e)}",
            ) from e

    @staticmethod
    async def get_point_of_interest(point_id: int, db) -> dict:
        """Fetch a single point of interest by primary key.

        Args:
            point_id: Value matched against the table's ``id`` column.
            db: Database session used to execute the select.

        Returns:
            The matching row as a plain dict keyed by column name.

        Raises:
            HTTPException: 404 if no row has the given id.
        """
        query = select(points_of_interest_table).where(
            points_of_interest_table.c.id == point_id
        )
        result = await db.execute(query)
        point = result.mappings().fetchone()
        if point is None:
            raise HTTPException(status_code=404, detail="Point of interest not found")
        return dict(point)

    @staticmethod
    async def get_all_points_of_interest(db) -> list:
        """Return every row of the points-of-interest table as a list of dicts.

        Args:
            db: Database session used to execute the select.
        """
        query = select(points_of_interest_table)
        result = await db.execute(query)
        return [dict(row) for row in result.mappings().all()]

    @staticmethod
    async def update_point_of_interest(
        point_id: int, point: UpdatePointOfInterest, db, token
    ) -> dict:
        """Apply a partial update to an existing point of interest.

        Only fields explicitly set on ``point`` are written.

        Args:
            point_id: Id of the row to update.
            point: Payload; unset fields are left untouched.
            db: Database session used to execute and commit the update.
            token: Credential passed to ``AuthService.get_current_user``.

        Returns:
            A dict with a success ``message``.

        Raises:
            HTTPException: 400 if ``point`` sets no fields, 404 if no row
                has the given id, 500 on any other failure. The transaction
                is rolled back before a 404 or 500 is raised.
        """
        user = await AuthService.get_current_user(token, db)
        update_values = point.model_dump(exclude_unset=True)
        if not update_values:
            raise HTTPException(status_code=400, detail="No fields provided for update")

        # Merge into one dict so a payload that happens to carry ``added_by``
        # cannot collide with the server-side value (which always wins).
        # NOTE(review): this overwrites ``added_by`` with the *updating* user,
        # as the original code did -- confirm that is the intended semantics.
        values = {**update_values, "added_by": user["id"]}
        query = (
            update(points_of_interest_table)
            .where(points_of_interest_table.c.id == point_id)
            .values(**values)
        )
        try:
            result = await db.execute(query)
            if result.rowcount == 0:
                raise HTTPException(status_code=404, detail="Point of interest not found")
            await db.commit()
            return {"message": "Point of interest updated successfully"}
        except HTTPException:
            # Let the 404 reach the client instead of being rewrapped as 500.
            await db.rollback()
            raise
        except Exception as e:
            await db.rollback()
            raise HTTPException(
                status_code=500,
                detail=f"Could not update point of interest: {str(e)}",
            ) from e

    @staticmethod
    async def delete_point_of_interest(point_id: int, db, token) -> dict:
        """Delete a point of interest after ``AuthService.admin_required`` passes.

        Args:
            point_id: Id of the row to delete.
            db: Database session used to execute and commit the delete.
            token: Credential passed to ``AuthService.admin_required``.

        Returns:
            A dict with a success ``message``.

        Raises:
            HTTPException: 404 if no row has the given id, 500 on any other
                failure. The transaction is rolled back before either is
                raised.
        """
        await AuthService.admin_required(token, db)
        query = delete(points_of_interest_table).where(
            points_of_interest_table.c.id == point_id
        )
        try:
            result = await db.execute(query)
            if result.rowcount == 0:
                raise HTTPException(status_code=404, detail="Point of interest not found")
            await db.commit()
            return {"message": "Point of interest deleted successfully"}
        except HTTPException:
            # Let the 404 reach the client instead of being rewrapped as 500.
            await db.rollback()
            raise
        except Exception as e:
            await db.rollback()
            raise HTTPException(
                status_code=500,
                detail=f"Could not delete point of interest: {str(e)}",
            ) from e