# forktex-core

> Postgres-native durable execution + shared async DB/Redis primitives for the ForkTex ecosystem
The shared substrate every ForkTex Python service builds on — async Postgres, caching, durable execution, secure object storage, background queues, and a virtual-data layer for tenant-defined schemas. One library, six modules, one set of opinions.
forktex-core exists so individual services don't reinvent connection management, retry/replay scaffolding, signed-URL plumbing, or the umpteenth bespoke state machine. Each consumer (forktex-cloud-api, forktex-network-api, forktex-intelligence-api, …) imports what it needs and inherits the same operational shape.
## Install

```shell
pip install forktex-core
```
PostgreSQL (asyncpg + SQLAlchemy 2) and Redis (hiredis) are bundled defaults — no optional extras to remember. Requires Python 3.12+.
## Architecture overview
`forktex_core` is a flat namespace of independent modules. Pull in only what you need; modules never import each other unless explicitly composed.
| Module | Status | Purpose |
|---|---|---|
| `db` | 0.3 (currently `psql`, renaming) | Async SQLAlchemy engine, session lifecycle, base ORM (`BaseDBModel`, `TimestampMixin`, `AuditMixin`, `OrgScopedMixin`), CRUD helpers |
| `cache` | 0.3 (currently `redis`, renaming) | Async Redis client, `@cached`, namespaced keys, sliding-window rate limiter |
| `flow` | shipping in 0.3.0 | Postgres-native durable execution: `@flow.workflow` for code-defined linear flows, `Graph` for declarative state machines, signals, replay-on-resume, leader election |
| `vault` | planned (0.4) | Fernet-based symmetric encryption for credential blobs at rest, KEK (key-encryption-key) rotation flow, `EncryptedJSON` SQLAlchemy column type — extracted from cloud's `FORKTEX_CLOUD_MASTER_KEY` plumbing |
| `storage` | planned (0.4) | S3/MinIO-compatible object storage client + secure callback URLs (consumer-driven public-config mapping) — extracted from network's existing minio plumbing |
| `queue` | planned (0.4) | Lightweight background-job queue (likely arq-backed) for fire-and-forget work that isn't durable — distilled from the sandbox-poc prototype |
| `data` | planned (0.5) | Generalised "virtual database" layer: dynamic registers / fields / relationships with two layers of abstraction — `*Query` / `*Result` business objects above the Postgres primitives — recycled and generalised from network's tenant-defined-schema engine |
The first three are real today (with the db / cache renames in flight); the last four are the planned roadmap toward 1.0. The table is the source of truth — when in doubt, check what the module exports.
## Module reference

### db — async Postgres

(currently exported as `forktex_core.psql` while the rename lands; both will be importable through 0.4 for migration breathing room.)
```python
from forktex_core.db import (
    init_engine, close_engine, get_session,
    BaseDBModel, TimestampMixin, AuditMixin, OrgScopedMixin,
)

init_engine("postgresql+asyncpg://user:pass@localhost/db", pool_size=20)

async with get_session() as session:
    user = await session.get(User, user_id)
```
What's in the box:
- **Engine + session lifecycle** — single `init_engine`/`close_engine` pair, `get_session` context manager that commits/rolls back automatically.
- **Base model + mixins** — `BaseDBModel` (UUID PK + timestamps), `TimestampMixin` (created_at/updated_at), `AuditMixin` (created_by/updated_by), `OrgScopedMixin` (org_id FK + tenant filter helper). All composable.
- **CRUD helpers** — typed generic `Repository[Model]` with the usual list/get/create/update/delete + filter/sort/paginate built on SQLAlchemy 2's `select()` API.
- **Migration friendliness** — every table the library defines lives in its own schema (e.g. `forktex_flow.*`) so the consumer's alembic only sees consumer-owned tables.
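The commit-or-rollback shape of `get_session` can be illustrated with a stdlib-only sketch. `FakeSession` below is a hypothetical stand-in, not the library's session class; the point is the lifecycle, not the ORM:

```python
import asyncio
from contextlib import asynccontextmanager

class FakeSession:
    """Hypothetical stand-in for an async SQLAlchemy session."""
    def __init__(self):
        self.committed = False
        self.rolled_back = False
    async def commit(self):
        self.committed = True
    async def rollback(self):
        self.rolled_back = True
    async def close(self):
        pass

@asynccontextmanager
async def get_session():
    # Commit on clean exit, roll back on any exception; always close.
    session = FakeSession()
    try:
        yield session
        await session.commit()
    except Exception:
        await session.rollback()
        raise
    finally:
        await session.close()

async def demo():
    async with get_session() as ok_session:
        pass  # clean exit: committed
    try:
        async with get_session() as bad_session:
            raise RuntimeError("boom")
    except RuntimeError:
        pass  # exception: rolled back, exception re-raised
    return ok_session.committed, bad_session.rolled_back

committed, rolled_back = asyncio.run(demo())
```

The consumer never calls `commit()` by hand; exiting the `async with` block decides the outcome.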
### cache — Redis

(currently exported as `forktex_core.redis`; same parallel-import migration window as `db`.)
```python
from forktex_core.cache import init, close, cached, rate_limit

await init("redis://localhost:6379/0")

@cached(ttl=300, namespace="user:profile")
async def get_profile(user_id: str): ...

ok, retry_after = await rate_limit("login", key=ip, limit=5, window_seconds=60)
```
What's in the box:
- **Connection pool** — `init`/`close`/`get_client`, hiredis-backed.
- **`@cached(ttl=…)`** — transparent decorator caching for any async function with serialisable args.
- **Typed ops** — `get_json`, `set_json`, `incr`, `mget`, `delete_pattern` (cluster-safe).
- **Namespaces** — `Namespace("user.profile").key(user_id)` helper so cross-service key collisions stay impossible.
- **Sliding-window rate limiter** — production-tested limiter used by cloud's auth middleware.
### flow — durable execution
Postgres-native workflows that survive crashes, replay-on-resume from a checkpoint log, and elect one leader at a time across N workers via advisory locks. Schema-isolated under forktex_flow.* so the consumer's alembic never sees these tables.
The library ships two complementary modes sharing one runtime substrate:
```python
from forktex_core.flow import Flow, FlowContext, Graph

flow = Flow(database_url="postgresql+asyncpg://user:pass@host/db")

# 1. Code-defined linear workflow — cloud's deploy pipeline shape
@flow.step
async def provision_vps(ctx: FlowContext, manifest: dict) -> str:
    return await hetzner.create_server(...)

@flow.workflow(name="cloud.deploy", version=1)
async def deploy(ctx: FlowContext, manifest: dict) -> dict:
    server_id = await provision_vps(ctx, manifest)
    return {"server_id": server_id}

# 2. Declarative state-machine graph — network's tenant-blueprint shape
g = Graph(name="user.onboarding", version=1, initial="email_pending")

@g.state("email_pending", manual=True)
async def email_pending(ctx, payload):
    await send_verification_email_step(ctx, payload["email"])
    return payload

@g.state("verified", terminal=True)
async def verified(ctx, payload):
    return payload

g.transition("email_pending", "verified")
flow.register_graph(g)

# Submit a run; advance a manual state externally:
run_id = await flow.start("user.onboarding", input={"email": "..."})
await flow.send_signal(run_id, "advance", payload={"code": "abc"})
```
Why two modes, one library? Cloud's deploy pipeline is naturally linear, code-defined, and benefits from Python control flow — @flow.workflow is the right shape. Network's tenant-facing flows are declarative state machines that end-users CRUD via API, then the runtime walks them — Graph is the right shape. They share runtime infrastructure: every graph is, internally, a synthesised @flow.workflow whose body walks the state machine, so driver / leader election / replay / observability / introspection apply unchanged.
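How a declarative graph reduces to a walkable body can be sketched stdlib-only. `ToyGraph` is illustrative, not the library's API, and it skips signals, persistence, and manual states; it only shows the "workflow body walks the state machine" reduction:

```python
class ToyGraph:
    """Toy declarative state machine: handlers, transitions, terminal flags."""
    def __init__(self, initial: str):
        self.initial = initial
        self.handlers: dict = {}
        self.terminal: set = set()
        self.edges: dict = {}

    def state(self, name: str, terminal: bool = False):
        def register(fn):
            self.handlers[name] = fn
            if terminal:
                self.terminal.add(name)
            return fn
        return register

    def transition(self, src: str, dst: str):
        self.edges[src] = dst

    def run(self, payload: dict) -> dict:
        # The synthesised "workflow body": walk states until a terminal one.
        state = self.initial
        while True:
            payload = self.handlers[state](payload)
            if state in self.terminal:
                return payload
            state = self.edges[state]

g = ToyGraph(initial="email_pending")

@g.state("email_pending")
def email_pending(payload):
    return {**payload, "email_sent": True}

@g.state("verified", terminal=True)
def verified(payload):
    return {**payload, "done": True}

g.transition("email_pending", "verified")
result = g.run({"email": "a@b.c"})
```

In the real runtime that `run` loop would be the synthesised `@flow.workflow` body, which is why checkpointing and replay apply to graphs for free.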
Versioning gate — every workflow body is AST-hashed at registration. The shipped `forktex-flow audit` CLI entrypoint is a CI command that fails when a workflow body changed without a `version=` bump — because replay determinism for in-flight runs would silently break otherwise. Run with `--update` after a conscious version bump to refresh the manifest.
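The AST-hash property the gate relies on can be reproduced with the stdlib. `body_hash` is a hypothetical helper; the real audit command's exact normalisation is not specified here, but the key invariant is the same: cosmetic edits keep the hash stable, logic edits change it:

```python
import ast
import hashlib

def body_hash(source: str) -> str:
    """Hash the AST dump of a workflow body. Comments and whitespace
    do not change the AST; adding/removing statements does."""
    return hashlib.sha256(ast.dump(ast.parse(source)).encode()).hexdigest()

v1 = """
async def deploy(ctx, manifest):
    server_id = await provision_vps(ctx, manifest)
    return {"server_id": server_id}
"""

v1_reformatted = """
async def deploy(ctx, manifest):
    # cosmetic comment, extra blank line below

    server_id = await provision_vps(ctx, manifest)
    return {"server_id": server_id}
"""

v2 = """
async def deploy(ctx, manifest):
    server_id = await provision_vps(ctx, manifest)
    await configure_dns(ctx, server_id)
    return {"server_id": server_id}
"""

unchanged = body_hash(v1) == body_hash(v1_reformatted)  # cosmetic edit: no bump needed
changed = body_hash(v1) != body_hash(v2)                # logic edit: gate demands version += 1
```

This is why the gate catches real behaviour changes without nagging about formatting-only diffs.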
See src/forktex_core/flow/__init__.py for the full public surface and the in-repo design notes for rationale, schema, and the extension Protocol consumers use to graft tenant/RBAC concerns onto runs without forking the library.
### vault — secret encryption + KEK rotation

(planned for 0.4 — extracted and generalised from cloud's `FORKTEX_CLOUD_MASTER_KEY` provider-credentials plumbing)
The minimal contract:
```python
from forktex_core.vault import Vault, EncryptedJSON

# One process-wide Vault, configured with the platform-wide Fernet KEK.
# (KEK = key-encryption-key — encrypts the blobs, NOT a tenant secret.
# Leaking it still requires DB access to use; rotation is supported.)
vault = Vault.from_key(settings.master_key)

# Encrypt / decrypt arbitrary JSON-serialisable payloads:
token = vault.encrypt({"hetzner_token": "...", "ssh_key_id": 12345})
payload = vault.decrypt(token)

# Or use the SQLAlchemy column type for transparent at-rest encryption —
# reads decrypt automatically, writes encrypt automatically:
class OrgProviderCredential(BaseDBModel):
    __tablename__ = "org_provider_credential"
    payload: Mapped[dict] = mapped_column(EncryptedJSON(vault))
    # ... rest of the model

# Generate a fresh Fernet key (e.g. for ops bootstrapping a new env):
new_key = Vault.generate_key()  # base64-encoded 32-byte key
KEK rotation is a first-class flow:
```python
# When ops rotate the master key: re-encrypt every blob against the new
# Vault, then atomically swap the env var. The old Vault stays usable
# until every consumer process restarts with the new key.
old_vault = Vault.from_key(old_master_key)
new_vault = Vault.from_key(new_master_key)

await rotate_blobs(
    session,
    table=OrgProviderCredential,
    column="payload",
    old=old_vault,
    new=new_vault,
)
```
What it solves: every service that stores credentials (cloud's per-org Hetzner / Cloudflare tokens, network's per-tenant SMTP / API keys, intelligence's per-pipeline upstream creds) ends up reinventing the same Fernet wrapper, the same "decrypt error means rotated KEK" diagnostic, and the same rotation script. vault packages those once.
Decision boundaries:
- Symmetric only — Fernet under the hood. No asymmetric / public-key crypto here; that's a different problem with different threat models.
- At-rest encryption, not transport — for transport-level secrecy, terminate TLS at the edge.
- Single KEK by default — per-tenant key derivation (DEK-per-org wrapped by KEK) is a v2 concern; today's threat model in the ForkTex products is well-served by one platform-wide KEK plus row-level access control.
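The shape of the rotation loop can be sketched stdlib-only. `ToyVault` below is a toy XOR-keystream stand-in for Fernet (NOT real cryptography), and `rotate_blobs` here re-encrypts an in-memory dict rather than a DB column; only the decrypt-with-old, encrypt-with-new loop is the point:

```python
import base64
import secrets

class ToyVault:
    """Toy symmetric 'vault' (XOR keystream) standing in for Fernet.
    Illustrates the rotation loop only; do not use for real secrets."""
    def __init__(self, key: bytes):
        self.key = key

    @classmethod
    def generate_key(cls) -> bytes:
        return secrets.token_bytes(32)

    def _xor(self, data: bytes) -> bytes:
        return bytes(b ^ self.key[i % len(self.key)] for i, b in enumerate(data))

    def encrypt(self, plaintext: str) -> str:
        return base64.b64encode(self._xor(plaintext.encode())).decode()

    def decrypt(self, token: str) -> str:
        return self._xor(base64.b64decode(token)).decode()

def rotate_blobs(rows: dict, old: "ToyVault", new: "ToyVault") -> None:
    """Decrypt every blob with the old KEK, re-encrypt with the new one."""
    for row_id, token in rows.items():
        rows[row_id] = new.encrypt(old.decrypt(token))

old_vault = ToyVault(ToyVault.generate_key())
new_vault = ToyVault(ToyVault.generate_key())

rows = {1: old_vault.encrypt('{"hetzner_token": "secret"}')}
rotate_blobs(rows, old=old_vault, new=new_vault)
recovered = new_vault.decrypt(rows[1])
```

After rotation the old key can no longer decrypt any stored blob, which is exactly the window the "old Vault stays usable until every process restarts" note is about.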
### storage — object storage + signed URLs

(planned for 0.4 — extracted and generalised from forktex-network-api's minio plumbing)
The minimal contract:
```python
from forktex_core.storage import StorageClient, PublicAccess

storage = StorageClient(endpoint="s3://...", credentials=...)

# Upload from a service:
await storage.put("invoices/2026-04/INV-123.pdf", body, content_type="application/pdf")

# Mint a time-limited signed URL for a callback or a tenant download:
url = await storage.signed_url(
    "invoices/2026-04/INV-123.pdf",
    access=PublicAccess.read,
    ttl_seconds=600,
)
```
What it solves: every service that ships file uploads/downloads ends up reinventing the same five things — bucket layout, content-type sniffing, secure short-lived URLs, public-vs-private access policy, and a callback path that the consumer's public config (CDN host, custom domain) can override. storage packages those once, with the public-link mapping driven by consumer-side configuration so the same code works against MinIO in dev and S3 + CloudFront in prod.
S3-compatible by default (MinIO + AWS S3); GCS / Azure Blob behind the same StorageClient interface as a follow-up.
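The mechanics of a short-lived, tamper-evident link with a consumer-configured public host can be sketched with stdlib HMAC. `make_signed_url`, `verify_signed_url`, `SECRET`, and `PUBLIC_HOST` are all hypothetical; real S3/MinIO presigning uses SigV4, but the expiry-plus-signature shape is the same:

```python
import hashlib
import hmac
import urllib.parse

SECRET = b"per-env signing secret"          # hypothetical; injected by the host service
PUBLIC_HOST = "https://cdn.example.com"     # consumer-side public config (CDN / custom domain)

def make_signed_url(key: str, expires_at: int) -> str:
    # Sign the object key together with its expiry so neither can be altered.
    msg = f"{key}:{expires_at}".encode()
    sig = hmac.new(SECRET, msg, hashlib.sha256).hexdigest()
    query = urllib.parse.urlencode({"expires": expires_at, "sig": sig})
    return f"{PUBLIC_HOST}/{key}?{query}"

def verify_signed_url(key: str, expires_at: int, sig: str, now: int) -> bool:
    msg = f"{key}:{expires_at}".encode()
    expected = hmac.new(SECRET, msg, hashlib.sha256).hexdigest()
    return now < expires_at and hmac.compare_digest(sig, expected)

url = make_signed_url("invoices/2026-04/INV-123.pdf", expires_at=1_000_600)
sig = urllib.parse.parse_qs(urllib.parse.urlsplit(url).query)["sig"][0]
ok_in_time = verify_signed_url("invoices/2026-04/INV-123.pdf", 1_000_600, sig, now=1_000_000)
ok_expired = verify_signed_url("invoices/2026-04/INV-123.pdf", 1_000_600, sig, now=1_000_601)
```

Because `PUBLIC_HOST` lives in consumer config, the same signing code emits MinIO-dev URLs in one environment and CDN URLs in another.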
### queue — background jobs

(planned for 0.4 — distilled from a sandbox-poc prototype)
For fire-and-forget work that doesn't need flow's durability guarantees: send-an-email, refresh-a-cache, kick-off-a-report. Likely arq-backed (Redis-native, async-first, lighter than Celery), wrapped in a thin opinionated client so consumers get the same enqueue / process ergonomics across services.
Decision boundary vs. flow:
- `flow`: must survive crashes, must replay-on-resume, has steps with retry policies, lives for minutes-to-hours, observability matters.
- `queue`: short-lived, idempotent, "best-effort" fire-and-forget; if a worker dies mid-job, redelivery is cheap and the job runs again.
If you're unsure which you need, you probably need flow.
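The enqueue/process ergonomics can be sketched with `asyncio.Queue` (in-memory, not Redis/arq; all names here are illustrative). Note what is missing on purpose: no checkpoint log, no replay, so a crash mid-job simply means redelivery re-runs the idempotent job:

```python
import asyncio

jobs: asyncio.Queue = asyncio.Queue()
processed: list[str] = []

async def worker():
    # Best-effort: pull jobs until the sentinel arrives. No durable state;
    # if this task died mid-job, redelivery would just run the job again.
    while True:
        job = await jobs.get()
        if job is None:          # sentinel: shut down
            return
        processed.append(f"sent email to {job}")
        jobs.task_done()

async def main():
    for addr in ("a@example.com", "b@example.com"):
        jobs.put_nowait(addr)    # fire-and-forget enqueue
    jobs.put_nowait(None)
    await worker()

asyncio.run(main())
```

Everything `flow` adds (checkpoints, replay, leader election) is exactly what this sketch deliberately lacks.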
### data — virtual schemas + Query/Result abstraction

(planned for 0.5 — generalised from network's "virtual DB" engine)
Network has a battle-tested layer for tenant-defined data: registers (logical tables), fields (logical columns), relationships, all CRUD-able by end users at runtime. The consuming product (a flexible business-process tool) pivots on this every day. That layer is too useful to keep trapped in one repo.
data lifts it to two clean abstraction levels:
- **Storage layer** (Postgres-backed, leans on `db`): the dynamic-schema mechanics — register definitions, field definitions, relationship graph, JSONB-backed row storage with per-field type coercion, indexable derived columns.
- **Business-logic layer**: composable `*Query` objects (filtering / sorting / paginating-or-scrolling / soft-archiving / hard-deleting / search / faceting) returning typed `*Result` objects. The same `*Query` shape applies whether the underlying register is Postgres-stored, virtualised over an external API, or computed on the fly — so consumers don't have to know which.
```python
# Aspirational sketch — final API will be informed by extracting from network.
from forktex_core.data import Register, RowQuery

invoices = Register.load("invoices", tenant_id=...)

result = await (
    RowQuery(invoices)
    .where(status="paid")
    .since(date(2026, 1, 1))
    .order_by("-issued_at")
    .scroll(page_size=50)
)
```
Why this lives in core rather than each consumer rolling its own: every B2B product reaches a point where end users want to add a field. Once you've solved that once, you don't want to solve it three more times in three more services.
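The `*Query`/`*Result` shape can be sketched over plain dicts. `ToyRowQuery` and `ToyResult` are storage-agnostic toys, not the planned API; they only show why a fluent query object that returns a typed result composes well regardless of backend:

```python
from dataclasses import dataclass

@dataclass
class ToyResult:
    rows: list[dict]
    total: int

class ToyRowQuery:
    """Fluent, storage-agnostic query object over an in-memory register."""
    def __init__(self, rows: list[dict]):
        self._rows = list(rows)

    def where(self, **filters):
        self._rows = [r for r in self._rows
                      if all(r.get(k) == v for k, v in filters.items())]
        return self

    def order_by(self, field: str):
        # A leading "-" means descending, mirroring the sketch above.
        reverse = field.startswith("-")
        self._rows.sort(key=lambda r: r[field.lstrip("-")], reverse=reverse)
        return self

    def paginate(self, page: int, page_size: int) -> ToyResult:
        total = len(self._rows)
        start = (page - 1) * page_size
        return ToyResult(rows=self._rows[start:start + page_size], total=total)

invoices = [
    {"id": 1, "status": "paid", "issued_at": "2026-01-05"},
    {"id": 2, "status": "open", "issued_at": "2026-01-07"},
    {"id": 3, "status": "paid", "issued_at": "2026-02-01"},
]
result = ToyRowQuery(invoices).where(status="paid").order_by("-issued_at").paginate(1, 50)
```

Swapping the in-memory list for Postgres, an external API, or a computed view changes only the query object's internals; callers keep the same fluent surface and the same `*Result`.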
## Configuration
forktex-core does not read environment variables directly. Connection strings, credentials, KEKs, and pool settings are passed in by the host service at init_engine / init / Flow(...) / Vault.from_key(...) / StorageClient(...) construction time. Each consuming service decides its own configuration source (env vars, secrets manager, manifest, etc.).
## Development

The root `Makefile` is generated by `forktex fsd makefile sync` from `forktex.json` — do not hand-edit.
```shell
make help     # list every available target
make deps     # editable install
make format   # ruff format
make lint     # ruff check
make test     # pytest tests/
make build    # python3 -m build → dist/
make ci       # format-check + lint + license-check + audit + test + build
make clean    # remove caches and dist/
```
`make ci` is the single command that gates a publish: format-check, lint, dual-license header verification across every source file, a dependency CVE audit, the full pytest sweep (uses Postgres testcontainers for `flow`), the wheel + sdist build, and `twine check`.

`make start` / `make stop` / `make logs` are intentional no-ops — forktex-core is a library; there is nothing to run.
### License headers
Every source file carries the AGPL-3.0 + Commercial dual-license SPDX header, applied idempotently via:
```shell
make license-check   # CI gate — fails if any source file is missing the header
make license-fix     # add or refresh headers across src/, tests/, scripts/
make license-strip   # remove headers (used before license-model changes)
```
## FSD Self-Description
This repo follows the ForkTex Standard Delivery (FSD) shape:
- root manifest: `forktex.json`
- profile: `workspace/python-monorepo`
- target maturity: `L3`
- single publishable package: `forktex-core` at path `.`
Re-validate locally with:
```shell
forktex fsd --project-dir . check
forktex arch discover --project-dir . --output-dir /tmp/arch-corepy
```
## License
Dual-licensed — AGPL-3.0-or-later for open-source use, commercial for everything else (proprietary products, SaaS without source release, redistribution in closed-source form). See LICENSE and NOTICE for the full terms.
Commercial licensing inquiries: info@forktex.com.
The 1.0.0 release on PyPI remains under MIT; from 0.2.3 onwards the package ships AGPL-3.0+Commercial. 0.3.0 adds forktex_core.flow (durable execution + state-machine graphs) and begins the psql → db / redis → cache rename.