Streamblocks

Real-time extraction and processing of structured blocks from text streams.

Overview

Streamblocks is a Python 3.13+ library for detecting and extracting structured blocks from streaming text. It provides:

  • Pluggable syntax system - Define your own block syntaxes or use built-in ones
  • Async stream processing - Process text streams line-by-line with full async support
  • Type-safe metadata - Use Pydantic models for block metadata and content
  • Event-driven architecture - React to block detection, updates, completion, and rejection
  • Built-in syntaxes - Delimiter preamble, Markdown frontmatter, and hybrid syntaxes

Installation

Basic Installation

pip install streamblocks

With AI Provider Support

Streamblocks supports multiple AI providers through optional dependencies:

# For Google Gemini (gemini-2.5-flash)
pip install streamblocks[gemini]

# For OpenAI (gpt-5-nano-2025-08-07)
pip install streamblocks[openai]

# For Anthropic Claude (claude-3.5-haiku)
pip install streamblocks[anthropic]

# All providers at once
pip install streamblocks[all-providers]

# Multiple specific providers
pip install streamblocks[gemini,openai]

From Source

git clone https://github.com/streamblocks/streamblocks.git
cd streamblocks
pip install -e ".[all-providers]"

Quick Start

import asyncio
from streamblocks import (
    BlockRegistry,
    DelimiterPreambleSyntax,
    StreamBlockProcessor,
    EventType,
)
from streamblocks.content import FileOperationsContent, FileOperationsMetadata

async def main():
    # Setup registry
    registry = BlockRegistry()

    # Register a syntax
    syntax = DelimiterPreambleSyntax(
        metadata_class=FileOperationsMetadata,
        content_class=FileOperationsContent,
    )
    registry.register_syntax(syntax, block_types=["files_operations"])

    # Create processor
    processor = StreamBlockProcessor(registry)

    # Process a stream
    async def text_stream():
        text = """
!!file01:files_operations
src/main.py:C
src/utils.py:E
!!end
"""
        for line in text.strip().split("\n"):
            yield line + "\n"

    # Handle events
    async for event in processor.process_stream(text_stream()):
        if event.type == EventType.BLOCK_EXTRACTED:
            block = event.metadata["extracted_block"]
            print(f"Extracted block: {block.metadata.id}")
            for op in block.content.operations:
                print(f"  - {op.action}: {op.path}")

asyncio.run(main())

Processing Modes

Streamblocks supports two processing modes to fit different use cases:

1. Automatic Stream Processing (Recommended)

Process entire streams automatically with process_stream(). This is the simplest and most common approach:

from google import genai

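# Get a stream from any AI provider. This snippet is assumed to run inside an
# async function, with `api_key` and `processor` (see Quick Start) already defined.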
client = genai.Client(api_key=api_key)
response = await client.aio.models.generate_content_stream(
    model="gemini-2.5-flash",
    contents="Create a Python hello world script",
)

# Pass the stream directly - no wrapper function needed!
async for event in processor.process_stream(response):
    if event.type == EventType.BLOCK_EXTRACTED:
        print(f"Block: {event.block.metadata.id}")

Benefits:

  • Automatic adapter detection for Gemini, OpenAI, and Anthropic
  • Original chunks preserved in event stream
  • Handles all line accumulation and buffering
  • Simplest API for most use cases

2. Manual Chunk Processing

For fine-grained control, process chunks one at a time with process_chunk():

processor = StreamBlockProcessor(registry)

async for chunk in response:
    # Process this chunk and get all resulting events
    events = processor.process_chunk(chunk)

    for event in events:
        if isinstance(event, BlockExtractedEvent):
            print(f"Block: {event.block.metadata.id}")

# Don't forget to finalize!
final_events = processor.finalize()

When to use manual processing:

  • Custom buffering or batching strategies
  • Selective processing based on chunk content
  • Integration with existing async pipelines
  • Processing chunks from multiple sources
  • Code that needs a synchronous processing API

Important: Always call finalize() after processing all chunks to get rejection events for incomplete blocks.

See examples/adapters/13_manual_chunk_processing.py for detailed examples of manual processing patterns.
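To see why the finalize() call matters, here is a minimal, dependency-free sketch of chunk buffering. It is a toy model, not the Streamblocks implementation: `ToyProcessor`, its tuple-shaped events, and the simplified rule "any line starting with `!!` opens a block" are all assumptions made for illustration.

```python
class ToyProcessor:
    """Toy model of chunk buffering; not the Streamblocks implementation."""

    def __init__(self):
        self._buffer = ""        # trailing partial line carried between chunks
        self._open_block = None  # lines of the block currently being read, if any

    def process_chunk(self, chunk):
        """Feed one text chunk; return (event_name, payload) pairs for each complete line."""
        self._buffer += chunk
        # Everything before the last "\n" is complete; the remainder stays buffered.
        *lines, self._buffer = self._buffer.split("\n")
        events = []
        for line in lines:
            events.extend(self._handle_line(line))
        return events

    def _handle_line(self, line):
        if self._open_block is None:
            if line.startswith("!!") and line != "!!end":
                self._open_block = [line]  # remember the opening line
                return []
            return [("RAW_TEXT", line)]
        if line == "!!end":
            body, self._open_block = self._open_block[1:], None
            return [("BLOCK_EXTRACTED", "\n".join(body))]
        self._open_block.append(line)
        return [("BLOCK_DELTA", line)]

    def finalize(self):
        """Flush the buffered partial line, then reject any block left open."""
        events = self._handle_line(self._buffer) if self._buffer else []
        self._buffer = ""
        if self._open_block is not None:
            events.append(("BLOCK_REJECTED", self._open_block[0]))
            self._open_block = None
        return events
```

In this model a block whose `!!end` never arrives produces no terminal event until `finalize()` runs, which is the situation the note above warns about.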

Built-in Syntaxes

1. Delimiter with Preamble

!!<id>:<type>[:param1:param2...]
content
!!end
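As a rough illustration of the opening line shown above, the following sketch splits it into an id, a type, and optional parameters with a regular expression. The exact grammar (no whitespace or colons inside a segment) and the names `HEADER_RE` and `parse_header` are assumptions; the library's own parser may accept a different set of inputs.

```python
import re

# Assumed grammar: "!!" + id + ":" + type + zero or more ":param" segments.
HEADER_RE = re.compile(
    r"^!!(?P<id>[^:\s]+):(?P<type>[^:\s]+)(?P<params>(?::[^:\s]*)*)$"
)

def parse_header(line):
    """Return (block_id, block_type, params), or None if this is not an opening line."""
    match = HEADER_RE.match(line.strip())
    if match is None:
        return None
    # ":a:b".split(":") gives ["", "a", "b"], so drop the leading empty item.
    params = match["params"].split(":")[1:] if match["params"] else []
    return match["id"], match["type"], params
```

A closing `!!end` line has no colon after the id position, so it does not match and `parse_header` returns None for it.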

2. Markdown with Frontmatter

```[info_string]
---
key: value
---
content
```

3. Delimiter with Frontmatter

!!start
---
key: value
---
content
!!end

Creating Custom Content Models

from pydantic import BaseModel
from typing import Literal

class MyMetadata(BaseModel):
    id: str
    block_type: Literal["my_type"]
    custom_field: str | None = None

class MyContent(BaseModel):
    data: str

    @classmethod
    def parse(cls, raw_text: str) -> "MyContent":
        # Custom parsing logic
        return cls(data=raw_text.strip())
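For a slightly richer `parse` classmethod, here is a sketch that handles `path:action` lines like those in the Quick Start block. It uses standard-library dataclasses instead of Pydantic so it runs without extra dependencies; `FileOp` and `FileOpsContent` are hypothetical stand-ins, not the library's `FileOperationsContent`, and the action codes are kept as raw strings because their meaning is not specified here.

```python
from dataclasses import dataclass

@dataclass
class FileOp:
    path: str
    action: str  # raw action code such as "C" or "E"; interpretation is left to the caller

@dataclass
class FileOpsContent:
    operations: list[FileOp]

    @classmethod
    def parse(cls, raw_text: str) -> "FileOpsContent":
        ops = []
        for line in raw_text.splitlines():
            line = line.strip()
            if not line:
                continue  # ignore blank lines
            # Split on the last colon so paths that contain ":" stay intact.
            path, sep, action = line.rpartition(":")
            if not sep or not path or not action:
                raise ValueError(f"expected 'path:action', got {line!r}")
            ops.append(FileOp(path=path, action=action))
        return cls(operations=ops)
```

Raising on a malformed line is one possible design choice; a content model could equally collect errors and let a validator decide whether to reject the block.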

Event Types

  • RAW_TEXT - Non-block text passed through
  • BLOCK_DELTA - Partial block update (new line added)
  • BLOCK_EXTRACTED - Complete block successfully extracted
  • BLOCK_REJECTED - Block failed validation or stream ended
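One common way to consume these event types is a handler table keyed by type. The sketch below uses a stand-in enum with the same four names and plain `(type, payload)` tuples; `ToyEventType` and `dispatch` are hypothetical, and real Streamblocks events are objects rather than tuples.

```python
from enum import Enum, auto

class ToyEventType(Enum):
    """Stand-in enum mirroring the four event names listed above."""
    RAW_TEXT = auto()
    BLOCK_DELTA = auto()
    BLOCK_EXTRACTED = auto()
    BLOCK_REJECTED = auto()

def dispatch(events, handlers):
    """Call handlers[event_type](payload) for each event; skip types with no handler."""
    results = []
    for event_type, payload in events:
        handler = handlers.get(event_type)
        if handler is not None:
            results.append(handler(payload))
    return results

handlers = {
    ToyEventType.BLOCK_EXTRACTED: lambda payload: f"ok:{payload}",
    ToyEventType.BLOCK_REJECTED: lambda payload: f"rejected:{payload}",
}
```

Leaving RAW_TEXT and BLOCK_DELTA out of the table means those events are silently ignored, which suits consumers that only care about finished blocks.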

Custom Validators

from pydantic import BaseModel

def my_validator(metadata: BaseModel, content: BaseModel) -> bool:
    # Custom validation logic
    return True

registry.add_validator("my_type", my_validator)
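A validator with this signature might look like the sketch below, which accepts a block only when it has an id and non-blank content. The `run_validators` helper is a toy stand-in for whatever the registry does internally; the rule that every validator must return True is an assumption, not documented behavior.

```python
from types import SimpleNamespace

def non_empty_data(metadata, content) -> bool:
    """Accept the block only if it has an id and non-blank content data."""
    return bool(metadata.id) and bool(content.data.strip())

def run_validators(validators, metadata, content) -> bool:
    """Toy stand-in for registry behavior: assume all validators must pass."""
    return all(validator(metadata, content) for validator in validators)

# SimpleNamespace objects stand in for the Pydantic models used by the library.
good = (SimpleNamespace(id="b1"), SimpleNamespace(data="hello"))
blank = (SimpleNamespace(id="b2"), SimpleNamespace(data="   "))
```

Keeping validators as small pure functions makes them easy to test without building a registry or a stream.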

Interactive Blocks

Streamblocks includes built-in support for interactive content blocks that can capture user interactions. These are useful for building conversational interfaces, forms, surveys, and other interactive applications.

Available Interactive Block Types

  1. YesNo - Simple yes/no questions
  2. Choice - Single choice from multiple options
  3. MultiChoice - Multiple selections from a list
  4. Input - Text/number/email input fields
  5. Scale - Numeric rating scales
  6. Ranking - Rank items in order
  7. Confirm - Confirmation dialogs
  8. Form - Multi-field forms

Interactive Block Example

from streamblocks.content import YesNoMetadata, YesNoContent

# Example block in your text stream:
"""
!!start
---
id: setup-question
block_type: yesno
yes_label: "Continue"
no_label: "Skip"
---
prompt: "Would you like to configure settings now?"
!!end
"""

Using Interactive Blocks

import asyncio
from streamblocks import BlockRegistry, DelimiterFrontmatterSyntax, StreamBlockProcessor
from streamblocks.content import (
    YesNoMetadata, YesNoContent,
    ChoiceMetadata, ChoiceContent,
    # ... other interactive content types
)

# Set up registry with interactive block mapping
block_type_mapping = {
    "yesno": (YesNoMetadata, YesNoContent),
    "choice": (ChoiceMetadata, ChoiceContent),
    # ... other mappings
}

# Custom syntax that handles block type detection
class InteractiveSyntax(DelimiterFrontmatterSyntax):
    def parse_block(self, candidate):
        # Parse metadata to determine block type
        # Then use appropriate metadata/content classes
        # See examples/interactive_blocks_example.py for full implementation
        pass
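The block-type lookup that `parse_block` is described as doing can be sketched independently of the library. In the example below, the dataclasses, `TOY_MAPPING`, and `build_metadata` are hypothetical stand-ins (including the default label values); the full implementation lives in the example file referenced in the comment above.

```python
from dataclasses import dataclass

@dataclass
class ToyYesNoMetadata:
    id: str
    block_type: str
    yes_label: str = "Yes"  # default chosen for this sketch only
    no_label: str = "No"    # default chosen for this sketch only

@dataclass
class ToyChoiceMetadata:
    id: str
    block_type: str

# Maps the block_type value found in the frontmatter to a metadata class.
TOY_MAPPING = {"yesno": ToyYesNoMetadata, "choice": ToyChoiceMetadata}

def build_metadata(raw, mapping=TOY_MAPPING):
    """Pick the metadata class from raw['block_type'] and instantiate it."""
    block_type = raw.get("block_type")
    metadata_class = mapping.get(block_type)
    if metadata_class is None:
        raise ValueError(f"unknown block_type: {block_type!r}")
    return metadata_class(**raw)
```

Failing loudly on an unknown block type keeps a typo in the stream from being silently treated as some default block.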

Interactive UI Example

The library includes a complete example of building an interactive terminal UI using Textual:

python examples/interactive_ui_demo.py

This demonstrates:

  • Dynamic widget creation based on block types
  • Response capture and validation
  • History tracking
  • Real-time stream processing

See examples/interactive_blocks_example.py for a simpler example of parsing interactive blocks.

Development

Dependency Groups

Streamblocks uses dependency groups for development and documentation:

Group   Purpose                  Key Dependencies
dev     Development tools        pytest, ruff, basedpyright, detect-secrets
doc     Documentation building   mkdocs, mkdocs-material, mike

Installation

Basic development setup:

uv sync --group dev
source .venv/bin/activate
lefthook install

Full development setup with extras:

uv sync --group dev --all-extras

Running Tests

# Run all tests
uv run pytest

# Run with coverage
uv run pytest --cov=hother.streamblocks --cov-report=html

# Run specific test file
uv run pytest tests/test_processor.py

Code Quality

# Run pre-commit hooks
uv run lefthook run pre-commit --all-files -- --no-stash

# Run type checking
uv run basedpyright src

# Run examples
uv run python examples/run_examples.py --skip-api

Conventional Commits

We use Conventional Commits:

  • feat: New feature
  • fix: Bug fix
  • docs: Documentation changes
  • refactor: Code refactoring
  • test: Test changes
  • chore: Maintenance tasks

Contributing

Contributions are welcome! Please ensure:

  1. All tests pass (uv run pytest)
  2. Code quality checks pass (uv run lefthook run pre-commit --all-files -- --no-stash)
  3. Commits follow conventional commit format

License

MIT
