Projeto Dundie API

LINUXTips - Python Web API

Olá, este é o material de apoio do projeto desenvolvido durante o treinamento Python Web API para ter acesso aos videos e grupo de alunos + emissão do certificado clique no link e matricule-se.

Durante o treinamento, você vai aprender:

  • Montar o ambiente de desenvolvimento
  • Organizar as pastas do projeto
  • Orquestrar os serviços com containers
  • Modelar o banco de dados
  • Fazer migração de dados
  • Criar uma CLI para gerenciar o projeto
  • Criar uma API com FastAPI
  • Autenticar usuários com JWT
  • Enviar e-mails com background tasks
  • Trabalhar com task queue
  • Planejar testes
  • Preparar um pipeline de testes
  • Escrever testes automatizados
  • Usar protocolo web-sockets
  • Integrar com um Front-End

Se tiver qualquer dúvida não hesite em postar em nosso grupo do Discord, lá você vai obter ajuda do intrutor e também dos demais alunos. (O link para o grupo está no primeiro item lá na plataforma)

Ahh e se você não usa o Discord, não tem problema, pode acessar https://LINUXtips/social e postar lá aba referente ao treinamento.

Faça parte do Discord da LINUXtips cliando em Convite Discord Geral (observação: no discord existem salas públicas e uma sala exclusiva para alunos.)

Siga o autor deste treinamento no twitter Bruno Rocha

Tudo pronto para começar?

Tip

Você pode mudar o tema dessa página, caso prefira pro exemplo dark ou light theme clicando no ícone na barra superior.

commit 99412ec on 2024-09-13 23:44:08

Requisitos

Opção 1: Executando localmente

  • Acesso a um computador com Python
  • Terminal Linux/Mac (ou WSL)
  • Docker e docker compose
  • Editor de código de sua preferência (VSCode, vim, micro, emacs)

Info

Durante as aulas em video o instrutor estará usando o editor vim mas você pode usar o editor de sua preferência.


Opção 2: Executando online no Gitpod

  • Browser (Chrome ou Firefox)
  • Conta no https://gitpod.io

Tip

A conta gratuita do gitpod oferece 50 horas por mês de uso e no ambiente já tem editor (VSCode), terminal Linux e containers, tudo isso rodando diretamenteem seu navegador.


Warning

Os comandos apresentados serão todos executados em um terminal Linux, se estiver no Windows recomendo usar o WSL, uma máquina virtual ou um container Linux, ou por conta própria adaptar os comandos necessários.

Repositório

Obtenha uma cópia do Repositório

  1. Faça login no github e faça um fork do respositório deste projeto clicando em:

Important

Continue apenas após ter feito o fork do repositório para a sua conta do Github.


Warning

Os blocos de código aqui apresentados que contém $ significam que é um comando que deve ser executado no terminal, o simbolo $ não faz parte do comando, este simbolo indica o prompt e provavelmente o seu terminal irá exibir $ ou algum caracter de marcação similar como > ou #.

É recomendado que você digite os comandos ao invés de copiar e colar, mas caso prefira copiar e colar lembre-se que o sinal de $ não precisa ser copiado.


Obtendo os arquivos

Opção 1: Rodando localmente em seu computador

Faça o clone o projeto para uma pasta no seu ambiente local usando git, substituindo USER pelo seu nome de usuário do github:

$ cd pasta/onde/vc/guarda/seus/projetos
$ git clone git@github.com:USER/dundie-api.git
$ cd dundie-api

Help

Para clonar usando git@... você precisa ter seu git local configurado com chave ssh, caso contrário terá que clonar usando o endereço https://github.com/USER/dundie-api


Opção 2: Rodando no seu navegador com gitpod

Caso tenha optado por utilizar o gitpod basta acessar https://gitpod.io e efetuar o login utilizando sua conta do Github.

copie e cole no navegador substituindo USER pelo seu nome de usuário.

https://gitpod.io/#https://github.com/USER/dundie-api

O gitpod irá criar um ambiente virtual para você, com tudo configurado para rodar o projeto e você terá acesso ao editor e ao terminal.

Ambiente de Desenvolvimento

Ambiente Dev é para fins de programação o conjunto de ferramentas, bibliotecas e variáveis que precisam estar disponíveis para desenvolver, testar e manter o projeto.

Preparando o Ambiente

Opção 1: Rodando no seu computador local

Dentro da pasta dundie-api e crie um ambiente virtual.

$ python -m venv .venv

E ative a virtualenv

No Linux/Mac ou Windows WSL

$ source .venv/bin/activate

No Windows Power Shell

$ .\venv\Scripts\activate.ps1

Success

O ambiente virtual ativado fará com que seu terminal exiba (.venv) juntamente do prompt, você ainda pode digitar no terminal which python para confirmar se o ambiente está mesmo ativado, o output deverá ser [...]/.venv/bin/python

Opção 2: Rodando online com o gitpod

No gitpod.io não é preciso criar um abiente virtual, o ambiente já vem configurado isoladamente.

Instalando as dependências

Com o ambiente pronto podemos agora instalar as dependências básicas do projeto que estão contidas no arquivo requirements-dev.txt.

Confirme que o arquivo já contém a lista de todas as ferramentas que usaremos para fins de desenvolvimento e debugging, confira o conteúdo do arquivo abrindo em seu editor ou através do comando cat no terminal Linux.

$ cat requirements-dev.txt

ipython         # terminal
ipdb            # debugger
sdb             # debugger remoto
pip-tools       # lock de dependencias
pytest          # execução de testes
pytest-order    # ordenação de testes
httpx==0.26.0   # requests async para testes
black           # auto formatação
flake8          # linter

Instalaremos as dependencias com a ferramenta pip que é um módulo do Python.

  1. Atualizamos o pip

    $ python -m pip install --upgrade pip
    
  2. Instalamos as dependencias de desenvolvimento

    $ python -m pip install -r requirements-dev.txt
    
  3. Instalamos o projeto em modo de desenvolvimento.

    Esta instalação permite maior facilidade nos testes e auto-complete do editor de código

    $ python -m pip install -e ".[dev]"
    

Info

Os metadados de instalação estão definidos no arquivo pyproject.toml, neste arquivo estão listados os atributos do projeto, os arquivos e módulos que fazem parte, a versão e as dependencias.

O problema que vamos resolver

logo

dunder

A Dunder Mifflin é a maior e mais bem sucedida empresa de papéis do mundo


dunder

Este é Michael Scott, gerente regional da Dunder Mifflin de Scranton, Pensilvania.

Ele se acha o melhor gerente do mundo, mas na verdade é bem sem graça e as vezes até inconveniente.



Este é o time de funcionários que o Michael gerencia, eles estão com cara de alegres (com excessão da Angela), porém nem todos estão felizes, eles estão na verdade bastante desmotivados e começaram a reclamar das piadas sem graça do Michael e da falta de reconhecimento da empresa.

dunder
Equipe: Michael, Pam, Jim, Andy, Angela, Dwight, Kelly

Projeto Dundie Rewards

A Dunder Mifflin nos contratou para desenvolver uma API para o novo projeto Dundie Rewards, que consiste em um sistema de recompensas onde cada funcionário terá uma conta e irá ganhar pontos.

Esses pontos poderão ser atribuidos pelo Michael quando ele achar que um funcionário merece, automaticamente em datas especiais ou em batimento de metas, e os funcionários poderão trocar pontos entre eles como forma de reconhecimento do trabalho dos colegas.

Ao final do ano, os funcionários vão poder trocar seus pontos por dinheiro para gastar como quiserem.

Vamos começar este projeto desenvolvendo a estrutura de pastas, a orquestração do ambiente, a modelagem de dados e a API.

Futuramente na fase 2 iremos integrar com um front-end.

Funcionalidades

Usuários

  • Registro de novos funcionários via CLI e via API
    • CLI: dundie create-user [Nome] [Email] [Password Inicial] [departamento]
    • API: POST /user/ data={nome:.., email:.., departamento:.., password:..}
    • Usuários do departamento management são considerados super usuáripos
    • O username é gerado a partir do slug do nome do usuário Pam Besly -> pam-besly
  • Autenticação de usuários com JWT token
    • Para usar a API será necessário uma chamada GET /token data={username, password} e todas as chamadas subsequentes precisarão do token informado nos HEADERS.
  • O usuário pode requisitar um token para alteração de senha

Transações

  • Transações não podem ser deletadas ou canceladas, cada transação é um item no histórico da conta do usuário em questão.
  • Qualquer usuário com um token poderá fazer uma chamada para POST /transaction/username/ data={value=100} esta chamada adiciona um novo registro na tabela Transaction contento user_id: <usuario_dono_da_conta>, from_id: <usuário que fez o depósito>, value: <integer>, date: <timestamp>
  • O usuário só poderá fazer uma transação caso o saldo da sua própria conta seja suficiente para cobrir o custo da transação.
  • O saldo de um usuário é a soma de todas as suas transações.

Endpoints:

  • POST /transaction/username/ - Registra uma transação para um usuário
    • Acesso: Geral
    • Validação: O usuário autenticado é o from_id da transação e deve ter saldo suficiente.
  • GET /transaction/username/ - Retorna as transações de um usuário incluindo seu balance (saldo todal)
    • Acesso: Manager ou username == current_user
  • GET /transaction/ - Retorna todas as transações

API Spec

Arquitetura

Fase 1

  • 1 Serviço de API
  • 1 Serviço de Banco de Dados
graph LR;
    A["API (FastAPI) fa:fa-globe"]
    B[("Banco de Dados (PG) fa:fa-cubes")]
    A --> B

Fase 2

  • 1 Serviço de API
  • 1 Serviço de Banco de Dados
  • 1 Serviço consumidor de tarefas (RQ)
  • 1 Serviço de fila de mensagens (Redis)
graph TD;
    A["API (FastAPI) fa:fa-globe"]
    B[("Banco de Dados (PG) fa:fa-cubes")]
    C>"Fila de mensagens (Redis) fa:fa-bars"]
    D[["Consumidor de tarefas (RQ) fa:fa-gears"]]
    A --> B
    A --> C
    D --> C
    D --> B

Agora vamos ver como será a estrutura dos arquivos -->

Estrutura de arquivos

Esta é estrutura deste repositório, os arquivos com * são os que você vai precisar editar ao longo deste guia.

$ tree --filesfirst -L 3 -I docs
.
├── docker-compose.yaml          # Container Orchestration
├── Dockerfile.dev               # Container Dev Image
├── MANIFEST.in                  # Arquivos do projeto
├── pyproject.toml               # Metadados do projeto
├── requirements-dev.txt         # Dev tools
├── requirements.in              # Dependencies
├── settings.toml                # Config por ambiente
├── setup.py                     # Setuptools bootstrap
├── test.sh                      # CI Pipeline
├── dundie                       # Main Package
│   ├── app.py*                  # FastAPI app
│   ├── auth.py*                 # Token JWT
│   ├── cli.py*                  # CLI app
│   ├── config.py                # Config management
│   ├── db.py*                   # Database connection
│   ├── default.toml             # Default settings
│   ├── __init__.py
│   ├── security.py*             # Password Hashing
│   ├── VERSION.txt              # SCM versioning
│   ├── models
│   │   ├── __init__.py*
│   │   ├── transaction.py*      # Models for transaction
│   │   └── user.py*             # Models for User
│   ├── routes
│   │    ├── auth.py*             # Token and Auth URLs
│   │    ├── __init__.py*
│   │    ├── transaction.py*      # Transaction URLs
│   │    └── user.py*             # User URLs
│   └── tasks
│       ├── __init__.py*
│       ├── transaction.py*      # Transaction Taks
│       └── user.py*             # User Tasks
├── postgres
│   ├── create-databases.sh      # DB startup
│   └── Dockerfile               # DB image
└── tests
    ├── conftest.py*             # Pytest config
    ├── __init__.py
    └── test_api.py*             # API tests

Info

Todos os arquivos acima já estão criados no repositório, você vai precisar apenas editar, alguns arquivos como o .secrets.toml (para guardar dados sensiveis) você irá criar localmente pois este arquivo não deverá ser comitado ao repositório.

Criando uma API base

Vamos editar o arquivo dundie/app.py e colocar a minima aplicação FastAPI só para que possamos rodar o container e testar se tudo está funcionando.

dundie/app.py

from fastapi import FastAPI

app = FastAPI(
    title="dundie",
    version="0.1.0",
    description="dundie is a rewards API",
)

Salve as alterações e agora vamos partir para a definição do container ->

Criando um container

Vamos agora verificar o Dockerfile.dev que está na raiz do repositório e será a imagem responsável por executar nossa api.

Dockerfile.dev

# Build the app image
FROM python:3.10

# Create directory for the app user
RUN mkdir -p /home/app

# Create the app user
RUN groupadd app && useradd -g app app

# Create the home directory
ENV APP_HOME=/home/app/api
RUN mkdir -p $APP_HOME
WORKDIR $APP_HOME

# install
COPY . $APP_HOME
RUN pip install -r requirements-dev.txt
RUN pip install -e .

RUN chown -R app:app $APP_HOME
USER app

CMD ["uvicorn","dundie.app:app","--host=0.0.0.0","--port=8000","--reload"]

O arquivo acima define o passo a passo para construir uma imagem de container customizada a partir da python:3.10, neste script de cosntrução da imagem estamos criando diretórios, ajustando permissões, copiando arquivos da aplicação e isntalando dependencias, além de definirmos o comando principal de execução do programa.

Com esta definição pronta o próximo passo é construir a imagem do container:

docker build -f Dockerfile.dev -t dundie:latest .

Agora em nosso sistema teremos uma imagem chamada dundie com a tag latest e podemos executar.

$ docker run --rm -it -v $(pwd):/home/app/api -p 8000:8000 dundie

INFO:     Will watch for changes in these directories: ['/home/app/api']
INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)
INFO:     Started reloader process [1] using StatReload
INFO:     Started server process [8]
INFO:     Waiting for application startup.
INFO:     Application startup complete.

Acesse: http://0.0.0.0:8000/docs e terá acesso a página default da OpenAPI spec que acompanha o FastAPI.

API

Ainda não temos rotas definidas portanto podemos passar o próximo passo.

Orquestrando serviços

Os serviços:

graph LR;
    A["API (FastAPI) fa:fa-globe"]
    B[("Banco de Dados (PG) fa:fa-cubes")]
    A --> B

Docker compose

Para iniciar a nossa API + o Banco de dados vamos precisar de um orquestrador de containers, em produção isso será feito com Kubernetes mas no ambiente de desenvolvimento podemos usar o docker compose.

No arquivo docker-compose.yaml

  • Definimos 2 serviços api e db
  • Informamos os parametros de build com os dockerfiles
  • Na api abrimos a porta 8000
  • Na api passamos 2 variáveis de ambiente DUNDIE_DB__uri e DUNDIE_DB_connect_args para usarmos na conexão com o DB
  • Marcamos que a api depende do db para iniciar.
  • No db informamos o setup básico do postgres e pedimos para criar 2 bancos de dados, um para a app e um para testes.

docker-compose.yaml

version: '3.9'

services:
  api:
    build:
      context: .
      dockerfile: Dockerfile.dev
    ports:
      - "8000:8000"
    environment:
      DUNDIE_DB__uri: "postgresql://postgres:postgres@db:5432/${DUNDIE_DB:-dundie}"
      DUNDIE_DB__connect_args: "{}"
      SQLALCHEMY_SILENCE_UBER_WARNING: 1
    volumes:
      - .:/home/app/api
    depends_on:
      - db
    stdin_open: true
    tty: true
  db:
    build: postgres
    image: dundie_postgres-13-alpine-multi-user
    volumes:
      - dundie_pg_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_DBS=dundie, dundie_test
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres

volumes:
  dundie_pg_data:

O próximo passo é estando na raiz do repositório executar:

docker compose up -d

Info

Na primeira execução poderá demorar mais termpo pois as imagens serão construidas.

Warning

Se o comando docker compose não funcionar tente usar docker-compose (com um traço - )

Para verificar os serviços que estão rodando:

$ docker compose ps
       Name                     Command               State           Ports
    api_1             /bin/sh -c uvicorn dundie...     Up             8000
    db_1             docker-entrypoint.sh postgres     Up             5432

Tip

Os serviços ficarão em execução em segundo plano, se quiser manter o terminal aberto para acompanhar os logs pode omitir o -d ou então abrir um segundo terminal e executar docker compose logs --follow

Modelagem

Definindo os models com Pydantic

Esta será a modelagem do banco de dados completo, mas começaremos apenas com a tabela de usuários.

database

https://dbdesigner.page.link/GqDU95ApwZs7a9RH9

Vamos modelar o banco de dados definido acima usando o SQLModel, que é uma biblioteca que integra o SQLAlchemy e o Pydantic e funciona muito bem com o FastAPI.

Vamos começar a estruturar os model principal para armazenar os usuários

EDITE o arquivo dundie/models/user.py

"""User related data models"""
from typing import Optional
from sqlmodel import Field, SQLModel


class User(SQLModel, table=True):
    """Represents the User Model"""

    id: Optional[int] = Field(default=None, primary_key=True)
    email: str = Field(unique=True, nullable=False)
    username: str = Field(unique=True, nullable=False)
    avatar: Optional[str] = None
    bio: Optional[str] = None
    password: str = Field(nullable=False)
    name: str = Field(nullable=False)
    dept: str = Field(nullable=False)
    currency: str = Field(nullable=False)

    @property
    def superuser(self):
        """"Users belonging to management dept are admins."""
        return self.dept == "management"

Para que seja possivel importar e o SQLAlchemy reconhecer o nosso objeto EDITE arquivo dundie/models/__init__.py adicione

from sqlmodel import SQLModel
from .user import User

__all__ = ["User", "SQLModel"]

NOTA as tabelas Balance e Transaction iremos definir posteriormente.

Agora podemos nos conectar com o banco de dados ->

Configurações

Agora que temos pelo menos uma tabela mapeada para uma classe precisamos estabelecer conexão com o banco de dados e para isso precisamos carregar configurações

Verifique o arquivo dundie/default.toml

[default]

[default.db]
uri = ""
connect_args = {check_same_thread=false}
echo = false

Lembra que no docker-compose.yaml passamos as variáveis DUNDIE_DB... aquelas variáveis vão sobrescrever os valores definidos no default.toml, por exemplo, DUNDIE_DB__uri=... irá preencher o valor uri na seção [default.db] do arquivo default.toml

Para carregar as configurações vamos usar o plugin dynaconf que já está instalado e só precisamos carregar criando uma instancia de settings que será o objeto usado durante toda a aplicação para acessar as configurações:

Veja em dundie/config.py como estamos inicializando o plugin de configurações.

"""Settings module"""
import os

from dynaconf import Dynaconf

HERE = os.path.dirname(os.path.abspath(__file__))

settings = Dynaconf(
    envvar_prefix="dundie",
    preload=[os.path.join(HERE, "default.toml")],
    settings_files=["settings.toml", ".secrets.toml"],
    environments=["development", "production", "testing"],
    env_switcher="dundie_env",
    load_dotenv=False,
)

No arquivo acima estamos definindo que o objeto settings irá carregar variáveis do arquivo default.toml e em seguida dos arquivos settings.toml e .secrets.toml e que será possivel usar DUNDIE_ como prefixo nas variáveis de ambiente para sobrescrever os valores.

Agora já podemos acessar esses valores e criar a conexão com o banco de dados -->

Conexão com o DB

Para conectar com o banco de dados, precisamos criar um objeto engine, este objeto armazena as configurações como o endereço do banco, usuário e senha. O objeto engine é o responsável por executar as consultas SQL que usaremos para definir as tabelas e também para consultar e alterar dados.

