Shared Python substrate for ForkTex services — async Postgres, Redis, durable execution, encryption, S3/MinIO, background jobs, vector search, virtual schemas, and structured logging
forktex-core
Shared Python substrate for ForkTex services — nine independent modules, one install.
pip install forktex-core              # db · cache · flow · log
pip install "forktex-core[vault]"     # + Fernet encryption
pip install "forktex-core[storage]"   # + S3/MinIO
pip install "forktex-core[queue]"     # + arq background jobs
pip install "forktex-core[vector]"    # + Qdrant vector search
pip install "forktex-core[all]"       # everything (quotes keep shells such as zsh from expanding the brackets)
Modules
| Module | Purpose | Reference |
|---|---|---|
| db | Async Postgres: engine, session, ORM base classes, CRUD, advisory locks, migration runner | docs/db.md |
| cache | Async Redis: @cached, stale-while-revalidate, namespaced keys | docs/cache.md |
| flow | Durable execution: pipelines, graphs, state machines, AI agent loops | docs/flow.md |
| vault | Encryption at rest: Vault, EncryptedJSON column type, KEK rotation | docs/vault.md |
| storage | Object storage: S3/MinIO connector, multi-bucket, presigned URLs | docs/storage.md |
| queue | Background jobs: @task, enqueue, worker, inspect/cancel | docs/queue.md |
| vector | Vector search: Qdrant, dense/hybrid/multimodal, cross-collection | docs/vector.md |
| data | Virtual schemas: tenant-defined entities, JSONB rows, DataQuery | docs/data.md |
| log | Structured logging: JSON/Loki, trace_id contextvar, FastAPI middleware | docs/log.md |
Usage
log — set up first, before anything else
from forktex_core.log import setup_logging, get_logger, TraceIDMiddleware
setup_logging(service="my-service") # JSON to stdout, INFO
# setup_logging(service="my-service", debug=True) # human-readable, DEBUG
log = get_logger(__name__)
log.info("starting up")
# FastAPI: add middleware so every request gets a trace_id automatically
app.add_middleware(TraceIDMiddleware)
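To make the trace_id idea concrete, here is a minimal standard-library sketch of the general technique: a contextvar holds the ID and a logging filter copies it onto every record. It is not how forktex_core.log is implemented; `trace_id_var`, `TraceIDFilter`, `JSONFormatter`, `build_logger`, and `handle_request` are hypothetical names used only for illustration.

```python
import asyncio
import contextvars
import io
import json
import logging
import uuid

# Hypothetical names for illustration; none of this is forktex_core API.
trace_id_var = contextvars.ContextVar("trace_id", default="-")


class TraceIDFilter(logging.Filter):
    """Copy the current trace_id from the contextvar onto every record."""

    def filter(self, record):
        record.trace_id = trace_id_var.get()
        return True


class JSONFormatter(logging.Formatter):
    """Render each record as one JSON object per line."""

    def format(self, record):
        return json.dumps({
            "level": record.levelname,
            "message": record.getMessage(),
            "trace_id": getattr(record, "trace_id", "-"),
        })


def build_logger(stream):
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JSONFormatter())
    handler.addFilter(TraceIDFilter())
    logger = logging.getLogger("trace-demo")
    logger.handlers = [handler]
    logger.setLevel(logging.INFO)
    logger.propagate = False
    return logger


async def handle_request(logger, name):
    # Each asyncio task runs in its own copy of the context, so an ID set
    # here is invisible to other concurrent requests.
    trace_id = uuid.uuid4().hex
    trace_id_var.set(trace_id)
    await asyncio.sleep(0)  # yield so the two requests interleave
    logger.info("handled %s", name)
    return trace_id


async def demo():
    stream = io.StringIO()
    logger = build_logger(stream)
    ids = await asyncio.gather(handle_request(logger, "a"),
                               handle_request(logger, "b"))
    lines = [json.loads(line) for line in stream.getvalue().splitlines()]
    return lines, list(ids)
```

Running `asyncio.run(demo())` returns the parsed log lines alongside the IDs each request generated, so you can check that every line carries the ID of the request that emitted it.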
db — Postgres connection + ORM
from forktex_core.db import init_engine, get_session, BaseDBModel, OrgScopedMixin, AuditMixin
import sqlalchemy as sa
from sqlalchemy.orm import Mapped, mapped_column
import uuid
init_engine("postgresql+asyncpg://user:pass@host/db", pool_size=10)
class Invoice(BaseDBModel, OrgScopedMixin, AuditMixin):
    __tablename__ = "invoice"
    id: Mapped[uuid.UUID] = mapped_column(primary_key=True, default=uuid.uuid4)
    amount: Mapped[int] = mapped_column(sa.Integer)
async with get_session() as session:  # auto-commit / rollback
    session.add(Invoice(org_id=org_id, amount=100))
cache — Redis
from forktex_core.cache import init, cached
await init("redis://localhost:6379/0")
@cached(ttl=300)
async def get_org(org_id: str) -> dict: ...
# Structured log context (also works without cache)
from forktex_core.log import async_log_context
async with async_log_context(org_id=str(org_id)):
    log.info("processing")  # → {..."org_id": "org-xyz"}
flow — durable workflows
from forktex_core.flow import Flow, step
flow = Flow(database_url="postgresql+asyncpg://...")
await flow.init()
@step
async def send_welcome(ctx, state): ...
@step
async def create_workspace(ctx, state): ...
@flow.pipeline("onboarding.user", version=1)
class UserOnboarding:
    steps = [send_welcome, create_workspace]
instance = await flow.run("onboarding.user", state={"email": "x@y.com"})
await instance.wait(timeout=60)
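The sketch below shows, in plain asyncio, one way to read "durable": progress is checkpointed after every successful step, so a rerun resumes where the previous attempt stopped instead of starting over. It assumes an in-memory dictionary in place of Postgres and simplified steps that take only `state`; `run_pipeline`, `checkpoints`, and `fail_once` are hypothetical names and do not describe how forktex_core.flow works internally.

```python
import asyncio
import copy

# Hypothetical in-memory checkpoint store; a real system would persist this.
checkpoints = {}


async def run_pipeline(run_id, steps, state):
    """Run steps in order, checkpointing after each so a rerun resumes."""
    saved = checkpoints.get(run_id, {"next": 0, "state": state})
    state = copy.deepcopy(saved["state"])
    for index in range(saved["next"], len(steps)):
        state = await steps[index](state)
        # Record progress only after the step has succeeded.
        checkpoints[run_id] = {"next": index + 1,
                               "state": copy.deepcopy(state)}
    return state


executed = []
fail_once = {"armed": True}


async def send_welcome(state):
    executed.append("send_welcome")
    return {**state, "welcomed": True}


async def create_workspace(state):
    executed.append("create_workspace")
    if fail_once["armed"]:
        fail_once["armed"] = False
        raise RuntimeError("simulated crash")
    return {**state, "workspace": "ws-1"}


async def demo():
    steps = [send_welcome, create_workspace]
    try:
        await run_pipeline("run-1", steps, {"email": "x@y.com"})
    except RuntimeError:
        pass  # the first attempt dies after send_welcome was checkpointed
    # The second attempt resumes at create_workspace.
    return await run_pipeline("run-1", steps, {"email": "x@y.com"})
```

After `asyncio.run(demo())`, the `executed` list shows `send_welcome` ran once and `create_workspace` ran twice, which is the behaviour that makes non-idempotent steps (such as sending an email) safe to place early in a pipeline.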
vault — encryption at rest
from forktex_core.vault import Vault, EncryptedJSON
from forktex_core.db import BaseDBModel
from sqlalchemy.orm import Mapped, mapped_column
import os
vault = Vault(kek=os.environ["FTX_KEK"])
class Provider(BaseDBModel):
    __tablename__ = "provider"
    credentials: Mapped[bytes] = mapped_column(EncryptedJSON(vault))
provider.credentials = {"api_key": "sk-..."} # transparent encrypt/decrypt
storage — S3/MinIO
from forktex_core.storage import register, get_client
# Register once at startup (supports multiple buckets)
register("docs", url="http://minio:9000", bucket="documents",
         access_key=KEY, secret_key=SECRET)
client = get_client("docs")
await client.upload("invoices/abc.pdf", pdf_bytes, content_type="application/pdf")
url = await client.presign("invoices/abc.pdf", expires_in=3600)
# Actor uploads directly to MinIO — no auth header needed, signature is in the URL
put_url = await client.presign("uploads/photo.jpg", method="put_object",
                               content_type="image/jpeg", expires_in=900)
queue — background jobs
from forktex_core.queue import task, init, enqueue, make_worker, JobCtx
await init("redis://localhost:6379/1")
@task(retries=2, timeout=120)
async def process_document(ctx: JobCtx, doc_id: str) -> None:
    ...
job_id = await enqueue(process_document, str(doc.id))
# Worker entrypoint (run separately); make_worker() returns an arq.Worker
worker = make_worker("redis://localhost:6379/1")
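For intuition about `@task(retries=2, timeout=120)`, the following sketch wraps a coroutine with a per-attempt timeout and a bounded number of retries using only asyncio. It rests on an assumption the source does not confirm: that `retries=2` means up to three attempts in total. `run_with_retries` and `always_slow` are hypothetical helpers, not part of forktex_core.queue or arq.

```python
import asyncio


async def run_with_retries(fn, *args, retries=2, timeout=120.0):
    """Call fn(*args) with a per-attempt timeout and `retries` extra attempts.

    Assumption: retries=2 means up to three attempts in total.
    """
    last_error = None
    for _ in range(retries + 1):
        try:
            return await asyncio.wait_for(fn(*args), timeout=timeout)
        except Exception as exc:  # also catches the timeout error
            last_error = exc
    raise RuntimeError(f"job failed after {retries + 1} attempts") from last_error


attempts = []


async def process_document(doc_id):
    attempts.append(doc_id)
    if len(attempts) < 3:
        raise ConnectionError("transient failure")
    return f"processed {doc_id}"


async def always_slow():
    await asyncio.sleep(1)


async def demo():
    # Succeeds on the third attempt, which is the last one allowed.
    result = await run_with_retries(process_document, "doc-1",
                                    retries=2, timeout=1.0)
    try:
        await run_with_retries(always_slow, retries=1, timeout=0.01)
        timed_out = False
    except RuntimeError as exc:
        timed_out = isinstance(exc.__cause__, asyncio.TimeoutError)
    return result, timed_out
```

A production worker would typically add a backoff delay between attempts and retry only on errors known to be transient; both are left out here to keep the sketch short.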
vector — semantic search
from forktex_core.vector import Vector, VectorPoint, SearchQuery
vector = Vector(qdrant_url="http://qdrant:6333")
coll = vector.collection(f"org-{org_id}--knowledge") # use -- not : as separator
await coll.create(dim=1536)
await coll.upsert([VectorPoint(id=1, vector=embed(text), payload={"text": text})])
hits = await coll.search(SearchQuery(vector=embed(query)).limit(10).using("hybrid"))
for h in hits:
    print(h.score, h.payload["text"])
data — runtime schemas
from forktex_core.data import VirtualEntity, VirtualField, EntityMode, FieldDataType
from forktex_core.db import get_session
async with get_session() as session:
    entity = VirtualEntity(namespace=str(org_id), slug="contacts",
                           label="Contacts", mode=EntityMode.VIRTUAL)
    session.add(entity)
    await session.flush()  # emit the INSERT so entity.id is available below
    session.add(VirtualField(entity_id=entity.id, namespace=str(org_id),
                             key="email", label="Email",
                             data_type=FieldDataType.TEXT, is_required=True))
FastAPI integration pattern
from contextlib import asynccontextmanager
from fastapi import FastAPI
from forktex_core.db import init_engine, close_engine
from forktex_core.cache import init as cache_init, close as cache_close
from forktex_core.log import setup_logging, TraceIDMiddleware
setup_logging(service="my-api") # call before app creation
@asynccontextmanager
async def lifespan(app: FastAPI):
    init_engine(settings.db_url, pool_size=20)
    await cache_init(settings.redis_url)
    yield
    await close_engine()
    await cache_close()
app = FastAPI(lifespan=lifespan)
app.add_middleware(TraceIDMiddleware)
Managed Postgres (no CREATE SCHEMA)
Library schemas (forktex_flow, forktex_data) are kept separate from your Alembic migrations by default. If your Postgres host doesn't allow CREATE SCHEMA, route them to public:
init_engine(url, schema_translate_map={
    "forktex_flow": None,  # None = public schema
    "forktex_data": None,
})
For AI agents
Read the per-module reference first. Each docs/MODULE.md has: complete API signatures, canonical usage patterns, anti-patterns, edge case table, error catalogue, and integration map.
Critical things to get right:
| Rule | Why |
|---|---|
| Qdrant collection names: use `--` not `:` | Qdrant rejects `:` with 422 |
| Qdrant point IDs: `int` or `str(uuid.uuid4())` only | Arbitrary strings → 400 |
| `schema_translate_map`: use `None` key for default-schema tables | `"public"` key doesn't remap `schema=None` tables |
| `AuditMixin` requires `BaseDBModel` | Raises `TypeError` on class definition |
| `cache.init()` raises on failure | Doesn't silently degrade; handle at startup |
| `data.DataQuery.fetch()` in 0.6 | |
| `queue.make_worker()` returns `arq.Worker` | Not `arq.worker.WorkerSettings` |
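The two Qdrant rules above can be enforced with a pair of small guards before anything is sent to the server. This is a sketch using only the standard library: `collection_name`, `point_id`, and `POINT_NAMESPACE` are hypothetical helpers, not part of forktex_core.vector, and mapping arbitrary strings to UUIDv5 is one possible convention rather than something the library prescribes.

```python
import uuid
from typing import Union

# Hypothetical namespace for deriving stable UUIDs from application keys.
POINT_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "points.example.invalid")


def collection_name(org_id: str, kind: str) -> str:
    """Build 'org-<org_id>--<kind>', refusing ':' anywhere in the name."""
    name = f"org-{org_id}--{kind}"
    if ":" in name:
        raise ValueError(f"collection name must not contain ':': {name!r}")
    return name


def point_id(key: Union[int, str]) -> Union[int, str]:
    """Return an ID that is either a non-negative int or a UUID string."""
    if isinstance(key, bool):
        raise TypeError("bool is not a valid point ID")
    if isinstance(key, int):
        if key < 0:
            raise ValueError("integer point IDs must be non-negative")
        return key
    try:
        # Already a UUID: normalise its spelling and pass it through.
        return str(uuid.UUID(key))
    except ValueError:
        # Arbitrary string: derive a deterministic UUID from it instead.
        return str(uuid.uuid5(POINT_NAMESPACE, key))
```

Because `uuid5` is deterministic, calling `point_id("doc-42")` twice yields the same ID, so re-upserting the same document overwrites its point rather than creating a duplicate.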
Module reference: db · cache · flow · vault · storage · queue · vector · data · log