A manager to easily handle multiple SQLAlchemy configurations
Project description
SQLAlchemy bind manager
This package provides an easy way to configure and use SQLAlchemy engines and sessions without depending on frameworks.
It is composed by two main components:
- A manager class for SQLAlchemy engine and session configuration
- A repository/unit-of-work pattern implementation for model retrieval and persistence
Installation
pip install sqlalchemy-bind-manager
Components maturity
- SQLAlchemy manager: Implementation is mostly finalised, needs testing in production.
- Repository / Unit of work: Implementation is mostly finalised, needs testing in production.
SQLAlchemy manager
Initialise the manager providing an instance of SQLAlchemyConfig
from sqlalchemy_bind_manager import SQLAlchemyConfig, SQLAlchemyBindManager
config = SQLAlchemyConfig(
engine_url="sqlite:///./sqlite.db",
engine_options=dict(connect_args={"check_same_thread": False}, echo=True),
session_options=dict(expire_on_commit=False),
)
sa_manager = SQLAlchemyBindManager(config)
🚨 NOTE: Using global variables is not thread-safe, please read the Threading section if your application uses multi-threading.
The engine_url
and engine_options
dictionaries accept the same parameters as SQLAlchemy create_engine()
The session_options
dictionary accepts the same parameters as SQLALchemy sessionmaker()
Once the bind manager is initialised we can retrieve and use the SQLAlchemyBind using the method get_bind()
The SQLAlchemyBind
and SQLAlchemyAsyncBind
class has the following attributes:
engine
: The initialised SQLALchemyEngine
model_declarative_base
: A base class that can be used to create declarative modelsregistry_mapper
: Theregistry
associated with theengine
. It can be used with Alembic or to setup imperative mappingsession_class
: The class built by sessionmaker(), eitherSession
orAsyncSession
The SQLAlchemyBindManager
provides some helper methods to quickly access some of the bind properties without using the SQLAlchemyBind
:
get_session
: returns a Session objectget_mapper
: returns the mapper associated with the bind
Example:
bind = sa_manager.get_bind()
class DeclarativeModel(bind.model_declarative_base):
pass
class ImperativeModel:
id: int
imperative_table = Table(
"imperative",
bind.registry_mapper.metadata,
Column("id", Integer, primary_key=True),
Column("name", String, primary_key=True),
)
bind.registry_mapper.map_imperatively(ImperativeModel, imperative_table)
# or using the get_mapper() helper method
sa_manager.get_mapper().map_imperatively(ImperativeModel, imperative_table)
# Persist an object
o = ImperativeModel() # also o = DeclarativeModel()
o.name = "John"
with sa_manager.get_bind().session_class()() as session:
session.add(o)
session.commit()
# or using the get_session() helper method for better readability
with sa_manager.get_session() as session:
session.add(o)
session.commit()
Multiple database binds
SQLAlchemyBindManager
accepts also multiple databases configuration, provided as a dictionary. The dictionary keys are used as a reference name for each bind.
from sqlalchemy_bind_manager import SQLAlchemyConfig, SQLAlchemyBindManager
config = {
"default": SQLAlchemyConfig(
engine_url="sqlite:///./sqlite.db",
engine_options=dict(connect_args={"check_same_thread": False}, echo=True),
session_options=dict(expire_on_commit=False),
),
"secondary": SQLAlchemyConfig(
engine_url="sqlite:///./secondary.db",
engine_options=dict(connect_args={"check_same_thread": False}, echo=True),
session_options=dict(expire_on_commit=False),
),
}
sa_manager = SQLAlchemyBindManager(config)
All the SQLAlchemyBindManager
helper methods accept the bind_name
optional parameter:
get_session(bind_name="default")
: returns aSession
orAsyncSession
objectget_mapper(bind_name="default")
: returns the mapper associated with the bind
Threading
Global variables are shared between different threads in python. If your application uses multiple threads, like spawning a thread per request, then you should not store an initialised session in a global variable, otherwise the state of your models will be shared among the threads and produce undesired changes in the database.
This is not thread safe:
session = sa_manager.get_session()
session.add(model)
session.commit()
If you truly need to have a long-lived session you'll need to use a scoped session, something like this:
from sqlalchemy.orm import scoped_session
session = scoped_session(sa_manager.get_bind().session_class())
Handling the life cycle of scoped sessions is not supported by this documentations. Please refer to SQLAlchemy documentation about this.
Asynchronous database binds
Is it possible to supply configurations for asyncio supported engines.
config = SQLAlchemyAsyncConfig(
engine_url="postgresql+asyncpg://scott:tiger@localhost/test",
)
This will make sure we have an AsyncEngine
and an AsyncSession
are initialised.
async with sa_manager.get_session() as session:
session.add(o)
session.commit()
Note that async implementation has several differences from the sync one, make sure to check SQLAlchemy asyncio documentation
Repository / Unit of work
The SQLAlchemyRepository
and SQLAlchemyAsyncRepository
class can be used directly or by extending them.
from sqlalchemy_bind_manager.repository import SQLAlchemyRepository
class MyModel(model_declarative_base):
pass
# Direct usage
repo_instance = SQLAlchemyRepository(sqlalchemy_bind_manager.get_bind(), model_class=MyModel)
class ModelRepository(SQLAlchemyRepository[MyModel]):
_model = MyModel
def _some_custom_method_implemented(self):
...
# Extended class usage
extended_repo_instance = ModelRepository(sqlalchemy_bind_manager.get_bind())
The classes provide some common use methods:
get
: Retrieve a model by identifiersave
: Persist a modelsave_many
: Persist multiple models in a single transactiondelete
: Delete a modelfind
: Search for a list of models (basically an adapter for SELECT queries)paginated_find
: Search for a list of models, with pagination supportcursor_paginated_find
: Search for a list of models, with cursor based pagination support
Session lifecycle in repositories
SQLAlchemy documentation
recommends we create Session
object at the beginning of a logical operation where
database access is potentially anticipated.
Doing this too soon might cause unexpected effects, like unexpected updates being committed, if the initialised session is shared among different repositories.
A Repository
represents a generic interface to persist data object to a storage, not necessarily
using SQLAlchemy. It makes sense that the lifecycle of a Session
follows the one of the Repository
(The assumption is: if we create a Repository, we're going to do a DB operation,
otherwise we wouldn't need one).
Each Repository instance create an internal scoped session. The session gets automatically closed when the Repository instance is not referenced by any variable (and the garbage collector clean it up)
In this way we ensure the Session
we use is isolated, and the same for all the operations we do with the
same Repository.
This approach has a consequence: We can't use SQLAlchemy lazy loading, so we'll need to make sure relationship are always loaded eagerly, using either approach:
- Setup your model/table relationships to always use always eager loading
- Implement ad-hoc methods to deal with relationships as necessary
Note that AsyncSession
has the same limitation on lazy loading,
even when keeping the session opened, so it makes sense that the two Repository implementations behave consistently.
Use the Unit Of Work to share a session among multiple repositories
It is possible we need to run several operations in a single database transaction. While a single repository provide by itself an isolated session for single operations, we have to use a different approach for multiple operations.
We can use the UnitOfWork
or the AsyncUnitOfWork
class to provide a shared session to
be used for repository operations, assumed the same bind is used for all the repositories.
class MyRepo(SQLAlchemyRepository):
_model = MyModel
class MyOtherRepo(SQLAlchemyRepository):
_model = MyOtherModel
bind = sa_manager.get_bind()
uow = UnitOfWork(bind, (MyRepo, MyOtherRepo))
with uow.transaction():
uow.MyRepo.save(some_model)
uow.MyOtherRepo.save(some_other_model)
# Optionally disable the commit/rollback handling
with uow.transaction(read_only=True):
model1 = uow.MyRepo.get(1)
model2 = uow.MyOtherRepo.get(2)
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Hashes for sqlalchemy_bind_manager-0.3.1.tar.gz
Algorithm | Hash digest | |
---|---|---|
SHA256 | cb1e8ddcd091b46785c405af72ab35f0d68c0a920627de7e3f00b0d1e59c9629 |
|
MD5 | 71a3656d9776743cb99039d7f1ddb3a2 |
|
BLAKE2b-256 | 39de6f3d6dcf0b7d8536bdcadee76a4e0e127ef51b5b53c18edef7f848e2c92c |
Hashes for sqlalchemy_bind_manager-0.3.1-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 5834504731306f77438d1afafe4f7953fd5bbf6d6ca1d5e3f47e5e54d90461d7 |
|
MD5 | 911e277f545cab35397b8afcb6a59744 |
|
BLAKE2b-256 | e66ce36d3827f93ed4b15373f6969e24dd76b87282edcea0c200bc06638956c1 |