Skip to main content

Postgres-native durable workflow execution for the ForkTex ecosystem — graph-first pipelines, state machines, and AI agent loops

Project description

forktex-core

PyPI Python License

The shared substrate every ForkTex Python service builds on — async Postgres, caching, durable execution, secure object storage, background queues, and a virtual-data layer for tenant-defined schemas. One library, six modules, one set of opinions.

forktex-core exists so individual services don't reinvent connection management, retry/replay scaffolding, signed-URL plumbing, or the umpteenth bespoke state machine. Each consuming service imports what it needs and inherits the same operational shape.

Install

pip install forktex-core

PostgreSQL (asyncpg + SQLAlchemy 2) and Redis (hiredis) are bundled defaults — no optional extras to remember. Requires Python 3.12+.

Architecture overview

forktex_core is a flat namespace of independent modules. Pull in only what you need; nothing imports cross-module unless explicitly composed.

Module Status Purpose
db 0.3 (currently psql, renaming) Async SQLAlchemy engine, session lifecycle, base ORM (BaseDBModel, TimestampMixin, AuditMixin, OrgScopedMixin), CRUD helpers
cache 0.3 (currently redis, renaming) Async Redis client, @cached, namespaced keys, sliding-window rate limiter
flow 0.4 Postgres-native durable execution: graph-first pipelines, state machines, and AI agent loops — @flow.pipeline / @flow.graph / @flow.scheduled, namespace-track runtime definitions, InstanceQuery filter/sort/page API
vault planned (0.4) Fernet-based symmetric encryption for credential blobs at rest, KEK (key-encryption-key) rotation flow, EncryptedJSON SQLAlchemy column type
storage planned (0.4) S3/MinIO-compatible object storage client + secure callback URLs (consumer-driven public-config mapping)
queue planned (0.4) Lightweight background-job queue (likely arq-backed) for fire-and-forget work that isn't durable
data planned (0.5) Generalised "virtual database" layer: dynamic registers / fields / relationships with two layers of abstraction — *Query / *Result business objects above the Postgres primitives

The first three are real today (with the db / cache renames in flight); the last four are the planned roadmap toward 1.0. The table is the source of truth — when in doubt, check what the module exports.

Module reference

db — async Postgres

(currently exported as forktex_core.psql while the rename lands; both will be importable through 0.4 for migration breathing room.)

from forktex_core.db import (
    init_engine, close_engine, get_session,
    BaseDBModel, TimestampMixin, AuditMixin, OrgScopedMixin,
)

init_engine("postgresql+asyncpg://user:pass@localhost/db", pool_size=20)

async with get_session() as session:
    user = await session.get(User, user_id)

What's in the box:

  • Engine + session lifecycle — single init_engine / close_engine pair, get_session context manager that commits/rolls-back automatically.
  • Base model + mixinsBaseDBModel (UUID PK + timestamps), TimestampMixin (created_at/updated_at), AuditMixin (created_by/updated_by), OrgScopedMixin (org_id FK + tenant filter helper). All composable.
  • CRUD helpers — typed generic Repository[Model] with the usual list/get/create/update/delete + filter/sort/paginate built on SQLAlchemy 2's select() API.
  • Migration friendliness — every table the library defines lives in its own schema (e.g. forktex_flow.*) so the consumer's alembic only sees consumer-owned tables.

cache — Redis

(currently exported as forktex_core.redis; same parallel-import migration window as db.)

from forktex_core.cache import init, close, cached, rate_limit

await init("redis://localhost:6379/0")

@cached(ttl=300, namespace="user:profile")
async def get_profile(user_id: str): ...

ok, retry_after = await rate_limit("login", key=ip, limit=5, window_seconds=60)

What's in the box:

  • Connection poolinit / close / get_client, hiredis-backed.
  • @cached(ttl=…) — transparent decorator caching for any async function with serialisable args.
  • Typed opsget_json, set_json, incr, mget, delete_pattern (cluster-safe).
  • NamespacesNamespace("user.profile").key(user_id) helper so cross-service key collisions stay impossible.
  • Sliding-window rate limiter — production-tested limiter for auth middleware and similar hot paths.

flow — durable execution

Postgres-native workflows that survive crashes, replay-on-resume from a checkpoint log, and elect one leader at a time across N workers via advisory locks. Schema-isolated under forktex_flow.* — the consumer's alembic never sees these tables.

