82 lines
2.1 KiB
Python
82 lines
2.1 KiB
Python
|
|
import hashlib
|
||
|
|
import secrets
|
||
|
|
from datetime import datetime, timedelta, timezone
|
||
|
|
from typing import Any
|
||
|
|
|
||
|
|
import jwt
|
||
|
|
from argon2 import PasswordHasher
|
||
|
|
from argon2.exceptions import InvalidHash, VerifyMismatchError
|
||
|
|
from fastapi import HTTPException, status
|
||
|
|
|
||
|
|
from .config import get_settings
|
||
|
|
|
||
|
|
password_hasher = PasswordHasher()
|
||
|
|
|
||
|
|
|
||
|
|
def hash_password(password: str) -> str:
|
||
|
|
return password_hasher.hash(password)
|
||
|
|
|
||
|
|
|
||
|
|
def verify_password(password: str, password_hash: str) -> bool:
|
||
|
|
try:
|
||
|
|
return password_hasher.verify(password_hash, password)
|
||
|
|
except (VerifyMismatchError, InvalidHash):
|
||
|
|
return False
|
||
|
|
|
||
|
|
|
||
|
|
def create_access_token(
|
||
|
|
*,
|
||
|
|
user_id: str,
|
||
|
|
role_codes: list[str],
|
||
|
|
permission_codes: list[str],
|
||
|
|
expires_minutes: int | None = None,
|
||
|
|
) -> tuple[str, int]:
|
||
|
|
settings = get_settings()
|
||
|
|
now = datetime.now(timezone.utc)
|
||
|
|
minutes = expires_minutes or settings.access_token_expire_minutes
|
||
|
|
expires_at = now + timedelta(minutes=minutes)
|
||
|
|
|
||
|
|
payload = {
|
||
|
|
"sub": user_id,
|
||
|
|
"roles": role_codes,
|
||
|
|
"permissions": permission_codes,
|
||
|
|
"iat": int(now.timestamp()),
|
||
|
|
"exp": int(expires_at.timestamp()),
|
||
|
|
}
|
||
|
|
token = jwt.encode(
|
||
|
|
payload,
|
||
|
|
settings.jwt_secret_key,
|
||
|
|
algorithm=settings.jwt_algorithm,
|
||
|
|
)
|
||
|
|
return token, int((expires_at - now).total_seconds())
|
||
|
|
|
||
|
|
|
||
|
|
def decode_access_token(token: str) -> dict[str, Any]:
|
||
|
|
settings = get_settings()
|
||
|
|
try:
|
||
|
|
payload = jwt.decode(
|
||
|
|
token,
|
||
|
|
settings.jwt_secret_key,
|
||
|
|
algorithms=[settings.jwt_algorithm],
|
||
|
|
)
|
||
|
|
except jwt.PyJWTError as exc:
|
||
|
|
raise HTTPException(
|
||
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
||
|
|
detail="Invalid or expired access token",
|
||
|
|
) from exc
|
||
|
|
|
||
|
|
if not payload.get("sub"):
|
||
|
|
raise HTTPException(
|
||
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
||
|
|
detail="Invalid access token payload",
|
||
|
|
)
|
||
|
|
return payload
|
||
|
|
|
||
|
|
|
||
|
|
def create_refresh_token() -> str:
|
||
|
|
return secrets.token_urlsafe(48)
|
||
|
|
|
||
|
|
|
||
|
|
def hash_token(token: str) -> str:
|
||
|
|
return hashlib.sha256(token.encode("utf-8")).hexdigest()
|