# BatchFactory

Composable, cache‑aware pipelines for parallel LLM workflows, API calls, and dataset generation.

**Status** — v0.3 alpha. More robust and battle‑tested on small projects. Still evolving quickly — APIs may shift.
## Install

```bash
pip install batchfactory            # latest tag
pip install --upgrade batchfactory  # grab the newest patch
```
## Quick‑start

```python
import batchfactory as bf
from batchfactory.op import *

project = bf.CacheFolder("quickstart", 1, 0, 0)
broker = bf.brokers.ConcurrentLLMCallBroker(project["cache/llm_broker.jsonl"])

PROMPT = """
Write a poem about {keyword}.
"""

g = bf.Graph()
g |= ReadMarkdownLines("./demo_data/greek_mythology_stories.md")
g |= Shuffle(42) | TakeFirstN(5)
g |= GenerateLLMRequest(PROMPT, model="gpt-4o-mini@openai")
g |= ConcurrentLLMCall(project["cache/llm_call.jsonl"], broker)
g |= ExtractResponseText()
g |= WriteMarkdownEntries(project["out/poems.md"])

g.execute(dispatch_brokers=True)
```
Run it twice – everything after the first run is served from the on‑disk ledger.
## Why BatchFactory? Three killer moves

| 🏭 Mass data distillation & cleanup | 🎭 Multi‑agent, multi‑round workflows | 🌲 Hierarchical spawning (`ListParallel`) |
|---|---|---|
| Chain `GenerateLLMRequest` → `ConcurrentLLMCall` → `ExtractResponseText` after keyword / file sources to mass‑produce, filter, or polish datasets — millions of Q&A rows, code explanations, translation pairs — with built‑in caching & cost tracking. | With `Repeat`, `If`, `While`, and chat helpers, you can script complex role‑based collaborations — e.g. Junior Translator → Senior Editor → QA → Revision — and run full multi‑agent, multi‑turn simulations in just a few lines of code. Ideal for workflows inspired by TransAgents, MATT, or ChatDev. | `ListParallel` breaks a complex item into fine‑grained subtasks, runs them concurrently, then reunites the outputs — ideal for long‑text summarisation, RAG chunking, or any tree‑structured pipeline. |
### Spawn snippet (Text Segmentation)

```python
g |= Apply(lambda x: split_text(label_line_numbers(x)), "text", "text_segments")

spawn_chain = AskLLM(LABEL_SEG_PROMPT, "labels", 1)
spawn_chain |= Apply(text_to_integer_list, "labels", "labels")

g |= ListParallel(spawn_chain, "text_segments", "text", "labels", "labels")
g |= Apply(flatten_list, "labels", "labels")
g |= Apply(split_text_by_line_labels, ["text", "labels"], "text_segments")
g |= ExplodeList(["directory", "text_segments"], ["directory", "text"])
```
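The snippet above assumes user‑supplied helpers such as `label_line_numbers`, `text_to_integer_list`, and `flatten_list`. These are not part of BatchFactory; the minimal versions below are purely illustrative sketches of what such helpers might look like:

```python
def label_line_numbers(text: str) -> str:
    """Prefix each line with its 1-based number so the LLM can reference lines."""
    return "\n".join(f"{i}: {line}" for i, line in enumerate(text.splitlines(), 1))

def text_to_integer_list(text: str) -> list[int]:
    """Parse an LLM reply like '3, 7, 12' into a list of line numbers."""
    return [int(tok) for tok in text.replace(",", " ").split() if tok.isdigit()]

def flatten_list(nested: list[list[int]]) -> list[int]:
    """Merge per-segment label lists back into one sorted list."""
    return sorted(n for sub in nested for n in sub)
```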
### Loop snippet (Role‑Playing)

```python
Teacher = Character("teacher_name", "You are a teacher named {teacher_name}. " + FORMAT_REQ)
Student = Character("student_name", "You are a student named {student_name}. " + FORMAT_REQ)

g = bf.Graph()
g |= ReadMarkdownLines("./demo_data/greek_mythology_stories.md") | TakeFirstN(1)
g |= SetField("teacher_name", "Teacher", "student_name", "Student")
g |= Teacher("Please introduce the text from {directory} titled {keyword}.", 0)

loop_body = Student("Please ask questions or respond.", 1)
loop_body |= Teacher("Please respond to the student or continue explaining.", 2)

g |= Repeat(loop_body, 3)
g |= Teacher("Please summarize.", 3)
g |= ChatHistoryToText(template="**{role}**: {content}\n\n")
g |= WriteMarkdownEntries(project["out/roleplay.md"])
```
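The `ChatHistoryToText` step renders the accumulated conversation with the given format template. A standalone sketch of that formatting step, assuming the history is a list of role/content dicts (not the library's actual code):

```python
def chat_history_to_text(history: list[dict], template: str = "**{role}**: {content}\n\n") -> str:
    """Render a chat history as one text block by applying the template per message."""
    return "".join(template.format(**msg) for msg in history)

# Each message is substituted into the template in order.
history = [
    {"role": "Teacher", "content": "Welcome to today's lesson."},
    {"role": "Student", "content": "Thank you!"},
]
```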
## Core concepts (one‑liner view)

| Term | Story in one sentence |
|---|---|
| Entry | Tiny record with immutable `idx`, mutable `data`, auto‑incrementing `rev`. |
| Op | Atomic node; compose with `\|` or `wire()`. |
| Graph | A chain of Ops wired together — supports flexible pipelines and subgraphs. |
| Executor | Internal engine that tracks graph state, manages batching, resumption, and broker dispatch. Created automatically when you call `graph.execute()`. |
| Broker | Pluggable engine for expensive or async jobs (LLM APIs, search, human labelers). |
| Ledger | Append‑only JSONL backing each broker & graph — enables instant resume and transparent caching. |
| `execute()` | High‑level command that runs the graph: creates an Executor, resumes from cache, and dispatches brokers as needed. |
## 📚 Example Gallery

| ✨ Example | Demonstrates |
|---|---|
| `1_quickstart` | Linear LLM transform with caching and auto‑resuming |
| `2_roleplay` | Multi‑agent, multi‑turn roleplay using chat agents |
| `3_text_segmentation` | Divide‑and‑conquer pipeline for text segmentation |
## ⚙️ Broker & Cache Highlights

- Every expensive call is hashed to a unique `job_idx` — repeated prompts are automatically deduplicated.
- Control how failures propagate with `BrokerFailureBehavior = RETRY | STAY | EMIT`.
- On restart, `execute()` resumes from cached state and dispatches only missing or incomplete jobs — no manual checkpoints needed.
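Deduplication by hashing boils down to deriving a stable key from the request payload, so two identical requests map to the same ledger slot. A minimal sketch of the idea, where `job_idx` is a hypothetical reconstruction rather than BatchFactory's actual hashing:

```python
import hashlib
import json

def job_idx(model: str, prompt: str) -> str:
    """Derive a stable cache key from the request payload (illustrative only)."""
    payload = json.dumps({"model": model, "prompt": prompt}, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()
```

Serializing with `sort_keys=True` keeps the key independent of field ordering, so the same logical request always hashes identically.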
## 🛣️ Roadmap → v0.4

- Native vector store and semantic search nodes
- Streamlined cost tracking and progress reporting
## Available Ops

| Operation | Description |
|---|---|
| `Apply` | Apply a function to modify the entry data, or map between fields. |
| `BeginIf` | Switch to port 1 if the criteria is met. See the `If` function for usage. |
| `ChatHistoryToText` | Format the chat history into a single text. |
| `CheckPoint` | A no‑op checkpoint that saves inputs to the cache and resumes from it. |
| `CleanupLLMData` | Clean up internal fields used for LLM processing, such as `llm_request`, `llm_response`, and `status`. |
| `Collect` | Collect data from port 1 and merge it to port 0. |
| `CollectAllToList` | Collect items from spawned entries on port 1 and merge them into a list (or lists if multiple items are provided). |
| `ConcurrentLLMCall` | Dispatch concurrent LLM API calls — may incur API billing from external providers. |
| `EndIf` | Join entries from either port 0 or port 1. See the `If` function for usage. |
| `ExplodeList` | Explode an entry into multiple entries based on a list (or lists). |
| `ExtractResponseMeta` | Extract metadata from the LLM response, such as model name and accumulated cost. |
| `ExtractResponseText` | Extract the text content from the LLM response and store it in the entry data. |
| `Filter` | Filter entries based on a custom criteria function. |
| `FilterFailedEntries` | Drop entries whose status is "failed". |
| `FilterMissingFields` | Drop entries that lack specific fields. |
| `FromList` | Create entries from a list of dictionaries or objects, each representing an entry. |
| `GenerateLLMRequest` | Generate an LLM query from a given prompt, formatting it with the entry data. |
| `If` | Switch to `true_chain` if the criteria is met, otherwise stay on `false_chain`. |
| `ListParallel` | Spawn entries from a list (or lists), process them in parallel, and collect them back into a list (or lists). |
| `PrintEntry` | Print information for the first n entries. |
| `PrintField` | Print specific field(s) from the first n entries. |
| `PrintTotalCost` | Print the total accumulated API cost for the output batch. |
| `ReadJsonl` | Read JSON Lines files. |
| `ReadMarkdownEntries` | Read Markdown files and extract entries, using the markdown heading hierarchy as directory and keyword. |
| `ReadMarkdownLines` | Read Markdown files and extract non‑empty lines as keywords, with the markdown heading hierarchy as directory. |
| `ReadTxtFolder` | Collect all txt files in a folder. |
| `RemoveField` | Remove fields from the entry data. |
| `RenameField` | Rename fields in the entry data. |
| `Repeat` | Repeat the loop body for a fixed number of rounds. |
| `RepeatNode` | Repeat the loop body for a fixed number of rounds. See the `Repeat` function for usage. |
| `Replicate` | Replicate an entry to all output ports. |
| `SetField` | Set fields in the entry data to specific values. |
| `Shuffle` | Shuffle the entries in a batch randomly. |
| `Sort` | Sort the entries in a batch. |
| `SortMarkdownEntries` | No documentation available. |
| `SpawnFromList` | Spawn multiple entries to port 1 based on a list (or lists). |
| `TakeFirstN` | Take the first N entries from the batch and discard the rest. |
| `ToList` | Output a list of specific field(s) from entries. |
| `TransformCharacterDialogueForLLM` | Map custom character roles to valid LLM roles (user/assistant/system). Must be called after `GenerateLLMRequest`. |
| `UpdateChatHistory` | Append the LLM response to the chat history. |
| `While` | Execute the loop body while the criteria is met. |
| `WhileNode` | Execute the loop body while the criteria is met. See the `While` function for usage. |
| `WriteJsonl` | Write entries to a JSON Lines file. |
| `WriteMarkdownEntries` | Write entries to a Markdown file, with heading hierarchy defined by directory and keyword. |
| `remove_cot` | Remove the chain of thought (CoT) from the LLM response. Use `Apply` to wrap it. |
| `remove_speaker_tag` | Remove speaker tags. Use `Apply` to wrap it. |
| `split_cot` | Split the LLM response into text and chain of thought (CoT). Use `Apply` to wrap it. |
© 2025 · MIT License