68 lines
2.3 KiB
Python
68 lines
2.3 KiB
Python
"""JWT authentication utilities — token creation, validation, and dependency."""
|
|
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Optional
|
|
|
|
import jwt
|
|
from fastapi import Depends, HTTPException, status
|
|
from fastapi.security import OAuth2PasswordBearer
|
|
|
|
from app.config import settings
|
|
|
|
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
|
|
|
|
SECRET_KEY = settings.JWT_SECRET
|
|
ALGORITHM = settings.JWT_ALGORITHM
|
|
ACCESS_TOKEN_EXPIRE_MINUTES = settings.ACCESS_TOKEN_EXPIRE_MINUTES
|
|
REFRESH_TOKEN_EXPIRE_DAYS = settings.REFRESH_TOKEN_EXPIRE_DAYS
|
|
|
|
|
|
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
|
|
"""Create a JWT access token.
|
|
|
|
Encodes *data* into a JWT signed with SECRET_KEY.
|
|
If *expires_delta* is omitted, ACCESS_TOKEN_EXPIRE_MINUTES is used.
|
|
"""
|
|
to_encode = data.copy()
|
|
expire = datetime.now(timezone.utc) + (
|
|
expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
|
|
)
|
|
to_encode.update({"exp": expire, "type": "access"})
|
|
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
|
|
|
|
|
|
def create_refresh_token(data: dict) -> str:
|
|
"""Create a JWT refresh token valid for REFRESH_TOKEN_EXPIRE_DAYS days."""
|
|
to_encode = data.copy()
|
|
expire = datetime.now(timezone.utc) + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
|
|
to_encode.update({"exp": expire, "type": "refresh"})
|
|
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
|
|
|
|
|
|
def verify_token(token: str) -> dict:
|
|
"""Decode and verify a JWT.
|
|
|
|
Raises HTTPException 401 if the token is invalid or expired.
|
|
"""
|
|
try:
|
|
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
|
|
return payload
|
|
except jwt.ExpiredSignatureError:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Token has expired",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
except jwt.InvalidTokenError:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Invalid token",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
|
|
|
|
async def get_current_user(token: str = Depends(oauth2_scheme)) -> dict:
|
|
"""FastAPI dependency that extracts the current user from a Bearer JWT."""
|
|
payload = verify_token(token)
|
|
return payload
|