Streamblocks
Real-time extraction and processing of structured blocks from text streams.
LLMs often embed structured data in their responses—file operations, code blocks, tool calls. Streamblocks extracts these blocks in real-time as the stream arrives, emitting events like BLOCK_START, BLOCK_CONTENT_DELTA, and BLOCK_END. Process a Gemini stream, detect !!save:file\npath/to/file.py\n!!end blocks, and react immediately—no need to wait for the full response.
Table of Contents
- Features
- Installation
- Quick Start
- More Examples
- Built-in Syntaxes
- Event Types
- Optional Extras
- Documentation
- Development
- Contributing
- License
Features
- Pluggable Syntax System: Define your own block syntaxes or use built-in ones
- Async Stream Processing: Process text streams line-by-line with full async support
- AI Provider Adapters: Automatic adapter detection for Gemini, OpenAI, Anthropic
- AG-UI Protocol Support: Bidirectional adapters for AG-UI—consume AG-UI streams and emit AG-UI events
- Type-Safe Models: Use Pydantic models for block metadata and content
- Event-Driven Architecture: React to block detection, updates, completion, and rejection
- Production Ready: Comprehensive error handling, logging, and validation
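Line-by-line processing implies that incoming chunks, which can end anywhere, are first re-assembled into complete lines. The following is a minimal standalone sketch of that idea using only asyncio. It is not Streamblocks' own implementation, and `iter_lines`, `demo_chunks`, and `collect_lines` are hypothetical names introduced for illustration.

```python
import asyncio
from collections.abc import AsyncIterator


async def iter_lines(chunks: AsyncIterator[str]) -> AsyncIterator[str]:
    """Re-chunk an async text stream into complete lines (newline stripped)."""
    buffer = ""
    async for chunk in chunks:
        buffer += chunk
        # Emit every complete line currently held in the buffer.
        while "\n" in buffer:
            line, buffer = buffer.split("\n", 1)
            yield line
    # Flush a trailing partial line once the stream ends.
    if buffer:
        yield buffer


async def demo_chunks() -> AsyncIterator[str]:
    # Chunk boundaries deliberately fall in the middle of lines.
    for chunk in ["!!file01:files_op", "erations\nsrc/main.py", ":C\n!!end\n"]:
        yield chunk


async def collect_lines() -> list[str]:
    return [line async for line in iter_lines(demo_chunks())]


lines = asyncio.run(collect_lines())
print(lines)
```

Buffering until a newline arrives is what lets a header such as `!!file01:files_operations` be recognized even when the provider splits it across two chunks.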
Installation
pip install streamblocks
Quick Start
import asyncio

from streamblocks import BlockRegistry, DelimiterPreambleSyntax, StreamBlockProcessor, EventType
from streamblocks.content import FileOperationsContent, FileOperationsMetadata


async def main():
    # Register a syntax and the block types it should recognize.
    registry = BlockRegistry()
    syntax = DelimiterPreambleSyntax(
        metadata_class=FileOperationsMetadata,
        content_class=FileOperationsContent,
    )
    registry.register_syntax(syntax, block_types=["files_operations"])
    processor = StreamBlockProcessor(registry)

    # Simulated text stream containing a single block.
    async def text_stream():
        yield "!!file01:files_operations\n"
        yield "src/main.py:C\n"
        yield "!!end\n"

    # React as soon as a block has been fully extracted.
    async for event in processor.process_stream(text_stream()):
        if event.type == EventType.BLOCK_END:
            print(f"Block extracted: {event.block_id}")


asyncio.run(main())
More Examples
Processing AI Provider Streams
from google import genai
from streamblocks import EventType, StreamBlockProcessor

# Assumes `processor` is a StreamBlockProcessor set up as in the Quick Start,
# and that this code runs inside an async function.
client = genai.Client(api_key="...")
response = await client.aio.models.generate_content_stream(
    model="gemini-2.5-flash",
    contents="Create a Python script",
)
# Adapter auto-detected for Gemini streams
async for event in processor.process_stream(response):
    match event.type:
        case EventType.BLOCK_START:
            print(f"Block started: {event.block_id}")
        case EventType.BLOCK_END:
            print(f"Block complete: {event.block_id}")
Event Handling Pattern
async for event in processor.process_stream(stream):
    match event.type:
        case EventType.STREAM_STARTED:
            print("Processing started")
        case EventType.TEXT_DELTA:
            print(event.delta, end="")
        case EventType.BLOCK_START:
            print(f"\n[Block {event.block_id} detected]")
        case EventType.BLOCK_CONTENT_DELTA:
            print(event.delta, end="")
        case EventType.BLOCK_END:
            print(f"\n[Block {event.block_id} complete]")
        case EventType.STREAM_FINISHED:
            print("\nDone")
Using Markdown Frontmatter Syntax
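from streamblocks import MarkdownFrontmatterSyntax

# CodeBlockMetadata and CodeBlockContent are not defined in this snippet;
# they are assumed to be models defined or imported elsewhere.
# `registry` is a BlockRegistry, as created in the Quick Start.
syntax = MarkdownFrontmatterSyntax(
    metadata_class=CodeBlockMetadata,
    content_class=CodeBlockContent,
)
registry.register_syntax(syntax, block_types=["python", "javascript"])
# Detects blocks like:
# ```python
# ---
# name: example
# ---
# print("hello")
# ```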
Built-in Syntaxes
| Syntax | Format | Use Case |
|---|---|---|
| Delimiter with Preamble | !!<id>:<type>\n...\n!!end | Simple structured blocks |
| Markdown with Frontmatter | ```lang\n---\nkey: value\n---\n...\n``` | Code blocks with metadata |
| Delimiter with Frontmatter | !!start\n---\nkey: value\n---\n...\n!!end | Hybrid structured blocks |
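To make the Delimiter with Preamble format from the table above concrete, here is a small standalone sketch that pulls such blocks out of a complete string with a regular expression. It only illustrates the wire format and is not how Streamblocks parses streams. The `HEADER` pattern (word characters for id and type) and the `extract_blocks` helper are assumptions made for this example.

```python
import re

# Assumed header shape, taken from the table above: "!!<id>:<type>".
HEADER = re.compile(r"^!!(?P<id>\w+):(?P<type>\w+)$")
FOOTER = "!!end"


def extract_blocks(text: str) -> list[dict]:
    """Return one dict per complete block found in `text`."""
    blocks = []
    current = None  # Block being collected, or None when outside a block.
    for line in text.splitlines():
        if current is None:
            match = HEADER.match(line)
            if match:
                current = {"id": match["id"], "type": match["type"], "lines": []}
        elif line == FOOTER:
            blocks.append(current)
            current = None
        else:
            current["lines"].append(line)
    # A block that never reaches "!!end" is dropped in this sketch.
    return blocks


sample = "Some text\n!!file01:files_operations\nsrc/main.py:C\n!!end\nMore text\n"
blocks = extract_blocks(sample)
print(blocks)
```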
Event Types
Lifecycle Events
- STREAM_STARTED - Stream processing began
- STREAM_FINISHED - Stream processing completed
- STREAM_ERROR - Stream processing failed
Text Events
- TEXT_CONTENT - Complete text content outside blocks
- TEXT_DELTA - Incremental text chunk
Block Events
- BLOCK_START - Block header detected
- BLOCK_HEADER_DELTA - Block header line received
- BLOCK_METADATA_DELTA - Metadata line received
- BLOCK_CONTENT_DELTA - Content line received
- BLOCK_METADATA_END - Metadata section complete
- BLOCK_CONTENT_END - Content section complete
- BLOCK_END - Block fully extracted
- BLOCK_ERROR - Block extraction failed
Custom
- CUSTOM - User-defined event
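One way to use the block events listed above is to accumulate BLOCK_CONTENT_DELTA chunks per block and finalize them on BLOCK_END. The sketch below shows that pattern with a stand-in event class so that it runs without the library. `FakeEvent` and `collect_block_content` are hypothetical. The sketch also assumes that each delta carries its own line ending and that BLOCK_ERROR events carry a `block_id`, neither of which is confirmed here.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class FakeEvent:
    """Stand-in for a Streamblocks event (illustration only)."""

    type: str
    block_id: str = ""
    delta: str = ""


def collect_block_content(events) -> dict[str, str]:
    """Return {block_id: content} for every block that reached BLOCK_END."""
    pending = defaultdict(list)  # Deltas of blocks still being streamed.
    finished = {}
    for event in events:
        if event.type == "BLOCK_CONTENT_DELTA":
            pending[event.block_id].append(event.delta)
        elif event.type == "BLOCK_END":
            finished[event.block_id] = "".join(pending.pop(event.block_id, []))
        elif event.type == "BLOCK_ERROR":
            # Discard partial content of a block that failed.
            pending.pop(event.block_id, None)
    return finished


events = [
    FakeEvent("BLOCK_START", "b1"),
    FakeEvent("BLOCK_CONTENT_DELTA", "b1", "src/main.py:C\n"),
    FakeEvent("BLOCK_END", "b1"),
    FakeEvent("BLOCK_START", "b2"),
    FakeEvent("BLOCK_CONTENT_DELTA", "b2", "partial"),
    FakeEvent("BLOCK_ERROR", "b2"),
]
result = collect_block_content(events)
print(result)
```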
Optional Extras
Streamblocks provides optional extras for AI provider integrations:
| Extra | Dependencies | Purpose |
|---|---|---|
| gemini | google-genai | Google Gemini stream processing |
| openai | openai | OpenAI stream processing |
| anthropic | anthropic | Anthropic Claude stream processing |
| agui | ag-ui | AG-UI protocol adapters |
| all-providers | All above | All AI provider integrations |
# Single provider (quotes keep shells such as zsh from expanding the brackets)
pip install "streamblocks[gemini]"
# Multiple providers
pip install "streamblocks[gemini,openai]"
# All providers
pip install "streamblocks[all-providers]"
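Because the provider extras are optional, it can be useful to check at runtime which provider SDKs are actually installed. The following is a minimal sketch using only the standard library. The import names (`google.genai`, `openai`, `anthropic`) are assumed from the dependency column above, and `is_installed` and `available_providers` are hypothetical helpers, not part of Streamblocks.

```python
from importlib.util import find_spec

# Extra name -> import name of the SDK it is assumed to install.
PROVIDER_MODULES = {
    "gemini": "google.genai",
    "openai": "openai",
    "anthropic": "anthropic",
}


def is_installed(module_name: str) -> bool:
    """Return True if `module_name` can be located without fully importing it."""
    try:
        return find_spec(module_name) is not None
    except (ImportError, ValueError):
        # Raised when a parent package (for example `google`) is missing.
        return False


def available_providers() -> dict[str, bool]:
    """Map each provider extra to whether its SDK appears to be installed."""
    return {extra: is_installed(name) for extra, name in PROVIDER_MODULES.items()}


status = available_providers()
print(status)
```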
Documentation
To build and serve the documentation locally:
uv sync --group doc
source .venv/bin/activate
mkdocs serve
Development
Dependency Groups
| Group | Purpose | Key Dependencies |
|---|---|---|
| dev | Development tools | pytest, ruff, basedpyright, detect-secrets |
| doc | Documentation building | mkdocs, mkdocs-material, mike |
Installation
Basic development setup:
uv sync --group dev
source .venv/bin/activate
lefthook install
Full development setup with extras:
uv sync --group dev --all-extras
Quick Reference
Available extras: gemini, openai, anthropic, agui, all-providers
Available groups: dev, doc
Tests
# Run all tests
uv run pytest
# Run with coverage
uv run pytest --cov=hother.streamblocks --cov-report=html
# Run examples
uv run python examples/run_examples.py --skip-api
Release Process
This project uses python-semantic-release for fully automated versioning and releases. Every commit to the main branch is analyzed using conventional commits, and releases are created automatically when needed.
How It Works
- Commit with conventional format to the main branch
- GitHub Actions automatically analyzes commits, determines the version bump, creates a tag, updates the changelog, publishes to PyPI, and creates a GitHub release
- Documentation is automatically deployed when a release is published
Version Bumping Rules
| Commit Type | Version Bump | Example |
|---|---|---|
| feat: | Minor | 0.5.0 → 0.6.0 |
| fix:, perf:, refactor: | Patch | 0.5.0 → 0.5.1 |
| feat!:, BREAKING CHANGE: | Major | 0.5.0 → 1.0.0 |
| docs:, chore:, ci:, style:, test: | No release | - |
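The rules in the table above can be sketched as a small function that maps a commit message to a bump level. This is an illustration of the table only and not python-semantic-release's actual parser. `bump_for` and `next_version` are hypothetical helpers, and the sketch treats any `!` after the commit type as breaking, which is a simplifying assumption.

```python
import re
from typing import Optional

# Commit types and the bump they trigger, per the table above.
MINOR_TYPES = {"feat"}
PATCH_TYPES = {"fix", "perf", "refactor"}

# "<type>(optional scope)(optional !): description"
SUBJECT = re.compile(r"^(?P<type>[a-z]+)(?:\([^)]*\))?(?P<breaking>!)?: ")


def bump_for(message: str) -> Optional[str]:
    """Return "major", "minor", "patch", or None when no release is due."""
    match = SUBJECT.match(message)
    if match is None:
        return None  # Not a conventional commit subject.
    if match["breaking"] or "BREAKING CHANGE:" in message:
        return "major"
    if match["type"] in MINOR_TYPES:
        return "minor"
    if match["type"] in PATCH_TYPES:
        return "patch"
    return None  # docs, chore, ci, style, test, ...


def next_version(version: str, bump: Optional[str]) -> str:
    """Apply a bump level to a "MAJOR.MINOR.PATCH" version string."""
    major, minor, patch = (int(part) for part in version.split("."))
    if bump == "major":
        return f"{major + 1}.0.0"
    if bump == "minor":
        return f"{major}.{minor + 1}.0"
    if bump == "patch":
        return f"{major}.{minor}.{patch + 1}"
    return version


for subject in ["feat: add adapter", "fix: handle empty chunk", "docs: update readme"]:
    print(subject, "->", next_version("0.5.0", bump_for(subject)))
```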
Documentation Deployment
Documentation is automatically built and deployed when:
- A release is published (triggered by semantic-release)
- Changes are pushed to docs/, mkdocs.yml, or the workflow file on main
Development Practices
Branching & Pull Requests
Each Git branch should follow the format <tag>/item_<id>, optionally followed by a descriptive suffix.
We use a Squash & Merge approach.
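The branch naming convention above could be checked with a regular expression along these lines. The README does not specify which tags are allowed, what an id looks like, or how the suffix is separated, so this sketch assumes a lowercase tag, a numeric id, and a suffix introduced by `-` or `_`. `BRANCH_PATTERN` and `is_valid_branch` are hypothetical names.

```python
import re

# Assumed shape: <tag>/item_<id>[<sep><suffix>], with <sep> being "-" or "_".
BRANCH_PATTERN = re.compile(
    r"^(?P<tag>[a-z]+)/item_(?P<id>\d+)(?:[-_](?P<suffix>[a-z0-9][a-z0-9_-]*))?$"
)


def is_valid_branch(name: str) -> bool:
    """Return True if `name` matches the assumed branch naming convention."""
    return BRANCH_PATTERN.match(name) is not None


for branch in ["feat/item_42", "fix/item_7_stream-timeout", "feature-branch"]:
    print(branch, is_valid_branch(branch))
```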
Conventional Commits
We use Conventional Commits:
- feat: New feature
- fix: Bug fix
- docs: Documentation changes
- refactor: Code refactoring
- test: Test changes
- chore: Maintenance tasks
Contributing
Contributions are welcome! Please ensure:
- All tests pass (uv run pytest)
- Code quality checks pass (uv run lefthook run pre-commit --all-files -- --no-stash)
- Commits follow conventional commit format
License
This project is licensed under the MIT License - see the LICENSE file for details.