EDITE o arquivo dundie/db.py e deixe conforme o código abaixo:

dundie/db.py

"""Database connection"""
from sqlmodel import create_engine
from .config import settings

engine = create_engine(
    settings.db.uri,  # pyright: ignore
    echo=settings.db.echo,  # pyright: ignore
    connect_args=settings.db.connect_args,  # pyright: ignore
)

Criamos um objeto engine que aponta para uma conexão com o banco de dados e para isso usamos as variáveis que lemos do settings, o objeto settings será capaz de carregar essas variáveis do ambiente ou dos arquivos .toml que definimos para configuração.

Note

O comentário # pyright: ignore só é necessário caso você esteja usando um editor com LSP que faz verificação de tipos, como o VSCode ou Neovim. Este comentário faz com que o LSP ignore a checagem de tipos para essas linhas, e é útil pois como esses valores são dinâmicos podemos receber qualquer tipo.

Migrations

Portanto agora já temos uma tabela mapeada e um conexão com o banco de dados precisamos agora garantir que a estrutura da tabela existe dentro do banco de dados.

Para isso vamos usar a biblioteca alembic que gerencia migrações, ou seja, alterações na estrutura das tabelas e automação de alteração em dados.

Começamos na raiz do repositório, no seu terminal rodando:

alembic init migrations

O alembic irá criar um arquivo chamado alembic.ini e uma pasta chamada migrations que servirá para armazenar o histórico de alterações do banco de dados.

Começaremos editando o arquivo migrations/env.py

Atenção nos comentários do snippet abaixo explicando exatamente onde efetuar cada uma das edições

# 1 - No topo do arquivo adicionamos
from dundie import models
from dundie.db import engine
from dundie.config import settings


# 2 - Perto da linha 23 mudamos de
# target_metadata = None
# para:
target_metadata = models.SQLModel.metadata

# 3 - Na função `run_migrations_offline()` mudamos
# url = config.get_main_option("sqlalchemy.url")
# para:
url = settings.db.uri

# 4 - Na função `run_migration_online` mudamos
# connectable = engine_from_config...
# para:
connectable = engine

Agora precisamos fazer só mais um ajuste edite migrations/script.py.mako e em torno da linha 10 adicione

#from alembic import op
#import sqlalchemy as sa
import sqlmodel  # linha NOVA

Agora sim podemos começar a usar o alembic para gerenciar as migrations, precisamos executar este comando dentro do shell do container.

Executando comandos dentro do container

IMPORTANTE

Todos os comandos a partir de agora serão executados no shell dentro do container e para fazer isso usaremos sempre docker compose exec antes que qualquer comando.

Experimente: docker compose exec api /bin/bash

$ docker compose exec api /bin/bash
app@c5dd026e8f92:~/api$ # este é o shell dentro do container api

# digite exit para sair

Podemos redirecionar comandos diretamente para dentro do container com docker compose exec api [comando a ser executado]

Gerando e aplicando migrations

Agora para gerar um registro inicial de migration usaremos o comando alembic revision --autogenerate e isso será executado dentro do container conforme exemplo abaixo:

$ docker compose exec api alembic revision --autogenerate -m "initial"
INFO  [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO  [alembic.runtime.migration] Will assume transactional DDL.
INFO  [alembic.autogenerate.compare] Detected added table 'user'
  Generating /home/app/api/migrations/versions/ee59b23815d3_initial.py ...  done

Repare que o alembic identificou o nosso model User e gerou uma migration inicial que fará a criação desta tabela no banco de dados.

Podemos aplicar a migration rodando dentro do container com alembic upgrade head:

$ docker compose exec api alembic upgrade head
INFO  [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO  [alembic.runtime.migration] Will assume transactional DDL.
INFO  [alembic.runtime.migration] Running upgrade  -> ee59b23815d3, initial

E neste momento a tabela será criada no Postgres e já podemos começar a interagir via SQL client ou através da classe User que modelamos anteriormente.

DICA

Pode usar um client como https://antares-sql.app para se conectar ao banco de dados, usar o psql na linha de comando ou abrir o shell do ipython dentro do container.

Acessando o banco de dados através do shell

$ docker compose exec api ipython
# Agora está no ipython dentro do shell do container
In [1]: 

Digite

from sqlmodel import Session, select
from dundie.db import engine
from dundie.models import User

with Session(engine) as session:
    print(list(session.exec(select(User))))

O resultado será uma lista vazia [] indicando que ainda não temos nenhum usuário no banco de dados.

Digite exit para sair do ipython.

Conclusão

Foi preciso muito boilerplate para conseguir interagir com banco de dados através do shell portanto para facilitar a nossa vida vamos adicionar uma aplicação cli onde vamos poder executar tarefas administrativas via linha de comando como criar ou listar usuários. -->

Criando a CLI

Command Line Interface é uma parte importante de todo serviço, é através dessa ferramanta que geralmente os administradores do serviço interagem com ele, seja para realizar tarefas de manutenção, configuração ou recuperar o sistema em caso de falhas.

Vamos criar uma CLI para o nosso serviço, para isso vamos usar o typer, que é uma das melhores bibliotecas para criar CLIs em Python.

Começaremos adicionando um comando shell que abrirá um shell interativo com os objetos da aplicação e um outro comando user-list para listar todos os usuários cadastrados.

EDITE dundie/cli.py

import typer
from rich.console import Console
from rich.table import Table
from sqlmodel import Session, select

from .config import settings
from .db import engine
from .models import User

main = typer.Typer(name="dundie CLI", add_completion=False)


@main.command()
def shell():
    """Opens interactive shell"""
    _vars = {
        "settings": settings,
        "engine": engine,
        "select": select,
        "session": Session(engine),
        "User": User,
    }
    typer.echo(f"Auto imports: {list(_vars.keys())}")
    try:
        from IPython import start_ipython

        start_ipython(
            argv=["--ipython-dir=/tmp", "--no-banner"], user_ns=_vars
        )
    except ImportError:
        import code

        code.InteractiveConsole(_vars).interact()


@main.command()
def user_list():
    """Lists all users"""
    table = Table(title="dundie users")
    fields = ["name", "username", "dept", "email", "currency"]
    for header in fields:
        table.add_column(header, style="magenta")

    with Session(engine) as session:
        users = session.exec(select(User))
        for user in users:
            table.add_row(*[getattr(user, field) for field in fields])

    Console().print(table)

Tip

Não se esqueça de salvar os arquivo modificado :)

E agora podemos executar.

$ docker compose exec api dundie --help

 Usage: dundie [OPTIONS] COMMAND [ARGS]...

╭─ Options ──────────────────────────────────────────────────╮
│ --help          Show this message and exit.                │
╰────────────────────────────────────────────────────────────╯
╭─ Commands ─────────────────────────────────────────────────╮
│ shell              Opens interactive shell                 │
│ user-list          Lists all users                         │
╰────────────────────────────────────────────────────────────╯

E cada um dos comandos:

O comando user-list para listar todos os usuários (que por enquanto irá retornar uma tabela vazia)

$ docker compose exec api dundie user-list
                dundie users
┏━━━━━━┳━━━━━━━━━━┳━━━━━━┳━━━━━━━┳━━━━━━━━━━┓
┃ name ┃ username ┃ dept ┃ email ┃ currency ┃
┡━━━━━━╇━━━━━━━━━━╇━━━━━━╇━━━━━━━╇━━━━━━━━━━┩
└──────┴──────────┴──────┴───────┴──────────┘

e o comando shell que irá abrir um shell interativo com os objetos da aplicação.

$ docker compose exec api dundie shell
Auto imports: ['settings', 'engine', 'select', 'session', 'User']

In [1]: session.exec(select(User))
Out[1]: <sqlalchemy.engine.result.ScalarResult at 0x7fb539d5e170>

In [2]: settings.db
Out[2]: <Box: {'connect_args': {}, 'uri': 'postgresql://postgres:postgres@db:5432/dundie', 'echo': False}>

Ainda não temos usuários cadastrados pois ainda está faltando uma parte importante que é o hash de senhas para os usuários, vamos resolver -->

Configurando SECRET_KEY

Precisamos ser capazes de encryptar tokens e gerar hash para as senhas dos usuários e para isso temos alguns requisitos, primeiro precisamos de uma chave secreta em nosso arquivo de settings, esta chave será usada em nosso algoritmo de criptografia quando começarmos a gerar tokens.

EDITE dundie/default.toml e adicione ao final

dundie/default.toml

[default.security]
# Set secret key in .secrets.toml
# SECRET_KEY = ""
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_MINUTES = 600

Como explicado no próprio comentário do arquivo default.toml, vamos colocar uma secret key separada no arquivo .secrets.toml na raiz do repositório, isso é recomendável pois podemos adicionar o arquivo .secrets.toml ao .gitignore para que ele não seja enviado para o repositório e desta maneira evitamos expor a chave secreta.

CRIE o arquivo .secrets.toml (na raiz do repositório)

[development]
dynaconf_merge = true

[development.security]
# openssl rand -hex 32
SECRET_KEY = "ONLYFORDEVELOPMENT"

NOTA

Repare que estamos agora usando a seção environment e isso tem a ver com o modo como o dynaconf gerencia os settings, esses valores serão carregados apenas durante a execução em fase de desenvolvimento, em produção o dynaconf carrega apenas valores das variáveis de ambiente (recomendado) ou de uma seção similar nomeada [production].

DICA

Você pode gerar uma secret key mais segura se quiser usando:

$ python -c "print(__import__('secrets').token_hex(32))"
b9483cc8a0bad1c2fe31e6d9d6a36c4a96ac23859a264b69a0badb4b32c538f8

# OU no Linux

$ openssl rand -hex 32
b9483cc8a0bad1c2fe31e6d9d6a36c4a96ac23859a264b69a0badb4b32c538f8

Garantindo que os settings sempre tenham uma SECRET_KEY

Como a secret key será de extrema importância para o funcionamento da API precisamos garantir que esta chave de configuração esteja sempre presente antes do sistema inicializar.

EDITE dundie/config.py

# No topo faça o import de `Validator`
from dynaconf import Dynaconf, Validator  

# No final adicione a validação

settings.validators.register(  # pyright: ignore
    Validator("security.SECRET_KEY", must_exist=True, is_type_of=str),
)

settings.validators.validate()  # pyright: ignore

A partir de agora caso a SECRET_KEY não esteja disponível a aplicação não irá inicializar.

Agora sim podemos adicionar o código para geração de hash do password -->

Criando um hash

Para fazer com que o password dos usuários seja salvo como um hash ao invés de plain-text vamos criar uma função para criar o hash e outra para verificar.

Além disso vamos criar uma classe herdando de str e customizar o método validate desta forma podemos usar esta classe na definição de campo do nosso model User e o pydantic vai chamar o método validate para transformar o valor do campo em um hash.

EDITE Agora vamos o dundie/security.py e adicione alguns elementos

"""Security utilities"""
from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def verify_password(plain_password, hashed_password) -> bool:
    """Verifies a hash against a password"""
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password) -> str:
    """Generates a hash from plain text"""
    return pwd_context.hash(password)


class HashedPassword(str):
    """Takes a plain text password and hashes it.
    use this as a field in your SQLModel
    class User(SQLModel, table=True):
        username: str
        password: HashedPassword
    """

    @classmethod
    def __get_validators__(cls):
        # one or more validators may be yielded which will be called in the
        # order to validate the input, each validator will receive as an input
        # the value returned from the previous validator
        yield cls.validate

    @classmethod
    def validate(cls, v):
        """Accepts a plain text password and returns a hashed password."""
        if not isinstance(v, str):
            raise TypeError("string required")

        hashed_password = get_password_hash(v)
        # you could also return a string here which would mean model.password
        # would be a string, pydantic won't care but you could end up with some
        # confusion since the value's type won't match the type annotation
        # exactly
        return cls(hashed_password)

EDITE agora o arquivo dundie/models/user.py

No topo na linha 4

from dundie.security import HashedPassword

E no model mudamos o campo password na linha 18 para

password: HashedPassword

E no final de dundie/models/user.py uma função para gerar os usernames, transformando nomes completos como Bruno Rocha em um slug como bruno-rocha

def generate_username(name: str) -> str:
    """Generates a slug username from a name"""
    return name.lower().replace(" ", "-")

Agora sim está tudo pronto para adicionarmos ao nosso CLI um comando para criar novos usuários -->

Comando para criar usuários

EDITE dundie/cli.py e adicione no topo do arquivo:

from dundie.models.user import generate_username

e no final adicione um novo comando:

@main.command()
def create_user(
    name: str,
    email: str,
    password: str,
    dept: str,
    username: str | None = None,
    currency: str = "USD",
):
    """Create user"""
    with Session(engine) as session:
        user = User(
            name=name,
            email=email,
            password=password,  # pyright: ignore
            dept=dept,
            username=username or generate_username(name),
            currency=currency,
        )
        session.add(user)
        session.commit()
        session.refresh(user)
        typer.echo(f"created {user.username} user")
        return user

A função create_user será exposta na CLI como o subcomando create-user, ou seja, _ será substituido por - então no terminal execute para ver a mensagem de ajuda dundie create-user --help:

$ docker compose exec api dundie create-user --help
                                                    
 Usage: dundie create-user [OPTIONS] NAME EMAIL PASSWORD DEPT            
                                                    
 Create user                                        
                                                    
╭─ Arguments ──────────────────────────────────────╮
│ *    name          TEXT  [default: None]         │
│                          [required]              │
│ *    email         TEXT  [default: None]         │
│                          [required]              │
│ *    password      TEXT  [default: None]         │
│                          [required]              │
│ *    dept          TEXT  [default: None]         │
│                          [required]              │
╰──────────────────────────────────────────────────╯
╭─ Options ────────────────────────────────────────╮
│ --username        TEXT  [default: None]          │
│ --currency        TEXT  [default: USD]           │
│ --help                  Show this message and    │
│                         exit.                    │
╰──────────────────────────────────────────────────╯

E então execute o comando para criar o usuário para o gerente Michael Scott

Tip

No terminal quando uma linha fica muito longa podemos adicionar uma quebra de linha com \ e o terminal vai entender que é uma única linha.

E no caso de argumentos com espaço como o nome "Michael Scott" precisamos usar aspas para o terminal entender que é um único argumento.

Crie o usuário:

$ docker compose exec api dundie create-user \
"Michael Scott" mscott@dm.com boss123 management 

created michael-scott user

E para listar o usuário criado:

$ docker compose exec api dundie user-list

                              dundie users                               
┏━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┳━━━━━━━━━━┓
┃ name          ┃ username      ┃ dept       ┃ email         ┃ currency ┃
┡━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━╇━━━━━━━━━━┩
│ Michael Scott │ michael-scott │ management │ mscott@dm.com │ USD      │
└───────────────┴───────────────┴────────────┴───────────────┴──────────┘

Agora podemos finalmente começar a criar a nossa API -->

Definindo Serializers

Agora vamos criar endpoints na API para efetuar as operações que fizemos através da CLI, teremos as seguintes rotas:

  • GET /user/ - Lista todos os usuários
  • POST /user/ - Cadastro de novo usuário
  • GET /user/{username}/ - Detalhe de um usuário

Serializers

A primeira coisa que precisamos é definir serializers, que são models intermediários usados para serializar e de-serializar dados de entrada e saída da API e eles são necessários pois não queremos export o model do banco de dados diretamente na API e também queremos a possibilidade de serializar campos opcionais dependendo do nível de acesso do usuário, por exemplo, admins poderão ver mais campos que usuários regulares.

EDITE dundie/models/user.py

No topo na linha 5

from pydantic import BaseModel, root_validator

No final após a linha 20

class UserResponse(BaseModel):
    """Serializer for User Response"""

    name: str
    username: str
    dept: str
    avatar: Optional[str] = None
    bio: Optional[str] = None
    currency: str


class UserRequest(BaseModel):
    """Serializer for User request payload"""

    name: str
    email: str
    dept: str
    password: str
    currency: str = "USD"
    username: Optional[str] = None
    avatar: Optional[str] = None
    bio: Optional[str] = None

    @root_validator(pre=True)
    def generate_username_if_not_set(cls, values):
        """Generates username if not set"""
        if values.get("username") is None:
            values["username"] = generate_username(values["name"])
        return values

Podemos testar os serializers em nosso shell só para ter certeza do funcionamento correto.

$ docker compose exec api dundie shell    
Auto imports: ['settings', 'engine', 'select', 'session', 'User']

In [1]: from dundie.models.user import UserRequest

In [2]: new = UserRequest(
   ...:     name="Bruno Rocha",
   ...:     email="bruno@dm.com",
   ...:     dept="Sales",
   ...:     password="1234",
   ...:  )

In [3]: new.username
Out[3]: 'bruno-rocha'

In [4]: new.currency
Out[4]: 'USD'

In [5]: db_user = User.from_orm(new)

In [6]: session.add(db_user)

In [7]: session.commit()


In [12]: session.exec(select(User).where(User.username=="bruno-rocha")).first()

Out[12]: User(bio=None, email='bruno@dm.com', username='bruno-rocha', name='Bruno Rocha', currency=
'USD', id=2, avatar=None, password='$2b$12$v/1h3sKAFCOuiKuXsThAXOBuny46TPYzKyoaBVisCFHlwaxPlKWpu', 
dept='Sales')

Como pode ver acima podemos criar usuários via API e serializar usando o UserRequest e só a partir dele criar a instancia de User que iremos salvar no banco de dados.

E da mesma forma podemos fazer o caminho inverso, serializando do banco de dados para a API em JSON.

In [19]: bruno = session.exec(select(User).where(User.username=="bruno-rocha")).first()

In [20]: from dundie.models.user import UserResponse

In [21]: UserResponse.parse_obj(bruno).json()
Out[21]: '{"name": "Bruno Rocha", "username": "bruno-rocha", "dept": "Sales", "avatar": null, "bio"
: null, "currency": "USD"}'

Injeção de dependência

O FastAPI tem um excelente sistema de injeção de dependêcias, isto é util por exemplo para quando quisermos que um endpoint seja acessivel apenas por usuários administradores, ao invés de colocarmos lógica para verificar o nivel de permissão do usuário diretamente em cada view, podemos criar uma dependência.

Outro exemplo (que vamos aplicar agora) é precisarmos garantir que cada endpoint, ao ter sua função view invocada, uma sessão de conexão com o banco de dados já esteja disponível.

Vamos criar uma dependência chamada ActiveSession que posteriormente será usada na definição das views.

No topo de dundie/db.py nas linhas 2 e 3

from fastapi import Depends
from sqlmodel import Session, create_engine

No final de dundie/db.py após a linha 13

def get_session():
    with Session(engine) as session:
        yield session


ActiveSession = Depends(get_session)

O objeto que ActiveSession é uma dependência para rotas do FastAPI quando usarmos este objeto como parâmetro de uma view o FastAPI vai executar de forma lazy este objeto e passar o retorno da função atrelada a ele como argumento da nossa view.

Neste caso teremos sempre uma conexão com o banco de dados dentro de cada view que marcarmos com session: Session = ActiveSession.

Veremos a seguir como usar esta dependência -->

Criando as Views

Agora criaremos as views (funções) para expor os serializers com os usuários que acessaremos usando a sessão de conexão ao banco de dados.

edite dundie/routes/user.py

from typing import List

from fastapi import APIRouter
from fastapi.exceptions import HTTPException
from sqlmodel import Session, select

from dundie.db import ActiveSession
from dundie.models.user import User, UserRequest, UserResponse

router = APIRouter()


@router.get("/", response_model=List[UserResponse])
async def list_users(*, session: Session = ActiveSession):
    """List all users."""
    users = session.exec(select(User)).all()
    return users


