Skip to main content

Modern Python stream processing framework inspired by Apache Camel

Project description

sq live narrator --url rtsp://192.168.1.1:554/h264Preview_01_main --tts --duration 20 --yaml --frames changed --focus person --tts --file report.html --model llava:13b --motion people

🚀 Streamware

PyPI Downloads Stars License Python

🎯 One-line automation • 🤖 AI-powered • 🎤 Voice control • 🖥️ Desktop automation


⚡ What Can You Do?

# 🤖 AI: Convert natural language to SQL
sq llm "Get all users older than 30" --to-sql

# 🎤 Voice: Type with your voice  
sq voice-keyboard "wpisz hello world"

# 🖱️ AI Vision: Click anywhere by description
sq voice-click "click on the Submit button"

# 📧 Send notifications everywhere
sq slack general "Deploy complete! ✅"
sq telegram @channel "Server status: OK"

# 🎬 Analyze video with AI
sq media describe_video --file presentation.mp4

# 🔄 Data pipelines
sq get api.example.com/users | sq transform --json | sq file save users.json

Streamware is a modern Python framework that combines:

  • Apache Camel-style data pipelines
  • AI/LLM integration (OpenAI, Ollama, Groq, Anthropic...)
  • Voice control and desktop automation
  • Multi-channel communication (Email, Slack, Telegram, Discord...)

🎯 Why Streamware?

