Streamblocks
Real-time extraction and processing of structured blocks from text streams.
Overview
Streamblocks is a Python 3.13+ library for detecting and extracting structured blocks from streaming text. It provides:
- Pluggable syntax system - Define your own block syntaxes or use built-in ones
- Async stream processing - Process text streams line-by-line with full async support
- Type-safe metadata - Use Pydantic models for block metadata and content
- Event-driven architecture - React to block detection, updates, completion, and rejection
- Built-in syntaxes - Delimiter preamble, Markdown frontmatter, and hybrid syntaxes
Installation
Basic Installation
pip install streamblocks
With AI Provider Support
Streamblocks supports multiple AI providers through optional dependencies:
# For Google Gemini (gemini-2.5-flash)
pip install streamblocks[gemini]
# For OpenAI (gpt-5-nano-2025-08-07)
pip install streamblocks[openai]
# For Anthropic Claude (claude-3.5-haiku)
pip install streamblocks[anthropic]
# All providers at once
pip install streamblocks[all-providers]
# Multiple specific providers
pip install streamblocks[gemini,openai]
From Source
git clone https://github.com/streamblocks/streamblocks.git
cd streamblocks
pip install -e ".[all-providers]"
Quick Start
import asyncio

from streamblocks import (
    BlockRegistry,
    DelimiterPreambleSyntax,
    StreamBlockProcessor,
    EventType,
)
from streamblocks.content import FileOperationsContent, FileOperationsMetadata


async def main():
    # Setup registry
    registry = BlockRegistry()

    # Register a syntax
    syntax = DelimiterPreambleSyntax(
        metadata_class=FileOperationsMetadata,
        content_class=FileOperationsContent,
    )
    registry.register_syntax(syntax, block_types=["files_operations"])

    # Create processor
    processor = StreamBlockProcessor(registry)

    # Process a stream
    async def text_stream():
        text = """
!!file01:files_operations
src/main.py:C
src/utils.py:E
!!end
"""
        for line in text.strip().split("\n"):
            yield line + "\n"

    # Handle events
    async for event in processor.process_stream(text_stream()):
        if event.type == EventType.BLOCK_EXTRACTED:
            block = event.metadata["extracted_block"]
            print(f"Extracted block: {block.metadata.id}")
            for op in block.content.operations:
                print(f" - {op.action}: {op.path}")


asyncio.run(main())
Processing Modes
Streamblocks supports two processing modes to fit different use cases:
1. Automatic Stream Processing (Recommended)
Process entire streams automatically with process_stream(). This is the simplest and most common approach:
from google import genai

# Assumes this runs inside an async function, with `processor` built as in
# Quick Start and `api_key` set to your Gemini API key.

# Get a stream from any AI provider
client = genai.Client(api_key=api_key)
response = await client.aio.models.generate_content_stream(
    model="gemini-2.5-flash",
    contents="Create a Python hello world script",
)

# Pass the stream directly - no wrapper function needed!
async for event in processor.process_stream(response):
    if event.type == EventType.BLOCK_EXTRACTED:
        print(f"Block: {event.block.metadata.id}")
Benefits:
- Automatic adapter detection for Gemini, OpenAI, Anthropic
- Original chunks preserved in event stream
- Handles all line accumulation and buffering
- Simplest API for most use cases
2. Manual Chunk Processing
For fine-grained control, process chunks one at a time with process_chunk():
processor = StreamBlockProcessor(registry)

async for chunk in response:
    # Process this chunk and get all resulting events
    events = processor.process_chunk(chunk)
    for event in events:
        if isinstance(event, BlockExtractedEvent):
            print(f"Block: {event.block.metadata.id}")

# Don't forget to finalize!
final_events = processor.finalize()
When to use manual processing:
- Custom buffering or batching strategies
- Selective processing based on chunk content
- Integration with existing async pipelines
- Processing chunks from multiple sources
- Code paths that need a synchronous processing API
Important: Always call finalize() after processing all chunks to get rejection events for incomplete blocks.
See examples/adapters/13_manual_chunk_processing.py for detailed examples of manual processing patterns.
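The line accumulation that process_stream() handles for you, and the reason finalize() matters in manual mode, can be illustrated with a small standalone sketch. `LineAccumulator` is a hypothetical helper written for this example and is not part of the Streamblocks API: it buffers arbitrary chunks, returns only complete lines, and releases a trailing partial line when `finalize()` is called.

```python
class LineAccumulator:
    """Hypothetical helper: turns arbitrary text chunks into complete lines."""

    def __init__(self) -> None:
        self._buffer = ""

    def feed(self, chunk: str) -> list[str]:
        """Add a chunk and return every line it completes."""
        self._buffer += chunk
        # The last element is whatever follows the final newline (possibly "").
        *lines, self._buffer = self._buffer.split("\n")
        return lines

    def finalize(self) -> list[str]:
        """Flush the trailing partial line, if any, at end of stream."""
        rest, self._buffer = self._buffer, ""
        return [rest] if rest else []


acc = LineAccumulator()
lines: list[str] = []
# Chunk boundaries deliberately fall in the middle of lines.
for chunk in ["!!file01:files_", "operations\nsrc/main.py:C\n!!e", "nd"]:
    lines.extend(acc.feed(chunk))
tail = acc.finalize()

print(lines)  # complete lines seen while streaming
print(tail)   # the closing marker only appears after finalize()
```

In this sketch the closing `!!end` never ends with a newline, so it stays in the buffer until `finalize()` runs. A real processor that skipped that step would have no chance to decide what to do with the still-open block.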
Built-in Syntaxes
1. Delimiter with Preamble
!!<id>:<type>[:param1:param2...]
content
!!end
2. Markdown with Frontmatter
```[info_string]
---
key: value
---
content
```
3. Delimiter with Frontmatter
!!start
---
key: value
---
content
!!end
Creating Custom Content Models
from typing import Literal

from pydantic import BaseModel


class MyMetadata(BaseModel):
    id: str
    block_type: Literal["my_type"]
    custom_field: str | None = None


class MyContent(BaseModel):
    data: str

    @classmethod
    def parse(cls, raw_text: str) -> "MyContent":
        # Custom parsing logic
        return cls(data=raw_text.strip())
Event Types
- RAW_TEXT - Non-block text passed through
- BLOCK_DELTA - Partial block update (new line added)
- BLOCK_EXTRACTED - Complete block successfully extracted
- BLOCK_REJECTED - Block failed validation or stream ended
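The sketch below shows roughly when each of these four events could fire for the delimiter-with-preamble syntax. `toy_events` is a hypothetical toy written for this illustration, not the library's processor. It skips validation entirely, and details such as whether the opening line itself produces an event are assumptions.

```python
from collections.abc import Iterable, Iterator


def toy_events(lines: Iterable[str]) -> Iterator[tuple[str, str]]:
    """Yield (event_name, payload) pairs for a '!!id:type ... !!end' stream."""
    block_id: str | None = None  # id of the block currently open, if any
    for raw in lines:
        line = raw.rstrip("\n")
        if block_id is None:
            if line.startswith("!!") and ":" in line:
                # An opening line starts a new block.
                block_id = line[2:].split(":", 1)[0]
            else:
                yield ("RAW_TEXT", line)
        elif line == "!!end":
            yield ("BLOCK_EXTRACTED", block_id)
            block_id = None
        else:
            yield ("BLOCK_DELTA", line)
    if block_id is not None:
        # The stream ended before the closing marker arrived.
        yield ("BLOCK_REJECTED", block_id)


stream = ["hello", "!!a1:note", "first", "!!end", "!!a2:note", "dangling"]
events = list(toy_events(stream))
for name, payload in events:
    print(f"{name}: {payload}")
```

The second block never receives its `!!end`, so the toy reports it as rejected once the input is exhausted. This mirrors the "stream ended" case described for BLOCK_REJECTED.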
Custom Validators
def my_validator(metadata: BaseModel, content: BaseModel) -> bool:
    # Custom validation logic
    return True


registry.add_validator("my_type", my_validator)
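As a rough mental model of how a validator's boolean result could gate a block, the following standalone sketch keeps a list of validators per block type and accepts a block only if all of them return True. `ToyRegistry`, `Meta`, `Content`, and `non_empty` are hypothetical stand-ins using plain dataclasses. How Streamblocks stores and combines validators internally is not documented here, so treat this as an assumption.

```python
from collections import defaultdict
from collections.abc import Callable
from dataclasses import dataclass


@dataclass
class Meta:
    id: str
    block_type: str


@dataclass
class Content:
    data: str


Validator = Callable[[Meta, Content], bool]


class ToyRegistry:
    """Hypothetical stand-in that only models validator bookkeeping."""

    def __init__(self) -> None:
        self._validators: dict[str, list[Validator]] = defaultdict(list)

    def add_validator(self, block_type: str, validator: Validator) -> None:
        self._validators[block_type].append(validator)

    def is_valid(self, metadata: Meta, content: Content) -> bool:
        # A block passes only if every validator for its type returns True.
        return all(v(metadata, content) for v in self._validators[metadata.block_type])


def non_empty(metadata: Meta, content: Content) -> bool:
    """Example rule: reject blocks whose content is blank."""
    return bool(content.data.strip())


toy = ToyRegistry()
toy.add_validator("my_type", non_empty)
ok = toy.is_valid(Meta("b1", "my_type"), Content("payload"))
bad = toy.is_valid(Meta("b2", "my_type"), Content("   "))
other = toy.is_valid(Meta("b3", "unvalidated"), Content(""))
print(ok, bad, other)
```

A block type with no registered validators passes by default in this sketch, because `all()` over an empty sequence is True.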
Interactive Blocks
Streamblocks includes built-in support for interactive content blocks that can capture user interactions. These are useful for building conversational interfaces, forms, surveys, and other interactive applications.
Available Interactive Block Types
- YesNo - Simple yes/no questions
- Choice - Single choice from multiple options
- MultiChoice - Multiple selections from a list
- Input - Text/number/email input fields
- Scale - Numeric rating scales
- Ranking - Rank items in order
- Confirm - Confirmation dialogs
- Form - Multi-field forms
Interactive Block Example
from streamblocks.content import YesNoMetadata, YesNoContent
# Example block in your text stream:
"""
!!start
---
id: setup-question
block_type: yesno
yes_label: "Continue"
no_label: "Skip"
---
prompt: "Would you like to configure settings now?"
!!end
"""
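To show how the example block above breaks down into metadata and content, here is a naive standalone sketch. `split_frontmatter_block` is a hypothetical helper for this illustration. It splits each frontmatter line on the first colon instead of using a real YAML parser, so it only handles flat `key: value` pairs and is not how the library itself parses blocks.

```python
def split_frontmatter_block(text: str) -> tuple[dict[str, str], str]:
    """Split a '!!start / --- / key: value / --- / content / !!end' block."""
    lines = text.strip().splitlines()
    if len(lines) < 4 or lines[0] != "!!start" or lines[1] != "---" or lines[-1] != "!!end":
        raise ValueError("not a delimiter-with-frontmatter block")
    close = lines.index("---", 2)  # the second '---' ends the frontmatter
    metadata: dict[str, str] = {}
    for ln in lines[2:close]:
        key, _, value = ln.partition(":")
        # Naive handling: trim whitespace and surrounding double quotes.
        metadata[key.strip()] = value.strip().strip('"')
    content = "\n".join(lines[close + 1 : -1])
    return metadata, content


block = '''
!!start
---
id: setup-question
block_type: yesno
yes_label: "Continue"
no_label: "Skip"
---
prompt: "Would you like to configure settings now?"
!!end
'''
metadata, content = split_frontmatter_block(block)
print(metadata)
print(content)
```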
Using Interactive Blocks
import asyncio

from streamblocks import BlockRegistry, DelimiterFrontmatterSyntax, StreamBlockProcessor
from streamblocks.content import (
    YesNoMetadata, YesNoContent,
    ChoiceMetadata, ChoiceContent,
    # ... other interactive content types
)

# Set up registry with interactive block mapping
block_type_mapping = {
    "yesno": (YesNoMetadata, YesNoContent),
    "choice": (ChoiceMetadata, ChoiceContent),
    # ... other mappings
}


# Custom syntax that handles block type detection
class InteractiveSyntax(DelimiterFrontmatterSyntax):
    def parse_block(self, candidate):
        # Parse metadata to determine block type
        # Then use appropriate metadata/content classes
        # See examples/interactive_blocks_example.py for full implementation
        pass
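The dispatch step that the comments in `parse_block` describe (read `block_type`, then pick the matching classes) might look roughly like the standalone sketch below. The dataclasses are hypothetical stand-ins for the real Pydantic models, and `TOY_MAPPING` and `build_block` are names invented for this illustration. The full implementation lives in examples/interactive_blocks_example.py and may work differently.

```python
from dataclasses import dataclass


@dataclass
class YesNoMeta:  # stand-in for YesNoMetadata
    id: str
    block_type: str


@dataclass
class YesNoBody:  # stand-in for YesNoContent
    prompt: str


@dataclass
class ChoiceMeta:  # stand-in for ChoiceMetadata
    id: str
    block_type: str


@dataclass
class ChoiceBody:  # stand-in for ChoiceContent
    prompt: str
    options: list[str]


TOY_MAPPING = {
    "yesno": (YesNoMeta, YesNoBody),
    "choice": (ChoiceMeta, ChoiceBody),
}


def build_block(raw_meta: dict[str, object], raw_content: dict[str, object]):
    """Pick the classes registered for raw_meta['block_type'] and instantiate them."""
    block_type = raw_meta.get("block_type")
    if block_type not in TOY_MAPPING:
        raise KeyError(f"unknown block type: {block_type!r}")
    meta_cls, content_cls = TOY_MAPPING[block_type]
    return meta_cls(**raw_meta), content_cls(**raw_content)


meta, body = build_block(
    {"id": "q1", "block_type": "choice"},
    {"prompt": "Pick one", "options": ["a", "b"]},
)
print(type(meta).__name__, type(body).__name__, body.options)
```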
Interactive UI Example
The library includes a complete example of building an interactive terminal UI using Textual:
python examples/interactive_ui_demo.py
This demonstrates:
- Dynamic widget creation based on block types
- Response capture and validation
- History tracking
- Real-time stream processing
See examples/interactive_blocks_example.py for a simpler example of parsing interactive blocks.
Development
Dependency Groups
Streamblocks uses dependency groups for development and documentation:
| Group | Purpose | Key Dependencies |
|---|---|---|
| dev | Development tools | pytest, ruff, basedpyright, detect-secrets |
| doc | Documentation building | mkdocs, mkdocs-material, mike |
Installation
Basic development setup:
uv sync --group dev
source .venv/bin/activate
lefthook install
Full development setup with extras:
uv sync --group dev --all-extras
Running Tests
# Run all tests
uv run pytest
# Run with coverage
uv run pytest --cov=hother.streamblocks --cov-report=html
# Run specific test file
uv run pytest tests/test_processor.py
Code Quality
# Run pre-commit hooks
uv run lefthook run pre-commit --all-files -- --no-stash
# Run type checking
uv run basedpyright src
# Run examples
uv run python examples/run_examples.py --skip-api
Conventional Commits
We use Conventional Commits:
- feat: New feature
- fix: Bug fix
- docs: Documentation changes
- refactor: Code refactoring
- test: Test changes
- chore: Maintenance tasks
Contributing
Contributions are welcome! Please ensure:
- All tests pass (uv run pytest)
- Code quality checks pass (uv run lefthook run pre-commit --all-files -- --no-stash)
- Commits follow conventional commit format
License
MIT