@router.get("/{username}/", response_model=UserResponse)
async def get_user_by_username(
    *, session: Session = ActiveSession, username: str
):
    """Get user by username"""
    query = select(User).where(User.username == username)
    user = session.exec(query).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user


@router.post("/", response_model=UserResponse, status_code=201)
async def create_user(*, session: Session = ActiveSession, user: UserRequest):
    """Creates new user"""
    db_user = User.from_orm(user)  # transform UserRequest in User
    session.add(db_user)
    session.commit()
    session.refresh(db_user)
    return db_user

Acima criamos 3 views, uma para listar todos os usuários, uma para listar apenas um usuário e outra para criar um novo usuário.

repare que a rota de criação de um novo usuário temos 2 detalhes diferentes: 1) estamos usando o método post e 2) estamos retornando o status code 201 (criado) em caso de sucesso.

Em todas as views estamos usando os serializers e a injeção de dependência.

Apesar de termos criados as funções, o FastAPI ainda não sabe disso, o próximo passo será fazer o roteamento de URL -->

Roteamento de URL

Com as funções já prontas agora podemos fazer o roteamento, que é o processo de mapear uma URL como /user/ a uma função como a user_list que criarmos, ou seja, sempre que o servidor receber um request na URL /user/ irá executar a função mapeada passando todo o contexto HTTP e o FastAPI se encarrega de fazer a injeção das dependências.

Vamos começar criando um router principal que irá agrerar todas as rotas:

EDITE dundie/routes/__init__.py

from fastapi import APIRouter

from .user import router as user_router

main_router = APIRouter()

main_router.include_router(user_router, prefix="/user", tags=["user"])

E agora EDITE dundie/app.py

NO topo na linha 4

from .routes import main_router

Logo depois de app = FastAPI(...) após a linha 11

app.include_router(main_router)

E agora sim pode acessar a API e verá as novas rotas prontas para serem usadas, http://0.0.0.0:8000/docs/

user routes

Pode tentar pela web interface ou com um http client puro:

# rest_nvim
GET http://localhost:8000/user/
#+END
HTTP/1.1 200 OK
date: Fri, 23 Dec 2022 18:04:23 GMT
server: uvicorn
content-length: 220
content-type: application/json

#+RESPONSE
[
  {
    "name": "Michael Scott",
    "username": "michael-scott",
    "dept": "management",
    "avatar": null,
    "bio": null,
    "currency": "USD"
  },
  {
    "name": "Bruno Rocha",
    "username": "bruno-rocha",
    "dept": "Sales",
    "avatar": null,
    "bio": null,
    "currency": "USD"
  }
]
#+END

Ou diretamente via cURL

curl -X 'GET' -k 'http://localhost:8000/user/'

Ou criar um usuário

curl -X 'POST' -H 'Content-Type: application/json' \
  --data-raw '{"email": "pam@dm.com", "dept": "Accounting", "password": "jimjim", "name": "Pam Besly"}' \
  -k 'http://localhost:8000/user/'

Pegar um usuário pelo username

curl -X 'GET' -k 'http://localhost:8000/user/michael-scott/'
{
  "name": "Michael Scott",
  "username": "michael-scott",
  "dept": "management",
  "avatar": null,
  "bio": null,
  "currency": "USD"
}

Listar todos

curl -X 'GET' \
  'http://0.0.0.0:8000/user/' \
  -H 'accept: application/json'
[
  {
    "name": "Michael Scott",
    "username": "michael-scott",
    "dept": "management",
    "avatar": null,
    "bio": null,
    "currency": "USD"
  },
  {
    "name": "Bruno Rocha",
    "username": "bruno-rocha",
    "dept": "Sales",
    "avatar": null,
    "bio": null,
    "currency": "USD"
  },
  {
    "name": "Dwight Schrute",
    "username": "dwight-schrute",
    "dept": "Sales",
    "avatar": null,
    "bio": null,
    "currency": "USD"
  },
  {
    "name": "Pam Besly",
    "username": "pam-besly",
    "dept": "Accounting",
    "avatar": null,
    "bio": null,
    "currency": "USD"
  },
  {
    "name": "Jim Halpert",
    "username": "jim-halpert",
    "dept": "Sales",
    "avatar": null,
    "bio": null,
    "currency": "USD"
  }
]

Só tem um pequeno problema: Qualquer um consegue criar usuários em nossa API sem estar autenticado e isso não é desejável, vamos resolver este problema implementando autenticação -->

Gerando tokens

Agora que já podemos criar usuários é importante conseguirmos autenticar os usuários pois desta forma podemos limitar o acesso a alguns endpoints.

Esse será arquivo com a maior quantidade de código boilerplate.

EDITE o arquivo dundie/auth.py vamos criar as classes e funções necessárias para a implementação de JWT que é a autenticação baseada em token e vamos usar o algoritmo selecionado no arquivo de configuração.

Neste arquivo vamos criar os seguintes objetos:

  • Um esquema de autenticação baseado em oauth, este objeto é usado pelo FastAPI para exibir um formulário de login e outros controles de autenticação na página /docs.
  • Classes para serializar os 3 tipos de token que criaremos (token, refresh_token, reset_password_token)
  • Fução que cria um token usando o algoritmo especificado
  • Função que recebe o token e valida sua autenticidade
  • Funções para retornar o objeto User sempre que precisarmos saber qual usuário está autenticado
  • Dependência para injetarmos em todas as funções que necessitem de autenticação

dundie/auth.py

"""Token based auth"""
from datetime import datetime, timedelta
from typing import Callable, Optional, Union
from functools import partial

from fastapi import Depends, HTTPException, Request, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from pydantic import BaseModel
from sqlmodel import Session, select

from dundie.config import settings
from dundie.db import engine
from dundie.models.user import User
from dundie.security import verify_password

SECRET_KEY = settings.security.secret_key  # pyright: ignore
ALGORITHM = settings.security.algorithm  # pyright: ignore


oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


# Models


class Token(BaseModel):
    access_token: str
    refresh_token: str
    token_type: str


class RefreshToken(BaseModel):
    refresh_token: str


class TokenData(BaseModel):
    username: Optional[str] = None


# Functions


def create_access_token(
    data: dict,
    expires_delta: Optional[timedelta] = None,
    scope: str = "access_token",
) -> str:
    """Creates a JWT Token from user data

    scope: access_token or refresh_token
    """
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire, "scope": scope})
    encoded_jwt = jwt.encode(
        to_encode,
        SECRET_KEY,  # pyright: ignore
        algorithm=ALGORITHM,  # pyright: ignore
    )
    return encoded_jwt


create_refresh_token = partial(create_access_token, scope="refresh_token")


def authenticate_user(
    get_user: Callable, username: str, password: str
) -> Union[User, bool]:
    """Authenticate the user"""
    user = get_user(username)
    if not user:
        return False
    if not verify_password(password, user.password):
        return False
    return user


def get_user(username) -> Optional[User]:
    """Get user from database"""
    query = select(User).where(User.username == username)
    with Session(engine) as session:
        return session.exec(query).first()


def get_current_user(
    token: str = Depends(oauth2_scheme),
    request: Request = None,  # pyright: ignore
    fresh=False
) -> User:
    """Get current user authenticated"""
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    if request:
        if authorization := request.headers.get("authorization"):
            try:
                token = authorization.split(" ")[1]
            except IndexError:
                raise credentials_exception

    try:
        payload = jwt.decode(
            token,
            SECRET_KEY,  # pyright: ignore
            algorithms=[ALGORITHM]  # pyright: ignore
        )
        username: str = payload.get("sub")  # pyright: ignore

        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(username=token_data.username)
    if user is None:
        raise credentials_exception
    if fresh and (not payload["fresh"] and not user.superuser):
        raise credentials_exception

    return user


# FastAPI dependencies

async def get_current_active_user(
    current_user: User = Depends(get_current_user),
) -> User:
    """Wraps the sync get_active_user for sync calls"""
    return current_user


AuthenticatedUser = Depends(get_current_active_user)


async def validate_token(token: str = Depends(oauth2_scheme)) -> User:
    """Validates user token"""
    user = get_current_user(token=token)
    return user

NOTA

O objeto AuthenticatedUser é uma dependência do FastAPI e é através dele que iremos garantir que nossas rotas estejas protegidas com token.

Agora só falta registrarmos as URLs responsáveis por gerar a validar o token -->

Criando Endpoints de Auth

Agora precisamos mapear a função de geração e validação de tokens na API, expondo a URL /token para que possamos gerar um token para um usuário.

EDITE dundie/routes/auth.py

from datetime import timedelta

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm

from dundie.auth import (
    RefreshToken,
    Token,
    User,
    authenticate_user,
    create_access_token,
    create_refresh_token,
    get_user,
    validate_token,
)
from dundie.config import settings

ACCESS_TOKEN_EXPIRE_MINUTES = settings.security.access_token_expire_minutes  # pyright: ignore
REFRESH_TOKEN_EXPIRE_MINUTES = settings.security.refresh_token_expire_minutes  # pyright: ignore

router = APIRouter()


@router.post("/token", response_model=Token)
async def login_for_access_token(
    form_data: OAuth2PasswordRequestForm = Depends(),
):
    user = authenticate_user(get_user, form_data.username, form_data.password)
    if not user or not isinstance(user, User):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)  # pyright: ignore
    access_token = create_access_token(
        data={"sub": user.username, "fresh": True},
        expires_delta=access_token_expires,
    )

    refresh_token_expires = timedelta(minutes=REFRESH_TOKEN_EXPIRE_MINUTES)  # pyright: ignore
    refresh_token = create_refresh_token(
        data={"sub": user.username}, expires_delta=refresh_token_expires
    )

    return {
        "access_token": access_token,
        "refresh_token": refresh_token,
        "token_type": "bearer",
    }


@router.post("/refresh_token", response_model=Token)
async def refresh_token(form_data: RefreshToken):
    user = await validate_token(token=form_data.refresh_token)

    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)  # pyright: ignore
    access_token = create_access_token(
        data={"sub": user.username, "fresh": False},
        expires_delta=access_token_expires,
    )

    refresh_token_expires = timedelta(minutes=REFRESH_TOKEN_EXPIRE_MINUTES)  # pyright: ignore
    refresh_token = create_refresh_token(
        data={"sub": user.username}, expires_delta=refresh_token_expires
    )

    return {
        "access_token": access_token,
        "refresh_token": refresh_token,
        "token_type": "bearer",
    }

E agora vamos adicionar essas URLS ao router principal

EDITE dundie/routes/__init__.py

No topo na linha 3

from .auth import router as auth_router

E depois na linha 9

main_router.include_router(auth_router, tags=["auth"])

Vamos testar a aquisição de um token via curl. (ou pode utilizar algum outro cliente HTTP de sua preferencia como o Imsomnia ou Postman)

curl -X 'POST' \
  'http://localhost:8000/token' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/x-www-form-urlencoded' \
  -d 'username=michael-scott&password=boss123'

passe como parametro os dados de um usuário que tenha criado via linha de comando.

Resposta:

{
  "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJhZG1pbiIsImZyZXNoIjp0cnVlLCJleHAiOjE2Njg2Mjg0NjgsInNjb3BlIjoiYWNjZXNzX3Rva2VuIn0.P-F3onD2vFFIld_ls1irE9rOgLNk17SNDASls31lgkU",
  "refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJhZG1pbiIsImV4cCI6MTY2ODY2MjY2OCwic2NvcGUiOiJyZWZyZXNoX3Rva2VuIn0.AWV8QtySYmcukxTgTa9GedLK00o6wrbyMt9opW42eyQ",
  "token_type": "bearer"
}

agora sim já conseguimos gerar o token, só falta proteger as URLs -->

Protegendo rotas

Apenas super usuários terão permissão para criar novos usuários, portanto vamos proteger a view POST /user/ com autenticação via TOKEN

EDITE dundie/auth.py e adicione no final uma dependencia para garantir que o usuário autenticado é super usuário.

async def get_current_super_user(
    current_user: User = Depends(get_current_user),
) -> User:
    if not current_user.superuser:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Not a super user"
        )
    return current_user


SuperUser = Depends(get_current_super_user)

Agora vamos usar essa dependencia para garantir o super usuário em nossa rota EDITE dundie/routes/user.py

No topo próximo a linha 9

from dundie.auth import SuperUser

E no roteamento da view create_user como parametro para o decorator .post passamos uma lista de dependencias que sejam satisfeitas pelo FAstAPI antes de executar o código da view, ou seja, o código só será executado caso o usuário autenticado via token seja um superusuário.


@router.post("/", response_model=UserResponse, status_code=201, dependencies=[SuperUser])
async def create_user(*, session: Session = ActiveSession, user: UserRequest):
    ...

Como adicionamos dependencies=[SuperUser] no roteamento e isso é o suficiente para o FastAPI detectar que existe pelo menos uma URL que necessita de autenticação e agora já teremos os controles de autenticação na API.

Auth

Ao tentar criar um usuário sem autenticar teremos agora um erro HTTP_401_UNAUTHORIZED e se o usuário autenticado não for um superuser termos o erro HTTP_403_FORBIDDEN

Os requests vão precisar do token, portanto o usuário primeiro precisa pedir um token na URL /token e depois usar este token na requisição protegida

curl -X 'POST' \
  'http://localhost:8000/user/?fresh=false' \
  -H 'accept: application/json' \
  -H 'Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJtaWNoYWVsLXNjb3R0IiwiZnJlc2giOnRydWUsImV4cCI6MTY3MTgyOTc2NCwic2NvcGUiOiJhY2Nlc3NfdG9rZW4ifQ.wdIUyJS9TX2Ku8BMI_AIJhAXQb-TSHmX11qKs5C4PF0' \
  -H 'Content-Type: application/json' \
  -d '{
  "name": "Kevin Malone",
  "email": "kevin@dm.com",
  "dept": "Sales",
  "password": "stacy"
}'

Erros HTTP

Quando as coisas dão errado precisamos informar o cliente HTTP com o código de status correto, erros podem acontecer quando ocorre uma falha inesperada no servidor, quando o recurdo solicitado não existe ou quando o cliente efetua uma requisição inválida.

Vamos tentar duplicar a criação de um usuário fazendo novamente a mesma chamada POST e a mensagem que receberemos é:

Faça essa chamada mais de 1x:

curl -X 'POST' -H 'Content-Type: application/json' \
  --data-raw '{"email": "pam@dm.com", "dept": "Accounting", "password": "jimjim", "name": "Pam Besly"}' \
  -k 'http://localhost:8000/user/'
HTTP/1.1 500 Internal Server Error

Internal Server Error

A mensagem de erro não ajuda muito a sabermos o que ocorreu de fato e portanto podemos curtomizar este comportamento.

Quando temos este caso expecifico o código de erro correto é o 409 Conflict que innforma que o estado interno está em conflito com o estado que está sendo enviado no request, ou seja, estamos tentando criar um usuário que já existe.

Para customizar este comportamento podemos editar o arquivo routes/user.py

# No topo
from sqlalchemy.exc import IntegrityError

# Na função `create_user`
async def create_user(.......):
    ...
    try:
        session.commit()
    except IntegrityError:
        raise HTTPException(status_code=409, detail="User already exists")

A exception IntegrityError será levantada para qualquer problemas encontrado no banco de dados portanto não é ainda a melhor opção, precisamos ser mais especificos para ajudar quem está usando a API, portanto vamos fazer as seguintes modificações:

  1. Continuar tratando a IntegrityError porém com o código 500 e mensagem de erro genérica.
  2. Adicionar um guard para garantir que o usuário a ser criado não existe.
@router.post(
    "/", response_model=UserResponse, status_code=201, dependencies=[SuperUser]
)
async def create_user(*, session: Session = ActiveSession, user: UserRequest):
    """Creates new user"""
    if session.exec(select(User).where(User.username == user.username)).first():
        raise HTTPException(status_code=409, detail="Username already taken")

    db_user = User.from_orm(user)  # transform UserRequest in User
    session.add(db_user)
    try:
        session.commit()
    except IntegrityError:
        raise HTTPException(status_code=500, detail="Database IntegrityError")

    session.refresh(db_user)
    return db_user

E agora sim teremos o retorno esperado

HTTP/1.1 409 Conflict
Username already taken

E no caso de um outro erro de integridade ai invés de mostrar apenas o erro 500 genérico informamos especificamente que se trata de um problema no banco de dados, porém sem expor o erro diretamente.

NOTA

Uma boa prática seria colocar um logger ou um analisador de exceptions como o NewRelic ou o Sentry, faremos isso em outra parte do treinamento.

Vamos agora continuar implementando as rotas de usuário -->

Update User

Agora vamos adicionar uma rota para que o usuário possa alterar o próprio perfil.

O usuário será capaz de mudar apenas os campos bio e avatar
bio será um texto e avatar a URL de uma imagem, e é claro, o usuário só poderá alterar o seu próprio perfil.

Vamos começar criando o serializer que irá receber essas informações a serem alteradas:

EDITE models/user.py e adicione:

class UserProfilePatchRequest(BaseModel):
    avatar: Optional[str] = None
    bio: Optional[str] = None

E agora EDITE routes/user.py e adicione ao final.

@router.patch("/{username}/", response_model=UserResponse)
async def update_user(
    *,
    session: Session = ActiveSession,
    patch_data: UserProfilePatchRequest,
    current_user: User = AuthenticatedUser,
    username: str
):
    user = session.exec(select(User).where(User.username == username)).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    if user.id != current_user.id and not current_user.superuser:
        raise HTTPException(status_code=403, detail="You can only update your own profile")

    # Update
    user.avatar = patch_data.avatar
    user.bio = patch_data.bio

    session.add(user)
    session.commit()
    session.refresh(user)
    return user

Agora podemos testar a rota fazendo a alteração do perfil do usuário michael-scott, lembre-se que primeiro será necessário obter um token válido com uma chamada na rota /token

curl -X 'PATCH' \
 -H 'Authorization: Bearer ...' \
 -H 'Content-Type: application/json' 
 --data-raw '{"avatar": "https://test.com/MichaelScott.png", "bio": "I am the boss"}' \
 -k 'http://localhost:8000/user/michael-scott/'

O usuário também precisará alterar a senha caso ele esqueça, vamos implementar esta funcionalidade -->

Change Password

O endpoint de alteração de senha precisa ficar separado do perfil pois este endpoint precisa de alguns detalhes extras:

  1. O usuário precisa preencher a senha e a confirmação
  2. A mudança pode ser feita pelo próprio usuário, pelo superuser ou através de um token requisitado por email (funcionalidade de esqueci a senha)

Começamos adicionando o serializer para receber o request da alteração do password.

EDITE models/user.py


# No topo
from fastapi import HTTPException, status
from dundie.security import get_password_hash

...

# No final
class UserPasswordPatchRequest(BaseModel):
    password: str
    password_confirm: str

    @root_validator(pre=True)
    def check_passwords_match(cls, values):
        """Checks if passwords match"""
        if values.get("password") != values.get("password_confirm"):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Passwords do not match"
            )
        return values

    @property
    def hashed_password(self) -> str:
        """Returns hashed password"""
        return get_password_hash(self.password)

Para editar o password do usuário teremos as seguintes condições

# O usuário pode editar o próprio password
current_user == user

# O usuário pode editar o password de outro usuário se for superuser
current_user.supersuser is True

# ou se o usuário tiver um token válido
Query("?pwd_reset_token") is valid

Vamos implementar a lógica acima como uma dependencia do FastAPI, usaremos esta dependencia na rota de alteração de senha e a dependência irá garantir que pelo menos um dos requisitos seja satisfeito.

