Skip to main content

Loom Python project

Project description

loom-kernel

loom-kernel

CI Docs Quality Gate Status Security Rating Vulnerabilities Coverage PyPI Python

Framework-agnostic Python toolkit to build backend applications with:

  • Auto-CRUD — full REST surface from a model declaration, two lines of code
  • typed use cases (msgspec.Struct) with rules, computes, and dependency injection
  • repositories decoupled from infrastructure
  • REST/FastAPI adapters with OpenAPI generation
  • background jobs and Celery workers, first-class
  • testing utilities for business workflows

Purpose

loom-kernel helps you ship production APIs faster without sacrificing clean architecture. Declare your domain model, describe your business rules, and let the framework handle the infrastructure plumbing — DB wiring, DI, routing, serialization.

The library separates core contracts from concrete adapters so you can swap infrastructure (DB, cache, transport) without breaking business logic.

Documentation

  • Usage guides and architecture docs are available in the docs/ site.
  • API reference is autogenerated from public docstrings.
  • End-to-end REST demo: dummy-loom.
  • End-to-end ETL demo: dummy-loom-etl — full Polars and Spark pipeline examples.

Main subpaths

Subpath What it is for
src/loom/core/use_case UseCase definition, rules (Rule), and compute steps (Compute).
src/loom/core/engine Compilation and runtime execution of a use-case plan.
src/loom/core/repository/abc Repository contracts, pagination, and typed query spec.
src/loom/core/repository/sqlalchemy Concrete async SQLAlchemy repository implementation.
src/loom/core/model Base model, fields, relations, and entity introspection.
src/loom/core/cache Decorators and cached repository with dependency invalidation.
src/loom/core/config YAML config loader with cloud URI support and pluggable resolvers.
src/loom/rest Framework-agnostic REST model and route compiler.
src/loom/rest/fastapi Direct FastAPI integration (auto wiring and runtime router).
src/loom/prometheus Middleware and adapter for runtime metrics.
src/loom/testing Harnesses for unit/integration tests and golden tests.
src/loom/etl Declarative ETL subsystem — pipelines, Polars/Spark backends, observability.
src/loom/etl/pipeline ETLStep, ETLProcess, ETLPipeline, StepSQL, and ETLParams.
src/loom/etl/declarative FromTable, FromFile, IntoTable, IntoFile, predicate DSL, and params proxy.
src/loom/etl/schema Backend-agnostic schema model (LoomDtype, ColumnSchema, TableRef).
src/loom/etl/storage Storage config, table/file locators, and route resolution.
src/loom/etl/compiler Compile-time validation and execution plan builder.
src/loom/etl/runner ETLRunner entry point and YAML config loader.
src/loom/etl/checkpoint Step-level checkpoint store for incremental re-runs.
src/loom/etl/observability Run/step observers, OTEL sink, structlog sink, and execution records.
src/loom/etl/backends/polars Polars + Delta Lake reader, writer, and schema aligner.
src/loom/etl/backends/spark Spark + Delta reader, writer, and schema aligner.
src/loom/etl/testing PolarsStepRunner, SparkStepRunner, ETLScenario, and test stubs.

Quick start

1. Define your model

from loom.core.model import ColumnField, OnDelete, TimestampedModel

class User(TimestampedModel):
    __tablename__ = "users"
    id: int = ColumnField(primary_key=True, autoincrement=True)
    full_name: str = ColumnField(length=120)
    email: str = ColumnField(length=255, unique=True, index=True)

class Address(TimestampedModel):
    __tablename__ = "addresses"
    id: int = ColumnField(primary_key=True, autoincrement=True)
    user_id: int = ColumnField(foreign_key="users.id", on_delete=OnDelete.CASCADE, index=True)
    city: str = ColumnField(length=120)
    country: str = ColumnField(length=120)

2. Write use cases

Use cases declare their inputs and invariants declaratively. The engine resolves them before execute() runs.

import re
from loom.core.command import Command, Patch
from loom.core.errors import NotFound
from loom.core.use_case import Exists, F, Input, LoadById, OnMissing, Rule
from loom.core.use_case.use_case import UseCase

_EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")

class CreateUser(Command, frozen=True):
    full_name: str
    email: str

