Python internal and external DSL for writing generative AI analytics

PyPI version Python 3.11+ License: Apache 2.0

Build and iterate on AI workflows efficiently.

TalkPipe is a Python toolkit for creating, testing, and deploying workflows that combine generative AI with your data and tools. Use Python (| pipelines) or ChatterLang (text scripts) with the same building blocks—LLMs as one tool among many.

Who it is for: engineers and researchers who want scriptable AI pipelines—RAG, batch scoring, web ingestion, agents, and automation—without giving up normal Python when you need it. First-party CLIs and apps (workbench, chatterlang_serve) help you go from experiment to batch jobs or a small HTTP surface without a bespoke framework.

When TalkPipe fits well: streaming, composable steps; sharing pipelines as text; quick vector-store and RAG flows; mixing LLM calls with pandas, files, and HTTP. When to look elsewhere: if you need a large prebuilt agent platform or GUI-first orchestration as your primary model, you may prefer another tool or layer one on top—TalkPipe stays close to code and scripts.

TalkPipe emphasizes generator-based streaming (items flow through a pipeline one at a time, which keeps memory use low), a dual API (Pipe in Python, ChatterLang as a concise DSL), and built-in tooling (script runner, reference browser, serve). This complements “everything in Python” stacks by keeping pipeline definitions portable where that helps operations and review.

Typical vertical flow: ingest → chunk/embed → index (for example LanceDB) → retrieve → prompt → answer, then expose the same script with chatterlang_serve if you want an API. See Example 5: RAG pipeline for a full walkthrough.

This README is a high-level overview. Use the documentation map below or the documentation hub for depth.

What Can You Do With TalkPipe?

  • Chat with LLMs - Create multi-turn conversations with OpenAI, Ollama, or Anthropic models in a few lines of code
  • Process Documents - Extract text from PDFs, analyze research papers, score content relevance
  • Build RAG Pipelines - Create end-to-end Retrieval-Augmented Generation workflows with vector databases
  • Analyze Web Content - Download web pages (respecting robots.txt), extract readable text, and summarize
  • Build Data Pipelines - Chain together data transformations, filtering, and analysis with Unix-like simplicity
  • Deploy Anywhere - Run in Jupyter notebooks, as Docker containers, or as standalone Python applications

How TalkPipe Is Organized

Three layers; use any mix of them in one project:

  • Pipe and ChatterLang — Chain sources, segments, and sinks with | in Python, or write ChatterLang scripts for the same concepts (easy to drive from env vars and CI).
  • AI and data primitives — LLMs, full-text search, and vector databases behind one style of component.
  • Pipelines and applications — Higher-level RAG-style pieces plus CLIs and web apps.

For the full story, see Architecture. Define a pipeline once and run it from Python, Jupyter, Docker, chatterlang_script, or chatterlang_serve.

Key Applications

  • chatterlang_workbench - Launches an interactive web interface for writing, testing, and running ChatterLang scripts. It provides real-time execution, logging, and documentation lookup.

  • chatterlang_script - Runs ChatterLang scripts from files or directly from the command line, enabling batch processing and automation.

  • chatterlang_serve - Exposes ChatterLang pipelines as REST APIs or web forms, allowing you to deploy workflows as web services or user-facing endpoints.

  • makevectordatabase & serverag - Create vector databases from documents and run RAG web servers in two commands, with no scripts required.

  • chatterlang_reference_browser - An interactive command-line application for searching and browsing installed ChatterLang sources and segments.

  • chatterlang_reference_generator - Generates comprehensive documentation for all available sources and segments in HTML and text formats.

  • talkpipe_plugins - Lets users view and manage plugins. TalkPipe includes a plugin system that lets developers register their own sources and segments, so the ecosystem can grow through community contributions and domain-specific extensions.

These applications are entry points for different usage scenarios, from interactive development to production deployment.

Quick Start

Requirements: Python 3.11 or newer.

pip install talkpipe

For LLM support, install the provider(s) you need:

pip install talkpipe[openai]    # OpenAI
pip install talkpipe[ollama]    # Ollama
pip install talkpipe[anthropic] # Anthropic Claude
# Or: pip install talkpipe[all]

Configure API keys and provider URLs via environment variables (for example TALKPIPE_openai_api_key) or ~/.talkpipe.toml. If TalkPipe runs on a different machine than your Ollama server, set TALKPIPE_OLLAMA_SERVER_URL to that host. See Configuration for details and ChatterLang $var substitution.

Multi-turn chat in a few lines:

from talkpipe.chatterlang import compiler

script = '| print | llmPrompt[model="llama3.2", source="ollama", multi_turn=True] | print'
chat = compiler.compile(script).as_function(single_in=True, single_out=True)

response = chat("Hello! My name is Alice.")
response = chat("What's my name?")  # Will remember context

RAG at a glance

Index a list of strings, then ask questions against the store (expand with options in Example 5):

from talkpipe.chatterlang import compiler

docs = ["TalkPipe builds AI pipelines with Python or ChatterLang."]
indexer = compiler.compile(
    '| toDict[field_list="_:text"] | makeVectorDatabase[path="./my_kb", embedding_model="nomic-embed-text", embedding_source="ollama", embedding_field="text", overwrite=True]'
).as_function(single_in=False)
indexer(docs)

rag = compiler.compile(
    '| toDict[field_list="_:text"] | ragToText[path="./my_kb", embedding_model="nomic-embed-text", embedding_source="ollama", completion_model="llama3.2", completion_source="ollama", content_field="text", limit=3] | print'
).as_function(single_in=True)
rag("What is TalkPipe?")
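
To make the retrieve-then-prompt idea concrete, here is a minimal, dependency-free sketch. It is not how ragToText is implemented: word counts stand in for a real embedding model, a Python list stands in for the vector store, and the helper names (embed, cosine, retrieve, build_prompt) are hypothetical.

```python
import math
import re
from collections import Counter

def embed(text):
    """Toy embedding: lowercase word counts (a stand-in for a real embedding model)."""
    return Counter(re.findall(r"[a-z0-9]+", text.lower()))

def cosine(a, b):
    """Cosine similarity between two sparse count vectors."""
    dot = sum(a[w] * b[w] for w in a if w in b)
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0

def retrieve(question, docs, limit=3):
    """Return up to `limit` documents, most similar to the question first."""
    q = embed(question)
    ranked = sorted(docs, key=lambda d: cosine(q, embed(d)), reverse=True)
    return ranked[:limit]

def build_prompt(question, context):
    """Assemble the text that would be sent to a completion model."""
    background = "\n".join(f"- {c}" for c in context)
    return f"Background:\n{background}\n\nQuestion: {question}"

docs = [
    "TalkPipe builds AI pipelines with Python or ChatterLang.",
    "Cats are independent pets.",
]
top = retrieve("What is TalkPipe?", docs, limit=1)
prompt = build_prompt("What is TalkPipe?", top)
```

In a real pipeline the ranking step is delegated to the vector database and the prompt goes to the configured completion model; only the overall shape carries over.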

Core Components

1. The Pipe API (Internal DSL)

TalkPipe's Pipe API is a Pythonic way to build data pipelines using the | operator to chain components:

from talkpipe.pipe import io
from talkpipe.llm import chat

# Create a pipeline that prompts for input, gets an LLM response, and prints it
pipeline = io.Prompt() | chat.LLMPrompt(model="llama3.2", source="ollama") | io.Print()
pipeline = pipeline.as_function()
pipeline()  # Run the interactive pipeline
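
For readers curious how a | chain can work in Python at all, the sketch below overloads __or__ on a small class. It illustrates the general technique only; the Stage class and the two helper generators are hypothetical and are not TalkPipe's core classes.

```python
class Stage:
    """Wraps a function that maps an iterable of items to an iterable of items."""

    def __init__(self, fn):
        self.fn = fn

    def __or__(self, other):
        # a | b builds a new stage that feeds a's output into b
        return Stage(lambda items: other.fn(self.fn(items)))

    def __call__(self, items):
        # Drain the lazy chain into a list for convenience
        return list(self.fn(items))

def strip_blank(items):
    """Drop items that are empty or whitespace-only."""
    for item in items:
        if item.strip():
            yield item

def shout(items):
    """Uppercase each item."""
    for item in items:
        yield item.upper()

pipeline = Stage(strip_blank) | Stage(shout)
result = pipeline(["hello", " ", "world"])
# result == ['HELLO', 'WORLD']
```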

Creating Custom Components

Add new functionality with simple decorators:

from talkpipe.pipe import core, io

@core.segment()
def uppercase(items):
    """Convert each item to uppercase"""
    for item in items:
        yield item.upper()

# Use it in a pipeline
pipeline = io.echo(data="hello,world") | uppercase() | io.Print()
result = pipeline.as_function(single_out=False)()
print(result)

# Output:
# HELLO
# WORLD
# Returns: ['HELLO', 'WORLD']
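
The examples in this README pass single_in and single_out to as_function. One plausible reading of those flags, inferred only from how they are used here, is sketched below in plain Python. The helper name as_function_sketch and its fallback behavior (returning the first result, or None when there are no results) are assumptions, not TalkPipe's documented semantics.

```python
def as_function_sketch(stage, single_in=False, single_out=False):
    """Wrap a generator-based stage as an ordinary callable."""
    def run(data=None):
        if data is None:
            items = []          # source-style pipelines take no input
        elif single_in:
            items = [data]      # one value becomes a one-item stream
        else:
            items = data        # already an iterable of items
        results = list(stage(items))
        if single_out:
            # Assumed behavior: hand back one value instead of a list
            return results[0] if results else None
        return results
    return run

def uppercase(items):
    for item in items:
        yield item.upper()

one = as_function_sketch(uppercase, single_in=True, single_out=True)("hello")
many = as_function_sketch(uppercase)(["hello", "world"])
```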

2. ChatterLang (External DSL)

ChatterLang provides a Unix-like syntax for building pipelines, perfect for rapid prototyping and experimentation:

INPUT FROM echo[data="1,2,hello,3"] | cast[cast_type="int"] | print

Registering Custom Components for ChatterLang

To make the uppercase segment from section 1 available in ChatterLang, register it with a decorator:

from talkpipe.pipe import core
from talkpipe.chatterlang import registry, compiler

@registry.register_segment("uppercase")
@core.segment()
def uppercase(items):
    """Convert each item to uppercase"""
    for item in items:
        yield item.upper()

# Now use it in ChatterLang scripts
script = 'INPUT FROM echo[data="hello,world"] | uppercase | print'
pipeline = compiler.compile(script).as_function(single_out=False)
result = pipeline()

# Output:
# HELLO
# WORLD
# Returns: ['HELLO', 'WORLD']

The @registry.register_segment() decorator makes your component discoverable by ChatterLang's compiler, allowing you to use it in scripts alongside built-in segments.
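
As a rough mental model of name-based registration, the sketch below keeps a dictionary from script names to callables and looks segments up by name. The SEGMENTS dictionary and run_by_name helper are hypothetical; TalkPipe's actual registry may be organized quite differently.

```python
SEGMENTS = {}  # hypothetical registry: script name -> callable

def register_segment(name):
    """Decorator factory: store the decorated callable under `name`."""
    def decorator(fn):
        SEGMENTS[name] = fn
        return fn  # return the function unchanged so it stays usable from Python
    return decorator

@register_segment("uppercase")
def uppercase(items):
    for item in items:
        yield item.upper()

def run_by_name(name, items):
    """Look a segment up by its script name and apply it to the items."""
    return list(SEGMENTS[name](items))

result = run_by_name("uppercase", ["hello", "world"])
```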

Key ChatterLang Features

  • Variables: Store intermediate results with @variable_name
  • Constants: Define reusable values with CONST name = "value"
  • Loops: Repeat operations with LOOP n TIMES { ... }
  • Multiple Pipelines: Chain workflows with ; or newlines

3. Built-in Applications

Command-Line Tools

  • chatterlang_workbench - Start the interactive web interface for experimenting with ChatterLang
  • chatterlang_script - Run ChatterLang scripts from files or command line
  • chatterlang_reference_generator - Generate documentation for all available sources and segments
  • chatterlang_reference_browser - Interactive command-line browser for sources and segments
  • chatterlang_serve - Create a customizable user-accessible web interface and REST API from ChatterLang scripts
  • talkpipe_plugins - View and manage TalkPipe plugins

Jupyter Integration

TalkPipe components work seamlessly in Jupyter notebooks for interactive data analysis.

Detailed Examples

Example 1: Multi-Agent Debate

Problem: Run two LLM personas on one seed topic for several rounds. Result: Printed turns accumulated in @conversation.

from talkpipe.chatterlang import compiler

script = """
CONST economist_prompt = "You are an economist. Reply in one sentence.";
CONST psychologist_prompt = "You are a child psychologist. Reply in one sentence.";

INPUT FROM echo[data="The US should give free puppies to all children."] 
    | @topic 
    | accum[variable=@conversation] 
    | print;

LOOP 3 TIMES {
    INPUT FROM @topic 
        | llmPrompt[system_prompt=economist_prompt, model="llama3.2", source="ollama"] 
        | @topic 
        | accum[variable=@conversation] 
        | print;
    
    INPUT FROM @topic 
        | llmPrompt[system_prompt=psychologist_prompt, model="llama3.2", source="ollama"] 
        | @topic 
        | accum[variable=@conversation] 
        | print;
};

INPUT FROM @conversation
"""

pipeline = compiler.compile(script).as_function()
debate = pipeline()  # Watch the debate unfold!
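
The control flow of that script can be read as the plain-Python loop below. This is an interpretation, not generated code: it assumes that @topic holds the latest utterance and that accum appends each item to @conversation. The two speaker functions are stubs standing in for the LLM calls.

```python
def run_debate(seed, speakers, rounds=3):
    """Alternate speakers; each one replies to the latest utterance."""
    topic = seed
    conversation = [seed]            # plays the role of @conversation
    for _ in range(rounds):
        for speak in speakers:
            topic = speak(topic)     # plays the role of @topic
            conversation.append(topic)
    return conversation

def economist(text):
    # Stub: a real pipeline would call an LLM with the economist prompt
    return f"Economist on: {text[:20]}"

def psychologist(text):
    # Stub: a real pipeline would call an LLM with the psychologist prompt
    return f"Psychologist on: {text[:20]}"

turns = run_debate("Free puppies for all children.", [economist, psychologist], rounds=3)
```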

Example 2: Document Stream Evaluation

Problem: Score a stream of JSONL rows with an LLM against a fixed rubric. Result: A pandas DataFrame with extracted scores per row.

import pandas as pd
from talkpipe.chatterlang import compiler

# Sample document data
documents = [
    '{"title": "Dog", "description": "Dogs are loyal companions..."}',
    '{"title": "Cat", "description": "Cats are independent pets..."}',
    '{"title": "Wolf", "description": "Wolves are wild canines..."}'
]

script = """
CONST scorePrompt = "Rate 1-10 how related to dogs this is:";

| loadsJsonl 
| llmScore[system_prompt=scorePrompt, model="llama3.2", source="ollama", set_as="dog_relevance"] 
| setAs[field_list="dog_relevance.score:relevance_score"] 
| toDataFrame
"""

pipeline = compiler.compile(script).as_function(single_in=False, single_out=True)
df = pipeline(documents)
print(df)
# df now contains relevance scores for each document

Example 3: Web Page Analysis

Problem: Fetch a page, strip boilerplate, summarize with an LLM. Result: Model output to stdout (here, three bullet points).

from talkpipe.chatterlang import compiler

script = """
| downloadURL
| htmlToText
| llmPrompt[
    system_prompt="Summarize this article in 3 bullet points",
    model="llama3.2",
    source="ollama"
  ]
| print
"""

analyzer = compiler.compile(script).as_function(single_in=True)
analyzer("http://example.com/")
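
The feature list mentions that page downloads respect robots.txt. How downloadURL performs that check is not shown in this README; the sketch below only illustrates the general idea with the standard library's urllib.robotparser. The allowed helper, the rules, and the "example-bot" user agent are made up for illustration, and the rules are parsed from a list so no network access is needed.

```python
from urllib.robotparser import RobotFileParser

def allowed(robots_lines, user_agent, url):
    """Return True if the given robots.txt lines permit user_agent to fetch url."""
    parser = RobotFileParser()
    parser.parse(robots_lines)
    return parser.can_fetch(user_agent, url)

# Hypothetical robots.txt content
robots = ["User-agent: *", "Disallow: /private/"]

ok_public = allowed(robots, "example-bot", "http://example.com/")
ok_private = allowed(robots, "example-bot", "http://example.com/private/page")
```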

Example 4: Content Evaluation Pipeline

Problem: Score each article on two axes, keep only strong matches. Result: Printed dicts for items whose best score exceeds a threshold.

from talkpipe.chatterlang import compiler

# Sample article data
articles = [
    '{"title": "New LLM Model Released", "summary": "AI Company announces new LLM with improved reasoning"}',
    '{"title": "Smart Home IoT Devices", "summary": "Review of latest Arduino-based home automation"}',
    '{"title": "Cat Videos Go Viral", "summary": "Funny cats take over social media again"}',
    '{"title": "RAG Systems in Production", "summary": "How companies deploy retrieval-augmented generation"}',
]

script = """
# Define evaluation prompts
CONST ai_prompt = "Rate 0-10 how relevant this is to AI practitioners. Consider mentions of AI, ML, algorithms, or applications.";
CONST iot_prompt = "Rate 0-10 how relevant this is to IoT researchers. Consider hardware, sensors, or embedded systems.";

# Process articles
| loadsJsonl
| concat[fields="title,summary", set_as="full_text"]

# Score for AI relevance
| llmScore[system_prompt=ai_prompt, field="full_text", set_as="ai_eval", model="llama3.2", source="ollama"]
| setAs[field_list="ai_eval.score:ai_score,ai_eval.explanation:ai_reason"]

# Score for IoT relevance
| llmScore[system_prompt=iot_prompt, field="full_text", set_as="iot_eval", model="llama3.2", source="ollama"]
| setAs[field_list="iot_eval.score:iot_score,iot_eval.explanation:iot_reason"]

# Find highest score
| lambda[expression="max(item['ai_score'],item['iot_score'])", set_as="max_score"]

# Filter articles with score > 6
| gt[field="max_score", n=6]

# Format output
| toDict[field_list="title,ai_score,iot_score,max_score"]
| print
"""

evaluator = compiler.compile(script).as_function(single_in=False, single_out=False)
results = evaluator(articles)

# Output shows only relevant articles with their scores:
# {'title': 'New LLM Model Released', 'ai_score': 9, 'iot_score': 2, 'max_score': 9}
# {'title': 'Smart Home IoT Devices', 'ai_score': 3, 'iot_score': 9, 'max_score': 9}
# {'title': 'RAG Systems in Production', 'ai_score': 8, 'iot_score': 2, 'max_score': 8}
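
The last two processing steps of that script (take the larger of two scores, then keep items above a threshold) amount to the generator logic below. This is a plain-Python paraphrase with hypothetical helper names and made-up scores, not the implementation of the lambda or gt segments.

```python
def add_max_score(items):
    """Attach the larger of the two scores to each item."""
    for item in items:
        yield {**item, "max_score": max(item["ai_score"], item["iot_score"])}

def keep_above(items, field, n):
    """Pass through only items whose field is strictly greater than n."""
    for item in items:
        if item[field] > n:
            yield item

# Illustrative scores, not real model output
scored = [
    {"title": "New LLM Model Released", "ai_score": 9, "iot_score": 2},
    {"title": "Cat Videos Go Viral", "ai_score": 1, "iot_score": 0},
]
kept = list(keep_above(add_max_score(scored), field="max_score", n=6))
```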

Example 5: RAG Pipeline with Vector Database

Problem: Embed texts into a local vector store, then answer questions with retrieval + completion. Result: String answers from ragToText, plus patterns for yes/no (ragToBinaryAnswer) and numeric scores (ragToScore).

from talkpipe.chatterlang import compiler

# Sample knowledge base documents
documents = [
    "TalkPipe is a Python toolkit for building AI workflows. It provides a Unix-like pipeline syntax for chaining data transformations and LLM operations.",
    "TalkPipe supports multiple LLM providers including OpenAI, Ollama, and Anthropic. You can switch between providers easily using configuration.",
    "With TalkPipe, you can build RAG systems, multi-agent debates, and document processing pipelines. It uses Python generators for memory-efficient streaming.",
    "TalkPipe offers two APIs: the Pipe API (internal DSL) for Python code and ChatterLang (external DSL) for concise script-based workflows.",
    "Deployment is flexible with TalkPipe - run in Jupyter notebooks, Docker containers, or as standalone applications. The chatterlang_serve tool creates web APIs from scripts."
]

# First, index your documents into a vector database
indexing_script = """
| toDict[field_list="_:text"]
| makeVectorDatabase[
    path="./my_knowledge_base",
    embedding_model="nomic-embed-text",
    embedding_source="ollama",
    embedding_field="text",
    overwrite=True
  ]
"""
indexer = compiler.compile(indexing_script).as_function(single_in=False)
indexer(documents)

# Now query the knowledge base with RAG
query_script = """
| toDict[field_list="_:text"]
| ragToText[
    path="./my_knowledge_base",
    embedding_model="nomic-embed-text",
    embedding_source="ollama",
    completion_model="llama3.2",
    completion_source="ollama",
    content_field="text",
    prompt_directive="Answer the question based on the background information provided.",
    limit=3
  ]
| print
"""

rag_pipeline = compiler.compile(query_script).as_function(single_in=True)
answer = rag_pipeline("What are the key benefits of using TalkPipe?")
# Returns an LLM-generated answer based on relevant document chunks

# For yes/no questions, use ragToBinaryAnswer:
binary_rag_script = """
| toDict[field_list="_:text"]
| ragToBinaryAnswer[
    path="./my_knowledge_base",
    embedding_model="nomic-embed-text",
    embedding_source="ollama",
    completion_model="llama3.2",
    completion_source="ollama",
    content_field="text"
]
| print
"""
binary_rag = compiler.compile(binary_rag_script).as_function(single_in=True)
result = binary_rag("Does TalkPipe support Docker?")
result = binary_rag("Does TalkPipe have a podcast about pipes?")

# For scored evaluations, use ragToScore:
score_rag_script = """
| toDict[field_list="_:text"]
| ragToScore[
    path="./my_knowledge_base",
    embedding_model="nomic-embed-text",
    embedding_source="ollama",
    completion_model="llama3.2",
    completion_source="ollama",
    prompt_directive="Answer the provided question on a scale of 1 to 5.",
    content_field="text"
  ]
| print
"""
score_rag = compiler.compile(score_rag_script).as_function(single_in=True)
score = score_rag("How flexible is TalkPipe?")
score_rag("How well does this text describe pipe smoking?")

Documentation

For comprehensive documentation and examples, see the docs/ directory.

Goal                                   Start here
Install and first pipeline             Getting started
Commands and components                API reference
Walkthroughs                           Tutorials
Design and extending TalkPipe          Architecture
Contributor glossary and conventions   Developer handbook

Quick Reference

Command                           Purpose                                 Documentation
chatterlang_serve                 Create web APIs and forms               📄
chatterlang_workbench             Interactive web interface               📄
chatterlang_script                Run scripts from command line           📄
chatterlang_reference_generator   Generate documentation                  📄
chatterlang_reference_browser     Browse sources/segments interactively   -
talkpipe_plugins                  Manage TalkPipe plugins                 📄

Architecture & Development

Design Principles

Dual-Language Architecture

  • Internal DSL (Pipe API): Pure Python for maximum flexibility and IDE support
  • External DSL (ChatterLang): Concise syntax for rapid prototyping

Streaming Architecture

TalkPipe uses Python generators throughout, enabling:

  • Memory-efficient processing of large datasets
  • Real-time results as data flows through pipelines
  • Natural integration with streaming data sources
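
The effect of generator-based streaming can be seen with nothing but the standard library. In this sketch (function names are illustrative and not part of TalkPipe), the source is infinite, yet only the five requested items are ever computed.

```python
import itertools

def numbers():
    """An unbounded source: yields 0, 1, 2, ... on demand."""
    n = 0
    while True:
        yield n
        n += 1

def squares(items):
    """A transform that handles one item at a time."""
    for item in items:
        yield item * item

# Nothing runs until items are pulled; islice stops after five
first_five = list(itertools.islice(squares(numbers()), 5))
# first_five == [0, 1, 4, 9, 16]
```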

Extensibility First

  • Simple decorators (@source, @segment, @field_segment) for adding functionality
  • Components are just Python functions - easy to test and debug
  • Mix TalkPipe with any Python code or library

Project Structure

talkpipe/
├── app/          # Runnable applications (servers, CLIs)
├── chatterlang/  # ChatterLang parser, compiler, and components
├── data/         # Data manipulation and I/O components
├── llm/          # LLM integrations (OpenAI, Ollama, Anthropic)
├── operations/   # Algorithms and data processing
├── pipe/         # Core pipeline infrastructure
├── pipelines/    # High-level pipeline components (RAG, vector DB)
├── search/       # Search engine integrations (Whoosh, LanceDB)
└── util/         # Utility functions and configuration

Configuration

TalkPipe uses a flexible configuration system via ~/.talkpipe.toml or environment variables:

# ~/.talkpipe.toml
default_model_name = "llama3.2"
default_model_source = "ollama"
smtp_server = "smtp.gmail.com"
smtp_port = 587

Environment variables use the TALKPIPE_ prefix:

export TALKPIPE_email_password="your-password"
export TALKPIPE_openai_api_key="sk-..."

Performance Optimization

TalkPipe includes an optional lazy loading feature that can dramatically improve startup performance (up to 18x faster) by deferring module imports until needed:

# Enable lazy loading for faster startup
export TALKPIPE_LAZY_IMPORT=true

This is especially useful for CLI tools and scripts that don't use all TalkPipe features. See the lazy loading documentation for details.
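
The general idea behind deferred imports can be sketched in a few lines. The LazyModule class below is a generic illustration of the technique, not TalkPipe's lazy loading mechanism: the real import is postponed until the first attribute access.

```python
import importlib

class LazyModule:
    """Proxy that imports the named module on first attribute access."""

    def __init__(self, name):
        self._name = name
        self._module = None

    def __getattr__(self, attr):
        # Only called for attributes not found on the proxy itself
        if self._module is None:
            self._module = importlib.import_module(self._name)
        return getattr(self._module, attr)

json_lazy = LazyModule("json")      # no import has been triggered by the proxy yet
loaded_before = json_lazy._module is not None
text = json_lazy.dumps([1, 2])      # first use triggers the import
loaded_after = json_lazy._module is not None
```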

Development Guidelines

Naming Conventions

  • Classes: CamelCase (e.g., LLMPrompt)
  • Decorated functions: camelCase (e.g., @segment def extractText)
  • ChatterLang names: camelCase (e.g., llmPrompt, toDataFrame)

Creating Components

Sources generate data:

from talkpipe.pipe import core, io

@core.source()
def fibonacci(n=10):
    a, b = 0, 1
    for _ in range(n):
        yield a
        a, b = b, a + b

# Use it in a pipeline
pipeline = fibonacci(n=5) | io.Print()
result = pipeline.as_function(single_out=False)()

# Output:
# 0
# 1
# 1
# 2
# 3
# Returns: [0, 1, 1, 2, 3]

Segments transform data:

from talkpipe.pipe import core, io

@core.segment()
def multiplyBy(items, factor=2):
    """Multiply each item by factor"""
    for item in items:
        yield item * factor

# Use it to triple the Fibonacci numbers (fibonacci is the source defined above)
pipeline = fibonacci(n=5) | multiplyBy(factor=3) | io.Print()
result = pipeline.as_function(single_out=False)()

# Output:
# 0
# 3
# 3
# 6
# 9
# Returns: [0, 3, 3, 6, 9]

Field Segments provide a convenient way to create 1:1 segments:

from datetime import datetime
from talkpipe.pipe import core, io
from talkpipe.chatterlang import registry

@registry.register_segment("addTimestamp")
@core.field_segment()
def addTimestamp(item):
    # Handle a single item, not an iterable
    # The decorator handles set_as and field parameters automatically
    return datetime.now()

# Use it with dictionaries
data = [{'name': 'Alice'}, {'name': 'Bob'}]
pipeline = addTimestamp(set_as="timestamp") | io.Print()
    
result = pipeline.as_function(single_in=False, single_out=False)(data)

# Output (timestamps will vary):
# {'name': 'Alice', 'timestamp': datetime.datetime(2024, 1, 15, 10, 30, 45, 123456)}
# {'name': 'Bob', 'timestamp': datetime.datetime(2024, 1, 15, 10, 30, 45, 234567)}

# Now it's also available in ChatterLang:
# script = '| addTimestamp[set_as="timestamp"] | print'
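
To show what "handles set_as and field parameters automatically" could mean, here is a plain-Python sketch of a decorator with similar behavior. The name field_segment_sketch and the exact rules (read item[field] when field is given, store the result under set_as when it is given, otherwise yield the bare result) are assumptions made for illustration, not the behavior of core.field_segment.

```python
def field_segment_sketch(fn):
    """Turn a one-item function into a configurable 1:1 stream transform."""
    def factory(field=None, set_as=None):
        def segment(items):
            for item in items:
                value = fn(item[field] if field else item)
                if set_as:
                    item[set_as] = value   # annotate the item in place and pass it on
                    yield item
                else:
                    yield value            # no target field: emit the bare result
        return segment
    return factory

@field_segment_sketch
def name_length(name):
    return len(name)

data = [{"name": "Alice"}, {"name": "Bob"}]
out = list(name_length(field="name", set_as="name_len")(data))
```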

Best Practices

  1. Units with side effects should pass data through - e.g., writeFile should yield items after writing
  2. Use descriptive parameter names with underscores (e.g., fail_on_error, set_as)
  3. Handle errors gracefully - use fail_on_error parameter pattern
  4. Document with docstrings - they appear in generated documentation
  5. Test with both APIs - ensure components work in both Python and ChatterLang
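
Practices 1 and 3 can be illustrated with two small generators. This is a sketch in plain Python with hypothetical names (write_lines, to_int); a real TalkPipe component would add the @core.segment() decorator shown earlier.

```python
def write_lines(items, sink):
    """Side effect (record each item), then pass the item through unchanged."""
    for item in items:
        sink.append(str(item))
        yield item

def to_int(items, fail_on_error=True):
    """Convert items to int; raise or skip bad values depending on the flag."""
    for item in items:
        try:
            yield int(item)
        except ValueError:
            if fail_on_error:
                raise
            # fail_on_error=False: silently skip the item that failed to convert

log = []
values = list(write_lines(to_int(["1", "2", "hello", "3"], fail_on_error=False), log))
```

Because write_lines yields every item it records, further steps can be chained after it without losing data.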

Roadmap & Contributing

TalkPipe is under active development. Current priorities:

  • Enhanced LLM Support: Additional providers, expanded guided generation
  • Data Connectors: More database integrations, API clients, file formats
  • Workflow Features: Conditional branching, enhanced error handling, retry logic
  • Performance: Parallel processing optimization, enhanced lazy loading, better caching
  • Developer Tools: Better debugging, testing utilities, IDE plugins
  • RAG & Search: Advanced retrieval strategies, hybrid search, multi-modal embeddings

We welcome contributions! Whether it's new components, bug fixes, documentation, or examples, please check our GitHub repository for contribution guidelines.

Status

TalkPipe is in active development: feature-rich and in use, but APIs may evolve. We follow semantic versioning: minor versions aim for compatibility within a major series; major bumps may include breaking changes. The following are reasonably stable for everyday use: installation from PyPI, the | pipeline model, compiler.compile(...).as_function(...), and the optional extras for LLM providers.

License

TalkPipe is licensed under the Apache License 2.0. See LICENSE file for details.

Developer handbook

Contributor-focused glossary, naming conventions, parameter semantics, standard config keys, and segment/source reference notes live in docs/contributing/developer-handbook.md.


Last reviewed: 2026-03-20