EDITE dundie/auth.py e no final:

async def get_user_if_change_password_is_allowed(
    *,
    request: Request,
    pwd_reset_token: Optional[str] = None,  # from path?pwd_reset_token=xxxx
    username: str,  # from /path/{username}
) -> User:
    """Returns User if one of the conditions is met.
    1. There is a pwd_reset_token passed as query parameter and it is valid OR
    2. authenticated_user is supersuser OR
    3. authenticated_user is User
    """
    target_user = get_user(username)  # The user we want to change the password
    if not target_user:
        raise HTTPException(status_code=404, detail="User not found")

    try:
        valid_pwd_reset_token = get_current_user(token=pwd_reset_token or "") == target_user
    except HTTPException:
        valid_pwd_reset_token = False

    try:
        authenticated_user = get_current_user(token="", request=request)
    except HTTPException:
        authenticated_user = None

    if any(
        [
            valid_pwd_reset_token,
            authenticated_user and authenticated_user.superuser,
            authenticated_user and authenticated_user.id == target_user.id,
        ]
    ):
        return target_user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="You are not allowed to change this user's password",
    )


CanChangeUserPassword = Depends(get_user_if_change_password_is_allowed)

Agora temos CanChangeUserPassword como dependencia para usar em uma rota do FastAPI isso vai garantir que a URL só será executada se todas as condições da dependencia foram resolvidas.

E agora em routes/user.py vamos criar uma rota com o método POST

NOTA

O ideal para seguir a semantica REST seria criar este método como PATCH porém formulários HTML permitem apenas GET e POST e para facilitar o trabalho do front-end vamos usar POST.

@router.post("/{username}/password/", response_model=UserResponse)
async def change_password(
    *,
    session: Session = ActiveSession,
    patch_data: UserPasswordPatchRequest,
    user: User = CanChangeUserPassword
):
    user.password = patch_data.hashed_password  # pyright: ignore
    session.add(user)
    session.commit()
    session.refresh(user)
    return user

Agora podemos testar a rota de alteração de senha, autenticando com o token válido para o usuário michael-scott por exemplo:

curl -X 'POST' \
 -H 'Authorization: Bearer ...' \
 -H 'Content-Type: application/json' 
 --data-raw '{"password": "boss1234", "password_confirm": "boss1234"}' \
 -k 'http://localhost:8000/user/michael-scott/password/'

O usuário michael-scott sendo um superuser, também tem permissão para alterar senha de outros usuários,

Agora imagine que um usuário esqueceu a própria senha, mas ao invés de pedir para o gerente ele quer ele mesmo alterar a senha, para isso vamos criar um endpoint para enviar um email com um token válido para alterar a senha e acessar o mesmo endpoint. -->

Esqueci minha senha

O próximo passo para completar a gestão de usuários é criarmos uma URL onde o usuário irá informar o seu email e o sistema vai verificar se existe um usuário com este e-mail cadastrado e então enviar um e-mail com o token para permitir a alteração de senha.

Nós já temos uma função que é capaz de gerar um token em dundie/auth.py chamada create_access_token

E vamos usar esta função para gerar o token de alteração de senha.

O fluxo será o seguinte:

  1. Usuário requisita um token de senha em :
POST /user/pwd_reset_token/

{
  "email": "michael-scott@dm.com"
}
Response: 200 Ok
"Email will be sent if user is registered"
  1. A view roteada em /user/pwd_reset_token vai fazer o seguinte:
  • Invocar a função: try_to_send_pwd_reset_email(email).
  1. A função try_to_send_pwd_reset_email irá fazer o seguinte:

  2. Procurar o usuário pelo e-mail

  3. Criar um token com expiração curta (o tempo de expiração será definido nos settings)

  4. Renderizar um template com o link para redefinir senha

  5. Enviar o e-mail

Enviando Email

Vamos começar criando uma função que irá receber alguns parametros e enviar um e-mail, teremos uma versão da função que de fato envia um e-mail via HTTP, e teremos outra que apenas escreve a mensagem em um arquivo de log simulando o envio de e-mail que será útil para testes.

EDITE dundie/tasks/user.py

import smtplib
from datetime import timedelta
from time import sleep

from sqlmodel import Session, select

from dundie.auth import create_access_token
from dundie.config import settings
from dundie.db import engine
from dundie.models.user import User


def send_email(email: str, message: str):
    if settings.email.debug_mode is True:  # pyright: ignore
        _send_email_debug(email, message)
    else:
        _send_email_smtp(email, message)


def _send_email_debug(email: str, message: str):
    """Mock email sending by printing to a file"""
    with open("email.log", "a") as f:
        sleep(3)  # pretend it takes 3 seconds
        f.write(f"--- START EMAIL {email} ---\n" f"{message}\n" "--- END OF EMAIL ---\n")


def _send_email_smtp(email: str, message: str):
    """Connect to SMTP server and send email"""
    with smtplib.SMTP_SSL(
        settings.email.smtp_server, settings.email.smtp_port  # pyright: ignore  # pyright: ignore
    ) as server:
        server.login(settings.email.smtp_user, settings.email.smtp_password)  # pyright: ignore
        server.sendmail(
            settings.email.smtp_sender,  # pyright: ignore
            email,
            message.encode("utf8"),
        )


MESSAGE = """\
From: Dundie <{sender}>
To: {to}
Subject: Password reset for Dundie

Please use the following link to reset your password:
{url}?pwd_reset_token={pwd_reset_token}

This link will expire in {expire} minutes.
"""

def try_to_send_pwd_reset_email(email):
    """Given an email address sends email if user is found"""
    with Session(engine) as session:
        user = session.exec(select(User).where(User.email == email)).first()
        if not user:
            return

        sender = settings.email.smtp_sender  # pyright: ignore
        url = settings.security.PWD_RESET_URL  # pyright: ignore
        expire = settings.security.RESET_TOKEN_EXPIRE_MINUTES  # pyright: ignore

        pwd_reset_token = create_access_token(
            data={"sub": user.username},
            expires_delta=timedelta(minutes=expire),  # pyright: ignore
            scope="pwd_reset",
        )

        send_email(
            email=user.email,
            message=MESSAGE.format(
                sender=sender,
                to=user.email,
                url=url,
                pwd_reset_token=pwd_reset_token,
                expire=expire,
            ),
        )

O próximo passo é editar o arquivo dundie/default.toml e adicionar os settings necessários para o serviço de emails.

[default.security]
...
RESET_TOKEN_EXPIRE_MINUTES = 10
PWD_RESET_URL = "https://dm.com/reset_password"

[default.email]
debug_mode = true
smtp_sender = "no-reply@dm.com"
smtp_server = "localhost"
smtp_port = 1025
smtp_user = "<replace in .secrets.toml>"
smtp_password = "<replace in .secrets.toml>"

Agora podemos abrir um terminal e testar essas funções

$ docker compose exec api dundie shell

Auto imports: ['settings', 'engine', 'select', 'session', 'User']

In [1]: from dundie.tasks.user import try_to_send_pwd_reset_email

In [2]: try_to_send_pwd_reset_email("mscott@dm.com")  # wait 3 seconds

In [3]: open("email.log").readlines()
Out[3]: 
['--- START EMAIL mscott@dm.com ---\n',
 'From: Dundie <no-reply@dm.com>\n',
 'To: mscott@dm.com\n',
 'Subject: Password reset for Dundie\n',
 '\n',
 'Please use the following link to reset your password:\n',
 'https://dm.com/reset_password?pwd_reset_token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJtaWNoYWVsLXNjb3R0IiwiZXhwIjoxNjcyNjc3OTk1LCJzY29wZSI6InB3ZF9yZXNldCJ9.nAZNxHYniofTSCzBh38gPi5Qd0FoKONw1Ge6Yp40l5s\n',
 '\n',
 'This link will expire in 10 minutes.\n',
 '\n',
 '--- END OF EMAIL ---\n']

Cada e-mail enviado será adicionado ao arquivo email.log enquanto settings.email.debug_mode estiver ativado, futuramente podemos colocar os dados de um servidor smtp de verdade.

Agora a parte principal é criar uma rota que permitirá ao usuário solicitar o token de alteração de senha e disparar a task em background para o envio do e-mail.

EDITE dundie/routes/user.py e no final vamos adicionar.

# import
from dundie.tasks.user import try_to_send_pwd_reset_email


# view
@router.post("/pwd_reset_token/")
async def send_password_reset_token(*, email: str = Body(embed=True)):
    """Sends an email with the token to reset password."""
    try_to_send_pwd_reset_email(email)
    return {
        "message": "If we found a user with that email, we sent a password reset token to it."
    }

DICA

Neste endpoint estamos recebendo email apenas no corpo do request, ao invés de criarmos um serializer apenas para armazenar esta informação podemos usar o serializer genérico Body que permite receber o valor de um campo diretamente no corpo do request.

No caso de um endereço de em-mail pode ser interessante criar um serializer para efetuar a verificando que o e-mail é valido, mas isso fica como melhoria para depois.

Testando:

curl -X 'POST' -H 'Content-Type: application/json' \
--data-raw '{"email": "mscott@dm.com"}' -k \
'http://localhost:8000/user/pwd_reset_token/'
POST http://localhost:8000/user/pwd_reset_token/
#+END
HTTP/1.1 200 OK
date: Mon, 02 Jan 2023 16:42:56 GMT
server: uvicorn
content-length: 87
content-type: application/json

#+RESPONSE
{
  "message": "If we found a user with that email, we sent a password reset token to it."
}
#+END

Você pode agora verificar o conteúdo do arquivo email.log para ver se a mensagem foi realmente enviada.

$ cat email.log
...

Tarefa

No arquivo dundie/tasks/user.py estamos criando uma string MESSAGE para usar como template para o e-mail enviado, mas seria ideal salvar essa string em um arquivo separado, por exemplo pwd_reset_email_template.jinja e então usar o jinja2 para renderizar o template, lembre-se que usamos o Jinja2 no Day1 do treinamento.

Consegue fazer esta alteração?


NOTA

Por questões de privacidade nós não podemos confirmar se a operação deu certo, o usuário terá que verificar na caixa de e-mail que em nosso caso é o arquivo de log.

Mas repare que ao chamar a URL precisamos esperar 3 segundos pela resposta, o ideal é que o request seja imediato e a função taks.try_to_send_pwd_reset_email seja executada em background. -->

Enviando emails assíncronos

Ao chamar a URL /user/pwd_reset_token/ a resposta demorou 3 segundos pois estamos bloqueando o request até o e-mail ser enviado, o ideal é que isso seja feito em background, vamos transformar a chamada de try_to_send_pwd_reset_email em uma task.

O FastAPI tem uma classe chamada BackgroundTasks que nos permite adicionar funções que serão executadas em background, vamos importá-la e adicioná-la como um parâmetro da nossa rota.

As funções que adicionarmos a background_tasks serão adicionadas ao event-loop assíncrono gerenciado pelo FastAPI, desta forma não será necessário esperar o tempo de envio do e-mail, podemos retornar a resposta para o cliente e mesmo depois do request terminado a tarefa continuará executando do lado servidor.

EDITE dundie/routes/user.py

# No topo
from fastpi import BackgroundTasks
...


# Na função
@router.post("/pwd_reset_token/")
async def send_password_reset_token(
    *,
    email: str = Body(embed=True),
    background_tasks: BackgroundTasks,  # NEW
):
    background_tasks.add_task(try_to_send_pwd_reset_email, email=email)  # NEW
    return {
        "message": "If we found a user with that email, we sent a password reset token to it."
    }

Pode testar repetindo a mesma chamada anterior (ou usando a UI em /docs/)

curl -X 'POST' -H 'Content-Type: application/json' \
--data-raw '{"email": "mscott@dm.com"}' -k \
'http://localhost:8000/user/pwd_reset_token/'

Repare que agora a resposta é instantânea e o e-mail é enviado em background.


Assim terminamos a API de gestão de usuários e Auth por enquanto

DICA

A tareda de envio de e-mail é bastante simples e não muito complexa em questões de resiliência a falhas, se o e-mail falhar o envio, o usuário simplesmente não vai receber e portanto irá tentar novamente.

Existem casos onde as tarefas precisam de um pouco mais de robustes e controles como retry e auditoria, neste caso usaremos um gerenciador de filas como Celery ou Python-RQ, mas isso veremos adiante.

Com estas rotas agora já podemos ter um front-end integrado para a gestão de usuários, mas o nosso próximo passo será cuidar da API de transações -->

Modelagem de dados

A modelagem dos dados será a seguinte (considerando que já concluimos a tabela de User), fica faltando implementar as outras 2 tabelas.

database

https://dbdesigner.page.link/GqDU95ApwZs7a9RH9

Portanto criaremos os models para Transaction e Balance

EDITE dundie/models/transaction.py

from datetime import datetime
from typing import TYPE_CHECKING, Optional

from sqlmodel import Field, Relationship, SQLModel

if TYPE_CHECKING:
    from dundie.models.user import User


class Transaction(SQLModel, table=True):
    """Represents the Transaction Model"""

    id: Optional[int] = Field(default=None, primary_key=True)
    user_id: int = Field(foreign_key="user.id", nullable=False)
    from_id: int = Field(foreign_key="user.id", nullable=False)
    value: int = Field(nullable=False)
    date: datetime = Field(default_factory=datetime.utcnow, nullable=False)

    # Populates a `.incomes` on `User`
    user: Optional["User"] = Relationship(
        back_populates="incomes",
        sa_relationship_kwargs={"primaryjoin": 'Transaction.user_id == User.id'},
    )
    # Populates a `.expenses` on `User`
    from_user: Optional["User"] = Relationship(
        back_populates="expenses",
        sa_relationship_kwargs={"primaryjoin": 'Transaction.from_id == User.id'},
    )


class Balance(SQLModel, table=True):
    """Store the balance of a user account"""

    user_id: int = Field(
        foreign_key="user.id",
        nullable=False,
        primary_key=True,
        unique=True,
    )
    value: int = Field(nullable=False)
    updated_at: datetime = Field(
        default_factory=datetime.utcnow,
        nullable=False,
        sa_column_kwargs={"onupdate": datetime.utcnow}
    )

    # Populates a `._balance` on `User`
    user: Optional["User"] = Relationship(back_populates="_balance")

Como estamos includindo relacionamentos entre os models Transaction, Balance e User precisamos implementar as respectivas mudanças no dundie/models/user.py

  • incomes irá retornar uma query com todos os pontos que o usuário ganhou
  • expenses irá retornar uma query com todos os pontos que o usuário gastou
  • E balance irá retornar o saldo atual do usuário, para balance estamos usando um relacionamento one-to-one, por isso precisamos usar o lazy="dynamic" e uma property do Python para garantir que caso o saldo seja inexistente sempre tenhamos um valor de retorno padrão.
from typing import TYPE_CHECKING, Optional


class User(...):
    ...

    # Populates a `.user` on `Transaction`
    incomes: Optional[list["Transaction"]] = Relationship(
        back_populates="user",
        sa_relationship_kwargs={"primaryjoin": 'User.id == Transaction.user_id'},
    )
    # Populates a `.from_user` on `Transaction`
    expenses: Optional[list["Transaction"]] = Relationship(
        back_populates="from_user",
        sa_relationship_kwargs={"primaryjoin": 'User.id == Transaction.from_id'},
    )
    # Populates a `.user` on `Balance`
    _balance: Optional["Balance"] = Relationship(
        back_populates="user",
        sa_relationship_kwargs={"lazy": "dynamic"}
    )

    @property
    def balance(self) -> int:
        """Returns the current balance of the user"""
        if (user_balance := self._balance.first()) is not None:  # pyright: ignore
            return user_balance.value
        return 0

E por fim adicionamos o novo model ao contexto do dundie/models/__init__.py

from sqlmodel import SQLModel
from .user import User
from .transaction import Transaction, Balance

__all__ = ["User", "SQLModel", "Transaction", "Balance"]

Criando as migrations

Com os models criados pediamos ao alembic para criar o arquivo de migration com as mudanças que precisam ser aplicadas ao Postgresql.

