
varco-core


Backend-agnostic domain model and service layer for varco.

Provides the pure-Python building blocks that all backend packages depend on — no ORM imports at the core layer. Pair it with varco-sa (SQLAlchemy) or varco-beanie (MongoDB) for a concrete backend.


Install

pip install varco-core

Features

  • Domain model — DomainModel, AuditedDomainModel, VersionedDomainModel, soft-delete and multi-tenant variants
  • Generic async service — AsyncService with built-in create / read / update / delete / list, pagination, soft-delete, and multi-tenancy mixins
  • Authorization — AbstractAuthorizer, GrantBasedAuthorizer, RoleBasedAuthorizer, OwnershipAuthorizer
  • DTO layer — CreateDTO, ReadDTO, UpdateDTO, generate_dtos() factory, cursor-based pagination
  • Fluent query builder — QueryBuilder → AST → QueryParser (string → AST via Lark grammar); backend-independent
  • JWT / JWK — JwtBuilder, JwtParser, JwkBuilder, MultiKeyAuthority, OIDC + PEM sources
  • Event system — AbstractEventBus, EventConsumer, BusEventProducer, listen decorator; backend-agnostic pub/sub
  • Dead Letter Queue — AbstractDeadLetterQueue, InMemoryDeadLetterQueue; failed events routed after retry exhaustion
  • Transactional Outbox — OutboxEntry, OutboxRepository, OutboxRelay; at-least-once event delivery via DB-backed outbox
  • Resilience — @retry with exponential back-off, CircuitBreaker (CLOSED / OPEN / HALF_OPEN), @timeout; composable on any sync/async callable
  • Async cache — InMemoryCache, LayeredCache, CachedService; pluggable invalidation strategies (TTL, Explicit, Tagged, EventDriven, Composite)
  • Validation — Validator protocol, DomainModelValidator, CompositeValidator, ValidationResult; collect-all business-invariant validation
  • Serialization — Serializer protocol, JsonSerializer (Pydantic-backed), NoOpSerializer; pluggable ser/deser for bus and cache backends
  • Tracing — correlation_context, current_correlation_id, CorrelationIdFilter
  • DI-ready — all service classes are designed for constructor injection via providify

What's in the package

Module — Purpose
model.py — DomainModel, AuditedDomainModel, VersionedDomainModel, SoftDeleteMixin, TenantMixin and derived classes
meta.py — FieldHint, ForeignKey, PrimaryKey, PKStrategy, constraints, pk_field()
mapper.py — AbstractMapper: bidirectional ORM ↔ domain translation
repository.py — AsyncRepository ABC: CRUD + exists() + stream_by_query()
uow.py — AsyncUnitOfWork ABC
registry.py — DomainModelRegistry + @register decorator
providers.py — RepositoryProvider ABC
assembler.py — AbstractDTOAssembler[D, C, R, U]
service/base.py — AsyncService, IUoWProvider
service/tenant.py — TenantAwareService, TenantUoWProvider, tenant_context
service/soft_delete.py — SoftDeleteService
service/types.py — Assembler alias, ServiceProtocol
auth/ — AbstractAuthorizer, Action, AuthContext, ResourceGrant
auth/helpers.py — GrantBasedAuthorizer, OwnershipAuthorizer, RoleBasedAuthorizer
exception/ — FastrestErrorCodes, ErrorCode, ErrorMessage, HTTP error mapping
tracing.py — correlation_context, current_correlation_id, CorrelationIdFilter
query/ — QueryBuilder, QueryParams, QueryParser, AST visitors
jwt/ — JwtBuilder, JwtParser, JwtUtil, JsonWebToken
jwk/ — JwkBuilder, JsonWebKey, JsonWebKeySet
authority/ — JwtAuthority, MultiKeyAuthority, TrustedIssuerRegistry, OIDC/PEM sources
event/ — AbstractEventBus, EventConsumer, BusEventProducer, listen, ChannelManager
event/dlq.py — AbstractDeadLetterQueue, InMemoryDeadLetterQueue, DeadLetterEntry
service/outbox.py — OutboxEntry, OutboxRepository, OutboxRelay
service/validation.py — ValidatorServiceMixin: DI-injected validator hook into AsyncService
resilience/ — retry, RetryPolicy, CircuitBreaker, CircuitBreakerConfig, timeout, CallTimeoutError, RetryExhaustedError, CircuitOpenError
cache/ — AsyncCache, CacheBackend, InMemoryCache, NoOpCache, LayeredCache, CachedService, all invalidation strategies
validation.py — Validator, DomainModelValidator, CompositeValidator, ValidationResult, ValidationError
serialization.py — Serializer, JsonSerializer, NoOpSerializer

Quick start

Define a domain model

from __future__ import annotations
from typing import Annotated
from varco_core import AuditedDomainModel
from varco_core.meta import FieldHint, PrimaryKey, PKStrategy, pk_field

class Post(AuditedDomainModel):
    pk: Annotated[int, PrimaryKey(PKStrategy.INT_AUTO)] = pk_field()
    title: Annotated[str, FieldHint(max_length=200)]
    body: str
    published: bool = False

Wire a service with DI

from varco_core import AsyncService, IUoWProvider
from varco_core.assembler import AbstractDTOAssembler
from varco_core.auth import AbstractAuthorizer
from providify import Inject, Singleton

@Singleton
class PostService(AsyncService[Post, int, CreatePostDTO, PostReadDTO, UpdatePostDTO]):
    def __init__(
        self,
        uow_provider: Inject[IUoWProvider],
        authorizer:   Inject[AbstractAuthorizer],
        assembler:    Inject[AbstractDTOAssembler[Post, CreatePostDTO, PostReadDTO, UpdatePostDTO]],
    ) -> None:
        super().__init__(uow_provider=uow_provider, authorizer=authorizer, assembler=assembler)

    def _get_repo(self, uow): return uow.posts

Build and run a query

from varco_core import QueryBuilder, QueryParams

params = QueryParams(
    node=QueryBuilder().eq("published", True).and_().gt("pk", 10).build(),
    limit=20,
    offset=0,
)

# Parse from a query string (e.g. from a URL parameter)
from varco_core import QueryParser
params = QueryParser().parse('published == true AND pk > 10', limit=20)

JWT — build and verify tokens

from varco_core import JwtBuilder, JwtParser

token = JwtBuilder(secret="s3cr3t").subject("user-42").expires_in(3600).build()
payload = JwtParser(secret="s3cr3t").parse(token)

Cache — in-memory with TTL

from varco_core.cache import InMemoryCache, TTLStrategy

async with InMemoryCache(strategy=TTLStrategy(default_ttl=300)) as cache:
    await cache.set("user:42", {"name": "Alice"})
    result = await cache.get("user:42")   # returns dict; None after 300 s
    await cache.delete("user:42")

Cache — layered L1 (memory) + L2 (Redis)

from varco_core.cache import InMemoryCache, LayeredCache, TTLStrategy
from varco_redis.cache import RedisCache, RedisCacheSettings

l1 = InMemoryCache(strategy=TTLStrategy(60))
l2 = RedisCache(RedisCacheSettings(url="redis://localhost:6379/0"))

async with LayeredCache(l1, l2, promote_ttl=60) as cache:
    await cache.set("product:1", product_dict, ttl=300)
    # First read fetches from L2 Redis and promotes to L1
    result = await cache.get("product:1")
    # Subsequent reads are served from L1 (in-process, zero network)

Cache — @cached decorator (any async function or method)

from varco_core.cache import cached, InMemoryCache, LayeredCache, TTLStrategy
from varco_redis.cache import RedisCache, RedisCacheSettings

# Declare a module-level cache (started at app startup)
_cache = LayeredCache(
    InMemoryCache(strategy=TTLStrategy(60)),
    RedisCache(RedisCacheSettings(key_prefix="posts:")),
    promote_ttl=60,
)

@cached(_cache, ttl=300, namespace="posts")
async def get_post(post_id: int) -> dict:
    return await db.fetch_post(post_id)  # only called on a cache miss

post = await get_post(42)     # miss → hits DB → cached
post = await get_post(42)     # hit  → served from cache

# Evict a specific entry:
await get_post.invalidate(42)

# Evict all entries in this cache:
await get_post.invalidate_all()

For instance methods, pass a factory callable:

class PostRepository:
    def __init__(self, cache: CacheBackend) -> None:
        self._cache = cache

    @cached(lambda self: self._cache, ttl=120, namespace="posts")
    async def find_by_id(self, post_id: int) -> dict | None:
        ...

Cache — CacheServiceMixin (look-aside built into the service)

Compose caching into your AsyncService via MRO — no wrapper needed, all methods remain visible and IDE-discoverable.

_cache and _cache_bus are declared as ClassVar[Inject[T]] so the providify container injects them automatically — no extra __init__ parameters required:

from varco_core import AsyncService, CacheServiceMixin
from providify import Inject, Singleton

@Singleton
class PostService(
    CacheServiceMixin[Post, int, CreatePostDTO, PostReadDTO, UpdatePostDTO],
    AsyncService[Post, int, CreatePostDTO, PostReadDTO, UpdatePostDTO],
):
    _cache_namespace = "post"   # unique key prefix
    _cache_ttl = 300             # seconds

    def __init__(
        self,
        uow_provider: Inject[IUoWProvider],
        authorizer:   Inject[AbstractAuthorizer],
        assembler:    Inject[AbstractDTOAssembler[Post, ...]],
        # No cache parameter — resolved via ClassVar[Inject[CacheBackend]]
    ) -> None:
        super().__init__(uow_provider=uow_provider, authorizer=authorizer, assembler=assembler)

    def _get_repo(self, uow): return uow.posts

# Register the cache backend once — all CacheServiceMixin subclasses share it:
from varco_redis.cache import RedisCacheConfiguration
container = DIContainer()
await container.ainstall(RedisCacheConfiguration)  # binds CacheBackend

# Usage is identical to a plain PostService — caching is transparent:
post = await post_service.get(42, ctx)            # look-aside (ReadDTO cached)
posts = await post_service.list(params, ctx)      # cached by params hash
await post_service.update(42, dto, ctx)           # evicts get key + list

Override the qualifier or injection strategy on specific services:

from typing import Annotated, ClassVar
from providify import InjectMeta, LiveMeta

class PostService(CacheServiceMixin, AsyncService[Post, ...]):
    _cache_namespace = "post"
    # Use a specific qualified backend (e.g. "layered"):
    _cache: ClassVar[Annotated[CacheBackend, InjectMeta(qualifier="layered")]]

class SessionService(CacheServiceMixin, AsyncService[Session, ...]):
    _cache_namespace = "session"
    # Re-resolve on every call for a request-scoped cache:
    _cache: ClassVar[Annotated[CacheBackend, LiveMeta()]]

Combine with SoftDeleteService or TenantAwareService via MRO — caching wraps all cross-cutting checks transparently:

class PostService(
    CacheServiceMixin,
    TenantAwareService,
    SoftDeleteService,
    AsyncService[Post, ...],
):
    _cache_namespace = "post"
    _cache_ttl = 60
    ...

Cache — cross-process invalidation via event bus

from varco_core.cache import InMemoryCache, LayeredCache, TTLStrategy
from varco_redis.cache import RedisCache, RedisCacheSettings
from varco_redis import RedisEventBus, RedisEventBusSettings

bus_settings = RedisEventBusSettings(url="redis://localhost:6379/0")
cache_settings = RedisCacheSettings(url="redis://localhost:6379/0")

async with (
    RedisEventBus(bus_settings) as bus,
    LayeredCache(
        InMemoryCache(strategy=TTLStrategy(60)),
        RedisCache(cache_settings),
        promote_ttl=60,
    ) as cache,
):
    class PostService(CacheServiceMixin, AsyncService[Post, ...]):
        _cache_namespace = "post"
        _cache_bus_channel = "posts.cache.invalidations"

        def __init__(self, ..., cache, bus):
            super().__init__(...)
            self._cache = cache
            self._cache_bus = bus  # publishes CacheInvalidated on every mutation

    # All instances sharing this bus evict their L1 caches automatically.
    await post_service.update(42, dto, ctx)

Cache — composable invalidation strategies

from varco_core.cache import (
    InMemoryCache, CompositeStrategy, TTLStrategy, ExplicitStrategy, TaggedStrategy
)

strategy = CompositeStrategy(
    TTLStrategy(300),         # evict after 5 minutes
    ExplicitStrategy(),       # evict on demand
    TaggedStrategy(),         # evict by tag
)

async with InMemoryCache(strategy=strategy) as cache:
    await cache.set("user:1", user, tags={"users", "tenant:acme"})

    # Evict a single key
    strategy.strategies[1].invalidate("user:1")

    # Evict everything tagged "tenant:acme"
    strategy.strategies[2].invalidate_tag("tenant:acme")

Resilience — retry, circuit breaker, timeout

Composable decorators for any sync or async callable:

from varco_core.resilience import retry, RetryPolicy, CircuitBreaker, CircuitBreakerConfig, timeout

# Retry with exponential back-off + jitter
@retry(RetryPolicy(max_attempts=3, base_delay=0.5, retryable_on=(ConnectionError,)))
async def call_payment_api(payload: dict) -> Receipt:
    return await http_client.post("/charge", json=payload)

# Timeout (async only)
@timeout(10.0)
async def fetch_user(user_id: int) -> User:
    return await db.get(User, user_id)

# Circuit breaker — share ONE instance per external dependency
payment_breaker = CircuitBreaker(
    CircuitBreakerConfig(failure_threshold=5, recovery_timeout=30.0),
    name="payments",
)

@payment_breaker.protect
async def charge(amount: float) -> str:
    return await _charge(amount)

# Compose all three (decorators apply bottom-up: breaker, then retry, then timeout):
downstream_breaker = CircuitBreaker(CircuitBreakerConfig(failure_threshold=5), name="downstream")

@timeout(10.0)
@retry(RetryPolicy(max_attempts=3, base_delay=0.5))
@downstream_breaker.protect
async def resilient_call() -> Response: ...
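The stacking order matters because decorators apply bottom-up: the breaker is the innermost wrapper, the timeout the outermost. A library-independent sketch of the resulting call order, using a toy `trace` decorator (not part of varco-core) in place of the real ones:

```python
import functools

def trace(label: str, log: list):
    """Toy decorator that records when its layer is entered and exited."""
    def deco(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            log.append(f"enter {label}")
            result = fn(*args, **kwargs)
            log.append(f"exit {label}")
            return result
        return wrapper
    return deco

log: list = []

# Same stacking shape as timeout / retry / breaker above:
@trace("timeout", log)
@trace("retry", log)
@trace("breaker", log)
def call() -> str:
    log.append("body")
    return "ok"

call()
# The topmost decorator wraps everything, so it runs first on the way in
# and last on the way out:
assert log == [
    "enter timeout", "enter retry", "enter breaker",
    "body",
    "exit breaker", "exit retry", "exit timeout",
]
```

In the resilience stack this means the timeout bounds the total time of all retries, and each retry attempt passes through the breaker individually.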

Dead Letter Queue (DLQ)

Route events that exhausted all retry attempts to a DLQ for inspection or replay:

from varco_core.event import EventConsumer, listen
from varco_core.event.dlq import InMemoryDeadLetterQueue, DeadLetterEntry
from varco_core.resilience import RetryPolicy

dlq = InMemoryDeadLetterQueue(max_size=10_000)  # swap for KafkaDLQ / RedisDLQ in production

class OrderConsumer(EventConsumer):
    @PostConstruct
    def _setup(self) -> None:
        self.register_to(self._bus)

    @listen(
        OrderPlacedEvent,
        channel="orders",
        retry_policy=RetryPolicy(max_attempts=3, base_delay=1.0),
        dlq=dlq,  # routed here after 3 failures
    )
    async def on_order(self, event: OrderPlacedEvent) -> None:
        await self._fulfillment.process(event)

# Replay or inspect later
entries: list[DeadLetterEntry] = await dlq.pop_batch(limit=50)
for entry in entries:
    print(entry.handler_name, entry.error_type, entry.attempts)
    await dlq.ack(entry.entry_id)

Transactional Outbox — guaranteed at-least-once delivery

Persist events in the same DB transaction as your domain entity, then relay asynchronously:

from varco_core.service.outbox import OutboxEntry, OutboxRelay

# 1. In your service — save event in the same transaction as the entity
async with uow:
    order = await repo.save(Order(...))
    await outbox_repo.save(OutboxEntry.from_event(
        OrderPlacedEvent(order_id=order.pk), channel="orders"
    ))

# 2. At app startup — start the relay background task
relay = OutboxRelay(outbox=outbox_repo, bus=bus, poll_interval=1.0)
await relay.start()

# 3. At shutdown
await relay.stop()

OutboxRelay polls the outbox, publishes each entry to the bus, and deletes it on success. If the broker is down, entries accumulate until it recovers — no events are lost.
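The relay's internals aren't shown above, but the cycle it describes can be sketched with in-memory stand-ins. `FakeOutbox` and `FakeBus` below are illustrative, not library classes; the point is the ordering — an entry is deleted only after its publish succeeds, which is what makes delivery at-least-once:

```python
import asyncio
from dataclasses import dataclass, field

@dataclass
class FakeOutbox:
    """In-memory stand-in for an OutboxRepository-like store."""
    entries: list = field(default_factory=list)

    async def fetch_batch(self, limit: int) -> list:
        return self.entries[:limit]  # copy, so deletion during iteration is safe

    async def delete(self, entry: dict) -> None:
        self.entries.remove(entry)

@dataclass
class FakeBus:
    """In-memory stand-in for an event bus."""
    published: list = field(default_factory=list)

    async def publish(self, channel: str, payload: dict) -> None:
        self.published.append({"channel": channel, **payload})

async def relay_once(outbox: FakeOutbox, bus: FakeBus, batch_size: int = 100) -> int:
    """One poll cycle: publish each pending entry, delete it only on success."""
    delivered = 0
    for entry in await outbox.fetch_batch(batch_size):
        await bus.publish(entry["channel"], entry["payload"])  # raises if broker is down
        await outbox.delete(entry)  # reached only after a successful publish
        delivered += 1
    return delivered

outbox = FakeOutbox(entries=[{"channel": "orders", "payload": {"order_id": 1}}])
bus = FakeBus()
assert asyncio.run(relay_once(outbox, bus)) == 1
assert outbox.entries == []  # drained on success
```

If `publish` raises, the entry is never deleted and the next poll retries it — the failure mode is a duplicate delivery, never a lost event, which is why consumers should be idempotent.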

Validation — collect-all business invariants

from dataclasses import dataclass
from varco_core.model import DomainModel
from varco_core.validation import DomainModelValidator, CompositeValidator, ValidationResult

@dataclass(kw_only=True)
class Order(DomainModel):
    quantity: int
    total: float

class OrderValidator(DomainModelValidator[Order]):
    def validate(self, value: Order) -> ValidationResult:
        result = ValidationResult.ok()
        result += self._rule(value.quantity <= 0, "quantity must be positive", field="quantity")
        result += self._rule(value.total < 0, "total must be non-negative", field="total")
        return result

# All errors collected before raising — user sees the full list at once
validator = CompositeValidator(OrderValidator(), AnotherValidator())
result = validator.validate(order)
result.raise_if_invalid()  # raises ServiceValidationError with all errors joined

# Inject into a service via ValidatorServiceMixin:
from varco_core.service.validation import ValidatorServiceMixin

class OrderService(
    ValidatorServiceMixin[Order, int, CreateOrderDTO, OrderReadDTO, UpdateOrderDTO],
    AsyncService[Order, int, CreateOrderDTO, OrderReadDTO, UpdateOrderDTO],
):
    def _get_repo(self, uow): return uow.orders

Serialization — pluggable ser/deser

from varco_core.serialization import JsonSerializer, NoOpSerializer

s = JsonSerializer()
data: bytes = s.serialize({"key": "value"})                  # → UTF-8 JSON bytes
back: dict   = s.deserialize(data)                            # → raw Python dict
model        = s.deserialize(data, type_hint=MyPydanticModel) # → validated model

# Pass-through for pre-serialized bytes
noop = NoOpSerializer()
assert noop.serialize(b"raw") is b"raw"

Related packages

Package Description
varco-sa SQLAlchemy async backend
varco-beanie Beanie / Motor MongoDB backend
