streamll

Stream your DSPy application's inner workings back to users in real time. Route reasoning steps, token generation, and progress updates through your existing infrastructure.

Installation

# Basic (terminal output only)
uv add streamll

# With Redis for production
uv add "streamll[redis]"

# With RabbitMQ
uv add "streamll[rabbitmq]"

# Everything
uv add "streamll[all]"

Quick start

import dspy
import streamll

# Stream tokens to terminal
@streamll.instrument(stream_fields=["answer"])
class QA(dspy.Module):
    def __init__(self):
        super().__init__()
        self.generate = dspy.ChainOfThought("question -> answer")

    def forward(self, question):
        return self.generate(question=question)

# Configure DSPy
dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))

qa = QA()
result = qa("Explain quantum computing")

Production use

Send events to Redis or RabbitMQ instead of the terminal:

from streamll.sinks import RedisSink

sink = RedisSink(redis_url="redis://localhost:6379", stream_key="ml_events")

@streamll.instrument(sinks=[sink], stream_fields=["answer"])
class QA(dspy.Module):
    def __init__(self):
        super().__init__()
        self.generate = dspy.ChainOfThought("question -> answer")

    def forward(self, question):
        return self.generate(question=question)

Consume events in another service:

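import asyncio

from streamll import EventConsumer

consumer = EventConsumer("redis://localhost:6379", target="ml_events")

@consumer.on("token")
async def handle_token(event):
    print(event.data["token"], end="", flush=True)

# `await` is only valid inside a coroutine, so run the consumer from an async entry point
async def main():
    await consumer.run()

asyncio.run(main())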
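
The `@consumer.on("token")` registration above is a decorator-based event dispatcher. As a rough illustration of that pattern only, here is a minimal standard-library sketch. It is not streamll's implementation: `MiniDispatcher`, `dispatch`, and the dictionary event shape are hypothetical names chosen for this example, and the events are simulated in memory rather than read from Redis or RabbitMQ.

```python
import asyncio
from collections import defaultdict


class MiniDispatcher:
    """Hypothetical stand-in for a consumer: routes events to async handlers by type."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def on(self, event_type):
        # Decorator factory: registers the handler and returns it unchanged.
        def register(handler):
            self._handlers[event_type].append(handler)
            return handler
        return register

    async def dispatch(self, event):
        # Await every handler registered for this event type, in registration order.
        for handler in self._handlers[event["type"]]:
            await handler(event)


dispatcher = MiniDispatcher()
received = []


@dispatcher.on("token")
async def collect_token(event):
    received.append(event["data"]["token"])


async def main():
    # Simulated event stream; a real consumer would read these from a broker.
    simulated = [
        {"type": "token", "data": {"token": "Hello"}},
        {"type": "progress", "data": {"step": 1}},  # no handler registered, so ignored
        {"type": "token", "data": {"token": " world"}},
    ]
    for event in simulated:
        await dispatcher.dispatch(event)


asyncio.run(main())
print("".join(received))
```

Event types without a registered handler are skipped, so a service can subscribe only to the events it cares about.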

Custom events

Emit custom events from inside your module's forward pass:

@streamll.instrument
class RAGPipeline(dspy.Module):
    # self.retrieve and self.generate are assumed to be set up in __init__ (omitted here)
    def forward(self, question):
        with streamll.trace("retrieval") as ctx:
            docs = self.retrieve(question)
            ctx.emit("docs_found", data={"count": len(docs)})

        answer = self.generate(docs=docs, question=question)
        return answer
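
To make the `with streamll.trace(...) as ctx` shape more concrete, the following is a standalone sketch of how a tracing context manager with an `emit` method could be built using only the standard library. It is an assumption-laden illustration, not streamll's code: the `TraceContext` class, the in-memory `events` list, and the automatic `start`, `end`, and `error` events are all hypothetical and may not match what streamll actually emits.

```python
import time
from contextlib import contextmanager

events = []  # in-memory stand-in for a sink (hypothetical)


class TraceContext:
    """Hypothetical context object that tags each emitted event with its operation."""

    def __init__(self, operation):
        self.operation = operation

    def emit(self, event_type, data=None):
        events.append(
            {"operation": self.operation, "type": event_type, "data": data or {}}
        )


@contextmanager
def trace(operation):
    ctx = TraceContext(operation)
    started = time.perf_counter()
    ctx.emit("start")
    try:
        yield ctx
    except Exception as exc:
        # Record the failure, then let the exception propagate to the caller.
        ctx.emit("error", {"message": str(exc)})
        raise
    else:
        ctx.emit("end", {"duration_s": time.perf_counter() - started})


with trace("retrieval") as ctx:
    docs = ["doc-a", "doc-b"]  # placeholder for a real retrieval step
    ctx.emit("docs_found", {"count": len(docs)})

print([event["type"] for event in events])
```

Wrapping the block in a context manager means the closing event is recorded even when the caller forgets to emit one, and failures are captured without swallowing the exception.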

Event correlation

Attach correlation IDs that persist across all events:

# In your API handler
streamll.set_context(
    conversation_id="conv_123",
    request_id="req_456"
)

# All subsequent events include this context
qa = QA()
result = qa("What is quantum computing?")

# Consumer can filter by context
@consumer.on("token")
async def handle_token(event):
    if event.data.get("conversation_id") == "conv_123":
        # Handle this specific conversation
        pass
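
One common way to make context fields "persist across all events" is Python's `contextvars` module, which keeps a separate value per thread and per asyncio task. The sketch below shows that general technique under stated assumptions: it is not taken from streamll's source, and `set_context`, `make_event`, and the choice to merge context fields into the event's `data` (mirroring the `event.data.get("conversation_id")` lookup above) are hypothetical for this example.

```python
import contextvars

# Holds the correlation fields for the current thread or asyncio task.
_context = contextvars.ContextVar("event_context", default={})


def set_context(**fields):
    # Build a new dict so previously created events are not mutated.
    _context.set({**_context.get(), **fields})


def make_event(event_type, data):
    # Copy the current context into the event's data at creation time.
    return {"type": event_type, "data": {**_context.get(), **data}}


set_context(conversation_id="conv_123", request_id="req_456")

stream = [
    make_event("token", {"token": "Hi"}),
    make_event("token", {"token": "!"}),
]

# Consumer side: keep only events that belong to one conversation.
matching = [e for e in stream if e["data"].get("conversation_id") == "conv_123"]
print(len(matching))
```

Because the context is copied into each event when it is created, a consumer can filter on those fields without any shared state between producer and consumer.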

Development

# Run tests
uv run pytest

# With coverage
uv run pytest --cov=src/streamll

# Start test services (Redis, RabbitMQ)
docker-compose -f tests/docker-compose.yml up -d

License

Apache 2.0
