from datetime import datetime, timedelta from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from fastapi import APIRouter, Depends, HTTPException, status, Body from fastapi.responses import JSONResponse from fastapi.security import OAuth2PasswordRequestForm from sqlalchemy import update, select from services.auth_service import AuthService from models.schemas import Token, UserCreate, UserResponse from config.database import get_db from models.db import users_table from config.settings import settings from sqlalchemy.ext.asyncio import AsyncSession from jose import jwt, JWTError from smtplib import SMTP from utils.logging import logger from utils.security import verify_password, get_password_hash, pwd_context router = APIRouter() @router.post("/signup", response_model=UserResponse, status_code=201, summary="User Signup") async def signup(user: UserCreate, db=Depends(get_db)): existing_user = await AuthService.get_user_by_email(user.email, db) if existing_user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Email already registered." ) user.password = get_password_hash(user.password) return await AuthService.create_user(user, db) @router.post("/token", response_model=Token, summary="Login and get access token") async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db=Depends(get_db)): user = await AuthService.authenticate_user(form_data.username, form_data.password, db) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect email or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token = AuthService.create_access_token(data={"sub": user["email"]}) return {"access_token": access_token, "token_type": "bearer"} @router.get("/me", response_model=UserResponse, summary="Get current user") async def read_users_me(current_user: UserResponse = Depends(AuthService.get_current_user)): return current_user @router.post("/reset-password") async def reset_password(token: str = Body(...), new_password: str = Body(...), db: AsyncSession = Depends(get_db)): try: payload = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm]) email = payload.get("sub") if email is None: raise HTTPException(status_code=400, detail="Invalid token") except JWTError: raise HTTPException(status_code=400, detail="Invalid or expired token") # Hash the new password hashed_password = pwd_context.hash(new_password) # Update the user's password query = ( update(users_table) .where(users_table.c.email == email) .values(hashed_password=hashed_password) .execution_options(synchronize_session="fetch") ) await db.execute(query) await db.commit() return JSONResponse(content={"message": "Password updated successfully."}) @router.post("/password-reset-request") async def password_reset_request(email: str, db: AsyncSession = Depends(get_db)): query = select(users_table).where(users_table.c.email == email) result = await db.execute(query) user = result.fetchone() if not user: raise HTTPException(status_code=404, detail="User not found") # Générer un token JWT reset_token = jwt.encode( {"sub": email, "exp": datetime.utcnow() + timedelta(hours=1)}, settings.secret_key, algorithm=settings.algorithm, ) reset_link = f"{settings.resetpass_url}/?token={reset_token}" # Envoyer le lien par email try: subject = "mot de passe perdu" sender_email = settings.email_username receiver_email = email logger.info(f"sender {settings.email_username} receiver {email}") message = MIMEMultipart("alternative") message["Subject"] = subject message["From"] = sender_email message["To"] = receiver_email text = f"You requested a password reset. Click the link below to reset your password:\n{reset_link}" html = f"""
You requested a password reset.
Click the link below to reset your password:
Redefinir mon mot de passe