$ docker compose exec api alembic revision --autogenerate -m "transaction"
INFO  [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO  [alembic.runtime.migration] Will assume transactional DDL.

INFO  [alembic.autogenerate.compare] Detected added table 'balance'
INFO  [alembic.autogenerate.compare] Detected added table 'transaction'

INFO  [alembic.ddl.postgresql] Detected sequence named 'user_id_seq' as owned by integer column 'user(id)', assuming SERIAL and omitting
  Generating /home/app/api/migrations/versions/8af1cd3be673_transaction.py ...  done

E em sequencia aplicamos para criar as tabelas no banco de dados:

$ docker compose exec api alembic upgrade head
INFO  [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO  [alembic.runtime.migration] Will assume transactional DDL.
INFO  [alembic.runtime.migration] Running upgrade f39cbdb1efa7 -> b0abf3428204, transaction

Neste momento nossas tabelas transaction e balance já devem estar criadas.

Antares SQL

Agora vamos criar a lógica de negócios que cuidará da adição de novas transações -->

Transaction Business Logic

A operação de adicionar uma transação será feita com uma requisição POST ao endpoint /transaction/{username}/ e o corpo da transaction será:

{
  "value": "integer",
}

O usuário autenticado através de token será usado para popular o campo from_id e o campo date será preenchido automaticamente.

NOTE Via CLI também será possivel adicionar transaction, e neste caso o from_id será o user admin (que precisamos garantir a criação via migrations)

Fluxo de operação

graph TD
    A[Requisição POST] --> B["Obter usuários user_id e from_id"]
    B --> B1[[Chegagem de saldo e permissões]]
    B1 --> C["Procedimento permitido?"]
    C --> D{Sim}
    C --> E{Não}
    D --> F[Adicionar Transaction]
    F --> G[Atualizar saldo]
    G --> H[Dar commit na session]
    E --> I[Retornar com erro]

Atualizando Saldo

Vamos criar uma função com a lógica necessária para adicionar transaction e atualizar o saldo baseando-se nas regras anteriores.

EDITE dundie/tasks/transaction.py

from typing import Optional
from sqlmodel import Session
from dundie.db import engine
from dundie.models import User, Transaction, Balance


class TransactionError(Exception):
    """Can't add transaction"""


def add_transaction(
    *,
    user: User,
    from_user: User,
    value: int,
    session: Optional[Session] = None
):
    """Adds a new transaction to the specified user.

    params:
        user: The user to add transaction to.
        from_user: The user where amount is coming from or superuser
        value: The value being added
    """
    if not from_user.superuser and from_user.balance < value:
        raise TransactionError("Insufficient balance")

    session = session or Session(engine)

    transaction = Transaction(user=user, from_user=from_user, value=value)  # pyright: ignore
    session.add(transaction)
    session.commit()
    session.refresh(user)
    session.refresh(from_user)

    for holder in (user, from_user):
        total_income = sum([t.value for t in holder.incomes])  # pyright: ignore
        total_expense = sum([t.value for t in holder.expenses])  # pyright: ignore
        balance = session.get(
            Balance, holder.id
        ) or Balance(user=holder, value=0)  # pyright: ignore
        balance.value = total_income - total_expense
        session.add(balance)

    session.commit()

A função add_transaction recebe como parâmetros o user que receberá a transação, o from_user que é o usuário que está enviando a transação e o value que é o valor da transação, esta função será executado tanto via CLI quanto via REST API.

Para que funcione via CLI precisamos garantir que o sistema sempre tenha um usuário admin padrão para o sistema, vamos garantir a existência deste usuário -->

Data Migrations

Até agora usamos o alembic para criar migrations de estrutura (DDL) em operações de criação ou alteração de campos e tabelas.

Entretanto, em alguns casos precisamos criar dados para alimentar a aplicação, como por exemplo criar um usuário administrador para que possamos acessar a aplicação.

Sempre que precisar garantir a existência de dados alimentados em tabelas do sistema usaremos o conceito de Data Migrations.

Começamos criando uma migration vazia para efetuarmos a operação de adição do usuário.

$ docker compose exec api alembic revision -m "ensure_admin_user"
  Generating /home/app/api/migrations/versions/9aa820fb7f01_ensure_admin_user.py ...  done

Repare que dessa vez não usamos --autogenerate pois essa migration estará vazia, e neste caso vamos manualmente escrever o código que desejamos que seja executado.

Edite o arquivo criado em migrations/versions/9aa820fb7f01_ensure_admin_user.py

OBS O arquivo criado ai no seu sistema pode ter um nome diferente, mas o conteúdo é o mesmo.

"""ensure_admin_user

Revision ID: 9aa820fb7f01
Revises: 6f4df3b5e155
Create Date: 2023-01-06 13:13:37.907183

"""
from alembic import op
import sqlalchemy as sa
import sqlmodel
from dundie.models.user import User  # NEW
from sqlmodel import Session  # NEW

# revision identifiers, used by Alembic.
revision = '9aa820fb7f01'
down_revision = '6f4df3b5e155'
branch_labels = None
depends_on = None


def upgrade() -> None:  # NEW
    bind = op.get_bind()
    session = Session(bind=bind)

    admin = User(
        name="Admin",
        username="admin",
        email="admin@dm.com",
        dept="management",
        currency="USD",
        password="admin",  # pyright: ignore
    )
    # if admin user already exists it will raise IntegrityError
    try:
        session.add(admin)
        session.commit()
    except sa.exc.IntegrityError:
        session.rollback()


def downgrade() -> None:
    pass

Tarefa

A migration acima irá setar o password como admin e é muito importante que você defina um password mais complexo ou que faça a alteração corretamente em ambientes de produção.

Uma dica é que tenha no arquivo de settings um campo para definir o password inicial de admin e então na migration ao invés de passarmos admin podemos ler de settings.DEFAULT_ADMIN_PASSWORD, para essa variável existir ela precisa estar no arquivo default.toml e então poderá ser sobrescrita usando variável de ambiente DUNDIE_DEFAULT_ADMIN_PASSWORD.

Essa alteração eu vou deixar para você fazer, pode ser depois, no final deste tutorial todas as tarefas estarão listadas.

Agora salve o arquivo e aplique a migration.

$ docker compose exec api alembic upgrade head                   
INFO  [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO  [alembic.runtime.migration] Will assume transactional DDL.
INFO  [alembic.runtime.migration] Running upgrade 6f4df3b5e155 -> 9aa820fb7f01, ensure_admin_user 

Agora toda vez que as migrations forem aplicadas, ação que será executada sempre que houver nova atualização ou deploy, garantimos que o usuário admin será criado.

E agora que temos a certeza que o admin vai sempre existir podemos partir para a criação de um comando na CLI -->

Transaction CLI

Agora vamos criar um comando para adicionar saldo via CLI, sempre que feito via CLI o usuário de origem dos pontos from_id será o admin.

O comando vai simplesmente chamar a função add_transaction que criamos anteriormente mas colocaremos lógica adicional para garantir a existencia dos usuários e para formatar os dados aprensentados em uma tabela no terminal.

EDITE dundie/cli.py e adicione um novo comando no final do arquivo.

# No topo

from dundie.tasks.transaction import add_transaction
from dundie.models.transaction import Transaction, Balance

# No comando `shell` adicione novos objetos
def shell():
    ...
    _vars = {
      ...
      "Transaction": Transaction,
      "Balance": Balance,
      "add_transaction": add_transaction,
    }

# Crie o comando que adiciona transactions 
@main.command()
def transaction(
    username: str,
    value: int,
):
    """Adds specified value to the user"""

    table = Table(title="Transaction")
    fields = ["user", "before", "after"]
    for header in fields:
        table.add_column(header, style="magenta")

    with Session(engine) as session:
        from_user = session.exec(select(User).where(User.username == "admin")).first()
        if not from_user:
            typer.echo("admin user not found")
            exit(1)
        user = session.exec(select(User).where(User.username == username)).first()
        if not user:
            typer.echo(f"user {username} not found")
            exit(1)

        from_user_before = from_user.balance
        user_before = user.balance
        add_transaction(user=user, from_user=from_user, session=session, value=value)
        table.add_row(from_user.username, str(from_user_before), str(from_user.balance))
        table.add_row(user.username, str(user_before), str(user.balance))

        Console().print(table)

E para usar podemos fazer o seguinte no terminal:

$ docker compose exec api dundie transaction jim-halpert 900

          Transaction           
┏━━━━━━━━━━━━━┳━━━━━━━━┳━━━━━━━┓
┃ user        ┃ before ┃ after ┃
┡━━━━━━━━━━━━━╇━━━━━━━━╇━━━━━━━┩
│ admin       │ 0      │ -900  │
│ jim-halpert │ 0      │ 900   │
└─────────────┴────────┴───────┘

O usuário admin será ficará com saldo negativo e não tem limite de transferencia, assim como qualquer usuário que seja super-user.

Tarefa

Ao chamarmos o comando dundie transaction assim como o user-list o retorno é mostrado em uma tabela formatada no terminal, em alguns casos seria interessante poder passar um argumento --format=json e obter o retorno em formato JSON para posterior tratamento.

Consegue adicionar essa funcionalidade?

Agora vamos partir para a API adicionando a mesma funcionalidade -->

Transaction API

Agora podemos finalmente criar o endpoint na API que vai fornecer a mesma funcionalidade, porém com algumas diferenças entre a implementação que fizemos no CLI, no caso da API o from_user será o usuário que estiver autenticado.

EDITE dundie/routes/transaction.py e adicione o seguinte código:

from fastapi import APIRouter, Body, HTTPException
from dundie.auth import AuthenticatedUser
from dundie.db import ActiveSession
from dundie.models import User
from dundie.tasks.transaction import add_transaction, TransactionError
from sqlmodel import select, Session

router = APIRouter()


@router.post('/{username}/', status_code=201)
async def create_transaction(
    *,
    username: str,
    value: int = Body(embed=True),
    current_user: User = AuthenticatedUser,
    session: Session = ActiveSession
):
    """Adds a new transaction to the specified user."""
    user = session.exec(select(User).where(User.username == username)).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    try:
        add_transaction(user=user, from_user=current_user, value=value, session=session)
    except TransactionError as e:
        raise HTTPException(status_code=400, detail=str(e))

    # At this point there was no error, so we can return
    return {"message": "Transaction added"}

Agora podemos adicionar essas rotas no router principal editando dundie/routes/__init__.py

from fastapi import APIRouter
from .auth import router as auth_router
from .user import router as user_router
from .transaction import router as transaction_router

main_router = APIRouter()

main_router.include_router(auth_router, tags=["auth"])
main_router.include_router(user_router, prefix="/user", tags=["user"])
main_router.include_router(transaction_router, prefix="/transaction", tags=["transaction"])

Neste momento o endpoint já deve aparecer na API.

OBS o parâmetro fresh que vemos na API se refere a possibilidade de renovar o token de autenticação.

E podemos testar fazendo uma requisição HTTP.

Lembre-se de trocar o token pelo token gerado a partir da URL /token/, por exemplo, gerando um token para o usuário admin ou outro superuser permitirá a adição de pontos infinitos, usando token de um usuário comum permitirá apenas o envio de pontos comportados pelo saldo do usuário.

Adicionando 300 pontos ao usuário Bruno Rocha.

$ curl -X 'POST' \
  -H 'Content-Type: application/json' \
  -H 'Authorization: Bearer TOKEN_AQUI' \
  --data-raw '{"value": 300}' \
  -k 'http://localhost:8000/transaction/bruno-rocha/'

Adicione algumas transactions e vamos partir para a API de filtragem e exibição -->

Filtrando dados

Agora que já podemos trocar pontos entre usuários vamos criar endpoints onde será possivel consultar as transações e saldos.

  • POST /transaction/{username}/ (feito) - adiciona transação
  • GET /transaction/ - Lista todas as transações
    • Se superuser exibe todas, caso contrário apenas as próprias.
    • permite filtros: ?from_user=username, ?user=username
    • permite ordenação: ?order_by=from_user,user,value,date
    • permite paginação: ?page=1&size=10

Antes de criarmos o endpoint precisamos criar um model de saida, TransactionResponse para evitar o retorno do próprio model do banco de dados e se fizermos isso em dundie/models/transaction.py teremos um problema de circular imports.

- # !!!! Exemplo em dundie/models/transaction.py
- from dundie.models.user import User   # <- CIRCULAR IMPORT 

Para contornar este problema de maneira simples, vamos agora criar um novo arquivo, desta forma isolamos o import e evitamos o import circular.

OBS Neste momento vamos colocar apenas o serializar para Transaction neste novo módulo mas futuramente podemos mover todos os serializers definidos em models/user.py e models/transaction.py para este mesmo módulo também.

Neste serializer vamos utilizar root_validator para criar campos que são calculados no momento da serialização.

CRIE o arquivo dundie/models/serializers.py

VocÊ pode criar usando touch dundie/models/serializers.py ou usando seu IDE ou navegador de arquivos.

from datetime import datetime
from typing import Optional

from pydantic import BaseModel, root_validator
from sqlmodel import Session

from dundie.db import engine

from .user import User


class TransactionResponse(BaseModel, extra="allow"):
    id: int
    value: int
    date: datetime

    # These 2 fields will be calculated at response time.
    user: Optional[str] = None
    from_user: Optional[str] = None

    @root_validator(pre=True)
    def get_usernames(cls, values: dict):
        with Session(engine) as session:
            user = session.get(User, values["user_id"])
            values["user"] = user and user.username
            from_user = session.get(User, values["from_id"])
            values["from_user"] = from_user and from_user.username
        return values

Podemos testar no shell com:

$ docker compose exec api dundie shell
Auto imports: ['settings', 'engine', 'select', 'session', 'User', 
               'Transaction', 'Balance', 'add_transaction']

In [1]: from dundie.models.serializers import TransactionResponse

In [2]: t = session.get(Transaction, 1)

In [3]: TransactionResponse.parse_obj(t)
Out[3]: TransactionResponse(
    value=100, 
    date=datetime.datetime(2023, 1, 6, 12, 21, 55, 30204), 
    user='bruno-rocha', 
    from_user='michael-scott', 
    user_id=2, 
    from_id=1, 
    id=1
)

Agora vamos EDITAR o arquivo dundie/routes/transaction.py

# No topo

from fastapi_pagination import Page, Params
from fastapi_pagination.ext.sqlmodel import paginate
from dundie.models import Transaction
from dundie.models.serializers import TransactionResponse
from sqlmodel import text
from sqlalchemy.orm import aliased

# No final do arquivo
@router.get("/", response_model=Page[TransactionResponse])
async def list_transactions(
    *,
    current_user: User = AuthenticatedUser,
    session: Session = ActiveSession,
    params: Params = Depends(),
    from_user: Optional[str] = None,
    user: Optional[str] = None,
    order_by: Optional[str] = None,
):
    query = select(Transaction)

    # Optional `AND` filters
    if user:
        query = query.join(
            User, Transaction.user_id == User.id
        ).where(User.username == user)
    if from_user:
        FromUser = aliased(User)  # aliased needed to desambiguous the join
        query = query.join(
            FromUser, Transaction.from_id == FromUser.id
        ).where(FromUser.username == from_user)

    # Mandatory access filter
    # regular users can only see their own transactions
    if not current_user.superuser:
        query = query.where(
            Transaction.user_id == current_user.id | Transaction.from_id == current_user.id
        )

    # Ordering based on &order_by=date (asc) or -date (desc)
    if order_by:
        order_text = text(
          order_by.replace("-", "") + " " + ("desc" if "-" in order_by else "asc")
        )
        query = query.order_by(order_text)

    # wrap response_model in a pagination object {"items": [], total, page, size }
    return paginate(query=query, session=session, params=params)

Agora temos um novo endpoint listando todas as transactions e com os filtros que especificamos.

$ curl 'GET' -H 'Content-Type: application/json' \
    -H 'Authorization: Bearer TOKEN_AQUI' \
    -k 'http://localhost:8000/transaction/\
    ?page=1&size=2&from_user=michael-scott&user=bruno-rocha&order_by=-date'
#+RESPONSE
{
  "items": [
    {
      "id": 12,
      "value": 300,
      "date": "2023-01-10T17:08:29.953452",
      "user": "bruno-rocha",
      "from_user": "michael-scott",
      "from_id": 1,
      "user_id": 2
    },
    {
      "id": 11,
      "value": 112,
      "date": "2023-01-10T17:07:49.296277",
      "user": "bruno-rocha",
      "from_user": "michael-scott",
      "from_id": 1,
      "user_id": 2
    }
  ],
  "total": 6,
  "page": 1,
  "size": 2
}
#+END

NOTA

Repare que como usamos o plugin fastapi_pagination agora o formato da resposta está diferente contendo items, total, page e size

Revisão da API

Agora que já temos bastante funcionalidade na API vamos revisar e identificar o que está faltando.

Auth

  • POST /token - login via formulário para gerar acccess token
  • POST /refresh_token - Obter um novo token sem a necessidade de fazer login novamente

User

  • GET /user/ - Lista todos os usuários
  • GET /user/{username} - Lista um usuário específico
  • POST🔒 /user/ - Cria um novo usuário
  • PATCH🔒 /user/{username} - Altera informações do usuário
  • POST /user/{username}/password - Altera a senha do usuário (?pwd_reset_token ou 🔒)
  • POST /user/pwd_reset_token/ - Solicita um token via email para resetar a senha (?email)

Transaction

  • POST🔒 /transaction/ - Cria uma nova transaction de from_user para user
  • GET🔒 /transaction/ - Lista transactions do usuário logado (ou todas em caso de superuser)
    • filters: user, from_user
    • sort: order_by=date (asc) ou -date (desc)
    • pagination: page, size

Mas ainda falta informação neste retorno, onde está o saldo total do usuário? vamos resolver -->

Expondo saldo do usuário

Na listagem de usuário está faltando exibir o saldo total do usuário, esta é uma informação sensivel e portanto estará disponível apenas em alguns casos.

  • ?show_balance=true for passado na URL das rotas GET de /user/
  • O usuário logado é superuser ou
  • O usuário logado está acessando sua própria conta

EDITE o arquivo dundie/auth.py e vamos adicionar mais uma dependencia baseada em autenticação.


async def show_balance_field(
    *,
    request: Request,
    show_balance: Optional[bool] = False,  # from /user/?show_balance=true
) -> bool:
    """Returns True if one of the conditions is met.
    1. show_balance is True AND
    2. authenticated_user.superuser OR
    3. authenticated_user.username == username
    """
    if not show_balance:
        return False

    username = request.path_params.get("username")

    try:
        authenticated_user = get_current_user(token="", request=request)
    except HTTPException:
        authenticated_user = None

    if any(
        [
            authenticated_user and authenticated_user.superuser,
            authenticated_user and authenticated_user.username == username,
        ]
    ):
        return True

    return False


ShowBalanceField = Depends(show_balance_field)

Agora precisamos de um serializer contendo o campo balance e posteriormente no endpoint usaremos este serializer como retorno apenas quando a dependência acima for satisfeita, usando uma abordagem chamada conditional response model

EDITE dundie/models/user.py

# Logo abaixo da classe UserResponse

class UserResponseWithBalance(UserResponse):
    balance: Optional[int] = None

    @root_validator(pre=True)
    def set_balance(cls, values: dict):
        """Sets the balance of the user"""
        instance = values["_sa_instance_state"].object
        values["balance"] = instance.balance
        return values

Agora EDITE o dundie/routes/user.py e vamos usar a dependencia nos endpoints list_users e get_user_by_username e além de adicionar a dependencia vamos alterar o responde_model tornando o condicional.

# IMPORTS 
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse
from pydantic import parse_obj_as
from dundie.auth import ShowBalanceField
from dundie.models.user import UserResponseWithBalance

# list_users 

@router.get(
    "/",
    response_model=List[UserResponse] | List[UserResponseWithBalance],
    response_model_exclude_unset=True,
)
async def list_users(
    *, session: Session = ActiveSession, show_balance_field: bool = ShowBalanceField
):
    """List all users.

    NOTES:
    - This endpoint can be accessed with a token authentication
    - show_balance query parameter takes effect only for authenticated superuser.
    """
    users = session.exec(select(User)).all()
    if show_balance_field:
        users_with_balance = parse_obj_as(List[UserResponseWithBalance], users)
        return JSONResponse(jsonable_encoder(users_with_balance))
    return users

# get user by username 
@router.get(
    "/{username}/",
    response_model=UserResponse | UserResponseWithBalance,
    response_model_exclude_unset=True,
)
async def get_user_by_username(
    *, session: Session = ActiveSession, username: str, show_balance_field: bool = ShowBalanceField
):
    """Get user by username"""
    query = select(User).where(User.username == username)
    user = session.exec(query).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    if show_balance_field:
        user_with_balance = parse_obj_as(UserResponseWithBalance, user)
        return JSONResponse(jsonable_encoder(user_with_balance))
    return user

Você pode testar essa funcionalidade fazendo chamadas a URL /user/ e /user/{username} e verificar que quando o argumento ?show_balance=true for passado na URL, o serializer de retorno irá conter o campo do saldo, mas isso só será feito se o usuário for o superuser, ou o próprio usuário autenticado.

A API está pronta!

Não é bem assim... nada está pronto enquanto não tiver cobertura de testes -->

Definindo um pipeline

O Pipeline de testes será

  1. Garantir que o ambiente está em execução com o docker compose
  2. Garantir que existe um banco de dados dundie_test e que este banco está vazio.
  3. Executar as migrations com alembic e garantir que funcionou
  4. Executar os testes com Pytest
  5. Apagar o banco de dados de testes

Vamos adicionar um comando reset_db no cli

CUIDADO

Muito cuidado com esse comando, ele apaga todo o conteúdo do banco de dados!!!

edite dundie/cli.py

# imports
from .db import engine, SQLModel


# Final
@main.command()
def reset_db(
    force: bool = typer.Option(
        False, "--force", "-f", help="Run with no confirmation"
    )
):
    """Resets the database tables"""
    force = force or typer.confirm("Are you sure?")
    if force:
        SQLModel.metadata.drop_all(engine)

O comando acima poderá ser executado com a flag -f que irá pualr a etapa de confirmação.

Em um ambiente de CI geralmente usamos Github Actions ou Jenkins para executar esses passos, em nosso caso vamos usar um script em bash para executar essas tarefas, no treinamento Python Automation que também faz parte do pacote Python Expert será abordado como automatizar esses processos usando os principais serviços de CI.

Confira o conteúdo do arquivo test.sh na raiz do repositório.

#!/usr/bin/bash

# Start environment with docker compose
DUNDIE_DB=dundie_test docker compose up -d

# wait 5 seconds
sleep 5

# Ensure database is clean
docker compose exec api dundie reset-db -f
docker compose exec api alembic stamp base

# run migrations
docker compose exec api alembic upgrade head

# run tests
docker compose exec api pytest -v -l --tb=short --maxfail=1 tests/

# Stop environment
docker compose down
  1. definimos um banco de dados diferente usando a variável DUNDIE_DB
  2. iniciamos o ambiente com o docker compose
  3. esperamos 5 segundos para garantir que o banco de dados está pronto
  4. resetamos o banco de dados
  5. executamos as migrations para garantir que temos todas as tabelas e dados
  6. executamos os testes usando pytest
  7. finalizamos o ambiente

NOTA

Em caso de falha nos testes o ambiente não será parado, permitindo assim o debugging com o ambiente em execução.

O próximo passo é configurar o nosso test-runner, o pytest -->

Configurando Pytest

Para os tests vamos utilizar o Pytest para testar algumas rotas da API, o Pytest pode ser configurado através de hooks e fixtures que ficam no arquivo tests/conftest.py, fixtures são geralmente funções que provêm funcionalidades que serão injetadas (via injeção de dependencias) em cada teste que escrevermos, portanto se queremos testar multiplos usuários, começamos criando fixtures que preparam clientes HTTP autenticados com os tokens de cada um desses usuários.

Uma outra coisa importante que faremos é apontar o banco de dados para o banco de dados de teste que iniciamos no script test.sh

Setup

  1. Obter um token para o usuário admin
  2. Criar usuário1
  3. Obter um token para o usuário1
  4. Criar usuario2
  5. Obter um token para o usuario2
  6. Criar usuario3
  7. Obter um token para o usuario3

Durante o setup teremos fixtures do Pytest já configuradas com clientes HTTP para acessar a API com qualquer um dos usuários ou de forma anonima.

Começamos configurando o Pytest

EDITE tests/conftest.py

import os

import pytest
from fastapi.testclient import TestClient
from sqlalchemy.exc import IntegrityError

from dundie.app import app
from dundie.cli import create_user

os.environ["DUNDIE_DB__uri"] = "postgresql://postgres:postgres@db:5432/dundie_test"


@pytest.fixture(scope="function")
def api_client():
    """Unauthenticated test client"""
    return TestClient(app)


def create_api_client_authenticated(username, dept="sales", create=True):
    """Creates a new api client authenticated for the specified user."""
    if create:
        try:
            create_user(name=username, email=f"{username}@dm.com", password=username, dept=dept)
        except IntegrityError:
            pass

    client = TestClient(app)
    token = client.post(
        "/token",
        data={"username": username, "password": username},
        headers={"Content-Type": "application/x-www-form-urlencoded"},
    ).json()["access_token"]
    client.headers["Authorization"] = f"Bearer {token}"
    return client


@pytest.fixture(scope="function")
def api_client_admin():
    return create_api_client_authenticated("admin", create=False)


@pytest.fixture(scope="function")
def api_client_user1():
    return create_api_client_authenticated("user1", dept="management")


@pytest.fixture(scope="function")
def api_client_user2():
    return create_api_client_authenticated("user2")


@pytest.fixture(scope="function")
def api_client_user3():
    return create_api_client_authenticated("user3")

Basta salvar o arquivo conftest.py e para cada sessão de testes o pytest vai se certificar que cada uma das fixtures definidas esteja disponível.

INFO

No treinamento Python Automation é abordado o tema testes e pytest com maior profundidade, se você quiser saber mais sobre o assunto, recomendo que assista.

Agora podemos escrever os testes -->

Testes de API

Plano de testes

Os casos de uso que iremos testar:

  • Como um usuário anonimo consigo listar os usuarios e não posso ver o saldo
  • Como um usuário anonimo consigo listar os detalhes de um usuário sem o saldo
  • Como usuário admin consigo atualizar o perfil de um usuário
  • Como um usuário autenticado consigo atualziar meu próprio perfil
  • Como um usuário autenticado não consigo atualizar o perfil de outro usuário
  • Como usuário admin consigo transferir qualquer quantidade de pontos para todos os usuários
  • Como um usuário autenticado consigo tranferir 20 pontos para outro usuário e ver o saldo
  • Como um usuário admin consigo ver o saldo de todos os usuários
  • Como um usuário admin consigo ver todas as transações
  • Como um usuário autenticado consigo ver apenas minhas transações

Pytest

Agora vamos converter os casos de uso em funções de teste com o Pytest.

EDITE tests/test_api.py

import pytest

USER_RESPONSE_KEYS = {"name", "username", "dept", "avatar", "bio", "currency"}
USER_RESPONSE_WITH_BALANCE_KEYS = USER_RESPONSE_KEYS | {"balance"}


@pytest.mark.order(1)
def test_user_list(
    api_client,
    api_client_user1,  # pyright: ignore
    api_client_user2,  # pyright: ignore
    api_client_user3,  # pyright: ignore
):
    """Ensure that all needed users are created and showing on the /user/ API

    NOTE: user fixtures are called just to trigger creation of users.
    """
    users = api_client.get("/user/").json()
    expected_users = ["admin", "user1", "user2", "user3"]
    assert len(users) == len(expected_users)
    for user in users:
        assert user["username"] in expected_users
        assert user["dept"] in ["management", "sales"]
        assert user["currency"] == "USD"
        assert set(user.keys()) == USER_RESPONSE_KEYS


@pytest.mark.order(2)
def test_user_detail(api_client):
    """Ensure that the /user/{username} API is working"""
    user = api_client.get("/user/user1/").json()
    assert user["username"] == "user1"
    assert set(user.keys()) == USER_RESPONSE_KEYS


@pytest.mark.order(3)
def test_update_user_profile_by_admin(api_client_admin):
    """Ensure that admin can patch any user data"""
    data = {"avatar": "https://example.com/avatar.png", "bio": "I am a user1"}
    api_client_admin.patch("/user/user1/", json=data)
    user = api_client_admin.get("/user/user1/").json()
    assert user["avatar"] == data["avatar"]
    assert user["bio"] == data["bio"]


@pytest.mark.order(3)
def test_update_user_profile_by_user(api_client_user2):
    """Ensure that user can patch their own data"""
    data = {"avatar": "https://example.com/avatar.png", "bio": "I am a user2"}
    api_client_user2.patch("/user/user2/", json=data)
    user = api_client_user2.get("/user/user2/").json()
    assert user["avatar"] == data["avatar"]
    assert user["bio"] == data["bio"]


@pytest.mark.order(3)
def test_fail_update_user_profile_by_other_user(api_client_user2):
    """User 2 will attempt to patch User 1 profile and it will fail"""
    response = api_client_user2.patch("/user/user1/", json={})
    assert response.status_code == 403


@pytest.mark.order(4)
def test_add_transaction_for_users_from_admin(api_client_admin):
    """Admin user adds a transaction for all users"""
    usernames = ["user1", "user2", "user3"]

    for username in usernames:
        api_client_admin.post(f"/transaction/{username}/", json={"value": 500})

    for username in usernames:
        user = api_client_admin.get(f"/user/{username}/?show_balance=true").json()
        assert user["balance"] == 500


@pytest.mark.order(5)
def test_user1_transfer_20_points_to_user2(api_client_user1):
    """Ensure that user1 can transfer points to user2"""
    api_client_user1.post("/transaction/user2/", json={"value": 20})
    user1 = api_client_user1.get("/user/user1/?show_balance=true").json()
    assert user1["balance"] == 480

    # user1 can see balance of user2 because user1 is a manager
    user2 = api_client_user1.get("/user/user2/?show_balance=true").json()
    assert user2["balance"] == 520


@pytest.mark.order(6)
def test_user_list_with_balance(api_client_admin):
    """Ensure that admin can see user balance"""
    users = api_client_admin.get("/user/?show_balance=true").json()
    expected_users = ["admin", "user1", "user2", "user3"]
    assert len(users) == len(expected_users)
    for user in users:
        assert user["username"] in expected_users
        assert set(user.keys()) == USER_RESPONSE_WITH_BALANCE_KEYS


@pytest.mark.order(6)
def test_admin_can_list_all_transactions(api_client_admin):
    """Admin can list all transactions"""
    transactions = api_client_admin.get("/transaction/").json()
    assert transactions["total"] == 4


@pytest.mark.order(6)
def test_regular_user_can_see_only_own_transaction(api_client_user3):
    """Regular user can see only own transactions"""
    transactions = api_client_user3.get("/transaction/").json()
    assert transactions["total"] == 1
    assert transactions["items"][0]["value"] == 500
    assert transactions["items"][0]["user"] == "user3"
    assert transactions["items"][0]["from_user"] == "admin"

E para executar os tests podemos ir na raiz do projeto FORA DO CONTAINER

Garantimos que o script de testes é eecutável.

$ chmod +x test.sh

Executamos o script:

$ ./test.sh

[+] Running 3/3
 ⠿ Network dundie-api_default  Created                                                     0.1s
 ⠿ Container dundie-api-db-1   Started                                                     0.7s
 ⠿ Container dundie-api-api-1  Started                                                     1.7s
INFO  [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO  [alembic.runtime.migration] Will assume transactional DDL.
INFO  [alembic.runtime.migration] Running stamp_revision 9aa820fb7f01 -> 
INFO  [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO  [alembic.runtime.migration] Will assume transactional DDL.
INFO  [alembic.runtime.migration] Running upgrade  -> f39cbdb1efa7, initial
INFO  [alembic.runtime.migration] Running upgrade f39cbdb1efa7 -> b0abf3428204, transaction
INFO  [alembic.runtime.migration] Running upgrade b0abf3428204 -> 9aa820fb7f01, ensure_admin_user
===================================== test session starts ======================================
platform linux -- Python 3.10.8, pytest-7.2.0, pluggy-1.0.0 -- /usr/local/bin/python
cachedir: .pytest_cache
rootdir: /home/app/api, configfile: pyproject.toml
plugins: order-1.0.1, anyio-3.6.2
collected 10 items                                                                             

tests/test_api.py::test_user_list PASSED                                                 [ 10%]
tests/test_api.py::test_user_detail PASSED                                               [ 20%]
tests/test_api.py::test_update_user_profile_by_admin PASSED                              [ 30%]
tests/test_api.py::test_update_user_profile_by_user PASSED                               [ 40%]
tests/test_api.py::test_fail_update_user_profile_by_other_user PASSED                    [ 50%]
tests/test_api.py::test_add_transaction_for_users_from_admin PASSED                      [ 60%]
tests/test_api.py::test_user1_transfer_20_points_to_user2 PASSED                         [ 70%]
tests/test_api.py::test_user_list_with_balance PASSED                                    [ 80%]
tests/test_api.py::test_admin_can_list_all_transactions PASSED                           [ 90%]
tests/test_api.py::test_regular_user_can_see_only_own_transaction PASSED                 [100%]

====================================== 10 passed in 4.62s ======================================
[+] Running 3/3
 ⠿ Container dundie-api-api-1  Removed                                                     1.2s
 ⠿ Container dundie-api-db-1   Removed                                                     0.7s
 ⠿ Network dundie-api_default  Removed                                                     0.3s

Se tudo deu certo então todos os testes devem ter passado, caso contrário tente encontrar onde está o erro e corrija antes de prosseguir.


Finalizamos assim a fase 1 do nosso projeto com a maior parte das funcionalidades testadas, vamos partir agora para a fase 2 ->

Task Queue

Enfileiramento de tarefas é uma abordagem que permite distribuir o procesamento de tarefas en diferentes unidades de processamento para obter um melhor desempenho e uma gestão mais fácil da escalabilidade do projeto.

Imagine que temos a tarefa de enviar um email como fizemos anteriormente usando BackGround Tasks do FastAPI, agora assuma que temos que enviar centenas ou até milhares de e-mails de uma só vez.

Para evitar que a aplicação fique sobrecarrega ao executar em um único processo, nós partimos para uma arquiretura de computação distribuida, onde poderemos ter várias instâncias de um worker sendo executados em paralelo.

Alguns conceitos importantes:

BROKER:

É o serviço que gerencia a fila de tarefas, entre os mais conhecidos temos o RabbitMQ, Redis, Kafka, simplificando é o serviço que recebe as tarefas e as coloca em uma fila para ser executada, uma analogia simples é pensar neste componente como um caixa de uma lanchonete que anota os pedidos e os coloca em uma fila distribuindo o trabalho entre os cozinheiros.

Para este projeto vamos usar o REDIS como BROKER.

Warning

O Redis é um banco de dados em memória, ele por si só não é considerado um message BROKER, ou seja, ele precisa de um outro processo para gerenciar as mensagens enfileiras nós usaremos REDIS como Broker pois o RQ faz o trabalho de gerenciar as mensagens no REDIS.

Em ambientes produtivos de alta escalabilidade é mais recomendado usar RabbitMQ, Kafka ou serviços especificos de cloud providers como o AWS SQS como Broker.

TASK:

É a tarefa que ficará disponível para ser executada, geralmente usamos decorators em Python ou uma chamada de função para registrar uma task, assim que registrada a task fica disponível e o BROKER pode validar o enfileiramento. Imagine que a task é um item do cardapio de um restaurante.

Existem vários sistemas de gestão de filas para Python cada um com um jeito específico de registrar uma task, alguns conhecidoss são: Celery, Dramatiq, RQ, Huey.

Em nosso projeto vamos usar o RQ pois é o mais simples de configurar e usar.

PRODUCER:

É o mecanismo que envia a tarefa para o BROKER, geralmente em Python usamos uma chamada de função onde passamos qual a task a ser enfileirada e os parametros necessários para a execução, além disso em alguns casos podemos definir coisas como tempo de expiração da tarefa, prioridade, callback e rotear para uma fila especifica. Imagine que o producer é o garçom que anota o pedido e leva até o caixa.

A produção de tarefas pode ser feita em qualquer parte do projeto, usando mecanismos providos pelo RQ iremos usar chamadas de função que enfileiram as mensagens diretamente no REDIS.

CONSUMER ou WORKER:

Com as tasks enfileiradas e devidamente configuradas precisamos de um componente que de fato irá executar a tarefa e tomar conta de coisas como contexto, re-execução, falhas, logs etc.

Imagine que o consumer (ou trabalhador) é o cozinheiro que recebe o pedido e que precisa executar a tarefa de preparo levando em consideração o contexto do pedido, a receita, a prioridade etc.

O RQ já fornece um mecanismo de worker que podemos usar diretamente no terminal ou integrar a um sistema de gestão de processos como o supervisord ou o systemd.

Arquitetura

Agora teremos que começar alterando a arquitetura de componentes da nossa infraestrutura.

Nesta Fase 2 do Projeto adicionaremos o serviço de fila de mensagens e o serviço consumidor de tarefas.

  • 1 Serviço de API
  • 1 Serviço de Banco de Dados
  • 1 Serviço consumidor de tarefas (RQ)
  • 1 Serviço de fila de mensagens (Redis)
graph TD;
    A["API (FastAPI) fa:fa-globe"]
    B[("Banco de Dados (PG) fa:fa-cubes")]
    C>"Fila/Broker de mensagens (Redis) fa:fa-bars"]
    D[["Consumidor/Worker de tarefas (RQ) fa:fa-gears"]]
    A --> B
    A --> C
    D --> C
    D --> B

Para começar vamos adicionar o serviço de fila de mensagens usando o REDIS, para isso vamos editar o arquivo docker-compose.yml e adicionar o serviço:

  redis:
    image: redis:6.2.5-alpine
    restart: always
    ports:
      - "6379:6379"
    volumes:
      - dundie_redis_data:/data

volumes:
  dundie_pg_data:
  dundie_redis_data:

Após editar o arquivo vamos subir o serviço:

docker compose down
docker compose up -d

O client GUI oficial é o RedisInsight, você pode baixar no site oficial: https://redislabs.com/redis-enterprise/redis-insight/

Se preferir acesssar via terminal pode usar o redis-cli: https://redis.io/docs/ui/cli/

Redis Insight1

Dentro do RedisInsight vamos criar uma conexão com o REDIS e geralmente ele já irá detectar o REDIS que está sendo executado no localhost.

Podemos então criar chaves:

Redis Insight2

Tarefas

Usando o Python-RQ qualquer função pode ser enfileirada como uma tarefa, não há necessidade de registrar a task como em outros sistemas de gestão de filas como o Celery, no RQ basta que o worker tenha acesso ao mesmo contexto da função que será executada.

Em nosso projeto já possuimos uma função que envia e-mail, vamos então enfileirar esta função como uma task.

Enfileirando a Task

Para enfileirar uma task vamos precisar de uma conexão com o REDIS, e uma instancia da fila do RQ.

O primeiro passo é editar o nosso arquivo de requirements e adicionar as dependencias do RQ:

Adicione ao arquivo requirements.in:

rq               # Task Queue

O pacote rq já irá instalar o Redis, precisamos agora atualizar o arquivo requirements.txt:

Em seu computador (fora do container) execute:

pip install pip-tools
pip-compile requirements.in
cat requirements.txt | grep rq

A saida deve ser algo como:

   # rq
   # via rq
rq==1.15.1

Agora podemos fazer o rebuild da imagem do serviço API:

docker compose down
docker compose build api
docker compose up -d

Agora podemos entrar no shell e interagir com o Redis só para confirmar que está tudo funcionando:

docker compose exec api dundie shell

E dentro do shell

from redis import Redis
from rq import Queue
q = Queue(connection=Redis("redis"))
from dundie.tasks.user import send_email
result = q.enqueue(send_email)
print(result)

O retorno será algo como:

Job(
  '01b32665-ff5d-423a-9149-c25dc9d85ba5',
  enqueued_at=datetime.datetime(2023, 7, 14, 13, 53, 29, 353977)
)

Agora podemos abrir o RedisInsight e ver que a chave rq:job:01b32665-ff5d-423a-9149-c25dc9d85ba5 foi criada. e dentro dela temos o conteúdo da task:

Redis Insight3

Portanto já sabemos como faxer para enfileiras uma task no BROKER, essa task ainda não será executada enquanto não tivermos um worker consumindo as tarefas.

Mas primeiro vamos estruturar o nosso projeto para produzir tasks via API.

Começamos editando o arquivo de configuraçoes, default.toml

[default.redis]
host = "redis"
port = 6379

Desta forma caso seja necessário alterar a configuração do REDIS podemos fazer via variáveis de ambiente pois o Dynaconf faz a gestão das configurações.

# exemplo
DUNDIE_REDIS_HOST="meuhost"

Agora criaremos um arquivo novo dundie/queue.py

from redis import Redis
from rq import Queue
from dundie.config import settings

redis = Redis(
    host=settings.redis.host,
    port=settings.redis.port,
)

queue = Queue(connection=redis)

agora podemos testar no shell:

docker compose exec api dundie shell
from dundie.queue import queue
from dundie.tasks.user import send_email
queue.enqueue(send_email)

E agora no REDIS podemos ver que a chave foi criada com os detalhes da nova task.

apague essas tasks do REDIS antes de continuar.

Agora vamos alterar o nosso endpoint de pedido de recuperação de senha para ao invés de enviar o e-mail com a BackgroundTask, enfileirar uma task no Redis para usar o processamento distribuido.

Edite dundie/routes/user.py e no topo faça a importação da queue.

from dundie.queue import queue

e nele vamos alterar a função send_password_reset_token para enfileirar a task portanto troque a linha:

- background_tasks.add_task(try_to_send_pwd_reset_email, email=email)
+ queue.enqueue(try_to_send_pwd_reset_email, email=email)

Reinicie os serviços e agora vamos testar via API:

docker compose down
docker compose up -d

Acesse

http://localhost:8000/docs#/user/send_password_reset_token_user_pwd_reset_token__post

E dispare um pedido de recuperação de senha.

Ou via CURL:

curl -X 'POST' \
  'http://localhost:8000/user/pwd_reset_token/' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
  "email": "bruno@rocha.com"
}'
{
  "message": "If we found a user with that email, we sent a password reset token to it."
}

Agora veja se tudo deu certo no RedisInsight:

Redis Insight4

Já estamos produzindo tasks!!!

Agora vamos criar um worker para consumir as tasks.

Worker ou Consumer

Para testar podemos simplesmente executar o worker no shell:

docker compose exec api bash
~/api$ rq worker --url redis://redis:6379 --with-scheduler
14:23:11 Worker rq:worker:34f3e36d726941ceb65669ff002a79ba started with PID 20, version 1.15.1
14:23:11 Subscribing to channel rq:pubsub:34f3e36d726941ceb65669ff002a79ba
14:23:11 *** Listening on default...
14:23:11 Scheduler for default started with PID 22
14:23:11 Cleaning registries for queue: default
14:23:11 default: dundie.tasks.user.try_to_send_pwd_reset_email(email='bruno@rocha.com') (e8f10b0e-f69e-4dc0-b102-8279140892d8)
14:23:11 default: Job OK (e8f10b0e-f69e-4dc0-b102-8279140892d8)
14:23:11 Result is kept for 500 seconds

No Result é possível ver o ID da task que foi executada e o seu status alterado para finished.

Redis Insight5

Este resultado foca disponivel por 500 segundos, depois disso ele é limpo automaticamente do Redis.

Saia do terminal pressionando CTRL+C seguido de docker compose down

Agora nosso próximo passo é criar um serviço para executar o worker em segundo plano dentro de um container.

Vamos editar o arquivo docker-compose.yml e adicionar um novo serviço

  worker:
    build:
      context: .
      dockerfile: Dockerfile.dev
    environment:
      DUNDIE_DB__uri: "postgresql://postgres:postgres@db:5432/${DUNDIE_DB:-dundie}"
      DUNDIE_DB__connect_args: "{}"
      SQLALCHEMY_SILENCE_UBER_WARNING: 1
    volumes:
      - .:/home/app/api
    depends_on:
      - db
      - redis
    stdin_open: true
    tty: true
    command: rq worker --with-scheduler --url redis://redis:6379

E agora vamos executar novamente os serviços e dessa vez teremos o worker iniciado.

docker compose up -d
[+] Running 5/5
 ⠿ Network dundie-api_default     Created                                                                      0.0s
 ⠿ Container dundie-api-redis-1   Started                                                                      0.6s
 ⠿ Container dundie-api-db-1      Started                                                                      0.6s
 ⠿ Container dundie-api-api-1     Started                                                                      1.7s
 ⠿ Container dundie-api-worker-1  Started

Abra o log do worker:

$ docker compose logs worker --follow
dundie-api-worker-1  | 14:33:39 Worker rq:worker:fc005b5371eb43bf90dd5ba688a72e8f started with PID 1, version 1.15.1
dundie-api-worker-1  | 14:33:39 Subscribing to channel rq:pubsub:fc005b5371eb43bf90dd5ba688a72e8f
dundie-api-worker-1  | 14:33:39 *** Listening on default...
dundie-api-worker-1  | 14:33:39 Scheduler for default started with PID 7

E agora dispare mais pedidos de recuperação de senha via API/;

curl -X 'POST' \
  'http://localhost:8000/user/pwd_reset_token/' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
  "email": "bruno@rocha.com"
}'

Verá que no log do worker aparecerá a task sendo executada.

dundie-api-worker-1  | 14:36:01 default: dundie.tasks.user.try_to_send_pwd_reset_email(email='bruno@rocha.com') (14e53a7a-7fb9-4bc1-896a-5d0341e6b281)
dundie-api-worker-1  | 14:36:02 default: Job OK (14e53a7a-7fb9-4bc1-896a-5d0341e6b281)
dundie-api-worker-1  | 14:36:02 Result is kept for 500 seconds

Aconselho ler a documentação do RQ em Python RQ Docs

Em nossa próxima aula iremos agendar tarefas com o scheduler do RQ.

WebSockets

O que é

O protocolo ws:// é uma abreviação de WebSocket e é usado para estabelecer uma conexão bidirecional entre um cliente (como um navegador da web) e um servidor web. Aqui estão alguns pontos-chave sobre o protocolo ws://:

  • Bidirecionalidade: Com o protocolo ws://, tanto o cliente quanto o servidor podem enviar mensagens um para o outro em tempo real, sem a necessidade de esperar por uma solicitação do cliente. Isso é diferente do HTTP tradicional, onde o cliente precisa solicitar informações do servidor a cada interação.

  • Comunicação em tempo real: O protocolo ws:// é ideal para aplicativos da web que exigem atualizações em tempo real, como jogos multiplayer, salas de bate-papo ao vivo, feeds de notícias em tempo real e colaboração em tempo real.

  • Eficiência: Em comparação com soluções baseadas em polling (em que o cliente solicita repetidamente informações do servidor), os WebSockets são mais eficientes, pois estabelecem uma conexão persistente entre o cliente e o servidor. Isso reduz a sobrecarga de rede e os atrasos associados às solicitações HTTP repetidas. NOTA: Isto vem com um custo de performance na gestão de conexões.

  • Suporte: A maioria dos navegadores modernos e servidores web suporta o protocolo ws://, tornando-o uma escolha popular para desenvolvedores que desejam criar aplicativos da web interativos e em tempo real.

Em resumo, o protocolo ws:// é uma tecnologia de comunicação em tempo real que permite uma conexão persistente e bidirecional entre clientes e servidores web, facilitando a criação de aplicativos da web interativos e em tempo real.

diag

Importante

Info

Se você criou o seu repositório fork antes de 14/03/2024 pode ser que você tenha que atualizar algumas extensões locais para que seu projeto continue funcionando, devido a mudanças no Pydantic e SQLALchemy. `

Pode ver as alterações e efetuar as mudanças manualmente com o seguinte link: https://github.com/rochacbruno/dundie-api/commit/33fb2747ac2b57a50a202eb67825aaff02036fa5

Se preferir pode aplicar as mudanças localmente com o seguinte script:

curl https://github.com/rochacbruno/dundie-api/commit/33fb2747ac2b57a50a202eb67825aaff02036fa5.patch | git apply -v

Instalação do websockets

A biblioteca websockets precisa estar instalada no ambiente onde vamos servir o websocket.

Edite o arquivo requirements.in e adicione a biblioteca websockets ao final do arquivo.

websockets                 # Web Sockets

Em seu computador (fora do container) execute:

pip install pip-tools
pip-compile requirements.in
cat requirements.txt | grep websockets
websockets==12.0  (pode ser que a versão seja diferente no momento que vc executar)

Agora devemos fazer o rebuild dos containers:

docker compose down
docker compose build api
docker compose up -d

E agora podemos entrar no shell para se certificar de que tudo está funcionando:

❯ docker compose exec api dundie shell
Auto imports: ['settings', 'engine', 'select', 'session', 'User', 'Transaction', 'Balance', 'add_transaction']

In [1]: from websockets.version import version

In [2]: version
Out[2]: '12.0'

Websocket Server

O primeiro passo é criar do lado servidor um endpoint (URL) que iniciará uma conexão usando o protocolo ws e o FastAPI já tem suporte nativo a este tipo de protocolo.

Editaremos o arquivo dundie/routes/transaction.py e vamos criar um websocket para fazer stream de transações, dessa forma o usuário vai conseguir acompanhar as transações em tempo real sem precisar dar refresh no client (navegador)

Vamos começar com algo simples para entender o funcionamento de um websocket.

from fastapi import APIRouter, Body, Depends, HTTPException, WebSocket  # New

...

@router.websocket("/ws")
async def list_transactions_ws(websocket: WebSocket):
    await websocket.accept()
    while True:
        data = await websocket.receive_text()
        await websocket.send_text(f"Message text was: {data}")

O FastAPI ainda não suporta exibir o endpoint websocket no /docs portanto esta URL não será exibida, a boa prática é adicionar documentação sobre este endpoint manualmente em seu endpoint correspondente ou em forma de texsto puro.

Websocket client

Para testar precisaremos escrever um pouco de JavaScript diretamente no navegador e para facilitar recomendo abrir um terminal e um browser lado a lado.

O passo a passo:

  1. Observamos os logs da API com docker compose logs api --follow
  2. No navegador tentamos abrir a URL http://localhost:8000/transaction/ws e obteremos o erro 405, não tem problema!
  3. Abriremos o painel de inspect do navegador e clicaremos em console onde iremos escrever algumas instruções em JavaScript.
  4. Criamos um client ws no JS com var ws = new WebSocket("ws://localhost:8000/transaction/ws");
  5. Definimos um event listener para quando recebermos uma mensagem via ws com ws.onmessage = function(event) {console.log(event.data)}; e por enquanto nosso listener apenas vai imprimir a mensagem no console.
  6. Enviamos uma mensagem ao servidor com ws.send("Hello")
  7. A mensagem é enviada ao servidor e processada, retornando ao client (repare a adição de "Message text was: ...")

Toda esta comunicação acontece utilizando um único socket de comunicação, mantendo o estado da conexão, diferente das conexões HTTP que vimos até agora.

inspect

O console em detalhe:

Inspect zoom

Criando o front-end

O objetivo até aqui é falar sobre o lado back-end apenas, nosso foco está na API e endpoints, porém para obter a experiência completa vamos precisar de um front-end então faremos uma interface muito simples para ser usada de client para nossa API.

O que faremos:

  1. Criar um novo serviço no docker-compose que vai servir apenas o front-end
  2. Usar o servidor NGINX para servir o front-end
  3. Criar o front-end usando uma página estática HTML

Vamos começar simples assim, no futuro iremos deixar este front-end mais robusto, mas no momento o que importa é apenas fazer a conexão com o websocket.

No docker-compose.yaml vamos adicionar mais um serviço, logo após worker.

services:
  worker:
    ...

  ui:
    image: nginx
    ports:
      - "8001:80"
    volumes:
      - ./ui:/usr/share/nginx/html

volumes:
    ...

Repare que estamos mapeando uma pasta local chamada ./ui que é onde estará o nosso site estático com páginas html.

mkdir ui
echo "Hello front end" > ui/index.html

Agora vamos dar o restart nos containers.

docker compose down
docker compose up -d

Na primeira execução pode demorar um pouco mais pois a imagem do nginx será baixada, mas assim que os containers estiverem em execução abra a URL http://localhost:8001 onde está sendo servido nosso front-end.

nginx

Experimente editar o arquivo ui/index.html e depois façá refresh na URL para ver se as mudanças serão refletidas.

HTML

Não espere muito do nosso HTML, aqui temos um programdor backend tentando criar uma interface :) será simples para apenas funcionar!

Edite o ui/index.html e adicione o seguinte código.

<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>WebSocket Transactions</title>
</head>
<body>
<h1>Transaction List</h1>
<ul id="transaction-list"></ul>

<script>
// Function to create and append a new <li> element to the list
function appendTransactionToList(transaction) {
    var transactionList = document.getElementById("transaction-list");
    var listItem = document.createElement("li");
    listItem.textContent = transaction.to + " - " + transaction.from + " - " + transaction.value;
    transactionList.appendChild(listItem);
}

// Connect to WebSocket endpoint
var ws = new WebSocket("ws://localhost:8000/transaction/ws");

// Event listener for WebSocket connection opened
ws.onopen = function(event) {
    console.log("WebSocket connection opened");
};

// Event listener for incoming messages
ws.onmessage = function(event) {
    console.log("New message received:", event.data);
    // Parse JSON data
    var transaction = JSON.parse(event.data);
    // Append transaction to list
    appendTransactionToList(transaction);
};

// Event listener for WebSocket connection closed
ws.onclose = function(event) {
    console.log("WebSocket connection closed");
};

// Event listener for WebSocket connection error
ws.onerror = function(event) {
    console.error("WebSocket error:", event);
};
</script>
</body>
</html>

O próximo passo é implementar no endpoint o retorno das ultimas transactions.

@router.websocket("/ws")
async def list_transactions_ws(websocket: WebSocket, session: Session = ActiveSession):
    await websocket.accept()
    last = 0
    while True:
        # Read all transactions that have not been seen yet
        new_transactions = session.exec(select(Transaction).where(Transaction.id > last).order_by("id"))
        for transaction in new_transactions:
            data = {
                "to": transaction.user.name,
                "from": transaction.from_user.name,
                "value": transaction.value,
            }
            await websocket.send_json(data)

            # set the last sent ID to avoid duplication
            last = transaction.id

            # Sleep 1 second (just to see better on UI)
            await sleep(1)

Agora podemos salvar e abrir o endereço http://localhost:8001 que inicialmente irá exibir uma tela vazia, porem pode abrir o inspect do navegador para ver a mensagem de conexão ao websocket e tambem verificar a mesma informação nos logs do container de API.

Através do terminal vamos verificar quais users existem no sistema

❯ docker compose exec api dundie user-list
                       dundie users
┏━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━┓
┃ name  ┃ username ┃ dept       ┃ email        ┃ currency ┃
┡━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━┩
│ Admin │ admin    │ management │ admin@dm.com │ USD      │
│ user3 │ user3    │ sales      │ user3@dm.com │ USD      │
│ user1 │ user1    │ management │ user1@dm.com │ USD      │
│ user2 │ user2    │ sales      │ user2@dm.com │ USD      │
└───────┴──────────┴────────────┴──────────────┴──────────┘

E agora com a tela do browser aberta lado a lado vamos adicionar transactions.

socketing

Danger

PERFORMANCE e DISPONIBILIDADE

Nossa implementação do endpoint list_transactions_ws não está boa, existem certos problemas que podem influenciar a performance e a escalabilidade, isso se deve ao fato de que para cada cliente que estiver conectado (aba aberta no browser) do lado servidor será preciso manter uma conexão ws, uma session com o banco de dados e todo o restante do estado.

Considere um site que recebe milhares de requests simultaneos, isso pode ser um grande problema!

Para resolver estes problemas será preciso mudar a implementação para:

  • Evitar a execução de queries SQL, lendo a partir de um cache ou tópico em uma fila de stream (redis/kafka)
  • Se usar SQL devemos usar um driver async, o driver atual não é async, teriamos que usar um do SQLAlchemy 2.0
  • Usar um pool de conexões websocket
  • Tratar o fechamento das conexões

E além dessas mudanças na aplicação tambem será necessário colocar nosso servidor para executar em um load-balancer para que tenha alta disponibilidade.

Nós não vamos resolver os problemas acima agora, em nosso ambiente controlado o máximo que vai acontecer é conexões simultaneas darem timeout, tente por exemplo abrir mais de uma vez a página no browser, em abas separadas, abrindo assim diferentes sockets, tambem tente fazer uma requisição normal a API em http://localhost:8000/docs e você vai perceber que o servidor não será capaz de servir todas essas conexões.

O que podemos fazer neste momento?

Escalar Horizontalmente!

Vamos editar o arquivo Dockerfile.dev e aumentar a quantidade de workers uvicorn, e também será necessário remover a funcionalidade de reload automático, pois o uso de --reload faz com que o uvicorn execute de modo bloqueante.

-CMD ["uvicorn","dundie.app:app","--host=0.0.0.0","--port=8000", "--reload"]
+CMD ["uvicorn","dundie.app:app","--host=0.0.0.0","--port=8000", "--workers=8"]

Após alterar o Dockerfile precisaremos fazer o rebuild da imagem e executar novamente

docker compose down
docker compose build api
docker compose up -d

Agora sim a nossa API estará executando com 8 workers, capazes de servir mais requisições e segurar mais sockets abertos.

api-1     | INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)
api-1     | INFO:     Started parent process [1]
api-1     | INFO:     Started server process [13]
api-1     | INFO:     Waiting for application startup.
api-1     | INFO:     Application startup complete.
api-1     | INFO:     Started server process [10]
api-1     | INFO:     Waiting for application startup.
api-1     | INFO:     Application startup complete.
api-1     | INFO:     Started server process [9]
api-1     | INFO:     Waiting for application startup.
api-1     | INFO:     Application startup complete.
api-1     | INFO:     Started server process [14]
api-1     | INFO:     Waiting for application startup.
api-1     | INFO:     Application startup complete.
api-1     | INFO:     Started server process [12]
api-1     | INFO:     Waiting for application startup.
api-1     | INFO:     Application startup complete.
api-1     | INFO:     Started server process [11]
api-1     | INFO:     Waiting for application startup.
api-1     | INFO:     Started server process [15]
api-1     | INFO:     Application startup complete.
api-1     | INFO:     Waiting for application startup.
api-1     | INFO:     Application startup complete.
api-1     | INFO:     Started server process [8]
api-1     | INFO:     Waiting for application startup.
api-1     | INFO:     Application startup complete.

Experimentando

Temos 8 workers, isso quer dizer que podemos abrir pelo menos 7 sockets ao mesmo tempo e ainda ter um livre para receber as requisições HTTP.

  • Experimente abrir várias abas no endereço http://localhost:8001 (dica: O browser permite clicar com o direito e duplicar a aba)
  • Adicione uma transaction no CLI e veja se aparece em todas as abas docker compose exec api dundie transaction admin 123
  • tente abrir http://localhost:8000/docs e executar o endpoint List Users

Você vai perceber que dentro dessas limitações irá funcionar, mas ainda assim não é o ideal.

Melhorias

Uma das melhorias é criando em nossa aplicação um ConnectionManager para gerenciar a lista de conexões ws, na documentação do FastAPI o procedimente é explicado em detalhes https://fastapi.tiangolo.com/advanced/websockets/#handling-disconnections-and-multiple-clients

Testando

Para testar o endpoint WS podemos usar o mesmo client que usamos para testar os endpoints HTTP, vamos adicionar o seguinte teste ao arquivo tests/test_api.py

@pytest.mark.order(6)
def test_admin_can_list_all_transactions_ws(api_client_admin):
    """Admin can list all 4 transactions on ws endpoint"""
    with api_client_admin.websocket_connect("/transaction/ws") as ws:
        for i in range(4):
            data = ws.receive_json()
            assert data.keys() == {"to", "from", "value"}

Executando o teste

❯ ./test.sh
[+] Running 5/6
 ⠸ Network dundie-api_default     Created               1.3s
 ✔ Container dundie-api-ui-1      Started               0.7s
 ✔ Container dundie-api-db-1      Started               0.5s
 ✔ Container dundie-api-redis-1   Started               0.8s
 ✔ Container dundie-api-api-1     Started               0.8s
 ✔ Container dundie-api-worker-1  Started               1.2s
INFO  [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO  [alembic.runtime.migration] Will assume transactional DDL.
INFO  [alembic.runtime.migration] Running stamp_revision 9aa820fb7f01 ->
INFO  [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO  [alembic.runtime.migration] Will assume transactional DDL.
INFO  [alembic.runtime.migration] Running upgrade  -> f39cbdb1efa7, initial
INFO  [alembic.runtime.migration] Running upgrade f39cbdb1efa7 -> b0abf3428204, transaction
INFO  [alembic.runtime.migration] Running upgrade b0abf3428204 -> 8af1cd3be673, transaction1
INFO  [alembic.runtime.migration] Running upgrade 8af1cd3be673 -> 6f4df3b5e155, transaction2
INFO  [alembic.runtime.migration] Running upgrade 6f4df3b5e155 -> 9aa820fb7f01, ensure_admin_user
================================ test session starts =================================
platform linux -- Python 3.10.13, pytest-8.1.1, pluggy-1.4.0 -- /usr/local/bin/python
cachedir: .pytest_cache
rootdir: /home/app/api
configfile: pyproject.toml
plugins: order-1.2.0, anyio-4.3.0
collected 11 items

tests/test_api.py::test_user_list PASSED                                       [  9%]
tests/test_api.py::test_user_detail PASSED                                     [ 18%]
tests/test_api.py::test_update_user_profile_by_admin PASSED                    [ 27%]
tests/test_api.py::test_update_user_profile_by_user PASSED                     [ 36%]
tests/test_api.py::test_fail_update_user_profile_by_other_user PASSED          [ 45%]
tests/test_api.py::test_add_transaction_for_users_from_admin PASSED            [ 54%]
tests/test_api.py::test_user1_transfer_20_points_to_user2 PASSED               [ 63%]
tests/test_api.py::test_user_list_with_balance PASSED                          [ 72%]
tests/test_api.py::test_admin_can_list_all_transactions PASSED                 [ 81%]
tests/test_api.py::test_regular_user_can_see_only_own_transaction PASSED       [ 90%]
tests/test_api.py::test_admin_can_list_all_transactions_ws PASSED              [100%]

================================= 11 passed in 8.08s =================================
[+] Running 6/6
 ✔ Container dundie-api-worker-1  Removed                                        1.0s
 ✔ Container dundie-api-ui-1      Removed                                        0.3s
 ✔ Container dundie-api-api-1     Removed                                        2.4s
 ✔ Container dundie-api-redis-1   Removed                                        0.3s
 ✔ Container dundie-api-db-1      Removed                                        0.3s
 ✔ Network dundie-api_default     Removed                                        0.1s

Conclusão

O FastAPI junto com o Starlette permite implementar websockets de maneira bastante simples, mas como é um micro-framework limita-se a entregar a parte de comunicação entre client e server respeitando o protocolo ws://.

Qualquer melhoria adicional precisará ser ajustada tanto na plataforma que serve a aplicação, como uvicorn, caches, load-balancers, proxies, como também através da utilização de drivers assincronos para acesso ao banco de dados assim como o uso de streams de dados ao invés de conexões a bancos de dados relacionais.

Middleware e CORS

1 Request -> REsponse

A parte mais importante de uma aplicação web é o fluxo de requisição, ou seja, o cliente (browser) faz uma requisição (Request) ao servidor.

A requisição formatada no protocolo HTTP é enviada ao Servidor, que por sua vez processa os elementos como URL, Verbo, Headers, Payload.

O servidor então prepara a resposta (Response) também no formato HTTP, contendo HEADERS e Body e envia esta resposta ao cliente.

No FastAPI podemos inspecionar o objecto request, e modificar o objeto REsponse de maneira bastante simples.

Adicione em dundie/routes/user.py logo antes da rota list_users.

@router.get("/test")
async def test_view(request: Request, response: Response):

    print(request.headers)
    print(response.headers)
    response.headers["X-Qualquer-Coisa"] = "123"
    return {}

Acesse http://localhost:8000/docs#/user/test_view_user_test_get e faça uma requisição.

repare que no terminal será impresso os valores dos headers, e no retorno da API no browser você verá o seu novo header x-qualquer-coisa.

Existem vários motivos para querermos injetar novos headers no objeto reponse, ou inspecionar atributos do objeto request.

Um deles é para lidar com o conceito de CORS que já veremos a seguir.

Injetar headers HTTP individualmente em cada rota assim como fizemos acima as vezes é útil, agora imagine que precisamos injetar esse header em todas as rotas!

Seria preciso alterar código em muitos lugares, portanto podemos automizar com o uso de middlewares.

Middleware

A palavra middleware pode significar coisas diferentes dependendo do tipo de software que estiver trabalhando, mas no contexto de aplicações web um middleware geralmente se refere a um procedimento que será injetado entre o recebimento do Request e o envio do Response.

No fastapi podemos adicionar um middleware de duas formas, usando um decorator, ou adicionando explicitamente.

Decorator

Começamos definindo uma função que irá receber o request e um callback para gerar o response.

No final do arquivo app.py

@app.middleware("http")
async def add_new_header(request: Request, make_response):
    response = await make_response(request)
    response.headers["X-Qualquer-Coisa"] = "456"
    return response

Agora pode requisitar qualquer endpoint em http://localhost:8000/docs#/ que o novo será adicionado a todos.

Existem uma série de middlewares que podem ser adicionados para diversos fins, como por exemplo, redirecionar todos os usuários para uma determinada URL ou Schema, forçar a autenticação em um SSO etc.

CORS

Este é o Middleware que com certeza qualquer aplicação com front-end irá necessitar, vamos antes entender o que é CORS.

Cross Origin Resource Sharing = Compartilhamento de Recursos entre Origens.

Recurso = URL/endpoint Origin = Request principal iniciado pelo navegador

Bloqueio de compartilhamento entre origens é uma prática de segurança que é implementada em todos os principais navegadores.

Este bloqueio regula que, uma página web (pagina.html) quando for requisitada só poderá requisitar novos recursos (outros endpoints) se este estiver na mesma origin, ou seja, no mesmo schema:dominio:porta.

PAra ver como isso funciona na prática vamos criar uma página em nossa ui

ui/users.html

<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Users</title>
</head>
<body>
<h1>Users List</h1>
<ul id="users-list"></ul>

<script>
// Function to create and append a new <li> element to the list
function appendUserToList(user) {
    var transactionList = document.getElementById("users-list");
    var listItem = document.createElement("li");
    listItem.textContent = user.name;
    transactionList.appendChild(listItem);
}

// Fetch API user list
fetch("http://localhost:8000/user/")
// parse to JSON
.then(response => response.json())
// loop the list and pass each user to the append function
.then(users => {
    users.forEach(user => {
        appendUserToList(user);
    });
});
</script>
</body>
</html>

Acesse no navegador http://localhost:8001/users.html e abra o console de dev (botao direito -> inspect) e na aba console veja o seguinte erro:

Cross-Origin Request Blocked: The Same Origin Policy disallows reading the remote resource at http://localhost:8000/user/. (Reason: CORS header ‘Access-Control-Allow-Origin’ does not match ‘localhost:8001’).

Como a mensagem explica, a UI está sendo servida em localhost:8001 e a API que queremos acessar está em localhost:8000, logo são consideradas origens diferentes.

Para resolver este problema precisamos dizer ao navegador que confiamos nessa comunicação.

Vamos fazer isso em nosso middleware, altere o middleware em app.py

@app.middleware("http")
async def add_new_header(request: Request, make_response):
    response = await make_response(request)
    response.headers["X-Qualquer-Coisa"] = "456"
    response.headers["Access-Control-Allow-Origin"] = "http://localhost:8001"
    return response

Agora acesse http://localhost:8001/users.html e veja que dessa funcionou sem problemas.

mais de uma origem?

Existem casos em que a app front-end precisa acessar recursos de mais de uma origem diferente, porém o header Access-Control-Allow-Origin só aceita um único valor.

Neste caso será necessário uma implementação mais elaborada envolvendo expressões regulares para fazer o match em multiplas origens ou um esquema onde a origem em questão é resolvida em um request pre-flight que é quando o navegador envia um request OPTIONS perguntando ao servidor se pode efetuar um request.

Uma opção também é usar o wildcard * para fazer match com qualquer origem, mas esta opção não é considerada segura e é usada apenas em APIs que são acessadas apenas em redes controladas.

O FastAPI tem uma forma mais sucinta de resolver este problema.

Cors Middleware

Podemos agora comentar o nosso middleware do app.py, deixaremos o código comentado apenas como referencia.

# @app.middleware("http")
# async def add_new_header(request: Request, make_response):
#     response = await make_response(request)
#     response.headers["X-Qualquer-Coisa"] = "456"
#     response.headers["Access-Control-Allow-Origin"] = "http://localhost:8001"
#     return response

Vamos usar então um middleware já embutido no FastAPI, no mesmo arquivo app.py

from fastapi.middleware.cors import CORSMiddleware
...
# após `app= FastAPI(..)

app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "http://localhost:8001",
        "http://localhost",
        "https://server.com",
    ],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

Este middleware irá automatizar uma série de coisas, fazendo a negociação inicial em pre-flight para construir o valor correto que vai no header "Access-Control-Allow-Origin" e cuidando de configurações adicionais como decidir se esse header ser a adicionado para todos os métodos, se a parte de autenticação será incluida etc..

Tudo o que teriamos que fazer manualmente resolvido com um Middleware.

https://fastapi.tiangolo.com/tutorial/cors/

Ahh agora ao acessar http://localhost:8001/users.html tudo deve funcionar perfeitamente.

Front-end e Autenticação

Autenticação consiste em validar as credenciais de um usuário para conceder ou negar acesso a um recurso.

Solicitar as credenciais a cada tentativa de acesso tornaria a experiência do usuário muito desagradável. Por isso, existem dois modelos de persistência de autenticação comumente utilizados quando integrados ao front-end.

Algumas aplicações adotam ambos os modelos simultaneamente, já que é recomendável, e às vezes até necessário, ter mais de um backend de autenticação.

Imagine um cenário em que o aplicativo tem usuários cadastrados diretamente no app (como um usuário administrador, por exemplo), usuários com acesso via token (senhas de app), usuários autenticados através de serviços de diretório como AD/LDAP/Keycloak e provedores de autenticação via OAuth (Google, GitHub, etc.).

Nesse caso, a aplicação pode definir um pipeline de autenticação, chamado de auth backends.

Ex:

[local_users, token, ldap, oath]

No framework Django, por exemplo, configurar esse pipeline de autenticação é bastante simples: basta adicionar as configurações e listar os backends na ordem desejada, sendo que a ordem faz toda a diferença. Já em micro-frameworks como FastAPI, é necessário definir manualmente todo o pipeline.

Modelo de Persistência

Após o usuário autenticar, independentemente da fase do pipeline em que a autenticação ocorreu, é uma boa prática manter o estado de "autenticado" por um período predefinido, evitando que todo o pipeline tenha que ser executado novamente em cada requisição.

Sessão

A forma mais tradicional de fazer isso é através de sessões (Session). O conceito é simples:

  • O usuário envia as credenciais para um endpoint /login.
  • Em caso de sucesso, o servidor gera um ID único para a sessão, como "123456".
  • Esse ID é armazenado em algum tipo de storage (memória, banco de dados, cache).
  • Além do ID, o servidor também define um timestamp de expiração, por exemplo, 24h (lease time).
  • O cliente (navegador) recebe um cookie contendo o ID da sessão "123456".

Assim, tanto no lado do servidor (no storage) quanto no cliente (no cookie) existe a mesma informação: "123456 válido até 16h".

Toda vez que o cliente faz uma requisição, o cookie é enviado ao servidor, que verifica se os dados do cookie coincidem com os do storage e se a validade não expirou.

Opcionalmente, podem ser armazenadas informações adicionais como IP e dispositivo.

Warning

Existem implicações de segurança nesse modelo que podem ser mitigadas com técnicas de proteção como XSS, CSRF, entre outras.

Token

Este modelo é mais indicado quando já existe uma estrutura de autenticação por token (como no nosso caso), e a preferência é deixar a implementação a cargo do front-end.

O fluxo de trabalho é o seguinte:

  • O usuário faz login por meio de um formulário em /login.html.
  • O formulário envia as credenciais para o endpoint /token e recebe um token e um refresh token.
  • O front-end armazena esses dados em um cookie ou local storage.
  • A cada requisição, o front-end valida a expiração do token, verifica a necessidade de refresh e adiciona o token nos headers da requisição.

Este modelo não exige o armazenamento de sessões no servidor, já que não há sessão. O próprio token contém as informações de expiração, e essas informações estão assinadas.

Upgrade do ambiente

No terminal fora do container

pip-compile requirements.in --upgrade --resolver=backtracking

docker compose down
docker compose build api
docker compose up

Implementando Session no FastAPI

Session storage

Primeiro precisamos definir onde guardaremos as sessions, para nossa sorte já temos o Redis em nossa infraestrutura e podemos usar.

Começamos estabelecendo algumas configurações em default.toml

[default.session_store]
host = "@get redis.host"
port = "@get redis.port"
db = 1
expiration = "@int @jinja {{60 * 60 * 24 * 7}}"

Criando funções de gestão de session.

dundie/session.py

from __future__ import annotations

from uuid import uuid4
from dundie.config import settings
from redis import Redis

EXPIRATION = settings.session_store.get("expiration", 60 * 60 * 24)

session_store = Redis(
    host=settings.session_store.host,
    port=settings.session_store.port,
    db=settings.session_store.db,
)


def set_session(username) -> str:
    """Creates a new random session"""
    session_id = uuid4().hex
    session_store.set(session_id, username, ex=EXPIRATION, nx=True)
    return session_id


def get_session(session_id) -> bool | str:
    """Get data from a session_id"""
    session_data = session_store.get(session_id)
    return session_data and session_data.decode()

Quando o usuário fizer login através de um formulário vamos criar a sessão usando set_session e vamos isso diretamente no FastAPI e depois submeter um formulário com HTML.

Session login

E, routes/auth.py vamos adicionar:

from fastapi import ..., Form, Request
from fastapi.responses import Response
from dundie.session import set_session, session_store

...


# session auth

@router.post("/login")
async def session_login(
    response: Response,
    username: str = Form(),
    password: str = Form(),
):
    """Cookie Based Session Auth Login"""
    user = authenticate_user(get_user, username, password)
    if not user or not isinstance(user, User):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
        )
    session_id = set_session(user.username)
    response.set_cookie(key="session_id", value=session_id, domain="localhost")
    return {"status": "logged in"}


