High-performance streaming XML parser for real-time applications
xmlstream
A high-performance, configurable streaming XML parser designed for real-time processing of XML content in streaming applications. Perfect for handling XML output from language models, API responses, and other streaming data sources.
Features
- 🚀 High Performance: Optimized for minimal latency and maximum throughput
- 🔄 Real-time Streaming: Process XML content as it arrives, not after completion
- 🎯 Configurable Behaviors: Different handling modes for different XML tags
- 📚 Stack-based Nesting: Proper XML nesting support with streaming-aware rules
- ⚡ Zero Dependencies: Pure Python implementation with no external requirements
- 🛡️ Production Ready: Comprehensive error handling and safety limits
- 🏗️ Event-driven Architecture: Clean separation of parsing and output handling
Tag Behaviors
The parser supports three distinct behaviors for XML tags:
- STREAMING: Content is streamed immediately as it arrives (blocks nesting)
- PLACEHOLDER: Shows status messages during processing (allows nesting)
- SILENT: Processes content without user feedback (allows nesting)
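The nesting rules above can be modeled with a simple tag stack. The sketch below is a standalone illustration under stated assumptions, not xmlstream's implementation: `Behavior`, `open_tag` and `close_tag` are hypothetical names. It also assumes that unconfigured tags are treated as plain text, which the README does not specify.

```python
from enum import Enum, auto
from typing import Dict, List


class Behavior(Enum):
    # Hypothetical stand-in mirroring the three TagBehavior values
    STREAMING = auto()
    PLACEHOLDER = auto()
    SILENT = auto()


def open_tag(stack: List[str], behaviors: Dict[str, Behavior], tag: str) -> bool:
    """Push `tag` and return True if it opens a nested tag; False means literal text."""
    # Inside a STREAMING tag, inner markup is not parsed: it is streamed as text
    if stack and behaviors[stack[-1]] is Behavior.STREAMING:
        return False
    # Assumption: unconfigured tags are treated as text as well
    if tag not in behaviors:
        return False
    stack.append(tag)
    return True


def close_tag(stack: List[str], tag: str) -> bool:
    """Pop `tag` only if it is the innermost open tag; otherwise treat it as text."""
    if stack and stack[-1] == tag:
        stack.pop()
        return True
    return False
```

With `reply` configured as STREAMING, an inner `<thinking>` is passed through as text instead of being pushed. Inside PLACEHOLDER or SILENT tags, other configured tags nest normally.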
Installation
pip install streaming-xml-parser
Quick Start
from xmlstream import StreamingXMLParser, TagConfig, TagBehavior
# Configure tag behaviors
tag_configs = {
    "reply": TagConfig(
        name="reply",
        behavior=TagBehavior.STREAMING,  # Stream content immediately
    ),
    "thinking": TagConfig(
        name="thinking",
        behavior=TagBehavior.PLACEHOLDER,  # Show status message
        placeholder_message="🤔 Thinking...",
    ),
    "variables": TagConfig(
        name="variables",
        behavior=TagBehavior.SILENT,  # Process silently
    ),
}
# Create parser
parser = StreamingXMLParser(tag_configs)
# Process streaming content
xml_content = "<reply>Hello, this is streaming content!</reply>"
for event in parser.process_chunk(xml_content):
    if event.event_type == "tag_content" and event.tag_name == "reply":
        print(event.content, end="", flush=True)
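In a real stream a tag such as `<reply>` can be split across two chunks (for example `"<rep"` and `"ly>Hello"`). The self-contained toy parser below is a sketch of one way to handle that by holding back a trailing partial tag until the next chunk arrives. It is not xmlstream's algorithm: `ToyStreamParser` is hypothetical, it supports only flat tags without attributes, and it reuses the README's event type names as plain tuples.

```python
import re
from typing import Iterator, Optional, Set, Tuple

# A complete opening or closing tag such as <reply> or </reply>
TAG_RE = re.compile(r"<(/?)([A-Za-z_][\w-]*)>")
# A possible tag prefix at the very end of the buffer, e.g. "<", "</", "<rep"
PARTIAL_RE = re.compile(r"</?[\w-]*$")

Event = Tuple[str, Optional[str], str]  # (event_type, tag_name, content)


class ToyStreamParser:
    """Illustrative sketch only: flat (non-nested) tags, no attributes."""

    def __init__(self, tags: Set[str]) -> None:
        self.tags = tags
        self.buffer = ""
        self.current: Optional[str] = None  # tag we are currently inside

    def feed(self, chunk: str) -> Iterator[Event]:
        self.buffer += chunk
        while self.buffer:
            match = TAG_RE.search(self.buffer)
            if match is None:
                # No complete tag: emit text, but hold back a possible split tag
                partial = PARTIAL_RE.search(self.buffer)
                cut = partial.start() if partial else len(self.buffer)
                text, self.buffer = self.buffer[:cut], self.buffer[cut:]
                if text:
                    yield self._text(text)
                return
            if match.start() > 0:
                yield self._text(self.buffer[: match.start()])
            is_close, name = match.group(1) == "/", match.group(2)
            self.buffer = self.buffer[match.end():]
            if name not in self.tags:
                # Unknown tags pass through as ordinary text
                yield self._text(match.group(0))
            elif is_close:
                self.current = None
                yield ("tag_complete", name, "")
            else:
                self.current = name
                yield ("tag_start", name, "")

    def _text(self, text: str) -> Event:
        kind = "tag_content" if self.current else "content"
        return (kind, self.current, text)
```

Feeding `"<rep"`, `"ly>Hel"`, `"lo</re"`, `"ply>"` yields a `tag_start`, two `tag_content` events (`"Hel"`, `"lo"`) and a `tag_complete`. A production parser would also need to flush any held-back text when the stream ends, which this sketch omits.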
Advanced Usage
Real-time Processing with Callbacks
from xmlstream import (
    StreamingXMLParser, TagConfig, TagBehavior,
    StreamingOutputHandler,
)
def on_reply_start(tag_name):
    print(f"\n🤖 {tag_name.title()}: ", end="", flush=True)
def on_reply_complete(tag_name, content):
    print(f"\n✅ {tag_name} completed ({len(content)} chars)")
# Configure with callbacks
reply_config = TagConfig(
    name="reply",
    behavior=TagBehavior.STREAMING,
    start_callback=on_reply_start,
    complete_callback=on_reply_complete,
)
parser = StreamingXMLParser({"reply": reply_config})
output_handler = StreamingOutputHandler()
# Process with output handler
for event in parser.process_chunk("<reply>Streaming response...</reply>"):
    output_handler.handle_event(event)
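The callback signatures in TagConfig (`start_callback(tag_name)`, `content_callback(content)`, `complete_callback(tag_name, content)`) suggest a dispatch loop along the following lines. This is a hedged sketch rather than the library's code: `CallbackSet` and `dispatch` are hypothetical names, and events are plain `(event_type, tag_name, content)` tuples.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple

Event = Tuple[str, Optional[str], str]  # (event_type, tag_name, content)


@dataclass
class CallbackSet:
    # Hypothetical stand-in for the callback fields of TagConfig
    start_callback: Optional[Callable[[str], Any]] = None
    content_callback: Optional[Callable[[str], Any]] = None
    complete_callback: Optional[Callable[[str, str], Any]] = None


def dispatch(events: Iterable[Event], callbacks: Dict[str, CallbackSet]) -> None:
    """Route events to per-tag callbacks, accumulating content for completion."""
    collected: Dict[str, List[str]] = {}
    for event_type, tag, content in events:
        cb = callbacks.get(tag) if tag else None
        if cb is None:
            continue  # no callbacks registered for this tag
        if event_type == "tag_start":
            collected[tag] = []
            if cb.start_callback:
                cb.start_callback(tag)
        elif event_type == "tag_content":
            collected.setdefault(tag, []).append(content)
            if cb.content_callback:
                cb.content_callback(content)
        elif event_type == "tag_complete":
            # The completion callback receives the full, joined tag content
            full = "".join(collected.pop(tag, []))
            if cb.complete_callback:
                cb.complete_callback(tag, full)
```

Accumulating fragments in a list and joining once on completion avoids repeated string concatenation while content is streaming.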
Multiple Output Handlers
from xmlstream import (
    StreamingXMLParser, TagConfig, TagBehavior,
    CollectingOutputHandler, CallbackOutputHandler,
)
parser = StreamingXMLParser(
    {"reply": TagConfig(name="reply", behavior=TagBehavior.STREAMING)}
)
xml_data = "<reply>Hello from two handlers</reply>"  # sample input
# Collecting handler for testing
collector = CollectingOutputHandler()
# Custom callback handler
def handle_content(content):
    print(f"Received: {content}")
callback_handler = CallbackOutputHandler(on_content=handle_content)
# Feed every event to both handlers
for event in parser.process_chunk(xml_data):
    collector.handle_event(event)
    callback_handler.handle_event(event)
# Get collected content
full_content = collector.get_content()
all_events = collector.get_events()
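Since every handler exposes `handle_event(event)`, several handlers can be wrapped behind one composite object instead of being called one by one in the loop. The sketch below assumes only that shared method; `OutputHandler`, `Collector` and `FanOutHandler` are hypothetical names, not classes shipped by xmlstream.

```python
from typing import Any, List, Protocol


class OutputHandler(Protocol):
    def handle_event(self, event: Any) -> None: ...


class Collector:
    """Toy collecting handler: records every event and concatenates content."""

    def __init__(self) -> None:
        self.events: List[Any] = []
        self.parts: List[str] = []

    def handle_event(self, event: Any) -> None:
        self.events.append(event)
        content = getattr(event, "content", None)
        if content:
            self.parts.append(content)

    def get_content(self) -> str:
        return "".join(self.parts)


class FanOutHandler:
    """Forwards each event to every wrapped handler, in registration order."""

    def __init__(self, *handlers: OutputHandler) -> None:
        self.handlers = handlers

    def handle_event(self, event: Any) -> None:
        for handler in self.handlers:
            handler.handle_event(event)
```

Usage would look like `fan = FanOutHandler(collector, callback_handler)` followed by `fan.handle_event(event)` for each parsed event. Because `FanOutHandler` itself satisfies the protocol, composites can be nested.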
Configuration Validation
from xmlstream import TagConfig, TagBehavior
from xmlstream.exceptions import ConfigurationError
try:
    # This will raise ConfigurationError
    invalid_config = TagConfig(
        name="",  # Empty name not allowed
        behavior=TagBehavior.PLACEHOLDER,
        placeholder_message=None,  # Required for PLACEHOLDER
    )
except ConfigurationError as e:
    print(f"Configuration error: {e}")
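The two rules illustrated above (non-empty name, and a placeholder message for PLACEHOLDER tags) map naturally onto a dataclass `__post_init__` check. This is a minimal sketch of that pattern under the assumption that validation happens at construction time. `SketchTagConfig`, `Behavior` and the local `ConfigurationError` are hypothetical stand-ins, not the library's classes.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class ConfigurationError(Exception):
    """Hypothetical stand-in for xmlstream.exceptions.ConfigurationError."""


class Behavior(Enum):
    STREAMING = "streaming"
    PLACEHOLDER = "placeholder"
    SILENT = "silent"


@dataclass(frozen=True)
class SketchTagConfig:
    name: str
    behavior: Behavior
    placeholder_message: Optional[str] = None

    def __post_init__(self) -> None:
        # Rule 1: the tag name must be a non-empty, non-blank string
        if not self.name or not self.name.strip():
            raise ConfigurationError("tag name must be a non-empty string")
        # Rule 2: PLACEHOLDER tags need a message to display while processing
        if self.behavior is Behavior.PLACEHOLDER and not self.placeholder_message:
            raise ConfigurationError(
                f"tag {self.name!r} uses PLACEHOLDER but has no placeholder_message"
            )
```

Validating in `__post_init__` means an invalid configuration can never exist as an object, so the parser does not need to re-check it later.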
API Reference
Core Classes
StreamingXMLParser
Main parser class for processing streaming XML content.
Constructor:
StreamingXMLParser(tag_configs: Dict[str, TagConfig], max_buffer_size: int = 1024*1024)
Key Methods:
- process_chunk(chunk: str) -> Generator[StreamingEvent, None, None]
- add_tag_config(config: TagConfig) -> None
- remove_tag_config(tag_name: str) -> bool
- reset() -> None
TagConfig
Configuration for XML tag behavior.
TagConfig(
    name: str,
    behavior: TagBehavior,
    placeholder_message: Optional[str] = None,
    start_callback: Optional[Callable[[str], Any]] = None,
    content_callback: Optional[Callable[[str], Any]] = None,
    complete_callback: Optional[Callable[[str, str], Any]] = None,
)
StreamingEvent
Immutable event object representing parsing progress.
Attributes:
- event_type: str - Type of event (content, tag_start, tag_content, tag_complete)
- tag_name: Optional[str] - Associated tag name
- content: Optional[str] - Content data
- data: Optional[Any] - Additional payload
Methods:
- is_content_event() -> bool
- is_tag_event() -> bool
- has_content() -> bool
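An immutable event with these attributes and helper methods can be sketched as a frozen dataclass. The classification rules below are assumptions, since the README does not define them: `content` and `tag_content` count as content events, and any `tag_*` type counts as a tag event. `SketchEvent` is a hypothetical name.

```python
from dataclasses import dataclass
from typing import Any, Optional

# Assumed groupings of the README's four event types
CONTENT_EVENTS = {"content", "tag_content"}
TAG_EVENTS = {"tag_start", "tag_content", "tag_complete"}


@dataclass(frozen=True)  # frozen: attribute assignment raises FrozenInstanceError
class SketchEvent:
    event_type: str
    tag_name: Optional[str] = None
    content: Optional[str] = None
    data: Optional[Any] = None

    def is_content_event(self) -> bool:
        return self.event_type in CONTENT_EVENTS

    def is_tag_event(self) -> bool:
        return self.event_type in TAG_EVENTS

    def has_content(self) -> bool:
        # True only for a non-empty content string
        return bool(self.content)
```

Freezing the event makes it safe to pass the same object to several output handlers without one handler mutating what another sees.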
Output Handlers
StreamingOutputHandler
Default handler for streaming output with placeholder support.
CollectingOutputHandler
Collects all output for batch processing or testing.
CallbackOutputHandler
Flexible handler using custom callbacks for each event type.
Performance Optimizations
The parser includes several performance optimizations:
- Conditional Logging: Debug logging only when enabled
- Single-pass Scanning: Optimized buffer scanning algorithm
- Minimal Allocations: Efficient memory usage patterns
- Buffer Management: Buffering bounded by max_buffer_size (default 1024*1024)
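A buffer with a hard size limit, paired with an overflow exception, is one common way to keep memory bounded. The following is a sketch of that idea only. `BoundedBuffer` and the local `BufferOverflowError` are hypothetical, and the limit here counts characters, which may differ from how xmlstream measures max_buffer_size.

```python
from typing import List


class BufferOverflowError(Exception):
    """Hypothetical stand-in for xmlstream.exceptions.BufferOverflowError."""


class BoundedBuffer:
    """Accumulates unparsed text and refuses to grow past max_size characters."""

    def __init__(self, max_size: int = 1024 * 1024) -> None:
        self.max_size = max_size
        self._parts: List[str] = []  # joined lazily to avoid repeated copies
        self._size = 0

    def append(self, chunk: str) -> None:
        new_size = self._size + len(chunk)
        if new_size > self.max_size:
            raise BufferOverflowError(
                f"buffer would reach {new_size} chars (limit {self.max_size})"
            )
        self._parts.append(chunk)
        self._size = new_size

    def take(self) -> str:
        """Return and clear everything buffered so far."""
        text = "".join(self._parts)
        self._parts.clear()
        self._size = 0
        return text

    def __len__(self) -> int:
        return self._size
```

Rejecting the chunk before appending keeps the buffer in a consistent state, so a caller that catches the error can still drain what was already accepted.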
Error Handling
The package includes comprehensive error handling:
from xmlstream.exceptions import (
    StreamingXMLError,
    TagNotFoundError,
    InvalidTagError,
    ConfigurationError,
    BufferOverflowError,
)
try:
    # process_chunk() is a generator, so errors surface only while iterating it
    for event in parser.process_chunk(malformed_xml):
        pass
except StreamingXMLError as e:
    print(f"Parser error: {e}")
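Because process_chunk() is documented as returning a Generator, calling it creates a generator object without running any parsing code. A `try` block around the bare call therefore never sees an exception. The standalone sketch below demonstrates this Python behavior with a toy `process_chunk` and a stand-in `StreamingXMLError`. Both are hypothetical, and the "unterminated tag" check is invented purely for illustration.

```python
from typing import Iterator


class StreamingXMLError(Exception):
    """Hypothetical stand-in for xmlstream's base exception."""


def process_chunk(chunk: str) -> Iterator[str]:
    # Generator body: nothing here runs until the caller starts iterating
    if "<" in chunk and ">" not in chunk:
        raise StreamingXMLError(f"unterminated tag in {chunk!r}")
    yield chunk


gen = process_chunk("<reply")  # no exception yet: only a generator object exists
try:
    for _ in gen:  # the body, and its raise, runs on the first iteration
        pass
except StreamingXMLError as e:
    print(f"Parser error: {e}")
```

The same applies to any generator-based API. Wrap the iteration, not just the call, in the `try` block.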
Examples
See the /examples directory for complete working examples:
- Basic Streaming: Simple real-time content processing
- LLM Integration: Integration with language model APIs
- Custom Handlers: Building custom output processors
- Error Handling: Robust error management patterns
Development
# Clone repository
git clone https://github.com/example/xmlstream.git
cd xmlstream
# Install the package in editable mode plus the tools used below
pip install -e . pytest black mypy
# Run tests
pytest
# Code formatting
black xmlstream/
# Type checking
mypy xmlstream/
Performance Benchmarks
- Latency: < 1ms for typical chunks
- Throughput: > 100MB/s on modern hardware
- Memory: Constant memory usage with buffer limits
- CPU: Optimized single-pass algorithms
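To check latency and throughput on your own hardware, a small timing harness can drive any function with the `process_chunk(chunk) -> Generator` shape, such as `parser.process_chunk`. This is a sketch, not an official benchmark script. `measure` is a hypothetical helper, it assumes a non-empty chunk list, and it reports the upper median for even-length inputs.

```python
import time
from typing import Callable, Dict, Iterable, List


def measure(process: Callable[[str], Iterable[object]], chunks: List[str]) -> Dict[str, float]:
    """Time a chunk-processing function; returns latency and throughput figures."""
    latencies: List[float] = []
    total_bytes = 0
    start = time.perf_counter()
    for chunk in chunks:
        t0 = time.perf_counter()
        for _ in process(chunk):  # drain the generator so the work actually runs
            pass
        latencies.append(time.perf_counter() - t0)
        total_bytes += len(chunk.encode("utf-8"))
    elapsed = time.perf_counter() - start
    latencies.sort()
    return {
        "max_latency_ms": latencies[-1] * 1000,
        "median_latency_ms": latencies[len(latencies) // 2] * 1000,
        "throughput_mb_s": total_bytes / elapsed / 1e6 if elapsed > 0 else float("inf"),
    }
```

For example, `measure(parser.process_chunk, chunks)` with a list of realistic chunks reports per-chunk latency alongside overall MB/s. Draining each generator matters: without it, only generator creation would be timed.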
License
MIT License - see LICENSE file for details.
Contributing
- Fork the repository
- Create a feature branch
- Add tests for new functionality
- Ensure all tests pass
- Submit a pull request
Changelog
v1.0.0
- Initial release
- Core streaming parser implementation
- Multiple output handlers
- Comprehensive error handling
- Performance optimizations
Download files
File details
Details for the file streaming_xml_parser-1.0.0.tar.gz.
File metadata
- Download URL: streaming_xml_parser-1.0.0.tar.gz
- Size: 30.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.11
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | f3135a97d5c240c5314b9c47f19e1c4a3a10dbbd8fce88bd99cc7073d7c96166 |
| MD5 | f5a322fa3bc9ba72c9143c0ff59f59cc |
| BLAKE2b-256 | 01574a7f68ede611361df18daad61ec7d3863d9f3e778f93fe32d92d7b539636 |
File details
Details for the file streaming_xml_parser-1.0.0-py3-none-any.whl.
File metadata
- Download URL: streaming_xml_parser-1.0.0-py3-none-any.whl
- Size: 16.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.11
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 1e6c4b288334297801504c2ddf25de25117b08cd1bbacbfc8274dc5e55d80383 |
| MD5 | 15bae4587f72c023b2f085a86f80963b |
| BLAKE2b-256 | ad0c2da7da365c635478016ac32ab7d1f1d6580ed74c85daf510398053c88fe7 |