79 lines
2.8 KiB
Python
79 lines
2.8 KiB
Python
# app/services/upload_service.py
|
|
from typing import Self
|
|
import uuid
|
|
import boto3
|
|
from botocore.exceptions import NoCredentialsError, BotoCoreError, ClientError
|
|
from fastapi import HTTPException, UploadFile
|
|
|
|
class UploadService:
|
|
|
|
async def upload_image_to_s3(file: UploadFile, account_id: str) -> dict:
|
|
|
|
# Configuration globale
|
|
EXTENSIONS_AUTHORIZED = {
|
|
"docx": "doc",
|
|
"pdf": "doc",
|
|
"png": "image",
|
|
"jpg": "image",
|
|
"jpeg": "image",
|
|
"mp4": "video",
|
|
}
|
|
FILE_PATH_S3 = {
|
|
"image": "images/",
|
|
"video": "videos/",
|
|
"doc": "docs/",
|
|
}
|
|
|
|
S3_CONFIG = {
|
|
"url": "https://api.mayotte-urgence.com/cdn-storage",
|
|
"bucket": "sywmtnsg",
|
|
"endpoint_url": "https://ht2-storage.n0c.com:5443",
|
|
"region_name": "ht2-storage",
|
|
"aws_access_key_id": "X89EU8NHL54CIKGMZP8Q",
|
|
"aws_secret_access_key": "71RsicRiiSgXDcAZLM4vCpEZESMJ4iA9sOKp0UQy",
|
|
}
|
|
|
|
# Validate file extension
|
|
extension = file.filename.split(".")[-1].lower()
|
|
if extension not in EXTENSIONS_AUTHORIZED:
|
|
raise HTTPException(status_code=400, detail="Unsupported file extension.")
|
|
|
|
# Generate unique file name
|
|
new_filename = f"{uuid.uuid4()}.{extension}"
|
|
file_type = EXTENSIONS_AUTHORIZED[extension]
|
|
folder_path = f"public/{FILE_PATH_S3[file_type]}{account_id}/"
|
|
full_path = f"{folder_path}{new_filename}"
|
|
|
|
# Initialize S3 client
|
|
s3_client = boto3.client(
|
|
"s3",
|
|
endpoint_url=S3_CONFIG["endpoint_url"],
|
|
region_name=S3_CONFIG["region_name"],
|
|
aws_access_key_id=S3_CONFIG["aws_access_key_id"],
|
|
aws_secret_access_key=S3_CONFIG["aws_secret_access_key"],
|
|
)
|
|
|
|
try:
|
|
# Check if the folder exists (by checking a dummy object in the folder)
|
|
result = s3_client.list_objects_v2(Bucket=S3_CONFIG["bucket"], Prefix=folder_path)
|
|
if result.get("KeyCount", 0) == 0:
|
|
# Create a dummy object to ensure the folder exists
|
|
s3_client.put_object(Bucket=S3_CONFIG["bucket"], Key=f"{folder_path}")
|
|
|
|
# Upload the file
|
|
s3_client.upload_fileobj(
|
|
file.file,
|
|
S3_CONFIG["bucket"],
|
|
full_path,
|
|
ExtraArgs={"ACL": "public-read"},
|
|
)
|
|
except (BotoCoreError, ClientError) as e:
|
|
raise HTTPException(status_code=500, detail=f"Failed to upload file: {str(e)}")
|
|
|
|
return {
|
|
"success": True,
|
|
"path_with_name": S3_CONFIG["url"]+full_path.replace("public", ""),
|
|
"filename": new_filename,
|
|
"original_filename": file.filename,
|
|
"filetype": extension,
|
|
} |