Everything is a graph. A linear deploy pipeline is a chain graph. A user-onboarding state machine is a cyclic graph with event-driven edges. An AI agent loop is a cyclic graph with model-driven routing. All three use the same @step primitive and the same Flow runtime.

Quick start

from forktex_core.flow import Flow, Ctx, step, node, parallel, edge, conditional, wait_edge, START, END
from typing import TypedDict, Annotated
import operator

flow = Flow(database_url="postgresql+asyncpg://user:pass@host/db")

Standalone steps — the single durable primitive

# @step / @node are aliases — standalone, not bound to any Flow instance.
# Returns a partial state-update dict merged into the run's accumulated state.
# Every @step call is implicitly a durable checkpoint (cached on crash recovery).

@step
async def provision(ctx: Ctx, state: dict) -> dict:
    server_id = await provider.create(state["manifest"])
    return {"server_id": server_id}           # ← only the keys that changed

@step(max_attempts=5, backoff=(10.0, 60.0, 300.0))
async def configure(ctx: Ctx, state: dict) -> dict:
    await ansible.run(state["server_id"])
    return {}

Three workflow declaration styles

@flow.scheduled — one function, one step, one workflow

@flow.scheduled("cloud.backup.create", version=1, cron="0 2 * * *")
async def backup_create(ctx: Ctx, state: dict) -> dict:
    return await run_backup(state)

Compiles to START → backup_create → END. Also manually triggerable via flow.run().

@flow.pipeline — linear steps array

from forktex_core.flow import step, parallel

class DeployState(TypedDict):
    org_id: str
    manifest: dict
    server_id: str | None
    logs: Annotated[list[str], operator.add]   # reducer: append, not overwrite

@flow.pipeline("cloud.deploy.up", version=1, state=DeployState)
class DeployUp:
    steps = [
        provision,
        configure,
        step(setup_dns, when=lambda s: s["manifest"].get("dns")),  # optional: skipped if False
        parallel(verify_dns, verify_ssl),                           # fan-out + implicit join
        health_check,
    ]
  • step(fn, when=cond) — node is skipped (state passes through unchanged) if cond(state) is False
  • parallel(a, b, c) — all execute concurrently; merged state continues to next step
  • Optional cron= kwarg makes it also fire on a schedule

@flow.graph — full explicit topology

Graduation point from pipeline when you need branching, cycles, or event-driven transitions:

from forktex_core.flow import edge, conditional, wait_edge

# Branching deploy
@flow.graph("cloud.deploy.advanced", version=1, state=DeployState)
class DeployAdvanced:
    nodes = {
        "provision":    provision,
        "configure":    configure,
        "rollback":     rollback,
        "health_check": health_check,
    }
    topology = [
        edge(START,           "provision"),
        edge("provision",     "configure"),
        conditional("configure", router_fn, {
            "rollback":     "rollback",
            "health_check": "health_check",
        }),
        edge("health_check",  END),
        edge("rollback",      END),
    ]

# State machine (event-driven)
@flow.graph("user.onboarding", version=1, state=OnboardingState)
class UserOnboarding:
    nodes    = {"email_pending": email_pending, "verified": verified}
    entry    = "email_pending"
    terminal = "verified"
    topology = [
        wait_edge("email_pending", "verified", on="email.verified"),  # suspends until signal
    ]

# AI agent loop (cyclic)
@flow.graph("intelligence.agent", version=1, state=AgentState)
class AgentLoop:
    nodes    = {"llm_call": llm_call, "tool_executor": tool_executor}
    topology = [
        edge(START, "llm_call"),
        conditional("llm_call", should_continue, {
            "tool_executor": "tool_executor",
            END:             END,
        }),
        edge("tool_executor", "llm_call"),   # cycle back
    ]

Namespace track — runtime definitions

For workflows defined by users at runtime (e.g. a network UI where each org configures their own automation):

# Platform engineers register step templates (the building blocks)
@flow.step_template("network.reroute_traffic")
@step
async def reroute(ctx: Ctx, state: dict) -> dict: ...

@flow.step_template("network.send_alert")
@step
async def alert(ctx: Ctx, state: dict) -> dict: ...

# Org users define their workflow at runtime (via API / UI)
await flow.define(
    name      = "link_failure_response",
    namespace = "org-abc",
    version   = 1,
    config    = {
        "type":  "pipeline",
        "steps": [
            "network.reroute_traffic",
            {"step": "network.send_alert", "when": {"field": "ok", "is": False}},
        ],
    },
)

