From Zero to Production: Setting Up a SQL Database with Async Engine in FastAPI

Everything from Automated Database Creation and Connection Checks to Reliable Lifecycle Management

Introduction

Today's technological landscape is marked by scalable web services, high-concurrency applications, and systems that generate large volumes of data, so choosing the right tech stack and database setup is crucial. FastAPI has quickly become popular for building APIs thanks to its speed, high performance, and simplicity, built on top of Python's asynchronous capabilities. However, with great speed comes the responsibility of setting up an efficient database connection, which is especially important when working with SQL databases.

In this guide, we'll walk through the process of integrating an SQL database with FastAPI using SQLAlchemy’s async engine. We’ll cover everything from automatically creating your database and tables to managing connections and ensuring your application’s database interactions are seamless and resilient.

Whether you're an experienced developer looking to refine your database setup or a newbie techie eager to learn the essentials of working with databases in FastAPI, this guide is for you. Let’s get started 🚀

Project Setup and Prerequisites

Before diving into the database module proper, it’s important to ensure that our development environment is ready.

  1. Prerequisites

I assume that you have a basic understanding of FastAPI and that you have a PostgreSQL database instance running locally or in a Docker container.

  2. Setting Up the Project
  • Install dependencies: We’ll use fastapi, sqlalchemy with its asynchronous extension, and asyncpg as the PostgreSQL driver.
pip install fastapi "sqlalchemy[asyncio]" asyncpg
  • Folder Structure: You may organize your project directory like this for better manageability.
bookapi/
├── app/
│   ├── main.py
│   ├── core/
│   │   ├── config.py
│   │   └── database.py
│   ├── models/
│   │   └── book_model.py
│   ├── schemas/
│   │   └── book_schema.py
│   └── routers/
│       └── book_router.py
├── venv/
├── .env
└── requirements.txt
  • Configure Environment Variables: Store database connection details in an .env file.
DATABASE_URL=postgresql+asyncpg://user:password@localhost:5432/bookapi
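
The code in the following sections imports an env_vars object from app/core/config.py, which isn't shown in full in this article. As a minimal sketch (the pydantic-settings approach, the module name, and the extra fields are assumptions, and pydantic-settings must be installed separately), it could look like this, with SUPERUSER_DATABASE_URL and DATABASE_NAME added to the .env file alongside DATABASE_URL:

# app/core/config.py (a hypothetical sketch using pydantic-settings)

from pydantic_settings import BaseSettings, SettingsConfigDict

class EnvVars(BaseSettings):
    # Values are loaded from the .env file or the process environment
    model_config = SettingsConfigDict(env_file=".env")

    DATABASE_URL: str
    SUPERUSER_DATABASE_URL: str
    DATABASE_NAME: str = "bookapi"
    DEBUG: bool = True  # used later to toggle SQL echo logging

env_vars = EnvVars()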

With the project structure set up, we're ready to move on to creating an async database engine and handling connections.

Creating the Async Database Engine

Using an asynchronous engine prevents database queries from blocking the event loop, enabling FastAPI to handle multiple requests at once. This is especially useful in high-traffic situations like real-time chat apps or online gaming servers, where many users access the database simultaneously.

from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import declarative_base
from .config import env_vars

# Database URL fetched from environment variables
DATABASE_URL = env_vars.DATABASE_URL

# Create the async engine
engine = create_async_engine(DATABASE_URL, echo=True, future=True)

# Define the base class for models
Base = declarative_base()

Explanations:

  • echo=True logs every SQL statement to the console, which is helpful during development; it should be set to False in production (see the sketch after this list).

  • future=True ensures compatibility with SQLAlchemy 2.0-style code.

  • declarative_base is used to create the base class for our models. This base class will help in defining database tables later.
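
On the echo point above, one way to handle the development versus production difference is to drive it from the environment rather than hard-coding it. A minimal sketch, assuming the hypothetical DEBUG flag from the config module shown earlier:

# Hypothetical: toggle SQL statement logging via an environment flag
engine = create_async_engine(
    DATABASE_URL,
    echo=env_vars.DEBUG,  # True locally, False in production
    future=True,
)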

Implementing the Database Module

In this section, we'll create a module to manage connections, automatically create the database and tables, and ensure smooth interaction with our PostgreSQL database using SQLAlchemy's async engine. This setup makes our database operations efficient and scalable, preparing the app for production.

  1. Setting Up the Database Module
# database.py

from contextlib import asynccontextmanager
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy import text
from sqlalchemy.exc import ProgrammingError

from .config import env_vars

class Database:
    def __init__(self):
        self.engine = create_async_engine(env_vars.DATABASE_URL, echo=True, future=True)
        self.Base = declarative_base()

  2. Automated Database Creation

To simplify the setup, we aim to automatically create the database if it doesn't exist. This is especially useful in development or initial deployment. Inside the Database class, define the following method:

# database.py
#...continued

async def create_database(self, database_name: str):
    superuser_database_url = env_vars.SUPERUSER_DATABASE_URL
    # AUTOCOMMIT is required because CREATE DATABASE cannot run inside a transaction
    superuser_engine = create_async_engine(superuser_database_url, echo=True, future=True, isolation_level="AUTOCOMMIT")
    try:
        async with superuser_engine.connect() as conn:
            result = await conn.execute(text(f"SELECT 1 FROM pg_database WHERE datname = '{database_name}'"))
            db_exists = result.scalar()
            if not db_exists:
                await conn.execute(text(f'CREATE DATABASE "{database_name}"'))
                print(f"Database '{database_name}' created successfully")
            else:
                print(f"Database '{database_name}' already exists")
    except ProgrammingError as e:
        print(f"Error creating database: {e}")
    finally:
        await superuser_engine.dispose()

Explanations:

  • This method connects to the server's default maintenance database (usually postgres) and checks whether our target database exists (say database_name = "bookapi")

  • The connecting database user should be a superuser or admin-level role (like postgres) that is allowed to run CREATE DATABASE. E.g.:

SUPERUSER_DATABASE_URL="postgresql+asyncpg://postgres:<PASSWORD>@localhost/postgres"
  • isolation_level="AUTOCOMMIT": this setting is important; it ensures that the CREATE DATABASE command is not wrapped inside a transaction, which PostgreSQL does not allow.

  3. Automatic Connection Checking

It's good practice to check the database connection before performing any operations. This prevents the application from attempting queries against an unreachable database and surfacing confusing errors at request time.

# database.py
#...continued

async def ping_database(self):
    try:
        async with self.engine.connect() as conn:
            await conn.execute(text("SELECT 1"))
        print("Successfully connected to the Database!")
    except Exception as e:
        print(f"Error connecting to database: {e}")

  4. Creating Database Tables on Startup

Once the database is created, we need to create tables from our SQLAlchemy models. This maps our Python models to SQL tables, ensuring tables are ready for data storage.

# database.py
#...continued

async def create_tables(self):
    async with self.engine.begin() as conn:
        await conn.run_sync(self.Base.metadata.create_all)
    print("Database tables created successfully")

Session Management

  1. Session Manager

With asynchronous programming, we aim to maximize the responsiveness of the application, especially when dealing with I/O operations like database interactions.

# database.py
#...continued

from typing import AsyncIterator  # add alongside the other imports at the top of database.py

@asynccontextmanager
async def get_session(self) -> AsyncIterator[AsyncSession]:
    # expire_on_commit=False avoids implicit IO after commit with async sessions
    async_session = sessionmaker(self.engine, class_=AsyncSession, expire_on_commit=False)
    session = async_session()
    try:
        yield session
    except Exception:
        # Undo any partial changes before propagating the error
        await session.rollback()
        raise
    finally:
        # Always release the connection back to the pool
        await session.close()

Explanations:

  • @asynccontextmanager is used to manage asynchronous resources (e.g. database connections, network sockets) that need the same setup and tear-down steps around every use.

  • We use sessionmaker to create sessions that serve as the interface to our database. Each request gets its own session, ensuring that database operations are isolated from each other.

  • async with, used when consuming get_session (as in the endpoint later), ensures that the session context is entered and exited safely, with automatic cleanup when the context exits.

  • session.rollback() is called if an error occurs, ensuring that any partial changes are undone to maintain data consistency.

  • session.close() ensures that the session is closed after each transaction, freeing up resources for other operations.
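
Outside of request handlers, for example in a one-off seeding or maintenance script, the same context manager can be consumed directly. A short usage sketch, reusing the hypothetical Book model from earlier:

# Hypothetical standalone usage of the session manager

import asyncio

from app.core.database import database
from app.models.book_model import Book

async def seed_books():
    async with database.get_session() as session:
        session.add(Book(title="The Pragmatic Programmer", author="Hunt & Thomas"))
        await session.commit()

asyncio.run(seed_books())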

  2. Graceful Shutdown of Connections

Finally, to ensure a clean shutdown of the application, we need a method to properly close the database engine when the application stops:

# database.py
#...continued

async def close_database(self):
    await self.engine.dispose()
    print("Database connection closed!")

Running the Module on App’s Events

Finally, to complete the database module, we need to run the appropriate methods defined in the Database class at the right stages of our FastAPI application's lifecycle.

# database.py
#...continued

database = Database()

# This is outside the Database class
async def setup_database():
    await database.create_database(env_vars.DATABASE_NAME)
    await database.ping_database()
    await database.create_tables()

# main.py

from contextlib import asynccontextmanager
from fastapi import FastAPI
from app.core.database import database, setup_database

@asynccontextmanager
async def lifespan(app: FastAPI):
    await setup_database()
    yield
    await database.close_database()

app = FastAPI(
    lifespan=lifespan,
    title="Book API"
)

Explanation:

  • Inside our main.py file, we define a FastAPI lifespan function, an async context manager that replaces the older startup/shutdown event handlers

  • the logic before the yield statement runs once at startup, before the application begins serving requests

  • the logic after the yield statement runs once at shutdown, after the application stops serving requests

With these practices in place, we can be sure that our database connections are well-managed as our app grows. Next, we'll use these sessions to create a sample endpoint for CRUD operations on our database.

Implementing a Sample Endpoint: Create a Book

With our connection management in place, we can now use the get_session method in our FastAPI endpoints or service logic to perform database operations safely:

# book_router.py

from fastapi import APIRouter, Depends
from app.core.database import database
from app.models.book_model import Book
from app.schemas.book_schema import BookSchema

router = APIRouter()

@router.post("/books")
async def create_book(book_data: BookSchema, db_session=Depends(database.get_session)):
    # get_session returns an async context manager, so we enter it here
    async with db_session as session:
        new_book = Book(**book_data.model_dump())  # use .dict() instead on Pydantic v1
        session.add(new_book)
        await session.commit()
        return {"message": "Book created successfully!"}

Conclusion

That's it for now! I'm sure you've gained a better understanding of how to set up an asynchronous SQL database engine in FastAPI through this article.

For schema changes, I recommend using Alembic for database migrations to keep your schema updates consistent.

Feel free to drop a comment and ask questions if you have any. Cheers to building more scalable and performant FastAPI apps.

Happy coding! 👨‍💻❤️