Modern Python stream processing framework inspired by Apache Camel
# Streamware

One-line automation • AI-powered • Voice control • Desktop automation

## What Can You Do?

```bash
# AI: Convert natural language to SQL
sq llm "Get all users older than 30" --to-sql

# Voice: Type with your voice
sq voice-keyboard "wpisz hello world"

# AI Vision: Click anywhere by description
sq voice-click "click on the Submit button"

# Send notifications everywhere
sq slack general "Deploy complete!"
sq telegram @channel "Server status: OK"

# Analyze video with AI
sq media describe_video --file presentation.mp4

# Data pipelines
sq get api.example.com/users | sq transform --json | sq file save users.json
```
Streamware is a modern Python framework that combines:
- Apache Camel-style data pipelines
- AI/LLM integration (OpenAI, Ollama, Groq, Anthropic...)
- Voice control and desktop automation
- Multi-channel communication (Email, Slack, Telegram, Discord...)
## Why Streamware?

| Problem | Streamware Solution |
|---|---|
| "I need to automate repetitive tasks" | `sq auto type "Hello"` - one command |
| "I want AI without complex setup" | `sq llm "explain this code"` - works out of the box |
| "Voice control is complicated" | `sq voice-keyboard` - just speak |
| "Sending notifications is tedious" | `sq slack #channel "message"` - done |
| "ETL pipelines need too much code" | `sq get api \| sq file save` - shell pipes all the way |
## Features

| Category | Features |
|---|---|
| AI/LLM | OpenAI, Ollama, Groq, Anthropic, Gemini, DeepSeek, Mistral |
| Voice | Speech-to-text, text-to-speech, voice commands |
| Automation | Mouse, keyboard, screenshots, AI-powered clicking |
| Communication | Email, Slack, Telegram, Discord, WhatsApp, SMS |
| Pipelines | HTTP, files, transforms, Kafka, RabbitMQ, PostgreSQL |
| Media | Video analysis, image description, audio transcription |
## Installation

```bash
# Basic install
pip install streamware

# With all features
pip install streamware[all]

# Or specific features
pip install streamware[llm,voice,automation]
```
## Auto-Configuration

After installation, run the setup wizard to automatically detect your environment (Ollama, API keys, voice settings, etc.):

```bash
# Full setup (LLM + voice) with mode presets
streamware --setup --mode balance      # default
streamware --setup --mode eco          # light models
streamware --setup --mode performance  # maximum quality

# TTS-only setup (does not touch LLM/STT)
streamware --setup tts
```

The setup wizard detects available LLM providers (Ollama, OpenAI, Anthropic), configures models, and writes the configuration to your `.env` file.
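For illustration, the wizard might write something like the following to `.env` (all values here are hypothetical; the variable names are the ones used elsewhere in this README):

```
# Written by streamware --setup (illustrative values)
SQ_MODEL=llama3
SQ_STT_PROVIDER=whisper_local
SQ_WHISPER_MODEL=base
SQ_TTS_ENGINE=auto
SQ_TTS_RATE=150
```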
## Diagnostics

Verify your setup with built-in diagnostic checks:

```bash
# Check camera/RTSP connectivity + Ollama
streamware --check camera "rtsp://admin:pass@192.168.1.100:554/stream"

# Check TTS engine (will speak a test message)
streamware --check tts

# Check Ollama connection and model availability
streamware --check ollama

# Run all checks
streamware --check all "rtsp://camera/live"
```

Example output:

```
Streamware Diagnostics
==================================================
Camera / RTSP Check:
  camera_url: rtsp://admin:pass@192.168.1.100:554/stream
  ffmpeg_capture: OK (45231 bytes)

Ollama / LLM Check:
  ollama_url: http://localhost:11434
  model: llava:7b
  ollama_connection: OK
  model_available: llava:7b found

TTS / Voice Check:
  tts_engine: auto
  tts_test: OK (using espeak)

==================================================
All checks passed!
```
### System Dependencies (optional but recommended)

```bash
# Linux/Ubuntu - for voice and automation
sudo apt-get install xdotool espeak scrot ffmpeg

# macOS
brew install xdotool espeak ffmpeg
```
## Quick Start CLI (`sq`)

### AI/LLM Commands

```bash
# Generate text
sq llm "Write a haiku about coding"

# Convert to SQL
sq llm "Get users who signed up last week" --to-sql
# Output: SELECT * FROM users WHERE created_at >= DATE_SUB(NOW(), INTERVAL 1 WEEK)

# Analyze code
sq llm --analyze --input main.py

# Use different providers (auto-detects API keys)
sq llm "Hello" --provider openai/gpt-4o
sq llm "Hello" --provider groq/llama3-70b-8192
sq llm "Hello" --provider ollama/qwen2.5:14b
```

### Voice Control

```bash
# Type with voice (Polish/English)
sq voice-keyboard "wpisz hello world"
sq voice-keyboard --interactive   # Continuous mode

# AI-powered clicking (finds elements visually!)
sq voice-click "click on the blue Submit button"
sq voice-click "kliknij w menu File"

# Text to speech (uses TTS config from .env)
sq voice speak "Hello, I am Streamware"
```
### Voice / TTS Configuration (`.env`)

The setup wizard saves the audio configuration to `.env` so that all tools (`sq voice`, `sq live narrator`, etc.) use the same settings.

Key variables:

- `SQ_STT_PROVIDER` → `google`, `whisper_local`, `whisper_api`
- `SQ_WHISPER_MODEL` → `tiny`, `base`, `small`, `medium`, `large`
- `SQ_TTS_ENGINE` → `auto`, `pyttsx3`, `espeak`, `say`, `powershell`
- `SQ_TTS_VOICE` → fragment of the voice name (e.g. `polski`, `English`)
- `SQ_TTS_RATE` → speech rate (words per minute, e.g. `150`)

Example: local Whisper STT + Polish TTS via pyttsx3:

```bash
SQ_STT_PROVIDER=whisper_local
SQ_WHISPER_MODEL=small
SQ_TTS_ENGINE=pyttsx3
SQ_TTS_VOICE=polski
SQ_TTS_RATE=160
```

Example: lightweight Google STT + system TTS on Linux (espeak):

```bash
SQ_STT_PROVIDER=google
SQ_TTS_ENGINE=espeak
SQ_TTS_RATE=150
```
### Desktop Automation

```bash
# Mouse
sq auto click --x 100 --y 200
sq auto move --x 500 --y 300

# Keyboard
sq auto type --text "Hello World"
sq auto press --key enter
sq auto hotkey --keys ctrl+s

# Screenshot
sq auto screenshot /tmp/screen.png
```

### Communication

```bash
# Slack
sq slack general "Deploy complete!"

# Telegram
sq telegram @mychannel "Server status: OK"

# Email
sq email user@example.com --subject "Report" --body "See attached"

# Discord
sq discord --webhook URL --message "Alert!"
```

### Media Analysis

```bash
# Describe image with AI
sq media describe_image --file photo.jpg

# Analyze video (scene tracking!)
sq media describe_video --file video.mp4

# Transcribe audio
sq media transcribe --file audio.mp3
```
## Image Optimization for LLM

Streamware automatically optimizes images before sending them to vision LLMs, reducing latency and API costs:

| Preset | Max Size | Quality | Colors | Use Case |
|---|---|---|---|---|
| `fast` | 384px | 55% | 32 | Real-time monitoring, low latency |
| `balanced` | 512px | 65% | full | Default, good quality/speed balance |
| `quality` | 768px | 75% | full | Detailed analysis, accuracy priority |
| `minimal` | 256px | 50% | 16 + grayscale | Extreme speed, basic detection |

Configure in `.env`:

```bash
# Use a preset
SQ_IMAGE_PRESET=fast

# Or custom settings
SQ_IMAGE_MAX_SIZE=512     # max dimension in pixels
SQ_IMAGE_QUALITY=65       # JPEG quality 1-100
SQ_IMAGE_POSTERIZE=0      # 0=off, 8-256=reduce colors
SQ_IMAGE_GRAYSCALE=false  # convert to grayscale
```

Optimization pipeline:

- Crop to motion region → only send the changed area to the LLM
- Downscale → reduce to max 384-768px (configurable)
- JPEG compression → quality 55-75% (minimal visual loss)
- Optional posterization → reduce colors for faster processing
- Sharpening → preserve edges after downscaling
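The downscale/posterize/compress/sharpen steps above can be sketched with Pillow. This is an illustrative sketch of the `fast` preset (384px, 32 colors, JPEG quality 55), not Streamware's actual implementation; the function name and defaults are assumptions:

```python
from io import BytesIO

from PIL import Image, ImageFilter


def optimize_for_llm(img: Image.Image, max_size: int = 384, quality: int = 55,
                     colors: int = 32, grayscale: bool = False) -> bytes:
    """Shrink an image before sending it to a vision LLM (sketch of the `fast` preset)."""
    img = img.copy()
    # Downscale so the longest edge is at most max_size pixels (keeps aspect ratio)
    img.thumbnail((max_size, max_size), Image.LANCZOS)
    if grayscale:
        img = img.convert("L")
    if colors:
        # Posterize: quantize to a reduced palette for faster processing
        img = img.quantize(colors=colors)
    # Sharpen to preserve edges lost in downscaling
    img = img.convert("RGB").filter(ImageFilter.SHARPEN)
    buf = BytesIO()
    img.save(buf, format="JPEG", quality=quality)
    return buf.getvalue()
```

A 1024x768 frame comes out as a JPEG no larger than 384px on its longest edge, typically a few kilobytes.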
## Logging & Reports

```bash
# Real-time logs in the terminal
sq live narrator --url "rtsp://..." --mode diff --tts

# Save to a file while watching
sq live narrator --url "rtsp://..." --mode diff 2>&1 | tee live.log

# Generate a Markdown summary after a run
sq watch --url "rtsp://..." --detect person --log md
# -> watch_log.md

sq live narrator --url "rtsp://..." --log md --file logs/live.md
# -> logs/live.md
```
## Data Pipelines

```bash
# API to file
sq get api.example.com/users | sq file save users.json

# Transform data
sq file read data.csv | sq transform --csv --json | sq file save data.json

# PostgreSQL
sq postgres "SELECT * FROM users" --json
```
## Custom Prompts

All LLM prompts are stored in `streamware/prompts/*.txt` and can be customized:

```bash
# List available prompts
ls streamware/prompts/
# stream_diff.txt, trigger_check.txt, motion_region.txt, ...

# Edit a prompt
nano streamware/prompts/stream_diff.txt
```

Override via environment:

```bash
export SQ_PROMPT_STREAM_DIFF="Your custom prompt template with {variables}..."
```

Available prompts:

- `stream_diff` → frame comparison for `sq stream`
- `stream_focus` → focused object detection
- `trigger_check` → trigger condition checking
- `motion_region` → motion region analysis
- `tracking_detect` → object tracking
- `live_narrator_*` → live narration modes
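Prompt files are plain-text templates with `{variables}` placeholders. Resolving a prompt, with the `SQ_PROMPT_*` environment override taking precedence over the file's default, can be sketched as below; the helper name, fallback logic, and template text are illustrative, not Streamware's actual loader:

```python
import os


def load_prompt(name: str, default_template: str, **variables) -> str:
    """Return the prompt for `name`, preferring an SQ_PROMPT_<NAME> env override."""
    template = os.environ.get(f"SQ_PROMPT_{name.upper()}", default_template)
    return template.format(**variables)


# Override the stream_diff prompt for this process (template text is made up)
os.environ["SQ_PROMPT_STREAM_DIFF"] = "Compare frame {prev} with frame {curr}."
prompt = load_prompt("stream_diff", "Describe changes between frames.", prev=1, curr=2)
print(prompt)  # → Compare frame 1 with frame 2.
```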
## Python API

### Simple Pipeline

```python
from streamware import flow
from streamware.dsl import configure

# Configure environment (optional)
configure(SQ_MODEL="llama3", SQ_DEBUG="true")

# Basic data transformation pipeline
result = (
    flow("http://api.example.com/data")
    | "transform://jsonpath?query=$.items[*]"
    | "file://write?path=/tmp/output.json"
).run()
```

### Fluent DSL with Configuration

```python
from streamware.dsl import Pipeline

# Configure and run in one chain
Pipeline() \
    .configure("SQ_MODEL", "gpt-4-vision") \
    .http_get("https://api.example.com/data") \
    .to_json() \
    .save("output.json") \
    .run()
```

### Streaming Pipeline

```python
# Real-time video processing
for frame in (
    flow("rtsp://camera/live")
    | "transcode://mp4?codec=h264"
    | "detect://faces"
    | "annotate://bbox"
).stream():
    process_frame(frame)
```

### CurLLM Integration

```python
# Web automation with LLM
result = (
    flow("curllm://browse?url=https://example.com")
    | "curllm://extract?instruction=Find all product prices under $50"
    | "transform://csv"
    | "file://write?path=products.csv"
).run()
```
## Core Components

### HTTP/REST Component

```python
# GET request
flow("http://api.example.com/data").run()

# POST with data
flow("http://api.example.com/users?method=post").run({"name": "John"})

# POST with data (builder style)
flow("http://api.example.com/users?method=post") \
    .with_data({"name": "John", "email": "john@example.com"})

# GraphQL query
flow("graphql://api.example.com").run({"query": "{ users { id name } }"})
```

### Communication Components

#### Email

```python
# Send email
flow("email://send?to=user@example.com&subject=Hello").run("Message body")

# Watch inbox
for email in flow("email-watch://interval=60").stream():
    print(f"New email: {email['subject']}")
```

#### Telegram

```python
# Send message to Telegram
flow("telegram://send?chat_id=@channel&token=BOT_TOKEN").run("Hello!")

# Telegram bot
bot = flow("telegram-bot://token=BOT_TOKEN") | "telegram-command://"

# Send WhatsApp message (via Twilio)
flow("whatsapp://send?provider=twilio&to=+1234567890").run("Hello!")
```

#### Discord

```python
# Send to a Discord channel
flow("discord://send?channel_id=123456&token=BOT_TOKEN").run("Announcement")

# Discord webhook
flow("discord://webhook?url=WEBHOOK_URL").run({"content": "Alert!"})
```

#### Slack

```python
# Post to Slack
flow("slack://send?channel=general&token=xoxb-TOKEN").run("Team update")

# Upload file to Slack
flow("slack://upload?channel=reports").run({"file": "report.pdf"})
```

#### SMS

```python
# Send SMS via Twilio
flow("sms://send?provider=twilio&to=+1234567890").run("Alert: System down!")

# Bulk SMS
flow("sms://bulk?numbers=+123,+456,+789").run("Broadcast message")
```

### File Component

```python
# Read file
flow("file://read?path=/tmp/input.json")

# Write file
flow("file://write?path=/tmp/output.csv&mode=append")
```

### Transform Component

```python
# JSONPath extraction
flow("transform://jsonpath?query=$.users[?(@.age>18)]")

# Jinja2 template
flow("transform://template?file=report.j2")

# CSV conversion
flow("transform://csv?delimiter=;")
```

### CurLLM Component

```python
# Web scraping with LLM
flow("curllm://browse?url=https://example.com&visual=true&stealth=true") \
    | "curllm://extract?instruction=Extract all email addresses" \
    | "curllm://fill_form?data={'name':'John','email':'john@example.com'}"

# BQL (Browser Query Language)
flow("curllm://bql?query={page(url:'https://example.com'){title,links{text,url}}}")
```
## Advanced Workflow Patterns

### Split/Join Pattern

```python
from streamware import flow, split, join

# Process items in parallel
result = (
    flow("http://api.example.com/items")
    | split("$.items[*]")            # Split array into individual items
    | "enrich://product_details"     # Process each item
    | join()                         # Collect results back
    | "file://write?path=enriched.json"
).run()
```

### Multicast Pattern

```python
from streamware import flow, multicast

# Send to multiple destinations
flow("kafka://orders?topic=new-orders") \
    | multicast([
        "postgres://insert?table=orders",
        "rabbitmq://publish?exchange=notifications",
        "file://append?path=orders.log"
    ]).run()
```

### Choice/Switch Pattern

```python
from streamware import flow, choose

# Conditional routing
flow("http://api.example.com/events") \
    | choose() \
        .when("$.priority == 'high'", "kafka://high-priority") \
        .when("$.priority == 'low'", "rabbitmq://low-priority") \
        .otherwise("file://write?path=unknown.log") \
    .run()
```
## Message Broker Integration

### Kafka

```python
# Consume from Kafka
flow("kafka://consume?topic=events&group=processor") \
    | "transform://json" \
    | "postgres://insert?table=events"

# Produce to Kafka
flow("file://watch?path=/tmp/uploads") \
    | "transform://json" \
    | "kafka://produce?topic=files&key=filename"
```

### RabbitMQ

```python
# Consume from RabbitMQ
flow("rabbitmq://consume?queue=tasks&auto_ack=false") \
    | "process://task_handler" \
    | "rabbitmq://ack"

# Publish to an exchange
flow("postgres://query?sql=SELECT * FROM orders WHERE status='pending'") \
    | "rabbitmq://publish?exchange=orders&routing_key=pending"
```

### PostgreSQL

```python
# Query and transform
flow("postgres://query?sql=SELECT * FROM users WHERE active=true") \
    | "transform://jsonpath?query=$[?(@.age>25)]" \
    | "kafka://produce?topic=adult-users"

# Stream changes (CDC-like)
flow("postgres://stream?table=orders&events=insert,update") \
    | "transform://normalize" \
    | "elasticsearch://index?index=orders"
```
## Multimedia Processing

### Video Streaming

```python
# RTSP to MP4 with face detection
flow("rtsp://camera/live") \
    | "transcode://mp4?codec=h264&fps=30" \
    | "detect://faces?model=haar" \
    | "annotate://bbox?color=green" \
    | "stream://hls?segment=10"
```

### Audio Processing

```python
# Speech-to-text pipeline
flow("audio://capture?device=default") \
    | "audio://denoise" \
    | "stt://whisper?lang=en" \
    | "transform://correct_grammar" \
    | "file://append?path=transcript.txt"
```
## Diagnostics and Monitoring

### Enable Debug Logging

```python
import streamware

streamware.enable_diagnostics(level="DEBUG")

# Detailed Camel-style logging
flow("http://api.example.com/data") \
    .with_diagnostics(trace=True) \
    | "transform://json" \
    | "file://write"
```

### Metrics Collection

```python
from streamware import flow, metrics

# Track pipeline metrics
with metrics.track("pipeline_name"):
    flow("kafka://consume?topic=events") \
        | "process://handler" \
        | "postgres://insert"

# Access metrics
print(metrics.get_stats("pipeline_name"))
# {'processed': 1000, 'errors': 2, 'avg_time': 0.034}
```
## Creating Custom Components

```python
from streamware import Component, register

@register("mycustom")
class MyCustomComponent(Component):
    input_mime = "application/json"
    output_mime = "application/json"

    def process(self, data):
        # Synchronous processing
        return transform_data(data)

    async def process_async(self, data):
        # Async processing
        return await async_transform(data)

    def stream(self, input_stream):
        # Streaming processing
        for item in input_stream:
            yield process_item(item)

# Use your custom component
flow("http://api.example.com/data") \
    | "mycustom://transform?param=value" \
    | "file://write"
```
## System Protocol Handler

Install a system-wide `stream://` protocol handler:

```bash
# Install handler
streamware install-protocol

# Now you can use it in a terminal:
curl stream://http/get?url=https://api.example.com

# Or in a browser:
stream://curllm/browse?url=https://example.com
```
## Testing

```python
import pytest
from streamware import flow, mock_component

def test_pipeline():
    # Mock external components
    with mock_component("http://api.example.com/data", returns={"items": [1, 2, 3]}):
        result = (
            flow("http://api.example.com/data")
            | "transform://jsonpath?query=$.items"
            | "transform://sum"
        ).run()
        assert result == 6
```
## Examples

### Web Scraping Pipeline

```python
# Extract product data with CurLLM
(
    flow("curllm://browse?url=https://shop.example.com&stealth=true")
    | "curllm://extract?instruction=Find all products under $50"
    | "transform://enrich_with_metadata"
    | "postgres://upsert?table=products&key=sku"
    | "kafka://produce?topic=price-updates"
).run()
```

### Real-time Data Processing

```python
# Process IoT sensor data
(
    flow("mqtt://subscribe?topic=sensors/+/temperature")
    | "transform://celsius_to_fahrenheit"
    | "filter://threshold?min=32&max=100"
    | "aggregate://average?window=5m"
    | "influxdb://write?measurement=temperature"
).run_forever()
```

### ETL Pipeline

```python
# Daily ETL job
(
    flow("postgres://query?sql=SELECT * FROM raw_events WHERE date=TODAY()")
    | "transform://clean_data"
    | "transform://validate"
    | "split://batch?size=1000"
    | "s3://upload?bucket=processed-events&prefix=daily/"
    | "notify://slack?channel=data-team"
).schedule(cron="0 2 * * *")
```
## Component Reference

### Core Components

- HTTP/REST: HTTP client, REST API, webhooks, GraphQL
- File: Read, write, watch, delete files
- Transform: JSON, CSV, JSONPath, templates, base64, regex
- CurLLM: Web automation, browsing, extraction, form filling

### Communication Components

- Email: SMTP/IMAP, send, receive, watch, filter emails
- Telegram: Bot API, send messages, photos, documents, commands
- WhatsApp: Business API, Twilio, templates, media
- Discord: Bot API, webhooks, embeds, threads
- Slack: Web API, events, slash commands, file uploads
- SMS: Twilio, Vonage, Plivo, bulk messaging, verification

### Message Queue Components

- Kafka: Producer, consumer, topics, partitions
- RabbitMQ: Publish, subscribe, RPC, exchanges
- Redis: Pub/sub, queues, caching

### Database Components

- PostgreSQL: Query, insert, update, upsert, streaming
- MongoDB: CRUD operations, aggregation
- Elasticsearch: Search, index, aggregation
## Multi-Channel Communication

### Unified Messaging

```python
# Send a notification to all of the user's preferred channels
user_preferences = get_user_preferences(user_id)
notification = "Important: Your order has been shipped!"

flow("choose://") \
    .when(f"'email' in {user_preferences}",
          f"email://send?to={{user_email}}") \
    .when(f"'sms' in {user_preferences}",
          f"sms://send?to={{user_phone}}") \
    .when(f"'telegram' in {user_preferences}",
          f"telegram://send?chat_id={{telegram_id}}") \
    .run(notification)
```

### Customer Support Hub

```python
# Centralized support system handling all channels
support_hub = (
    flow("multicast://sources")
    .add_source("email-watch://folder=support")
    .add_source("telegram-bot://commands=/help,/support")
    .add_source("whatsapp-webhook://")
    .add_source("slack-events://channel=customer-support")
    | "transform://normalize_message"
    | "curllm://analyze?instruction=Categorize issue and suggest response"
    | "postgres://insert?table=support_tickets"
    | "auto_respond://template={{suggested_response}}"
)

# Run the support hub
support_hub.run_forever()
```

### Marketing Automation

```python
# Personalized campaign across channels
campaign = (
    flow("postgres://query?sql=SELECT * FROM subscribers")
    | "split://parallel"
    | "enrich://behavioral_data"
    | "curllm://personalize?instruction=Create personalized message"
    | "choose://"
        .when("$.engagement_score > 80", [
            "email://send?template=vip_offer",
            "sms://send?priority=high"
        ])
        .when("$.engagement_score > 50",
              "email://send?template=standard_offer")
        .when("$.last_interaction > '30 days'", [
            "email://send?template=win_back",
            "wait://days=3",
            "sms://send?message=We miss you! 20% off"
        ])
)
```

### Incident Response System

```python
# Multi-tier escalation with failover
incident_response = (
    flow("monitoring://alerts?severity=critical")
    | "create_incident://pagerduty"
    | "notify://tier1"
        .add_channel("slack://send?channel=oncall")
        .add_channel("sms://send?to={{oncall_primary}}")
        .add_channel("telegram://send?chat_id={{oncall_chat}}")
    | "wait://minutes=5"
    | "check://acknowledged"
    | "choose://"
        .when("$.acknowledged == false", [
            "notify://tier2",
            "phone://call?to={{oncall_secondary}}",
            "email://send?to=managers@company.com&priority=urgent"
        ])
    | "wait://minutes=10"
    | "choose://"
        .when("$.acknowledged == false", [
            "notify://tier3",
            "sms://send?to={{cto_phone}}",
            "create_conference://zoom?participants={{emergency_team}}"
        ])
)
```
## Documentation

- Communication Components Guide - Detailed guide for email, chat, and SMS
- API Reference - Complete API documentation
- Examples - Full example implementations
- Advanced Examples - Production-ready communication patterns

| Component | URI Pattern | Description |
|---|---|---|
| HTTP | `http://host/path` | HTTP requests |
| File | `file://operation?path=...` | File operations |
| Transform | `transform://type?params` | Data transformation |
| CurLLM | `curllm://action?params` | Web automation with LLM |
| Kafka | `kafka://operation?params` | Kafka integration |
| RabbitMQ | `rabbitmq://operation?params` | RabbitMQ integration |
| PostgreSQL | `postgres://operation?params` | PostgreSQL operations |
| Split | `split://pattern` | Split data into parts |
| Join | `join://strategy` | Join split data |
| Multicast | `multicast://` | Send to multiple destinations |
| Choose | `choose://` | Conditional routing |
| Filter | `filter://condition` | Filter data |
| Aggregate | `aggregate://function` | Aggregate over a window |
## Contributing

We welcome contributions! Please see CONTRIBUTING.md for guidelines.

```bash
# Development setup
git clone https://github.com/softreck/streamware.git
cd streamware
pip install -e ".[dev]"
pytest
```

## License

Licensed under the Apache License, Version 2.0. See LICENSE for details.

## Acknowledgments

- Apache Camel for inspiration
- CurLLM for web automation capabilities
- The Python streaming community

## Support

- Email: info@softreck.com
- Issues: GitHub Issues
- Discussions: GitHub Discussions

Built by Softreck. Star us on GitHub!