Skip to main content

Spring JPA-inspired async database library for Python — built on SQLAlchemy async and Redis

Project description

viveka-kosha

A Spring JPA-inspired async database library for Python. Provides declarative entity mapping, generic repositories, transaction management, and cache decorators — built on SQLAlchemy async and Redis.


Design Philosophy

viveka-kosha maps Spring JPA concepts to Python idioms:

Spring JPA viveka-kosha
@Entity + @Table @entity + @table
JpaRepository<T, ID> DbRepo[T]
@Repository @repository
@Transactional @transactional
@Cacheable @cacheable
@CacheEvict @cache_evict
OpenSessionInView filter DbMiddleware
ThreadLocal session ContextVar session

The library is async-first. Every repository method, cache call, and session operation is async/await. This makes it a natural fit for FastAPI / Starlette services.


Architecture

viveka-kosha/
└── viveka_db/
    ├── base/
    │   ├── db_base.py        # VivekaDbBase — shared SQLAlchemy declarative base
    │   └── db_repo.py        # DbRepo[T] — generic CRUD repository
    ├── decorator/
    │   └── db_decorator.py   # @entity, @table, @repository, @transactional,
    │                         # @cacheable, @cache_evict
    ├── session/
    │   └── db_session.py     # DatabaseSessionFactory — singleton engine + session factory
    ├── middleware/
    │   └── db_middleware.py  # DbMiddleware — per-request session lifecycle
    ├── context/
    │   └── db_context.py     # ContextVar session store (async-safe ThreadLocal equivalent)
    └── cache/
        └── db_cache.py       # RedisCache — async Redis wrapper

Session Lifecycle

viveka-kosha manages sessions in two distinct paths:

HTTP Request
    │
    ▼
DbMiddleware.dispatch()
    │  creates AsyncSession
    │  stores in ContextVar
    ▼
Route Handler → Service → @transactional method
    │               detects session in ContextVar
    │               uses it directly (no new session)
    ▼
DbMiddleware
    │  commit on success
    │  rollback on exception
    └  close + clear ContextVar


Background Task / Async Worker
    │
    ▼
@transactional method
    │  no session in ContextVar
    │  opens a new session via DatabaseSessionFactory
    │  commit on success / rollback on exception / close
    └  logs a warning so async task usage is visible

This means @transactional works transparently in both HTTP and non-HTTP contexts with no configuration change.

Entity Registration

@entity registers the class in a global _entity_registry. @table(name) sets __tablename__ and triggers SQLAlchemy's declarative mapping. VivekaDbBase.__init_subclass__ marks any subclass without a __tablename__ as __abstract__ so SQLAlchemy skips unmapped intermediary classes.


Installation

Add to your service's requirements.txt:

viveka-kosha
sqlalchemy[asyncio]
asyncpg          # for PostgreSQL
aiomysql         # for MySQL
redis            # for cache decorators
starlette        # for DbMiddleware

Configuration

viveka-kosha reads from your ConfigService (from viveka-grantha).

config.ini:

[database]
url          = postgresql+asyncpg://user:pass@localhost/mydb
pool_size    = 10
max_overflow = 20
echo         = false

[cache]
url          = redis://localhost:6379/0
enabled      = true

Usage

1. Initialize the session factory at startup

from viveka_db.session.db_session import DatabaseSessionFactory
from viveka_common.config.config_service import ConfigService

config = ConfigService()
DatabaseSessionFactory(config_service=config)

2. Register the middleware (FastAPI / Starlette)

from fastapi import FastAPI
from viveka_db.middleware.db_middleware import DbMiddleware
from viveka_db.session.db_session import DatabaseSessionFactory

app = FastAPI()
engine = DatabaseSessionFactory.get_instance().engine
app.add_middleware(DbMiddleware, engine=engine)

3. Define an entity

from sqlalchemy import Column, Integer, String
from viveka_db.base.db_base import VivekaDbBase
from viveka_db.decorator.db_decorator import entity, table

@entity
@table("users")
class User(VivekaDbBase):
    id       = Column(Integer, primary_key=True)
    name     = Column(String(100), nullable=False)
    email    = Column(String(255), unique=True, nullable=False)

4. Define a repository

from viveka_db.base.db_repo import DbRepo
from viveka_db.decorator.db_decorator import repository, transactional, cacheable, cache_evict

@repository
class UserRepo(DbRepo[User]):

    def __init__(self):
        super().__init__(User)

    @transactional
    async def save(self, user: User) -> User:
        result = await self.insert_all([user])
        return result[0]

    @transactional
    @cacheable(key_prefix="user", ttl=3600)
    async def get(self, user_id: int):
        return await self.get_by_id(user_id)

    @transactional
    @cache_evict(key_prefix="user")
    async def remove(self, user_id: int) -> bool:
        return await self.delete(user_id)

    @transactional
    async def find_by_email(self, email: str):
        return await self.find_by_field("email", email)