# Delete (raises if active runs exist)
await flow.undefine("link_failure_response", namespace="org-abc")

Config format for graphs:

config = {
    "type": "graph",
    "topology": [
        {"from": "__START__", "to": "detect"},
        {"from": "detect",    "to": "reroute",   "on": "peer.down"},  # wait_edge
        {"from": "reroute",   "to": "__END__"},
    ],
}

Dispatch — identical for all workflow types

# Platform-track (no namespace)
instance = await flow.run(
    "cloud.deploy.up",
    state    = {"org_id": "...", "manifest": {...}},
    metadata = {"org_id": "...", "kind": "up"},
)

# Namespace-track
instance = await flow.run(
    "link_failure_response",
    namespace = "org-abc",
    state     = {"link_id": "eth0-router-1"},
)

# Via definition handle
defn     = flow.workflow("cloud.deploy.up")
instance = await defn.run(state={...}, metadata={...})

WorkflowInstance — the run handle

instance.instance_id      # UUID
instance.workflow_name    # "cloud.deploy.up"
instance.status           # "pending" | "running" | "completed" | "failed" | "cancelled"
instance.state            # current accumulated state dict
instance.current_node     # "configure" — which node is executing now
instance.nodes            # list[NodeInstance] with per-node status, duration, delta

await instance.cancel()
await instance.wait(timeout=300)                         # blocks until terminal
await instance.send("email.verified", payload={...})     # advance a wait_edge
await instance.refresh()                                 # re-fetch from DB
async for event in instance.stream(): ...                # live SSE events

Query — filter / sort / page / aggregate

# Cloud dashboard: recent deploy runs for an org
page = await (
    flow.query()
    .workflow("cloud.deploy.up")
    .status("running", "failed")
    .metadata(org_id="abc")
    .sort("started_at", desc=True)
    .limit(20)
    .fetch()
)
page.items        # list[WorkflowInstance]
page.total        # total matching (for pagination UI)
page.next_cursor  # str | None — pass to .fetch(cursor=...) for next page

# Network dashboard: summary card
summary = await (
    flow.query()
    .namespace("org-abc")
    .since(datetime.now() - timedelta(days=7))
    .summary()
)
summary.total                  # 23
summary.by_status              # {"completed": 20, "failed": 3}
summary.avg_duration_seconds   # 183.4

# Find by payload field (JSONB @> filter on run.input)
instance = await (
    flow.query()
    .state(link_id="eth0-router-1")
    .status("running")
    .first()
)

# Stuck in a specific node
stuck = await (
    flow.query()
    .namespace("org-abc")
    .current_node("configure_routes")
    .status("running")
    .fetch()
)

Available filter methods: .workflow(name, version=None) · .namespace(ns) · .status(*statuses) · .metadata(**kv) · .state(**kv) · .current_node(*names) · .since(dt) · .until(dt) · .triggered_by(*triggers) · .sort(field, desc=True) · .limit(n)

Terminal methods: .fetch(cursor=None)InstancePage · .count()int · .first()WorkflowInstance | None · .summary()InstanceSummary

Child workflow orchestration (dynamic fan-out)

# From inside a @step body:
@step
async def configure_all_peers(ctx: Ctx, state: NetworkState) -> dict:
    # Scatter: launch N child workflows in parallel
    results = await ctx.map(
        "network.peer.configure",
        states=[{"peer_id": p} for p in state["peers"]],
    )
    return {"peer_results": results}

# Or: spawn + wait for more control
@step
async def deploy_region(ctx: Ctx, state: dict) -> dict:
    instance_id = await ctx.spawn("cloud.deploy.svc", state={"region": state["region"]})
    result_state = await ctx.wait(instance_id)      # durable: survives restart
    return {"region_result": result_state}

Lifecycle

await flow.init()    # apply schema migrations (idempotent)
await flow.start_driver()   # start leader-elected driver
await flow.stop_driver()    # graceful drain

# Or use as async context manager:
async with flow:
    await flow.run(...)

# For FastAPI:
@asynccontextmanager
async def lifespan(app):
    await flow.start_driver()
    yield
    await flow.stop_driver()

State reducers

from typing import Annotated
import operator

class DeployState(TypedDict):
    server_id: str | None           # last-write-wins (default)
    logs: Annotated[list[str], operator.add]  # append reducer: [a] + [b] = [a, b]

