Skip to main content

SQLAlchemy middleware for FastAPI

Project description

SQLAlchemy FastAPI middleware

ci ci codecov License: MIT pip Downloads Updates

Description

Provides SQLAlchemy middleware for FastAPI using AsyncSession and async engine.

Install

      pip install fastapi-async-sqlalchemy

It also works with sqlmodel

Examples

Note that the session object provided by db.session is based on the Python3.7+ ContextVar. This means that each session is linked to the individual request context in which it was created.

from fastapi import FastAPI
from fastapi_async_sqlalchemy import SQLAlchemyMiddleware
from fastapi_async_sqlalchemy import db  # provide access to a database session
from sqlalchemy import column
from sqlalchemy import table

app = FastAPI()
app.add_middleware(
    SQLAlchemyMiddleware,
    db_url="postgresql+asyncpg://user:user@192.168.88.200:5432/primary_db",
    engine_args={              # engine arguments example
        "echo": True,          # print all SQL statements
        "pool_pre_ping": True, # feature will normally emit SQL equivalent to “SELECT 1” each time a connection is checked out from the pool
        "pool_size": 5,        # number of connections to keep open at a time
        "max_overflow": 10,    # number of connections to allow to be opened above pool_size
    },
)
# once the middleware is applied, any route can then access the database session
# from the global ``db``

foo = table("ms_files", column("id"))

# Usage inside of a route
@app.get("/")
async def get_files():
    result = await db.session.execute(foo.select())
    return result.fetchall()

async def get_db_fetch():
    # It uses the same ``db`` object and use it as a context manager:
    async with db():
        result = await db.session.execute(foo.select())
        return result.fetchall()

# Usage inside of a route using a db context
@app.get("/db_context")
async def db_context():
    return await get_db_fetch()

# Usage outside of a route using a db context
@app.on_event("startup")
async def on_startup():
    # We are outside of a request context, therefore we cannot rely on ``SQLAlchemyMiddleware``
    # to create a database session for us.
    result = await get_db_fetch()


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8002)

Usage of multiple databases

databases.py

from fastapi import FastAPI
from fastapi_async_sqlalchemy import create_middleware_and_session_proxy

FirstSQLAlchemyMiddleware, first_db = create_middleware_and_session_proxy()
SecondSQLAlchemyMiddleware, second_db = create_middleware_and_session_proxy()

main.py

from fastapi import FastAPI

from databases import FirstSQLAlchemyMiddleware, SecondSQLAlchemyMiddleware
from routes import router

app = FastAPI()

app.include_router(router)

app.add_middleware(
    FirstSQLAlchemyMiddleware,
    db_url="postgresql+asyncpg://user:user@192.168.88.200:5432/primary_db",
    engine_args={
        "pool_size": 5,
        "max_overflow": 10,
    },
)
app.add_middleware(
    SecondSQLAlchemyMiddleware,
    db_url="mysql+aiomysql://user:user@192.168.88.200:5432/primary_db",
    engine_args={
        "pool_size": 5,
        "max_overflow": 10,
    },
)

routes.py

import asyncio

from fastapi import APIRouter
from sqlalchemy import column, table, text

from databases import first_db, second_db

router = APIRouter()

foo = table("ms_files", column("id"))

@router.get("/first-db-files")
async def get_files_from_first_db():
    result = await first_db.session.execute(foo.select())
    return result.fetchall()


@router.get("/second-db-files")
async def get_files_from_second_db():
    result = await second_db.session.execute(foo.select())
    return result.fetchall()


@router.get("/concurrent-queries")
async def parallel_select():
    async with first_db(multi_sessions=True):
        async def execute_query(query):
            return await first_db.session.execute(text(query))

        tasks = [
            asyncio.create_task(execute_query("SELECT 1")),
            asyncio.create_task(execute_query("SELECT 2")),
            asyncio.create_task(execute_query("SELECT 3")),
            asyncio.create_task(execute_query("SELECT 4")),
            asyncio.create_task(execute_query("SELECT 5")),
            asyncio.create_task(execute_query("SELECT 6")),
        ]

        await asyncio.gather(*tasks)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

fastapi_async_sqlalchemy-0.7.1.post3.tar.gz (27.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

File details

Details for the file fastapi_async_sqlalchemy-0.7.1.post3.tar.gz.

File metadata

File hashes

Hashes for fastapi_async_sqlalchemy-0.7.1.post3.tar.gz
Algorithm Hash digest
SHA256 0d619d6b99eb8373d71034ba4ed2ae561676ccc31913ba1ba67fb51473fbe181
MD5 b64fcd7169184d1824b303b6fd356ca0
BLAKE2b-256 51fc25991e84814d646adf538d4c6a5b6c531fc728aad5d9f5b8d33690634188

See more details on using hashes here.

File details

Details for the file fastapi_async_sqlalchemy-0.7.1.post3-py3-none-any.whl.

File metadata

File hashes

Hashes for fastapi_async_sqlalchemy-0.7.1.post3-py3-none-any.whl
Algorithm Hash digest
SHA256 ae833c47d7b24eae755af4c9c29514713b03bd17c1e4b0078afbf29c4637194b
MD5 a55808e097fd480c40fa44a0d9abddfc
BLAKE2b-256 e6cb9e1ffd12ebf0b562120d8da090bbce86536a8489279c685d630f6fea6a4d

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page