5. Use the repository in a service

class UserService:

    def __init__(self):
        self.user_repo = UserRepo()

    async def register(self, name: str, email: str) -> User:
        user = User(name=name, email=email)
        return await self.user_repo.save(user)

    async def get_user(self, user_id: int) -> User:
        return await self.user_repo.get(user_id)

Built-in Repository Methods

DbRepo[T] provides these methods out of the box — all require @transactional on the calling method:

Method Description
insert_all(entities) Bulk insert, returns saved entities
get_by_id(id) Fetch by primary key
get_all(skip, limit) Paginated fetch (default: 0, 100)
update(entity) Merge and flush
delete(id) Delete by primary key, returns bool
delete_entity(entity) Delete a loaded entity instance
exists(id) Check existence by primary key
count() Total row count
find_by_field(field, value) First match on any field
find_all_by_field(field, value) All matches on any field

Decorators Reference

@entity

Marks a class as a mapped entity. Registers it in the global entity registry. The class must inherit from VivekaDbBase.

@table(name)

Sets the database table name and activates SQLAlchemy mapping. Must be used together with @entity.

@entity
@table("orders")
class Order(VivekaDbBase):
    ...

@repository

Marks a class as a repository for auto-discovery. No runtime behaviour beyond flagging.

@transactional

Injects the active AsyncSession into self.db before the method runs.

  • HTTP context: reuses the session created by DbMiddleware — middleware commits/rolls back.
  • Background task context: opens a dedicated session, commits on success, rolls back on exception.
@transactional
async def do_something(self):
    # self.db is an AsyncSession here
    result = await self.db.execute(select(MyModel))

@cacheable(key_prefix, ttl=3600)

Cache-aside read. Checks cache before calling the method. Caches the result on a miss. Cache key is {key_prefix}:{first_arg}.

@transactional
@cacheable(key_prefix="product", ttl=1800)
async def get_product(self, product_id: int):
    return await self.get_by_id(product_id)

@cache_evict(key_prefix)

Runs the method first, then deletes all keys matching {key_prefix}:* from cache.

@transactional
@cache_evict(key_prefix="product")
async def update_product(self, product: Product):
    return await self.update(product)

Dependencies

Package Role
sqlalchemy[asyncio] ORM, async engine, session
asyncpg / aiomysql Async DB driver (PostgreSQL / MySQL)
redis Async Redis client (redis.asyncio)
starlette BaseHTTPMiddleware for DbMiddleware
viveka-grantha ConfigService, VivekaLogManager, VivekaCacheManager

License

Copyright (c) 2025 VivekaSutra. All rights reserved.

This software is distributed under the VivekaSutra Proprietary Software License.

  • Free to download and use for personal or commercial purposes
  • Modification, redistribution, and reverse engineering are not permitted
  • Provided "AS IS" — no warranty of any kind
  • VivekaSutra is not liable for any damages arising from use

See LICENSE.txt for the full license text. For licensing inquiries visit vivekasutra.com

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

viveka_kosha-0.1.1.tar.gz (21.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

viveka_kosha-0.1.1-py3-none-any.whl (20.8 kB view details)

Uploaded Python 3

File details

Details for the file viveka_kosha-0.1.1.tar.gz.

File metadata

  • Download URL: viveka_kosha-0.1.1.tar.gz
  • Upload date:
  • Size: 21.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.0

File hashes

Hashes for viveka_kosha-0.1.1.tar.gz
Algorithm Hash digest
SHA256 525f4b811a45dd73fc074bfaa5ca4128b1c6a30cc569b03fdd1b82acd5d596d3
MD5 a3bcf8d02013e32c8c4e9824d753c775
BLAKE2b-256 265efc2f3eaad53418fe87d899855176baefbf1f0bf69b003247ce273f07c1cc

See more details on using hashes here.

File details

Details for the file viveka_kosha-0.1.1-py3-none-any.whl.

File metadata

  • Download URL: viveka_kosha-0.1.1-py3-none-any.whl
  • Upload date:
  • Size: 20.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.0

File hashes

Hashes for viveka_kosha-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 1e5f3c7a918cad81265d51fe062addff2945a1b94d162b93bf28debea808cb7e
MD5 6197ee814b8fdafc1b44f9233b724aa1
BLAKE2b-256 38e9093f5f42901e878aa897783525148722cbb397cdbedd230ff8287eebccb7

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page