Any Annotated[T, fn] field uses fn(existing, new) to merge partial updates. All other fields use last-write-wins. This makes parallel fan-out safe — multiple nodes can update different keys without stepping on each other.

vault — secret encryption + KEK rotation

(planned for 0.4)

The minimal contract:

from forktex_core.vault import Vault, EncryptedJSON

# One process-wide Vault, configured with the platform-wide Fernet KEK.
# (KEK = key-encryption-key — encrypts the blobs, NOT a tenant secret.
#  Leaking it still requires DB access to use; rotation is supported.)
vault = Vault.from_key(settings.master_key)

# Encrypt / decrypt arbitrary JSON-serialisable payloads:
token = vault.encrypt({"provider_token": "...", "ssh_key_id": 12345})
payload = vault.decrypt(token)

# Or use the SQLAlchemy column type for transparent at-rest encryption —
# reads decrypt automatically, writes encrypt automatically:
class OrgProviderCredential(BaseDBModel):
    __tablename__ = "org_provider_credential"
    payload: Mapped[dict] = mapped_column(EncryptedJSON(vault))
    # ... rest of the model

# Generate a fresh Fernet key (e.g. for ops bootstrapping a new env):
new_key = Vault.generate_key()  # base64-encoded 32-byte key

KEK rotation is a first-class flow:

# When ops rotate the master key: re-encrypt every blob against the new
# Vault, then atomically swap the env var. The old Vault stays usable
# until every consumer process restarts with the new key.
old_vault = Vault.from_key(old_master_key)
new_vault = Vault.from_key(new_master_key)

await rotate_blobs(
    session,
    table=OrgProviderCredential,
    column="payload",
    old=old_vault,
    new=new_vault,
)

What it solves: every service that stores credentials (per-org provider tokens, per-tenant SMTP / API keys, per-pipeline upstream creds) ends up reinventing the same Fernet wrapper, the same "decrypt error means rotated KEK" diagnostic, and the same rotation script. vault packages those once.

Decision boundaries:

  • Symmetric only — Fernet under the hood. No asymmetric / public-key crypto here; that's a different problem with different threat models.
  • At-rest encryption, not transport — for transport-level secrecy, terminate TLS at the edge.
  • Single KEK by default — per-tenant key derivation (DEK-per-org wrapped by KEK) is a v2 concern; today's threat model in the ForkTex products is well-served by one platform-wide KEK plus row-level access control.

storage — object storage + signed URLs

(planned for 0.4)

The minimal contract:

from forktex_core.storage import StorageClient, PublicAccess

storage = StorageClient(endpoint="s3://...", credentials=...)

# Upload from a service:
await storage.put("invoices/2026-04/INV-123.pdf", body, content_type="application/pdf")

# Mint a time-limited signed URL for a callback or a tenant download:
url = await storage.signed_url(
    "invoices/2026-04/INV-123.pdf",
    access=PublicAccess.read,
    ttl_seconds=600,
)

What it solves: every service that ships file uploads/downloads ends up reinventing the same five things — bucket layout, content-type sniffing, secure short-lived URLs, public-vs-private access policy, and a callback path that the consumer's public config (CDN host, custom domain) can override. storage packages those once, with the public-link mapping driven by consumer-side configuration so the same code works against MinIO in dev and S3 + CloudFront in prod.

S3-compatible by default (MinIO + AWS S3); GCS / Azure Blob behind the same StorageClient interface as a follow-up.

queue — background jobs

(planned for 0.4)

For fire-and-forget work that doesn't need flow's durability guarantees: send-an-email, refresh-a-cache, kick-off-a-report. Likely arq-backed (Redis-native, async-first, lighter than Celery), wrapped in a thin opinionated client so consumers get the same enqueue / process ergonomics across services.

Decision boundary vs. flow:

  • flow: must survive crashes, must replay-on-resume, has steps with retry policies, lives for minutes-to-hours, observability matters.
  • queue: short-lived, idempotent, "best-effort" fire-and-forget; if a worker dies mid-job, redelivery is cheap and the job runs again.

If you're unsure which you need, you probably need flow.

data — virtual schemas + Query/Result abstraction

(planned for 0.5)

A common need across ForkTex services is a layer for tenant-defined data: registers (logical tables), fields (logical columns), relationships, all CRUD-able by end users at runtime. Flexible business-process tools pivot on this every day.