@router.post("/logout")
async def session_logout(request: Request, response: Response):
    """Cookie Based Session Auth Logout"""
    if session_id := request.cookies.get("session_id"):
        response.delete_cookie(key="session_id")
        session_store.delete(session_id)
    return {"status": "logged out"}

Ajustando as dependencias de auth

dundie/auth.py

from dundie.session import get_session
...

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token", auto_error=False)

...


def get_current_user(
    request: Request, token: str = Depends(oauth2_scheme), fresh=False  # pyright: ignore
) -> User:
    """Get current user authenticated"""
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    if request:
        if session_id := request.cookies.get("session_id"):
            username = get_session(session_id)
            if not username:
                raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid Session ID")
            user = get_user(username=username)
            if user is None:
                raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Session user not found")
            return user


        if authorization := request.headers.get("authorization"):
            try:
                token = authorization.split(" ")[1]
            except IndexError:
                raise credentials_exception

    if not token:
        raise credentials_exception

    try:
        payload = jwt.decode(
            token, SECRET_KEY, algorithms=[ALGORITHM]  # pyright: ignore  # pyright: ignore
        )
        username: str = payload.get("sub")  # pyright: ignore

        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(username=token_data.username)
    if user is None:
        raise credentials_exception
    if fresh and (not payload["fresh"] and not user.superuser):
        raise credentials_exception

    return user

