from datetime import datetime, timedelta from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from fastapi import APIRouter, HTTPException, status, Body from fastapi.responses import JSONResponse from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from sqlalchemy import update, select from services.auth_service import AuthService from models.schemas import Token, UserCreate, UserResponse from config.database import get_db from models.db import users_table from config.settings import settings from jose import jwt, JWTError from smtplib import SMTP from services.user_service import UserService from utils.logging import logger from utils.security import get_password_hash, pwd_context router = APIRouter() oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/token") @router.post("/signup", status_code=201, summary="User Signup") async def signup(user: UserCreate): async with get_db() as db: existing_user = await AuthService.get_user_by_email(user.email, db) if existing_user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Email already registered." ) return await UserService.create_user(user, db) @router.post("/token", response_model=Token, summary="Login and get access token") async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Body(...)): async with get_db() as db: user = await AuthService.authenticate_user(form_data.username, form_data.password, db) logger.info(f"User {form_data.username} logged in") if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect email or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token = AuthService.create_access_token(data={"sub": user["email"]}) return {"access_token": access_token, "token_type": "bearer"} @router.post("/isvalid-token", summary="Verify token validity") async def verify_token(token: str = Body(...)): try: payload = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm]) return {"valid": True, "message": "Token is valid", "payload": payload} except jwt.ExpiredSignatureError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Token has expired", ) except JWTError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token", ) @router.get("/me", summary="Get current user") async def read_users_me(token: str = Body(...)): async with get_db() as db: return await AuthService.get_current_user(token, db) @router.post("/reset-password") async def reset_password(token: str = Body(...), new_password: str = Body(...)): async with get_db() as db: try: payload = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm]) email = payload.get("sub") if email is None: raise HTTPException(status_code=400, detail="Invalid token") except JWTError: raise HTTPException(status_code=400, detail="Invalid or expired token") hashed_password = pwd_context.hash(new_password) query = ( update(users_table) .where(users_table.c.email == email) .values(hashed_password=hashed_password) .execution_options(synchronize_session="fetch") ) await db.execute(query) return JSONResponse(content={"message": "Password updated successfully."}) @router.post("/password-reset-request") async def password_reset_request(email: str = Body(...)): async with get_db() as db: query = select(users_table).where(users_table.c.email == email) result = await db.execute(query) user = result.fetchone() if not user: raise HTTPException(status_code=404, detail="User not found") reset_token = jwt.encode( {"sub": email, "exp": datetime.utcnow() + timedelta(hours=1)}, settings.secret_key, algorithm=settings.algorithm, ) reset_link = f"{settings.resetpass_url}/?token={reset_token}" try: subject = "mot de passe perdu" sender_email = settings.email_username receiver_email = email logger.info(f"sender {settings.email_username} receiver {email}") message = MIMEMultipart("alternative") message["Subject"] = subject message["From"] = sender_email message["To"] = receiver_email text = f"You requested a password reset. Click the link below to reset your password:\n{reset_link}" html = f"""
You requested a password reset.
Click the link below to reset your password:
Redefinir mon mot de passe