After_Chido_Api/api/v1/auth.py

132 lines
5.2 KiB
Python

from datetime import datetime, timedelta
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from fastapi import APIRouter, Depends, Form, HTTPException, status, Body
from fastapi.responses import JSONResponse
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlalchemy import update, select
from services.auth_service import AuthService
from models.schemas import Token, UserCreate, UserResponse
from config.database import get_db
from models.db import users_table
from config.settings import settings
from sqlalchemy.ext.asyncio import AsyncSession
from jose import jwt, JWTError
from smtplib import SMTP
from services.user_service import UserService
from utils.logging import logger
from utils.security import get_password_hash, pwd_context
# Router on which every authentication endpoint in this module is registered.
router = APIRouter()
# Bearer-token dependency consumed by the "/me" endpoint. tokenUrl names the
# login route for OAuth2 clients; presumably this router is mounted under
# /api/v1/auth so that it resolves to the "/token" endpoint — confirm against
# the application setup.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/token")
@router.post("/signup", status_code=201, summary="User Signup")
async def signup(user: UserCreate, db=Depends(get_db)):
    """Create a new user account.

    Responds with HTTP 400 when the e-mail address is already registered.
    """
    # A truthy lookup result means the address is taken.
    already_registered = await AuthService.get_user_by_email(user.email, db)
    if not already_registered:
        created_user = await UserService.create_user(user, db)
        return created_user

    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail="Email already registered.",
    )
@router.post("/token", response_model=Token, summary="Login and get access token")
async def login_for_access_token(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db=Depends(get_db),
):
    """Authenticate a user and return a bearer access token.

    The ``username`` field of the OAuth2 form is passed to the authentication
    service as the login identifier; the issued token carries the user's
    e-mail address in its ``sub`` claim.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            the credentials are rejected.
    """
    user = await AuthService.authenticate_user(form_data.username, form_data.password, db)
    if not user:
        # Record the failure explicitly so rejected attempts are never
        # mistaken for successful logins in the logs.
        logger.warning(f"Failed login attempt for {form_data.username}")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Only log success once authentication has actually succeeded.
    logger.info(f"User {form_data.username} logged in")
    access_token = AuthService.create_access_token(data={"sub": user["email"]})
    return {"access_token": access_token, "token_type": "bearer"}
@router.get("/me", summary="Get current user")
async def read_users_me(token: str = Depends(oauth2_scheme), db=Depends(get_db)):
    """Return the user identified by the request's bearer token."""
    # Token validation and user lookup are delegated to the auth service.
    current_user = await AuthService.get_current_user(token, db)
    return current_user
@router.post("/reset-password")
async def reset_password(
    token: str = Body(...),
    new_password: str = Body(...),
    db: AsyncSession = Depends(get_db),
):
    """Set a new password for the account named in a signed reset token.

    Args:
        token: JWT whose ``sub`` claim holds the account's e-mail address.
        new_password: Replacement password in plain text; only its hash is
            stored.
        db: Async database session.

    Returns:
        JSONResponse with a confirmation message.

    Raises:
        HTTPException: 400 when the token is invalid, expired or has no
            ``sub`` claim, or when the new password is empty; 404 when no
            account matches the e-mail in the token.
    """
    # Keep the try body limited to the one call that can raise JWTError.
    try:
        payload = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm])
    except JWTError as err:
        raise HTTPException(status_code=400, detail="Invalid or expired token") from err

    email = payload.get("sub")
    if email is None:
        raise HTTPException(status_code=400, detail="Invalid token")

    # NOTE(review): any token signed with settings.secret_key that carries a
    # `sub` claim is accepted here. If login access tokens are signed with the
    # same key they would pass too — confirm, and consider a dedicated
    # purpose claim issued by the reset-request endpoint.

    if not new_password:
        # Never store the hash of an empty password.
        raise HTTPException(status_code=400, detail="Password must not be empty")

    # Hash the new password
    hashed_password = pwd_context.hash(new_password)

    # Update the user's password
    query = (
        update(users_table)
        .where(users_table.c.email == email)
        .values(hashed_password=hashed_password)
        .execution_options(synchronize_session="fetch")
    )
    result = await db.execute(query)
    if result.rowcount == 0:
        # The account may have been removed after the token was issued; do
        # not report success when nothing was updated.
        raise HTTPException(status_code=404, detail="User not found")
    await db.commit()

    return JSONResponse(content={"message": "Password updated successfully."})
def _build_reset_email(sender_email: str, receiver_email: str, reset_link: str) -> MIMEMultipart:
    """Compose the multipart (plain text + HTML) password-reset message."""
    message = MIMEMultipart("alternative")
    message["Subject"] = "mot de passe perdu"
    message["From"] = sender_email
    message["To"] = receiver_email

    text = f"You requested a password reset. Click the link below to reset your password:\n{reset_link}"
    html = f"""
<html>
<body>
<p>You requested a password reset.<br>
Click the link below to reset your password:<br>
<a href="{reset_link}">Redefinir mon mot de passe</a>
</p>
</body>
</html>
"""
    message.attach(MIMEText(text, "plain"))
    message.attach(MIMEText(html, "html"))
    return message


def _send_email(sender_email: str, receiver_email: str, message: MIMEMultipart) -> None:
    """Deliver *message* over SMTP with STARTTLS; blocks until it is sent."""
    with SMTP(settings.email_host, settings.email_port) as server:
        server.starttls()
        server.login(settings.email_username, settings.email_password)
        server.sendmail(sender_email, receiver_email, message.as_string())


@router.post("/password-reset-request")
async def password_reset_request(email: str, db: AsyncSession = Depends(get_db)):
    """E-mail a password-reset link to a registered user.

    A JWT valid for one hour, carrying the address in its ``sub`` claim, is
    appended to the configured reset URL and sent to the user.

    Args:
        email: Address of the account whose password should be reset.
        db: Async database session.

    Returns:
        JSONResponse confirming that the link was sent.

    Raises:
        HTTPException: 404 when no user has this address; 500 when the
            e-mail could not be built or delivered.
    """
    # Function-scope import keeps this block self-contained.
    import asyncio

    query = select(users_table).where(users_table.c.email == email)
    result = await db.execute(query)
    user = result.fetchone()
    # NOTE(review): answering 404 here lets callers probe which addresses are
    # registered; kept as-is because clients may rely on it.
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    # Generate a JWT reset token
    reset_token = jwt.encode(
        {"sub": email, "exp": datetime.utcnow() + timedelta(hours=1)},
        settings.secret_key,
        algorithm=settings.algorithm,
    )
    reset_link = f"{settings.resetpass_url}/?token={reset_token}"

    # Bound before the try block so the error path never touches an
    # unassigned name.
    sender_email = settings.email_username
    receiver_email = email
    logger.info(f"sender {settings.email_username} receiver {email}")

    # Send the link by e-mail
    try:
        message = _build_reset_email(sender_email, receiver_email, reset_link)
        # smtplib is blocking: run it in the default executor so the event
        # loop keeps serving other requests while the mail is delivered.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, _send_email, sender_email, receiver_email, message)
    except Exception as e:
        logger.error(f"Failed to send password reset email to {email}: {str(e)}")
        # Addresses stay in the server log only; the client gets a generic
        # message so the SMTP account name is not disclosed.
        raise HTTPException(
            status_code=500,
            detail="Failed to send password reset email.",
        ) from e

    logger.info(f"Password reset email sent to {email}")
    return JSONResponse(content={"message": "Password reset link sent via email."})