Postgres-native durable workflow execution for the ForkTex ecosystem — graph-first pipelines, state machines, and AI agent loops
forktex-core
The shared substrate every ForkTex Python service builds on: async Postgres, caching, durable execution, secret encryption, secure object storage, background queues, and a virtual-data layer for tenant-defined schemas. One library, seven modules (three shipped, four planned), one set of opinions.
forktex-core exists so individual services don't reinvent connection management, retry/replay scaffolding, signed-URL plumbing, or the umpteenth bespoke state machine. Each consuming service imports what it needs and inherits the same operational shape.
Install
```bash
pip install forktex-core
```
PostgreSQL (asyncpg + SQLAlchemy 2) and Redis (hiredis) are bundled defaults — no optional extras to remember. Requires Python 3.12+.
Architecture overview
forktex_core is a flat namespace of independent modules. Pull in only what you need; nothing imports cross-module unless explicitly composed.
| Module | Status | Purpose |
|---|---|---|
| `db` | 0.3 (currently psql, renaming) | Async SQLAlchemy engine, session lifecycle, base ORM (BaseDBModel, TimestampMixin, AuditMixin, OrgScopedMixin), CRUD helpers |
| `cache` | 0.3 (currently redis, renaming) | Async Redis client, @cached, namespaced keys, sliding-window rate limiter |
| `flow` | 0.4 | Postgres-native durable execution: graph-first pipelines, state machines, and AI agent loops; @flow.pipeline / @flow.graph / @flow.scheduled, namespace-track runtime definitions, InstanceQuery filter/sort/page API |
| `vault` | planned (0.4) | Fernet-based symmetric encryption for credential blobs at rest, KEK (key-encryption-key) rotation flow, EncryptedJSON SQLAlchemy column type |
| `storage` | planned (0.4) | S3/MinIO-compatible object storage client + secure callback URLs (consumer-driven public-config mapping) |
| `queue` | planned (0.4) | Lightweight background-job queue (likely arq-backed) for fire-and-forget work that isn't durable |
| `data` | planned (0.5) | Generalised "virtual database" layer: dynamic registers / fields / relationships with two layers of abstraction, `*Query` / `*Result` business objects above the Postgres primitives |
The first three are real today (with the db / cache renames in flight); the last four are the planned roadmap toward 1.0. The table is the source of truth — when in doubt, check what the module exports.
Module reference
db — async Postgres
(currently exported as forktex_core.psql while the rename lands; both will be importable through 0.4 for migration breathing room.)
```python
from forktex_core.db import (
    init_engine, close_engine, get_session,
    BaseDBModel, TimestampMixin, AuditMixin, OrgScopedMixin,
)

init_engine("postgresql+asyncpg://user:pass@localhost/db", pool_size=20)

async with get_session() as session:
    user = await session.get(User, user_id)
```
What's in the box:
- Engine + session lifecycle: single `init_engine` / `close_engine` pair, `get_session` context manager that commits/rolls-back automatically.
- Base model + mixins: `BaseDBModel` (UUID PK + timestamps), `TimestampMixin` (created_at / updated_at), `AuditMixin` (created_by / updated_by), `OrgScopedMixin` (org_id FK + tenant filter helper). All composable.
- CRUD helpers: typed generic `Repository[Model]` with the usual list/get/create/update/delete + filter/sort/paginate built on SQLAlchemy 2's `select()` API.
- Migration friendliness: every table the library defines lives in its own schema (e.g. `forktex_flow.*`) so the consumer's alembic only sees consumer-owned tables.
cache — Redis
(currently exported as forktex_core.redis; same parallel-import migration window as db.)
```python
from forktex_core.cache import init, close, cached, rate_limit

await init("redis://localhost:6379/0")

@cached(ttl=300, namespace="user:profile")
async def get_profile(user_id: str): ...

ok, retry_after = await rate_limit("login", key=ip, limit=5, window_seconds=60)
```
What's in the box:
- Connection pool: `init` / `close` / `get_client`, hiredis-backed.
- `@cached(ttl=…)`: transparent decorator caching for any async function with serialisable args.
- Typed ops: `get_json`, `set_json`, `incr`, `mget`, `delete_pattern` (cluster-safe).
- Namespaces: `Namespace("user.profile").key(user_id)` helper so cross-service key collisions stay impossible.
- Sliding-window rate limiter: production-tested limiter for auth middleware and similar hot paths.
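To make the rate-limiter contract concrete, here is a minimal in-memory sketch of a sliding-window limiter that returns the same (ok, retry_after) shape as the rate_limit call above. It is an illustration only: the SlidingWindowLimiter class and its injectable clock are hypothetical, and the library's limiter is Redis-backed, so its internals may well differ.

```python
import time
from collections import defaultdict, deque


class SlidingWindowLimiter:
    """In-memory stand-in; hypothetical, not forktex_core's implementation."""

    def __init__(self, limit: int, window_seconds: float, clock=time.monotonic):
        self.limit = limit
        self.window = window_seconds
        self.clock = clock
        self._hits: dict[str, deque[float]] = defaultdict(deque)

    def check(self, key: str) -> tuple[bool, float]:
        now = self.clock()
        hits = self._hits[key]
        # Drop timestamps that have slid out of the window.
        while hits and now - hits[0] >= self.window:
            hits.popleft()
        if len(hits) < self.limit:
            hits.append(now)
            return True, 0.0
        # Denied: the caller may retry once the oldest hit expires.
        return False, self.window - (now - hits[0])
```

Unlike a fixed-window counter, the window here is measured back from each request, so a burst straddling a window boundary cannot exceed the limit.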
flow — durable execution
Postgres-native workflows that survive crashes, replay from a checkpoint log on resume, and elect one leader at a time across N workers via advisory locks. Everything is schema-isolated under forktex_flow.*, so the consumer's alembic never sees these tables.
Everything is a graph. A linear deploy pipeline is a chain graph. A user-onboarding state machine is a cyclic graph with event-driven edges. An AI agent loop is a cyclic graph with model-driven routing. All three use the same @step primitive and the same Flow runtime.
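Postgres advisory locks are keyed by a bigint, so leader election of this kind needs a stable integer per lock name. The sketch below shows one way to derive such a key; `pg_try_advisory_lock` is a standard Postgres function, but the helper name, the lock name, and the hashing scheme are assumptions, since the source does not say how flow derives its keys.

```python
import hashlib
import struct

# Non-blocking attempt; true only for the one session that gets the lock.
# ($1 is the asyncpg-style placeholder; other drivers use a different syntax.)
TRY_LOCK_SQL = "SELECT pg_try_advisory_lock($1)"


def advisory_lock_key(name: str) -> int:
    """Map a lock name to a stable signed 64-bit integer (hypothetical helper)."""
    digest = hashlib.sha256(name.encode("utf-8")).digest()
    # '>q' unpacks the first 8 bytes as a big-endian signed 64-bit integer,
    # which is the range Postgres accepts for a bigint lock key.
    (key,) = struct.unpack(">q", digest[:8])
    return key


key = advisory_lock_key("forktex_flow.driver")
```

A worker that gets `true` back acts as leader for as long as its session holds the lock; the others keep polling.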
Quick start
```python
from forktex_core.flow import Flow, Ctx, step, node, parallel, edge, conditional, wait_edge, START, END
from typing import TypedDict, Annotated
import operator

flow = Flow(database_url="postgresql+asyncpg://user:pass@host/db")
```
Standalone steps — the single durable primitive
```python
# @step / @node are aliases: standalone, not bound to any Flow instance.
# Returns a partial state-update dict merged into the run's accumulated state.
# Every @step call is implicitly a durable checkpoint (cached on crash recovery).
@step
async def provision(ctx: Ctx, state: dict) -> dict:
    server_id = await provider.create(state["manifest"])
    return {"server_id": server_id}  # ← only the keys that changed

@step(max_attempts=5, backoff=(10.0, 60.0, 300.0))
async def configure(ctx: Ctx, state: dict) -> dict:
    await ansible.run(state["server_id"])
    return {}
```
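The "cached on crash recovery" behaviour can be pictured as a lookup keyed by run and step name: a step that already recorded a result is not executed again on resume. The sketch below uses a plain dict in place of a Postgres table; CheckpointLog and run_step are hypothetical names for illustration, not part of forktex_core.

```python
import asyncio


class CheckpointLog:
    """Dict-backed stand-in for a durable checkpoint table (hypothetical)."""

    def __init__(self):
        self._results: dict[tuple[str, str], dict] = {}

    async def run_step(self, run_id: str, name: str, fn, state: dict) -> dict:
        key = (run_id, name)
        if key not in self._results:
            # First execution: run the step and record its partial update.
            self._results[key] = await fn(state)
        # On replay the recorded update is returned without re-running fn.
        return self._results[key]


calls: list[str] = []


async def provision(state: dict) -> dict:
    calls.append("provision")
    return {"server_id": "srv-1"}


async def run(log: CheckpointLog, run_id: str) -> dict:
    state = {"manifest": {}}
    state |= await log.run_step(run_id, "provision", provision, state)
    return state


log = CheckpointLog()
first = asyncio.run(run(log, "run-1"))   # executes provision
second = asyncio.run(run(log, "run-1"))  # replays from the log
```

This is why steps return only the keys that changed: the recorded delta is what gets merged back into state during replay.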
Three workflow declaration styles
@flow.scheduled — one function, one step, one workflow
```python
@flow.scheduled("cloud.backup.create", version=1, cron="0 2 * * *")
async def backup_create(ctx: Ctx, state: dict) -> dict:
    return await run_backup(state)
```
Compiles to START → backup_create → END. Also manually triggerable via flow.run().
@flow.pipeline — linear steps array
```python
from forktex_core.flow import step, parallel

class DeployState(TypedDict):
    org_id: str
    manifest: dict
    server_id: str | None
    logs: Annotated[list[str], operator.add]  # reducer: append, not overwrite

@flow.pipeline("cloud.deploy.up", version=1, state=DeployState)
class DeployUp:
    steps = [
        provision,
        configure,
        step(setup_dns, when=lambda s: s["manifest"].get("dns")),  # optional: skipped if falsy
        parallel(verify_dns, verify_ssl),                          # fan-out + implicit join
        health_check,
    ]
```
- `step(fn, when=cond)`: node is skipped (state passes through unchanged) if `cond(state)` is falsy
- `parallel(a, b, c)`: all execute concurrently; merged state continues to next step
- Optional `cron=` kwarg makes it also fire on a schedule
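The skip and fan-out semantics above can be sketched in a few lines of plain asyncio. This is a simplified model under stated assumptions, not the Flow runtime: run_pipeline and the (functions, when) tuple shape are hypothetical, there is no checkpointing, and merging is plain last-write-wins.

```python
import asyncio


async def run_pipeline(steps, state: dict) -> dict:
    """Run steps in order; each item is (fns, when) where when may be None."""
    for fns, when in steps:
        if when is not None and not when(state):
            continue  # skipped: state passes through unchanged
        # A single step is a group of one; parallel groups run concurrently.
        updates = await asyncio.gather(*(fn(state) for fn in fns))
        for update in updates:
            state = {**state, **update}  # last-write-wins merge
    return state


async def provision(state):
    return {"server_id": "srv-1"}


async def setup_dns(state):
    return {"dns": "configured"}


async def verify_dns(state):
    return {"dns_ok": True}


async def verify_ssl(state):
    return {"ssl_ok": True}


steps = [
    ((provision,), None),
    ((setup_dns,), lambda s: s["manifest"].get("dns")),  # no "dns" key: skipped
    ((verify_dns, verify_ssl), None),                    # fan-out + join
]
result = asyncio.run(run_pipeline(steps, {"manifest": {}}))
```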
@flow.graph — full explicit topology
Graduation point from pipeline when you need branching, cycles, or event-driven transitions:
```python
from forktex_core.flow import edge, conditional, wait_edge

# Branching deploy
@flow.graph("cloud.deploy.advanced", version=1, state=DeployState)
class DeployAdvanced:
    nodes = {
        "provision": provision,
        "configure": configure,
        "rollback": rollback,
        "health_check": health_check,
    }
    topology = [
        edge(START, "provision"),
        edge("provision", "configure"),
        conditional("configure", router_fn, {
            "rollback": "rollback",
            "health_check": "health_check",
        }),
        edge("health_check", END),
        edge("rollback", END),
    ]

# State machine (event-driven)
@flow.graph("user.onboarding", version=1, state=OnboardingState)
class UserOnboarding:
    nodes = {"email_pending": email_pending, "verified": verified}
    entry = "email_pending"
    terminal = "verified"
    topology = [
        wait_edge("email_pending", "verified", on="email.verified"),  # suspends until signal
    ]

# AI agent loop (cyclic)
@flow.graph("intelligence.agent", version=1, state=AgentState)
class AgentLoop:
    nodes = {"llm_call": llm_call, "tool_executor": tool_executor}
    topology = [
        edge(START, "llm_call"),
        conditional("llm_call", should_continue, {
            "tool_executor": "tool_executor",
            END: END,
        }),
        edge("tool_executor", "llm_call"),  # cycle back
    ]
```
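As a rough mental model of how such a topology executes, the sketch below walks edges and conditional routers until it reaches END, using the agent loop as the example. It is a toy interpreter under stated assumptions: run_graph, its dict-based edge and router arguments, and the max_steps guard are hypothetical and say nothing about how the Flow runtime is built.

```python
import asyncio

START, END = "__START__", "__END__"


async def run_graph(nodes, edges, routers, state, max_steps=50):
    """edges: {source: target}; routers: {source: (router_fn, mapping)}."""
    current = edges[START]
    visited = []
    for _ in range(max_steps):  # guard against runaway cycles
        if current == END:
            return state, visited
        visited.append(current)
        state = {**state, **await nodes[current](state)}
        if current in routers:
            router_fn, mapping = routers[current]
            current = mapping[router_fn(state)]
        else:
            current = edges[current]
    raise RuntimeError("graph did not reach END within max_steps")


async def llm_call(state):
    # Stand-in for a model call: request a tool on the first two turns.
    turn = state["turn"] + 1
    return {"turn": turn, "wants_tool": turn < 3}


async def tool_executor(state):
    return {"tool_runs": state["tool_runs"] + 1}


def should_continue(state):
    return "tool_executor" if state["wants_tool"] else END


final, path = asyncio.run(run_graph(
    nodes={"llm_call": llm_call, "tool_executor": tool_executor},
    edges={START: "llm_call", "tool_executor": "llm_call"},
    routers={"llm_call": (should_continue, {"tool_executor": "tool_executor", END: END})},
    state={"turn": 0, "tool_runs": 0},
))
```

The router returns a label and the mapping turns that label into a node name, which is what lets the same node fan out to different successors per run.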
Namespace track — runtime definitions
For workflows defined by users at runtime (e.g. a network UI where each org configures their own automation):
```python
# Platform engineers register step templates (the building blocks)
@flow.step_template("network.reroute_traffic")
@step
async def reroute(ctx: Ctx, state: dict) -> dict: ...

@flow.step_template("network.send_alert")
@step
async def alert(ctx: Ctx, state: dict) -> dict: ...

# Org users define their workflow at runtime (via API / UI)
await flow.define(
    name="link_failure_response",
    namespace="org-abc",
    version=1,
    config={
        "type": "pipeline",
        "steps": [
            "network.reroute_traffic",
            {"step": "network.send_alert", "when": {"field": "ok", "is": False}},
        ],
    },
)

# Delete (raises if active runs exist)
await flow.undefine("link_failure_response", namespace="org-abc")
```
Config format for graphs:
```python
config = {
    "type": "graph",
    "topology": [
        {"from": "__START__", "to": "detect"},
        {"from": "detect", "to": "reroute", "on": "peer.down"},  # wait_edge
        {"from": "reroute", "to": "__END__"},
    ],
}
```
Dispatch — identical for all workflow types
```python
# Platform-track (no namespace)
instance = await flow.run(
    "cloud.deploy.up",
    state={"org_id": "...", "manifest": {...}},
    metadata={"org_id": "...", "kind": "up"},
)

# Namespace-track
instance = await flow.run(
    "link_failure_response",
    namespace="org-abc",
    state={"link_id": "eth0-router-1"},
)

# Via definition handle
defn = flow.workflow("cloud.deploy.up")
instance = await defn.run(state={...}, metadata={...})
```
WorkflowInstance — the run handle
```python
instance.instance_id    # UUID
instance.workflow_name  # "cloud.deploy.up"
instance.status         # "pending" | "running" | "completed" | "failed" | "cancelled"
instance.state          # current accumulated state dict
instance.current_node   # "configure": which node is executing now
instance.nodes          # list[NodeInstance] with per-node status, duration, delta

await instance.cancel()
await instance.wait(timeout=300)                      # blocks until terminal
await instance.send("email.verified", payload={...})  # advance a wait_edge
await instance.refresh()                              # re-fetch from DB
async for event in instance.stream(): ...             # live SSE events
```
Query — filter / sort / page / aggregate
```python
from datetime import datetime, timedelta

# Cloud dashboard: recent deploy runs for an org
page = await (
    flow.query()
    .workflow("cloud.deploy.up")
    .status("running", "failed")
    .metadata(org_id="abc")
    .sort("started_at", desc=True)
    .limit(20)
    .fetch()
)
page.items        # list[WorkflowInstance]
page.total        # total matching (for pagination UI)
page.next_cursor  # str | None; pass to .fetch(cursor=...) for next page

# Network dashboard: summary card
summary = await (
    flow.query()
    .namespace("org-abc")
    .since(datetime.now() - timedelta(days=7))
    .summary()
)
summary.total                 # 23
summary.by_status             # {"completed": 20, "failed": 3}
summary.avg_duration_seconds  # 183.4

# Find by payload field (JSONB @> filter on run.input)
instance = await (
    flow.query()
    .state(link_id="eth0-router-1")
    .status("running")
    .first()
)

# Stuck in a specific node
stuck = await (
    flow.query()
    .namespace("org-abc")
    .current_node("configure_routes")
    .status("running")
    .fetch()
)
```
Available filter methods: .workflow(name, version=None) · .namespace(ns) · .status(*statuses) · .metadata(**kv) · .state(**kv) · .current_node(*names) · .since(dt) · .until(dt) · .triggered_by(*triggers) · .sort(field, desc=True) · .limit(n)
Terminal methods: .fetch(cursor=None) → InstancePage · .count() → int · .first() → WorkflowInstance | None · .summary() → InstanceSummary
Child workflow orchestration (dynamic fan-out)
```python
# From inside a @step body:
@step
async def configure_all_peers(ctx: Ctx, state: NetworkState) -> dict:
    # Scatter: launch N child workflows in parallel
    results = await ctx.map(
        "network.peer.configure",
        states=[{"peer_id": p} for p in state["peers"]],
    )
    return {"peer_results": results}

# Or: spawn + wait for more control
@step
async def deploy_region(ctx: Ctx, state: dict) -> dict:
    instance_id = await ctx.spawn("cloud.deploy.svc", state={"region": state["region"]})
    result_state = await ctx.wait(instance_id)  # durable: survives restart
    return {"region_result": result_state}
```
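The scatter/gather shape of ctx.map can be approximated in-process with asyncio.gather. The sketch below is only an analogy: map_children and its max_concurrency parameter are hypothetical, and unlike child workflows, plain coroutines are not durable and do not survive a restart.

```python
import asyncio


async def map_children(child, states: list[dict], max_concurrency: int = 10) -> list[dict]:
    """Run child(state) for every state, returning results in input order."""
    sem = asyncio.Semaphore(max_concurrency)

    async def guarded(state: dict) -> dict:
        async with sem:  # cap how many children run at once
            return await child(state)

    # gather preserves the order of its arguments, not completion order.
    return await asyncio.gather(*(guarded(s) for s in states))


async def configure_peer(state: dict) -> dict:
    await asyncio.sleep(0)  # stand-in for real work
    return {"peer_id": state["peer_id"], "ok": True}


results = asyncio.run(
    map_children(configure_peer, [{"peer_id": p} for p in ("a", "b", "c")])
)
```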
Lifecycle
```python
from contextlib import asynccontextmanager

await flow.init()          # apply schema migrations (idempotent)
await flow.start_driver()  # start leader-elected driver
await flow.stop_driver()   # graceful drain

# Or use as async context manager:
async with flow:
    await flow.run(...)

# For FastAPI:
@asynccontextmanager
async def lifespan(app):
    await flow.start_driver()
    yield
    await flow.stop_driver()
```
State reducers
```python
from typing import Annotated, TypedDict
import operator

class DeployState(TypedDict):
    server_id: str | None                     # last-write-wins (default)
    logs: Annotated[list[str], operator.add]  # append reducer: [a] + [b] = [a, b]
```
Any Annotated[T, fn] field uses fn(existing, new) to merge partial updates. All other fields use last-write-wins. This makes parallel fan-out safe: nodes that update different keys never conflict, and nodes that update the same reducer-annotated key have their updates combined rather than overwritten.
vault — secret encryption + KEK rotation
(planned for 0.4)
The minimal contract:
```python
from sqlalchemy.orm import Mapped, mapped_column

from forktex_core.db import BaseDBModel
from forktex_core.vault import Vault, EncryptedJSON

# One process-wide Vault, configured with the platform-wide Fernet KEK.
# (KEK = key-encryption-key: encrypts the blobs, NOT a tenant secret.
#  Leaking it still requires DB access to use; rotation is supported.)
vault = Vault.from_key(settings.master_key)

# Encrypt / decrypt arbitrary JSON-serialisable payloads:
token = vault.encrypt({"provider_token": "...", "ssh_key_id": 12345})
payload = vault.decrypt(token)

# Or use the SQLAlchemy column type for transparent at-rest encryption:
# reads decrypt automatically, writes encrypt automatically.
class OrgProviderCredential(BaseDBModel):
    __tablename__ = "org_provider_credential"
    payload: Mapped[dict] = mapped_column(EncryptedJSON(vault))
    # ... rest of the model

# Generate a fresh Fernet key (e.g. for ops bootstrapping a new env):
new_key = Vault.generate_key()  # base64-encoded 32-byte key
```
KEK rotation is a first-class flow:
```python
# When ops rotate the master key: re-encrypt every blob against the new
# Vault, then atomically swap the env var. The old Vault stays usable
# until every consumer process restarts with the new key.
old_vault = Vault.from_key(old_master_key)
new_vault = Vault.from_key(new_master_key)

await rotate_blobs(
    session,
    table=OrgProviderCredential,
    column="payload",
    old=old_vault,
    new=new_vault,
)
```
What it solves: every service that stores credentials (per-org provider tokens, per-tenant SMTP / API keys, per-pipeline upstream creds) ends up reinventing the same Fernet wrapper, the same "decrypt error means rotated KEK" diagnostic, and the same rotation script. vault packages those once.
Decision boundaries:
- Symmetric only — Fernet under the hood. No asymmetric / public-key crypto here; that's a different problem with different threat models.
- At-rest encryption, not transport — for transport-level secrecy, terminate TLS at the edge.
- Single KEK by default — per-tenant key derivation (DEK-per-org wrapped by KEK) is a v2 concern; today's threat model in the ForkTex products is well-served by one platform-wide KEK plus row-level access control.
storage — object storage + signed URLs
(planned for 0.4)
The minimal contract:
```python
from forktex_core.storage import StorageClient, PublicAccess

storage = StorageClient(endpoint="s3://...", credentials=...)

# Upload from a service:
await storage.put("invoices/2026-04/INV-123.pdf", body, content_type="application/pdf")

# Mint a time-limited signed URL for a callback or a tenant download:
url = await storage.signed_url(
    "invoices/2026-04/INV-123.pdf",
    access=PublicAccess.read,
    ttl_seconds=600,
)
```
What it solves: every service that ships file uploads/downloads ends up reinventing the same five things — bucket layout, content-type sniffing, secure short-lived URLs, public-vs-private access policy, and a callback path that the consumer's public config (CDN host, custom domain) can override. storage packages those once, with the public-link mapping driven by consumer-side configuration so the same code works against MinIO in dev and S3 + CloudFront in prod.
S3-compatible by default (MinIO + AWS S3); GCS / Azure Blob behind the same StorageClient interface as a follow-up.
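For readers unfamiliar with time-limited signed URLs, the idea is that the URL carries an expiry and a signature over the path and expiry, so the server can verify it without any stored state. The sketch below uses a generic HMAC-SHA256 scheme from the standard library; it is not S3 presigning (which uses AWS Signature Version 4) and not the planned storage API, and sign_url, verify_url, and the example host are hypothetical.

```python
import hashlib
import hmac
from urllib.parse import parse_qs, urlencode, urlsplit


def sign_url(base: str, path: str, secret: bytes, expires_at: int) -> str:
    """Append an expiry and an HMAC-SHA256 signature to a URL."""
    msg = f"{path}\n{expires_at}".encode()
    sig = hmac.new(secret, msg, hashlib.sha256).hexdigest()
    return f"{base}/{path}?{urlencode({'expires': expires_at, 'sig': sig})}"


def verify_url(url: str, secret: bytes, now: int) -> bool:
    """Accept the URL only if it is unexpired and the signature matches."""
    parts = urlsplit(url)
    query = parse_qs(parts.query)
    try:
        expires_at = int(query["expires"][0])
        sig = query["sig"][0]
    except (KeyError, ValueError):
        return False
    msg = f"{parts.path.lstrip('/')}\n{expires_at}".encode()
    expected = hmac.new(secret, msg, hashlib.sha256).hexdigest()
    # compare_digest avoids leaking the match position through timing.
    return now < expires_at and hmac.compare_digest(sig, expected)


url = sign_url("https://files.example.com", "invoices/2026-04/INV-123.pdf", b"secret", 1600)
```

Because the expiry is part of the signed message, a client cannot extend the lifetime by editing the query string.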
queue — background jobs
(planned for 0.4)
For fire-and-forget work that doesn't need flow's durability guarantees: send-an-email, refresh-a-cache, kick-off-a-report. Likely arq-backed (Redis-native, async-first, lighter than Celery), wrapped in a thin opinionated client so consumers get the same enqueue / process ergonomics across services.
Decision boundary vs. flow:
- `flow`: must survive crashes, must replay-on-resume, has steps with retry policies, lives for minutes-to-hours, observability matters.
- `queue`: short-lived, idempotent, "best-effort" fire-and-forget; if a worker dies mid-job, redelivery is cheap and the job runs again.
If you're unsure which you need, you probably need flow.
data — virtual schemas + Query/Result abstraction
(planned for 0.5)
A common need across ForkTex services is a layer for tenant-defined data: registers (logical tables), fields (logical columns), relationships, all CRUD-able by end users at runtime. Flexible business-process tools pivot on this every day.
data lifts that pattern to two clean abstraction levels:
- Storage layer (Postgres-backed, leans on `db`): the dynamic-schema mechanics, namely register definitions, field definitions, relationship graph, JSONB-backed row storage with per-field type coercion, and indexable derived columns.
- Business-logic layer: composable `*Query` objects (filtering / sorting / paginating-or-scrolling / soft-archiving / hard-deleting / search / faceting) returning typed `*Result` objects. The same `*Query` shape applies whether the underlying register is Postgres-stored, virtualised over an external API, or computed on the fly, so consumers don't have to know which.
# Aspirational sketch — final API will be informed by the extraction.
from forktex_core.data import Register, RowQuery
invoices = Register.load("invoices", tenant_id=...)
result = await (
RowQuery(invoices)
.where(status="paid")
.since(date(2026, 1, 1))
.order_by("-issued_at")
.scroll(page_size=50)
)
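The "per-field type coercion" mentioned for the storage layer can be pictured as a small table of converters applied to each JSON row against its field definitions. The sketch below is purely illustrative: Field, COERCERS, coerce_row, and the type names are hypothetical, and the data module's real design is still open.

```python
from dataclasses import dataclass
from datetime import date
from decimal import Decimal
from typing import Any, Callable

# Hypothetical coercers from JSON-friendly values to Python types.
COERCERS: dict[str, Callable[[Any], Any]] = {
    "text": str,
    "integer": int,
    "decimal": lambda v: Decimal(str(v)),
    "date": lambda v: v if isinstance(v, date) else date.fromisoformat(v),
}


@dataclass(frozen=True)
class Field:
    name: str
    type: str
    required: bool = False


def coerce_row(fields: list[Field], raw: dict[str, Any]) -> dict[str, Any]:
    """Validate and coerce one JSON row against its field definitions."""
    row = {}
    for f in fields:
        if raw.get(f.name) is None:
            if f.required:
                raise ValueError(f"missing required field: {f.name}")
            row[f.name] = None
            continue
        row[f.name] = COERCERS[f.type](raw[f.name])
    return row


invoice_fields = [
    Field("number", "text", required=True),
    Field("amount", "decimal"),
    Field("issued_at", "date"),
]
row = coerce_row(
    invoice_fields,
    {"number": "INV-123", "amount": "99.50", "issued_at": "2026-04-01"},
)
```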
Why this lives in core rather than each consumer rolling its own: every B2B product reaches a point where end users want to add a field. Once you've solved that once, you don't want to solve it three more times in three more services.
Configuration
forktex-core does not read environment variables directly. Connection strings, credentials, KEKs, and pool settings are passed in by the host service at init_engine / init / Flow(...) / Vault.from_key(...) / StorageClient(...) construction time. Each consuming service decides its own configuration source (env vars, secrets manager, manifest, etc.).
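In practice this means the host service owns a small settings object and hands values to the library at startup. A minimal sketch of that pattern follows; CoreSettings, load_settings, and the APP_* variable names are hypothetical choices for the example, not names forktex-core defines or reads.

```python
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class CoreSettings:
    database_url: str
    redis_url: str
    pool_size: int = 20


def load_settings(env: Mapping[str, str]) -> CoreSettings:
    """Build settings from any mapping, e.g. os.environ (names are hypothetical)."""
    return CoreSettings(
        database_url=env["APP_DATABASE_URL"],
        redis_url=env["APP_REDIS_URL"],
        pool_size=int(env.get("APP_DB_POOL_SIZE", "20")),
    )


settings = load_settings({
    "APP_DATABASE_URL": "postgresql+asyncpg://user:pass@localhost/db",
    "APP_REDIS_URL": "redis://localhost:6379/0",
})
# The host service would then pass these on, for example:
#   init_engine(settings.database_url, pool_size=settings.pool_size)
#   await init(settings.redis_url)
```

Taking a Mapping rather than reading os.environ directly keeps the loader easy to test and lets a secrets manager supply the same keys.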
Development
The root Makefile is generated by forktex fsd makefile sync from forktex.json — do not hand-edit.
```bash
make help      # list every available target
make deps      # editable install
make format    # ruff format
make lint      # ruff check
make test      # pytest tests/
make build     # python3 -m build → dist/
make ci        # format-check + lint + license-check + audit + test + build
make clean     # remove caches and dist/
```
make ci is the single command that gates a publish: format-check, lint, dual-license header verification across every source file, dependency CVE audit, full pytest sweep (uses Postgres testcontainers for flow), wheel + sdist build, and twine check.
make start / make stop / make logs are intentional no-ops — forktex-core is a library, there is nothing to run.
License headers
Every source file carries the AGPL-3.0 + Commercial dual-license SPDX header, applied idempotently via:
```bash
make license-check   # CI gate: fails if any source file is missing the header
make license-fix     # add or refresh headers across src/, tests/, scripts/
make license-strip   # remove headers (used before license-model changes)
```
FSD Self-Description
This repo follows the ForkTex Standard Delivery (FSD) shape:
- root manifest: `forktex.json`
- profile: `workspace/python-monorepo`
- target maturity: `L3`
- single publishable package: `forktex-core` at path `.`
Re-validate locally with:
```bash
forktex fsd --project-dir . check
forktex arch discover --project-dir . --output-dir /tmp/arch-corepy
```
License
Dual-licensed — AGPL-3.0-or-later for open-source use, commercial for everything else (proprietary products, SaaS without source release, redistribution in closed-source form). See LICENSE and NOTICE for the full terms.
Commercial licensing inquiries: info@forktex.com.