Agora em http://localhost:8000/docs

login endpoint

Form HTML

Agora precisamos de um formulário HTML para postar as credenciais de login.

ui/login.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Login Form</title>
    <style>
        #message {
            margin-bottom: 20px;
            padding: 10px;
            border: 1px solid transparent;
            display: none;
        }
        #message.success {
            border-color: green;
            color: green;
        }
        #message.error {
            border-color: red;
            color: red;
        }
    </style>
</head>
<body>
    <h2>Login Form</h2>

    <div id="message"></div> <!-- Message div -->

    <form id="loginForm">
        <label for="username">Username:</label><br>
        <input type="text" id="username" name="username" required><br><br>

        <label for="password">password:</label><br>
        <input type="password" id="password" name="password" required><br><br>

        <button type="submit">Submit</button>
    </form>

    <script>

        const messageDiv = document.getElementById('message');
        function reset_message_div() {
            // Reset the message div
            messageDiv.style.display = 'none';
            messageDiv.classList.remove('success', 'error');
        }

        document.getElementById('loginForm').addEventListener('submit', function(event) {
            event.preventDefault();  // Prevent the default form submission

            const formData = new FormData();
            formData.append("username", document.getElementById("username").value);
            formData.append("password", document.getElementById("password").value);

            reset_message_div();

            const url = `http://localhost:8000/login`;
            fetch(url, {
                method: 'POST',
                body: formData,
                credentials: 'include'
            })
            .then(response => {
                if (response.status >= 400 && response.status <= 500) {
                    throw new Error(`Authentication Error ${response.status} ${response.statusText}`);
                }
                return response.json();
            })
            .then(data => {
                location.href = "/add.html"
            })
            .catch((error) => {
                console.error('Error:', error);
                messageDiv.textContent = error;
                messageDiv.classList.add('error');
                messageDiv.style.display = 'block';
            });
        });
    </script>
