After_Chido_Api/api/v1/auth.py

146 lines
5.9 KiB
Python

from datetime import datetime, timedelta
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from fastapi import APIRouter, Depends, HTTPException, status, Body
from fastapi.responses import JSONResponse
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlalchemy import update, select
from services.auth_service import AuthService
from models.schemas import Token, UserCreate, UserResponse
from config.database import get_db
from models.db import users_table
from config.settings import settings
from jose import jwt, JWTError
from smtplib import SMTP
from services.user_service import UserService
from utils.logging import logger
from utils.security import get_password_hash, pwd_context
# Router on which every authentication endpoint in this module is registered.
router = APIRouter()
# OAuth2 bearer-token dependency used by the protected routes; tokenUrl is
# presumably the mounted path of the "/token" login route below — confirm
# against the prefix used where this router is included.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/token")
@router.post("/signup", status_code=201, summary="User Signup")
async def signup(user: UserCreate):
    """Register a new user account.

    The e-mail address is looked up through ``AuthService.get_user_by_email``;
    when no account is found, creation is delegated to
    ``UserService.create_user`` and its result is returned as-is.

    Raises:
        HTTPException: 400 when the e-mail address is already registered.
    """
    async with get_db() as session:
        already_registered = await AuthService.get_user_by_email(user.email, session)
        if not already_registered:
            return await UserService.create_user(user, session)
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered.",
        )
@router.post("/token", response_model=Token, summary="Login and get access token")
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    """Authenticate the submitted form credentials and issue a bearer token.

    Args:
        form_data: OAuth2 password form; ``username`` and ``password`` are
            passed to ``AuthService.authenticate_user``.

    Returns:
        A dict with ``access_token`` and ``token_type`` set to ``"bearer"``.

    Raises:
        HTTPException: 401 when ``AuthService.authenticate_user`` returns a
            falsy value.
    """
    async with get_db() as db:
        user = await AuthService.authenticate_user(
            form_data.username, form_data.password, db
        )
        if not user:
            # Record the failure explicitly; a rejected attempt must never be
            # logged as a successful login.
            logger.warning(f"Failed login attempt for {form_data.username}")
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Incorrect email or password",
                headers={"WWW-Authenticate": "Bearer"},
            )
        # Only log the success once the credentials have actually been accepted.
        logger.info(f"User {form_data.username} logged in")
        access_token = AuthService.create_access_token(data={"sub": user["email"]})
        return {"access_token": access_token, "token_type": "bearer"}
@router.post("/isvalid-token", summary="Verify token validity")
async def verify_token(token: str = Body(...)):
    """Decode a JWT with the configured secret and algorithm and report validity.

    Returns:
        A dict with ``valid`` (True), a ``message`` and the decoded ``payload``.

    Raises:
        HTTPException: 401 with "Token has expired" for an expired signature,
            401 with "Invalid token" for any other ``JWTError``.
    """
    try:
        claims = jwt.decode(
            token,
            settings.secret_key,
            algorithms=[settings.algorithm],
        )
    except jwt.ExpiredSignatureError:
        # Checked first so an expired token gets its own message.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token has expired",
        )
    except JWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token",
        )
    return {"valid": True, "message": "Token is valid", "payload": claims}
@router.get("/me", summary="Get current user")
async def read_users_me(token: str = Depends(oauth2_scheme)):
    """Return what ``AuthService.get_current_user`` resolves for the bearer token.

    The token is supplied by the ``oauth2_scheme`` dependency and is forwarded
    unchanged together with a database handle from ``get_db``.
    """
    async with get_db() as session:
        current_user = await AuthService.get_current_user(token, session)
        return current_user
