Skip to main content

A convenient way to configure and interact with a async sqlalchemy session through context in asynchronous applications

Project description

context-async-sqlalchemy

PyPI

ContextVar + async sqlalchemy = happiness.

A convenient way to configure and interact with async sqlalchemy session through context in asynchronous applications.

What does usage look like?

from context_async_sqlalchemy import db_session
from sqlalchemy import insert

from database import master  # your configured connection to the database
from models import ExampleTable  # just some model for example

async def some_func() -> None:
    # Created a session (no connection to the database yet)
    # If you call db_session again, it will return the same session
    # even in child coroutines.
    session = await db_session(master)
    
    stmt = insert(ExampleTable).values(text="example_with_db_session")

    # On the first request, a connection and transaction were opened
    await session.execute(stmt)

    # The commit and closing of the session will occur automatically

How to use

The repository includes an example integration with FastAPI, which describes numerous workflows. FastAPI example

It also includes two types of test setups you can use in your projects. The library currently has 90% test coverage. The tests are in the examples, as we want to test not in the abstract but in the context of a real asynchronous web application.

FastAPI tests example

The most basic example

1. Configure the connection to the database

for example for PostgreSQL database.py:

from sqlalchemy.ext.asyncio import (
    async_sessionmaker,
    AsyncEngine,
    AsyncSession,
    create_async_engine,
)

from context_async_sqlalchemy import DBConnect


def create_engine(host: str) -> AsyncEngine:
    """
    database connection parameters.
    """
    # In production code, you will probably take these parameters from env
    pg_user = "krylosov-aa"
    pg_password = ""
    pg_port = 6432
    pg_db = "test"
    return create_async_engine(
        f"postgresql+asyncpg://"
        f"{pg_user}:{pg_password}"
        f"@{host}:{pg_port}"
        f"/{pg_db}",
        future=True,
        pool_pre_ping=True,
    )


def create_session_maker(
    engine: AsyncEngine,
) -> async_sessionmaker[AsyncSession]:
    """session parameters"""
    return async_sessionmaker(
        engine, class_=AsyncSession, expire_on_commit=False
    )


master = DBConnect(
    host="127.0.0.1",
    engine_creator=create_engine,
    session_maker_creator=create_session_maker,
)

2. Manage Database connection lifecycle

Configure the connection to the database at the begin of your application's life. Close the resources at the end of your application's life

Example for FastAPI:

from contextlib import asynccontextmanager
from typing import Any, AsyncGenerator
from fastapi import FastAPI

from database import master


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, Any]:
    """Database connection lifecycle management"""
    yield
    await master.close()  # Close the engine if it was open

3. Setup context lifetime

For a contextual session to work, a context needs to be set. This assumes some kind of middleware.

I'll use FastAPI middleware as an example:

from fastapi import Request
from starlette.middleware.base import (  # type: ignore[attr-defined]
    Response,
    RequestResponseEndpoint,
)

from context_async_sqlalchemy import (
    init_db_session_ctx,
    is_context_initiated,
    reset_db_session_ctx,
    auto_commit_by_status_code,
    rollback_all_sessions,
)


async def fastapi_db_session_middleware(
    request: Request, call_next: RequestResponseEndpoint
) -> Response:
    """
    Database session lifecycle management.
    The session itself is created on demand in db_session().

    Transaction auto-commit is implemented if there is no exception and
        the response status is < 400. Otherwise, a rollback is performed.

    But you can commit or rollback manually in the handler.
    """
    # Tests have different session management rules
    # so if the context variable is already set, we do nothing
    if is_context_initiated():
        return await call_next(request)

    # We set the context here, meaning all child coroutines will receive the
    # same context. And even if a child coroutine requests the
    # session first, the dictionary itself is shared, and this coroutine will
    # add the session to dictionary = shared context.
    token = init_db_session_ctx()
    try:
        response = await call_next(request)
        await auto_commit_by_status_code(response.status_code)
        return response
    except Exception:
        await rollback_all_sessions()
        raise
    finally:
        await reset_db_session_ctx(token)

