An extendable async API using FastAPI, Pydantic V2, SQLAlchemy 2.0, PostgreSQL and Redis.
Bugs fixed, now working with MyPy:
- uuid_pkg.UUID(as_uuid=True) from the uuid mixin
- status_code.description changed to HTTPStatus(status_code).description in the custom exception definitions
- ModelType = TypeVar("ModelType", bound=Base)
- PostUpdate inheriting from BaseModel instead of PostBase
- MissingClientError created, plus a bunch of None cases dealt with

Full Changelog: https://github.com/igorbenav/FastAPI-boilerplate/compare/v0.8.0...v0.8.1
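One of the fixes above swapped status_code.description for HTTPStatus(status_code).description. The stdlib enum resolves an integer status code to its standard phrase and description, which is what lets the custom exceptions default their detail message:

```python
from http import HTTPStatus

# HTTPStatus maps an integer status code to its standard metadata.
status = HTTPStatus(404)
print(status.phrase)       # "Not Found"
print(status.description)  # "Nothing matches the given URI"
```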
Old core folder structure:
├── core              # Core utilities and configurations for the application.
│   ├── __init__.py
│   ├── cache.py      # Utilities related to caching.
│   ├── config.py     # Application configuration settings.
│   ├── database.py   # Database connectivity and session management.
│   ├── exceptions.py # Contains core custom exceptions for the application.
│   ├── logger.py     # Logging utilities.
│   ├── models.py     # Base models for the application.
│   ├── queue.py      # Utilities related to task queues.
│   ├── rate_limit.py # Rate limiting utilities and configurations.
│   ├── security.py   # Security utilities like password hashing and token generation.
│   └── setup.py      # File defining settings and the FastAPI application instance.
New structure:
├── core                         # Core utilities and configurations for the application.
│   ├── __init__.py
│   ├── config.py                # Configuration settings for the application.
│   ├── logger.py                # Configuration for application logging.
│   ├── schemas.py               # Pydantic schemas for data validation.
│   ├── security.py              # Security utilities, such as password hashing.
│   ├── setup.py                 # Setup file for the FastAPI app instance.
│   │
│   ├── db                       # Core database-related modules.
│   │   ├── __init__.py
│   │   ├── crud_token_blacklist.py  # CRUD operations for the token blacklist.
│   │   ├── database.py          # Database connectivity and session management.
│   │   ├── models.py            # Core database models.
│   │   └── token_blacklist.py   # Model for token blacklist functionality.
│   │
│   ├── exceptions               # Custom exception classes.
│   │   ├── __init__.py
│   │   └── ...
│   │
│   └── utils                    # Utility functions and helpers.
│       ├── __init__.py
│       ├── cache.py             # Cache-related utilities.
│       ├── queue.py             # Utilities for task queue management.
│       └── rate_limit.py        # Rate limiting utilities.
A few relevant notes:
Exception handling was restructured.
Old exceptions structure:
├── app                    # Main application directory.
│   ├── ...
│   │
│   ├── api                # Folder containing API-related logic.
│   │   ├── ...
│   │   └── exceptions.py  # Custom exceptions for the API.
│   │
│   └── core               # Core utilities and configurations for the application.
│       ├── ...
│       │
│       └── exceptions     # Custom exception classes.
│           ├── __init__.py
│           └── exceptions.py  # Definitions of custom exceptions.
New structure:
├── app                    # Main application directory.
│   ├── ...
│   │
│   └── core               # Core utilities and configurations for the application.
│       ├── ...
│       │
│       └── exceptions     # Custom exception classes.
│           ├── __init__.py
│           ├── cache_exceptions.py  # Exceptions related to cache operations.
│           └── http_exceptions.py   # HTTP-related exceptions.
To use HTTP exceptions, just import them from app/core/exceptions/http_exceptions and optionally pass a detail message:
from app.core.exceptions.http_exceptions import NotFoundException

# If you want to specify the detail, just pass the message
if not user:
    raise NotFoundException("User not found")

# Or you may just use the default message
if not post:
    raise NotFoundException()
The predefined possibilities in http_exceptions are the following:
- CustomException: 500 internal error
- BadRequestException: 400 bad request
- NotFoundException: 404 not found
- ForbiddenException: 403 forbidden
- UnauthorizedException: 401 unauthorized
- UnprocessableEntityException: 422 unprocessable entity
- DuplicateValueException: 422 unprocessable entity
- RateLimitException: 429 too many requests

NGINX is a high-performance web server, known for its stability, rich feature set, simple configuration, and low resource consumption. NGINX acts as a reverse proxy: it receives client requests, forwards them to the FastAPI server (running via Uvicorn or Gunicorn), and then passes the responses back to the clients.
To run with NGINX, start by uncommenting the following part in your docker-compose.yml:
# docker-compose.yml
...
# #-------- uncomment to run with nginx --------
# nginx:
# image: nginx:latest
# ports:
# - "80:80"
# volumes:
# - ./default.conf:/etc/nginx/conf.d/default.conf
# depends_on:
# - web
...
Which should be changed to:
# docker-compose.yml
...
#-------- uncomment to run with nginx --------
nginx:
image: nginx:latest
ports:
- "80:80"
volumes:
- ./default.conf:/etc/nginx/conf.d/default.conf
depends_on:
- web
...
Then comment the following part:
# docker-compose.yml
services:
web:
...
# -------- Both of the following should be commented to run with nginx --------
command: uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
# command: gunicorn app.main:app -w 4 -k uvicorn.workers.UvicornWorker -b 0.0.0.0:8000
Which becomes:
# docker-compose.yml
services:
web:
...
# -------- Both of the following should be commented to run with nginx --------
# command: uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
# command: gunicorn app.main:app -w 4 -k uvicorn.workers.UvicornWorker -b 0.0.0.0:8000
Then pick how you want to run the app (Uvicorn, or Gunicorn managing Uvicorn workers) in the Dockerfile: uncomment the one you want and comment out the other.
# Dockerfile
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--reload"]
# CMD ["gunicorn", "app.main:app", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "-b", "0.0.0.0:8000"]
If you want to run with one server only, your setup should be ready. Just make sure the only uncommented part in default.conf is:
# default.conf
# ---------------- Running With One Server ----------------
server {
listen 80;
location / {
proxy_pass http://web:8000;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
}
NGINX can distribute incoming network traffic across multiple servers, improving the efficiency and capacity utilization of your application.
To run with multiple servers, just comment out the Running With One Server part in default.conf and uncomment the other one:
# default.conf
# ---------------- Running With One Server ----------------
...
# ---------------- To Run with Multiple Servers, Uncomment below ----------------
upstream fastapi_app {
server fastapi1:8000; # Replace with actual server names or IP addresses
server fastapi2:8000;
# Add more servers as needed
}
server {
listen 80;
location / {
proxy_pass http://fastapi_app;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
}
[!WARNING] Note that we are using fastapi1:8000 and fastapi2:8000 as examples; replace them with the actual names of your services and the ports they're running on.
Full Changelog: https://github.com/igorbenav/FastAPI-boilerplate/compare/v0.7.0...v0.8.0
To retrieve data with a join operation, you can use the get_joined method from your CRUD module. Here's how to do it:
# Fetch a single record with a join on another model (e.g., User and Tier).
result = await crud_users.get_joined(
db=db, # The SQLAlchemy async session.
join_model=Tier, # The model to join with (e.g., Tier).
schema_to_select=UserSchema, # Pydantic schema for selecting User model columns (optional).
join_schema_to_select=TierSchema # Pydantic schema for selecting Tier model columns (optional).
)
Relevant Parameters:
- join_model: The model you want to join with (e.g., Tier).
- join_prefix: Optional prefix to be added to all columns of the joined model. If None, no prefix is added.
- join_on: SQLAlchemy Join object for specifying the ON clause of the join. If None, the join condition is auto-detected based on foreign keys.
- schema_to_select: A Pydantic schema to select specific columns from the primary model (e.g., UserSchema).
- join_schema_to_select: A Pydantic schema to select specific columns from the joined model (e.g., TierSchema).
- join_type: Specifies the type of join operation to perform. Can be "left" for a left outer join or "inner" for an inner join. Default "left".
- kwargs: Filters to apply to the primary query.

This method allows you to perform a join operation, selecting columns from both models, and retrieve a single record.
Similarly, to retrieve multiple records with a join operation, you can use the get_multi_joined method. Here's how:
# Retrieve a list of objects with a join on another model (e.g., User and Tier).
result = await crud_users.get_multi_joined(
db=db, # The SQLAlchemy async session.
join_model=Tier, # The model to join with (e.g., Tier).
join_prefix="tier_", # Optional prefix for joined model columns.
join_on=and_(User.tier_id == Tier.id, User.is_superuser == True), # Custom join condition.
schema_to_select=UserSchema, # Pydantic schema for selecting User model columns.
join_schema_to_select=TierSchema, # Pydantic schema for selecting Tier model columns.
username="john_doe" # Additional filter parameters.
)
Relevant Parameters:
- join_model: The model you want to join with (e.g., Tier).
- join_prefix: Optional prefix to be added to all columns of the joined model. If None, no prefix is added.
- join_on: SQLAlchemy Join object for specifying the ON clause of the join. If None, the join condition is auto-detected based on foreign keys.
- schema_to_select: A Pydantic schema to select specific columns from the primary model (e.g., UserSchema).
- join_schema_to_select: A Pydantic schema to select specific columns from the joined model (e.g., TierSchema).
- join_type: Specifies the type of join operation to perform. Can be "left" for a left outer join or "inner" for an inner join. Default "left".
- offset: The offset (number of records to skip) for pagination. Default 0.
- limit: The limit (maximum number of records to return) for pagination. Default 100.
- kwargs: Filters to apply to the primary query.

Full Changelog: https://github.com/igorbenav/FastAPI-boilerplate/compare/v0.6.0...v0.7.0
To log users out, a token blacklist was created, so you can now invalidate a token at any moment. To support this, a verify_token function was created (along with the related schemas and CRUD operations).
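The mechanism can be sketched independently of the boilerplate's actual schemas and CRUD. The names below are illustrative, and the real implementation persists blacklisted tokens in the database (see core/db/token_blacklist.py in the new structure):

```python
# Illustrative in-memory sketch; the boilerplate stores blacklisted
# tokens in a database table instead of a process-local set.
blacklisted_tokens: set[str] = set()

def blacklist_token(token: str) -> None:
    """Called on logout: the token becomes invalid immediately."""
    blacklisted_tokens.add(token)

def verify_token(token: str) -> bool:
    """A blacklisted token is rejected before any signature/expiry checks."""
    return token not in blacklisted_tokens
```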
create_first_superuser.py
If you change the user
model, you must also update the table definition in src/scripts/create_first_superuser.py
in order for it to keep working.
Here is the table definition:
# src/scripts/create_first_superuser.py
...
if user is None:
metadata = MetaData()
user_table = Table(
"user", metadata,
Column("id", Integer, primary_key=True, autoincrement=True, nullable=False),
Column("name", String(30), nullable=False),
Column("username", String(20), nullable=False, unique=True, index=True),
Column("email", String(50), nullable=False, unique=True, index=True),
Column("hashed_password", String, nullable=False),
Column("profile_image_url", String, default="https://profileimageurl.com"),
Column("uuid", UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, unique=True),
Column("created_at", DateTime, default=datetime.utcnow, nullable=False),
Column("updated_at", DateTime),
Column("deleted_at", DateTime),
Column("is_deleted", Boolean, default=False, index=True),
Column("is_superuser", Boolean, default=False),
Column("tier_id", Integer, ForeignKey("tier.id"), index=True)
)
Let's say you added a column to your user model: age.
...
class User(Base):
__tablename__ = "user"
...
# --------- here the new `age` column was added ---------
age: Mapped[Optional[int]] = mapped_column(default=None)
# -------------------------------------------------------
...
I'll now update the table definition in create_first_superuser.py
accordingly:
# src/scripts/create_first_superuser.py
...
if user is None:
metadata = MetaData()
user_table = Table(
"user", metadata,
...
Column("age", Integer, nullable=True, default=None),
...
)
This should work out of the box; all you need to do is run the Alembic migration.
While in the src
folder:
poetry run alembic revision --autogenerate
And to apply the migration
poetry run alembic upgrade head
Now you can create your middleware in the app/middleware folder. The client-side cache was moved there.

- rate_limit.py (#45)
- create_first_superuser now working again.

[!WARNING] If you change the user model, you'll now also have to change the definition in the create_first_superuser script. That happens because the script wasn't working without the relationship definitions, and getting the user model isn't trivial for async. May be fixed eventually.
Full Changelog: https://github.com/igorbenav/FastAPI-boilerplate/compare/v0.5.0...v0.6.0
- rate_limiter dependency created
- pattern_to_invalidate_extra

To allow fully customizable tier creation and rate limiting, new models, schemas and CRUD objects were created.
To limit how many times a user can make a request in a certain interval of time (very useful to create subscription plans or just to protect your API against DDOS), you may just use the rate_limiter
dependency:
from fastapi import Depends
from app.api.dependencies import rate_limiter
from app.core import queue
from app.schemas.job import Job
@router.post("/task", response_model=Job, status_code=201, dependencies=[Depends(rate_limiter)])
async def create_task(message: str):
job = await queue.pool.enqueue_job("sample_background_task", message)
return {"id": job.job_id}
By default, if no token is passed in the header (that is, the user is not authenticated), the user will be limited by their IP address, with the default limit (how many times the user can make this request every period) and period (time in seconds) defined in .env.
Even though this is useful, the real power comes from creating tiers (categories of users) and standard rate_limits (limits and periods defined for specific paths, that is, endpoints) for these tiers.
All of the tier and rate_limit models, schemas, and endpoints are already created in the respective folders (and usable only by superusers). You may use the create_tier script to create the first tier (it uses the .env variable TIER_NAME, which is all you need to create a tier) or just use the API:
Here I'll create a free tier and a pro tier. Then, for each of them, I'll associate a rate_limit for the path api/v1/tasks/task:
1 request every hour (3600 seconds) for the free tier, and 10 requests every hour for the pro tier.
Now let's read all the tiers available (GET api/v1/tiers):
{
"data": [
{
"name": "free",
"id": 1,
"created_at": "2023-11-11T05:57:25.420360"
},
{
"name": "pro",
"id": 2,
"created_at": "2023-11-12T00:40:00.759847"
}
],
"total_count": 2,
"has_more": false,
"page": 1,
"items_per_page": 10
}
And read the rate_limits for the pro tier to ensure it's working (GET api/v1/tier/pro/rate_limits):
{
"data": [
{
"path": "api_v1_tasks_task",
"limit": 10,
"period": 3600,
"id": 1,
"tier_id": 2,
"name": "api_v1_tasks:10:3600"
}
],
"total_count": 1,
"has_more": false,
"page": 1,
"items_per_page": 10
}
Now, whenever an authenticated user makes a POST request to api/v1/tasks/task, they'll use the quota defined by their tier. You can check this by getting a token from the api/v1/login endpoint, then passing it in the request header:
curl -X POST 'http://127.0.0.1:8000/api/v1/tasks/task?message=test' \
-H 'Authorization: Bearer <your-token-here>'
Warning Since the rate_limiter dependency uses the get_optional_user dependency instead of get_current_user, it will not require authentication, but will behave accordingly if the user is authenticated (and the token is passed in the header). If you want to ensure authentication, also use get_current_user.
To change a user's tier, just use the PATCH api/v1/user/{username}/tier endpoint.
Note that for flexibility (since this is a boilerplate), it's not necessary to pass a tier_id when creating a user, but you should probably assign every user to a certain tier (let's say free) once they are created.
Warning If a user does not have a tier, or the tier does not have a defined rate limit for the path, and a token is still passed in the request, the default limit and period will be used; this will be logged in app/logs.
Let's assume we have an endpoint with a paginated response, such as:
@router.get("/{username}/posts", response_model=PaginatedListResponse[PostRead])
@cache(
key_prefix="{username}_posts:page_{page}:items_per_page:{items_per_page}",
resource_id_name="username",
expiration=60
)
async def read_posts(
request: Request,
username: str,
db: Annotated[AsyncSession, Depends(async_get_db)],
page: int = 1,
items_per_page: int = 10
):
db_user = await crud_users.get(db=db, schema_to_select=UserRead, username=username, is_deleted=False)
if not db_user:
raise HTTPException(status_code=404, detail="User not found")
posts_data = await crud_posts.get_multi(
db=db,
offset=compute_offset(page, items_per_page),
limit=items_per_page,
schema_to_select=PostRead,
created_by_user_id=db_user["id"],
is_deleted=False
)
return paginated_response(
crud_data=posts_data,
page=page,
items_per_page=items_per_page
)
Just passing to_invalidate_extra will not work to invalidate this cache, since the key will change based on the page and items_per_page values. To overcome this, we may use the pattern_to_invalidate_extra parameter:
@router.patch("/{username}/post/{id}")
@cache(
"{username}_post_cache",
resource_id_name="id",
pattern_to_invalidate_extra=["{username}_posts:*"]
)
async def patch_post(
request: Request,
username: str,
id: int,
values: PostUpdate,
current_user: Annotated[UserRead, Depends(get_current_user)],
db: Annotated[AsyncSession, Depends(async_get_db)]
):
...
Now it will invalidate all caches with a key that matches the pattern "{username}_posts:*", which works for the paginated responses.
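Conceptually, pattern invalidation matches cache keys against a glob pattern and deletes every hit; with Redis this is typically done by iterating scan_iter(match=pattern) and deleting each key. A dict-based sketch of the idea (illustrative, not the boilerplate's actual code):

```python
import fnmatch

def invalidate_pattern(store: dict, pattern: str) -> int:
    """Delete every cached entry whose key matches the glob pattern.
    Returns the number of entries removed."""
    doomed = [key for key in store if fnmatch.fnmatch(key, pattern)]
    for key in doomed:
        del store[key]
    return len(doomed)

cache = {
    "john_posts:page_1:items_per_page:10": "page 1 payload",
    "john_posts:page_2:items_per_page:10": "page 2 payload",
    "john_post_cache:42": "single post payload",
}
print(invalidate_pattern(cache, "john_posts:*"))  # 2: both paginated pages gone
```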
Warning Using
pattern_to_invalidate_extra
can be resource-intensive on large datasets. Use it judiciously and consider the potential impact on Redis performance. Be cautious with patterns that could match a large number of keys, as deleting many keys simultaneously may impact the performance of the Redis server.
pattern_to_invalidate_extra also allows you to invalidate the paginated response caches.

Warning What's retrieved from the get and get_multi methods is no longer a sqlalchemy.engine.row.Row; it is a python dict instead. Attributes should be accessed with object["attribute_name"] instead of object.attribute_name.
Full Changelog: https://github.com/igorbenav/FastAPI-boilerplate/compare/v0.4.1...v0.5.0
- app.api.pagination module created
- ListResponse and PaginatedListResponse moved to the pagination module
- paginated_response and compute_offset functions created in the pagination module
- get_multi updated to the new structure

With the get_multi method we get a python dict with full support for pagination:
{
"data": [
{
"id": 4,
"name": "User Userson",
"username": "userson4",
"email": "[email protected]",
"profile_image_url": "https://profileimageurl.com"
},
{
"id": 5,
"name": "User Userson",
"username": "userson5",
"email": "[email protected]",
"profile_image_url": "https://profileimageurl.com"
}
],
"total_count": 2,
"has_more": false,
"page": 1,
"items_per_page": 10
}
And in the endpoint, we can import the following functions and Pydantic schema from app/api/paginated:
from app.api.paginated import (
PaginatedListResponse, # What you'll use as a response_model to validate
paginated_response, # Creates a paginated response based on the parameters
compute_offset # Calculate the offset for pagination ((page - 1) * items_per_page)
)
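If it helps to see it concretely, compute_offset is assumed to be the usual pagination arithmetic:

```python
def compute_offset(page: int, items_per_page: int) -> int:
    """Rows to skip so that `page` starts right after the previous pages."""
    return (page - 1) * items_per_page

print(compute_offset(1, 10))  # 0, the first page starts at the first row
print(compute_offset(3, 10))  # 20, skip two full pages
```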
Then let's create the endpoint:
import fastapi
from app.schemas.entity import EntityRead
...
@router.get("/entities", response_model=PaginatedListResponse[EntityRead])
async def read_entities(
request: Request,
db: Annotated[AsyncSession, Depends(async_get_db)],
page: int = 1,
items_per_page: int = 10
):
entities_data = await crud_entity.get_multi(
db=db,
offset=compute_offset(page, items_per_page),
limit=items_per_page,
schema_to_select=EntityRead,
is_deleted=False
)
return paginated_response(
crud_data=entities_data,
page=page,
items_per_page=items_per_page
)
Full Changelog: https://github.com/igorbenav/FastAPI-boilerplate/compare/v0.4.0...v0.4.1
With the get_multi method we now get a python dict with full support for pagination:

{
"data": [
{
"id": 4,
"name": "User Userson",
"username": "userson4",
"email": "[email protected]",
"profile_image_url": "https://profileimageurl.com"
},
{
"id": 5,
"name": "User Userson",
"username": "userson5",
"email": "[email protected]",
"profile_image_url": "https://profileimageurl.com"
}
],
"total_count": 2,
"has_more": false,
"page": 1,
"items_per_page": 10
}
Warning What's retrieved from the get and get_multi methods is no longer a sqlalchemy.engine.row.Row; it is a python dict instead.
Use PaginatedListResponse as your response_model:

@router.get("/users", response_model=PaginatedListResponse[UserRead])
async def read_users(
request: Request,
db: Annotated[AsyncSession, Depends(async_get_db)],
page: int = 1,
items_per_page: int = 10
):
users_data = await crud_users.get_multi(
db=db,
offset=(page - 1) * items_per_page,
limit=items_per_page,
schema_to_select=UserRead,
is_deleted=False
) # this returns a python dict
return {
"data": users_data["data"],
"total_count": users_data["total_count"],
"has_more": (page * items_per_page) < users_data["total_count"],
"page": page,
"items_per_page": items_per_page
}
There's also a less powerful ListResponse.
You can also get the count of a certain object with the specified filter:
# Here I'm getting the count of users with the name 'User Userson'
user_count = await crud_users.count(
db=db,
name="User Userson"
)
- Indexes were added to relevant fields for faster count.
- What's retrieved is now a python dict, so instead of:

# BAD
db_user = crud_user.get(db=db, username=username)
db_user.id
You should do:
# GOOD
db_user = crud_user.get(db=db, username=username)
db_user["id"]
- jsonable_encoder used instead of a custom function in cache.
- field_serializer used in date fields.

Full Changelog: https://github.com/igorbenav/FastAPI-boilerplate/compare/v0.3.3...v0.4.0
removed read deleted users endpoint
Full Changelog: https://github.com/igorbenav/FastAPI-boilerplate/compare/v0.3.2...v0.3.3
schema_to_select now also accepts a list of column names
Full Changelog: https://github.com/igorbenav/FastAPI-boilerplate/compare/v0.3.1...v0.3.2
CRUDBase docs and type hints corrected (get and get_multi now return Row, not Model)
Full Changelog: https://github.com/igorbenav/FastAPI-boilerplate/compare/v0.3.0...v0.3.1