Streamblocks

Real-time extraction and processing of structured blocks from text streams.

LLMs often embed structured data in their responses—file operations, code blocks, tool calls. Streamblocks extracts these blocks in real-time as the stream arrives, emitting events like BLOCK_START, BLOCK_CONTENT_DELTA, and BLOCK_END. Process a Gemini stream, detect !!save:file\npath/to/file.py\n!!end blocks, and react immediately—no need to wait for the full response.

Features

  • Pluggable Syntax System: Define your own block syntaxes or use built-in ones
  • Async Stream Processing: Process text streams line-by-line with full async support
  • AI Provider Adapters: Automatic adapter detection for Gemini, OpenAI, Anthropic
  • AG-UI Protocol Support: Bidirectional adapters for AG-UI—consume AG-UI streams and emit AG-UI events
  • Type-Safe Models: Use Pydantic models for block metadata and content
  • Event-Driven Architecture: React to block detection, updates, completion, and rejection
  • Production Ready: Comprehensive error handling, logging, and validation
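
Line-by-line processing implies that chunks arriving from a provider, which can end anywhere, are re-split on newlines before block detection runs. The following is a minimal stdlib-only sketch of that buffering step, not Streamblocks' actual implementation; `iter_lines`, `demo_chunks`, and `collect_lines` are hypothetical names used only for illustration.

```python
import asyncio
from collections.abc import AsyncIterator


async def iter_lines(chunks: AsyncIterator[str]) -> AsyncIterator[str]:
    """Re-split arbitrary text chunks into complete lines (newline removed)."""
    buffer = ""
    async for chunk in chunks:
        buffer += chunk
        # Emit every complete line currently held in the buffer.
        while "\n" in buffer:
            line, buffer = buffer.split("\n", 1)
            yield line
    # Flush a trailing partial line when the stream ends.
    if buffer:
        yield buffer


async def demo_chunks() -> AsyncIterator[str]:
    # Chunk boundaries deliberately fall in the middle of lines.
    for chunk in ["!!file01:files_op", "erations\nsrc/main", ".py:C\n!!end\n"]:
        yield chunk


async def collect_lines() -> list[str]:
    return [line async for line in iter_lines(demo_chunks())]


lines = asyncio.run(collect_lines())
print(lines)  # ['!!file01:files_operations', 'src/main.py:C', '!!end']
```

Buffering until a newline arrives is what lets a detector see `!!file01:files_operations` as one header line even when the provider splits it across two chunks.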

Installation

pip install streamblocks

Quick Start

import asyncio
from streamblocks import BlockRegistry, DelimiterPreambleSyntax, StreamBlockProcessor, EventType
from streamblocks.content import FileOperationsContent, FileOperationsMetadata

async def main():
    registry = BlockRegistry()
    syntax = DelimiterPreambleSyntax(
        metadata_class=FileOperationsMetadata,
        content_class=FileOperationsContent,
    )
    registry.register_syntax(syntax, block_types=["files_operations"])
    processor = StreamBlockProcessor(registry)

    async def text_stream():
        yield "!!file01:files_operations\n"
        yield "src/main.py:C\n"
        yield "!!end\n"

    async for event in processor.process_stream(text_stream()):
        if event.type == EventType.BLOCK_END:
            print(f"Block extracted: {event.block_id}")

asyncio.run(main())

More Examples

Processing AI Provider Streams

from google import genai
from streamblocks import EventType, StreamBlockProcessor


async def run(processor: StreamBlockProcessor) -> None:
    # `processor` is a configured StreamBlockProcessor, e.g. the one built in Quick Start.
    client = genai.Client(api_key="...")
    response = await client.aio.models.generate_content_stream(
        model="gemini-2.5-flash",
        contents="Create a Python script",
    )

    # Adapter auto-detected for Gemini streams
    async for event in processor.process_stream(response):
        match event.type:
            case EventType.BLOCK_START:
                print(f"Block started: {event.block_id}")
            case EventType.BLOCK_END:
                print(f"Block complete: {event.block_id}")

# Run with: asyncio.run(run(processor))

Event Handling Pattern

from streamblocks import EventType

# `processor` is a configured StreamBlockProcessor; `stream` is any supported async stream.
# Run this loop inside an async function.
async for event in processor.process_stream(stream):
    match event.type:
        case EventType.STREAM_STARTED:
            print("Processing started")
        case EventType.TEXT_DELTA:
            print(event.delta, end="")
        case EventType.BLOCK_START:
            print(f"\n[Block {event.block_id} detected]")
        case EventType.BLOCK_CONTENT_DELTA:
            print(event.delta, end="")
        case EventType.BLOCK_END:
            print(f"\n[Block {event.block_id} complete]")
        case EventType.STREAM_FINISHED:
            print("\nDone")
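
A common follow-up to the pattern above is to accumulate `BLOCK_CONTENT_DELTA` lines per block until `BLOCK_END` arrives. The sketch below uses a stand-in `Event` dataclass with string event types and only the `type`, `block_id`, and `delta` attributes that appear in this README; the real Streamblocks event classes may carry more fields or differ in shape, and `collect_blocks` is a hypothetical helper.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class Event:
    """Stand-in for a Streamblocks event; only the fields used above are modelled."""
    type: str
    block_id: str = ""
    delta: str = ""


def collect_blocks(events: list[Event]) -> dict[str, str]:
    """Join each block's content deltas into one string, keyed by block_id."""
    pending: dict[str, list[str]] = defaultdict(list)
    finished: dict[str, str] = {}
    for event in events:
        if event.type == "BLOCK_CONTENT_DELTA":
            pending[event.block_id].append(event.delta)
        elif event.type == "BLOCK_END":
            # The block is complete: move its lines out of the pending buffer.
            finished[event.block_id] = "\n".join(pending.pop(event.block_id, []))
    return finished


sample = [
    Event("BLOCK_START", "file01"),
    Event("BLOCK_CONTENT_DELTA", "file01", "src/main.py:C"),
    Event("BLOCK_CONTENT_DELTA", "file01", "src/util.py:C"),
    Event("BLOCK_END", "file01"),
]
blocks = collect_blocks(sample)
print(blocks)
```

Keying the buffer by `block_id` keeps the helper correct even if events from several blocks were interleaved.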

Using Markdown Frontmatter Syntax

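from streamblocks import MarkdownFrontmatterSyntax

# CodeBlockMetadata and CodeBlockContent are Pydantic models for this block type
# (definitions not shown here); `registry` is a BlockRegistry as in Quick Start.
syntax = MarkdownFrontmatterSyntax(
    metadata_class=CodeBlockMetadata,
    content_class=CodeBlockContent,
)
registry.register_syntax(syntax, block_types=["python", "javascript"])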

# Detects blocks like:
# ```python
# ---
# name: example
# ---
# print("hello")
# ```
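
To illustrate how the frontmatter in such a block separates metadata from content, here is a small stdlib-only sketch. It handles flat `key: value` pairs only and is not how Streamblocks parses or validates metadata; `split_frontmatter` is a hypothetical helper name.

```python
def split_frontmatter(block_lines: list[str]) -> tuple[dict[str, str], str]:
    """Split the lines found between the code fences into (metadata, content)."""
    metadata: dict[str, str] = {}
    if not block_lines or block_lines[0] != "---":
        # No frontmatter at all: everything is content.
        return metadata, "\n".join(block_lines)
    try:
        closing = block_lines.index("---", 1)
    except ValueError:
        # No closing "---": treat everything as content.
        return metadata, "\n".join(block_lines)
    for line in block_lines[1:closing]:
        key, _, value = line.partition(":")
        metadata[key.strip()] = value.strip()
    return metadata, "\n".join(block_lines[closing + 1:])


meta, content = split_frontmatter(["---", "name: example", "---", 'print("hello")'])
print(meta, content)
```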

Built-in Syntaxes

| Syntax | Format | Use Case |
| --- | --- | --- |
| Delimiter with Preamble | !!<id>:<type>\n...\n!!end | Simple structured blocks |
| Markdown with Frontmatter | ```lang\n---\nkey: value\n---\n...\n``` | Code blocks with metadata |
| Delimiter with Frontmatter | !!start\n---\nkey: value\n---\n...\n!!end | Hybrid structured blocks |
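
To make the Delimiter with Preamble format concrete, the sketch below scans already-split lines for `!!<id>:<type>` ... `!!end` and yields event tuples named after the events in Event Types. It is a simplified illustration, not the library's parser: `scan_blocks` is a hypothetical function, and the character rules in `HEADER` are an assumption.

```python
import re
from collections.abc import Iterable, Iterator

# Opening line of the form "!!<id>:<type>"; the allowed characters are an assumption.
HEADER = re.compile(r"^!!(?P<id>\w+):(?P<type>\w+)$")


def scan_blocks(lines: Iterable[str]) -> Iterator[tuple[str, str, str]]:
    """Yield (event_name, block_id, payload) for delimiter-with-preamble blocks."""
    current_id = None
    for line in lines:
        if current_id is None:
            match = HEADER.match(line)
            if match:
                current_id = match["id"]
                yield ("BLOCK_START", current_id, match["type"])
            else:
                # Text outside any block is passed through unchanged.
                yield ("TEXT_DELTA", "", line)
        elif line == "!!end":
            yield ("BLOCK_END", current_id, "")
            current_id = None
        else:
            yield ("BLOCK_CONTENT_DELTA", current_id, line)


events = list(scan_blocks([
    "Here is the file:",
    "!!file01:files_operations",
    "src/main.py:C",
    "!!end",
]))
for event in events:
    print(event)
```

Because the scanner only ever looks at the current line and one piece of state (`current_id`), it can emit events as soon as each line arrives rather than after the full response.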

Event Types

Lifecycle Events

  • STREAM_STARTED - Stream processing began
  • STREAM_FINISHED - Stream processing completed
  • STREAM_ERROR - Stream processing failed

Text Events

  • TEXT_CONTENT - Complete text content outside blocks
  • TEXT_DELTA - Incremental text chunk

Block Events

  • BLOCK_START - Block header detected
  • BLOCK_HEADER_DELTA - Block header line received
  • BLOCK_METADATA_DELTA - Metadata line received
  • BLOCK_CONTENT_DELTA - Content line received
  • BLOCK_METADATA_END - Metadata section complete
  • BLOCK_CONTENT_END - Content section complete
  • BLOCK_END - Block fully extracted
  • BLOCK_ERROR - Block extraction failed

Custom

  • CUSTOM - User-defined event

Optional Extras

Streamblocks provides optional extras for AI provider integrations:

| Extra | Dependencies | Purpose |
| --- | --- | --- |
| gemini | google-genai | Google Gemini stream processing |
| openai | openai | OpenAI stream processing |
| anthropic | anthropic | Anthropic Claude stream processing |
| agui | ag-ui | AG-UI protocol adapters |
| all-providers | All above | All AI provider integrations |

# Single provider
pip install streamblocks[gemini]

# Multiple providers
pip install streamblocks[gemini,openai]

# All providers
pip install streamblocks[all-providers]
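
Since the provider dependencies are optional, application code may want to check which ones are importable before choosing a provider. A minimal sketch using only the standard library follows; `available_extras` is a hypothetical helper (not part of Streamblocks), and the import names in `EXTRA_MODULES` are assumptions based on each dependency's usual import path.

```python
import importlib.util

# Import name assumed for each extra's dependency; verify against what you install.
EXTRA_MODULES = {
    "gemini": "google.genai",
    "openai": "openai",
    "anthropic": "anthropic",
}


def available_extras() -> dict[str, bool]:
    """Report which optional provider packages can be imported."""
    found: dict[str, bool] = {}
    for extra, module in EXTRA_MODULES.items():
        try:
            found[extra] = importlib.util.find_spec(module) is not None
        except ModuleNotFoundError:
            # Raised when a parent package (e.g. "google") is not installed.
            found[extra] = False
    return found


status = available_extras()
print(status)
```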

Documentation

To build and serve the documentation locally:

uv sync --group doc
source .venv/bin/activate
mkdocs serve

Development

Dependency Groups

| Group | Purpose | Key Dependencies |
| --- | --- | --- |
| dev | Development tools | pytest, ruff, basedpyright, detect-secrets |
| doc | Documentation building | mkdocs, mkdocs-material, mike |

Installation

Basic development setup:

uv sync --group dev
source .venv/bin/activate
lefthook install

Full development setup with extras:

uv sync --group dev --all-extras

Quick Reference

Available extras: gemini, openai, anthropic, agui, all-providers

Available groups: dev, doc

Tests

# Run all tests
uv run pytest

# Run with coverage
uv run pytest --cov=hother.streamblocks --cov-report=html

# Run examples
uv run python examples/run_examples.py --skip-api

Release Process

This project uses python-semantic-release for fully automated versioning and releases. Every commit to the main branch is analyzed using conventional commits, and releases are created automatically when needed.

How It Works

  1. Commit with conventional format to the main branch
  2. GitHub Actions automatically analyzes commits, determines version bump, creates tag, updates changelog, publishes to PyPI, and creates GitHub release
  3. Documentation is automatically deployed when a release is published

Version Bumping Rules

| Commit Type | Version Bump | Example |
| --- | --- | --- |
| feat: | Minor | 0.5.0 → 0.6.0 |
| fix:, perf:, refactor: | Patch | 0.5.0 → 0.5.1 |
| feat!:, BREAKING CHANGE: | Major | 0.5.0 → 1.0.0 |
| docs:, chore:, ci:, style:, test: | No release | - |
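
The rules in the table can be sketched as a small function. This is an illustration of the mapping only, not python-semantic-release's own parser; `version_bump` and `apply_bump` are hypothetical names, and treating any type followed by `!` as breaking follows Conventional Commits, which is slightly broader than the `feat!:` row.

```python
import re
from typing import Optional

# Mapping taken from the table; None means "no release".
BUMP_BY_TYPE = {
    "feat": "minor",
    "fix": "patch", "perf": "patch", "refactor": "patch",
    "docs": None, "chore": None, "ci": None, "style": None, "test": None,
}
# "<type>(<optional scope>)<optional !>: <description>"
SUBJECT = re.compile(r"^(?P<type>[a-z]+)(\([^)]*\))?(?P<breaking>!)?: ")


def version_bump(message: str) -> Optional[str]:
    """Return "major", "minor", "patch", or None for a commit message."""
    match = SUBJECT.match(message)
    if match is None:
        return None
    if match["breaking"] or "BREAKING CHANGE:" in message:
        return "major"
    return BUMP_BY_TYPE.get(match["type"])


def apply_bump(version: str, bump: Optional[str]) -> str:
    """Apply a bump to a plain MAJOR.MINOR.PATCH version string."""
    major, minor, patch = (int(part) for part in version.split("."))
    if bump == "major":
        return f"{major + 1}.0.0"
    if bump == "minor":
        return f"{major}.{minor + 1}.0"
    if bump == "patch":
        return f"{major}.{minor}.{patch + 1}"
    return version


print(apply_bump("0.5.0", version_bump("feat: add new syntax")))  # 0.6.0
```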

Documentation Deployment

Documentation is automatically built and deployed when:

  • A release is published (triggered by semantic-release)
  • Changes are pushed to docs/, mkdocs.yml, or the workflow file on main

Development Practices

Branching & Pull Requests

Each git branch should be named <tag>/item_<id>, optionally followed by a descriptive suffix.

We use a Squash & Merge approach.
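
The branch format above could be checked with a regular expression, for example in a local hook. The sketch below rests on several assumptions that this README does not state: that `<tag>` is lowercase letters, that `<id>` is numeric, and that the suffix is joined with `_` or `-`. `is_valid_branch` is a hypothetical helper.

```python
import re

# Assumptions: <tag> is lowercase letters, <id> is digits, and the optional
# suffix is joined with "_" or "-". Adjust to the project's real rules.
BRANCH = re.compile(
    r"^(?P<tag>[a-z]+)/item_(?P<id>\d+)(?:[_-](?P<suffix>[a-z0-9_-]+))?$"
)


def is_valid_branch(name: str) -> bool:
    """Return True if the name matches <tag>/item_<id>[<suffix>]."""
    return BRANCH.match(name) is not None


print(is_valid_branch("feat/item_42"))
print(is_valid_branch("feature-branch"))
```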

Conventional Commits

We use Conventional Commits:

  • feat: New feature
  • fix: Bug fix
  • docs: Documentation changes
  • refactor: Code refactoring
  • test: Test changes
  • chore: Maintenance tasks

Contributing

Contributions are welcome! Please ensure:

  1. All tests pass (uv run pytest)
  2. Code quality checks pass (uv run lefthook run pre-commit --all-files -- --no-stash)
  3. Commits follow conventional commit format

License

This project is licensed under the MIT License - see the LICENSE file for details.
