Python internal and external DSL for writing generative AI analytics
Project description
Build and iterate on AI workflows efficiently.
TalkPipe is a Python toolkit for creating, testing, and deploying workflows that combine generative AI with your data and tools. Use Python (| pipelines) or ChatterLang (text scripts) with the same building blocks—LLMs as one tool among many.
Who it is for: engineers and researchers who want scriptable AI pipelines—RAG, batch scoring, web ingestion, agents, and automation—without giving up normal Python when you need it. First-party CLIs and apps (workbench, chatterlang_serve) help you go from experiment to batch jobs or a small HTTP surface without a bespoke framework.
When TalkPipe fits well: streaming, composable steps; sharing pipelines as text; quick vector-store and RAG flows; mixing LLM calls with pandas, files, and HTTP. When to look elsewhere: if you need a large prebuilt agent platform or GUI-first orchestration as your primary model, you may prefer another tool or layer one on top—TalkPipe stays close to code and scripts.
TalkPipe emphasizes streaming generators (memory-friendly streams), a dual API (Pipe in Python, ChatterLang as a concise DSL), and built-in tooling (script runner, reference browser, serve). That complements “everything in Python” stacks by keeping pipeline definitions portable where that helps operations and review.
Typical vertical flow: ingest → chunk/embed → index (for example LanceDB) → retrieve → prompt → answer, then expose the same script with chatterlang_serve if you want an API. See Example 5: RAG pipeline for a full walkthrough.
This README is a high-level overview. Use the documentation map below or the documentation hub for depth.
What Can You Do With TalkPipe?
- Chat with LLMs - Create multi-turn conversations with OpenAI, Ollama, or Anthropic models in just 2 lines of code
- Process Documents - Extract text from PDFs, analyze research papers, score content relevance
- Build RAG Pipelines - Create end-to-end Retrieval-Augmented Generation workflows with vector databases
- Analyze Web Content - Download web pages (respecting robots.txt), extract readable text, and summarize
- Build Data Pipelines - Chain together data transformations, filtering, and analysis with Unix-like simplicity
- Deploy Anywhere - Run in Jupyter notebooks, as Docker containers, or as standalone Python applications
How TalkPipe Is Organized
Three layers; use any mix of them in one project:
- Pipe and ChatterLang — Chain sources, segments, and sinks with
|in Python, or write ChatterLang scripts for the same concepts (easy to drive from env vars and CI). - AI and data primitives — LLMs, full-text search, and vector databases behind one style of component.
- Pipelines and applications — Higher-level RAG-style pieces plus CLIs and web apps.
For the full story, see Architecture. Define a pipeline once and run it from Python, Jupyter, Docker, chatterlang_script, or chatterlang_serve.
Key Applications
-
chatterlang_workbench Launches an interactive web interface for writing, testing, and running ChatterLang scripts. It provides real-time execution, logging, and documentation lookup.
-
chatterlang_script Runs ChatterLang scripts from files or directly from the command line, enabling batch processing and automation.
-
chatterlang_serve Exposes ChatterLang pipelines as REST APIs or web forms, allowing you to deploy workflows as web services or user-facing endpoints.
-
makevectordatabase & serverag Create vector databases from documents and run RAG web servers in two commands—no scripts required.
-
chatterlang_reference_browser An interactive command line application for searching and browsing installed ChatterLang sources and segments.
-
chatterlang_reference_generator Generates comprehensive documentation for all available sources and segments in HTML and text formats.
-
talkpipe_plugins TalkPipe includes a plugin system that lets developers register their own sources and segments, extending its functionality. This allows the TalkPipe ecosystem to grow through community contributions and domain-specific extensions. talkpipe_plugins lets users view and manage those plugins.
These applications are entry points for different usage scenarios, from interactive development to production deployment.
Quick Start
Requirements: Python 3.11 or newer.
pip install talkpipe
For LLM support, install the provider(s) you need:
pip install talkpipe[openai] # OpenAI
pip install talkpipe[ollama] # Ollama
pip install talkpipe[anthropic] # Anthropic Claude
# Or: pip install talkpipe[all]
Configure API keys and provider URLs via environment variables (for example TALKPIPE_openai_api_key) or ~/.talkpipe.toml. If TalkPipe runs on a different machine than your Ollama server, set TALKPIPE_OLLAMA_SERVER_URL to that host. See Configuration for details and ChatterLang $var substitution.
Multi-turn chat in a few lines:
from talkpipe.chatterlang import compiler
script = '| print | llmPrompt[model="llama3.2", source="ollama", multi_turn=True] | print'
chat = compiler.compile(script).as_function(single_in=True, single_out=True)
response = chat("Hello! My name is Alice.")
response = chat("What's my name?") # Will remember context
RAG at a glance
Index a list of strings, then ask questions against the store (expand with options in Example 5):
from talkpipe.chatterlang import compiler
docs = ["TalkPipe builds AI pipelines with Python or ChatterLang."]
indexer = compiler.compile(
'| toDict[field_list="_:text"] | makeVectorDatabase[path="./my_kb", embedding_model="nomic-embed-text", embedding_source="ollama", embedding_field="text", overwrite=True]'
).as_function(single_in=False)
indexer(docs)
rag = compiler.compile(
'| toDict[field_list="_:text"] | ragToText[path="./my_kb", embedding_model="nomic-embed-text", embedding_source="ollama", completion_model="llama3.2", completion_source="ollama", content_field="text", limit=3] | print'
).as_function(single_in=True)
rag("What is TalkPipe?")
Core Components
1. The Pipe API (Internal DSL)
TalkPipe's Pipe API is a Pythonic way to build data pipelines using the | operator to chain components:
from talkpipe.pipe import io
from talkpipe.llm import chat
# Create a pipeline that prompts for input, gets an LLM response, and prints it
pipeline = io.Prompt() | chat.LLMPrompt(model="llama3.2", source="ollama") | io.Print()
pipeline = pipeline.as_function()
pipeline() # Run the interactive pipeline
Creating Custom Components
Add new functionality with simple decorators:
from talkpipe.pipe import core, io
@core.segment()
def uppercase(items):
"""Convert each item to uppercase"""
for item in items:
yield item.upper()
# Use it in a pipeline
pipeline = io.echo(data="hello,world") | uppercase() | io.Print()
result = pipeline.as_function(single_out=False)()
print(result)
# Output:
# HELLO
# WORLD
# Returns: ['HELLO', 'WORLD']
2. ChatterLang (External DSL)
ChatterLang provides a Unix-like syntax for building pipelines, perfect for rapid prototyping and experimentation:
INPUT FROM echo[data="1,2,hello,3"] | cast[cast_type="int"] | print
Registering Custom Components for ChatterLang
To make the uppercase segment from section 1 available in ChatterLang, register it with a decorator:
from talkpipe.pipe import core
from talkpipe.chatterlang import registry, compiler
@registry.register_segment("uppercase")
@core.segment()
def uppercase(items):
"""Convert each item to uppercase"""
for item in items:
yield item.upper()
# Now use it in ChatterLang scripts
script = 'INPUT FROM echo[data="hello,world"] | uppercase | print'
pipeline = compiler.compile(script).as_function(single_out=False)
result = pipeline()
# Output:
# HELLO
# WORLD
# Returns: ['HELLO', 'WORLD']
The @registry.register_segment() decorator makes your component discoverable by ChatterLang's compiler, allowing you to use it in scripts alongside built-in segments.
Key ChatterLang Features
- Variables: Store intermediate results with
@variable_name - Constants: Define reusable values with
CONST name = "value" - Loops: Repeat operations with
LOOP n TIMES { ... } - Multiple Pipelines: Chain workflows with
;or newlines
3. Built-in Applications
Command-Line Tools
chatterlang_workbench- Start the interactive web interface for experimenting with ChatterLangchatterlang_script- Run ChatterLang scripts from files or command linechatterlang_reference_generator- Generate documentation for all available sources and segmentschatterlang_reference_browser- Interactive command-line browser for sources and segmentschatterlang_serve- Create a customizable user-accessible web interface and REST API from ChatterLang scriptstalkpipe_plugins- View and manage TalkPipe plugins
Jupyter Integration
TalkPipe components work seamlessly in Jupyter notebooks for interactive data analysis.
Detailed Examples
Example 1: Multi-Agent Debate
Problem: Run two LLM personas on one seed topic for several rounds. Result: Printed turns accumulated in @conversation.
from talkpipe.chatterlang import compiler
script = """
CONST economist_prompt = "You are an economist. Reply in one sentence.";
CONST psychologist_prompt = "You are a child psychologist. Reply in one sentence.";
INPUT FROM echo[data="The US should give free puppies to all children."]
| @topic
| accum[variable=@conversation]
| print;
LOOP 3 TIMES {
INPUT FROM @topic
| llmPrompt[system_prompt=economist_prompt, model="llama3.2", source="ollama"]
| @topic
| accum[variable=@conversation]
| print;
INPUT FROM @topic
| llmPrompt[system_prompt=psychologist_prompt, model="llama3.2", source="ollama"]
| @topic
| accum[variable=@conversation]
| print;
};
INPUT FROM @conversation
"""
pipeline = compiler.compile(script).as_function()
debate = pipeline() # Watch the debate unfold!
Example 2: Document Stream Evaluation
Problem: Score a stream of JSONL rows with an LLM against a fixed rubric. Result: A pandas DataFrame with extracted scores per row.
import pandas as pd
from talkpipe.chatterlang import compiler
# Sample document data
documents = [
'{"title": "Dog", "description": "Dogs are loyal companions..."}',
'{"title": "Cat", "description": "Cats are independent pets..."}',
'{"title": "Wolf", "description": "Wolves are wild canines..."}'
]
script = """
CONST scorePrompt = "Rate 1-10 how related to dogs this is:";
| loadsJsonl
| llmScore[system_prompt=scorePrompt, model="llama3.2", source="ollama", set_as="dog_relevance"]
| setAs[field_list="dog_relevance.score:relevance_score"]
| toDataFrame
"""
pipeline = compiler.compile(script).as_function(single_in=False, single_out=True)
df = pipeline(documents)
print(df)
# df now contains relevance scores for each document
Example 3: Web Page Analysis
Problem: Fetch a page, strip boilerplate, summarize with an LLM. Result: Model output to stdout (here, three bullet points).
from talkpipe.chatterlang import compiler
script = """
| downloadURL
| htmlToText
| llmPrompt[
system_prompt="Summarize this article in 3 bullet points",
model="llama3.2",
source="ollama"
]
| print
"""
analyzer = compiler.compile(script).as_function(single_in=True)
analyzer("http://example.com/")
Example 4: Content Evaluation Pipeline
Problem: Score each article on two axes, keep only strong matches. Result: Printed dicts for items whose best score exceeds a threshold.
from talkpipe.chatterlang import compiler
# Sample article data
articles = [
'{"title": "New LLM Model Released", "summary": "AI Company announces new LLM with improved reasoning"}',
'{"title": "Smart Home IoT Devices", "summary": "Review of latest Arduino-based home automation"}',
'{"title": "Cat Videos Go Viral", "summary": "Funny cats take over social media again"}',
'{"title": "RAG Systems in Production", "summary": "How companies deploy retrieval-augmented generation"}',
]
script = """
# Define evaluation prompts
CONST ai_prompt = "Rate 0-10 how relevant this is to AI practitioners. Consider mentions of AI, ML, algorithms, or applications.";
CONST iot_prompt = "Rate 0-10 how relevant this is to IoT researchers. Consider hardware, sensors, or embedded systems.";
# Process articles
| loadsJsonl
| concat[fields="title,summary", set_as="full_text"]
# Score for AI relevance
| llmScore[system_prompt=ai_prompt, field="full_text", set_as="ai_eval", model="llama3.2", source="ollama"]
| setAs[field_list="ai_eval.score:ai_score,ai_eval.explanation:ai_reason"]
# Score for IoT relevance
| llmScore[system_prompt=iot_prompt, field="full_text", set_as="iot_eval", model="llama3.2", source="ollama"]
| setAs[field_list="iot_eval.score:iot_score,iot_eval.explanation:iot_reason"]
# Find highest score
| lambda[expression="max(item['ai_score'],item['iot_score'])", set_as="max_score"]
# Filter articles with score > 6
| gt[field="max_score", n=6]
# Format output
| toDict[field_list="title,ai_score,iot_score,max_score"]
| print
"""
evaluator = compiler.compile(script).as_function(single_in=False, single_out=False)
results = evaluator(articles)
# Output shows only relevant articles with their scores:
# {'title': 'New LLM Model Released', 'ai_score': 9, 'iot_score': 2, 'max_score': 9}
# {'title': 'Smart Home IoT Devices', 'ai_score': 3, 'iot_score': 9, 'max_score': 9}
# {'title': 'RAG Systems in Production', 'ai_score': 8, 'iot_score': 2, 'max_score': 8}
Example 5: RAG Pipeline with Vector Database
Problem: Embed texts into a local vector store, then answer questions with retrieval + completion. Result: String answers from ragToText, plus patterns for yes/no (ragToBinaryAnswer) and numeric scores (ragToScore).
from talkpipe.chatterlang import compiler
# Sample knowledge base documents
documents = [
"TalkPipe is a Python toolkit for building AI workflows. It provides a Unix-like pipeline syntax for chaining data transformations and LLM operations.",
"TalkPipe supports multiple LLM providers including OpenAI, Ollama, and Anthropic. You can switch between providers easily using configuration.",
"With TalkPipe, you can build RAG systems, multi-agent debates, and document processing pipelines. It uses Python generators for memory-efficient streaming.",
"TalkPipe offers two APIs: the Pipe API (internal DSL) for Python code and ChatterLang (external DSL) for concise script-based workflows.",
"Deployment is flexible with TalkPipe - run in Jupyter notebooks, Docker containers, or as standalone applications. The chatterlang_serve tool creates web APIs from scripts."
]
# First, index your documents into a vector database
indexing_script = """
| toDict[field_list="_:text"]
| makeVectorDatabase[
path="./my_knowledge_base",
embedding_model="nomic-embed-text",
embedding_source="ollama",
embedding_field="text",
overwrite=True
]
"""
indexer = compiler.compile(indexing_script).as_function(single_in=False)
indexer(documents)
# Now query the knowledge base with RAG
query_script = """
| toDict[field_list="_:text"]
| ragToText[
path="./my_knowledge_base",
embedding_model="nomic-embed-text",
embedding_source="ollama",
completion_model="llama3.2",
completion_source="ollama",
content_field="text",
prompt_directive="Answer the question based on the background information provided.",
limit=3
]
| print
"""
rag_pipeline = compiler.compile(query_script).as_function(single_in=True)
answer = rag_pipeline("What are the key benefits of using TalkPipe?")
# Returns an LLM-generated answer based on relevant document chunks
# For yes/no questions, use ragToBinaryAnswer:
binary_rag_script = """
| toDict[field_list="_:text"]
| ragToBinaryAnswer[
path="./my_knowledge_base",
embedding_model="nomic-embed-text",
embedding_source="ollama",
completion_model="llama3.2",
completion_source="ollama",
content_field="text"
]
| print
"""
binary_rag = compiler.compile(binary_rag_script).as_function(single_in=True)
result = binary_rag("Does TalkPipe support Docker?")
result = binary_rag("Does TalkPipe have a podcast about pipes?")
# For scored evaluations, use ragToScore:
score_rag_script = """
| toDict[field_list="_:text"]
| ragToScore[
path="./my_knowledge_base",
embedding_model="nomic-embed-text",
embedding_source="ollama",
completion_model="llama3.2",
completion_source="ollama",
prompt_directive="Answer the provided question on a scale of 1 to 5.",
content_field="text"
]
| print
"""
score_rag = compiler.compile(score_rag_script).as_function(single_in=True)
score = score_rag("How flexible is talkpipe?")
score_rag("How well does this text describe pipe smoking?")
Documentation
For comprehensive documentation and examples, see the docs/ directory.
| Goal | Start here |
|---|---|
| Install and first pipeline | Getting started |
| Commands and components | API reference |
| Walkthroughs | Tutorials |
| Design and extending TalkPipe | Architecture |
| Contributor glossary and conventions | Developer handbook |
- Documentation hub — Index and navigation
- Getting started — Installation, concepts, first pipeline
- API reference — Commands and components
- Architecture — Technical deep dives
- Tutorials — Patterns and real-world examples
Quick Reference
| Command | Purpose | Documentation |
|---|---|---|
chatterlang_serve |
Create web APIs and forms | 📄 |
chatterlang_workbench |
Interactive web interface | 📄 |
chatterlang_script |
Run scripts from command line | 📄 |
chatterlang_reference_generator |
Generate documentation | 📄 |
chatterlang_reference_browser |
Browse sources/segments interactively | - |
talkpipe_plugins |
Manage TalkPipe plugins | 📄 |
Architecture & Development
Design Principles
Dual-Language Architecture
- Internal DSL (Pipe API): Pure Python for maximum flexibility and IDE support
- External DSL (ChatterLang): Concise syntax for rapid prototyping
Streaming Architecture
TalkPipe uses Python generators throughout, enabling:
- Memory-efficient processing of large datasets
- Real-time results as data flows through pipelines
- Natural integration with streaming data sources
Extensibility First
- Simple decorators (
@source,@segment,@field_segment) for adding functionality - Components are just Python functions - easy to test and debug
- Mix TalkPipe with any Python code or library
Project Structure
talkpipe/
├── app/ # Runnable applications (servers, CLIs)
├── chatterlang/ # ChatterLang parser, compiler, and components
├── data/ # Data manipulation and I/O components
├── llm/ # LLM integrations (OpenAI, Ollama, Anthropic)
├── operations/ # Algorithms and data processing
├── pipe/ # Core pipeline infrastructure
├── pipelines/ # High-level pipeline components (RAG, vector DB)
├── search/ # Search engine integrations (Whoosh, LanceDB)
└── util/ # Utility functions and configuration
Configuration
TalkPipe uses a flexible configuration system via ~/.talkpipe.toml or environment variables:
# ~/.talkpipe.toml
default_model_name = "llama3.2"
default_model_source = "ollama"
smtp_server = "smtp.gmail.com"
smtp_port = 587
Environment variables use the TALKPIPE_ prefix:
export TALKPIPE_email_password="your-password"
export TALKPIPE_openai_api_key="sk-..."
Performance Optimization
TalkPipe includes an optional lazy loading feature that can dramatically improve startup performance (up to 18x faster) by deferring module imports until needed:
# Enable lazy loading for faster startup
export TALKPIPE_LAZY_IMPORT=true
This is especially useful for CLI tools and scripts that don't use all TalkPipe features. See the lazy loading documentation for details.
Development Guidelines
Naming Conventions
- Classes:
CamelCase(e.g.,LLMPrompt) - Decorated functions:
camelCase(e.g.,@segment def extractText) - ChatterLang names:
camelCase(e.g.,llmPrompt,toDataFrame)
Creating Components
Sources generate data:
from talkpipe.pipe import core, io
@core.source()
def fibonacci(n=10):
a, b = 0, 1
for _ in range(n):
yield a
a, b = b, a + b
# Use it in a pipeline
pipeline = fibonacci(n=5) | io.Print()
result = pipeline.as_function(single_out=False)()
# Output:
# 0
# 1
# 1
# 2
# 3
# Returns: [0, 1, 1, 2, 3]
Segments transform data:
from talkpipe.pipe.math import arange
from talkpipe.pipe import core, io
@core.segment()
def multiplyBy(items, factor=2):
for item in items:
yield item * factor
# Use it to double the Fibonacci numbers
pipeline = arange(lower=5, upper=10) | multiplyBy(factor=3) | io.Print()
result = pipeline.as_function(single_out=False)()
# Output:
# 0
# 3
# 3
# 6
# 9
# Returns: [0, 3, 3, 6, 9]
Field Segments provide a convenient way to create 1:1 segments:
from datetime import datetime
from talkpipe.pipe import core, io
from talkpipe.chatterlang import registry
@registry.register_segment("addTimestamp")
@core.field_segment()
def addTimestamp(item):
# Handle a single item, not an iterable
# The decorator handles set_as and field parameters automatically
return datetime.now()
# Use it with dictionaries
data = [{'name': 'Alice'}, {'name': 'Bob'}]
pipeline = addTimestamp(set_as="timestamp") | io.Print()
result = pipeline.as_function(single_in=False, single_out=False)(data)
# Output (timestamps will vary):
# {'name': 'Alice', 'timestamp': datetime.datetime(2024, 1, 15, 10, 30, 45, 123456)}
# {'name': 'Bob', 'timestamp': datetime.datetime(2024, 1, 15, 10, 30, 45, 234567)}
# Now it's also available in ChatterLang:
# script = '| addTimestamp[set_as="timestamp"] | print'
Best Practices
- Units with side effects should pass data through - e.g.,
writeFileshould yield items after writing - Use descriptive parameter names with underscores (e.g.,
fail_on_error,set_as) - Handle errors gracefully - use
fail_on_errorparameter pattern - Document with docstrings - they appear in generated documentation
- Test with both APIs - ensure components work in both Python and ChatterLang
Roadmap & Contributing
TalkPipe is under active development. Current priorities:
- Enhanced LLM Support: Additional providers, expanded guided generation
- Data Connectors: More database integrations, API clients, file formats
- Workflow Features: Conditional branching, enhanced error handling, retry logic
- Performance: Parallel processing optimization, enhanced lazy loading, better caching
- Developer Tools: Better debugging, testing utilities, IDE plugins
- RAG & Search: Advanced retrieval strategies, hybrid search, multi-modal embeddings
We welcome contributions! Whether it's new components, bug fixes, documentation, or examples, please check our GitHub repository for contribution guidelines.
Status
TalkPipe is in active development: feature-rich and in use, but APIs may evolve. We follow semantic versioning: minor versions aim for compatibility within a major series; major bumps may include breaking changes. Reasonably stable for everyday use: install from PyPI, the | pipeline model, compiler.compile(...).as_function(...), and optional extras for LLM providers.
License
TalkPipe is licensed under the Apache License 2.0. See LICENSE file for details.
Developer handbook
Contributor-focused glossary, naming conventions, parameter semantics, standard config keys, and segment/source reference notes live in docs/contributing/developer-handbook.md.
Last reviewed: 2026-03-20
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file talkpipe-0.11.7.tar.gz.
File metadata
- Download URL: talkpipe-0.11.7.tar.gz
- Upload date:
- Size: 3.7 MB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.15
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
ae2ff0215d536354b543bf612671b0da47448d6e0ab1b415d8313b7eb9d370f2
|
|
| MD5 |
4e0f736a87eb4dc86eb9bd7f81ce0ab5
|
|
| BLAKE2b-256 |
e9ceacb461d58556e0cbba104405360be6c266acbadb65620f7349309fecbdeb
|
File details
Details for the file talkpipe-0.11.7-py3-none-any.whl.
File metadata
- Download URL: talkpipe-0.11.7-py3-none-any.whl
- Upload date:
- Size: 187.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.15
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
32fcc7643d30ecf411c49cfa468ed8252703fb16c99d69d80a6f697c8e376a5d
|
|
| MD5 |
dc0c5a041455859e545d49525932246c
|
|
| BLAKE2b-256 |
75e55d500d596c0c6d85974f78ae34464ae441206fb4c8e63006e8cfbd32b84d
|