
BatchFactory

Composable, cache‑aware pipelines for parallel LLM workflows, API calls, and dataset generation.

Status — v0.2 alpha. Stable enough for prototypes; expect fast‑moving APIs.


Install

pip install batchfactory            # latest tag
pip install --upgrade batchfactory  # grab the newest patch

Quick‑start

import batchfactory as bf
from batchfactory.op import *

project = bf.CacheFolder("quickstart", 1, 0, 0)
broker  = bf.brokers.ConcurrentLLMCallBroker(project["cache/llm_broker.jsonl"])

# Rewrite the first three passages of every *.txt file into four‑line poems.

g = (
    ReadMarkdownLines("./data/*.txt", key_field="keyword", directory_str_field="directory")
    | Shuffle(42)
    | TakeFirstN(3)
    | GenerateLLMRequest(
        'Rewrite the passage from "{directory}" titled "{keyword}" as a four‑line poem.',
        model="gpt-4o-mini@openai",
    )
    | ConcurrentLLMCall(project["cache/llm_call.jsonl"], broker)
    | ExtractResponseText()
    | WriteJsonl(project["out/poems.jsonl"], output_fields=["keyword", "text", "directory"])
    | Print()
)

g.compile().execute(dispatch_brokers=True)

Run it twice – everything after the first run is served from the on‑disk ledger.


Why BatchFactory? Three killer moves

🏭 Mass data distillation & cleanup: chain GenerateLLMRequest → ConcurrentLLMCall → ExtractResponseText behind keyword or file sources to bulk‑create, filter, or refine datasets (think millions of Q&A rows, code explanations, translation pairs), with caching and cost tracking built in.

🎭 Multi‑agent, multi‑round workflows: Repeat plus the chat helpers let you spin up translation swarms, code‑review pairs, or tutoring agents in five minutes of code; conversations live in chat_history, and cost and revisions are tracked automatically.

🌲 Hierarchical spawning (ListParallel): ListParallel explodes complex items into fine‑grained subtasks, runs them in parallel, then auto‑collects the results – perfect for beat → scene → arc analysis or any long, messy document pipeline.

Loop snippet (Role‑Playing)

Teacher = Character("teacher_name", TEACHER_PROMPT)
Student = Character("student_name", STUDENT_PROMPT)

g = ( ReadMarkdownLines("story.txt", "keyword")
      | SetField({"teacher_name":"Alice", "student_name":"Bob"})
      | Teacher("老师,请先讲解课文", 0)            # "Teacher, please explain the passage first."
      | Repeat( Student("同学提问或回答", 1)        # "Ask a question or give an answer."
                | Teacher("回应或继续讲解", 2), 3)  # "Respond or continue the lesson."
      | Teacher("请总结", 3)                        # "Please summarize."
      | ChatHistoryToText() | Print() )
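The Repeat op above is described as running its inner segment a fixed number of times. A plain‑Python sketch of that control flow (function names hypothetical, not BatchFactory's API):

```python
def repeat(steps, n):
    """Run a list of single-argument steps n times in sequence,
    feeding each round's output into the next round. A sketch of
    the behaviour Repeat is described to have, not the real class."""
    def run(value):
        for _ in range(n):
            for step in steps:
                value = step(value)
        return value
    return run

# Three rounds of "+1 then *2", mirroring Student | Teacher repeated 3 times.
loop = repeat([lambda x: x + 1, lambda x: x * 2], 3)
```

In the real pipeline, each "step" would mutate the entry's chat_history rather than a number, but the unrolling shape is the same.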

Spawn snippet (chapter → paragraph → synopsis with ListParallel)

project = bf.CacheFolder("spawn_demo", 1, 0, 0)
broker  = bf.brokers.ConcurrentLLMCallBroker(project["cache/llm.jsonl"])

def ParaSummary():
    s = GenerateLLMRequest("Summarise:\n{paragraph}", model="gpt-4o-mini@openai")
    s |= ConcurrentLLMCall(project["cache/ps.jsonl"], broker)
    s |= ExtractResponseText()
    return s

g = ( ReadMarkdownLines("novel/*.md", "chapter")         # each entry = a chapter
      | ListParallel(ParaSummary(),                      # spawn per paragraph
                     list_field="paragraphs",
                     item_field="paragraph",
                     collect_field="para_summaries")
      | GenerateLLMRequest("Chapter synopsis:\n{para_summaries}",
                           model="gpt-4o-mini@openai")
      | ConcurrentLLMCall(project["cache/ch_sum.jsonl"], broker) )

Core concepts (one‑liner view)

Term          Story in one sentence
Entry         Tiny record with immutable idx, mutable data, auto‑incrementing rev.
Op            Atomic node; compose ops with the `|` operator.
GraphSegment  A lightweight helper: .to_segment() wraps a node so `|` composition keeps working.
execute()     High‑level driver that resumes, pumps, and dispatches brokers.
Broker        Pluggable engine handling expensive / async jobs (LLM, search, human labels).
Ledger        Append‑only JSONL cache behind every broker & graph, enabling instant restart.

(You can call pump() manually, but 99 % of users stick to execute().)
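As an illustration of the Entry concept from the table, a minimal stand‑in (attribute names taken from the table; the real class may differ):

```python
from dataclasses import dataclass, field

@dataclass
class Entry:
    """Illustrative stand-in for BatchFactory's Entry, not the real class."""
    idx: str                                   # treated as immutable identity
    data: dict = field(default_factory=dict)   # mutable payload
    rev: int = 0                               # bumped on every mutation

    def update(self, **changes):
        self.data.update(changes)
        self.rev += 1

e = Entry("job-001")
e.update(text="draft")
e.update(text="final")
```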


Primitive index (short list)

Family                       Node                                                           Blurb
Sources                      ReadMarkdownLines, FromList                                    ingest files or raw dicts
Transforms                   Apply, Filter, SetField                                        Python‑powered field tweaks
Control flow                 If, While, Repeat                                              branch, loop, iterate
LLM                          GenerateLLMRequest → ConcurrentLLMCall → ExtractResponseText   prompt, call, harvest
Utilities                    CleanupLLMData, PrintTotalCost                                 tidy temporary fields, audit cost
Parallel helper              ListParallel                                                   one‑liner spawn → subgraph → collect
Spawn / Collect (low‑level)  SpawnFromList, CollectAllToList                                manual map‑reduce with unique child ids

(Shared‑idx ops Replicate / Collect are deprecated and vanish in v0.3.)


Example gallery

✨ Example Demonstrates
01 – Basic pipeline linear LLM transform & caching
02 – Role‑playing loop concise multi‑agent RPG using Repeat + chat helpers
03 – Split & summarise fan‑out/fan‑in summarisation (deprecated style)
04 – Long‑text segmentation Spawn + CollectAll power pattern
05 – Math ops (unit) loop + conditional logic under pure Python

Broker & cache highlights

  • Each expensive call is hashed into a job_idx, so duplicate prompts are free.
  • BrokerFailureBehavior = RETRY | STAY | EMIT lets you decide how failures propagate.
  • On restart, execute() reuses cached results and sends only the missing jobs.
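The hash‑to‑job_idx ledger idea described above can be sketched in plain Python (record schema and class names assumed here, not BatchFactory's actual implementation):

```python
import hashlib
import json
import os

def job_idx(prompt: str, model: str) -> str:
    # Hash the full request so identical prompts map to the same cache key.
    return hashlib.sha256(f"{model}\n{prompt}".encode()).hexdigest()

class Ledger:
    """Append-only JSONL cache in the spirit of BatchFactory's ledger
    (a plain-Python sketch, not the library's real class)."""
    def __init__(self, path):
        self.path = path
        self.cache = {}
        if os.path.exists(path):          # replay the ledger on restart
            with open(path) as f:
                for line in f:
                    rec = json.loads(line)
                    self.cache[rec["job_idx"]] = rec["result"]

    def get(self, key):
        return self.cache.get(key)

    def put(self, key, result):
        self.cache[key] = result
        with open(self.path, "a") as f:   # append-only: never rewrite history
            f.write(json.dumps({"job_idx": key, "result": result}) + "\n")

def call_llm(prompt, model, ledger, backend):
    key = job_idx(prompt, model)
    hit = ledger.get(key)
    if hit is not None:
        return hit                        # duplicate prompt: served from disk
    result = backend(prompt)
    ledger.put(key, result)
    return result
```

On restart, rebuilding the Ledger from the same file restores every cached result, so only jobs missing from the ledger would ever reach the backend.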

Roadmap → v0.3

  • Enforce unique idx end‑to‑end → new JoinByParent replaces deprecated shared‑idx ops.
  • Built‑in vector‑store & semantic‑search nodes.
  • Streamlined cost & progress reporting.
  • More batteries‑included tutorials.

© 2025 · MIT License
