SilkLoom Core: lightweight, resilient batch pipeline for repeatable LLM and function workflows
SilkLoom Core is a lightweight, resilient batch pipeline for repeatable workflows. It is a general-purpose execution layer for running the same process over many inputs, with retries and resumability built in.
Overview
Key capabilities:
- Node-based workflow composition (`LLMNode`, `FunctionNode`, custom `BaseNode`; see the sketch after this list)
- DAG dependencies via `add_node(..., depends_on=[...])` for fan-out/fan-in workflows
- Cross-input aggregation via `add_collect_node(...)` to reduce item outputs inside one run
- Concurrent execution
- Built-in Python orchestration with minimal dependency surface
- Retry with exponential backoff
- SQLite persistence and resumability with `run_id`
- Aggregated artifact retrieval via `get_run_artifacts`
- Built-in `tqdm` progress bar with stage prompts
- Structured output with Pydantic
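Custom nodes subclass `BaseNode`. This README does not show the abstract interface, so the following is only a hedged sketch: the `execute(context)` hook and the `name=` constructor argument are assumptions, not documented API.

```python
from silkloom_core import BaseNode

class WordCountNode(BaseNode):
    """Hypothetical custom node. Assumes BaseNode subclasses implement an
    execute(context) hook returning the node's output dict; the real hook
    name and constructor signature may differ."""

    def __init__(self, name: str, source_key: str):
        super().__init__(name=name)  # assumed constructor argument
        self.source_key = source_key

    def execute(self, context: dict) -> dict:
        # Read from the initial {"input": ...} context (see Context Flow below).
        text = context["input"][self.source_key]
        return {"word_count": len(text.split())}
```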
Design philosophy:
- Focus on repeatable execution, not intelligent scheduling
- Keep workflow logic explicit and deterministic
- Make long-running batch jobs restartable and observable
- Keep internals compact and explicit for long-term maintainability
Installation
```bash
pip install silkloom-core
```
Install from source:
```bash
git clone https://github.com/LeLiu-GeoAI/silkloom-core.git
cd silkloom-core
pip install -e .
```
Dev extras:
```bash
pip install -e ".[dev]"
```
Quick Start
```python
from silkloom_core import Pipeline, LLMNode, FunctionNode

def score_text(text: str) -> dict:
    score = min(len(text) / 100, 1.0)
    return {"score": round(score, 3)}

pipeline = Pipeline(db_path="pipeline.db", execution_mode="depth_first", default_workers=4)

pipeline.add_node(
    LLMNode(
        name="summarize",
        prompt_template="Summarize in one sentence: {input.text}",
        model="gpt-4o-mini",
    ),
    depends_on=[],
)

pipeline.add_node(
    FunctionNode(
        name="score",
        func=score_text,
        kwargs_mapping={"text": "{summarize.text}"},
    ),
    depends_on=["summarize"],
)

run_id = pipeline.run([
    {"text": "SilkLoom Core supports repeatable LLM batch processing."},
    {"text": "It persists progress in SQLite and can resume by run_id."},
])

print(pipeline.export_results(run_id))
```
OpenAI-Compatible Endpoints
`LLMNode` accepts a custom OpenAI client:

```python
LLMNode(..., client=your_openai_client)
```

Any endpoint compatible with the OpenAI Chat Completions API can therefore be used.
1) Official OpenAI
```python
from silkloom_core import LLMNode

node = LLMNode(
    name="extract",
    prompt_template="Extract key facts: {input.note}",
    model="gpt-4o-mini",
)
```

Set your API key in the environment first:

```bash
export OPENAI_API_KEY="your_openai_key"
# PowerShell:
# $env:OPENAI_API_KEY="your_openai_key"
```
2) GLM-4-Flash (OpenAI-compatible)
```python
import os
from openai import OpenAI
from silkloom_core import LLMNode

glm_client = OpenAI(
    api_key=os.environ["ZHIPUAI_API_KEY"],
    base_url="https://open.bigmodel.cn/api/paas/v4/",
)

node = LLMNode(
    name="extract_geo",
    prompt_template="Extract city, topic, and coordinates: {input.note}",
    model="glm-4-flash",
    client=glm_client,
)
```

```bash
export ZHIPUAI_API_KEY="your_glm_key"
# PowerShell:
# $env:ZHIPUAI_API_KEY="your_glm_key"
```
3) Local Ollama (OpenAI-compatible)
Start Ollama and pull a model (example):
```bash
ollama pull qwen2.5:7b
ollama serve
```
Use it in SilkLoom Core:
```python
from openai import OpenAI
from silkloom_core import LLMNode

ollama_client = OpenAI(
    api_key="ollama",  # Ollama ignores the key; any non-empty string works
    base_url="http://localhost:11434/v1",
)

node = LLMNode(
    name="local_summary",
    prompt_template="Summarize this note: {input.note}",
    model="qwen2.5:7b",
    client=ollama_client,
)
```
Note: local models vary in structured-output quality. If you use response_model, explicitly require strict JSON-only output in the prompt.
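A hedged sketch of structured output with a local model. It assumes `response_model` accepts a Pydantic class and that the node validates the model's JSON reply against it, per "Structured output with Pydantic" above; the exact contract is not documented here.

```python
from pydantic import BaseModel
from silkloom_core import LLMNode

class NoteFacts(BaseModel):
    city: str
    topic: str

node = LLMNode(
    name="extract_facts",
    prompt_template=(
        "Extract the city and topic from this note. "
        "Reply with strict JSON only, no prose: {input.note}"
    ),
    model="qwen2.5:7b",
    client=ollama_client,      # from the snippet above
    response_model=NoteFacts,  # assumed usage of the parameter named in the note above
)
```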
Example Scripts
The provided examples use GIS/urban research as one domain case, but SilkLoom Core itself is domain-agnostic.
```bash
python examples/quickstart.py
python examples/structured_output.py
python examples/resume_with_run_id.py
python examples/trajectory_od_commute.py
```

- `quickstart.py`: summarize notes and tag themes
- `structured_output.py`: extract structured attributes and build GeoJSON-like features
- `resume_with_run_id.py`: simulate repeatable tile processing with resume
- `trajectory_od_commute.py`: OD extraction + distance/time segmentation + flowline output
Core Concepts
0. Orchestration Boundary
- SilkLoom Core handles task orchestration and concurrency scheduling directly
- The public API stays compact: node API, SQLite persistence, run_id resume, and export interfaces
- Everything runs locally without external orchestrator services
When you call run(), it prints a short workflow prompt and a tqdm progress bar by default.
- `show_workflow_prompt=False`: disable the workflow structure prompt
- `show_progress=False`: disable the progress bar
- `show_stage_prompt=False`: disable stage and final summary messages
- `progress_callback=callable`: subscribe to structured runtime events
progress_callback(event) receives dictionaries with:
event="stage": run stage updates (prepare,execute_nodes,collect,finalize)event="task_settled": per-task completion updates withnode,status,completed,totalevent="run_finished": final summary withstatus,success,failed,elapsed_seconds
1. Pipeline Modes
- `depth_first`: per-item end-to-end progression
- `breadth_first`: stage-by-stage progression across items
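The mode is chosen at construction time, as in the Quick Start:

```python
from silkloom_core import Pipeline

# Drive each input item through the whole DAG before starting the next.
pipeline = Pipeline(db_path="pipeline.db", execution_mode="depth_first", default_workers=4)

# Or finish each stage across all items before moving to the next stage.
pipeline = Pipeline(db_path="pipeline.db", execution_mode="breadth_first", default_workers=4)
```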
2. Context Flow
- Initial context: `{"input": ...}`
- Node output storage: `context[node_name] = output_dict`
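For the Quick Start pipeline, the context for one item would look roughly like this after both nodes run (a sketch; the exact output keys depend on what each node returns):

```python
context = {
    "input": {"text": "SilkLoom Core supports repeatable LLM batch processing."},
    "summarize": {"text": "..."},  # LLMNode output, referenced as {summarize.text}
    "score": {"score": 0.57},      # score_text output; value illustrative
}
```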
3. Retry and Resume
- Automatic retries with exponential backoff
- Resume unfinished tasks by reusing the same `run_id`
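Resuming reuses the `run()` signature from the API summary below (`run_id=None` starts a fresh run); `pipeline` and `inputs` are the Quick Start objects:

```python
# First attempt; the process may be interrupted partway through.
run_id = pipeline.run(inputs)

# Later: pass the same run_id so only unfinished tasks are executed.
pipeline.run(inputs, run_id=run_id)
```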
4. DAG Branching and Joining
- Use `add_node(node, depends_on=[...])` to declare dependencies explicitly
pipeline.add_node(FunctionNode(name="extract_od", func=extract_od), depends_on=[])
pipeline.add_node(FunctionNode(name="estimate_time", func=estimate_time), depends_on=["extract_od"])
pipeline.add_node(FunctionNode(name="estimate_cost", func=estimate_cost), depends_on=["extract_od"])
pipeline.add_node(
FunctionNode(name="join_report", func=join_report),
depends_on=["estimate_time", "estimate_cost"],
)
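The branch functions above are not defined in this README. If the join step needs specific fields from each branch, the Quick Start's `kwargs_mapping` pattern applies; a hedged sketch in which the output keys `time_h` and `cost` are hypothetical:

```python
from silkloom_core import FunctionNode

def join_report(time_h: float, cost: float) -> dict:
    # Combine both branch outputs into a single report record.
    return {"time_h": time_h, "cost": cost, "cost_per_hour": cost / time_h if time_h else None}

pipeline.add_node(
    FunctionNode(
        name="join_report",
        func=join_report,
        kwargs_mapping={"time_h": "{estimate_time.time_h}", "cost": "{estimate_cost.cost}"},
    ),
    depends_on=["estimate_time", "estimate_cost"],
)
```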
5. Cross-Input Collect/Reduce
```python
def merge_geojson(items, meta):
    features = [item["value"]["feature"] for item in items if "feature" in item["value"]]
    return {"type": "FeatureCollection", "features": features, "run_id": meta["run_id"]}

pipeline.add_collect_node(
    name="merge_geojson",
    func=merge_geojson,
    source_node="build_feature",
)
```
Retrieve collect outputs:
```python
artifacts = pipeline.get_run_artifacts(run_id)
print(artifacts["merge_geojson"])
```
API Summary
- `Pipeline.add_node(node, depends_on) -> Pipeline`
- `Pipeline.add_collect_node(name, func, source_node=None, include_failed=False) -> Pipeline`
- `Pipeline.run(inputs, run_id=None, show_workflow_prompt=True, show_progress=True, show_stage_prompt=True, progress_callback=None) -> str`
- `Pipeline.export_results(run_id, format="json") -> list[dict]`
- `Pipeline.get_run_artifacts(run_id) -> dict[str, dict]`
- `Pipeline.describe_workflow() -> dict`
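`describe_workflow()` is handy for sanity-checking the DAG before a long run. Its exact keys are not documented here, so this sketch just pretty-prints whatever it returns:

```python
import json

# Inspect the declared workflow structure before launching a batch.
print(json.dumps(pipeline.describe_workflow(), indent=2, default=str))
```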
License
MIT
File details
Details for the file silkloom_core-0.2.0.tar.gz.
File metadata
- Download URL: silkloom_core-0.2.0.tar.gz
- Upload date:
- Size: 17.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 5ae1ec21d632454e537db0be34d9180f6af10e4ef48cce127b187788ae6bf107 |
| MD5 | a163cdb2f3ba0b4984f32e3de8c19302 |
| BLAKE2b-256 | 01a839a698cb44684aa4826566b118fca74b127b9e24277a314dfdaa3748f254 |
Provenance
The following attestation bundles were made for silkloom_core-0.2.0.tar.gz:
Publisher: publish.yml on LeLiu-GeoAI/silkloom-core

Statement:
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: silkloom_core-0.2.0.tar.gz
- Subject digest: 5ae1ec21d632454e537db0be34d9180f6af10e4ef48cce127b187788ae6bf107
- Sigstore transparency entry: 1306545160
- Sigstore integration time:
- Permalink: LeLiu-GeoAI/silkloom-core@96abf7c32b74033dec82167a70c284d1c2eff0ca
- Branch / Tag: refs/tags/v0.2.0
- Owner: https://github.com/LeLiu-GeoAI
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@96abf7c32b74033dec82167a70c284d1c2eff0ca
- Trigger Event: push
File details
Details for the file silkloom_core-0.2.0-py3-none-any.whl.
File metadata
- Download URL: silkloom_core-0.2.0-py3-none-any.whl
- Upload date:
- Size: 15.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 8606bc262a849b73caf39b249342fc0c91c5da8893974615c9ce61d9dfdb9f4a |
| MD5 | 4e8fb22fbba333900cb7f9903d12070e |
| BLAKE2b-256 | ec23779229900815be60357c262c16b45d7b60620b16c5bc27b070b7ffe9b47d |
Provenance
The following attestation bundles were made for silkloom_core-0.2.0-py3-none-any.whl:
Publisher: publish.yml on LeLiu-GeoAI/silkloom-core

Statement:
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: silkloom_core-0.2.0-py3-none-any.whl
- Subject digest: 8606bc262a849b73caf39b249342fc0c91c5da8893974615c9ce61d9dfdb9f4a
- Sigstore transparency entry: 1306545246
- Sigstore integration time:
- Permalink: LeLiu-GeoAI/silkloom-core@96abf7c32b74033dec82167a70c284d1c2eff0ca
- Branch / Tag: refs/tags/v0.2.0
- Owner: https://github.com/LeLiu-GeoAI
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@96abf7c32b74033dec82167a70c284d1c2eff0ca
- Trigger Event: push