class UpdateUser(Command, frozen=True):
    full_name: Patch[str] = None
    email: Patch[str] = None

def _name_must_not_be_blank(full_name: str) -> str | None:
    return None if full_name.strip() else "full_name must not be blank"

def _email_must_be_valid(email: str) -> str | None:
    return None if _EMAIL_RE.fullmatch(email) else "email must be valid"

class CreateUserUseCase(UseCase[User, User]):
    rules = [
        Rule.check(F(CreateUser).full_name, via=_name_must_not_be_blank),
        Rule.check(F(CreateUser).email, via=_email_must_be_valid),
        Rule.forbid(lambda _, __, exists: exists, message="email already exists").from_params("email_exists"),
    ]

    async def execute(
        self,
        cmd: CreateUser = Input(),
        email_exists: bool = Exists(User, from_command="email", against="email"),
    ) -> User:
        return await self.main_repo.create(cmd)

class UpdateUserUseCase(UseCase[User, User | None]):
    rules = [Rule.check(F(UpdateUser).full_name, via=_name_must_not_be_blank).when_present(F(UpdateUser).full_name)]

    async def execute(
        self,
        user_id: int,
        cmd: UpdateUser = Input(),
        current_user: User = LoadById(User, by="user_id"),  # loaded automatically
    ) -> User | None:
        return await self.main_repo.update(user_id, cmd)

Exists checks a DB condition before execute runs — no boilerplate in the body. LoadById fetches an entity by a path/command parameter, available in rules and the body. Patch[T] marks a field as optional in partial updates; .when_present(...) gates rules on whether the field was sent.

3. Scope resources under a parent

Use from_param to guard nested routes (e.g. /users/{user_id}/addresses/{address_id}):

from loom.core.use_case import Exists, Input, OnMissing

class CreateAddressUseCase(UseCase[Address, Address]):
    async def execute(
        self,
        user_id: int,
        cmd: CreateUserAddress = Input(),
        _user_exists: bool = Exists(User, from_param="user_id", against="id", on_missing=OnMissing.RAISE),
    ) -> Address:
        return await self.main_repo.create(CreateAddressRecord(user_id=user_id, **cmd.__dict__))

OnMissing.RAISE returns a structured 404 automatically — no if in the body.

4. Structured queries

Build explicit queries without raw SQL:

from loom.core.repository.abc.query import (
    FilterGroup, FilterOp, FilterSpec, PageResult, PaginationMode, QuerySpec, SortSpec,
)

class ListLowStockProductsUseCase(UseCase[Product, PageResult[Product]]):
    async def execute(self, profile: str = "default") -> PageResult[Product]:
        query = QuerySpec(
            filters=FilterGroup(filters=(FilterSpec(field="stock", op=FilterOp.LTE, value=5),)),
            sort=(SortSpec(field="stock", direction="ASC"),),
            pagination=PaginationMode.OFFSET,
            limit=20,
            page=1,
        )
        result = await self.main_repo.list_with_query(query, profile=profile)
        if not isinstance(result, PageResult):
            raise RuntimeError("expected offset result")
        return result

5. Background jobs

Jobs are use-case-like executors that run in a queue. LoadById works the same way:

from loom.core.job.job import Job
from loom.core.use_case import Input, LoadById

class SendRestockEmailJob(Job[bool]):
    __queue__ = "notifications"

    async def execute(
        self,
        product_id: int,
        cmd: SendRestockEmailCommand = Input(),
        product: Product = LoadById(Product, by="product_id"),
    ) -> bool:
        if product.stock > 0:
            return False
        # send email to cmd.recipient_email ...
        return True

6. Dispatch jobs from use cases + callbacks

from loom.core.job.service import JobService
from loom.core.use_case.use_case import UseCase

class DispatchRestockEmailUseCase(UseCase[Product, DispatchRestockEmailResponse]):
    def __init__(self, job_service: JobService) -> None:
        self._jobs = job_service

    async def execute(self, product_id: str, cmd: DispatchRestockEmailCommand = Input()) -> DispatchRestockEmailResponse:
        handle = self._jobs.dispatch(
            SendRestockEmailJob,
            params={"product_id": int(product_id)},
            payload={"product_id": int(product_id), "recipient_email": cmd.recipient_email},
            on_success=RestockEmailSuccessCallback,
            on_failure=RestockEmailFailureCallback,
        )
        return DispatchRestockEmailResponse(job_id=handle.job_id, queue=handle.queue)