You can use ready-made FastAPI middleware:

from fastapi import FastAPI
from context_async_sqlalchemy import add_fastapi_db_session_middleware

app = FastAPI()

add_fastapi_db_session_middleware(app)

4. Write a function that will work with the session

from sqlalchemy import insert

from context_async_sqlalchemy import db_session

from database import master
from models import ExampleTable


async def handler_with_db_session() -> None:
    """
    An example of a typical handle that uses a context session to work with
        a database.
    Autocommit or autorollback occurs automatically at the end of a request
        (in middleware).
    """
    # Created a session (no connection to the database yet)
    # If you call db_session again, it will return the same session
    # even in child coroutines.
    session = await db_session(master)

    stmt = insert(ExampleTable).values(text="example_with_db_session")

    # On the first request, a connection and transaction were opened
    await session.execute(stmt)

Master/Replica or several databases at the same time

This is why db_session and other functions accept DBConnect as input. This way, you can work with multiple hosts simultaneously, for example, with the master and the replica.

libpq can detect the master and replica to create an engine. However, it only does this once during creation. This handler helps change the host on the fly if the master or replica changes. Let's imagine that you have a third-party functionality that helps determine the master or replica.

In this example, the host is not set from the very beginning, but will be calculated during the first call to create a session.

from context_async_sqlalchemy import DBConnect

from master_replica_helper import get_master, get_replica


async def renew_master_connect(connect: DBConnect) -> None:
    """Updates the host if the master has changed"""
    master_host = await get_master()
    if master_host != connect.host:
        await connect.change_host(master_host)

        
master = DBConnect(
    engine_creator=create_engine,
    session_maker_creator=create_session_maker,
    before_create_session_handler=renew_master_connect,
)


async def renew_replica_connect(connect: DBConnect) -> None:
    """Updates the host if the replica has changed"""
    replica_host = await get_replica()
    if replica_host != connect.host:
        await connect.change_host(replica_host)

        
replica = DBConnect(
    engine_creator=create_engine,
    session_maker_creator=create_session_maker,
    before_create_session_handler=renew_replica_connect,
)

Testing

The library provides several ready-made utils that can be used in tests, for example in fixtures. It helps write tests that share a common transaction between the test and the application, so data isolation between tests is achieved through fast transaction rollback.

You can see the capabilities in the examples:

Here are tests with a common transaction between the application and the tests.

And here's an example with different transactions.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

context_async_sqlalchemy-1.2.3.tar.gz (159.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

context_async_sqlalchemy-1.2.3-py3-none-any.whl (11.2 kB view details)

Uploaded Python 3

File details

Details for the file context_async_sqlalchemy-1.2.3.tar.gz.

File metadata

File hashes

Hashes for context_async_sqlalchemy-1.2.3.tar.gz
Algorithm Hash digest
SHA256 95b24ddafa0546596dd2e25147e0d5750fc82c754254c95e918d9e53ba75778d
MD5 32962bf3efdcd4304a37feff0288d3ea
BLAKE2b-256 4cdf14b6c13f27c2a5e6b5e3e254c5ed78aedf5f43ca02e0bdd885e3fb7a1d6f

See more details on using hashes here.

File details

Details for the file context_async_sqlalchemy-1.2.3-py3-none-any.whl.

File metadata

File hashes

Hashes for context_async_sqlalchemy-1.2.3-py3-none-any.whl
Algorithm Hash digest
SHA256 941e91b1ea7567d0310f0fe75fa5e566ae59d811e73382eb596c9a67b6a9b1b2
MD5 bf3aa903e737e93c48fdd757ad754428
BLAKE2b-256 050d4b35ff7489997cdaf52250ae7679aac5afd758aaa6aafd8695b70152a152

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page