Gerando tokens
Agora que já podemos criar usuários é importante conseguirmos autenticar os usuários pois desta forma podemos limitar o acesso a alguns endpoints.
Esse será arquivo com a maior quantidade de código boilerplate.
EDITE o arquivo dundie/auth.py
vamos criar as classes e funções necessárias
para a implementação de JWT que é a autenticação baseada em token e vamos
usar o algoritmo selecionado no arquivo de configuração.
Neste arquivo vamos criar os seguintes objetos:
- Um esquema de autenticação baseado em oauth, este objeto é usado pelo FastAPI para exibir um formulário de login e outros controles de autenticação na página /docs.
- Classes para serializar os 3 tipos de token que criaremos (token, refresh_token, reset_password_token)
- Fução que cria um token usando o algoritmo especificado
- Função que recebe o token e valida sua autenticidade
- Funções para retornar o objeto
User
sempre que precisarmos saber qual usuário está autenticado - Dependência para injetarmos em todas as funções que necessitem de autenticação
dundie/auth.py
"""Token based auth"""
from datetime import datetime, timedelta
from typing import Callable, Optional, Union
from functools import partial
from fastapi import Depends, HTTPException, Request, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from pydantic import BaseModel
from sqlmodel import Session, select
from dundie.config import settings
from dundie.db import engine
from dundie.models.user import User
from dundie.security import verify_password
SECRET_KEY = settings.security.secret_key # pyright: ignore
ALGORITHM = settings.security.algorithm # pyright: ignore
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# Models
class Token(BaseModel):
access_token: str
refresh_token: str
token_type: str
class RefreshToken(BaseModel):
refresh_token: str
class TokenData(BaseModel):
username: Optional[str] = None
# Functions
def create_access_token(
data: dict,
expires_delta: Optional[timedelta] = None,
scope: str = "access_token",
) -> str:
"""Creates a JWT Token from user data
scope: access_token or refresh_token
"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire, "scope": scope})
encoded_jwt = jwt.encode(
to_encode,
SECRET_KEY, # pyright: ignore
algorithm=ALGORITHM, # pyright: ignore
)
return encoded_jwt
create_refresh_token = partial(create_access_token, scope="refresh_token")
def authenticate_user(
get_user: Callable, username: str, password: str
) -> Union[User, bool]:
"""Authenticate the user"""
user = get_user(username)
if not user:
return False
if not verify_password(password, user.password):
return False
return user
def get_user(username) -> Optional[User]:
"""Get user from database"""
query = select(User).where(User.username == username)
with Session(engine) as session:
return session.exec(query).first()
def get_current_user(
token: str = Depends(oauth2_scheme),
request: Request = None, # pyright: ignore
fresh=False
) -> User:
"""Get current user authenticated"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
if request:
if authorization := request.headers.get("authorization"):
try:
token = authorization.split(" ")[1]
except IndexError:
raise credentials_exception
try:
payload = jwt.decode(
token,
SECRET_KEY, # pyright: ignore
algorithms=[ALGORITHM] # pyright: ignore
)
username: str = payload.get("sub") # pyright: ignore
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(username=token_data.username)
if user is None:
raise credentials_exception
if fresh and (not payload["fresh"] and not user.superuser):
raise credentials_exception
return user
# FastAPI dependencies
async def get_current_active_user(
current_user: User = Depends(get_current_user),
) -> User:
"""Wraps the sync get_active_user for sync calls"""
return current_user
AuthenticatedUser = Depends(get_current_active_user)
async def validate_token(token: str = Depends(oauth2_scheme)) -> User:
"""Validates user token"""
user = get_current_user(token=token)
return user
O objeto AuthenticatedUser
é uma dependência do FastAPI e é
através dele que iremos garantir que nossas rotas estejas protegidas
com token.
Agora só falta registrarmos as URLs responsáveis por gerar a validar o token -->