Callbacks are resolved by the DI container and receive the job result + context:

class RestockEmailSuccessCallback:
    def __init__(self, app: ApplicationInvoker) -> None:
        self._app = app

    async def on_success(self, job_id: str, result: Any, **context: Any) -> None:
        if not result:
            return
        entity = self._app.entity(Product)
        product = await entity.get(params={"id": context["product_id"]})
        if product:
            await entity.update(params={"id": product.id}, payload={"category": f"{product.category}-notified"})

7. Chain use cases (workflow pattern)

ApplicationInvoker lets a use case call another use case by type — no tight coupling:

from loom.core.use_case.invoker import ApplicationInvoker

class RestockWorkflowUseCase(UseCase[Product, RestockWorkflowResponse]):
    def __init__(self, app: ApplicationInvoker, job_service: JobService) -> None:
        self._app = app
        self._jobs = job_service

    async def execute(self, product_id: str, cmd: DispatchRestockEmailCommand = Input()) -> RestockWorkflowResponse:
        summary = await self._app.invoke(BuildProductSummaryUseCase, params={"product_id": int(product_id)})
        handle = self._jobs.dispatch(SendRestockEmailJob, params={"product_id": int(product_id)}, payload={...})
        return RestockWorkflowResponse(summary=summary.summary, restock_job_id=handle.job_id, queue=handle.queue)

8. Declare REST interfaces

from loom.rest.autocrud import build_auto_routes
from loom.rest.model import PaginationMode, RestInterface, RestRoute

class ProductRestInterface(RestInterface[Product]):
    prefix = "/products"
    tags = ("Products",)
    pagination_mode = PaginationMode.CURSOR
    routes = (
        RestRoute(use_case=ListLowStockProductsUseCase, method="GET", path="/low-stock",
                  summary="List low stock products"),
        RestRoute(use_case=DispatchRestockEmailUseCase, method="POST",
                  path="/{product_id}/jobs/restock-email", status_code=202,
                  summary="Dispatch restock email"),
        RestRoute(use_case=RestockWorkflowUseCase, method="POST",
                  path="/{product_id}/workflows/restock", status_code=202,
                  summary="Run restock workflow"),
        *build_auto_routes(Product, ()),  # adds GET, POST, PATCH, DELETE automatically
    )

Nested resource interfaces work the same way — routes mirror the URL hierarchy:

class AddressRestInterface(RestInterface[Address]):
    prefix = "/users"
    tags = ("UserAddresses",)
    routes = (
        RestRoute(use_case=CreateAddressUseCase, method="POST", path="/{user_id}/addresses/", status_code=201),
        RestRoute(use_case=ListAddressesUseCase, method="GET",  path="/{user_id}/addresses/"),
        RestRoute(use_case=GetAddressUseCase,    method="GET",  path="/{user_id}/addresses/{address_id}"),
        RestRoute(use_case=UpdateAddressUseCase, method="PATCH", path="/{user_id}/addresses/{address_id}"),
        RestRoute(use_case=DeleteAddressUseCase, method="DELETE", path="/{user_id}/addresses/{address_id}"),
    )

9. Bootstrap with YAML

The create_app() factory wires everything — DB, cache, DI, routes — from a YAML config:

# config/api.yaml
app:
  name: my_store
  code_path: src
  discovery:
    mode: modules
    modules:
      include:
        - app.user.model
        - app.user.interface
        - app.product.model
        - app.product.interface
  rest:
    backend: fastapi
    title: My Store API
    version: 0.1.0