@router.post("/reset-password")
async def reset_password(token: str = Body(...), new_password: str = Body(...)):
    """Set a new password for the account named by a signed reset token.

    Args:
        token: JWT whose ``sub`` claim holds the account e-mail address.
        new_password: Plain-text replacement password; hashed with
            ``pwd_context`` before being stored.

    Returns:
        JSONResponse with a confirmation message.

    Raises:
        HTTPException: 400 when the password is empty, the token cannot be
            decoded, or it carries no ``sub`` claim; 404 when no user row
            matches the e-mail address in the token.
    """
    # Refuse to store the hash of an empty password.
    if not new_password:
        raise HTTPException(status_code=400, detail="New password must not be empty")

    # Validate the token before touching the database so that invalid
    # requests never acquire a connection.
    try:
        payload = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm])
    except JWTError as exc:
        raise HTTPException(status_code=400, detail="Invalid or expired token") from exc

    # NOTE(review): only the "sub" claim is checked. If access tokens are
    # signed with the same key and also carry "sub", they would be accepted
    # here as well — confirm, and consider a dedicated purpose claim.
    email = payload.get("sub")
    if email is None:
        raise HTTPException(status_code=400, detail="Invalid token")

    hashed_password = pwd_context.hash(new_password)
    query = (
        update(users_table)
        .where(users_table.c.email == email)
        .values(hashed_password=hashed_password)
        .execution_options(synchronize_session="fetch")
    )
    async with get_db() as db:
        result = await db.execute(query)
        # Assumes a SQLAlchemy result exposing ``rowcount`` — TODO confirm
        # against what ``get_db`` yields.
        updated_rows = result.rowcount

    # Raised outside the database context so the exit path of ``get_db`` is the
    # same as for a successful update. ``== 0`` (rather than ``< 1``) keeps
    # drivers that report -1 for "unknown" on the success path.
    if updated_rows == 0:
        raise HTTPException(status_code=404, detail="User not found")

    return JSONResponse(content={"message": "Password updated successfully."})
def _build_reset_email(sender_email: str, receiver_email: str, reset_link: str) -> MIMEMultipart:
    """Build the multipart (plain text + HTML) password-reset message."""
    message = MIMEMultipart("alternative")
    message["Subject"] = "mot de passe perdu"
    message["From"] = sender_email
    message["To"] = receiver_email

    text = f"You requested a password reset. Click the link below to reset your password:\n{reset_link}"
    html = (
        "\n<html>\n<body>\n"
        "<p>You requested a password reset.<br>\n"
        "Click the link below to reset your password:<br>\n"
        f'<a href="{reset_link}">Redefinir mon mot de passe</a>\n'
        "</p>\n</body>\n</html>\n"
    )
    # Plain-text part first, HTML part second, as in the original message.
    message.attach(MIMEText(text, "plain"))
    message.attach(MIMEText(html, "html"))
    return message


def _send_email(message: MIMEMultipart, sender_email: str, receiver_email: str) -> None:
    """Deliver ``message`` over SMTP with STARTTLS; blocking, run it off the event loop."""
    with SMTP(settings.email_host, settings.email_port) as server:
        server.starttls()
        server.login(settings.email_username, settings.email_password)
        server.sendmail(sender_email, receiver_email, message.as_string())


@router.post("/password-reset-request")
async def password_reset_request(email: str = Body(...)):
    """E-mail a one-hour password-reset link to a registered address.

    Args:
        email: Address to look up in ``users_table`` and to send the link to.

    Returns:
        JSONResponse confirming that the link was sent.

    Raises:
        HTTPException: 404 when no user row has this e-mail address; 500 when
            building or sending the message fails.
    """
    # Function-scope import keeps this block self-contained.
    import asyncio

    # NOTE(review): the 404 below lets callers probe which addresses are
    # registered; kept because clients may rely on it — confirm.
    async with get_db() as db:
        result = await db.execute(
            select(users_table).where(users_table.c.email == email)
        )
        user = result.fetchone()
    # The database context is closed before the SMTP round trip so it is not
    # held open while waiting on the mail server.
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    reset_token = jwt.encode(
        {"sub": email, "exp": datetime.utcnow() + timedelta(hours=1)},
        settings.secret_key,
        algorithm=settings.algorithm,
    )
    reset_link = f"{settings.resetpass_url}/?token={reset_token}"

    sender_email = settings.email_username
    logger.info(f"sender {settings.email_username} receiver {email}")
    try:
        message = _build_reset_email(sender_email, email, reset_link)
        # smtplib is blocking; run it in the default executor so the event
        # loop keeps serving other requests during the SMTP exchange.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, _send_email, message, sender_email, email)
    except Exception as e:
        logger.error(f"Failed to send password reset email to {email}: {str(e)}")
        # The addresses stay in the server log only; the client response no
        # longer exposes the service's sender account.
        raise HTTPException(
            status_code=500,
            detail="Failed to send password reset email.",
        ) from e

    logger.info(f"Password reset email sent to {email}")
    return JSONResponse(content={"message": "Password reset link sent via email."})