</body>
</html>

Agora em http://localhost:8001/login.html

login form login error

Transaction HTML

ui/add.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Transaction Form</title>
    <style>
        #message {
            margin-bottom: 20px;
            padding: 10px;
            border: 1px solid transparent;
            display: none;
        }
        #message.success {
            border-color: green;
            color: green;
        }
        #message.error {
            border-color: red;
            color: red;
        }
        nav#controls {
            float: right;
        }
    </style>
</head>
<body>
    <nav id="controls">
        <button id="login" onclick="login()">Login</button>
        <span>
        <button id="logout" onclick="logout()">Logout</button>
    </nav>

    <h2>Transaction Form</h2>

    <div id="message"></div> <!-- Message div -->

    <form id="transactionForm">
        <label for="username">Username:</label><br>
        <input type="text" id="username" name="username" required><br><br>

        <label for="value">Value (positive integer):</label><br>
        <input type="number" id="value" name="value" min="1" required><br><br>

        <button type="submit">Submit</button>
    </form>

    <script>
        function getCookie(name) {
            let cookies = document.cookie.split(';');
            for(let i = 0; i < cookies.length; i++) {
                let cookie = cookies[i].trim();
                if (cookie.startsWith(name + '=')) {
                    return cookie.substring(name.length + 1);
                }
            }
            return null;
        }

        function checkSessionLogin() {
            let sessionCookie = getCookie("session_id");
            if (!sessionCookie) {
                window.location.href = "/login.html";
            }
        }

        checkSessionLogin();

        const messageDiv = document.getElementById('message');
        function reset_message_div() {
            // Reset the message div
            messageDiv.style.display = 'none';
            messageDiv.classList.remove('success', 'error');
        }

        function logout() {
            fetch('http://localhost:8000/logout', {
                method: 'POST',
                credentials: 'include'
            })
            .then(response => {
                location.href = "/login.html"
            })
            .catch((error) => {
                console.error('Error:', error);
                messageDiv.textContent = error;
                messageDiv.classList.add('error');
                messageDiv.style.display = 'block';
            });
        }
        function login() {
            location.href = "/login.html"
        }

        document.getElementById('transactionForm').addEventListener('submit', function(event) {
            event.preventDefault();  // Prevent the default form submission

            const username = document.getElementById('username').value;
            const value = document.getElementById('value').value;

            reset_message_div();

            if (value <= 0) {
                messageDiv.textContent = 'Please enter a positive integer for the value.';
                messageDiv.classList.add('error');
                messageDiv.style.display = 'block';
                return;
            }

            const url = `http://localhost:8000/transaction/${username}/`;
            const data = { value: parseInt(value) };

            fetch(url, {
                method: 'POST',
                headers: {
                    'Content-Type': 'application/json',
                },
                body: JSON.stringify(data),
                credentials: 'include'
            })
            .then(response => {
                if (response.status >= 400 && response.status <= 500) {
                    throw new Error(`Authentication Error ${response.status} ${response.statusText}`);
                }
                return response.json();
            })
            .then(data => {
                messageDiv.textContent = 'Transaction successful: ' + JSON.stringify(data);
                messageDiv.classList.add('success');
                messageDiv.style.display = 'block';
            })
            .catch((error) => {
                console.error('Error:', error);
                messageDiv.textContent = error;
                messageDiv.classList.add('error');
                messageDiv.style.display = 'block';
            });
        });
    </script>
</body>
</html>

Agora em http://localhost:8001/add.html

login form

Finalização

Muito obrigado por ter participado do treinamento Python Web API.