
Shared Python substrate for ForkTex services — async Postgres, Redis, durable execution, encryption, S3/MinIO, background jobs, vector search, virtual schemas, and structured logging


forktex-core


Shared Python substrate for ForkTex services — nine independent modules, one install.

pip install forktex-core                   # db · cache · flow · log
pip install "forktex-core[vault]"          # + Fernet encryption
pip install "forktex-core[storage]"        # + S3/MinIO
pip install "forktex-core[queue]"          # + arq background jobs
pip install "forktex-core[vector]"         # + Qdrant vector search
pip install "forktex-core[all]"            # everything

Modules

| Module | Purpose | Reference |
|---|---|---|
| db | Async Postgres: engine, session, ORM base classes, CRUD, advisory locks, migration runner | docs/db.md |
| cache | Async Redis: `@cached`, stale-while-revalidate, namespaced keys | docs/cache.md |
| flow | Durable execution: pipelines, graphs, state machines, AI agent loops | docs/flow.md |
| vault | Encryption at rest: `Vault`, `EncryptedJSON` column type, KEK rotation | docs/vault.md |
| storage | Object storage: S3/MinIO connector, multi-bucket, presigned URLs | docs/storage.md |
| queue | Background jobs: `@task`, enqueue, worker, inspect/cancel | docs/queue.md |
| vector | Vector search: Qdrant, dense/hybrid/multimodal, cross-collection | docs/vector.md |
| data | Virtual schemas: tenant-defined entities, JSONB rows, `DataQuery` | docs/data.md |
| log | Structured logging: JSON/Loki, `trace_id` contextvar, FastAPI middleware | docs/log.md |

Usage

log — set up first, before anything else

from fastapi import FastAPI
from forktex_core.log import setup_logging, get_logger, TraceIDMiddleware

setup_logging(service="my-service")       # JSON to stdout, INFO
# setup_logging(service="my-service", debug=True)  # human-readable, DEBUG

log = get_logger(__name__)
log.info("starting up")

# FastAPI: add the middleware so every request gets a trace_id automatically
app = FastAPI()
app.add_middleware(TraceIDMiddleware)
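How a trace_id can reach every log line without being passed around: a `ContextVar` that the middleware sets per request and the JSON formatter reads. The sketch below is stdlib-only and assumes that mechanism; `trace_id_var`, `JsonFormatter` and `handle_request` are hypothetical names, not forktex_core.log internals (docs/log.md has the real API).

```python
from __future__ import annotations

import asyncio
import contextvars
import io
import json
import logging
import uuid

# Hypothetical per-request context variable; each asyncio Task runs in its own copy of the context.
trace_id_var: contextvars.ContextVar[str | None] = contextvars.ContextVar("trace_id", default=None)


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON line, reading trace_id from the current context."""

    def __init__(self, service: str) -> None:
        super().__init__()
        self.service = service

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "service": self.service,
            "level": record.levelname,
            "message": record.getMessage(),
            "trace_id": trace_id_var.get(),
        })


async def handle_request(log: logging.Logger) -> None:
    # What a trace-ID middleware would do at the start of each request.
    trace_id_var.set(uuid.uuid4().hex)
    log.info("handling request")


async def serve_two_requests(log: logging.Logger) -> None:
    # gather() runs each coroutine as a Task with a copied context, so IDs never leak across requests.
    await asyncio.gather(handle_request(log), handle_request(log))


stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter(service="my-service"))
demo_log = logging.getLogger("trace-demo")
demo_log.addHandler(handler)
demo_log.setLevel(logging.INFO)
demo_log.propagate = False

asyncio.run(serve_two_requests(demo_log))
records = [json.loads(line) for line in stream.getvalue().splitlines()]
```

Because the value lives in the context rather than in a global, the outer context is untouched once the requests finish.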

db — Postgres connection + ORM

import uuid

import sqlalchemy as sa
from sqlalchemy.orm import Mapped, mapped_column

from forktex_core.db import init_engine, get_session, BaseDBModel, OrgScopedMixin, AuditMixin

init_engine("postgresql+asyncpg://user:pass@host/db", pool_size=10)

class Invoice(BaseDBModel, OrgScopedMixin, AuditMixin):   # org_id comes from OrgScopedMixin
    __tablename__ = "invoice"
    id: Mapped[uuid.UUID] = mapped_column(primary_key=True, default=uuid.uuid4)
    amount: Mapped[int] = mapped_column(sa.Integer)

async with get_session() as session:          # commits on exit, rolls back on exception
    session.add(Invoice(org_id=org_id, amount=100))
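The db module lists advisory locks. Postgres's `pg_advisory_lock(bigint)` and `pg_advisory_xact_lock(bigint)` take a 64-bit integer key, so locking by name needs a stable string-to-bigint mapping. A minimal sketch; `advisory_lock_key` is a hypothetical helper and not necessarily how forktex_core derives its keys. Python's built-in `hash()` is unsuitable here because string hashing is randomized per process.

```python
import hashlib
import struct


def advisory_lock_key(name: str) -> int:
    """Map a lock name to a stable signed 64-bit integer (Postgres bigint)."""
    # blake2b with an 8-byte digest is deterministic across processes, unlike built-in hash().
    digest = hashlib.blake2b(name.encode("utf-8"), digest_size=8).digest()
    # ">q" unpacks 8 bytes as a big-endian signed 64-bit int, always within bigint range.
    return struct.unpack(">q", digest)[0]


key = advisory_lock_key("invoice-numbering")
# e.g. inside `async with get_session() as session:`
#   await session.execute(sa.text("SELECT pg_advisory_xact_lock(:k)"), {"k": key})
```

`pg_advisory_xact_lock` is released automatically when the transaction ends, which pairs well with a session that commits or rolls back on exit; `pg_advisory_lock` is held until `pg_advisory_unlock` or until the connection closes, which matters with pooled connections.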

cache — Redis

from forktex_core.cache import init, cached
from forktex_core.log import async_log_context, get_logger

await init("redis://localhost:6379/0")    # raises if Redis is unreachable

@cached(ttl=300)
async def get_org(org_id: str) -> dict: ...

# Structured log context (lives in forktex_core.log, so it also works without the cache)
log = get_logger(__name__)
async with async_log_context(org_id=str(org_id)):
    log.info("processing")    # → {..., "org_id": "org-xyz"}
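The cache module advertises stale-while-revalidate: after the TTL, callers keep getting the old value for a grace window while one background refresh recomputes it. Below is a minimal in-process sketch of that policy plus a namespaced key. `swr_cached`, its `stale` parameter and the key format are hypothetical; forktex_core's `@cached` stores entries in Redis and its parameters may differ (see docs/cache.md).

```python
import asyncio
import functools
import time
from typing import Any, Awaitable, Callable


def swr_cached(ttl: float, stale: float, namespace: str = "app") -> Callable:
    """Cache results for `ttl` seconds, then serve the stale value for up to `stale` more
    seconds while a single background task recomputes it."""

    def decorator(func: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
        store: dict[str, tuple[float, Any]] = {}   # key -> (stored_at, value)
        refreshing: set[str] = set()               # keys with a refresh already in flight
        background: set[asyncio.Task] = set()      # strong refs so pending tasks aren't garbage-collected

        async def refresh(key: str, args: tuple, kwargs: dict) -> None:
            try:
                store[key] = (time.monotonic(), await func(*args, **kwargs))
            finally:
                refreshing.discard(key)

        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            # Namespaced key: namespace, function name, then the call arguments.
            key = f"{namespace}:{func.__qualname__}:{args!r}:{sorted(kwargs.items())!r}"
            hit = store.get(key)
            if hit is not None:
                age = time.monotonic() - hit[0]
                if age < ttl:
                    return hit[1]                  # fresh hit
                if age < ttl + stale:
                    if key not in refreshing:      # stale hit: schedule one refresh, serve old value now
                        refreshing.add(key)
                        task = asyncio.create_task(refresh(key, args, kwargs))
                        background.add(task)
                        task.add_done_callback(background.discard)
                    return hit[1]
            value = await func(*args, **kwargs)    # miss, or too stale: compute inline
            store[key] = (time.monotonic(), value)
            return value

        return wrapper

    return decorator


calls = 0


@swr_cached(ttl=0.05, stale=1.0)
async def get_org(org_id: str) -> dict:
    global calls
    calls += 1
    return {"id": org_id, "version": calls}


async def demo() -> list:
    first = await get_org("org-1")    # miss: computes version 1
    second = await get_org("org-1")   # fresh hit: version 1
    await asyncio.sleep(0.1)          # now past ttl but inside the stale window
    third = await get_org("org-1")    # stale hit: still version 1, refresh scheduled
    await asyncio.sleep(0.01)         # let the background refresh finish
    fourth = await get_org("org-1")   # refreshed value: version 2
    return [first, second, third, fourth]


results = asyncio.run(demo())
```

The `refreshing` set is what keeps a burst of stale hits from triggering a burst of recomputations.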

flow — durable workflows

from forktex_core.flow import Flow, step

flow = Flow(database_url="postgresql+asyncpg://...")
await flow.init()

@step
async def send_welcome(ctx, state): ...

@step
async def create_workspace(ctx, state): ...

@flow.pipeline("onboarding.user", version=1)
class UserOnboarding:
    steps = [send_welcome, create_workspace]   # executed in order

instance = await flow.run("onboarding.user", state={"email": "x@y.com"})
await instance.wait(timeout=60)
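Flow persists instance state so a pipeline can resume after a crash. The core idea, reduced to a stdlib sketch: checkpoint after every completed step and, on restart, continue from the first step without a checkpoint. This is not forktex_core.flow's implementation. SQLite stands in for Postgres, steps here take only `state` (forktex `@step` functions take `(ctx, state)`), and `CheckpointStore` and `run_pipeline` are hypothetical names.

```python
from __future__ import annotations

import asyncio
import json
import sqlite3
from typing import Any, Awaitable, Callable

State = dict[str, Any]
Step = Callable[[State], Awaitable[State]]


class CheckpointStore:
    """Durable (instance id -> next step index, state) records. SQLite stands in for Postgres."""

    def __init__(self, conn: sqlite3.Connection) -> None:
        self.conn = conn
        conn.execute(
            "CREATE TABLE IF NOT EXISTS checkpoint (id TEXT PRIMARY KEY, next_step INTEGER, state TEXT)"
        )

    def load(self, instance_id: str) -> tuple[int, State] | None:
        row = self.conn.execute(
            "SELECT next_step, state FROM checkpoint WHERE id = ?", (instance_id,)
        ).fetchone()
        return None if row is None else (row[0], json.loads(row[1]))

    def save(self, instance_id: str, next_step: int, state: State) -> None:
        with self.conn:  # one committed transaction per checkpoint
            self.conn.execute(
                "INSERT OR REPLACE INTO checkpoint (id, next_step, state) VALUES (?, ?, ?)",
                (instance_id, next_step, json.dumps(state)),
            )


async def run_pipeline(store: CheckpointStore, instance_id: str, steps: list[Step], initial: State) -> State:
    saved = store.load(instance_id)
    start, state = saved if saved is not None else (0, dict(initial))
    for index in range(start, len(steps)):
        state = await steps[index](state)
        store.save(instance_id, index + 1, state)  # checkpoint after every completed step
    return state


welcomed: list[str] = []
workspace_attempts = 0


async def send_welcome(state: State) -> State:
    welcomed.append(state["email"])  # side effect that must not repeat on resume
    return {**state, "welcomed": True}


async def create_workspace(state: State) -> State:
    global workspace_attempts
    workspace_attempts += 1
    if workspace_attempts == 1:
        raise RuntimeError("simulated crash")
    return {**state, "workspace": "ws-1"}


store = CheckpointStore(sqlite3.connect(":memory:"))
steps = [send_welcome, create_workspace]
try:
    asyncio.run(run_pipeline(store, "onboarding-1", steps, {"email": "x@y.com"}))
except RuntimeError:
    pass  # the "process" died mid-pipeline
final_state = asyncio.run(run_pipeline(store, "onboarding-1", steps, {"email": "x@y.com"}))
```

A step that crashes after its side effect but before its checkpoint commits will run again on resume, so steps should be idempotent.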

vault — encryption at rest

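import os
import uuid

from sqlalchemy.orm import Mapped, mapped_column

from forktex_core.db import BaseDBModel
from forktex_core.vault import Vault, EncryptedJSON

vault = Vault(kek=os.environ["FTX_KEK"])

class Provider(BaseDBModel):
    __tablename__ = "provider"
    id: Mapped[uuid.UUID] = mapped_column(primary_key=True, default=uuid.uuid4)
    credentials: Mapped[dict] = mapped_column(EncryptedJSON(vault))   # stored encrypted, read back as a dict

provider = Provider()
provider.credentials = {"api_key": "sk-..."}   # transparent encrypt/decrypt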
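The vault extra is described as Fernet encryption with KEK rotation. The sketch below uses the `cryptography` package's `Fernet` and `MultiFernet` directly to show the two ideas behind an encrypted JSON column: serialize then encrypt, and rotate old tokens onto a new key. It is not the `Vault` class, whose key handling may differ (see docs/vault.md); `encrypt_json`, `decrypt_json` and the throwaway keys are illustrative only.

```python
import json
from typing import Any

from cryptography.fernet import Fernet, InvalidToken, MultiFernet


def encrypt_json(cipher: MultiFernet, value: dict[str, Any]) -> bytes:
    """Serialize to compact JSON, then encrypt: these bytes are what the column stores."""
    return cipher.encrypt(json.dumps(value, separators=(",", ":")).encode("utf-8"))


def decrypt_json(cipher: MultiFernet, token: bytes) -> dict[str, Any]:
    return json.loads(cipher.decrypt(token))


old_key = Fernet.generate_key()
new_key = Fernet.generate_key()

# A row written before rotation is encrypted under the old key.
stored = encrypt_json(MultiFernet([Fernet(old_key)]), {"api_key": "sk-..."})

# During rotation, MultiFernet encrypts with the first key and tries every key to decrypt.
rotating = MultiFernet([Fernet(new_key), Fernet(old_key)])
rotated = rotating.rotate(stored)  # re-encrypted under new_key

# Once every row has been rotated, the old key can be dropped.
new_only = MultiFernet([Fernet(new_key)])
plaintext = decrypt_json(new_only, rotated)

try:
    new_only.decrypt(stored)  # a token that was never rotated is now unreadable
    old_token_readable = True
except InvalidToken:
    old_token_readable = False
```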

storage — S3/MinIO

import os

from forktex_core.storage import register, get_client

# Register once at startup (supports multiple buckets)
register("docs", url="http://minio:9000", bucket="documents",
         access_key=os.environ["MINIO_ACCESS_KEY"],   # placeholder env var names
         secret_key=os.environ["MINIO_SECRET_KEY"])

client = get_client("docs")
await client.upload("invoices/abc.pdf", pdf_bytes, content_type="application/pdf")
url = await client.presign("invoices/abc.pdf", expires_in=3600)   # download link, valid for 1 hour

# The end user's client (e.g. a browser) uploads straight to MinIO with a plain PUT:
# no auth header is needed because the signature is in the URL
put_url = await client.presign("uploads/photo.jpg", method="put_object",
                               content_type="image/jpeg", expires_in=900)
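Why a presigned URL needs no auth header: the server recomputes a keyed signature over the method, the object path and the expiry carried in the query string. The sketch below illustrates that idea with stdlib HMAC. It is deliberately simplified and is not AWS Signature Version 4 (what MinIO actually verifies); `SIGNING_KEY`, `presign` and `verify` are hypothetical.

```python
import hashlib
import hmac
from urllib.parse import parse_qs, urlencode, urlsplit

SIGNING_KEY = b"demo-secret"  # hypothetical; stands in for the storage secret key


def _sign(method: str, path: str, expires: int) -> str:
    message = f"{method}\n{path}\n{expires}".encode("utf-8")
    return hmac.new(SIGNING_KEY, message, hashlib.sha256).hexdigest()


def presign(base_url: str, path: str, method: str, expires_in: int, now: float) -> str:
    """Return a URL whose query string carries its own expiry and signature."""
    expires = int(now) + expires_in
    query = urlencode({"expires": expires, "sig": _sign(method, path, expires)})
    return f"{base_url}{path}?{query}"


def verify(url: str, method: str, now: float) -> bool:
    """Server side: recompute the signature for the requested method and path."""
    parts = urlsplit(url)
    params = parse_qs(parts.query)
    try:
        expires = int(params["expires"][0])
        sig = params["sig"][0]
    except (KeyError, ValueError):
        return False
    if now > expires:
        return False  # link has expired
    # compare_digest avoids leaking how many leading characters matched.
    return hmac.compare_digest(sig, _sign(method, parts.path, expires))


now = 1_700_000_000
url = presign("http://minio:9000", "/documents/invoices/abc.pdf", "GET", 3600, now)
```

Binding the method into the signature is what keeps a download link from being reused as an upload link.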

queue — background jobs

from forktex_core.queue import task, init, enqueue, make_worker, JobCtx

await init("redis://localhost:6379/1")

@task(retries=2, timeout=120)
async def process_document(ctx: JobCtx, doc_id: str) -> None:
    ...

job_id = await enqueue(process_document, str(doc.id))

# Worker entrypoint (run in a separate process).
# make_worker() returns an arq.Worker instance, not a WorkerSettings class.
worker = make_worker("redis://localhost:6379/1")
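What `retries` and `timeout` on `@task` mean in practice: each attempt is cancelled once it exceeds the timeout, and a failed attempt is retried until the budget runs out. The stdlib sketch below assumes `retries=2` means two retries after the first attempt (three attempts in total); forktex_core's exact semantics live in docs/queue.md, and `run_with_retries` is a hypothetical helper.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def run_with_retries(func: Callable[..., Awaitable[Any]], *args: Any, retries: int, timeout: float) -> Any:
    """Run func(*args) up to retries + 1 times, cancelling any attempt that exceeds timeout seconds."""
    for attempt in range(retries + 1):
        try:
            return await asyncio.wait_for(func(*args), timeout=timeout)
        except Exception:  # includes asyncio.TimeoutError raised by wait_for
            if attempt == retries:
                raise  # out of retries: surface the last error


flaky_attempts = 0


async def flaky(doc_id: str) -> str:
    global flaky_attempts
    flaky_attempts += 1
    if flaky_attempts < 3:
        raise ConnectionError("transient failure")
    return f"processed {doc_id}"


async def too_slow(doc_id: str) -> str:
    await asyncio.sleep(10)
    return doc_id


result = asyncio.run(run_with_retries(flaky, "doc-1", retries=2, timeout=1.0))

try:
    asyncio.run(run_with_retries(too_slow, "doc-2", retries=1, timeout=0.05))
    timed_out = False
except asyncio.TimeoutError:
    timed_out = True
```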

vector — semantic search

from forktex_core.vector import Vector, VectorPoint, SearchQuery

# embed(text) stands for your own embedding function (1536-dim vectors here)
vector = Vector(qdrant_url="http://qdrant:6333")
coll = vector.collection(f"org-{org_id}--knowledge")   # separate name parts with "--", not ":"
await coll.create(dim=1536)

await coll.upsert([VectorPoint(id=1, vector=embed(text), payload={"text": text})])

hits = await coll.search(SearchQuery(vector=embed(query)).limit(10).using("hybrid"))
for h in hits:
    print(h.score, h.payload["text"])
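Qdrant only accepts integer or UUID point IDs, and (per the rules further down) `:` in collection names is rejected. When chunks come with natural string keys, a name-based UUID (`uuid5`) gives a stable ID, so re-upserting the same chunk overwrites its point instead of duplicating it. `point_id`, `collection_name` and `POINT_NAMESPACE` are hypothetical helpers, not part of forktex_core.vector.

```python
import uuid

# Hypothetical fixed namespace; changing it would change every derived point ID.
POINT_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "points.example.com")


def point_id(external_key: str) -> str:
    """Deterministic UUID string for an arbitrary key, so re-upserting the same chunk overwrites it."""
    return str(uuid.uuid5(POINT_NAMESPACE, external_key))


def collection_name(*parts: str) -> str:
    """Join name parts with '--', refusing ':' (which Qdrant rejects)."""
    for part in parts:
        if ":" in part:
            raise ValueError(f"':' not allowed in collection name part {part!r}")
    return "--".join(parts)


pid = point_id("doc-42#chunk-3")
name = collection_name("org-123", "knowledge")
```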

data — runtime schemas

from forktex_core.db import get_session
from forktex_core.data import VirtualEntity, VirtualField, EntityMode, FieldDataType

async with get_session() as session:
    entity = VirtualEntity(namespace=str(org_id), slug="contacts",
                           label="Contacts", mode=EntityMode.VIRTUAL)
    session.add(entity)
    await session.flush()          # populates entity.id before the field references it
    session.add(VirtualField(entity_id=entity.id, namespace=str(org_id),
                             key="email", label="Email",
                             data_type=FieldDataType.TEXT, is_required=True))
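Rows of a virtual entity are stored as JSONB, so something has to check each row against the tenant's field definitions. The sketch below shows one such check (required keys, value types, unknown keys). `FieldDef`, `DataType` and `validate_row` are hypothetical stand-ins for `VirtualField` and `FieldDataType`; only `TEXT` is confirmed above, and the other type members are assumptions.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any


class DataType(Enum):
    TEXT = "text"
    NUMBER = "number"    # assumed member
    BOOLEAN = "boolean"  # assumed member


PY_TYPES: dict[DataType, tuple[type, ...]] = {
    DataType.TEXT: (str,),
    DataType.NUMBER: (int, float),
    DataType.BOOLEAN: (bool,),
}


@dataclass(frozen=True)
class FieldDef:
    key: str
    data_type: DataType
    is_required: bool = False


def validate_row(fields: list[FieldDef], row: dict[str, Any]) -> list[str]:
    """Return human-readable errors; an empty list means the row is valid."""
    errors: list[str] = []
    for f in fields:
        if row.get(f.key) is None:
            if f.is_required:
                errors.append(f"{f.key}: required")
            continue
        value = row[f.key]
        # bool is a subclass of int, so reject it explicitly for NUMBER fields.
        if f.data_type is DataType.NUMBER and isinstance(value, bool):
            errors.append(f"{f.key}: expected number")
        elif not isinstance(value, PY_TYPES[f.data_type]):
            errors.append(f"{f.key}: expected {f.data_type.value}")
    known = {f.key for f in fields}
    for key in sorted(row.keys() - known):
        errors.append(f"{key}: unknown field")
    return errors


fields = [FieldDef("email", DataType.TEXT, is_required=True), FieldDef("age", DataType.NUMBER)]
```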

FastAPI integration pattern

from contextlib import asynccontextmanager
from fastapi import FastAPI
from forktex_core.db import init_engine, close_engine
from forktex_core.cache import init as cache_init, close as cache_close
from forktex_core.log import setup_logging, TraceIDMiddleware

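setup_logging(service="my-api")   # call before creating the app

@asynccontextmanager
async def lifespan(app: FastAPI):
    # settings: your application's config object (e.g. pydantic-settings)
    init_engine(settings.db_url, pool_size=20)
    await cache_init(settings.redis_url)
    try:
        yield
    finally:
        await cache_close()   # shut down in reverse order of startup
        await close_engine()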

app = FastAPI(lifespan=lifespan)
app.add_middleware(TraceIDMiddleware)
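In a lifespan hook written as straight-line init calls, a failure in a later init step (say, Redis is down) skips the teardown of everything that already started. One way to make teardown follow exactly what came up is `contextlib.AsyncExitStack`: register each cleanup right after its resource starts, and the stack unwinds them in reverse. The sketch uses stand-in functions (`open_db`, `open_cache` and friends are hypothetical); in FastAPI the lifespan would also take the `app` argument.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

events: list[str] = []


# Stand-ins for init_engine/close_engine and cache init/close.
async def open_db() -> None:
    events.append("db up")


async def close_db() -> None:
    events.append("db down")


async def open_cache(fail: bool) -> None:
    if fail:
        raise ConnectionError("redis unreachable")
    events.append("cache up")


async def close_cache() -> None:
    events.append("cache down")


@asynccontextmanager
async def lifespan(fail_cache: bool = False):
    async with AsyncExitStack() as stack:
        await open_db()
        stack.push_async_callback(close_db)      # registered only once the db is up
        await open_cache(fail_cache)
        stack.push_async_callback(close_cache)
        yield                                    # the app serves requests here
    # Leaving the stack runs callbacks in reverse order, even if startup or the app raised.


async def main() -> None:
    async with lifespan():
        events.append("serving")
    try:
        async with lifespan(fail_cache=True):
            events.append("never reached")
    except ConnectionError:
        events.append("startup failed")


asyncio.run(main())
```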

Managed Postgres (no CREATE SCHEMA)

Library tables live in their own schemas (forktex_flow, forktex_data), kept apart from your own Alembic migrations by default. If your managed Postgres host doesn't allow CREATE SCHEMA, map both to the default schema:

init_engine(url, schema_translate_map={
    "forktex_flow": None,    # None = no schema prefix, i.e. the connection's default (usually public)
    "forktex_data": None,
})

For AI agents

Read the per-module reference first. Each docs/MODULE.md has: complete API signatures, canonical usage patterns, anti-patterns, edge case table, error catalogue, and integration map.

Critical things to get right:

| Rule | Why |
|---|---|
| Qdrant collection names: use `--`, not `:` | Qdrant rejects `:` with 422 |
| Qdrant point IDs: `int` or `str(uuid.uuid4())` only | Arbitrary strings → 400 |
| `schema_translate_map`: use the `None` key for default-schema tables | A `"public"` key doesn't remap tables with `schema=None` |
| `AuditMixin` requires `BaseDBModel` | Raises `TypeError` at class definition |
| `cache.init()` raises on failure | It doesn't silently degrade; handle the error at startup |
| `data.DataQuery.fetch()` … in 0.6 | |
| `queue.make_worker()` returns `arq.Worker` | Not `arq.worker.WorkerSettings` |
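Since `cache.init()` raises instead of degrading, startup code has to decide how long to wait for Redis. A common pattern is a bounded retry with exponential backoff, then failing fast so the orchestrator restarts the process. A stdlib sketch; `init_with_retry` is hypothetical and would wrap something like `lambda: cache_init(settings.redis_url)`.

```python
import asyncio
from typing import Awaitable, Callable


async def init_with_retry(init: Callable[[], Awaitable[None]], attempts: int = 5, base_delay: float = 0.5) -> None:
    """Call an init coroutine, retrying with exponential backoff; re-raise after the last attempt."""
    for attempt in range(1, attempts + 1):
        try:
            await init()
            return
        except Exception:
            if attempt == attempts:
                raise  # fail fast rather than serve without the dependency
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))  # 1x, 2x, 4x, ... base_delay


init_calls = 0


async def flaky_init() -> None:
    global init_calls
    init_calls += 1
    if init_calls < 3:
        raise ConnectionError("redis not ready yet")


async def always_down() -> None:
    raise ConnectionError("redis unreachable")


asyncio.run(init_with_retry(flaky_init, attempts=5, base_delay=0.01))

try:
    asyncio.run(init_with_retry(always_down, attempts=3, base_delay=0.01))
    gave_up = False
except ConnectionError:
    gave_up = True
```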

Module reference: db · cache · flow · vault · storage · queue · vector · data · log



Download files


Source Distribution

forktex_core-0.6.0.tar.gz (100.3 kB)

Uploaded Source

Built Distribution


forktex_core-0.6.0-py3-none-any.whl (147.9 kB)

Uploaded Python 3

File details

Details for the file forktex_core-0.6.0.tar.gz.

File metadata

  • Download URL: forktex_core-0.6.0.tar.gz
  • Size: 100.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for forktex_core-0.6.0.tar.gz
| Algorithm | Hash digest |
|---|---|
| SHA256 | 4e148e37b68be1725e88f9ef7ceba89b4407058a66660b467c4d3ad109a57242 |
| MD5 | 85a4f1ef100b70b6f07ffd6b8116488a |
| BLAKE2b-256 | 45265a2bc56e12b7e2f690056b552352f2eb2bf35a6c1b40854da39251ea1db5 |


File details

Details for the file forktex_core-0.6.0-py3-none-any.whl.

File metadata

  • Download URL: forktex_core-0.6.0-py3-none-any.whl
  • Size: 147.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for forktex_core-0.6.0-py3-none-any.whl
| Algorithm | Hash digest |
|---|---|
| SHA256 | ce88d256c2d802ff4f17cc9ea2557820b3c673128ed954c4e48ae005f31fc077 |
| MD5 | 4707be3a682d46363c1932f3d9e20b0c |
| BLAKE2b-256 | bfdc7495dcbe43d2f378ac4e9e701cf9b3b4eabb6b257bfdc59b9db3edb633f8 |

