
Postgres-native durable execution + shared async DB/Redis primitives for the ForkTex ecosystem


forktex-core


The shared substrate every ForkTex Python service builds on — async Postgres, caching, durable execution, secure object storage, background queues, and a virtual-data layer for tenant-defined schemas. One library, six modules, one set of opinions.

forktex-core exists so individual services don't reinvent connection management, retry/replay scaffolding, signed-URL plumbing, or the umpteenth bespoke state machine. Each consuming service imports what it needs and inherits the same operational shape.

Install

```shell
pip install forktex-core
```

PostgreSQL (asyncpg + SQLAlchemy 2) and Redis (hiredis) are bundled defaults — no optional extras to remember. Requires Python 3.12+.

Architecture overview

forktex_core is a flat namespace of independent modules. Pull in only what you need; nothing imports cross-module unless explicitly composed.

| Module | Status | Purpose |
| --- | --- | --- |
| db | 0.3 (currently psql, renaming) | Async SQLAlchemy engine, session lifecycle, base ORM (BaseDBModel, TimestampMixin, AuditMixin, OrgScopedMixin), CRUD helpers |
| cache | 0.3 (currently redis, renaming) | Async Redis client, @cached, namespaced keys, sliding-window rate limiter |
| flow | shipping in 0.3.0 | Postgres-native durable execution: @flow.workflow for code-defined linear flows, Graph for declarative state machines, signals, replay-on-resume, leader election |
| vault | planned (0.4) | Fernet-based symmetric encryption for credential blobs at rest, KEK (key-encryption-key) rotation flow, EncryptedJSON SQLAlchemy column type |
| storage | planned (0.4) | S3/MinIO-compatible object storage client + secure callback URLs (consumer-driven public-config mapping) |
| queue | planned (0.4) | Lightweight background-job queue (likely arq-backed) for fire-and-forget work that isn't durable |
| data | planned (0.5) | Generalised "virtual database" layer: dynamic registers / fields / relationships with two layers of abstraction — *Query / *Result business objects above the Postgres primitives |

The first three are real today (with the db / cache renames in flight); the last four are the planned roadmap toward 1.0. The table is the source of truth — when in doubt, check what the module exports.

Module reference

db — async Postgres

(currently exported as forktex_core.psql while the rename lands; both will be importable through 0.4 for migration breathing room.)

```python
from forktex_core.db import (
    init_engine, close_engine, get_session,
    BaseDBModel, TimestampMixin, AuditMixin, OrgScopedMixin,
)

init_engine("postgresql+asyncpg://user:pass@localhost/db", pool_size=20)

async with get_session() as session:
    user = await session.get(User, user_id)
```

What's in the box:

  • Engine + session lifecycle — single init_engine / close_engine pair, get_session context manager that commits/rolls-back automatically.
  • Base model + mixins — BaseDBModel (UUID PK + timestamps), TimestampMixin (created_at/updated_at), AuditMixin (created_by/updated_by), OrgScopedMixin (org_id FK + tenant filter helper). All composable.
  • CRUD helpers — typed generic Repository[Model] with the usual list/get/create/update/delete + filter/sort/paginate built on SQLAlchemy 2's select() API.
  • Migration friendliness — every table the library defines lives in its own schema (e.g. forktex_flow.*) so the consumer's alembic only sees consumer-owned tables.
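The commit-or-rollback lifecycle described in the first bullet can be sketched with `asynccontextmanager`. This is a toy stand-in — `FakeSession` is not the library's session type, just a recorder — to show the shape `get_session` is described as having: commit on clean exit, roll back on exception, always close.

```python
import asyncio
from contextlib import asynccontextmanager

class FakeSession:
    """Stand-in for an async DB session -- records what happened to it."""
    def __init__(self):
        self.committed = False
        self.rolled_back = False
    async def commit(self):
        self.committed = True
    async def rollback(self):
        self.rolled_back = True
    async def close(self):
        pass

@asynccontextmanager
async def get_session():
    # Commit on clean exit, roll back on exception, always close.
    session = FakeSession()
    try:
        yield session
        await session.commit()
    except Exception:
        await session.rollback()
        raise
    finally:
        await session.close()

async def demo():
    # Clean exit -> commit.
    async with get_session() as ok:
        pass
    # Exception inside the block -> rollback, then the error propagates.
    bad = None
    try:
        async with get_session() as bad:
            raise RuntimeError("boom")
    except RuntimeError:
        pass
    return ok, bad

ok, bad = asyncio.run(demo())
```

The point of the pattern: callers never write commit/rollback boilerplate, and a forgotten `commit()` can't silently drop writes.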

cache — Redis

(currently exported as forktex_core.redis; same parallel-import migration window as db.)

```python
from forktex_core.cache import init, close, cached, rate_limit

await init("redis://localhost:6379/0")

@cached(ttl=300, namespace="user:profile")
async def get_profile(user_id: str): ...

ok, retry_after = await rate_limit("login", key=ip, limit=5, window_seconds=60)
```

What's in the box:

  • Connection pool — init / close / get_client, hiredis-backed.
  • @cached(ttl=…) — transparent decorator caching for any async function with serialisable args.
  • Typed ops — get_json, set_json, incr, mget, delete_pattern (cluster-safe).
  • Namespaces — Namespace("user.profile").key(user_id) helper so cross-service key collisions stay impossible.
  • Sliding-window rate limiter — production-tested limiter for auth middleware and similar hot paths.

flow — durable execution

Postgres-native workflows that survive crashes, replay-on-resume from a checkpoint log, and elect one leader at a time across N workers via advisory locks. Schema-isolated under forktex_flow.* so the consumer's alembic never sees these tables.

The library ships two complementary modes sharing one runtime substrate:

```python
from forktex_core.flow import Flow, FlowContext, Graph

flow = Flow(database_url="postgresql+asyncpg://user:pass@host/db")

# 1. Code-defined linear workflow — e.g. a deploy pipeline
@flow.step
async def provision_resource(ctx: FlowContext, manifest: dict) -> str:
    return await provider.create(...)

@flow.workflow(name="example.deploy", version=1)
async def deploy(ctx: FlowContext, manifest: dict) -> dict:
    resource_id = await provision_resource(ctx, manifest)
    return {"resource_id": resource_id}

# 2. Declarative state-machine graph — e.g. a tenant-defined blueprint
g = Graph(name="user.onboarding", version=1, initial="email_pending")

@g.state("email_pending", manual=True)
async def email_pending(ctx, payload):
    await send_verification_email_step(ctx, payload["email"])
    return payload

@g.state("verified", terminal=True)
async def verified(ctx, payload): return payload

g.transition("email_pending", "verified")
flow.register_graph(g)

# Submit a run; advance a manual state externally:
run_id = await flow.start("user.onboarding", input={"email": "..."})
await flow.send_signal(run_id, "advance", payload={"code": "abc"})
```

Why two modes, one library? Some workflows (e.g. a deploy pipeline) are naturally linear, code-defined, and benefit from Python control flow — @flow.workflow is the right shape. Other flows are declarative state machines that end-users CRUD via API, then the runtime walks them — Graph is the right shape. They share runtime infrastructure: every graph is, internally, a synthesised @flow.workflow whose body walks the state machine, so driver / leader election / replay / observability / introspection apply unchanged.
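The "synthesised workflow whose body walks the state machine" idea can be sketched in miniature. Everything here (`MiniGraph`, `run_graph`) is a hypothetical toy, not the library's runtime — it just shows how a declarative graph reduces to one linear loop that a durable workflow body could execute:

```python
import asyncio

class MiniGraph:
    """Toy declarative graph: state handlers + allowed transitions."""
    def __init__(self, initial: str):
        self.initial = initial
        self.handlers = {}
        self.edges = {}
        self.terminal = set()

    def state(self, name: str, terminal: bool = False):
        if terminal:
            self.terminal.add(name)
        def deco(fn):
            self.handlers[name] = fn
            return fn
        return deco

    def transition(self, src: str, dst: str):
        self.edges[src] = dst

async def run_graph(g: MiniGraph, payload: dict):
    # The "synthesised workflow body": walk states until a terminal one.
    state = g.initial
    trace = [state]
    while state not in g.terminal:
        payload = await g.handlers[state](payload)
        state = g.edges[state]
        trace.append(state)
    payload = await g.handlers[state](payload)
    return trace, payload

g = MiniGraph(initial="email_pending")

@g.state("email_pending")
async def email_pending(payload):
    payload["email_sent"] = True
    return payload

@g.state("verified", terminal=True)
async def verified(payload):
    payload["done"] = True
    return payload

g.transition("email_pending", "verified")
trace, out = asyncio.run(run_graph(g, {}))
```

Because the walker is itself just a workflow body, anything the runtime gives linear workflows (checkpoints, replay, leader election) applies to graphs for free.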

Versioning gate — every workflow body is AST-hashed at registration. The shipped forktex-flow audit CLI entry point is a CI command that fails when a workflow body changed without a version= bump — because replay determinism for in-flight runs would silently break otherwise. Run it with --update after a conscious version bump to refresh the manifest.
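The AST-hashing trick is standard-library Python and worth seeing in isolation. This sketch (names and manifest handling are illustrative, not the shipped CLI) hashes the parsed AST rather than the raw source, so comment and whitespace churn doesn't trip the gate but any semantic change does:

```python
import ast
import hashlib
import textwrap

def body_hash(source: str) -> str:
    """Hash the parsed AST: formatting/comment changes don't count,
    any semantic change to the body does."""
    tree = ast.parse(textwrap.dedent(source))
    return hashlib.sha256(ast.dump(tree).encode()).hexdigest()[:12]

V1 = """
async def deploy(ctx, manifest):
    return await provision_resource(ctx, manifest)
"""
V1_REFORMATTED = """
async def deploy(ctx, manifest):  # a comment, extra blank line below

    return await provision_resource(ctx, manifest)
"""
V2 = """
async def deploy(ctx, manifest):
    await validate(manifest)  # body changed -> hash changes
    return await provision_resource(ctx, manifest)
"""

# A CI gate would compare body_hash against a stored manifest entry and
# fail if the hash changed without a version bump.
same = body_hash(V1) == body_hash(V1_REFORMATTED)
changed = body_hash(V1) != body_hash(V2)
```

Comments and blank lines never appear in the AST, which is exactly why `ast.dump` is the right thing to hash here.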

See src/forktex_core/flow/__init__.py for the full public surface and the in-repo design notes for rationale, schema, and the extension Protocol consumers use to graft tenant/RBAC concerns onto runs without forking the library.

vault — secret encryption + KEK rotation

(planned for 0.4)

The minimal contract:

```python
from forktex_core.vault import Vault, EncryptedJSON

# One process-wide Vault, configured with the platform-wide Fernet KEK.
# (KEK = key-encryption-key — encrypts the blobs, NOT a tenant secret.
#  Leaking it still requires DB access to use; rotation is supported.)
vault = Vault.from_key(settings.master_key)

# Encrypt / decrypt arbitrary JSON-serialisable payloads:
token = vault.encrypt({"provider_token": "...", "ssh_key_id": 12345})
payload = vault.decrypt(token)

# Or use the SQLAlchemy column type for transparent at-rest encryption —
# reads decrypt automatically, writes encrypt automatically:
class OrgProviderCredential(BaseDBModel):
    __tablename__ = "org_provider_credential"
    payload: Mapped[dict] = mapped_column(EncryptedJSON(vault))
    # ... rest of the model

# Generate a fresh Fernet key (e.g. for ops bootstrapping a new env):
new_key = Vault.generate_key()  # base64-encoded 32-byte key
```

KEK rotation is a first-class flow:

# When ops rotate the master key: re-encrypt every blob against the new
# Vault, then atomically swap the env var. The old Vault stays usable
# until every consumer process restarts with the new key.
```python
old_vault = Vault.from_key(old_master_key)
new_vault = Vault.from_key(new_master_key)

await rotate_blobs(
    session,
    table=OrgProviderCredential,
    column="payload",
    old=old_vault,
    new=new_vault,
)
```

What it solves: every service that stores credentials (per-org provider tokens, per-tenant SMTP / API keys, per-pipeline upstream creds) ends up reinventing the same Fernet wrapper, the same "decrypt error means rotated KEK" diagnostic, and the same rotation script. vault packages those once.

Decision boundaries:

  • Symmetric only — Fernet under the hood. No asymmetric / public-key crypto here; that's a different problem with different threat models.
  • At-rest encryption, not transport — for transport-level secrecy, terminate TLS at the edge.
  • Single KEK by default — per-tenant key derivation (DEK-per-org wrapped by KEK) is a v2 concern; today's threat model in the ForkTex products is well-served by one platform-wide KEK plus row-level access control.
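The rotation loop itself is simple enough to sketch end-to-end. `StubVault` below is a deliberately insecure tagged encoding standing in for the planned Fernet-backed Vault (do not use it for real secrets); the shape to notice is the decrypt-with-old / re-encrypt-with-new pass over every blob:

```python
import json

class StubVault:
    """Stand-in for the planned Fernet-backed Vault -- a tagged, NOT
    secure, reversible encoding, just to show the rotation shape."""
    def __init__(self, key: str):
        self.key = key
    def encrypt(self, payload: dict) -> str:
        return f"{self.key}${json.dumps(payload)}"
    def decrypt(self, token: str) -> dict:
        key, _, body = token.partition("$")
        if key != self.key:
            # The "decrypt error means rotated KEK" diagnostic.
            raise ValueError("wrong KEK (was it rotated?)")
        return json.loads(body)

def rotate_blobs(rows: list[dict], column: str,
                 old: StubVault, new: StubVault) -> None:
    """Decrypt every blob with the old KEK, re-encrypt with the new one."""
    for row in rows:
        row[column] = new.encrypt(old.decrypt(row[column]))

old_vault = StubVault("kek-v1")
new_vault = StubVault("kek-v2")
rows = [{"payload": old_vault.encrypt({"token": "abc"})}]

rotate_blobs(rows, "payload", old=old_vault, new=new_vault)
decrypted = new_vault.decrypt(rows[0]["payload"])
```

In production this pass would run inside a transaction per batch, and the old key stays deployed until every process has restarted with the new one — exactly the swap sequence the comment in the contract above describes.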

storage — object storage + signed URLs

(planned for 0.4)

The minimal contract:

```python
from forktex_core.storage import StorageClient, PublicAccess

storage = StorageClient(endpoint="s3://...", credentials=...)

# Upload from a service:
await storage.put("invoices/2026-04/INV-123.pdf", body, content_type="application/pdf")

# Mint a time-limited signed URL for a callback or a tenant download:
url = await storage.signed_url(
    "invoices/2026-04/INV-123.pdf",
    access=PublicAccess.read,
    ttl_seconds=600,
)
```

What it solves: every service that ships file uploads/downloads ends up reinventing the same five things — bucket layout, content-type sniffing, secure short-lived URLs, public-vs-private access policy, and a callback path that the consumer's public config (CDN host, custom domain) can override. storage packages those once, with the public-link mapping driven by consumer-side configuration so the same code works against MinIO in dev and S3 + CloudFront in prod.

S3-compatible by default (MinIO + AWS S3); GCS / Azure Blob behind the same StorageClient interface as a follow-up.

queue — background jobs

(planned for 0.4)

For fire-and-forget work that doesn't need flow's durability guarantees: send-an-email, refresh-a-cache, kick-off-a-report. Likely arq-backed (Redis-native, async-first, lighter than Celery), wrapped in a thin opinionated client so consumers get the same enqueue / process ergonomics across services.

Decision boundary vs. flow:

  • flow: must survive crashes, must replay-on-resume, has steps with retry policies, lives for minutes-to-hours, observability matters.
  • queue: short-lived, idempotent, "best-effort" fire-and-forget; if a worker dies mid-job, redelivery is cheap and the job runs again.

If you're unsure which you need, you probably need flow.
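The enqueue/process ergonomics described above can be miniaturised with `asyncio.Queue`. This is not arq and not the planned API — just an in-process illustration of the "best-effort, redelivery is cheap" contract: the worker swallows failures and moves on, with no checkpoint or replay:

```python
import asyncio

async def worker(queue: asyncio.Queue, results: list) -> None:
    """Best-effort worker: pull jobs until a None sentinel arrives."""
    while (job := await queue.get()) is not None:
        fn, args = job
        try:
            results.append(await fn(*args))
        except Exception:
            pass  # best-effort: log and move on; no replay, no checkpoint
        finally:
            queue.task_done()

async def send_email(addr: str) -> str:
    await asyncio.sleep(0)  # pretend I/O
    return f"sent:{addr}"

async def main():
    queue: asyncio.Queue = asyncio.Queue()
    results: list[str] = []
    task = asyncio.create_task(worker(queue, results))

    # Fire-and-forget: enqueue and immediately move on.
    await queue.put((send_email, ("a@example.com",)))
    await queue.put((send_email, ("b@example.com",)))

    await queue.put(None)  # sentinel: drain remaining jobs, then stop
    await task
    return results

results = asyncio.run(main())
```

A Redis-backed queue adds cross-process delivery and at-least-once semantics, but the contract stays the same: jobs must be idempotent because they may run twice.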

data — virtual schemas + Query/Result abstraction

(planned for 0.5)

A common need across ForkTex services is a layer for tenant-defined data: registers (logical tables), fields (logical columns), relationships, all CRUD-able by end users at runtime. Flexible business-process tools pivot on this every day.

data lifts that pattern to two clean abstraction levels:

  1. Storage layer (Postgres-backed, leans on db): the dynamic-schema mechanics — register definitions, field definitions, relationship graph, JSONB-backed row storage with per-field type coercion, indexable derived columns.
  2. Business-logic layer: composable *Query objects (filtering / sorting / paginating-or-scrolling / soft-archiving / hard-deleting / search / faceting) returning typed *Result objects. The same *Query shape applies whether the underlying register is Postgres-stored, virtualised over an external API, or computed on the fly — so consumers don't have to know which.
```python
# Aspirational sketch — final API will be informed by the extraction.
from forktex_core.data import Register, RowQuery

invoices = Register.load("invoices", tenant_id=...)
result = await (
    RowQuery(invoices)
    .where(status="paid")
    .since(date(2026, 1, 1))
    .order_by("-issued_at")
    .scroll(page_size=50)
)
```

Why this lives in core rather than each consumer rolling its own: every B2B product reaches a point where end users want to add a field. Once you've solved that once, you don't want to solve it three more times in three more services.
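The composable *Query / *Result shape can be demonstrated against plain in-memory rows. Everything here is a hypothetical sketch (a subset of the aspirational API above, with an explicit `.run()` instead of `await`), showing how the builder accumulates clauses and only evaluates at the end:

```python
from dataclasses import dataclass
from datetime import date

@dataclass
class RowResult:
    rows: list[dict]
    total: int  # total matches before pagination

class RowQuery:
    """In-memory sketch of the composable *Query -> *Result shape."""
    def __init__(self, rows: list[dict]):
        self._rows = list(rows)
        self._filters: dict = {}
        self._order: str | None = None
        self._reverse = False
        self._page_size: int | None = None

    def where(self, **eq):
        self._filters.update(eq)
        return self

    def order_by(self, key: str):
        self._reverse = key.startswith("-")  # "-issued_at" -> descending
        self._order = key.lstrip("-")
        return self

    def paginate(self, page_size: int):
        self._page_size = page_size
        return self

    def run(self) -> RowResult:
        rows = [r for r in self._rows
                if all(r.get(k) == v for k, v in self._filters.items())]
        if self._order:
            rows.sort(key=lambda r: r[self._order], reverse=self._reverse)
        total = len(rows)
        if self._page_size:
            rows = rows[:self._page_size]
        return RowResult(rows=rows, total=total)

invoices = [
    {"id": 1, "status": "paid", "issued_at": date(2026, 1, 5)},
    {"id": 2, "status": "open", "issued_at": date(2026, 2, 1)},
    {"id": 3, "status": "paid", "issued_at": date(2026, 3, 9)},
]
result = (RowQuery(invoices)
          .where(status="paid")
          .order_by("-issued_at")
          .paginate(50)
          .run())
```

The value of the abstraction is in the last step: the same builder surface could compile to SQL over JSONB rows or fan out to an external API, and callers never know which.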

Configuration

forktex-core does not read environment variables directly. Connection strings, credentials, KEKs, and pool settings are passed in by the host service at init_engine / init / Flow(...) / Vault.from_key(...) / StorageClient(...) construction time. Each consuming service decides its own configuration source (env vars, secrets manager, manifest, etc.).

Development

The root Makefile is generated by forktex fsd makefile sync from forktex.json — do not hand-edit.

```shell
make help              # list every available target
make deps              # editable install
make format            # ruff format
make lint              # ruff check
make test              # pytest tests/
make build             # python3 -m build → dist/
make ci                # format-check + lint + license-check + audit + test + build
make clean             # remove caches and dist/
```

make ci is the single command that gates a publish: format-check, lint, dual-license header verification across every source file, dependency CVE audit, full pytest sweep (uses Postgres testcontainers for flow), wheel + sdist build, and twine check.

make start / make stop / make logs are intentional no-ops — forktex-core is a library, there is nothing to run.

License headers

Every source file carries the AGPL-3.0 + Commercial dual-license SPDX header, applied idempotently via:

```shell
make license-check    # CI gate — fails if any source file is missing the header
make license-fix      # add or refresh headers across src/, tests/, scripts/
make license-strip    # remove headers (used before license-model changes)
```

FSD Self-Description

This repo follows the ForkTex Standard Delivery (FSD) shape:

  • root manifest: forktex.json
  • profile: workspace/python-monorepo
  • target maturity: L3
  • single publishable package: forktex-core at path .

Re-validate locally with:

```shell
forktex fsd --project-dir . check
forktex arch discover --project-dir . --output-dir /tmp/arch-corepy
```

License

Dual-licensed — AGPL-3.0-or-later for open-source use, commercial for everything else (proprietary products, SaaS without source release, redistribution in closed-source form). See LICENSE and NOTICE for the full terms.

Commercial licensing inquiries: info@forktex.com.

Releases before 0.2.3 on PyPI remain under MIT; from 0.2.3 onwards the package ships AGPL-3.0+Commercial. 0.3.0 adds forktex_core.flow (durable execution + state-machine graphs) and begins the psql → db / redis → cache rename.
