Projeto Dundie API
LINUXTips - Python Web API
Olá, este é o material de apoio do projeto desenvolvido durante o treinamento Python Web API para ter acesso aos videos e grupo de alunos + emissão do certificado clique no link e matricule-se.
Durante o treinamento, você vai aprender:
- Montar o ambiente de desenvolvimento
- Organizar as pastas do projeto
- Orquestrar os serviços com containers
- Modelar o banco de dados
- Fazer migração de dados
- Criar uma CLI para gerenciar o projeto
- Criar uma API com FastAPI
- Autenticar usuários com JWT
- Enviar e-mails com background tasks
- Trabalhar com task queue
- Planejar testes
- Preparar um pipeline de testes
- Escrever testes automatizados
- Usar protocolo web-sockets
- Integrar com um Front-End
Se tiver qualquer dúvida não hesite em postar em nosso grupo do Discord, lá você vai obter ajuda do intrutor e também dos demais alunos. (O link para o grupo está no primeiro item lá na plataforma)
Ahh e se você não usa o Discord, não tem problema, pode acessar https://LINUXtips/social e postar lá aba referente ao treinamento.
Faça parte do Discord da LINUXtips cliando em Convite Discord Geral (observação: no discord existem salas públicas e uma sala exclusiva para alunos.)
Siga o autor deste treinamento no twitter Bruno Rocha
Tudo pronto para começar?
Você pode mudar o tema dessa página, caso prefira pro exemplo dark ou light theme clicando no ícone na barra superior.
commit 99412ec on 2024-09-13 23:44:08
Requisitos
Opção 1: Executando localmente
- Acesso a um computador com Python
- Terminal Linux/Mac (ou WSL)
- Docker e docker compose
- Editor de código de sua preferência (VSCode, vim, micro, emacs)
Durante as aulas em video o instrutor estará usando o editor vim mas você pode usar o editor de sua preferência.
Opção 2: Executando online no Gitpod
- Browser (Chrome ou Firefox)
- Conta no https://gitpod.io
A conta gratuita do gitpod oferece 50 horas por mês de uso e no ambiente já tem editor (VSCode), terminal Linux e containers, tudo isso rodando diretamenteem seu navegador.
Os comandos apresentados serão todos executados em um terminal Linux, se estiver no Windows recomendo usar o WSL, uma máquina virtual ou um container Linux, ou por conta própria adaptar os comandos necessários.
Repositório
Obtenha uma cópia do Repositório
- Faça login no github e faça um fork do respositório deste projeto clicando em:
Os blocos de código aqui apresentados que contém $
significam que é um comando que deve ser executado no terminal, o simbolo $
não faz parte do comando, este simbolo indica o prompt e provavelmente o seu terminal irá exibir $
ou algum caracter de marcação similar como >
ou #
.
É recomendado que você digite os comandos ao invés de copiar e colar, mas caso prefira copiar e colar lembre-se que o sinal de $
não precisa ser copiado.
Obtendo os arquivos
Opção 1: Rodando localmente em seu computador
Faça o clone o projeto para uma pasta no seu ambiente local usando git, substituindo USER
pelo seu nome de usuário do github:
$ cd pasta/onde/vc/guarda/seus/projetos
$ git clone git@github.com:USER/dundie-api.git
$ cd dundie-api
Para clonar usando git@...
você precisa ter seu git local configurado com chave ssh, caso contrário terá que clonar usando o endereço https://github.com/USER/dundie-api
Opção 2: Rodando no seu navegador com gitpod
Caso tenha optado por utilizar o gitpod basta acessar https://gitpod.io e efetuar o login utilizando sua conta do Github.
O gitpod irá criar um ambiente virtual para você, com tudo configurado para rodar o projeto e você terá acesso ao editor e ao terminal.
Ambiente de Desenvolvimento
Ambiente Dev é para fins de programação o conjunto de ferramentas, bibliotecas e variáveis que precisam estar disponíveis para desenvolver, testar e manter o projeto.
Preparando o Ambiente
Opção 1: Rodando no seu computador local
Dentro da pasta dundie-api
e crie um ambiente virtual.
$ python -m venv .venv
E ative a virtualenv
No Linux/Mac ou Windows WSL
$ source .venv/bin/activate
No Windows Power Shell
$ .\venv\Scripts\activate.ps1
O ambiente virtual ativado fará com que seu terminal exiba (.venv)
juntamente do prompt, você ainda pode digitar no terminal which python
para confirmar se o ambiente está mesmo ativado, o output deverá ser [...]/.venv/bin/python
Opção 2: Rodando online com o gitpod
No gitpod.io não é preciso criar um abiente virtual, o ambiente já vem configurado isoladamente.
Instalando as dependências
Com o ambiente pronto podemos agora instalar as dependências básicas do projeto que estão contidas no arquivo requirements-dev.txt
.
Confirme que o arquivo já contém a lista de todas as ferramentas que usaremos para fins de desenvolvimento e debugging, confira o conteúdo do arquivo abrindo em seu editor ou através do comando cat
no terminal Linux.
$ cat requirements-dev.txt
ipython # terminal
ipdb # debugger
sdb # debugger remoto
pip-tools # lock de dependencias
pytest # execução de testes
pytest-order # ordenação de testes
httpx==0.26.0 # requests async para testes
black # auto formatação
flake8 # linter
Instalaremos as dependencias com a ferramenta pip
que é um módulo do Python.
-
Atualizamos o pip
$ python -m pip install --upgrade pip
-
Instalamos as dependencias de desenvolvimento
$ python -m pip install -r requirements-dev.txt
-
Instalamos o projeto em modo de desenvolvimento.
Esta instalação permite maior facilidade nos testes e auto-complete do editor de código
$ python -m pip install -e ".[dev]"
Os metadados de instalação estão definidos no arquivo pyproject.toml
, neste arquivo estão listados os atributos do projeto, os arquivos e módulos que fazem parte, a versão e as dependencias.
O problema que vamos resolver
A Dunder Mifflin é a maior e mais bem sucedida empresa de papéis do mundo
Este é Michael Scott, gerente regional da Dunder Mifflin de Scranton, Pensilvania.
Ele se acha o melhor gerente do mundo, mas na verdade é bem sem graça e as vezes até inconveniente.
Este é o time de funcionários que o Michael gerencia, eles estão com cara de alegres (com excessão da Angela), porém nem todos estão felizes, eles estão na verdade bastante desmotivados e começaram a reclamar das piadas sem graça do Michael e da falta de reconhecimento da empresa.
Projeto Dundie Rewards
A Dunder Mifflin nos contratou para desenvolver uma API para o novo projeto Dundie Rewards, que consiste em um sistema de recompensas onde cada funcionário terá uma conta e irá ganhar pontos.
Esses pontos poderão ser atribuidos pelo Michael quando ele achar que um funcionário merece, automaticamente em datas especiais ou em batimento de metas, e os funcionários poderão trocar pontos entre eles como forma de reconhecimento do trabalho dos colegas.
Ao final do ano, os funcionários vão poder trocar seus pontos por dinheiro para gastar como quiserem.
Vamos começar este projeto desenvolvendo a estrutura de pastas, a orquestração do ambiente, a modelagem de dados e a API.
Futuramente na fase 2 iremos integrar com um front-end.
Funcionalidades
Usuários
- Registro de novos funcionários via CLI e via API
- CLI:
dundie create-user [Nome] [Email] [Password Inicial] [departamento]
- API:
POST /user/ data={nome:.., email:.., departamento:.., password:..}
- Usuários do departamento
management
são considerados super usuáripos - O
username
é gerado a partir do slug do nome do usuárioPam Besly
->pam-besly
- CLI:
- Autenticação de usuários com JWT token
- Para usar a API será necessário uma chamada
GET /token data={username, password}
e todas as chamadas subsequentes precisarão do token informado nosHEADERS
.
- Para usar a API será necessário uma chamada
- O usuário pode requisitar um token para alteração de senha
Transações
- Transações não podem ser deletadas ou canceladas, cada transação é um item no histórico da conta do usuário em questão.
- Qualquer usuário com um token poderá fazer uma chamada para
POST /transaction/username/ data={value=100}
esta chamada adiciona um novo registro na tabelaTransaction
contentouser_id: <usuario_dono_da_conta>, from_id: <usuário que fez o depósito>, value: <integer>, date: <timestamp>
- O usuário só poderá fazer uma transação caso o saldo da sua própria conta seja suficiente para cobrir o custo da transação.
- O saldo de um usuário é a soma de todas as suas transações.
Endpoints:
POST /transaction/username/
- Registra uma transação para um usuário- Acesso:
Geral
- Validação: O usuário autenticado é o
from_id
da transação e deve ter saldo suficiente.
- Acesso:
GET /transaction/username/
- Retorna as transações de um usuário incluindo seubalance
(saldo todal)- Acesso:
Manager
ouusername == current_user
- Acesso:
GET /transaction/
- Retorna todas as transações
API Spec
Arquitetura
Fase 1
- 1 Serviço de API
- 1 Serviço de Banco de Dados
graph LR; A["API (FastAPI) fa:fa-globe"] B[("Banco de Dados (PG) fa:fa-cubes")] A --> B
Fase 2
- 1 Serviço de API
- 1 Serviço de Banco de Dados
- 1 Serviço consumidor de tarefas (RQ)
- 1 Serviço de fila de mensagens (Redis)
graph TD; A["API (FastAPI) fa:fa-globe"] B[("Banco de Dados (PG) fa:fa-cubes")] C>"Fila de mensagens (Redis) fa:fa-bars"] D[["Consumidor de tarefas (RQ) fa:fa-gears"]] A --> B A --> C D --> C D --> B
Agora vamos ver como será a estrutura dos arquivos -->
Estrutura de arquivos
Esta é estrutura deste repositório, os arquivos com *
são os que você vai precisar editar ao longo deste guia.
$ tree --filesfirst -L 3 -I docs
.
├── docker-compose.yaml # Container Orchestration
├── Dockerfile.dev # Container Dev Image
├── MANIFEST.in # Arquivos do projeto
├── pyproject.toml # Metadados do projeto
├── requirements-dev.txt # Dev tools
├── requirements.in # Dependencies
├── settings.toml # Config por ambiente
├── setup.py # Setuptools bootstrap
├── test.sh # CI Pipeline
├── dundie # Main Package
│ ├── app.py* # FastAPI app
│ ├── auth.py* # Token JWT
│ ├── cli.py* # CLI app
│ ├── config.py # Config management
│ ├── db.py* # Database connection
│ ├── default.toml # Default settings
│ ├── __init__.py
│ ├── security.py* # Password Hashing
│ ├── VERSION.txt # SCM versioning
│ ├── models
│ │ ├── __init__.py*
│ │ ├── transaction.py* # Models for transaction
│ │ └── user.py* # Models for User
│ ├── routes
│ │ ├── auth.py* # Token and Auth URLs
│ │ ├── __init__.py*
│ │ ├── transaction.py* # Transaction URLs
│ │ └── user.py* # User URLs
│ └── tasks
│ ├── __init__.py*
│ ├── transaction.py* # Transaction Taks
│ └── user.py* # User Tasks
├── postgres
│ ├── create-databases.sh # DB startup
│ └── Dockerfile # DB image
└── tests
├── conftest.py* # Pytest config
├── __init__.py
└── test_api.py* # API tests
Todos os arquivos acima já estão criados no repositório, você vai precisar apenas editar,
alguns arquivos como o .secrets.toml
(para guardar dados sensiveis) você irá criar localmente
pois este arquivo não deverá ser comitado ao repositório.
Criando uma API base
Vamos editar o arquivo dundie/app.py
e colocar a minima aplicação FastAPI
só para que possamos rodar o container e testar se tudo está funcionando.
dundie/app.py
from fastapi import FastAPI
app = FastAPI(
title="dundie",
version="0.1.0",
description="dundie is a rewards API",
)
Salve as alterações e agora vamos partir para a definição do container ->
Criando um container
Vamos agora verificar o Dockerfile.dev
que está na raiz do repositório e será a imagem responsável por executar nossa api.
Dockerfile.dev
# Build the app image
FROM python:3.10
# Create directory for the app user
RUN mkdir -p /home/app
# Create the app user
RUN groupadd app && useradd -g app app
# Create the home directory
ENV APP_HOME=/home/app/api
RUN mkdir -p $APP_HOME
WORKDIR $APP_HOME
# install
COPY . $APP_HOME
RUN pip install -r requirements-dev.txt
RUN pip install -e .
RUN chown -R app:app $APP_HOME
USER app
CMD ["uvicorn","dundie.app:app","--host=0.0.0.0","--port=8000","--reload"]
O arquivo acima define o passo a passo para construir uma imagem de container customizada a partir da python:3.10
, neste script de cosntrução da imagem estamos criando diretórios, ajustando permissões, copiando arquivos da aplicação e isntalando dependencias, além de definirmos o comando principal de execução do programa.
Com esta definição pronta o próximo passo é construir a imagem do container:
docker build -f Dockerfile.dev -t dundie:latest .
Agora em nosso sistema teremos uma imagem chamada dundie
com a tag latest
e podemos executar.
$ docker run --rm -it -v $(pwd):/home/app/api -p 8000:8000 dundie
INFO: Will watch for changes in these directories: ['/home/app/api']
INFO: Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)
INFO: Started reloader process [1] using StatReload
INFO: Started server process [8]
INFO: Waiting for application startup.
INFO: Application startup complete.
Acesse: http://0.0.0.0:8000/docs e terá acesso a página default da OpenAPI spec que acompanha o FastAPI.
Ainda não temos rotas definidas portanto podemos passar o próximo passo.
Orquestrando serviços
Os serviços:
graph LR; A["API (FastAPI) fa:fa-globe"] B[("Banco de Dados (PG) fa:fa-cubes")] A --> B
Docker compose
Para iniciar a nossa API + o Banco de dados vamos precisar de um orquestrador de containers, em produção isso será feito com Kubernetes mas no ambiente de desenvolvimento podemos usar o docker compose.
No arquivo docker-compose.yaml
- Definimos 2 serviços
api
edb
- Informamos os parametros de build com os dockerfiles
- Na
api
abrimos a porta8000
- Na
api
passamos 2 variáveis de ambienteDUNDIE_DB__uri
eDUNDIE_DB_connect_args
para usarmos na conexão com o DB - Marcamos que a
api
depende dodb
para iniciar. - No
db
informamos o setup básico do postgres e pedimos para criar 2 bancos de dados, um para a app e um para testes.
docker-compose.yaml
version: '3.9'
services:
api:
build:
context: .
dockerfile: Dockerfile.dev
ports:
- "8000:8000"
environment:
DUNDIE_DB__uri: "postgresql://postgres:postgres@db:5432/${DUNDIE_DB:-dundie}"
DUNDIE_DB__connect_args: "{}"
SQLALCHEMY_SILENCE_UBER_WARNING: 1
volumes:
- .:/home/app/api
depends_on:
- db
stdin_open: true
tty: true
db:
build: postgres
image: dundie_postgres-13-alpine-multi-user
volumes:
- dundie_pg_data:/var/lib/postgresql/data
ports:
- "5432:5432"
environment:
- POSTGRES_DBS=dundie, dundie_test
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
volumes:
dundie_pg_data:
O próximo passo é estando na raiz do repositório executar:
docker compose up -d
Para verificar os serviços que estão rodando:
$ docker compose ps
Name Command State Ports
api_1 /bin/sh -c uvicorn dundie... Up 8000
db_1 docker-entrypoint.sh postgres Up 5432
Os serviços ficarão em execução em segundo plano, se quiser manter o terminal aberto para acompanhar os logs pode omitir o -d
ou então abrir um segundo terminal e executar docker compose logs --follow
Modelagem
Definindo os models com Pydantic
Esta será a modelagem do banco de dados completo, mas começaremos apenas com a tabela de usuários.
https://dbdesigner.page.link/GqDU95ApwZs7a9RH9
Vamos modelar o banco de dados definido acima usando o SQLModel, que é uma biblioteca que integra o SQLAlchemy e o Pydantic e funciona muito bem com o FastAPI.
Vamos começar a estruturar os model principal para armazenar os usuários
EDITE o arquivo dundie/models/user.py
"""User related data models"""
from typing import Optional
from sqlmodel import Field, SQLModel
class User(SQLModel, table=True):
"""Represents the User Model"""
id: Optional[int] = Field(default=None, primary_key=True)
email: str = Field(unique=True, nullable=False)
username: str = Field(unique=True, nullable=False)
avatar: Optional[str] = None
bio: Optional[str] = None
password: str = Field(nullable=False)
name: str = Field(nullable=False)
dept: str = Field(nullable=False)
currency: str = Field(nullable=False)
@property
def superuser(self):
""""Users belonging to management dept are admins."""
return self.dept == "management"
Para que seja possivel importar e o SQLAlchemy reconhecer o nosso objeto EDITE arquivo dundie/models/__init__.py
adicione
from sqlmodel import SQLModel
from .user import User
__all__ = ["User", "SQLModel"]
NOTA as tabelas Balance e Transaction iremos definir posteriormente.
Agora podemos nos conectar com o banco de dados ->
Configurações
Agora que temos pelo menos uma tabela mapeada para uma classe precisamos estabelecer conexão com o banco de dados e para isso precisamos carregar configurações
Verifique o arquivo dundie/default.toml
[default]
[default.db]
uri = ""
connect_args = {check_same_thread=false}
echo = false
Lembra que no docker-compose.yaml
passamos as variáveis DUNDIE_DB...
aquelas variáveis vão sobrescrever os valores definidos no default.toml
, por exemplo, DUNDIE_DB__uri=...
irá preencher o valor uri
na seção [default.db]
do arquivo default.toml
Para carregar as configurações vamos usar o plugin dynaconf
que já está instalado e só precisamos carregar criando uma instancia de settings
que será o objeto usado durante toda a aplicação para acessar as configurações:
Veja em dundie/config.py
como estamos inicializando o plugin de configurações.
"""Settings module"""
import os
from dynaconf import Dynaconf
HERE = os.path.dirname(os.path.abspath(__file__))
settings = Dynaconf(
envvar_prefix="dundie",
preload=[os.path.join(HERE, "default.toml")],
settings_files=["settings.toml", ".secrets.toml"],
environments=["development", "production", "testing"],
env_switcher="dundie_env",
load_dotenv=False,
)
No arquivo acima estamos definindo que o objeto settings
irá
carregar variáveis do arquivo default.toml
e em seguida dos arquivos
settings.toml
e .secrets.toml
e que será possivel usar DUNDIE_
como
prefixo nas variáveis de ambiente para sobrescrever os valores.
Agora já podemos acessar esses valores e criar a conexão com o banco de dados -->
Conexão com o DB
Para conectar com o banco de dados, precisamos criar um objeto engine
, este objeto armazena as configurações
como o endereço do banco, usuário e senha. O objeto engine
é o responsável por executar as consultas SQL que
usaremos para definir as tabelas e também para consultar e alterar dados.
EDITE o arquivo dundie/db.py
e deixe conforme o código abaixo:
dundie/db.py
"""Database connection"""
from sqlmodel import create_engine
from .config import settings
engine = create_engine(
settings.db.uri, # pyright: ignore
echo=settings.db.echo, # pyright: ignore
connect_args=settings.db.connect_args, # pyright: ignore
)
Criamos um objeto engine
que aponta para uma conexão com o banco de
dados e para isso usamos as variáveis que lemos do settings
, o objeto settings
será capaz de carregar
essas variáveis do ambiente ou dos arquivos .toml
que definimos para configuração.
O comentário # pyright: ignore
só é necessário caso você esteja usando um editor com LSP que faz verificação
de tipos, como o VSCode ou Neovim. Este comentário faz com que o LSP ignore a checagem de tipos para essas linhas,
e é útil pois como esses valores são dinâmicos podemos receber qualquer tipo.
Migrations
Portanto agora já temos uma tabela mapeada e um conexão com o banco de dados precisamos agora garantir que a estrutura da tabela existe dentro do banco de dados.
Para isso vamos usar a biblioteca alembic
que gerencia migrações, ou seja,
alterações na estrutura das tabelas e automação de alteração em dados.
Começamos na raiz do repositório, no seu terminal rodando:
alembic init migrations
O alembic irá criar um arquivo chamado alembic.ini
e uma pasta chamada migrations
que servirá para armazenar o histórico de alterações do banco de dados.
Começaremos editando o arquivo migrations/env.py
Atenção nos comentários do snippet abaixo explicando exatamente onde efetuar cada uma das edições
# 1 - No topo do arquivo adicionamos
from dundie import models
from dundie.db import engine
from dundie.config import settings
# 2 - Perto da linha 23 mudamos de
# target_metadata = None
# para:
target_metadata = models.SQLModel.metadata
# 3 - Na função `run_migrations_offline()` mudamos
# url = config.get_main_option("sqlalchemy.url")
# para:
url = settings.db.uri
# 4 - Na função `run_migration_online` mudamos
# connectable = engine_from_config...
# para:
connectable = engine
Agora precisamos fazer só mais um ajuste
edite migrations/script.py.mako
e em torno da linha 10
adicione
#from alembic import op
#import sqlalchemy as sa
import sqlmodel # linha NOVA
Agora sim podemos começar a usar o alembic para gerenciar as migrations, precisamos executar este comando dentro do shell do container.
Executando comandos dentro do container
Todos os comandos a partir de agora serão executados no shell dentro do container e para fazer isso usaremos sempre docker compose exec
antes que qualquer comando.
Experimente: docker compose exec api /bin/bash
$ docker compose exec api /bin/bash
app@c5dd026e8f92:~/api$ # este é o shell dentro do container api
# digite exit para sair
Podemos redirecionar comandos diretamente para dentro do container com docker compose exec api [comando a ser executado]
Gerando e aplicando migrations
Agora para gerar um registro inicial de migration usaremos o comando alembic revision --autogenerate
e isso será executado dentro do container conforme exemplo abaixo:
$ docker compose exec api alembic revision --autogenerate -m "initial"
INFO [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO [alembic.runtime.migration] Will assume transactional DDL.
INFO [alembic.autogenerate.compare] Detected added table 'user'
Generating /home/app/api/migrations/versions/ee59b23815d3_initial.py ... done
Repare que o alembic identificou o nosso model User
e gerou uma migration
inicial que fará a criação desta tabela no banco de dados.
Podemos aplicar a migration rodando dentro do container com alembic upgrade head
:
$ docker compose exec api alembic upgrade head
INFO [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO [alembic.runtime.migration] Will assume transactional DDL.
INFO [alembic.runtime.migration] Running upgrade -> ee59b23815d3, initial
E neste momento a tabela será criada no Postgres e já podemos começar a interagir via SQL client ou através da classe User
que modelamos anteriormente.
Pode usar um client como https://antares-sql.app para se conectar ao banco de dados, usar o psql na linha de comando ou abrir o shell do ipython dentro do container.
Acessando o banco de dados através do shell
$ docker compose exec api ipython
# Agora está no ipython dentro do shell do container
In [1]:
Digite
from sqlmodel import Session, select
from dundie.db import engine
from dundie.models import User
with Session(engine) as session:
print(list(session.exec(select(User))))
O resultado será uma lista vazia []
indicando que ainda não temos nenhum usuário no banco de dados.
Digite
exit
para sair do ipython.
Foi preciso muito boilerplate para conseguir interagir com banco de dados através do shell portanto para facilitar a nossa vida vamos adicionar uma aplicação cli
onde vamos poder executar tarefas administrativas via linha de comando como criar ou listar usuários. -->
Criando a CLI
Command Line Interface é uma parte importante de todo serviço, é através dessa ferramanta que geralmente os administradores do serviço interagem com ele, seja para realizar tarefas de manutenção, configuração ou recuperar o sistema em caso de falhas.
Vamos criar uma CLI para o nosso serviço, para isso vamos usar o typer, que é uma das melhores bibliotecas para criar CLIs em Python.
Começaremos adicionando um comando shell
que abrirá um shell interativo com os objetos da aplicação e um outro
comando user-list
para listar todos os usuários cadastrados.
EDITE dundie/cli.py
import typer
from rich.console import Console
from rich.table import Table
from sqlmodel import Session, select
from .config import settings
from .db import engine
from .models import User
main = typer.Typer(name="dundie CLI", add_completion=False)
@main.command()
def shell():
"""Opens interactive shell"""
_vars = {
"settings": settings,
"engine": engine,
"select": select,
"session": Session(engine),
"User": User,
}
typer.echo(f"Auto imports: {list(_vars.keys())}")
try:
from IPython import start_ipython
start_ipython(
argv=["--ipython-dir=/tmp", "--no-banner"], user_ns=_vars
)
except ImportError:
import code
code.InteractiveConsole(_vars).interact()
@main.command()
def user_list():
"""Lists all users"""
table = Table(title="dundie users")
fields = ["name", "username", "dept", "email", "currency"]
for header in fields:
table.add_column(header, style="magenta")
with Session(engine) as session:
users = session.exec(select(User))
for user in users:
table.add_row(*[getattr(user, field) for field in fields])
Console().print(table)
E agora podemos executar.
$ docker compose exec api dundie --help
Usage: dundie [OPTIONS] COMMAND [ARGS]...
╭─ Options ──────────────────────────────────────────────────╮
│ --help Show this message and exit. │
╰────────────────────────────────────────────────────────────╯
╭─ Commands ─────────────────────────────────────────────────╮
│ shell Opens interactive shell │
│ user-list Lists all users │
╰────────────────────────────────────────────────────────────╯
E cada um dos comandos:
O comando user-list
para listar todos os usuários (que por enquanto irá retornar uma tabela vazia)
$ docker compose exec api dundie user-list
dundie users
┏━━━━━━┳━━━━━━━━━━┳━━━━━━┳━━━━━━━┳━━━━━━━━━━┓
┃ name ┃ username ┃ dept ┃ email ┃ currency ┃
┡━━━━━━╇━━━━━━━━━━╇━━━━━━╇━━━━━━━╇━━━━━━━━━━┩
└──────┴──────────┴──────┴───────┴──────────┘
e o comando shell
que irá abrir um shell interativo com os objetos da aplicação.
$ docker compose exec api dundie shell
Auto imports: ['settings', 'engine', 'select', 'session', 'User']
In [1]: session.exec(select(User))
Out[1]: <sqlalchemy.engine.result.ScalarResult at 0x7fb539d5e170>
In [2]: settings.db
Out[2]: <Box: {'connect_args': {}, 'uri': 'postgresql://postgres:postgres@db:5432/dundie', 'echo': False}>
Ainda não temos usuários cadastrados pois ainda está faltando uma parte importante que é o hash de senhas para os usuários, vamos resolver -->
Configurando SECRET_KEY
Precisamos ser capazes de encryptar tokens e gerar hash para as senhas dos usuários e para isso temos alguns requisitos, primeiro precisamos de uma chave secreta em nosso arquivo de settings, esta chave será usada em nosso algoritmo de criptografia quando começarmos a gerar tokens.
EDITE dundie/default.toml
e adicione ao final
dundie/default.toml
[default.security]
# Set secret key in .secrets.toml
# SECRET_KEY = ""
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_MINUTES = 600
Como explicado no próprio comentário do arquivo default.toml
, vamos colocar uma secret key separada
no arquivo .secrets.toml
na raiz do repositório, isso é recomendável pois podemos adicionar o
arquivo .secrets.toml
ao .gitignore
para que ele não seja enviado para o repositório e desta
maneira evitamos expor a chave secreta.
CRIE o arquivo .secrets.toml
(na raiz do repositório)
[development]
dynaconf_merge = true
[development.security]
# openssl rand -hex 32
SECRET_KEY = "ONLYFORDEVELOPMENT"
Repare que estamos agora usando a seção environment
e isso tem a ver
com o modo como o dynaconf gerencia os settings, esses valores serão
carregados apenas durante a execução em fase de desenvolvimento, em produção
o dynaconf carrega apenas valores das variáveis de ambiente (recomendado) ou
de uma seção similar nomeada [production]
.
Você pode gerar uma secret key mais segura se quiser usando:
$ python -c "print(__import__('secrets').token_hex(32))"
b9483cc8a0bad1c2fe31e6d9d6a36c4a96ac23859a264b69a0badb4b32c538f8
# OU no Linux
$ openssl rand -hex 32
b9483cc8a0bad1c2fe31e6d9d6a36c4a96ac23859a264b69a0badb4b32c538f8
Garantindo que os settings sempre tenham uma SECRET_KEY
Como a secret key será de extrema importância para o funcionamento da API precisamos garantir que esta chave de configuração esteja sempre presente antes do sistema inicializar.
EDITE dundie/config.py
# No topo faça o import de `Validator`
from dynaconf import Dynaconf, Validator
# No final adicione a validação
settings.validators.register( # pyright: ignore
Validator("security.SECRET_KEY", must_exist=True, is_type_of=str),
)
settings.validators.validate() # pyright: ignore
A partir de agora caso a SECRET_KEY não esteja disponível a aplicação não irá inicializar.
Agora sim podemos adicionar o código para geração de hash do password -->
Criando um hash
Para fazer com que o password dos usuários seja salvo como um hash ao invés de plain-text vamos criar uma função para criar o hash e outra para verificar.
Além disso vamos criar uma classe herdando de str
e customizar o método validate
desta forma podemos usar esta classe na definição de campo do nosso model User
e
o pydantic vai chamar o método validate
para transformar o valor do campo em um hash.
EDITE Agora vamos o dundie/security.py
e adicione alguns elementos
"""Security utilities"""
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password, hashed_password) -> bool:
"""Verifies a hash against a password"""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password) -> str:
"""Generates a hash from plain text"""
return pwd_context.hash(password)
class HashedPassword(str):
"""Takes a plain text password and hashes it.
use this as a field in your SQLModel
class User(SQLModel, table=True):
username: str
password: HashedPassword
"""
@classmethod
def __get_validators__(cls):
# one or more validators may be yielded which will be called in the
# order to validate the input, each validator will receive as an input
# the value returned from the previous validator
yield cls.validate
@classmethod
def validate(cls, v):
"""Accepts a plain text password and returns a hashed password."""
if not isinstance(v, str):
raise TypeError("string required")
hashed_password = get_password_hash(v)
# you could also return a string here which would mean model.password
# would be a string, pydantic won't care but you could end up with some
# confusion since the value's type won't match the type annotation
# exactly
return cls(hashed_password)
EDITE agora o arquivo dundie/models/user.py
No topo na linha 4
from dundie.security import HashedPassword
E no model mudamos o campo password
na linha 18 para
password: HashedPassword
E no final de dundie/models/user.py
uma função para gerar os usernames, transformando nomes completos como Bruno Rocha em um slug como bruno-rocha
def generate_username(name: str) -> str:
"""Generates a slug username from a name"""
return name.lower().replace(" ", "-")
Agora sim está tudo pronto para adicionarmos ao nosso CLI um comando para criar novos usuários -->
Comando para criar usuários
EDITE dundie/cli.py
e adicione no topo do arquivo:
from dundie.models.user import generate_username
e no final adicione um novo comando:
@main.command()
def create_user(
name: str,
email: str,
password: str,
dept: str,
username: str | None = None,
currency: str = "USD",
):
"""Create user"""
with Session(engine) as session:
user = User(
name=name,
email=email,
password=password, # pyright: ignore
dept=dept,
username=username or generate_username(name),
currency=currency,
)
session.add(user)
session.commit()
session.refresh(user)
typer.echo(f"created {user.username} user")
return user
A função create_user
será exposta na CLI como o subcomando create-user
, ou seja, _
será substituido por -
então no terminal execute para ver a mensagem de ajuda dundie create-user --help
:
$ docker compose exec api dundie create-user --help
Usage: dundie create-user [OPTIONS] NAME EMAIL PASSWORD DEPT
Create user
╭─ Arguments ──────────────────────────────────────╮
│ * name TEXT [default: None] │
│ [required] │
│ * email TEXT [default: None] │
│ [required] │
│ * password TEXT [default: None] │
│ [required] │
│ * dept TEXT [default: None] │
│ [required] │
╰──────────────────────────────────────────────────╯
╭─ Options ────────────────────────────────────────╮
│ --username TEXT [default: None] │
│ --currency TEXT [default: USD] │
│ --help Show this message and │
│ exit. │
╰──────────────────────────────────────────────────╯
E então execute o comando para criar o usuário para o gerente Michael Scott
No terminal quando uma linha fica muito longa podemos adicionar
uma quebra de linha com \
e o terminal vai entender que é uma
única linha.
E no caso de argumentos com espaço como o nome "Michael Scott" precisamos usar aspas para o terminal entender que é um único argumento.
Crie o usuário:
$ docker compose exec api dundie create-user \
"Michael Scott" mscott@dm.com boss123 management
created michael-scott user
E para listar o usuário criado:
$ docker compose exec api dundie user-list
dundie users
┏━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┳━━━━━━━━━━┓
┃ name ┃ username ┃ dept ┃ email ┃ currency ┃
┡━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━╇━━━━━━━━━━┩
│ Michael Scott │ michael-scott │ management │ mscott@dm.com │ USD │
└───────────────┴───────────────┴────────────┴───────────────┴──────────┘
Agora podemos finalmente começar a criar a nossa API -->
Definindo Serializers
Agora vamos criar endpoints na API para efetuar as operações que fizemos através da CLI, teremos as seguintes rotas:
GET /user/
- Lista todos os usuáriosPOST /user/
- Cadastro de novo usuárioGET /user/{username}/
- Detalhe de um usuário
Serializers
A primeira coisa que precisamos é definir serializers, que são models intermediários usados para serializar e de-serializar dados de entrada e saída da API e eles são necessários pois não queremos export o model do banco de dados diretamente na API e também queremos a possibilidade de serializar campos opcionais dependendo do nível de acesso do usuário, por exemplo, admins poderão ver mais campos que usuários regulares.
EDITE dundie/models/user.py
No topo na linha 5
from pydantic import BaseModel, root_validator
No final após a linha 20
class UserResponse(BaseModel):
"""Serializer for User Response"""
name: str
username: str
dept: str
avatar: Optional[str] = None
bio: Optional[str] = None
currency: str
class UserRequest(BaseModel):
"""Serializer for User request payload"""
name: str
email: str
dept: str
password: str
currency: str = "USD"
username: Optional[str] = None
avatar: Optional[str] = None
bio: Optional[str] = None
@root_validator(pre=True)
def generate_username_if_not_set(cls, values):
"""Generates username if not set"""
if values.get("username") is None:
values["username"] = generate_username(values["name"])
return values
Podemos testar os serializers em nosso shell só para ter certeza do funcionamento correto.
$ docker compose exec api dundie shell
Auto imports: ['settings', 'engine', 'select', 'session', 'User']
In [1]: from dundie.models.user import UserRequest
In [2]: new = UserRequest(
...: name="Bruno Rocha",
...: email="bruno@dm.com",
...: dept="Sales",
...: password="1234",
...: )
In [3]: new.username
Out[3]: 'bruno-rocha'
In [4]: new.currency
Out[4]: 'USD'
In [5]: db_user = User.from_orm(new)
In [6]: session.add(db_user)
In [7]: session.commit()
In [12]: session.exec(select(User).where(User.username=="bruno-rocha")).first()
Out[12]: User(bio=None, email='bruno@dm.com', username='bruno-rocha', name='Bruno Rocha', currency=
'USD', id=2, avatar=None, password='$2b$12$v/1h3sKAFCOuiKuXsThAXOBuny46TPYzKyoaBVisCFHlwaxPlKWpu',
dept='Sales')
Como pode ver acima podemos criar usuários via API e serializar usando o UserRequest
e só a partir dele criar a instancia de User
que iremos salvar no banco de dados.
E da mesma forma podemos fazer o caminho inverso, serializando do banco de dados para a API em JSON.
In [19]: bruno = session.exec(select(User).where(User.username=="bruno-rocha")).first()
In [20]: from dundie.models.user import UserResponse
In [21]: UserResponse.parse_obj(bruno).json()
Out[21]: '{"name": "Bruno Rocha", "username": "bruno-rocha", "dept": "Sales", "avatar": null, "bio"
: null, "currency": "USD"}'
Injeção de dependência
O FastAPI tem um excelente sistema de injeção de dependêcias, isto é util por exemplo para quando quisermos que um endpoint seja acessivel apenas por usuários administradores, ao invés de colocarmos lógica para verificar o nivel de permissão do usuário diretamente em cada view, podemos criar uma dependência.
Outro exemplo (que vamos aplicar agora) é precisarmos garantir que cada endpoint, ao ter sua função view invocada, uma sessão de conexão com o banco de dados já esteja disponível.
Vamos criar uma dependência chamada ActiveSession
que posteriormente será usada
na definição das views.
No topo de dundie/db.py
nas linhas 2 e 3
from fastapi import Depends
from sqlmodel import Session, create_engine
No final de dundie/db.py
após a linha 13
def get_session():
with Session(engine) as session:
yield session
ActiveSession = Depends(get_session)
O objeto que ActiveSession
é uma dependência para rotas do FastAPI
quando usarmos este objeto como parâmetro de uma view o FastAPI
vai executar de forma lazy este objeto e passar o retorno da função
atrelada a ele como argumento da nossa view.
Neste caso teremos sempre uma conexão com o banco de dados dentro de cada
view que marcarmos com session: Session = ActiveSession
.
Veremos a seguir como usar esta dependência -->
Criando as Views
Agora criaremos as views (funções) para expor os serializers com os usuários que acessaremos usando a sessão de conexão ao banco de dados.
edite dundie/routes/user.py
from typing import List
from fastapi import APIRouter
from fastapi.exceptions import HTTPException
from sqlmodel import Session, select
from dundie.db import ActiveSession
from dundie.models.user import User, UserRequest, UserResponse
router = APIRouter()
@router.get("/", response_model=List[UserResponse])
async def list_users(*, session: Session = ActiveSession):
"""List all users."""
users = session.exec(select(User)).all()
return users
@router.get("/{username}/", response_model=UserResponse)
async def get_user_by_username(
*, session: Session = ActiveSession, username: str
):
"""Get user by username"""
query = select(User).where(User.username == username)
user = session.exec(query).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
@router.post("/", response_model=UserResponse, status_code=201)
async def create_user(*, session: Session = ActiveSession, user: UserRequest):
"""Creates new user"""
db_user = User.from_orm(user) # transform UserRequest in User
session.add(db_user)
session.commit()
session.refresh(db_user)
return db_user
Acima criamos 3 views, uma para listar todos os usuários, uma para listar apenas um usuário e outra para criar um novo usuário.
repare que a rota de criação de um novo usuário temos 2 detalhes diferentes: 1) estamos usando o método post
e 2) estamos retornando o status code 201
(criado) em caso de sucesso.
Em todas as views estamos usando os serializers e a injeção de dependência.
Apesar de termos criados as funções, o FastAPI ainda não sabe disso, o próximo passo será fazer o roteamento de URL -->
Roteamento de URL
Com as funções já prontas agora podemos fazer o roteamento, que é o processo
de mapear uma URL como /user/
a uma função como a user_list
que criarmos,
ou seja, sempre que o servidor receber um request na URL /user/
irá executar
a função mapeada passando todo o contexto HTTP e o FastAPI se encarrega de fazer
a injeção das dependências.
Vamos começar criando um router principal que irá agrerar todas as rotas:
EDITE dundie/routes/__init__.py
from fastapi import APIRouter
from .user import router as user_router
main_router = APIRouter()
main_router.include_router(user_router, prefix="/user", tags=["user"])
E agora EDITE dundie/app.py
NO topo na linha 4
from .routes import main_router
Logo depois de app = FastAPI(...)
após a linha 11
app.include_router(main_router)
E agora sim pode acessar a API e verá as novas rotas prontas para serem usadas, http://0.0.0.0:8000/docs/
Pode tentar pela web interface ou com um http client puro:
# rest_nvim
GET http://localhost:8000/user/
#+END
HTTP/1.1 200 OK
date: Fri, 23 Dec 2022 18:04:23 GMT
server: uvicorn
content-length: 220
content-type: application/json
#+RESPONSE
[
{
"name": "Michael Scott",
"username": "michael-scott",
"dept": "management",
"avatar": null,
"bio": null,
"currency": "USD"
},
{
"name": "Bruno Rocha",
"username": "bruno-rocha",
"dept": "Sales",
"avatar": null,
"bio": null,
"currency": "USD"
}
]
#+END
Ou diretamente via cURL
curl -X 'GET' -k 'http://localhost:8000/user/'
Ou criar um usuário
curl -X 'POST' -H 'Content-Type: application/json' \
--data-raw '{"email": "pam@dm.com", "dept": "Accounting", "password": "jimjim", "name": "Pam Besly"}' \
-k 'http://localhost:8000/user/'
Pegar um usuário pelo username
curl -X 'GET' -k 'http://localhost:8000/user/michael-scott/'
{
"name": "Michael Scott",
"username": "michael-scott",
"dept": "management",
"avatar": null,
"bio": null,
"currency": "USD"
}
Listar todos
curl -X 'GET' \
'http://0.0.0.0:8000/user/' \
-H 'accept: application/json'
[
{
"name": "Michael Scott",
"username": "michael-scott",
"dept": "management",
"avatar": null,
"bio": null,
"currency": "USD"
},
{
"name": "Bruno Rocha",
"username": "bruno-rocha",
"dept": "Sales",
"avatar": null,
"bio": null,
"currency": "USD"
},
{
"name": "Dwight Schrute",
"username": "dwight-schrute",
"dept": "Sales",
"avatar": null,
"bio": null,
"currency": "USD"
},
{
"name": "Pam Besly",
"username": "pam-besly",
"dept": "Accounting",
"avatar": null,
"bio": null,
"currency": "USD"
},
{
"name": "Jim Halpert",
"username": "jim-halpert",
"dept": "Sales",
"avatar": null,
"bio": null,
"currency": "USD"
}
]
Só tem um pequeno problema: Qualquer um consegue criar usuários em nossa API sem estar autenticado e isso não é desejável, vamos resolver este problema implementando autenticação -->
Gerando tokens
Agora que já podemos criar usuários é importante conseguirmos autenticar os usuários pois desta forma podemos limitar o acesso a alguns endpoints.
Esse será arquivo com a maior quantidade de código boilerplate.
EDITE o arquivo dundie/auth.py
vamos criar as classes e funções necessárias
para a implementação de JWT que é a autenticação baseada em token e vamos
usar o algoritmo selecionado no arquivo de configuração.
Neste arquivo vamos criar os seguintes objetos:
- Um esquema de autenticação baseado em oauth, este objeto é usado pelo FastAPI para exibir um formulário de login e outros controles de autenticação na página /docs.
- Classes para serializar os 3 tipos de token que criaremos (token, refresh_token, reset_password_token)
- Fução que cria um token usando o algoritmo especificado
- Função que recebe o token e valida sua autenticidade
- Funções para retornar o objeto
User
sempre que precisarmos saber qual usuário está autenticado - Dependência para injetarmos em todas as funções que necessitem de autenticação
dundie/auth.py
"""Token based auth"""
from datetime import datetime, timedelta
from typing import Callable, Optional, Union
from functools import partial
from fastapi import Depends, HTTPException, Request, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from pydantic import BaseModel
from sqlmodel import Session, select
from dundie.config import settings
from dundie.db import engine
from dundie.models.user import User
from dundie.security import verify_password
SECRET_KEY = settings.security.secret_key # pyright: ignore
ALGORITHM = settings.security.algorithm # pyright: ignore
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# Models
class Token(BaseModel):
access_token: str
refresh_token: str
token_type: str
class RefreshToken(BaseModel):
refresh_token: str
class TokenData(BaseModel):
username: Optional[str] = None
# Functions
def create_access_token(
data: dict,
expires_delta: Optional[timedelta] = None,
scope: str = "access_token",
) -> str:
"""Creates a JWT Token from user data
scope: access_token or refresh_token
"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire, "scope": scope})
encoded_jwt = jwt.encode(
to_encode,
SECRET_KEY, # pyright: ignore
algorithm=ALGORITHM, # pyright: ignore
)
return encoded_jwt
create_refresh_token = partial(create_access_token, scope="refresh_token")
def authenticate_user(
get_user: Callable, username: str, password: str
) -> Union[User, bool]:
"""Authenticate the user"""
user = get_user(username)
if not user:
return False
if not verify_password(password, user.password):
return False
return user
def get_user(username) -> Optional[User]:
"""Get user from database"""
query = select(User).where(User.username == username)
with Session(engine) as session:
return session.exec(query).first()
def get_current_user(
token: str = Depends(oauth2_scheme),
request: Request = None, # pyright: ignore
fresh=False
) -> User:
"""Get current user authenticated"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
if request:
if authorization := request.headers.get("authorization"):
try:
token = authorization.split(" ")[1]
except IndexError:
raise credentials_exception
try:
payload = jwt.decode(
token,
SECRET_KEY, # pyright: ignore
algorithms=[ALGORITHM] # pyright: ignore
)
username: str = payload.get("sub") # pyright: ignore
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(username=token_data.username)
if user is None:
raise credentials_exception
if fresh and (not payload["fresh"] and not user.superuser):
raise credentials_exception
return user
# FastAPI dependencies
async def get_current_active_user(
current_user: User = Depends(get_current_user),
) -> User:
"""Wraps the sync get_active_user for sync calls"""
return current_user
AuthenticatedUser = Depends(get_current_active_user)
async def validate_token(token: str = Depends(oauth2_scheme)) -> User:
"""Validates user token"""
user = get_current_user(token=token)
return user
O objeto AuthenticatedUser
é uma dependência do FastAPI e é
através dele que iremos garantir que nossas rotas estejas protegidas
com token.
Agora só falta registrarmos as URLs responsáveis por gerar a validar o token -->
Criando Endpoints de Auth
Agora precisamos mapear a função de geração e validação de tokens na API, expondo a
URL /token
para que possamos gerar um token para um usuário.
EDITE dundie/routes/auth.py
from datetime import timedelta
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from dundie.auth import (
RefreshToken,
Token,
User,
authenticate_user,
create_access_token,
create_refresh_token,
get_user,
validate_token,
)
from dundie.config import settings
ACCESS_TOKEN_EXPIRE_MINUTES = settings.security.access_token_expire_minutes # pyright: ignore
REFRESH_TOKEN_EXPIRE_MINUTES = settings.security.refresh_token_expire_minutes # pyright: ignore
router = APIRouter()
@router.post("/token", response_model=Token)
async def login_for_access_token(
form_data: OAuth2PasswordRequestForm = Depends(),
):
user = authenticate_user(get_user, form_data.username, form_data.password)
if not user or not isinstance(user, User):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) # pyright: ignore
access_token = create_access_token(
data={"sub": user.username, "fresh": True},
expires_delta=access_token_expires,
)
refresh_token_expires = timedelta(minutes=REFRESH_TOKEN_EXPIRE_MINUTES) # pyright: ignore
refresh_token = create_refresh_token(
data={"sub": user.username}, expires_delta=refresh_token_expires
)
return {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "bearer",
}
@router.post("/refresh_token", response_model=Token)
async def refresh_token(form_data: RefreshToken):
user = await validate_token(token=form_data.refresh_token)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) # pyright: ignore
access_token = create_access_token(
data={"sub": user.username, "fresh": False},
expires_delta=access_token_expires,
)
refresh_token_expires = timedelta(minutes=REFRESH_TOKEN_EXPIRE_MINUTES) # pyright: ignore
refresh_token = create_refresh_token(
data={"sub": user.username}, expires_delta=refresh_token_expires
)
return {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "bearer",
}
E agora vamos adicionar essas URLS ao router principal
EDITE dundie/routes/__init__.py
No topo na linha 3
from .auth import router as auth_router
E depois na linha 9
main_router.include_router(auth_router, tags=["auth"])
Vamos testar a aquisição de um token via curl. (ou pode utilizar algum outro cliente HTTP de sua preferencia como o Imsomnia ou Postman)
curl -X 'POST' \
'http://localhost:8000/token' \
-H 'accept: application/json' \
-H 'Content-Type: application/x-www-form-urlencoded' \
-d 'username=michael-scott&password=boss123'
passe como parametro os dados de um usuário que tenha criado via linha de comando.
Resposta:
{
"access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJhZG1pbiIsImZyZXNoIjp0cnVlLCJleHAiOjE2Njg2Mjg0NjgsInNjb3BlIjoiYWNjZXNzX3Rva2VuIn0.P-F3onD2vFFIld_ls1irE9rOgLNk17SNDASls31lgkU",
"refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJhZG1pbiIsImV4cCI6MTY2ODY2MjY2OCwic2NvcGUiOiJyZWZyZXNoX3Rva2VuIn0.AWV8QtySYmcukxTgTa9GedLK00o6wrbyMt9opW42eyQ",
"token_type": "bearer"
}
agora sim já conseguimos gerar o token, só falta proteger as URLs -->
Protegendo rotas
Apenas super usuários terão permissão para criar novos usuários, portanto
vamos proteger a view POST /user/
com autenticação via TOKEN
EDITE dundie/auth.py
e adicione no final uma dependencia para
garantir que o usuário autenticado é super usuário.
async def get_current_super_user(
current_user: User = Depends(get_current_user),
) -> User:
if not current_user.superuser:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Not a super user"
)
return current_user
SuperUser = Depends(get_current_super_user)
Agora vamos usar essa dependencia para garantir o super usuário em nossa rota
EDITE dundie/routes/user.py
No topo próximo a linha 9
from dundie.auth import SuperUser
E no roteamento da view create_user
como parametro para o decorator .post
passamos uma lista de
dependencias que sejam satisfeitas pelo FAstAPI antes de executar o código da view, ou seja, o código só será executado caso o usuário autenticado via token seja um superusuário.
@router.post("/", response_model=UserResponse, status_code=201, dependencies=[SuperUser])
async def create_user(*, session: Session = ActiveSession, user: UserRequest):
...
Como adicionamos dependencies=[SuperUser]
no roteamento e isso é o suficiente para o FastAPI detectar que existe pelo menos uma URL que necessita de autenticação e agora já teremos os controles de autenticação na API.
Ao tentar criar um usuário sem autenticar teremos agora um erro HTTP_401_UNAUTHORIZED
e se o usuário autenticado não for um superuser termos o erro HTTP_403_FORBIDDEN
Os requests vão precisar do token, portanto o usuário primeiro precisa pedir um token na URL /token
e depois usar este token na requisição protegida
curl -X 'POST' \
'http://localhost:8000/user/?fresh=false' \
-H 'accept: application/json' \
-H 'Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJtaWNoYWVsLXNjb3R0IiwiZnJlc2giOnRydWUsImV4cCI6MTY3MTgyOTc2NCwic2NvcGUiOiJhY2Nlc3NfdG9rZW4ifQ.wdIUyJS9TX2Ku8BMI_AIJhAXQb-TSHmX11qKs5C4PF0' \
-H 'Content-Type: application/json' \
-d '{
"name": "Kevin Malone",
"email": "kevin@dm.com",
"dept": "Sales",
"password": "stacy"
}'
Erros HTTP
Quando as coisas dão errado precisamos informar o cliente HTTP com o código de status correto, erros podem acontecer quando ocorre uma falha inesperada no servidor, quando o recurdo solicitado não existe ou quando o cliente efetua uma requisição inválida.
Vamos tentar duplicar a criação de um usuário fazendo novamente a mesma chamada POST e a mensagem que receberemos é:
Faça essa chamada mais de 1x:
curl -X 'POST' -H 'Content-Type: application/json' \
--data-raw '{"email": "pam@dm.com", "dept": "Accounting", "password": "jimjim", "name": "Pam Besly"}' \
-k 'http://localhost:8000/user/'
HTTP/1.1 500 Internal Server Error
Internal Server Error
A mensagem de erro não ajuda muito a sabermos o que ocorreu de fato e portanto podemos curtomizar este comportamento.
Quando temos este caso expecifico o código de erro correto é o 409 Conflict
que innforma que o estado interno está em conflito com o estado que está sendo enviado no request, ou seja, estamos tentando criar um usuário que já existe.
Para customizar este comportamento podemos editar o arquivo routes/user.py
# No topo
from sqlalchemy.exc import IntegrityError
# Na função `create_user`
async def create_user(.......):
...
try:
session.commit()
except IntegrityError:
raise HTTPException(status_code=409, detail="User already exists")
A exception IntegrityError
será levantada para qualquer problemas encontrado no banco de dados portanto não é ainda a melhor opção, precisamos ser mais especificos para ajudar quem está usando a API, portanto vamos fazer as seguintes modificações:
- Continuar tratando a IntegrityError porém com o código 500 e mensagem de erro genérica.
- Adicionar um guard para garantir que o usuário a ser criado não existe.
@router.post(
"/", response_model=UserResponse, status_code=201, dependencies=[SuperUser]
)
async def create_user(*, session: Session = ActiveSession, user: UserRequest):
"""Creates new user"""
if session.exec(select(User).where(User.username == user.username)).first():
raise HTTPException(status_code=409, detail="Username already taken")
db_user = User.from_orm(user) # transform UserRequest in User
session.add(db_user)
try:
session.commit()
except IntegrityError:
raise HTTPException(status_code=500, detail="Database IntegrityError")
session.refresh(db_user)
return db_user
E agora sim teremos o retorno esperado
HTTP/1.1 409 Conflict
Username already taken
E no caso de um outro erro de integridade ai invés de mostrar apenas o erro 500 genérico informamos especificamente que se trata de um problema no banco de dados, porém sem expor o erro diretamente.
Uma boa prática seria colocar um logger ou um analisador de exceptions como o NewRelic ou o Sentry, faremos isso em outra parte do treinamento.
Vamos agora continuar implementando as rotas de usuário -->
Update User
Agora vamos adicionar uma rota para que o usuário possa alterar o próprio perfil.
O usuário será capaz de mudar apenas os campos bio
e avatar
bio será um texto e avatar a URL de uma imagem, e é claro, o usuário só
poderá alterar o seu próprio perfil.
Vamos começar criando o serializer que irá receber essas informações a serem alteradas:
EDITE models/user.py
e adicione:
class UserProfilePatchRequest(BaseModel):
avatar: Optional[str] = None
bio: Optional[str] = None
E agora EDITE routes/user.py
e adicione ao final.
@router.patch("/{username}/", response_model=UserResponse)
async def update_user(
*,
session: Session = ActiveSession,
patch_data: UserProfilePatchRequest,
current_user: User = AuthenticatedUser,
username: str
):
user = session.exec(select(User).where(User.username == username)).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
if user.id != current_user.id and not current_user.superuser:
raise HTTPException(status_code=403, detail="You can only update your own profile")
# Update
user.avatar = patch_data.avatar
user.bio = patch_data.bio
session.add(user)
session.commit()
session.refresh(user)
return user
Agora podemos testar a rota fazendo a alteração do perfil do usuário michael-scott, lembre-se que primeiro
será necessário obter um token válido com uma chamada na rota /token
curl -X 'PATCH' \
-H 'Authorization: Bearer ...' \
-H 'Content-Type: application/json'
--data-raw '{"avatar": "https://test.com/MichaelScott.png", "bio": "I am the boss"}' \
-k 'http://localhost:8000/user/michael-scott/'
O usuário também precisará alterar a senha caso ele esqueça, vamos implementar esta funcionalidade -->
Change Password
O endpoint de alteração de senha precisa ficar separado do perfil pois este endpoint precisa de alguns detalhes extras:
- O usuário precisa preencher a senha e a confirmação
- A mudança pode ser feita pelo próprio usuário, pelo superuser ou através de um token requisitado por email (funcionalidade de esqueci a senha)
Começamos adicionando o serializer para receber o request da alteração do password.
EDITE models/user.py
# No topo
from fastapi import HTTPException, status
from dundie.security import get_password_hash
...
# No final
class UserPasswordPatchRequest(BaseModel):
password: str
password_confirm: str
@root_validator(pre=True)
def check_passwords_match(cls, values):
"""Checks if passwords match"""
if values.get("password") != values.get("password_confirm"):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Passwords do not match"
)
return values
@property
def hashed_password(self) -> str:
"""Returns hashed password"""
return get_password_hash(self.password)
Para editar o password do usuário teremos as seguintes condições
# O usuário pode editar o próprio password
current_user == user
# O usuário pode editar o password de outro usuário se for superuser
current_user.supersuser is True
# ou se o usuário tiver um token válido
Query("?pwd_reset_token") is valid
Vamos implementar a lógica acima como uma dependencia do FastAPI, usaremos esta dependencia na rota de alteração de senha e a dependência irá garantir que pelo menos um dos requisitos seja satisfeito.
EDITE dundie/auth.py
e no final:
async def get_user_if_change_password_is_allowed(
*,
request: Request,
pwd_reset_token: Optional[str] = None, # from path?pwd_reset_token=xxxx
username: str, # from /path/{username}
) -> User:
"""Returns User if one of the conditions is met.
1. There is a pwd_reset_token passed as query parameter and it is valid OR
2. authenticated_user is supersuser OR
3. authenticated_user is User
"""
target_user = get_user(username) # The user we want to change the password
if not target_user:
raise HTTPException(status_code=404, detail="User not found")
try:
valid_pwd_reset_token = get_current_user(token=pwd_reset_token or "") == target_user
except HTTPException:
valid_pwd_reset_token = False
try:
authenticated_user = get_current_user(token="", request=request)
except HTTPException:
authenticated_user = None
if any(
[
valid_pwd_reset_token,
authenticated_user and authenticated_user.superuser,
authenticated_user and authenticated_user.id == target_user.id,
]
):
return target_user
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You are not allowed to change this user's password",
)
CanChangeUserPassword = Depends(get_user_if_change_password_is_allowed)
Agora temos CanChangeUserPassword
como dependencia para usar em uma rota do FastAPI
isso vai garantir que a URL só será executada se todas as condições da dependencia foram
resolvidas.
E agora em routes/user.py
vamos criar uma rota com o método POST
O ideal para seguir a semantica REST seria criar este método como PATCH porém formulários HTML permitem apenas GET e POST e para facilitar o trabalho do front-end vamos usar POST.
@router.post("/{username}/password/", response_model=UserResponse)
async def change_password(
*,
session: Session = ActiveSession,
patch_data: UserPasswordPatchRequest,
user: User = CanChangeUserPassword
):
user.password = patch_data.hashed_password # pyright: ignore
session.add(user)
session.commit()
session.refresh(user)
return user
Agora podemos testar a rota de alteração de senha, autenticando com o token válido para o usuário michael-scott por exemplo:
curl -X 'POST' \
-H 'Authorization: Bearer ...' \
-H 'Content-Type: application/json'
--data-raw '{"password": "boss1234", "password_confirm": "boss1234"}' \
-k 'http://localhost:8000/user/michael-scott/password/'
O usuário michael-scott
sendo um superuser, também tem permissão para alterar senha de outros usuários,
Agora imagine que um usuário esqueceu a própria senha, mas ao invés de pedir para o gerente ele quer ele mesmo alterar a senha, para isso vamos criar um endpoint para enviar um email com um token válido para alterar a senha e acessar o mesmo endpoint. -->
Esqueci minha senha
O próximo passo para completar a gestão de usuários é criarmos uma URL onde o usuário irá informar o seu email
e o sistema vai verificar se existe um usuário com este e-mail cadastrado e então enviar um e-mail com o token para permitir a alteração de senha.
Nós já temos uma função que é capaz de gerar um token em dundie/auth.py
chamada create_access_token
E vamos usar esta função para gerar o token de alteração de senha.
O fluxo será o seguinte:
- Usuário requisita um token de senha em :
POST /user/pwd_reset_token/
{
"email": "michael-scott@dm.com"
}
Response: 200 Ok
"Email will be sent if user is registered"
- A view roteada em
/user/pwd_reset_token
vai fazer o seguinte:
- Invocar a função:
try_to_send_pwd_reset_email(email)
.
-
A função
try_to_send_pwd_reset_email
irá fazer o seguinte: -
Procurar o usuário pelo e-mail
-
Criar um token com expiração curta (o tempo de expiração será definido nos settings)
-
Renderizar um template com o link para redefinir senha
-
Enviar o e-mail
Enviando Email
Vamos começar criando uma função que irá receber alguns parametros e enviar um e-mail, teremos uma versão da função que de fato envia um e-mail via HTTP, e teremos outra que apenas escreve a mensagem em um arquivo de log simulando o envio de e-mail que será útil para testes.
EDITE dundie/tasks/user.py
import smtplib
from datetime import timedelta
from time import sleep
from sqlmodel import Session, select
from dundie.auth import create_access_token
from dundie.config import settings
from dundie.db import engine
from dundie.models.user import User
def send_email(email: str, message: str):
if settings.email.debug_mode is True: # pyright: ignore
_send_email_debug(email, message)
else:
_send_email_smtp(email, message)
def _send_email_debug(email: str, message: str):
"""Mock email sending by printing to a file"""
with open("email.log", "a") as f:
sleep(3) # pretend it takes 3 seconds
f.write(f"--- START EMAIL {email} ---\n" f"{message}\n" "--- END OF EMAIL ---\n")
def _send_email_smtp(email: str, message: str):
"""Connect to SMTP server and send email"""
with smtplib.SMTP_SSL(
settings.email.smtp_server, settings.email.smtp_port # pyright: ignore # pyright: ignore
) as server:
server.login(settings.email.smtp_user, settings.email.smtp_password) # pyright: ignore
server.sendmail(
settings.email.smtp_sender, # pyright: ignore
email,
message.encode("utf8"),
)
MESSAGE = """\
From: Dundie <{sender}>
To: {to}
Subject: Password reset for Dundie
Please use the following link to reset your password:
{url}?pwd_reset_token={pwd_reset_token}
This link will expire in {expire} minutes.
"""
def try_to_send_pwd_reset_email(email):
"""Given an email address sends email if user is found"""
with Session(engine) as session:
user = session.exec(select(User).where(User.email == email)).first()
if not user:
return
sender = settings.email.smtp_sender # pyright: ignore
url = settings.security.PWD_RESET_URL # pyright: ignore
expire = settings.security.RESET_TOKEN_EXPIRE_MINUTES # pyright: ignore
pwd_reset_token = create_access_token(
data={"sub": user.username},
expires_delta=timedelta(minutes=expire), # pyright: ignore
scope="pwd_reset",
)
send_email(
email=user.email,
message=MESSAGE.format(
sender=sender,
to=user.email,
url=url,
pwd_reset_token=pwd_reset_token,
expire=expire,
),
)
O próximo passo é editar o arquivo dundie/default.toml
e adicionar os settings necessários para o serviço de emails.
[default.security]
...
RESET_TOKEN_EXPIRE_MINUTES = 10
PWD_RESET_URL = "https://dm.com/reset_password"
[default.email]
debug_mode = true
smtp_sender = "no-reply@dm.com"
smtp_server = "localhost"
smtp_port = 1025
smtp_user = "<replace in .secrets.toml>"
smtp_password = "<replace in .secrets.toml>"
Agora podemos abrir um terminal e testar essas funções
$ docker compose exec api dundie shell
Auto imports: ['settings', 'engine', 'select', 'session', 'User']
In [1]: from dundie.tasks.user import try_to_send_pwd_reset_email
In [2]: try_to_send_pwd_reset_email("mscott@dm.com") # wait 3 seconds
In [3]: open("email.log").readlines()
Out[3]:
['--- START EMAIL mscott@dm.com ---\n',
'From: Dundie <no-reply@dm.com>\n',
'To: mscott@dm.com\n',
'Subject: Password reset for Dundie\n',
'\n',
'Please use the following link to reset your password:\n',
'https://dm.com/reset_password?pwd_reset_token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJtaWNoYWVsLXNjb3R0IiwiZXhwIjoxNjcyNjc3OTk1LCJzY29wZSI6InB3ZF9yZXNldCJ9.nAZNxHYniofTSCzBh38gPi5Qd0FoKONw1Ge6Yp40l5s\n',
'\n',
'This link will expire in 10 minutes.\n',
'\n',
'--- END OF EMAIL ---\n']
Cada e-mail enviado será adicionado ao arquivo email.log enquanto settings.email.debug_mode
estiver ativado, futuramente podemos colocar os dados de um servidor smtp de verdade.
Agora a parte principal é criar uma rota que permitirá ao usuário solicitar o token de alteração de senha e disparar a task em background para o envio do e-mail.
EDITE dundie/routes/user.py
e no final vamos adicionar.
# import
from dundie.tasks.user import try_to_send_pwd_reset_email
# view
@router.post("/pwd_reset_token/")
async def send_password_reset_token(*, email: str = Body(embed=True)):
"""Sends an email with the token to reset password."""
try_to_send_pwd_reset_email(email)
return {
"message": "If we found a user with that email, we sent a password reset token to it."
}
Neste endpoint estamos recebendo email
apenas no corpo do request, ao invés de criarmos um
serializer apenas para armazenar esta informação podemos usar o serializer genérico Body
que
permite receber o valor de um campo diretamente no corpo do request.
No caso de um endereço de em-mail pode ser interessante criar um serializer para efetuar a verificando que o e-mail é valido, mas isso fica como melhoria para depois.
Testando:
curl -X 'POST' -H 'Content-Type: application/json' \
--data-raw '{"email": "mscott@dm.com"}' -k \
'http://localhost:8000/user/pwd_reset_token/'
POST http://localhost:8000/user/pwd_reset_token/
#+END
HTTP/1.1 200 OK
date: Mon, 02 Jan 2023 16:42:56 GMT
server: uvicorn
content-length: 87
content-type: application/json
#+RESPONSE
{
"message": "If we found a user with that email, we sent a password reset token to it."
}
#+END
Você pode agora verificar o conteúdo do arquivo email.log
para ver se a mensagem foi realmente enviada.
$ cat email.log
...
No arquivo dundie/tasks/user.py
estamos criando uma string MESSAGE
para usar como template para o e-mail enviado,
mas seria ideal salvar essa string em um arquivo separado, por exemplo pwd_reset_email_template.jinja
e então usar o jinja2
para renderizar o template, lembre-se que usamos o Jinja2 no Day1 do treinamento.
Consegue fazer esta alteração?
Por questões de privacidade nós não podemos confirmar se a operação deu certo, o usuário terá que verificar na caixa de e-mail que em nosso caso é o arquivo de log.
Mas repare que ao chamar a URL precisamos esperar 3 segundos pela resposta, o ideal é que o request seja imediato e a função
taks.try_to_send_pwd_reset_email
seja executada em background. -->
Enviando emails assíncronos
Ao chamar a URL /user/pwd_reset_token/
a resposta demorou 3 segundos pois estamos bloqueando o request até o e-mail ser enviado, o ideal é que isso seja feito em background, vamos transformar a chamada de try_to_send_pwd_reset_email
em uma task.
O FastAPI tem uma classe chamada BackgroundTasks
que nos permite adicionar funções que serão executadas em background, vamos importá-la e adicioná-la como um parâmetro da nossa rota.
As funções que adicionarmos a background_tasks serão adicionadas ao event-loop assíncrono gerenciado pelo FastAPI, desta forma não será necessário esperar o tempo de envio do e-mail, podemos retornar a resposta para o cliente e mesmo depois do request terminado a tarefa continuará executando do lado servidor.
EDITE dundie/routes/user.py
# No topo
from fastpi import BackgroundTasks
...
# Na função
@router.post("/pwd_reset_token/")
async def send_password_reset_token(
*,
email: str = Body(embed=True),
background_tasks: BackgroundTasks, # NEW
):
background_tasks.add_task(try_to_send_pwd_reset_email, email=email) # NEW
return {
"message": "If we found a user with that email, we sent a password reset token to it."
}
Pode testar repetindo a mesma chamada anterior (ou usando a UI em /docs/)
curl -X 'POST' -H 'Content-Type: application/json' \
--data-raw '{"email": "mscott@dm.com"}' -k \
'http://localhost:8000/user/pwd_reset_token/'
Repare que agora a resposta é instantânea e o e-mail é enviado em background.
Assim terminamos a API de gestão de usuários e Auth por enquanto
A tareda de envio de e-mail é bastante simples e não muito complexa em questões de resiliência a falhas, se o e-mail falhar o envio, o usuário simplesmente não vai receber e portanto irá tentar novamente.
Existem casos onde as tarefas precisam de um pouco mais de robustes e controles como retry e auditoria, neste caso usaremos um gerenciador de filas como Celery ou Python-RQ, mas isso veremos adiante.
Com estas rotas agora já podemos ter um front-end integrado para a gestão de usuários, mas o nosso próximo passo será cuidar da API de transações -->
Modelagem de dados
A modelagem dos dados será a seguinte (considerando que já concluimos a tabela de User), fica faltando implementar as outras 2 tabelas.
https://dbdesigner.page.link/GqDU95ApwZs7a9RH9
Portanto criaremos os models para Transaction
e Balance
EDITE dundie/models/transaction.py
from datetime import datetime
from typing import TYPE_CHECKING, Optional
from sqlmodel import Field, Relationship, SQLModel
if TYPE_CHECKING:
from dundie.models.user import User
class Transaction(SQLModel, table=True):
"""Represents the Transaction Model"""
id: Optional[int] = Field(default=None, primary_key=True)
user_id: int = Field(foreign_key="user.id", nullable=False)
from_id: int = Field(foreign_key="user.id", nullable=False)
value: int = Field(nullable=False)
date: datetime = Field(default_factory=datetime.utcnow, nullable=False)
# Populates a `.incomes` on `User`
user: Optional["User"] = Relationship(
back_populates="incomes",
sa_relationship_kwargs={"primaryjoin": 'Transaction.user_id == User.id'},
)
# Populates a `.expenses` on `User`
from_user: Optional["User"] = Relationship(
back_populates="expenses",
sa_relationship_kwargs={"primaryjoin": 'Transaction.from_id == User.id'},
)
class Balance(SQLModel, table=True):
"""Store the balance of a user account"""
user_id: int = Field(
foreign_key="user.id",
nullable=False,
primary_key=True,
unique=True,
)
value: int = Field(nullable=False)
updated_at: datetime = Field(
default_factory=datetime.utcnow,
nullable=False,
sa_column_kwargs={"onupdate": datetime.utcnow}
)
# Populates a `._balance` on `User`
user: Optional["User"] = Relationship(back_populates="_balance")
Como estamos includindo relacionamentos entre os models Transaction
, Balance
e User
precisamos implementar as respectivas mudanças no dundie/models/user.py
incomes
irá retornar uma query com todos os pontos que o usuário ganhouexpenses
irá retornar uma query com todos os pontos que o usuário gastou- E
balance
irá retornar o saldo atual do usuário, para balance estamos usando um relacionamento one-to-one, por isso precisamos usar olazy="dynamic"
e uma property do Python para garantir que caso o saldo seja inexistente sempre tenhamos um valor de retorno padrão.
from typing import TYPE_CHECKING, Optional
class User(...):
...
# Populates a `.user` on `Transaction`
incomes: Optional[list["Transaction"]] = Relationship(
back_populates="user",
sa_relationship_kwargs={"primaryjoin": 'User.id == Transaction.user_id'},
)
# Populates a `.from_user` on `Transaction`
expenses: Optional[list["Transaction"]] = Relationship(
back_populates="from_user",
sa_relationship_kwargs={"primaryjoin": 'User.id == Transaction.from_id'},
)
# Populates a `.user` on `Balance`
_balance: Optional["Balance"] = Relationship(
back_populates="user",
sa_relationship_kwargs={"lazy": "dynamic"}
)
@property
def balance(self) -> int:
"""Returns the current balance of the user"""
if (user_balance := self._balance.first()) is not None: # pyright: ignore
return user_balance.value
return 0
E por fim adicionamos o novo model ao contexto do dundie/models/__init__.py
from sqlmodel import SQLModel
from .user import User
from .transaction import Transaction, Balance
__all__ = ["User", "SQLModel", "Transaction", "Balance"]
Criando as migrations
Com os models criados pediamos ao alembic para criar o arquivo de migration com as mudanças que precisam ser aplicadas ao Postgresql.
$ docker compose exec api alembic revision --autogenerate -m "transaction"
INFO [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO [alembic.runtime.migration] Will assume transactional DDL.
INFO [alembic.autogenerate.compare] Detected added table 'balance'
INFO [alembic.autogenerate.compare] Detected added table 'transaction'
INFO [alembic.ddl.postgresql] Detected sequence named 'user_id_seq' as owned by integer column 'user(id)', assuming SERIAL and omitting
Generating /home/app/api/migrations/versions/8af1cd3be673_transaction.py ... done
E em sequencia aplicamos para criar as tabelas no banco de dados:
$ docker compose exec api alembic upgrade head
INFO [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO [alembic.runtime.migration] Will assume transactional DDL.
INFO [alembic.runtime.migration] Running upgrade f39cbdb1efa7 -> b0abf3428204, transaction
Neste momento nossas tabelas transaction
e balance
já devem estar criadas.
Antares SQL
Agora vamos criar a lógica de negócios que cuidará da adição de novas transações -->
Transaction Business Logic
A operação de adicionar uma transação será feita com uma requisição POST
ao endpoint /transaction/{username}/
e o corpo da transaction será:
{
"value": "integer",
}
O usuário autenticado através de token será usado para popular o campo from_id
e o campo date
será
preenchido automaticamente.
NOTE Via
CLI
também será possivel adicionar transaction, e neste caso ofrom_id
será o useradmin
(que precisamos garantir a criação via migrations)
Fluxo de operação
graph TD A[Requisição POST] --> B["Obter usuários user_id e from_id"] B --> B1[[Chegagem de saldo e permissões]] B1 --> C["Procedimento permitido?"] C --> D{Sim} C --> E{Não} D --> F[Adicionar Transaction] F --> G[Atualizar saldo] G --> H[Dar commit na session] E --> I[Retornar com erro]
Atualizando Saldo
Vamos criar uma função com a lógica necessária para adicionar transaction e atualizar o saldo baseando-se nas regras anteriores.
EDITE dundie/tasks/transaction.py
from typing import Optional
from sqlmodel import Session
from dundie.db import engine
from dundie.models import User, Transaction, Balance
class TransactionError(Exception):
"""Can't add transaction"""
def add_transaction(
*,
user: User,
from_user: User,
value: int,
session: Optional[Session] = None
):
"""Adds a new transaction to the specified user.
params:
user: The user to add transaction to.
from_user: The user where amount is coming from or superuser
value: The value being added
"""
if not from_user.superuser and from_user.balance < value:
raise TransactionError("Insufficient balance")
session = session or Session(engine)
transaction = Transaction(user=user, from_user=from_user, value=value) # pyright: ignore
session.add(transaction)
session.commit()
session.refresh(user)
session.refresh(from_user)
for holder in (user, from_user):
total_income = sum([t.value for t in holder.incomes]) # pyright: ignore
total_expense = sum([t.value for t in holder.expenses]) # pyright: ignore
balance = session.get(
Balance, holder.id
) or Balance(user=holder, value=0) # pyright: ignore
balance.value = total_income - total_expense
session.add(balance)
session.commit()
A função add_transaction
recebe como parâmetros o user
que receberá a transação, o from_user
que é o usuário que está enviando a transação e o value
que é o valor da transação, esta função será executado tanto via CLI quanto via REST API.
Para que funcione via CLI precisamos garantir que o sistema sempre tenha um usuário admin
padrão para o sistema, vamos garantir a existência deste usuário -->
Data Migrations
Até agora usamos o alembic para criar migrations de estrutura (DDL) em operações de criação ou alteração de campos e tabelas.
Entretanto, em alguns casos precisamos criar dados para alimentar a aplicação, como por exemplo criar um usuário administrador para que possamos acessar a aplicação.
Sempre que precisar garantir a existência de dados alimentados em tabelas do sistema usaremos o conceito de Data Migrations.
Começamos criando uma migration
vazia para efetuarmos a operação de adição do usuário.
$ docker compose exec api alembic revision -m "ensure_admin_user"
Generating /home/app/api/migrations/versions/9aa820fb7f01_ensure_admin_user.py ... done
Repare que dessa vez não usamos --autogenerate
pois essa migration estará vazia, e neste
caso vamos manualmente escrever o código que desejamos que seja executado.
Edite o arquivo criado em migrations/versions/9aa820fb7f01_ensure_admin_user.py
OBS O arquivo criado ai no seu sistema pode ter um nome diferente, mas o conteúdo é o mesmo.
"""ensure_admin_user
Revision ID: 9aa820fb7f01
Revises: 6f4df3b5e155
Create Date: 2023-01-06 13:13:37.907183
"""
from alembic import op
import sqlalchemy as sa
import sqlmodel
from dundie.models.user import User # NEW
from sqlmodel import Session # NEW
# revision identifiers, used by Alembic.
revision = '9aa820fb7f01'
down_revision = '6f4df3b5e155'
branch_labels = None
depends_on = None
def upgrade() -> None: # NEW
bind = op.get_bind()
session = Session(bind=bind)
admin = User(
name="Admin",
username="admin",
email="admin@dm.com",
dept="management",
currency="USD",
password="admin", # pyright: ignore
)
# if admin user already exists it will raise IntegrityError
try:
session.add(admin)
session.commit()
except sa.exc.IntegrityError:
session.rollback()
def downgrade() -> None:
pass
A migration acima irá setar o password como admin
e é muito importante que você defina um password mais complexo ou que faça a alteração corretamente em ambientes de produção.
Uma dica é que tenha no arquivo de settings um campo para definir o password inicial de admin e então na migration ao invés
de passarmos admin
podemos ler de settings.DEFAULT_ADMIN_PASSWORD
, para essa variável existir ela precisa estar no arquivo default.toml
e então poderá ser sobrescrita usando variável de ambiente DUNDIE_DEFAULT_ADMIN_PASSWORD
.
Essa alteração eu vou deixar para você fazer, pode ser depois, no final deste tutorial todas as tarefas estarão listadas.
Agora salve o arquivo e aplique a migration.
$ docker compose exec api alembic upgrade head
INFO [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO [alembic.runtime.migration] Will assume transactional DDL.
INFO [alembic.runtime.migration] Running upgrade 6f4df3b5e155 -> 9aa820fb7f01, ensure_admin_user
Agora toda vez que as migrations forem aplicadas, ação que será executada sempre que houver nova atualização ou deploy, garantimos que o usuário admin será criado.
E agora que temos a certeza que o admin
vai sempre existir podemos partir para a criação de um comando na CLI -->
Transaction CLI
Agora vamos criar um comando para adicionar saldo via CLI, sempre que feito via CLI o usuário
de origem dos pontos from_id
será o admin
.
O comando vai simplesmente chamar a função add_transaction
que criamos anteriormente mas colocaremos
lógica adicional para garantir a existencia dos usuários e para formatar os dados aprensentados em
uma tabela no terminal.
EDITE dundie/cli.py
e adicione um novo comando no final do arquivo.
# No topo
from dundie.tasks.transaction import add_transaction
from dundie.models.transaction import Transaction, Balance
# No comando `shell` adicione novos objetos
def shell():
...
_vars = {
...
"Transaction": Transaction,
"Balance": Balance,
"add_transaction": add_transaction,
}
# Crie o comando que adiciona transactions
@main.command()
def transaction(
username: str,
value: int,
):
"""Adds specified value to the user"""
table = Table(title="Transaction")
fields = ["user", "before", "after"]
for header in fields:
table.add_column(header, style="magenta")
with Session(engine) as session:
from_user = session.exec(select(User).where(User.username == "admin")).first()
if not from_user:
typer.echo("admin user not found")
exit(1)
user = session.exec(select(User).where(User.username == username)).first()
if not user:
typer.echo(f"user {username} not found")
exit(1)
from_user_before = from_user.balance
user_before = user.balance
add_transaction(user=user, from_user=from_user, session=session, value=value)
table.add_row(from_user.username, str(from_user_before), str(from_user.balance))
table.add_row(user.username, str(user_before), str(user.balance))
Console().print(table)
E para usar podemos fazer o seguinte no terminal:
$ docker compose exec api dundie transaction jim-halpert 900
Transaction
┏━━━━━━━━━━━━━┳━━━━━━━━┳━━━━━━━┓
┃ user ┃ before ┃ after ┃
┡━━━━━━━━━━━━━╇━━━━━━━━╇━━━━━━━┩
│ admin │ 0 │ -900 │
│ jim-halpert │ 0 │ 900 │
└─────────────┴────────┴───────┘
O usuário admin será ficará com saldo negativo e não tem limite de transferencia, assim como qualquer usuário que seja super-user.
Ao chamarmos o comando dundie transaction
assim como o user-list
o retorno é mostrado
em uma tabela formatada no terminal, em alguns casos seria interessante poder passar um
argumento --format=json
e obter o retorno em formato JSON para posterior tratamento.
Consegue adicionar essa funcionalidade?
Agora vamos partir para a API adicionando a mesma funcionalidade -->
Transaction API
Agora podemos finalmente criar o endpoint na API que vai fornecer a mesma funcionalidade,
porém com algumas diferenças entre a implementação que fizemos no CLI,
no caso da API o from_user
será o usuário que estiver autenticado.
EDITE dundie/routes/transaction.py
e adicione o seguinte código:
from fastapi import APIRouter, Body, HTTPException
from dundie.auth import AuthenticatedUser
from dundie.db import ActiveSession
from dundie.models import User
from dundie.tasks.transaction import add_transaction, TransactionError
from sqlmodel import select, Session
router = APIRouter()
@router.post('/{username}/', status_code=201)
async def create_transaction(
*,
username: str,
value: int = Body(embed=True),
current_user: User = AuthenticatedUser,
session: Session = ActiveSession
):
"""Adds a new transaction to the specified user."""
user = session.exec(select(User).where(User.username == username)).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
try:
add_transaction(user=user, from_user=current_user, value=value, session=session)
except TransactionError as e:
raise HTTPException(status_code=400, detail=str(e))
# At this point there was no error, so we can return
return {"message": "Transaction added"}
Agora podemos adicionar essas rotas no router principal editando dundie/routes/__init__.py
from fastapi import APIRouter
from .auth import router as auth_router
from .user import router as user_router
from .transaction import router as transaction_router
main_router = APIRouter()
main_router.include_router(auth_router, tags=["auth"])
main_router.include_router(user_router, prefix="/user", tags=["user"])
main_router.include_router(transaction_router, prefix="/transaction", tags=["transaction"])
Neste momento o endpoint já deve aparecer na API.
OBS o parâmetro
fresh
que vemos na API se refere a possibilidade de renovar o token de autenticação.
E podemos testar fazendo uma requisição HTTP.
Lembre-se de trocar o token pelo token gerado a partir da URL /token/, por exemplo, gerando um token para o usuário
admin
ou outro superuser permitirá a adição de pontos infinitos, usando token de um usuário comum permitirá apenas o envio de pontos comportados pelo saldo do usuário.
Adicionando 300 pontos ao usuário Bruno Rocha.
$ curl -X 'POST' \
-H 'Content-Type: application/json' \
-H 'Authorization: Bearer TOKEN_AQUI' \
--data-raw '{"value": 300}' \
-k 'http://localhost:8000/transaction/bruno-rocha/'
Adicione algumas transactions e vamos partir para a API de filtragem e exibição -->
Filtrando dados
Agora que já podemos trocar pontos entre usuários vamos criar endpoints onde será possivel consultar as transações e saldos.
POST /transaction/{username}/
(feito) - adiciona transaçãoGET /transaction/
- Lista todas as transações- Se
superuser
exibe todas, caso contrário apenas as próprias. - permite filtros:
?from_user=username
,?user=username
- permite ordenação:
?order_by=from_user,user,value,date
- permite paginação:
?page=1&size=10
- Se
Antes de criarmos o endpoint precisamos criar um model de saida, TransactionResponse
para evitar o retorno do próprio model do banco de dados e se fizermos isso em dundie/models/transaction.py
teremos um problema de circular imports.
- # !!!! Exemplo em dundie/models/transaction.py
- from dundie.models.user import User # <- CIRCULAR IMPORT
Para contornar este problema de maneira simples, vamos agora criar um novo arquivo, desta forma isolamos o import e evitamos o import circular.
OBS Neste momento vamos colocar apenas o serializar para Transaction neste novo módulo mas futuramente podemos mover todos os serializers definidos em
models/user.py
emodels/transaction.py
para este mesmo módulo também.
Neste serializer vamos utilizar root_validator
para criar campos que são calculados no
momento da serialização.
CRIE o arquivo dundie/models/serializers.py
VocÊ pode criar usando
touch dundie/models/serializers.py
ou usando seu IDE ou navegador de arquivos.
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, root_validator
from sqlmodel import Session
from dundie.db import engine
from .user import User
class TransactionResponse(BaseModel, extra="allow"):
id: int
value: int
date: datetime
# These 2 fields will be calculated at response time.
user: Optional[str] = None
from_user: Optional[str] = None
@root_validator(pre=True)
def get_usernames(cls, values: dict):
with Session(engine) as session:
user = session.get(User, values["user_id"])
values["user"] = user and user.username
from_user = session.get(User, values["from_id"])
values["from_user"] = from_user and from_user.username
return values
Podemos testar no shell com:
$ docker compose exec api dundie shell
Auto imports: ['settings', 'engine', 'select', 'session', 'User',
'Transaction', 'Balance', 'add_transaction']
In [1]: from dundie.models.serializers import TransactionResponse
In [2]: t = session.get(Transaction, 1)
In [3]: TransactionResponse.parse_obj(t)
Out[3]: TransactionResponse(
value=100,
date=datetime.datetime(2023, 1, 6, 12, 21, 55, 30204),
user='bruno-rocha',
from_user='michael-scott',
user_id=2,
from_id=1,
id=1
)
Agora vamos EDITAR o arquivo dundie/routes/transaction.py
# No topo
from fastapi_pagination import Page, Params
from fastapi_pagination.ext.sqlmodel import paginate
from dundie.models import Transaction
from dundie.models.serializers import TransactionResponse
from sqlmodel import text
from sqlalchemy.orm import aliased
# No final do arquivo
@router.get("/", response_model=Page[TransactionResponse])
async def list_transactions(
*,
current_user: User = AuthenticatedUser,
session: Session = ActiveSession,
params: Params = Depends(),
from_user: Optional[str] = None,
user: Optional[str] = None,
order_by: Optional[str] = None,
):
query = select(Transaction)
# Optional `AND` filters
if user:
query = query.join(
User, Transaction.user_id == User.id
).where(User.username == user)
if from_user:
FromUser = aliased(User) # aliased needed to desambiguous the join
query = query.join(
FromUser, Transaction.from_id == FromUser.id
).where(FromUser.username == from_user)
# Mandatory access filter
# regular users can only see their own transactions
if not current_user.superuser:
query = query.where(
Transaction.user_id == current_user.id | Transaction.from_id == current_user.id
)
# Ordering based on &order_by=date (asc) or -date (desc)
if order_by:
order_text = text(
order_by.replace("-", "") + " " + ("desc" if "-" in order_by else "asc")
)
query = query.order_by(order_text)
# wrap response_model in a pagination object {"items": [], total, page, size }
return paginate(query=query, session=session, params=params)
Agora temos um novo endpoint listando todas as transactions e com os filtros que especificamos.
$ curl 'GET' -H 'Content-Type: application/json' \
-H 'Authorization: Bearer TOKEN_AQUI' \
-k 'http://localhost:8000/transaction/\
?page=1&size=2&from_user=michael-scott&user=bruno-rocha&order_by=-date'
#+RESPONSE
{
"items": [
{
"id": 12,
"value": 300,
"date": "2023-01-10T17:08:29.953452",
"user": "bruno-rocha",
"from_user": "michael-scott",
"from_id": 1,
"user_id": 2
},
{
"id": 11,
"value": 112,
"date": "2023-01-10T17:07:49.296277",
"user": "bruno-rocha",
"from_user": "michael-scott",
"from_id": 1,
"user_id": 2
}
],
"total": 6,
"page": 1,
"size": 2
}
#+END
Repare que como usamos o plugin fastapi_pagination agora o formato da resposta está diferente contendo items
, total
, page
e size
Revisão da API
Agora que já temos bastante funcionalidade na API vamos revisar e identificar o que está faltando.
Auth
- POST /token - login via formulário para gerar acccess token
- POST /refresh_token - Obter um novo token sem a necessidade de fazer login novamente
User
- GET /user/ - Lista todos os usuários
- GET /user/{username} - Lista um usuário específico
- POST🔒 /user/ - Cria um novo usuário
- PATCH🔒 /user/{username} - Altera informações do usuário
- POST /user/{username}/password - Altera a senha do usuário (?pwd_reset_token ou 🔒)
- POST /user/pwd_reset_token/ - Solicita um token via email para resetar a senha (?email)
Transaction
- POST🔒 /transaction/ - Cria uma nova transaction de
from_user para user
- GET🔒 /transaction/ - Lista transactions do usuário logado (ou todas em caso de superuser)
- filters:
user
,from_user
- sort:
order_by=date
(asc) ou-date
(desc) - pagination:
page
,size
- filters:
Mas ainda falta informação neste retorno, onde está o saldo total do usuário? vamos resolver -->
Expondo saldo do usuário
Na listagem de usuário está faltando exibir o saldo total do usuário, esta é uma informação sensivel e portanto estará disponível apenas em alguns casos.
?show_balance=true
for passado na URL das rotas GET de /user/- O usuário logado é superuser ou
- O usuário logado está acessando sua própria conta
EDITE o arquivo dundie/auth.py
e vamos adicionar mais uma dependencia
baseada em autenticação.
async def show_balance_field(
*,
request: Request,
show_balance: Optional[bool] = False, # from /user/?show_balance=true
) -> bool:
"""Returns True if one of the conditions is met.
1. show_balance is True AND
2. authenticated_user.superuser OR
3. authenticated_user.username == username
"""
if not show_balance:
return False
username = request.path_params.get("username")
try:
authenticated_user = get_current_user(token="", request=request)
except HTTPException:
authenticated_user = None
if any(
[
authenticated_user and authenticated_user.superuser,
authenticated_user and authenticated_user.username == username,
]
):
return True
return False
ShowBalanceField = Depends(show_balance_field)
Agora precisamos de um serializer contendo o campo balance
e posteriormente no endpoint
usaremos este serializer como retorno apenas quando a dependência acima for satisfeita,
usando uma abordagem chamada conditional response model
EDITE dundie/models/user.py
# Logo abaixo da classe UserResponse
class UserResponseWithBalance(UserResponse):
balance: Optional[int] = None
@root_validator(pre=True)
def set_balance(cls, values: dict):
"""Sets the balance of the user"""
instance = values["_sa_instance_state"].object
values["balance"] = instance.balance
return values
Agora EDITE o dundie/routes/user.py
e vamos usar a dependencia
nos endpoints list_users
e get_user_by_username
e além de adicionar
a dependencia vamos alterar o responde_model
tornando o condicional.
# IMPORTS
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse
from pydantic import parse_obj_as
from dundie.auth import ShowBalanceField
from dundie.models.user import UserResponseWithBalance
# list_users
@router.get(
"/",
response_model=List[UserResponse] | List[UserResponseWithBalance],
response_model_exclude_unset=True,
)
async def list_users(
*, session: Session = ActiveSession, show_balance_field: bool = ShowBalanceField
):
"""List all users.
NOTES:
- This endpoint can be accessed with a token authentication
- show_balance query parameter takes effect only for authenticated superuser.
"""
users = session.exec(select(User)).all()
if show_balance_field:
users_with_balance = parse_obj_as(List[UserResponseWithBalance], users)
return JSONResponse(jsonable_encoder(users_with_balance))
return users
# get user by username
@router.get(
"/{username}/",
response_model=UserResponse | UserResponseWithBalance,
response_model_exclude_unset=True,
)
async def get_user_by_username(
*, session: Session = ActiveSession, username: str, show_balance_field: bool = ShowBalanceField
):
"""Get user by username"""
query = select(User).where(User.username == username)
user = session.exec(query).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
if show_balance_field:
user_with_balance = parse_obj_as(UserResponseWithBalance, user)
return JSONResponse(jsonable_encoder(user_with_balance))
return user
Você pode testar essa funcionalidade fazendo chamadas a URL /user/
e /user/{username}
e verificar que
quando o argumento ?show_balance=true
for passado na URL, o serializer de retorno irá conter o campo
do saldo, mas isso só será feito se o usuário for o superuser, ou o próprio usuário autenticado.
A API está pronta!
Não é bem assim... nada está pronto enquanto não tiver cobertura de testes -->
Definindo um pipeline
O Pipeline de testes será
- Garantir que o ambiente está em execução com o docker compose
- Garantir que existe um banco de dados
dundie_test
e que este banco está vazio. - Executar as migrations com alembic e garantir que funcionou
- Executar os testes com Pytest
- Apagar o banco de dados de testes
Vamos adicionar um comando reset_db
no cli
edite dundie/cli.py
# imports
from .db import engine, SQLModel
# Final
@main.command()
def reset_db(
force: bool = typer.Option(
False, "--force", "-f", help="Run with no confirmation"
)
):
"""Resets the database tables"""
force = force or typer.confirm("Are you sure?")
if force:
SQLModel.metadata.drop_all(engine)
O comando acima poderá ser executado com a flag -f
que irá pualr a etapa de confirmação.
Em um ambiente de CI geralmente usamos Github Actions
ou Jenkins
para executar
esses passos, em nosso caso vamos usar um script em bash para executar essas tarefas,
no treinamento Python Automation que também faz parte do pacote Python Expert será
abordado como automatizar esses processos usando os principais serviços de CI.
Confira o conteúdo do arquivo test.sh
na raiz do repositório.
#!/usr/bin/bash
# Start environment with docker compose
DUNDIE_DB=dundie_test docker compose up -d
# wait 5 seconds
sleep 5
# Ensure database is clean
docker compose exec api dundie reset-db -f
docker compose exec api alembic stamp base
# run migrations
docker compose exec api alembic upgrade head
# run tests
docker compose exec api pytest -v -l --tb=short --maxfail=1 tests/
# Stop environment
docker compose down
- definimos um banco de dados diferente usando a variável
DUNDIE_DB
- iniciamos o ambiente com o docker compose
- esperamos 5 segundos para garantir que o banco de dados está pronto
- resetamos o banco de dados
- executamos as migrations para garantir que temos todas as tabelas e dados
- executamos os testes usando pytest
- finalizamos o ambiente
Em caso de falha nos testes o ambiente não será parado, permitindo assim o debugging com o ambiente em execução.
O próximo passo é configurar o nosso test-runner, o pytest -->
Configurando Pytest
Para os tests vamos utilizar o Pytest para testar algumas rotas da API,
o Pytest pode ser configurado através de hooks e fixtures que ficam no
arquivo tests/conftest.py
, fixtures são geralmente funções que
provêm funcionalidades que serão injetadas (via injeção de dependencias)
em cada teste que escrevermos, portanto se queremos testar multiplos
usuários, começamos criando fixtures que preparam clientes HTTP autenticados
com os tokens de cada um desses usuários.
Uma outra coisa importante que faremos é apontar o banco de dados
para o banco de dados de teste que iniciamos no script test.sh
Setup
- Obter um token para o usuário admin
- Criar usuário1
- Obter um token para o usuário1
- Criar usuario2
- Obter um token para o usuario2
- Criar usuario3
- Obter um token para o usuario3
Durante o setup teremos fixtures do Pytest já configuradas com clientes HTTP para acessar a API com qualquer um dos usuários ou de forma anonima.
Começamos configurando o Pytest
EDITE tests/conftest.py
import os
import pytest
from fastapi.testclient import TestClient
from sqlalchemy.exc import IntegrityError
from dundie.app import app
from dundie.cli import create_user
os.environ["DUNDIE_DB__uri"] = "postgresql://postgres:postgres@db:5432/dundie_test"
@pytest.fixture(scope="function")
def api_client():
"""Unauthenticated test client"""
return TestClient(app)
def create_api_client_authenticated(username, dept="sales", create=True):
"""Creates a new api client authenticated for the specified user."""
if create:
try:
create_user(name=username, email=f"{username}@dm.com", password=username, dept=dept)
except IntegrityError:
pass
client = TestClient(app)
token = client.post(
"/token",
data={"username": username, "password": username},
headers={"Content-Type": "application/x-www-form-urlencoded"},
).json()["access_token"]
client.headers["Authorization"] = f"Bearer {token}"
return client
@pytest.fixture(scope="function")
def api_client_admin():
return create_api_client_authenticated("admin", create=False)
@pytest.fixture(scope="function")
def api_client_user1():
return create_api_client_authenticated("user1", dept="management")
@pytest.fixture(scope="function")
def api_client_user2():
return create_api_client_authenticated("user2")
@pytest.fixture(scope="function")
def api_client_user3():
return create_api_client_authenticated("user3")
Basta salvar o arquivo conftest.py
e para cada sessão de testes o pytest vai
se certificar que cada uma das fixtures definidas esteja disponível.
No treinamento Python Automation é abordado o tema testes e pytest com maior profundidade, se você quiser saber mais sobre o assunto, recomendo que assista.
Agora podemos escrever os testes -->
Testes de API
Plano de testes
Os casos de uso que iremos testar:
- Como um usuário anonimo consigo listar os usuarios e não posso ver o saldo
- Como um usuário anonimo consigo listar os detalhes de um usuário sem o saldo
- Como usuário admin consigo atualizar o perfil de um usuário
- Como um usuário autenticado consigo atualziar meu próprio perfil
- Como um usuário autenticado não consigo atualizar o perfil de outro usuário
- Como usuário admin consigo transferir qualquer quantidade de pontos para todos os usuários
- Como um usuário autenticado consigo tranferir 20 pontos para outro usuário e ver o saldo
- Como um usuário admin consigo ver o saldo de todos os usuários
- Como um usuário admin consigo ver todas as transações
- Como um usuário autenticado consigo ver apenas minhas transações
Pytest
Agora vamos converter os casos de uso em funções de teste com o Pytest.
EDITE tests/test_api.py
import pytest
USER_RESPONSE_KEYS = {"name", "username", "dept", "avatar", "bio", "currency"}
USER_RESPONSE_WITH_BALANCE_KEYS = USER_RESPONSE_KEYS | {"balance"}
@pytest.mark.order(1)
def test_user_list(
api_client,
api_client_user1, # pyright: ignore
api_client_user2, # pyright: ignore
api_client_user3, # pyright: ignore
):
"""Ensure that all needed users are created and showing on the /user/ API
NOTE: user fixtures are called just to trigger creation of users.
"""
users = api_client.get("/user/").json()
expected_users = ["admin", "user1", "user2", "user3"]
assert len(users) == len(expected_users)
for user in users:
assert user["username"] in expected_users
assert user["dept"] in ["management", "sales"]
assert user["currency"] == "USD"
assert set(user.keys()) == USER_RESPONSE_KEYS
@pytest.mark.order(2)
def test_user_detail(api_client):
"""Ensure that the /user/{username} API is working"""
user = api_client.get("/user/user1/").json()
assert user["username"] == "user1"
assert set(user.keys()) == USER_RESPONSE_KEYS
@pytest.mark.order(3)
def test_update_user_profile_by_admin(api_client_admin):
"""Ensure that admin can patch any user data"""
data = {"avatar": "https://example.com/avatar.png", "bio": "I am a user1"}
api_client_admin.patch("/user/user1/", json=data)
user = api_client_admin.get("/user/user1/").json()
assert user["avatar"] == data["avatar"]
assert user["bio"] == data["bio"]
@pytest.mark.order(3)
def test_update_user_profile_by_user(api_client_user2):
"""Ensure that user can patch their own data"""
data = {"avatar": "https://example.com/avatar.png", "bio": "I am a user2"}
api_client_user2.patch("/user/user2/", json=data)
user = api_client_user2.get("/user/user2/").json()
assert user["avatar"] == data["avatar"]
assert user["bio"] == data["bio"]
@pytest.mark.order(3)
def test_fail_update_user_profile_by_other_user(api_client_user2):
"""User 2 will attempt to patch User 1 profile and it will fail"""
response = api_client_user2.patch("/user/user1/", json={})
assert response.status_code == 403
@pytest.mark.order(4)
def test_add_transaction_for_users_from_admin(api_client_admin):
"""Admin user adds a transaction for all users"""
usernames = ["user1", "user2", "user3"]
for username in usernames:
api_client_admin.post(f"/transaction/{username}/", json={"value": 500})
for username in usernames:
user = api_client_admin.get(f"/user/{username}/?show_balance=true").json()
assert user["balance"] == 500
@pytest.mark.order(5)
def test_user1_transfer_20_points_to_user2(api_client_user1):
"""Ensure that user1 can transfer points to user2"""
api_client_user1.post("/transaction/user2/", json={"value": 20})
user1 = api_client_user1.get("/user/user1/?show_balance=true").json()
assert user1["balance"] == 480
# user1 can see balance of user2 because user1 is a manager
user2 = api_client_user1.get("/user/user2/?show_balance=true").json()
assert user2["balance"] == 520
@pytest.mark.order(6)
def test_user_list_with_balance(api_client_admin):
"""Ensure that admin can see user balance"""
users = api_client_admin.get("/user/?show_balance=true").json()
expected_users = ["admin", "user1", "user2", "user3"]
assert len(users) == len(expected_users)
for user in users:
assert user["username"] in expected_users
assert set(user.keys()) == USER_RESPONSE_WITH_BALANCE_KEYS
@pytest.mark.order(6)
def test_admin_can_list_all_transactions(api_client_admin):
"""Admin can list all transactions"""
transactions = api_client_admin.get("/transaction/").json()
assert transactions["total"] == 4
@pytest.mark.order(6)
def test_regular_user_can_see_only_own_transaction(api_client_user3):
"""Regular user can see only own transactions"""
transactions = api_client_user3.get("/transaction/").json()
assert transactions["total"] == 1
assert transactions["items"][0]["value"] == 500
assert transactions["items"][0]["user"] == "user3"
assert transactions["items"][0]["from_user"] == "admin"
E para executar os tests podemos ir na raiz do projeto FORA DO CONTAINER
Garantimos que o script de testes é eecutável.
$ chmod +x test.sh
Executamos o script:
$ ./test.sh
[+] Running 3/3
⠿ Network dundie-api_default Created 0.1s
⠿ Container dundie-api-db-1 Started 0.7s
⠿ Container dundie-api-api-1 Started 1.7s
INFO [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO [alembic.runtime.migration] Will assume transactional DDL.
INFO [alembic.runtime.migration] Running stamp_revision 9aa820fb7f01 ->
INFO [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO [alembic.runtime.migration] Will assume transactional DDL.
INFO [alembic.runtime.migration] Running upgrade -> f39cbdb1efa7, initial
INFO [alembic.runtime.migration] Running upgrade f39cbdb1efa7 -> b0abf3428204, transaction
INFO [alembic.runtime.migration] Running upgrade b0abf3428204 -> 9aa820fb7f01, ensure_admin_user
===================================== test session starts ======================================
platform linux -- Python 3.10.8, pytest-7.2.0, pluggy-1.0.0 -- /usr/local/bin/python
cachedir: .pytest_cache
rootdir: /home/app/api, configfile: pyproject.toml
plugins: order-1.0.1, anyio-3.6.2
collected 10 items
tests/test_api.py::test_user_list PASSED [ 10%]
tests/test_api.py::test_user_detail PASSED [ 20%]
tests/test_api.py::test_update_user_profile_by_admin PASSED [ 30%]
tests/test_api.py::test_update_user_profile_by_user PASSED [ 40%]
tests/test_api.py::test_fail_update_user_profile_by_other_user PASSED [ 50%]
tests/test_api.py::test_add_transaction_for_users_from_admin PASSED [ 60%]
tests/test_api.py::test_user1_transfer_20_points_to_user2 PASSED [ 70%]
tests/test_api.py::test_user_list_with_balance PASSED [ 80%]
tests/test_api.py::test_admin_can_list_all_transactions PASSED [ 90%]
tests/test_api.py::test_regular_user_can_see_only_own_transaction PASSED [100%]
====================================== 10 passed in 4.62s ======================================
[+] Running 3/3
⠿ Container dundie-api-api-1 Removed 1.2s
⠿ Container dundie-api-db-1 Removed 0.7s
⠿ Network dundie-api_default Removed 0.3s
Se tudo deu certo então todos os testes devem ter passado, caso contrário tente encontrar onde está o erro e corrija antes de prosseguir.
Finalizamos assim a fase 1 do nosso projeto com a maior parte das funcionalidades testadas, vamos partir agora para a fase 2 ->
Task Queue
Enfileiramento de tarefas é uma abordagem que permite distribuir o procesamento de tarefas en diferentes unidades de processamento para obter um melhor desempenho e uma gestão mais fácil da escalabilidade do projeto.
Imagine que temos a tarefa de enviar um email como fizemos anteriormente usando BackGround Tasks do FastAPI, agora assuma que temos que enviar centenas ou até milhares de e-mails de uma só vez.
Para evitar que a aplicação fique sobrecarrega ao executar em um único processo, nós partimos para uma arquiretura de computação distribuida, onde poderemos ter várias instâncias de um worker sendo executados em paralelo.
Alguns conceitos importantes:
BROKER:
É o serviço que gerencia a fila de tarefas, entre os mais conhecidos temos o RabbitMQ, Redis, Kafka, simplificando é o serviço que recebe as tarefas e as coloca em uma fila para ser executada, uma analogia simples é pensar neste componente como um caixa de uma lanchonete que anota os pedidos e os coloca em uma fila distribuindo o trabalho entre os cozinheiros.
Para este projeto vamos usar o REDIS como BROKER.
O Redis é um banco de dados em memória, ele por si só não é considerado um message BROKER, ou seja, ele precisa de um outro processo para gerenciar as mensagens enfileiras nós usaremos REDIS como Broker pois o RQ faz o trabalho de gerenciar as mensagens no REDIS.
Em ambientes produtivos de alta escalabilidade é mais recomendado usar RabbitMQ, Kafka ou serviços especificos de cloud providers como o AWS SQS como Broker.
TASK:
É a tarefa que ficará disponível para ser executada, geralmente usamos decorators em Python ou uma chamada de função para registrar uma task, assim que registrada a task fica disponível e o BROKER pode validar o enfileiramento. Imagine que a task é um item do cardapio de um restaurante.
Existem vários sistemas de gestão de filas para Python cada um com um jeito específico de registrar uma task, alguns conhecidoss são: Celery, Dramatiq, RQ, Huey.
Em nosso projeto vamos usar o RQ pois é o mais simples de configurar e usar.
PRODUCER:
É o mecanismo que envia a tarefa para o BROKER, geralmente em Python usamos uma chamada de função onde passamos qual a task a ser enfileirada e os parametros necessários para a execução, além disso em alguns casos podemos definir coisas como tempo de expiração da tarefa, prioridade, callback e rotear para uma fila especifica. Imagine que o producer é o garçom que anota o pedido e leva até o caixa.
A produção de tarefas pode ser feita em qualquer parte do projeto, usando mecanismos providos pelo RQ iremos usar chamadas de função que enfileiram as mensagens diretamente no REDIS.
CONSUMER ou WORKER:
Com as tasks enfileiradas e devidamente configuradas precisamos de um componente que de fato irá executar a tarefa e tomar conta de coisas como contexto, re-execução, falhas, logs etc.
Imagine que o consumer (ou trabalhador) é o cozinheiro que recebe o pedido e que precisa executar a tarefa de preparo levando em consideração o contexto do pedido, a receita, a prioridade etc.
O RQ já fornece um mecanismo de worker que podemos usar diretamente no terminal ou integrar a um sistema de gestão de processos como o supervisord ou o systemd.
Arquitetura
Agora teremos que começar alterando a arquitetura de componentes da nossa infraestrutura.
Nesta Fase 2 do Projeto adicionaremos o serviço de fila de mensagens e o serviço consumidor de tarefas.
- 1 Serviço de API
- 1 Serviço de Banco de Dados
- 1 Serviço consumidor de tarefas (RQ)
- 1 Serviço de fila de mensagens (Redis)
graph TD; A["API (FastAPI) fa:fa-globe"] B[("Banco de Dados (PG) fa:fa-cubes")] C>"Fila/Broker de mensagens (Redis) fa:fa-bars"] D[["Consumidor/Worker de tarefas (RQ) fa:fa-gears"]] A --> B A --> C D --> C D --> B
Para começar vamos adicionar o serviço de fila de mensagens
usando o REDIS, para isso vamos editar o arquivo
docker-compose.yml
e adicionar o serviço:
redis:
image: redis:6.2.5-alpine
restart: always
ports:
- "6379:6379"
volumes:
- dundie_redis_data:/data
volumes:
dundie_pg_data:
dundie_redis_data:
Após editar o arquivo vamos subir o serviço:
docker compose down
docker compose up -d
O client GUI oficial é o RedisInsight, você pode baixar no site oficial: https://redislabs.com/redis-enterprise/redis-insight/
Se preferir acesssar via terminal pode usar o redis-cli: https://redis.io/docs/ui/cli/
Dentro do RedisInsight vamos criar uma conexão com o REDIS e geralmente ele já irá detectar o REDIS que está sendo executado no localhost.
Podemos então criar chaves:
Tarefas
Usando o Python-RQ qualquer função pode ser enfileirada como uma tarefa, não há necessidade de registrar a task como em outros sistemas de gestão de filas como o Celery, no RQ basta que o worker tenha acesso ao mesmo contexto da função que será executada.
Em nosso projeto já possuimos uma função que envia e-mail, vamos então enfileirar esta função como uma task.
Enfileirando a Task
Para enfileirar uma task vamos precisar de uma conexão com o REDIS, e uma instancia da fila do RQ.
O primeiro passo é editar o nosso arquivo de requirements e adicionar as dependencias do RQ:
Adicione ao arquivo requirements.in
:
rq # Task Queue
O pacote rq já irá instalar o Redis,
precisamos agora atualizar o arquivo requirements.txt
:
Em seu computador (fora do container) execute:
pip install pip-tools
pip-compile requirements.in
cat requirements.txt | grep rq
A saida deve ser algo como:
# rq
# via rq
rq==1.15.1
Agora podemos fazer o rebuild da imagem do serviço API:
docker compose down
docker compose build api
docker compose up -d
Agora podemos entrar no shell e interagir com o Redis só para confirmar que está tudo funcionando:
docker compose exec api dundie shell
E dentro do shell
from redis import Redis
from rq import Queue
q = Queue(connection=Redis("redis"))
from dundie.tasks.user import send_email
result = q.enqueue(send_email)
print(result)
O retorno será algo como:
Job(
'01b32665-ff5d-423a-9149-c25dc9d85ba5',
enqueued_at=datetime.datetime(2023, 7, 14, 13, 53, 29, 353977)
)
Agora podemos abrir o RedisInsight e ver que a chave
rq:job:01b32665-ff5d-423a-9149-c25dc9d85ba5
foi criada.
e dentro dela temos o conteúdo da task:
Portanto já sabemos como faxer para enfileiras uma task no BROKER, essa task ainda não será executada enquanto não tivermos um worker consumindo as tarefas.
Mas primeiro vamos estruturar o nosso projeto para produzir tasks via API.
Começamos editando o arquivo de configuraçoes, default.toml
[default.redis]
host = "redis"
port = 6379
Desta forma caso seja necessário alterar a configuração do REDIS podemos fazer via variáveis de ambiente pois o Dynaconf faz a gestão das configurações.
# exemplo
DUNDIE_REDIS_HOST="meuhost"
Agora criaremos um arquivo novo dundie/queue.py
from redis import Redis
from rq import Queue
from dundie.config import settings
redis = Redis(
host=settings.redis.host,
port=settings.redis.port,
)
queue = Queue(connection=redis)
agora podemos testar no shell:
docker compose exec api dundie shell
from dundie.queue import queue
from dundie.tasks.user import send_email
queue.enqueue(send_email)
E agora no REDIS podemos ver que a chave foi criada com os detalhes da nova task.
apague essas tasks do REDIS antes de continuar.
Agora vamos alterar o nosso endpoint de pedido de recuperação de senha para ao invés de enviar o e-mail com a BackgroundTask, enfileirar uma task no Redis para usar o processamento distribuido.
Edite dundie/routes/user.py
e no topo faça a importação da
queue
.
from dundie.queue import queue
e nele vamos alterar a função
send_password_reset_token
para enfileirar a task
portanto troque a linha:
- background_tasks.add_task(try_to_send_pwd_reset_email, email=email)
+ queue.enqueue(try_to_send_pwd_reset_email, email=email)
Reinicie os serviços e agora vamos testar via API:
docker compose down
docker compose up -d
Acesse
http://localhost:8000/docs#/user/send_password_reset_token_user_pwd_reset_token__post
E dispare um pedido de recuperação de senha.
Ou via CURL:
curl -X 'POST' \
'http://localhost:8000/user/pwd_reset_token/' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
"email": "bruno@rocha.com"
}'
{
"message": "If we found a user with that email, we sent a password reset token to it."
}
Agora veja se tudo deu certo no RedisInsight:
Já estamos produzindo tasks!!!
Agora vamos criar um worker para consumir as tasks.
Worker ou Consumer
Para testar podemos simplesmente executar o worker no shell:
docker compose exec api bash
~/api$ rq worker --url redis://redis:6379 --with-scheduler
14:23:11 Worker rq:worker:34f3e36d726941ceb65669ff002a79ba started with PID 20, version 1.15.1
14:23:11 Subscribing to channel rq:pubsub:34f3e36d726941ceb65669ff002a79ba
14:23:11 *** Listening on default...
14:23:11 Scheduler for default started with PID 22
14:23:11 Cleaning registries for queue: default
14:23:11 default: dundie.tasks.user.try_to_send_pwd_reset_email(email='bruno@rocha.com') (e8f10b0e-f69e-4dc0-b102-8279140892d8)
14:23:11 default: Job OK (e8f10b0e-f69e-4dc0-b102-8279140892d8)
14:23:11 Result is kept for 500 seconds
No Result é possível ver o ID da task que foi executada e o seu status alterado para finished.
Este resultado foca disponivel por 500 segundos, depois disso ele é limpo automaticamente do Redis.
Saia do terminal pressionando CTRL+C
seguido de docker compose down
Agora nosso próximo passo é criar um serviço para executar o worker em segundo plano dentro de um container.
Vamos editar o arquivo docker-compose.yml
e adicionar um novo serviço
worker:
build:
context: .
dockerfile: Dockerfile.dev
environment:
DUNDIE_DB__uri: "postgresql://postgres:postgres@db:5432/${DUNDIE_DB:-dundie}"
DUNDIE_DB__connect_args: "{}"
SQLALCHEMY_SILENCE_UBER_WARNING: 1
volumes:
- .:/home/app/api
depends_on:
- db
- redis
stdin_open: true
tty: true
command: rq worker --with-scheduler --url redis://redis:6379
E agora vamos executar novamente os serviços e dessa vez teremos o worker iniciado.
docker compose up -d
[+] Running 5/5
⠿ Network dundie-api_default Created 0.0s
⠿ Container dundie-api-redis-1 Started 0.6s
⠿ Container dundie-api-db-1 Started 0.6s
⠿ Container dundie-api-api-1 Started 1.7s
⠿ Container dundie-api-worker-1 Started
Abra o log do worker:
$ docker compose logs worker --follow
dundie-api-worker-1 | 14:33:39 Worker rq:worker:fc005b5371eb43bf90dd5ba688a72e8f started with PID 1, version 1.15.1
dundie-api-worker-1 | 14:33:39 Subscribing to channel rq:pubsub:fc005b5371eb43bf90dd5ba688a72e8f
dundie-api-worker-1 | 14:33:39 *** Listening on default...
dundie-api-worker-1 | 14:33:39 Scheduler for default started with PID 7
E agora dispare mais pedidos de recuperação de senha via API/;
curl -X 'POST' \
'http://localhost:8000/user/pwd_reset_token/' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
"email": "bruno@rocha.com"
}'
Verá que no log do worker aparecerá a task sendo executada.
dundie-api-worker-1 | 14:36:01 default: dundie.tasks.user.try_to_send_pwd_reset_email(email='bruno@rocha.com') (14e53a7a-7fb9-4bc1-896a-5d0341e6b281)
dundie-api-worker-1 | 14:36:02 default: Job OK (14e53a7a-7fb9-4bc1-896a-5d0341e6b281)
dundie-api-worker-1 | 14:36:02 Result is kept for 500 seconds
Aconselho ler a documentação do RQ em Python RQ Docs
Em nossa próxima aula iremos agendar tarefas com o scheduler do RQ.
WebSockets
O que é
O protocolo ws:// é uma abreviação de WebSocket e é usado para estabelecer uma conexão bidirecional entre um cliente (como um navegador da web) e um servidor web. Aqui estão alguns pontos-chave sobre o protocolo ws://:
-
Bidirecionalidade: Com o protocolo ws://, tanto o cliente quanto o servidor podem enviar mensagens um para o outro em tempo real, sem a necessidade de esperar por uma solicitação do cliente. Isso é diferente do HTTP tradicional, onde o cliente precisa solicitar informações do servidor a cada interação.
-
Comunicação em tempo real: O protocolo ws:// é ideal para aplicativos da web que exigem atualizações em tempo real, como jogos multiplayer, salas de bate-papo ao vivo, feeds de notícias em tempo real e colaboração em tempo real.
-
Eficiência: Em comparação com soluções baseadas em polling (em que o cliente solicita repetidamente informações do servidor), os WebSockets são mais eficientes, pois estabelecem uma conexão persistente entre o cliente e o servidor. Isso reduz a sobrecarga de rede e os atrasos associados às solicitações HTTP repetidas. NOTA: Isto vem com um custo de performance na gestão de conexões.
-
Suporte: A maioria dos navegadores modernos e servidores web suporta o protocolo ws://, tornando-o uma escolha popular para desenvolvedores que desejam criar aplicativos da web interativos e em tempo real.
Em resumo, o protocolo ws:// é uma tecnologia de comunicação em tempo real que permite uma conexão persistente e bidirecional entre clientes e servidores web, facilitando a criação de aplicativos da web interativos e em tempo real.
Importante
Se você criou o seu repositório fork antes de 14/03/2024 pode ser que você tenha que atualizar algumas extensões locais para que seu projeto continue funcionando, devido a mudanças no Pydantic e SQLALchemy. `
Pode ver as alterações e efetuar as mudanças manualmente com o seguinte link: https://github.com/rochacbruno/dundie-api/commit/33fb2747ac2b57a50a202eb67825aaff02036fa5
Se preferir pode aplicar as mudanças localmente com o seguinte script:
curl https://github.com/rochacbruno/dundie-api/commit/33fb2747ac2b57a50a202eb67825aaff02036fa5.patch | git apply -v
Instalação do websockets
A biblioteca websockets precisa estar instalada no ambiente onde vamos servir o websocket.
Edite o arquivo requirements.in
e adicione a biblioteca websockets
ao final do arquivo.
websockets # Web Sockets
Em seu computador (fora do container) execute:
pip install pip-tools
pip-compile requirements.in
cat requirements.txt | grep websockets
websockets==12.0 (pode ser que a versão seja diferente no momento que vc executar)
Agora devemos fazer o rebuild dos containers:
docker compose down
docker compose build api
docker compose up -d
E agora podemos entrar no shell para se certificar de que tudo está funcionando:
❯ docker compose exec api dundie shell
Auto imports: ['settings', 'engine', 'select', 'session', 'User', 'Transaction', 'Balance', 'add_transaction']
In [1]: from websockets.version import version
In [2]: version
Out[2]: '12.0'
Websocket Server
O primeiro passo é criar do lado servidor um endpoint (URL) que iniciará uma conexão usando o protocolo ws
e o FastAPI já tem suporte nativo a este tipo de protocolo.
Editaremos o arquivo dundie/routes/transaction.py
e vamos criar um websocket para fazer stream de transações,
dessa forma o usuário vai conseguir acompanhar as transações em tempo real sem precisar dar refresh no client (navegador)
Vamos começar com algo simples para entender o funcionamento de um websocket.
from fastapi import APIRouter, Body, Depends, HTTPException, WebSocket # New
...
@router.websocket("/ws")
async def list_transactions_ws(websocket: WebSocket):
await websocket.accept()
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Message text was: {data}")
O FastAPI ainda não suporta exibir o endpoint websocket no /docs portanto esta URL não será exibida, a boa prática é adicionar documentação sobre este endpoint manualmente em seu endpoint correspondente ou em forma de texsto puro.
Websocket client
Para testar precisaremos escrever um pouco de JavaScript diretamente no navegador e para facilitar recomendo abrir um terminal e um browser lado a lado.
O passo a passo:
- Observamos os logs da API com
docker compose logs api --follow
- No navegador tentamos abrir a URL http://localhost:8000/transaction/ws e obteremos o erro 405, não tem problema!
- Abriremos o painel de inspect do navegador e clicaremos em console onde iremos escrever algumas instruções em JavaScript.
- Criamos um client
ws
no JS comvar ws = new WebSocket("ws://localhost:8000/transaction/ws");
- Definimos um event listener para quando recebermos uma mensagem via ws com
ws.onmessage = function(event) {console.log(event.data)};
e por enquanto nosso listener apenas vai imprimir a mensagem no console. - Enviamos uma mensagem ao servidor com
ws.send("Hello")
- A mensagem é enviada ao servidor e processada, retornando ao client (repare a adição de "Message text was: ...")
Toda esta comunicação acontece utilizando um único socket de comunicação, mantendo o estado da conexão, diferente das conexões HTTP que vimos até agora.
O console em detalhe:
Criando o front-end
O objetivo até aqui é falar sobre o lado back-end apenas, nosso foco está na API e endpoints, porém para obter a experiência completa vamos precisar de um front-end então faremos uma interface muito simples para ser usada de client para nossa API.
O que faremos:
- Criar um novo serviço no docker-compose que vai servir apenas o front-end
- Usar o servidor NGINX para servir o front-end
- Criar o front-end usando uma página estática HTML
Vamos começar simples assim, no futuro iremos deixar este front-end mais robusto, mas no momento o que importa é apenas fazer a conexão com o websocket.
No docker-compose.yaml
vamos adicionar mais um serviço, logo após worker
.
services:
worker:
...
ui:
image: nginx
ports:
- "8001:80"
volumes:
- ./ui:/usr/share/nginx/html
volumes:
...
Repare que estamos mapeando uma pasta local chamada ./ui
que é onde estará o
nosso site estático com páginas html.
mkdir ui
echo "Hello front end" > ui/index.html
Agora vamos dar o restart nos containers.
docker compose down
docker compose up -d
Na primeira execução pode demorar um pouco mais pois a imagem do nginx será baixada, mas assim que os containers estiverem em execução abra a URL http://localhost:8001 onde está sendo servido nosso front-end.
Experimente editar o arquivo ui/index.html
e depois façá refresh na URL para ver se as mudanças serão refletidas.
HTML
Não espere muito do nosso HTML, aqui temos um programdor backend tentando criar uma interface :) será simples para apenas funcionar!
Edite o ui/index.html
e adicione o seguinte código.
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>WebSocket Transactions</title>
</head>
<body>
<h1>Transaction List</h1>
<ul id="transaction-list"></ul>
<script>
// Function to create and append a new <li> element to the list
function appendTransactionToList(transaction) {
var transactionList = document.getElementById("transaction-list");
var listItem = document.createElement("li");
listItem.textContent = transaction.to + " - " + transaction.from + " - " + transaction.value;
transactionList.appendChild(listItem);
}
// Connect to WebSocket endpoint
var ws = new WebSocket("ws://localhost:8000/transaction/ws");
// Event listener for WebSocket connection opened
ws.onopen = function(event) {
console.log("WebSocket connection opened");
};
// Event listener for incoming messages
ws.onmessage = function(event) {
console.log("New message received:", event.data);
// Parse JSON data
var transaction = JSON.parse(event.data);
// Append transaction to list
appendTransactionToList(transaction);
};
// Event listener for WebSocket connection closed
ws.onclose = function(event) {
console.log("WebSocket connection closed");
};
// Event listener for WebSocket connection error
ws.onerror = function(event) {
console.error("WebSocket error:", event);
};
</script>
</body>
</html>
O próximo passo é implementar no endpoint o retorno das ultimas transactions.
@router.websocket("/ws")
async def list_transactions_ws(websocket: WebSocket, session: Session = ActiveSession):
await websocket.accept()
last = 0
while True:
# Read all transactions that have not been seen yet
new_transactions = session.exec(select(Transaction).where(Transaction.id > last).order_by("id"))
for transaction in new_transactions:
data = {
"to": transaction.user.name,
"from": transaction.from_user.name,
"value": transaction.value,
}
await websocket.send_json(data)
# set the last sent ID to avoid duplication
last = transaction.id
# Sleep 1 second (just to see better on UI)
await sleep(1)
Agora podemos salvar e abrir o endereço http://localhost:8001 que inicialmente irá exibir uma tela vazia, porem
pode abrir o inspect
do navegador para ver a mensagem de conexão ao websocket e tambem verificar a mesma informação
nos logs do container de API.
Através do terminal vamos verificar quais users existem no sistema
❯ docker compose exec api dundie user-list
dundie users
┏━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━┓
┃ name ┃ username ┃ dept ┃ email ┃ currency ┃
┡━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━┩
│ Admin │ admin │ management │ admin@dm.com │ USD │
│ user3 │ user3 │ sales │ user3@dm.com │ USD │
│ user1 │ user1 │ management │ user1@dm.com │ USD │
│ user2 │ user2 │ sales │ user2@dm.com │ USD │
└───────┴──────────┴────────────┴──────────────┴──────────┘
E agora com a tela do browser aberta lado a lado vamos adicionar transactions.
PERFORMANCE e DISPONIBILIDADE
Nossa implementação do endpoint list_transactions_ws
não está boa, existem certos problemas que podem influenciar
a performance e a escalabilidade, isso se deve ao fato de que para cada cliente que estiver conectado (aba aberta no browser)
do lado servidor será preciso manter uma conexão ws, uma session com o banco de dados e todo o restante do estado.
Considere um site que recebe milhares de requests simultaneos, isso pode ser um grande problema!
Para resolver estes problemas será preciso mudar a implementação para:
- Evitar a execução de queries SQL, lendo a partir de um cache ou tópico em uma fila de stream (redis/kafka)
- Se usar SQL devemos usar um driver async, o driver atual não é async, teriamos que usar um do SQLAlchemy 2.0
- Usar um pool de conexões websocket
- Tratar o fechamento das conexões
E além dessas mudanças na aplicação tambem será necessário colocar nosso servidor para executar em um load-balancer para que tenha alta disponibilidade.
Nós não vamos resolver os problemas acima agora, em nosso ambiente controlado o máximo que vai acontecer é conexões simultaneas darem timeout, tente por exemplo abrir mais de uma vez a página no browser, em abas separadas, abrindo assim diferentes sockets, tambem tente fazer uma requisição normal a API em http://localhost:8000/docs e você vai perceber que o servidor não será capaz de servir todas essas conexões.
O que podemos fazer neste momento?
Escalar Horizontalmente!
Vamos editar o arquivo Dockerfile.dev
e aumentar a quantidade de workers uvicorn, e também será necessário
remover a funcionalidade de reload automático, pois o uso de --reload
faz com que o uvicorn execute de modo bloqueante.
-CMD ["uvicorn","dundie.app:app","--host=0.0.0.0","--port=8000", "--reload"]
+CMD ["uvicorn","dundie.app:app","--host=0.0.0.0","--port=8000", "--workers=8"]
Após alterar o Dockerfile precisaremos fazer o rebuild da imagem e executar novamente
docker compose down
docker compose build api
docker compose up -d
Agora sim a nossa API estará executando com 8 workers, capazes de servir mais requisições e segurar mais sockets abertos.
api-1 | INFO: Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)
api-1 | INFO: Started parent process [1]
api-1 | INFO: Started server process [13]
api-1 | INFO: Waiting for application startup.
api-1 | INFO: Application startup complete.
api-1 | INFO: Started server process [10]
api-1 | INFO: Waiting for application startup.
api-1 | INFO: Application startup complete.
api-1 | INFO: Started server process [9]
api-1 | INFO: Waiting for application startup.
api-1 | INFO: Application startup complete.
api-1 | INFO: Started server process [14]
api-1 | INFO: Waiting for application startup.
api-1 | INFO: Application startup complete.
api-1 | INFO: Started server process [12]
api-1 | INFO: Waiting for application startup.
api-1 | INFO: Application startup complete.
api-1 | INFO: Started server process [11]
api-1 | INFO: Waiting for application startup.
api-1 | INFO: Started server process [15]
api-1 | INFO: Application startup complete.
api-1 | INFO: Waiting for application startup.
api-1 | INFO: Application startup complete.
api-1 | INFO: Started server process [8]
api-1 | INFO: Waiting for application startup.
api-1 | INFO: Application startup complete.
Experimentando
Temos 8 workers, isso quer dizer que podemos abrir pelo menos 7 sockets ao mesmo tempo e ainda ter um livre para receber as requisições HTTP.
- Experimente abrir várias abas no endereço http://localhost:8001 (dica: O browser permite clicar com o direito e duplicar a aba)
- Adicione uma transaction no CLI e veja se aparece em todas as abas
docker compose exec api dundie transaction admin 123
- tente abrir http://localhost:8000/docs e executar o endpoint
List Users
Você vai perceber que dentro dessas limitações irá funcionar, mas ainda assim não é o ideal.
Melhorias
Uma das melhorias é criando em nossa aplicação um ConnectionManager para gerenciar a lista de conexões ws
, na documentação
do FastAPI o procedimente é explicado em detalhes https://fastapi.tiangolo.com/advanced/websockets/#handling-disconnections-and-multiple-clients
Testando
Para testar o endpoint WS podemos usar o mesmo client que usamos para testar os endpoints HTTP, vamos adicionar o seguinte
teste ao arquivo tests/test_api.py
@pytest.mark.order(6)
def test_admin_can_list_all_transactions_ws(api_client_admin):
"""Admin can list all 4 transactions on ws endpoint"""
with api_client_admin.websocket_connect("/transaction/ws") as ws:
for i in range(4):
data = ws.receive_json()
assert data.keys() == {"to", "from", "value"}
Executando o teste
❯ ./test.sh
[+] Running 5/6
⠸ Network dundie-api_default Created 1.3s
✔ Container dundie-api-ui-1 Started 0.7s
✔ Container dundie-api-db-1 Started 0.5s
✔ Container dundie-api-redis-1 Started 0.8s
✔ Container dundie-api-api-1 Started 0.8s
✔ Container dundie-api-worker-1 Started 1.2s
INFO [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO [alembic.runtime.migration] Will assume transactional DDL.
INFO [alembic.runtime.migration] Running stamp_revision 9aa820fb7f01 ->
INFO [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO [alembic.runtime.migration] Will assume transactional DDL.
INFO [alembic.runtime.migration] Running upgrade -> f39cbdb1efa7, initial
INFO [alembic.runtime.migration] Running upgrade f39cbdb1efa7 -> b0abf3428204, transaction
INFO [alembic.runtime.migration] Running upgrade b0abf3428204 -> 8af1cd3be673, transaction1
INFO [alembic.runtime.migration] Running upgrade 8af1cd3be673 -> 6f4df3b5e155, transaction2
INFO [alembic.runtime.migration] Running upgrade 6f4df3b5e155 -> 9aa820fb7f01, ensure_admin_user
================================ test session starts =================================
platform linux -- Python 3.10.13, pytest-8.1.1, pluggy-1.4.0 -- /usr/local/bin/python
cachedir: .pytest_cache
rootdir: /home/app/api
configfile: pyproject.toml
plugins: order-1.2.0, anyio-4.3.0
collected 11 items
tests/test_api.py::test_user_list PASSED [ 9%]
tests/test_api.py::test_user_detail PASSED [ 18%]
tests/test_api.py::test_update_user_profile_by_admin PASSED [ 27%]
tests/test_api.py::test_update_user_profile_by_user PASSED [ 36%]
tests/test_api.py::test_fail_update_user_profile_by_other_user PASSED [ 45%]
tests/test_api.py::test_add_transaction_for_users_from_admin PASSED [ 54%]
tests/test_api.py::test_user1_transfer_20_points_to_user2 PASSED [ 63%]
tests/test_api.py::test_user_list_with_balance PASSED [ 72%]
tests/test_api.py::test_admin_can_list_all_transactions PASSED [ 81%]
tests/test_api.py::test_regular_user_can_see_only_own_transaction PASSED [ 90%]
tests/test_api.py::test_admin_can_list_all_transactions_ws PASSED [100%]
================================= 11 passed in 8.08s =================================
[+] Running 6/6
✔ Container dundie-api-worker-1 Removed 1.0s
✔ Container dundie-api-ui-1 Removed 0.3s
✔ Container dundie-api-api-1 Removed 2.4s
✔ Container dundie-api-redis-1 Removed 0.3s
✔ Container dundie-api-db-1 Removed 0.3s
✔ Network dundie-api_default Removed 0.1s
Conclusão
O FastAPI junto com o Starlette permite implementar websockets de maneira bastante simples, mas como é um micro-framework
limita-se a entregar a parte de comunicação entre client e server respeitando o protocolo ws://
.
Qualquer melhoria adicional precisará ser ajustada tanto na plataforma que serve a aplicação, como uvicorn
, caches, load-balancers, proxies,
como também através da utilização de drivers assincronos para acesso ao banco de dados assim como o uso de streams de dados ao invés de
conexões a bancos de dados relacionais.
Middleware e CORS
1 Request -> REsponse
A parte mais importante de uma aplicação web é o fluxo de requisição, ou seja, o cliente (browser) faz uma requisição (Request) ao servidor.
A requisição formatada no protocolo HTTP é enviada ao Servidor, que por sua vez processa os elementos como URL, Verbo, Headers, Payload.
O servidor então prepara a resposta (Response) também no formato HTTP, contendo HEADERS e Body e envia esta resposta ao cliente.
No FastAPI podemos inspecionar o objecto request, e modificar o objeto REsponse de maneira bastante simples.
Adicione em dundie/routes/user.py logo antes da rota list_users.
@router.get("/test")
async def test_view(request: Request, response: Response):
print(request.headers)
print(response.headers)
response.headers["X-Qualquer-Coisa"] = "123"
return {}
Acesse http://localhost:8000/docs#/user/test_view_user_test_get e faça uma requisição.
repare que no terminal será impresso os valores dos headers, e no retorno da API no browser você verá o seu novo header x-qualquer-coisa
.
Existem vários motivos para querermos injetar novos headers no objeto reponse, ou inspecionar atributos do objeto request.
Um deles é para lidar com o conceito de CORS que já veremos a seguir.
Injetar headers HTTP individualmente em cada rota assim como fizemos acima as vezes é útil, agora imagine que precisamos injetar esse header em todas as rotas!
Seria preciso alterar código em muitos lugares, portanto podemos automizar com o uso de middlewares.
Middleware
A palavra middleware pode significar coisas diferentes dependendo do tipo de software que estiver trabalhando, mas no contexto de aplicações web um middleware geralmente se refere a um procedimento que será injetado entre o recebimento do Request e o envio do Response.
No fastapi podemos adicionar um middleware de duas formas, usando um decorator, ou adicionando explicitamente.
Decorator
Começamos definindo uma função que irá receber o request e um callback para gerar o response.
No final do arquivo app.py
@app.middleware("http")
async def add_new_header(request: Request, make_response):
response = await make_response(request)
response.headers["X-Qualquer-Coisa"] = "456"
return response
Agora pode requisitar qualquer endpoint em http://localhost:8000/docs#/ que o novo será adicionado a todos.
Existem uma série de middlewares que podem ser adicionados para diversos fins, como por exemplo, redirecionar todos os usuários para uma determinada URL ou Schema, forçar a autenticação em um SSO etc.
CORS
Este é o Middleware que com certeza qualquer aplicação com front-end irá necessitar, vamos antes entender o que é CORS.
Cross Origin Resource Sharing = Compartilhamento de Recursos entre Origens.
Recurso = URL/endpoint Origin = Request principal iniciado pelo navegador
Bloqueio de compartilhamento entre origens é uma prática de segurança que é implementada em todos os principais navegadores.
Este bloqueio regula que, uma página web (pagina.html) quando for requisitada
só poderá requisitar novos recursos (outros endpoints) se este estiver na mesma origin, ou seja, no mesmo schema:dominio:porta
.
PAra ver como isso funciona na prática vamos criar uma página em nossa ui
ui/users.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Users</title>
</head>
<body>
<h1>Users List</h1>
<ul id="users-list"></ul>
<script>
// Function to create and append a new <li> element to the list
function appendUserToList(user) {
var transactionList = document.getElementById("users-list");
var listItem = document.createElement("li");
listItem.textContent = user.name;
transactionList.appendChild(listItem);
}
// Fetch API user list
fetch("http://localhost:8000/user/")
// parse to JSON
.then(response => response.json())
// loop the list and pass each user to the append function
.then(users => {
users.forEach(user => {
appendUserToList(user);
});
});
</script>
</body>
</html>
Acesse no navegador http://localhost:8001/users.html e abra o console de dev (botao direito -> inspect) e na aba console veja o seguinte erro:
Cross-Origin Request Blocked: The Same Origin Policy disallows reading the remote resource at http://localhost:8000/user/. (Reason: CORS header ‘Access-Control-Allow-Origin’ does not match ‘localhost:8001’).
Como a mensagem explica, a UI está sendo servida em localhost:8001
e a API que queremos acessar está em localhost:8000
, logo são consideradas origens diferentes.
Para resolver este problema precisamos dizer ao navegador que confiamos nessa comunicação.
Vamos fazer isso em nosso middleware, altere o middleware em app.py
@app.middleware("http")
async def add_new_header(request: Request, make_response):
response = await make_response(request)
response.headers["X-Qualquer-Coisa"] = "456"
response.headers["Access-Control-Allow-Origin"] = "http://localhost:8001"
return response
Agora acesse http://localhost:8001/users.html e veja que dessa funcionou sem problemas.
mais de uma origem?
Existem casos em que a app front-end precisa acessar recursos de mais de uma origem diferente,
porém o header Access-Control-Allow-Origin
só aceita um único valor.
Neste caso será necessário uma implementação mais elaborada envolvendo expressões regulares para fazer o match em multiplas origens ou um esquema onde a origem em questão é resolvida em um request pre-flight que é quando o navegador envia um request OPTIONS
perguntando ao
servidor se pode efetuar um request.
Uma opção também é usar o wildcard *
para fazer match com qualquer origem, mas esta opção não é considerada segura e é usada apenas em APIs que são acessadas apenas em redes controladas.
O FastAPI tem uma forma mais sucinta de resolver este problema.
Cors Middleware
Podemos agora comentar o nosso middleware do app.py
, deixaremos o código comentado apenas como referencia.
# @app.middleware("http")
# async def add_new_header(request: Request, make_response):
# response = await make_response(request)
# response.headers["X-Qualquer-Coisa"] = "456"
# response.headers["Access-Control-Allow-Origin"] = "http://localhost:8001"
# return response
Vamos usar então um middleware já embutido no FastAPI, no mesmo arquivo app.py
from fastapi.middleware.cors import CORSMiddleware
...
# após `app= FastAPI(..)
app.add_middleware(
CORSMiddleware,
allow_origins=[
"http://localhost:8001",
"http://localhost",
"https://server.com",
],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
Este middleware irá automatizar uma série de coisas, fazendo a negociação inicial em pre-flight para construir o valor correto que vai no header
"Access-Control-Allow-Origin"
e cuidando de configurações adicionais como decidir se esse header ser a adicionado para todos os métodos, se a parte de autenticação será incluida etc..
Tudo o que teriamos que fazer manualmente resolvido com um Middleware.
https://fastapi.tiangolo.com/tutorial/cors/
Ahh agora ao acessar http://localhost:8001/users.html tudo deve funcionar perfeitamente.
Front-end e Autenticação
Autenticação consiste em validar as credenciais de um usuário para conceder ou negar acesso a um recurso.
Solicitar as credenciais a cada tentativa de acesso tornaria a experiência do usuário muito desagradável. Por isso, existem dois modelos de persistência de autenticação comumente utilizados quando integrados ao front-end.
Algumas aplicações adotam ambos os modelos simultaneamente, já que é recomendável, e às vezes até necessário, ter mais de um backend de autenticação.
Imagine um cenário em que o aplicativo tem usuários cadastrados diretamente no app (como um usuário administrador, por exemplo), usuários com acesso via token (senhas de app), usuários autenticados através de serviços de diretório como AD/LDAP/Keycloak e provedores de autenticação via OAuth (Google, GitHub, etc.).
Nesse caso, a aplicação pode definir um pipeline de autenticação, chamado de auth backends.
Ex:
[local_users, token, ldap, oath]
No framework Django, por exemplo, configurar esse pipeline de autenticação é bastante simples: basta adicionar as configurações e listar os backends na ordem desejada, sendo que a ordem faz toda a diferença. Já em micro-frameworks como FastAPI, é necessário definir manualmente todo o pipeline.
Modelo de Persistência
Após o usuário autenticar, independentemente da fase do pipeline em que a autenticação ocorreu, é uma boa prática manter o estado de "autenticado" por um período predefinido, evitando que todo o pipeline tenha que ser executado novamente em cada requisição.
Sessão
A forma mais tradicional de fazer isso é através de sessões (Session). O conceito é simples:
- O usuário envia as credenciais para um endpoint /login.
- Em caso de sucesso, o servidor gera um ID único para a sessão, como "123456".
- Esse ID é armazenado em algum tipo de storage (memória, banco de dados, cache).
- Além do ID, o servidor também define um timestamp de expiração, por exemplo, 24h (lease time).
- O cliente (navegador) recebe um cookie contendo o ID da sessão "123456".
Assim, tanto no lado do servidor (no storage) quanto no cliente (no cookie) existe a mesma informação: "123456 válido até 16h".
Toda vez que o cliente faz uma requisição, o cookie é enviado ao servidor, que verifica se os dados do cookie coincidem com os do storage e se a validade não expirou.
Opcionalmente, podem ser armazenadas informações adicionais como IP e dispositivo.
Existem implicações de segurança nesse modelo que podem ser mitigadas com técnicas de proteção como XSS, CSRF, entre outras.
Token
Este modelo é mais indicado quando já existe uma estrutura de autenticação por token (como no nosso caso), e a preferência é deixar a implementação a cargo do front-end.
O fluxo de trabalho é o seguinte:
- O usuário faz login por meio de um formulário em /login.html.
- O formulário envia as credenciais para o endpoint /token e recebe um token e um refresh token.
- O front-end armazena esses dados em um cookie ou local storage.
- A cada requisição, o front-end valida a expiração do token, verifica a necessidade de refresh e adiciona o token nos headers da requisição.
Este modelo não exige o armazenamento de sessões no servidor, já que não há sessão. O próprio token contém as informações de expiração, e essas informações estão assinadas.
Upgrade do ambiente
No terminal fora do container
pip-compile requirements.in --upgrade --resolver=backtracking
docker compose down
docker compose build api
docker compose up
Implementando Session no FastAPI
Session storage
Primeiro precisamos definir onde guardaremos as sessions, para nossa sorte já temos o Redis em nossa infraestrutura e podemos usar.
Começamos estabelecendo algumas configurações em default.toml
[default.session_store]
host = "@get redis.host"
port = "@get redis.port"
db = 1
expiration = "@int @jinja {{60 * 60 * 24 * 7}}"
Criando funções de gestão de session.
dundie/session.py
from __future__ import annotations
from uuid import uuid4
from dundie.config import settings
from redis import Redis
EXPIRATION = settings.session_store.get("expiration", 60 * 60 * 24)
session_store = Redis(
host=settings.session_store.host,
port=settings.session_store.port,
db=settings.session_store.db,
)
def set_session(username) -> str:
"""Creates a new random session"""
session_id = uuid4().hex
session_store.set(session_id, username, ex=EXPIRATION, nx=True)
return session_id
def get_session(session_id) -> bool | str:
"""Get data from a session_id"""
session_data = session_store.get(session_id)
return session_data and session_data.decode()
Quando o usuário fizer login através de um formulário vamos
criar a sessão usando set_session
e vamos isso diretamente no
FastAPI e depois submeter um formulário com HTML.
Session login
E, routes/auth.py
vamos adicionar:
from fastapi import ..., Form, Request
from fastapi.responses import Response
from dundie.session import set_session, session_store
...
# session auth
@router.post("/login")
async def session_login(
response: Response,
username: str = Form(),
password: str = Form(),
):
"""Cookie Based Session Auth Login"""
user = authenticate_user(get_user, username, password)
if not user or not isinstance(user, User):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
)
session_id = set_session(user.username)
response.set_cookie(key="session_id", value=session_id, domain="localhost")
return {"status": "logged in"}
@router.post("/logout")
async def session_logout(request: Request, response: Response):
"""Cookie Based Session Auth Logout"""
if session_id := request.cookies.get("session_id"):
response.delete_cookie(key="session_id")
session_store.delete(session_id)
return {"status": "logged out"}
Ajustando as dependencias de auth
dundie/auth.py
from dundie.session import get_session
...
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token", auto_error=False)
...
def get_current_user(
request: Request, token: str = Depends(oauth2_scheme), fresh=False # pyright: ignore
) -> User:
"""Get current user authenticated"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
if request:
if session_id := request.cookies.get("session_id"):
username = get_session(session_id)
if not username:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid Session ID")
user = get_user(username=username)
if user is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Session user not found")
return user
if authorization := request.headers.get("authorization"):
try:
token = authorization.split(" ")[1]
except IndexError:
raise credentials_exception
if not token:
raise credentials_exception
try:
payload = jwt.decode(
token, SECRET_KEY, algorithms=[ALGORITHM] # pyright: ignore # pyright: ignore
)
username: str = payload.get("sub") # pyright: ignore
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(username=token_data.username)
if user is None:
raise credentials_exception
if fresh and (not payload["fresh"] and not user.superuser):
raise credentials_exception
return user
Agora em http://localhost:8000/docs
Form HTML
Agora precisamos de um formulário HTML para postar as credenciais de login.
ui/login.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Login Form</title>
<style>
#message {
margin-bottom: 20px;
padding: 10px;
border: 1px solid transparent;
display: none;
}
#message.success {
border-color: green;
color: green;
}
#message.error {
border-color: red;
color: red;
}
</style>
</head>
<body>
<h2>Login Form</h2>
<div id="message"></div> <!-- Message div -->
<form id="loginForm">
<label for="username">Username:</label><br>
<input type="text" id="username" name="username" required><br><br>
<label for="password">password:</label><br>
<input type="password" id="password" name="password" required><br><br>
<button type="submit">Submit</button>
</form>
<script>
const messageDiv = document.getElementById('message');
function reset_message_div() {
// Reset the message div
messageDiv.style.display = 'none';
messageDiv.classList.remove('success', 'error');
}
document.getElementById('loginForm').addEventListener('submit', function(event) {
event.preventDefault(); // Prevent the default form submission
const formData = new FormData();
formData.append("username", document.getElementById("username").value);
formData.append("password", document.getElementById("password").value);
reset_message_div();
const url = `http://localhost:8000/login`;
fetch(url, {
method: 'POST',
body: formData,
credentials: 'include'
})
.then(response => {
if (response.status >= 400 && response.status <= 500) {
throw new Error(`Authentication Error ${response.status} ${response.statusText}`);
}
return response.json();
})
.then(data => {
location.href = "/add.html"
})
.catch((error) => {
console.error('Error:', error);
messageDiv.textContent = error;
messageDiv.classList.add('error');
messageDiv.style.display = 'block';
});
});
</script>
</body>
</html>
Agora em http://localhost:8001/login.html
Transaction HTML
ui/add.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Transaction Form</title>
<style>
#message {
margin-bottom: 20px;
padding: 10px;
border: 1px solid transparent;
display: none;
}
#message.success {
border-color: green;
color: green;
}
#message.error {
border-color: red;
color: red;
}
nav#controls {
float: right;
}
</style>
</head>
<body>
<nav id="controls">
<button id="login" onclick="login()">Login</button>
<span>
<button id="logout" onclick="logout()">Logout</button>
</nav>
<h2>Transaction Form</h2>
<div id="message"></div> <!-- Message div -->
<form id="transactionForm">
<label for="username">Username:</label><br>
<input type="text" id="username" name="username" required><br><br>
<label for="value">Value (positive integer):</label><br>
<input type="number" id="value" name="value" min="1" required><br><br>
<button type="submit">Submit</button>
</form>
<script>
function getCookie(name) {
let cookies = document.cookie.split(';');
for(let i = 0; i < cookies.length; i++) {
let cookie = cookies[i].trim();
if (cookie.startsWith(name + '=')) {
return cookie.substring(name.length + 1);
}
}
return null;
}
function checkSessionLogin() {
let sessionCookie = getCookie("session_id");
if (!sessionCookie) {
window.location.href = "/login.html";
}
}
checkSessionLogin();
const messageDiv = document.getElementById('message');
function reset_message_div() {
// Reset the message div
messageDiv.style.display = 'none';
messageDiv.classList.remove('success', 'error');
}
function logout() {
fetch('http://localhost:8000/logout', {
method: 'POST',
credentials: 'include'
})
.then(response => {
location.href = "/login.html"
})
.catch((error) => {
console.error('Error:', error);
messageDiv.textContent = error;
messageDiv.classList.add('error');
messageDiv.style.display = 'block';
});
}
function login() {
location.href = "/login.html"
}
document.getElementById('transactionForm').addEventListener('submit', function(event) {
event.preventDefault(); // Prevent the default form submission
const username = document.getElementById('username').value;
const value = document.getElementById('value').value;
reset_message_div();
if (value <= 0) {
messageDiv.textContent = 'Please enter a positive integer for the value.';
messageDiv.classList.add('error');
messageDiv.style.display = 'block';
return;
}
const url = `http://localhost:8000/transaction/${username}/`;
const data = { value: parseInt(value) };
fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify(data),
credentials: 'include'
})
.then(response => {
if (response.status >= 400 && response.status <= 500) {
throw new Error(`Authentication Error ${response.status} ${response.statusText}`);
}
return response.json();
})
.then(data => {
messageDiv.textContent = 'Transaction successful: ' + JSON.stringify(data);
messageDiv.classList.add('success');
messageDiv.style.display = 'block';
})
.catch((error) => {
console.error('Error:', error);
messageDiv.textContent = error;
messageDiv.classList.add('error');
messageDiv.style.display = 'block';
});
});
</script>
</body>
</html>
Agora em http://localhost:8001/add.html
Finalização
Muito obrigado por ter participado do treinamento Python Web API.