data lifts that pattern to two clean abstraction levels:

  1. Storage layer (Postgres-backed, leans on db): the dynamic-schema mechanics — register definitions, field definitions, relationship graph, JSONB-backed row storage with per-field type coercion, indexable derived columns.
  2. Business-logic layer: composable *Query objects (filtering / sorting / paginating-or-scrolling / soft-archiving / hard-deleting / search / faceting) returning typed *Result objects. The same *Query shape applies whether the underlying register is Postgres-stored, virtualised over an external API, or computed on the fly — so consumers don't have to know which.
# Aspirational sketch — final API will be informed by the extraction.
from forktex_core.data import Register, RowQuery

invoices = Register.load("invoices", tenant_id=...)
result = await (
    RowQuery(invoices)
    .where(status="paid")
    .since(date(2026, 1, 1))
    .order_by("-issued_at")
    .scroll(page_size=50)
)

Why this lives in core rather than each consumer rolling its own: every B2B product reaches a point where end users want to add a field. Once you've solved that once, you don't want to solve it three more times in three more services.

Configuration

forktex-core does not read environment variables directly. Connection strings, credentials, KEKs, and pool settings are passed in by the host service at init_engine / init / Flow(...) / Vault.from_key(...) / StorageClient(...) construction time. Each consuming service decides its own configuration source (env vars, secrets manager, manifest, etc.).

Development

The root Makefile is generated by forktex fsd makefile sync from forktex.json — do not hand-edit.

make help              # list every available target
make deps              # editable install
make format            # ruff format
make lint              # ruff check
make test              # pytest tests/
make build             # python3 -m build → dist/
make ci                # format-check + lint + license-check + audit + test + build
make clean             # remove caches and dist/

make ci is the single command that gates a publish: format-check, lint, dual-license header verification across every source file, dependency CVE audit, full pytest sweep (uses Postgres testcontainers for flow), wheel + sdist build, and twine check.

make start / make stop / make logs are intentional no-ops — forktex-core is a library, there is nothing to run.

License headers

Every source file carries the AGPL-3.0 + Commercial dual-license SPDX header, applied idempotently via:

make license-check    # CI gate — fails if any source file is missing the header
make license-fix      # add or refresh headers across src/, tests/, scripts/
make license-strip    # remove headers (used before license-model changes)

FSD Self-Description

This repo follows the ForkTex Standard Delivery (FSD) shape:

  • root manifest: forktex.json
  • profile: workspace/python-monorepo
  • target maturity: L3
  • single publishable package: forktex-core at path .

Re-validate locally with:

forktex fsd --project-dir . check
forktex arch discover --project-dir . --output-dir /tmp/arch-corepy

License

Dual-licensed — AGPL-3.0-or-later for open-source use, commercial for everything else (proprietary products, SaaS without source release, redistribution in closed-source form). See LICENSE and NOTICE for the full terms.

Commercial licensing inquiries: info@forktex.com.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

forktex_core-0.4.1.tar.gz (80.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

forktex_core-0.4.1-py3-none-any.whl (104.5 kB view details)

Uploaded Python 3

File details

Details for the file forktex_core-0.4.1.tar.gz.

File metadata

  • Download URL: forktex_core-0.4.1.tar.gz
  • Upload date:
  • Size: 80.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for forktex_core-0.4.1.tar.gz
Algorithm Hash digest
SHA256 13812417eab6cc4848b10dfbc9d5eb0f37668e70958c08414dfa70fa0e0a59c4
MD5 e38e8ef803dbfe11631be76a83b69421
BLAKE2b-256 cbf7ef90c64397c628d768d3241d3659177bb03cf9ad707b4e1f3c41a8a70733

See more details on using hashes here.

File details

Details for the file forktex_core-0.4.1-py3-none-any.whl.

File metadata

  • Download URL: forktex_core-0.4.1-py3-none-any.whl
  • Upload date:
  • Size: 104.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for forktex_core-0.4.1-py3-none-any.whl
Algorithm Hash digest
SHA256 33cc607d4c6125e1dc11e8644894a6e81ab4ab0f5e0f2d592a7f7c7112e22d83
MD5 4b9c51145739b90da1dc52ba3223f1ae
BLAKE2b-256 c562681b8c52e7091b466a264ffad9df0f9918258ed87527ecb27207f4714673

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page