database:
  url: ${oc.env:DATABASE_URL,sqlite+aiosqlite:///store.db}

metrics:
  enabled: false
# main.py — 3 lines
from loom.rest.fastapi.auto import create_app

app = create_app("config/api.yaml")

For larger projects, use mode: manifest and a manifest module:

# app/manifest.py
from app.user.model import User
from app.user.interface import UserRestInterface

MODELS = [User, ...]
INTERFACES = [UserRestInterface, ...]
discovery:
  mode: manifest
  manifest:
    module: app.manifest

10. Rules + Computes (advanced)

For compute-heavy write flows, declare field derivations and run them before rules:

from loom.core.use_case import Compute, F

class PricingPreviewUseCase(UseCase[Record, PricingPreviewResponse]):
    computes = (
        Compute.set(F(PricingCommand).normalized_email).from_command(
            F(PricingCommand).email, via=lambda v: v.strip().lower(),
        ),
        Compute.set(F(PricingCommand).subtotal).from_command(
            F(PricingCommand).unit_price, F(PricingCommand).quantity,
            via=lambda price, qty: price * qty,
        ),
        Compute.set(F(PricingCommand).tax_amount).from_command(
            F(PricingCommand).subtotal, F(PricingCommand).tax_rate,
            via=lambda sub, rate: sub * rate,
        ),
    )
    rules = (
        Rule.check(F(PricingCommand).unit_price, via=lambda v: v <= 0, message="unit_price must be > 0"),
        Rule.check(F(PricingCommand).country, via=lambda v: v not in TAX_RATES, message="Unsupported country"),
    )

    async def execute(self, record_id: int, cmd: PricingCommand = Input()) -> PricingPreviewResponse:
        ...

Computes run in declaration order — later computes can reference fields set by earlier ones.

For deeper references, review the integration examples under tests/integration/fake_repo. For a runnable full-stack sample with all patterns combined, check the companion repository dummy-loom.

Performance

loom-kernel adds zero measurable overhead at the concurrency levels typical of production REST APIs. The benchmark below compares loompy (loom-kernel + SQLAlchemy) against a hand-written FastAPI application hitting the same PostgreSQL database.

Methodology: 3 independent runs × 3 concurrency levels (20 / 100 / 300 workers), median RPS reported. Dataset: 1 200 records, 3 notes each. Infrastructure: each target runs in its own isolated Docker container with a dedicated PostgreSQL instance.

loompy vs hand-written FastAPI (median RPS, 3 repeats):

Scenario c=20 c=100 c=300
GET /:id with joins ≈ tied +8.9 % −11.7 %
LIST cursor −2.8 % +3.8 % −19.4 %
LIST offset + COUNT(*) ≈ tied +6.2 % −25.4 %
PATCH (UPDATE RETURNING) +2.1 % ≈ tied +12.7 %
GET /ping (no DB) ≈ tied −5.1 % +16.5 %

At the production sweet spot (moderate concurrency, c=100), loom-kernel matches or outperforms the baseline in 4 out of 5 scenarios — without a single line of hand-tuned SQL. The GET and LIST advantages come from the compiled single-pass SQL read path. The PATCH advantage at high concurrency comes from the UPDATE … RETURNING pattern (one round-trip vs three in a naive implementation).

The full benchmark suite is available in dummy-loom.

Status

Project under active development.


Author

Designed and built by Massive Data Scope.

For questions, feedback, or collaboration: massivedatascope@gmail.com

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

loom_kernel-0.3.0.tar.gz (1.4 MB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

loom_kernel-0.3.0-py3-none-any.whl (403.8 kB view details)

Uploaded Python 3

File details

Details for the file loom_kernel-0.3.0.tar.gz.

File metadata

  • Download URL: loom_kernel-0.3.0.tar.gz
  • Upload date:
  • Size: 1.4 MB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for loom_kernel-0.3.0.tar.gz
Algorithm Hash digest
SHA256 18fc7621645a88b08808e2dbf28c362ab854c2f30a026a34b412f973911388d7
MD5 365106b6cca1bb6b2363ad3e64262103
BLAKE2b-256 48815643cc988d3a81d34d67f5ad74d3d3a357ee88b57dd43b978d78d9579e27

See more details on using hashes here.

File details

Details for the file loom_kernel-0.3.0-py3-none-any.whl.

File metadata

  • Download URL: loom_kernel-0.3.0-py3-none-any.whl
  • Upload date:
  • Size: 403.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for loom_kernel-0.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 31daa846377688ca1aef87aa3c8801bba2e678389a5543a3cd6c469d231369f9
MD5 38856bdf2b0de6863356c3d29bd48ecc
BLAKE2b-256 e6534b1e4cf30b2055d132257171f49d5c3a933433ea7cb0f69d5a59f4b87ccc

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page