From 81a93c56834d9c6d61d51bfd7189146f4689682e Mon Sep 17 00:00:00 2001
From: Anaz
Date: Tue, 25 Feb 2025 09:34:56 +0400
Subject: [PATCH] Correction token duration and added token validity check endpoint

---
 .env | 2 +-
 api/v1/auth.py | 17 +++++++++++++++++
 config/settings.py | 2 +-
 services/auth_service.py | 26 ++++++++++++++++++++++++++
 utils/security.py | 4 ++--
 5 files changed, 47 insertions(+), 4 deletions(-)

diff --git a/.env b/.env
index e7d86c8..4b2bdef 100644
--- a/.env
+++ b/.env
@@ -4,7 +4,7 @@ DATABASE_URL=mysql+aiomysql://sywmtnsg_admin:EEy_>2JJS0@localhost:6033/sywmtnsg_
 # Configuration pour JWT
 SECRET_KEY=LAGs7G8Sis9aQHcipROxpjYRxFZKjr4wNm-_O0pBTkjNYv1rgPUR87VcNswH_VYGpIrsyGdqnNa3vcVSH0f5Tg
 ALGORITHM=HS256
-ACCESS_TOKEN_EXPIRE_MINUTES=30
+ACCESS_TOKEN_EXPIRE_MINUTES=4320
 
 # Configuration pour AWS S3 (à remplir si nécessaire)
 #AWS_ACCESS_KEY_ID=your-aws-access-key
diff --git a/api/v1/auth.py b/api/v1/auth.py
index 068dbe1..c812523 100644
--- a/api/v1/auth.py
+++ b/api/v1/auth.py
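Review bot: The new `ACCESS_TOKEN_EXPIRE_MINUTES=4320` makes every bearer token valid for three days, and nothing in this patch adds a refresh flow or server-side revocation, so a leaked or stolen token stays usable for that whole window; the same value is also hard-coded as the default `access_token_expire_minutes: int = 4320` in config/settings.py. Short-lived access tokens (minutes) paired with a refresh token are the usual way to get a long session without extending the exposure window, and the settings default should not silently mirror the deployment value. The surrounding context lines show the JWT `SECRET_KEY` and database credentials committed in `.env` and repeated as defaults in `config/settings.py`; that predates this commit, but a three-day token lifetime raises the cost of that signing secret leaking, so rotating it and loading it only from the environment is worth a follow-up.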
@@ -43,6 +43,23 @@ async def login_for_access_token( form_data: OAuth2PasswordRequestForm = Depends
     access_token = AuthService.create_access_token(data={"sub": user["email"]})
     return {"access_token": access_token, "token_type": "bearer"}
 
+@router.post("/isvalid-token", summary="Verify token validity")
+async def verify_token(token: str = Body(...)):
+    try:
+        # Décoder le token pour vérifier sa validité
+        payload = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm])
+        return {"valid": True, "message": "Token is valid", "payload": payload}
+    except jwt.ExpiredSignatureError:
+        raise HTTPException(
+            status_code=status.HTTP_401_UNAUTHORIZED,
+            detail="Token has expired",
+        )
+    except JWTError:
+        raise HTTPException(
+            status_code=status.HTTP_401_UNAUTHORIZED,
+            detail="Invalid token",
+        )
+
 @router.get("/me", summary="Get current user")
 async def read_users_me(token:str = Depends(oauth2_scheme) , db=Depends(get_db)):
     return await AuthService.get_current_user(token, db)
diff --git a/config/settings.py b/config/settings.py
index e54d932..11d82e5 100644
--- a/config/settings.py
+++ b/config/settings.py
@@ -5,7 +5,7 @@ class Settings(BaseSettings):
     database_url: str = "mysql+aiomysql://sywmtnsg_admin:EEy_>2JJS0@localhost:6033/sywmtnsg_dm_management"
     secret_key: str = "LAGs7G8Sis9aQHcipROxpjYRxFZKjr4wNm-_O0pBTkjNYv1rgPUR87VcNswH_VYGpIrsyGdqnNa3vcVSH0f5Tg"
     algorithm: str = "HS256"
-    access_token_expire_minutes: int = 30
+    access_token_expire_minutes: int = 4320
     aws_access_key_id: str = ""
     aws_secret_access_key: str = ""
     aws_bucket_name: str = ""
diff --git a/services/auth_service.py b/services/auth_service.py
index a8e1406..9d01978 100644
--- a/services/auth_service.py
+++ b/services/auth_service.py
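Review bot: The `/isvalid-token` handler re-implements the decode-and-map-to-401 logic that this same commit adds as `AuthService.verify_token` in services/auth_service.py, so two copies of the secret/algorithm handling now have to be kept in sync; the body can collapse to `payload = AuthService.verify_token(token)` followed by `return {"valid": True, "message": "Token is valid", "payload": payload}`. With a bare `token: str = Body(...)` FastAPI expects the request body to be a JSON string literal rather than an object, which clients rarely send; `Body(..., embed=True)` would accept a `{"token": "..."}` body instead. If the local decode is kept, the two `raise HTTPException(...)` in the except clauses should use `from None` so the library error is marked as handled rather than chained, and `Body`, `jwt`, `JWTError` and `settings` must all be imported in this module, which cannot be confirmed from the hunk.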
@@ -62,6 +62,32 @@ class AuthService:
         expire = datetime.now(timezone.utc) + timedelta(minutes=settings.access_token_expire_minutes)
         to_encode.update({"exp": expire})
         return jwt.encode(to_encode, settings.secret_key, algorithm=settings.algorithm)
+
+    @staticmethod
+    def verify_token(token: str) -> dict:
+        try:
+            # Décoder le token
+            payload = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm])
+
+            # Vérifier si le token est expiré
+            expiration_time = payload.get("exp")
+            if expiration_time and datetime.now(timezone.utc).timestamp() > expiration_time:
+                raise HTTPException(
+                    status_code=status.HTTP_401_UNAUTHORIZED,
+                    detail="Token has expired",
+                )
+
+            return payload
+        except jwt.ExpiredSignatureError:
+            raise HTTPException(
+                status_code=status.HTTP_401_UNAUTHORIZED,
+                detail="Token has expired",
+            )
+        except JWTError:
+            raise HTTPException(
+                status_code=status.HTTP_401_UNAUTHORIZED,
+                detail="Invalid token",
+            )
 
     @staticmethod
     async def get_current_user(token: str, db):
diff --git a/utils/security.py b/utils/security.py
index 4a6947e..afa52c3 100644
--- a/utils/security.py
+++ b/utils/security.py
@@ -13,6 +13,6 @@ def verify_password(plain_password: str, hashed_password: str) -> bool:
 
 def create_access_token(data: dict) -> str:
     to_encode = data.copy()
-    expire = datetime.utcnow() + timedelta(minutes=settings.access_token_expire_minutes)
+    expire = datetime.now(datetime.timezone.utc) + timedelta(minutes=settings.access_token_expire_minutes)
     to_encode.update({"exp": expire})
-    return jwt.encode(to_encode, settings.secret_key, algorithm=settings.algorithm)
\ No newline at end of file
+    return jwt.encode(to_encode, settings.secret_key, algorithm=settings.algorithm)
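Review bot: The manual expiry comparison in `verify_token` (from `expiration_time = payload.get("exp")` through the first `raise HTTPException(...)`) is redundant: `jwt.decode` verifies the `exp` claim by default in both python-jose and PyJWT and raises `ExpiredSignatureError` before that code runs, so the branch is unreachable unless decode is later called with `exp` verification disabled, and it leaves two places that have to agree on what "expired" means. Dropping it leaves a single source of truth: `payload = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm])` followed by `return payload` inside the `try`. Both `raise HTTPException(...)` in the except clauses should chain with `from None`, and raising HTTP-layer exceptions from a service method ties `AuthService` to FastAPI; a service-level exception mapped to 401 in the router would keep the layers separate. The enclosing `class AuthService` starts before the hunk and continues after it.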
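Review bot: `datetime.now(datetime.timezone.utc)` on the new `expire = ...` line will raise `AttributeError` on the first call: the bare `timedelta(...)` on the same line and the removed `datetime.utcnow()` show that `datetime` here is the class from `from datetime import datetime, timedelta`, and the `datetime` class has no `timezone` attribute (this would only work if the module also bound the `datetime` module under that name, which those two usages make unlikely). It should read `expire = datetime.now(timezone.utc) + timedelta(minutes=settings.access_token_expire_minutes)` with `timezone` added to the import line, matching `AuthService.create_access_token` in services/auth_service.py. This module-level `create_access_token` is also a near-duplicate of `AuthService.create_access_token`; if utils/security.py is still imported anywhere, having one delegate to the other keeps the expiry logic in one place. The import block is outside the hunk, so whether `timezone` is already imported cannot be confirmed here.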