Problem Streamware Solution
"I need to automate repetitive tasks" sq auto type "Hello" - one command
"I want AI without complex setup" sq llm "explain this code" - works out of the box
"Voice control is complicated" sq voice-keyboard - just speak
"Sending notifications is tedious" sq slack #channel "message" - done
"ETL pipelines need too much code" `sq get api

✨ Features

Category Features
🤖 AI/LLM OpenAI, Ollama, Groq, Anthropic, Gemini, DeepSeek, Mistral
🎤 Voice Speech-to-text, text-to-speech, voice commands
🖥️ Automation Mouse, keyboard, screenshots, AI-powered clicking
📡 Communication Email, Slack, Telegram, Discord, WhatsApp, SMS
🔄 Pipelines HTTP, files, transforms, Kafka, RabbitMQ, PostgreSQL
🎬 Media Video analysis, image description, audio transcription

📦 Installation

# Basic install
pip install streamware

# With all features
pip install streamware[all]

# Or specific features
pip install streamware[llm,voice,automation]

🛠️ Auto-Configuration

After installation, run the setup wizard to automatically detect your environment (Ollama, API keys, etc.):

streamware --setup

This will detect available LLM providers (Ollama, OpenAI, Anthropic) and configure the best available models for you.

System Dependencies (optional but recommended)

# Linux/Ubuntu - for voice and automation
sudo apt-get install xdotool espeak scrot ffmpeg

# macOS
brew install xdotool espeak ffmpeg

🚀 Quick Start CLI (sq)

🤖 AI/LLM Commands

# Generate text
sq llm "Write a haiku about coding"

# Convert to SQL
sq llm "Get users who signed up last week" --to-sql
# Output: SELECT * FROM users WHERE created_at >= DATE_SUB(NOW(), INTERVAL 1 WEEK)

# Analyze code
sq llm --analyze --input main.py

# Use different providers (auto-detects API keys)
sq llm "Hello" --provider openai/gpt-4o
sq llm "Hello" --provider groq/llama3-70b-8192
sq llm "Hello" --provider ollama/qwen2.5:14b

🎤 Voice Control

# Type with voice (Polish/English)
sq voice-keyboard "wpisz hello world"
sq voice-keyboard --interactive  # Continuous mode

# AI-powered clicking (finds elements visually!)
sq voice-click "click on the blue Submit button"
sq voice-click "kliknij w menu File"

# Text to speech
sq voice speak "Hello, I am Streamware"

🖥️ Desktop Automation

# Mouse
sq auto click --x 100 --y 200
sq auto move --x 500 --y 300

# Keyboard
sq auto type --text "Hello World"
sq auto press --key enter
sq auto hotkey --keys ctrl+s

# Screenshot
sq auto screenshot /tmp/screen.png

📡 Communication

# Slack
sq slack general "Deploy complete! 🚀"

# Telegram
sq telegram @mychannel "Server status: OK"

# Email
sq email user@example.com --subject "Report" --body "See attached"

# Discord
sq discord --webhook URL --message "Alert!"

🎬 Media Analysis

# Describe image with AI
sq media describe_image --file photo.jpg

# Analyze video (scene tracking!)
sq media describe_video --file video.mp4

# Transcribe audio
sq media transcribe --file audio.mp3

🔄 Data Pipelines

# API to file
sq get api.example.com/users | sq file save users.json

# Transform data
sq file read data.csv | sq transform --csv --json | sq file save data.json

# PostgreSQL
sq postgres "SELECT * FROM users" --json

📖 Python API

Simple Pipeline

from streamware import flow
from streamware.dsl import configure

# Configure environment (optional)
configure(SQ_MODEL="llama3", SQ_DEBUG="true")

# Basic data transformation pipeline
result = (
    flow("http://api.example.com/data")
    | "transform://jsonpath?query=$.items[*]"
    | "file://write?path=/tmp/output.json"
).run()

Fluent DSL with Configuration

from streamware.dsl import Pipeline

# Configure and run in one chain
Pipeline() \
    .configure("SQ_MODEL", "gpt-4-vision") \
    .http_get("https://api.example.com/data") \
    .to_json() \
    .save("output.json") \
    .run()

Streaming Pipeline

# Real-time video processing
for frame in (
    flow("rtsp://camera/live")
    | "transcode://mp4?codec=h264"
    | "detect://faces"
    | "annotate://bbox"
).stream():
    process_frame(frame)

CurLLM Integration

# Web automation with LLM
result = (
    flow("curllm://browse?url=https://example.com")
    | "curllm://extract?instruction=Find all product prices under $50"
    | "transform://csv"
    | "file://write?path=products.csv"
).run()

🧩 Core Components

HTTP/REST Component

# GET request
flow("http://api.example.com/data").run()

# POST with data
flow("http://api.example.com/users?method=post").run({"name": "John"})

# GraphQL query
flow("graphql://api.example.com").run({"query": "{ users { id name } }"})

Communication Components

Email

# Send email
flow("email://send?to=user@example.com&subject=Hello").run("Message body")

# Watch inbox
for email in flow("email-watch://interval=60").stream():
    print(f"New email: {email['subject']}")

Telegram

# Send message to Telegram
flow("telegram://send?chat_id=@channel&token=BOT_TOKEN").run("Hello!")

# Telegram bot
bot = flow("telegram-bot://token=BOT_TOKEN") | "telegram-command://"

WhatsApp

# Send WhatsApp message (via Twilio)
flow("whatsapp://send?provider=twilio&to=+1234567890").run("Hello!")

Discord

# Send to Discord channel
flow("discord://send?channel_id=123456&token=BOT_TOKEN").run("Announcement")

# Discord webhook
flow("discord://webhook?url=WEBHOOK_URL").run({"content": "Alert!"})

Slack

# Post to Slack
flow("slack://send?channel=general&token=xoxb-TOKEN").run("Team update")

# Upload file to Slack
flow("slack://upload?channel=reports").run({"file": "report.pdf"})

SMS

# Send SMS via Twilio
flow("sms://send?provider=twilio&to=+1234567890").run("Alert: System down!")

# Bulk SMS
flow("sms://bulk?numbers=+123,+456,+789").run("Broadcast message")
flow("http://api.example.com/users")

POST with data

flow("http://api.example.com/users?method=post") \
    .with_data({"name": "John", "email": "john@example.com"})

File Component

# Read file
flow("file://read?path=/tmp/input.json")

# Write file
flow("file://write?path=/tmp/output.csv&mode=append")

Transform Component

# JSONPath extraction
flow("transform://jsonpath?query=$.users[?(@.age>18)]")

# Jinja2 template
flow("transform://template?file=report.j2")

# CSV conversion
flow("transform://csv?delimiter=;")

CurLLM Component

# Web scraping with LLM
flow("curllm://browse?url=https://example.com&visual=true&stealth=true") \
    | "curllm://extract?instruction=Extract all email addresses" \
    | "curllm://fill_form?data={'name':'John','email':'john@example.com'}"

# BQL (Browser Query Language)
flow("curllm://bql?query={page(url:'https://example.com'){title,links{text,url}}}")

🔥 Advanced Workflow Patterns

Split/Join Pattern

from streamware import flow, split, join

# Process items in parallel
result = (
    flow("http://api.example.com/items")
    | split("$.items[*]")  # Split array into individual items
    | "enrich://product_details"  # Process each item
    | join()  # Collect results back
    | "file://write?path=enriched.json"
).run()

Multicast Pattern

from streamware import flow, multicast

# Send to multiple destinations
flow("kafka://orders?topic=new-orders") \
    | multicast([
        "postgres://insert?table=orders",
        "rabbitmq://publish?exchange=notifications",
        "file://append?path=orders.log"
    ]).run()

Choice/Switch Pattern

from streamware import flow, choose

# Conditional routing
flow("http://api.example.com/events") \
    | choose() \
        .when("$.priority == 'high'", "kafka://high-priority") \
        .when("$.priority == 'low'", "rabbitmq://low-priority") \
        .otherwise("file://write?path=unknown.log") \
    .run()

🔌 Message Broker Integration

Kafka

# Consume from Kafka
flow("kafka://consume?topic=events&group=processor") \
    | "transform://json" \
    | "postgres://insert?table=events"

# Produce to Kafka
flow("file://watch?path=/tmp/uploads") \
    | "transform://json" \
    | "kafka://produce?topic=files&key=filename"

RabbitMQ

# Consume from RabbitMQ
flow("rabbitmq://consume?queue=tasks&auto_ack=false") \
    | "process://task_handler" \
    | "rabbitmq://ack"

# Publish to exchange
flow("postgres://query?sql=SELECT * FROM orders WHERE status='pending'") \
    | "rabbitmq://publish?exchange=orders&routing_key=pending"

PostgreSQL

# Query and transform
flow("postgres://query?sql=SELECT * FROM users WHERE active=true") \
    | "transform://jsonpath?query=$[?(@.age>25)]" \
    | "kafka://produce?topic=adult-users"

# Stream changes (CDC-like)
flow("postgres://stream?table=orders&events=insert,update") \
    | "transform://normalize" \
    | "elasticsearch://index?index=orders"

🎬 Multimedia Processing

Video Streaming

# RTSP to MP4 with face detection
flow("rtsp://camera/live") \
    | "transcode://mp4?codec=h264&fps=30" \
    | "detect://faces?model=haar" \
    | "annotate://bbox?color=green" \
    | "stream://hls?segment=10"

Audio Processing

# Speech to text pipeline
flow("audio://capture?device=default") \
    | "audio://denoise" \
    | "stt://whisper?lang=en" \
    | "transform://correct_grammar" \
    | "file://append?path=transcript.txt"

📊 Diagnostics and Monitoring

Enable Debug Logging

import streamware
streamware.enable_diagnostics(level="DEBUG")

# Detailed Camel-style logging
flow("http://api.example.com/data") \
    .with_diagnostics(trace=True) \
    | "transform://json" \
    | "file://write"

Metrics Collection

from streamware import flow, metrics

# Track pipeline metrics
with metrics.track("pipeline_name"):
    flow("kafka://consume?topic=events") \
        | "process://handler" \
        | "postgres://insert"
        
# Access metrics
print(metrics.get_stats("pipeline_name"))
# {'processed': 1000, 'errors': 2, 'avg_time': 0.034}

🔧 Creating Custom Components

from streamware import Component, register

@register("mycustom")
class MyCustomComponent(Component):
    input_mime = "application/json"
    output_mime = "application/json"
    
    def process(self, data):
        # Synchronous processing
        return transform_data(data)
    
    async def process_async(self, data):
        # Async processing
        return await async_transform(data)
    
    def stream(self, input_stream):
        # Streaming processing
        for item in input_stream:
            yield process_item(item)

# Use your custom component
flow("http://api.example.com/data") \
    | "mycustom://transform?param=value" \
    | "file://write"

🌐 System Protocol Handler

Install system-wide stream:// protocol:

# Install handler
streamware install-protocol

# Now you can use in terminal:
curl stream://http/get?url=https://api.example.com

# Or in browser:
stream://curllm/browse?url=https://example.com

🧪 Testing

import pytest
from streamware import flow, mock_component

def test_pipeline():
    # Mock external components
    with mock_component("http://api.example.com/data", returns={"items": [1, 2, 3]}):
        result = (
            flow("http://api.example.com/data")
            | "transform://jsonpath?query=$.items"
            | "transform://sum"
        ).run()
        
        assert result == 6

📚 Examples

Web Scraping Pipeline

# Extract product data with CurLLM
(
    flow("curllm://browse?url=https://shop.example.com&stealth=true")
    | "curllm://extract?instruction=Find all products under $50"
    | "transform://enrich_with_metadata"
    | "postgres://upsert?table=products&key=sku"
    | "kafka://produce?topic=price-updates"
).run()

Real-time Data Processing

# Process IoT sensor data
(
    flow("mqtt://subscribe?topic=sensors/+/temperature")
    | "transform://celsius_to_fahrenheit"
    | "filter://threshold?min=32&max=100"
    | "aggregate://average?window=5m"
    | "influxdb://write?measurement=temperature"
).run_forever()

ETL Pipeline

# Daily ETL job
(
    flow("postgres://query?sql=SELECT * FROM raw_events WHERE date=TODAY()")
    | "transform://clean_data"
    | "transform://validate"
    | "split://batch?size=1000"
    | "s3://upload?bucket=processed-events&prefix=daily/"
    | "notify://slack?channel=data-team"
).schedule(cron="0 2 * * *")

🔗 Component Reference

Core Components

  • HTTP/REST: HTTP client, REST API, webhooks, GraphQL
  • File: Read, write, watch, delete files
  • Transform: JSON, CSV, JSONPath, templates, base64, regex
  • CurLLM: Web automation, browsing, extraction, form filling

Communication Components

  • Email: SMTP/IMAP, send, receive, watch, filter emails
  • Telegram: Bot API, send messages, photos, documents, commands
  • WhatsApp: Business API, Twilio, templates, media
  • Discord: Bot API, webhooks, embeds, threads
  • Slack: Web API, events, slash commands, file uploads
  • SMS: Twilio, Vonage, Plivo, bulk messaging, verification

Message Queue Components

  • Kafka: Producer, consumer, topics, partitions
  • RabbitMQ: Publish, subscribe, RPC, exchanges
  • Redis: Pub/sub, queues, caching

Database Components

  • PostgreSQL: Query, insert, update, upsert, streaming
  • MongoDB: CRUD operations, aggregation
  • Elasticsearch: Search, index, aggregation

📡 Multi-Channel Communication

Unified Messaging

# Send notification to all user's preferred channels
user_preferences = get_user_preferences(user_id)

notification = "Important: Your order has been shipped!"

flow("choose://") \
    .when(f"'email' in {user_preferences}", 
          f"email://send?to={{user_email}}") \
    .when(f"'sms' in {user_preferences}", 
          f"sms://send?to={{user_phone}}") \
    .when(f"'telegram' in {user_preferences}", 
          f"telegram://send?chat_id={{telegram_id}}") \
    .run(notification)

Customer Support Hub

# Centralized support system handling all channels
support_hub = (
    flow("multicast://sources")
    .add_source("email-watch://folder=support")
    .add_source("telegram-bot://commands=/help,/support")
    .add_source("whatsapp-webhook://")
    .add_source("slack-events://channel=customer-support")
    | "transform://normalize_message"
    | "curllm://analyze?instruction=Categorize issue and suggest response"
    | "postgres://insert?table=support_tickets"
    | "auto_respond://template={{suggested_response}}"
)

# Run support hub
support_hub.run_forever()

Marketing Automation

# Personalized campaign across channels
campaign = (
    flow("postgres://query?sql=SELECT * FROM subscribers")
    | "split://parallel"
    | "enrich://behavioral_data"
    | "curllm://personalize?instruction=Create personalized message"
    | "choose://"
      .when("$.engagement_score > 80", [
          "email://send?template=vip_offer",
          "sms://send?priority=high"
      ])
      .when("$.engagement_score > 50", 
            "email://send?template=standard_offer")
      .when("$.last_interaction > '30 days'", [
          "email://send?template=win_back",
          "wait://days=3",
          "sms://send?message=We miss you! 20% off"
      ])
)

Incident Response System

# Multi-tier escalation with failover
incident_response = (
    flow("monitoring://alerts?severity=critical")
    | "create_incident://pagerduty"
    | "notify://tier1"
    .add_channel("slack://send?channel=oncall")
    .add_channel("sms://send?to={{oncall_primary}}")
    .add_channel("telegram://send?chat_id={{oncall_chat}}")
    | "wait://minutes=5"
    | "check://acknowledged"
    | "choose://"
      .when("$.acknowledged == false", [
          "notify://tier2",
          "phone://call?to={{oncall_secondary}}",
          "email://send?to=managers@company.com&priority=urgent"
      ])
    | "wait://minutes=10"
    | "choose://"
      .when("$.acknowledged == false", [
          "notify://tier3",
          "sms://send?to={{cto_phone}}",
          "create_conference://zoom?participants={{emergency_team}}"
      ])
)

📖 Documentation

Component URI Pattern Description
HTTP http://host/path HTTP requests
File file://operation?path=... File operations
Transform transform://type?params Data transformation
CurLLM curllm://action?params Web automation with LLM
Kafka kafka://operation?params Kafka integration
RabbitMQ rabbitmq://operation?params RabbitMQ integration
PostgreSQL postgres://operation?params PostgreSQL operations
Split split://pattern Split data into parts
Join join://strategy Join split data
Multicast multicast:// Send to multiple destinations
Choose choose:// Conditional routing
Filter filter://condition Filter data
Aggregate aggregate://function Aggregate over window

🤝 Contributing

We welcome contributions! Please see CONTRIBUTING.md for guidelines.

# Development setup
git clone https://github.com/softreck/streamware.git
cd streamware
pip install -e ".[dev]"
pytest

📄 License

Licensed under the Apache License, Version 2.0. See LICENSE for details.

🙏 Acknowledgments

  • Apache Camel for inspiration
  • CurLLM for web automation capabilities
  • The Python streaming community

📞 Support


Built with ❤️ by Softreck

⭐ Star us on GitHub!

Project details


Release history Release notifications | RSS feed

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

streamware-0.2.11.tar.gz (257.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

streamware-0.2.11-py3-none-any.whl (225.9 kB view details)

Uploaded Python 3

File details

Details for the file streamware-0.2.11.tar.gz.

File metadata

  • Download URL: streamware-0.2.11.tar.gz
  • Upload date:
  • Size: 257.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for streamware-0.2.11.tar.gz
Algorithm Hash digest
SHA256 1ee9d8db948d8e293d39c8f473004d3a6d5f2d8561bdbfed6fb819fda406b5eb
MD5 25f27e85693fa0dd1443f0e0e0296349
BLAKE2b-256 82359ae76e6220236424753ace51b2a19c007bea2ff279539bca8e521e577f38

See more details on using hashes here.

File details

Details for the file streamware-0.2.11-py3-none-any.whl.

File metadata

  • Download URL: streamware-0.2.11-py3-none-any.whl
  • Upload date:
  • Size: 225.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for streamware-0.2.11-py3-none-any.whl
Algorithm Hash digest
SHA256 c05d1807194ceaa3baabf51739392e4b9622dba9f1ae3b8bd356214ededefb4d
MD5 71761588e5123b1b862b10582032d66b
BLAKE2b-256 a77271b29c71537c6106f2c28f911e886a93633d33bc6e64d6eb3c46fb290f1b

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page