After_Chido_Api/services/upload_service.py

79 lines
2.8 KiB
Python

# app/services/upload_service.py
from typing import Self
import uuid
import boto3
from botocore.exceptions import NoCredentialsError, BotoCoreError, ClientError
from fastapi import HTTPException, UploadFile
class UploadService:
async def upload_image_to_s3(file: UploadFile, account_id: str) -> dict:
# Configuration globale
EXTENSIONS_AUTHORIZED = {
"docx": "doc",
"pdf": "doc",
"png": "image",
"jpg": "image",
"jpeg": "image",
"mp4": "video",
}
FILE_PATH_S3 = {
"image": "images/",
"video": "videos/",
"doc": "docs/",
}
S3_CONFIG = {
"url": "https://api.mayotte-urgence.com/cdn-storage",
"bucket": "sywmtnsg",
"endpoint_url": "https://ht2-storage.n0c.com:5443",
"region_name": "ht2-storage",
"aws_access_key_id": "X89EU8NHL54CIKGMZP8Q",
"aws_secret_access_key": "71RsicRiiSgXDcAZLM4vCpEZESMJ4iA9sOKp0UQy",
}
# Validate file extension
extension = file.filename.split(".")[-1].lower()
if extension not in EXTENSIONS_AUTHORIZED:
raise HTTPException(status_code=400, detail="Unsupported file extension.")
# Generate unique file name
new_filename = f"{uuid.uuid4()}.{extension}"
file_type = EXTENSIONS_AUTHORIZED[extension]
folder_path = f"public/{FILE_PATH_S3[file_type]}{account_id}/"
full_path = f"{folder_path}{new_filename}"
# Initialize S3 client
s3_client = boto3.client(
"s3",
endpoint_url=S3_CONFIG["endpoint_url"],
region_name=S3_CONFIG["region_name"],
aws_access_key_id=S3_CONFIG["aws_access_key_id"],
aws_secret_access_key=S3_CONFIG["aws_secret_access_key"],
)
try:
# Check if the folder exists (by checking a dummy object in the folder)
result = s3_client.list_objects_v2(Bucket=S3_CONFIG["bucket"], Prefix=folder_path)
if result.get("KeyCount", 0) == 0:
# Create a dummy object to ensure the folder exists
s3_client.put_object(Bucket=S3_CONFIG["bucket"], Key=f"{folder_path}")
# Upload the file
s3_client.upload_fileobj(
file.file,
S3_CONFIG["bucket"],
full_path,
ExtraArgs={"ACL": "public-read"},
)
except (BotoCoreError, ClientError) as e:
raise HTTPException(status_code=500, detail=f"Failed to upload file: {str(e)}")
return {
"success": True,
"path_with_name": S3_CONFIG["url"]+full_path.replace("public", ""),
"filename": new_filename,
"original_filename": file.filename,
"filetype": extension,
}