A callback-driven, prefix-safe, lazy LLM stream sanitization library.

🌊 LLM Stream Processor

Real-time filtering, redaction, and control of streaming LLM outputs with microsecond-scale overhead (target: under 5 μs per token).


✨ Features

  • 🔒 Prefix-Safe Pattern Matching: uses an Aho-Corasick automaton so that no partial sensitive content leaks before a full match is confirmed
  • ⚡ Ultra-Low Latency: targets <5 μs of per-token overhead, designed for real-time streaming
  • 🔄 Sync & Async Support: works with both synchronous and asynchronous LLM SDKs
  • 🎯 Flexible Actions: PASS, DROP, REPLACE, HALT, or CONTINUE_DROP/PASS based on pattern matches
  • 📊 History Tracking: optional input/output/action history for debugging and analytics
  • 🔌 Runtime Updates: register or deregister patterns dynamically without restarting streams
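
To make "prefix-safe" concrete, here is a minimal, library-independent sketch. It is not the library's implementation: it uses naive suffix checks in place of an Aho-Corasick automaton, and the names `held_back_length` and `redact_stream` are hypothetical. It assumes non-empty keywords and shows only the idea of holding back any text that could still grow into a keyword.

```python
from typing import Dict, Iterable, Iterator


def held_back_length(buffer: str, keywords: Iterable[str]) -> int:
    """Length of the longest buffer suffix that is a proper prefix of some keyword."""
    longest = 0
    for kw in keywords:
        # Only look for suffixes longer than the best one found so far.
        for n in range(min(len(kw) - 1, len(buffer)), longest, -1):
            if buffer.endswith(kw[:n]):
                longest = n
                break
    return longest


def redact_stream(chunks: Iterable[str], replacements: Dict[str, str]) -> Iterator[str]:
    """Replace every keyword in the stream without emitting a partial keyword first."""
    # Longest keywords first, so the longest completed keyword wins.
    keywords = sorted(replacements, key=len, reverse=True)
    buffer = ""
    for chunk in chunks:
        for ch in chunk:
            buffer += ch
            matched = next((kw for kw in keywords if buffer.endswith(kw)), None)
            if matched is not None:
                # Emit whatever preceded the keyword, then the replacement.
                yield buffer[: -len(matched)] + replacements[matched]
                buffer = ""
                continue
            hold = held_back_length(buffer, keywords)
            if len(buffer) > hold:
                yield buffer[: len(buffer) - hold]
                buffer = buffer[len(buffer) - hold:]
    if buffer:
        yield buffer  # the stream ended in the middle of a possible keyword


pieces = list(redact_stream(["The sec", "ret is se", "t."], {"secret": "[REDACTED]"}))
print("".join(pieces))  # The [REDACTED] is set.
```

Note that "sec" arrives at the end of the first chunk but is never emitted on its own: it stays in the buffer until the next chunk either completes the keyword or rules it out.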

📦 Installation

pip install llm-stream-processor

For development:

git clone https://github.com/DevOpRohan/llm_stream_processor.git
cd llm_stream_processor
pip install -e .

🚀 Quickstart

from stream_processor import KeywordRegistry, llm_stream_processor, replace, halt

# Create a registry and register pattern callbacks
reg = KeywordRegistry()
reg.register("secret", lambda ctx: replace("[REDACTED]"))
reg.register("STOP", halt)  # Halt stream on this keyword

@llm_stream_processor(reg, yield_mode="token")
def generate_response():
    yield "The secret password is hidden. "
    yield "Do not STOP here."
    yield "This won't be seen."

# Consume the filtered stream
for token in generate_response():
    print(token, end="", flush=True)
# Output: The [REDACTED] password is hidden. Do not 

📖 API Reference

Core Classes

Class            Description
KeywordRegistry  Registers and deregisters keywords and their callbacks; compiles them into an Aho-Corasick automaton
StreamProcessor  Low-level processor for character-by-character filtering
ActionContext    Context passed to callbacks, with keyword, buffer, position, and history
StreamHistory    Tracks input, output, and actions for debugging
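
For readers unfamiliar with the automaton that KeywordRegistry is described as compiling to, the following is a textbook-style sketch of Aho-Corasick in plain Python. It is an illustration only, not the library's code; `build_automaton` and `find_matches` are hypothetical names. The matcher advances one character at a time and keeps a single integer state, which is the property that makes the algorithm a natural fit for streaming input.

```python
from collections import deque
from typing import Dict, Iterable, List, Tuple

Automaton = Tuple[List[Dict[str, int]], List[int], List[List[str]]]


def build_automaton(keywords: Iterable[str]) -> Automaton:
    """Build the trie (goto), failure links (fail), and per-state outputs (out)."""
    goto: List[Dict[str, int]] = [{}]
    fail: List[int] = [0]
    out: List[List[str]] = [[]]
    for kw in keywords:
        state = 0
        for ch in kw:
            if ch not in goto[state]:
                goto.append({})
                fail.append(0)
                out.append([])
                goto[state][ch] = len(goto) - 1
            state = goto[state][ch]
        out[state].append(kw)
    # Breadth-first pass: depth-1 states keep fail = 0, deeper states inherit.
    queue = deque(goto[0].values())
    while queue:
        state = queue.popleft()
        for ch, nxt in goto[state].items():
            queue.append(nxt)
            f = fail[state]
            while f and ch not in goto[f]:
                f = fail[f]
            fail[nxt] = goto[f].get(ch, 0)
            # Keywords ending at the failure target also end here.
            out[nxt] = out[nxt] + out[fail[nxt]]
    return goto, fail, out


def find_matches(text: str, automaton: Automaton) -> List[Tuple[int, str]]:
    """Return (start_index, keyword) for every occurrence, in order of end position."""
    goto, fail, out = automaton
    state = 0
    matches: List[Tuple[int, str]] = []
    for i, ch in enumerate(text):
        while state and ch not in goto[state]:
            state = fail[state]
        state = goto[state].get(ch, 0)
        for kw in out[state]:
            matches.append((i - len(kw) + 1, kw))
    return matches


automaton = build_automaton(["he", "she", "his", "hers"])
print(find_matches("ushers", automaton))  # [(1, 'she'), (2, 'he'), (2, 'hers')]
```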

Decorator

@llm_stream_processor(registry, yield_mode='token', record_history=True)
Parameter       Options                     Description
registry        KeywordRegistry             Registry with registered patterns
yield_mode      'char', 'token', 'chunk:N'  Output mode: per-character, per-token, or N-character chunks
record_history  True/False                  Enable or disable history tracking
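
As a rough illustration of what the 'char' and 'chunk:N' output modes imply, the sketch below regroups already-filtered text into pieces. The function `repack` is hypothetical and is not the library's re-packer; flushing a short final chunk is an assumption, and the 'token' mode is left out because its boundaries depend on how the library tracks the original tokens.

```python
from typing import Iterable, Iterator


def repack(pieces: Iterable[str], yield_mode: str) -> Iterator[str]:
    """Regroup filtered text into single characters or fixed-size chunks."""
    if yield_mode == "char":
        for piece in pieces:
            yield from piece  # one character at a time
        return
    if yield_mode.startswith("chunk:"):
        size = int(yield_mode.split(":", 1)[1])
        if size <= 0:
            raise ValueError("chunk size must be positive")
        pending = ""
        for piece in pieces:
            pending += piece
            while len(pending) >= size:
                yield pending[:size]
                pending = pending[size:]
        if pending:
            yield pending  # assumption: a short final chunk is flushed
        return
    raise ValueError(f"unsupported yield_mode: {yield_mode!r}")


print(list(repack(["Hello, ", "world"], "chunk:5")))  # ['Hello', ', wor', 'ld']
print(list(repack(["ab", "c"], "char")))              # ['a', 'b', 'c']
```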

Action Helpers

Function           Description
drop()             Remove the matched keyword from output
replace(text)      Replace matched keyword with custom text
halt()             Immediately abort the stream
passthrough()      Leave matched keyword unchanged (no-op)
continuous_drop()  Start dropping all content until continuous_pass
continuous_pass()  Resume normal output after continuous_drop

🎯 Use Cases

PII Redaction

from stream_processor import KeywordRegistry, replace

reg = KeywordRegistry()

# Redact the domain part of email addresses (only the registered domains match)
for domain in ["@gmail.com", "@yahoo.com", "@outlook.com"]:
    reg.register(domain, lambda ctx: replace("@[REDACTED]"))

# Flag SSN labels. Keywords are fixed strings and replace() substitutes only
# the matched keyword, so this rule rewrites the "SSN:" label itself; any
# digits that follow the label are not matched by it.
reg.register("SSN:", lambda ctx: replace("SSN: [REDACTED]"))

Content Moderation (Drop Segments)

from stream_processor import KeywordRegistry, llm_stream_processor, continuous_drop, continuous_pass

reg = KeywordRegistry()

# Drop internal "thinking" blocks
reg.register("<think>", continuous_drop)
reg.register("</think>", continuous_pass)

@llm_stream_processor(reg, yield_mode="token")
def llm_stream():
    yield "Hello! <think>internal reasoning here</think>Here's my response."

print("".join(llm_stream()))
# Output: Hello! Here's my response.
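
The drop/resume pair can be pictured as a two-state machine. The sketch below is a standalone illustration, not the library's code: `partial_suffix` and `drop_between` are hypothetical names, and it assumes that both markers are removed together with the text between them. It also shows why buffering matters when a marker is split across chunks.

```python
from typing import Iterable, Iterator


def partial_suffix(text: str, tag: str) -> int:
    """Length of the longest suffix of text that is a proper prefix of tag."""
    for n in range(min(len(tag) - 1, len(text)), 0, -1):
        if text.endswith(tag[:n]):
            return n
    return 0


def drop_between(chunks: Iterable[str], open_tag: str, close_tag: str) -> Iterator[str]:
    """Yield the stream with everything from open_tag through close_tag removed."""
    buffer = ""
    dropping = False
    for chunk in chunks:
        buffer += chunk
        while True:
            tag = close_tag if dropping else open_tag
            idx = buffer.find(tag)
            if idx == -1:
                break
            if not dropping and idx > 0:
                yield buffer[:idx]  # text before the opening marker is safe
            buffer = buffer[idx + len(tag):]
            dropping = not dropping
        # Keep only the tail that could still become the marker we are waiting for.
        tag = close_tag if dropping else open_tag
        keep = partial_suffix(buffer, tag)
        safe, buffer = buffer[: len(buffer) - keep], buffer[len(buffer) - keep:]
        if safe and not dropping:
            yield safe
    if buffer and not dropping:
        yield buffer  # leftover text that never became a marker


chunks = ["Hello! <thi", "nk>internal reasoning</th", "ink>Here's my response."]
print("".join(drop_between(chunks, "<think>", "</think>")))
# Hello! Here's my response.
```

If the closing marker never arrives, this sketch simply drops the rest of the stream; whether the library behaves the same way is not stated in this README.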

Async Streaming (OpenAI Pattern)

import asyncio
from stream_processor import KeywordRegistry, llm_stream_processor, replace

reg = KeywordRegistry()
reg.register("API_KEY", lambda ctx: replace("[HIDDEN]"))

@llm_stream_processor(reg, yield_mode="token")
async def stream_chat():
    # Simulating async LLM response chunks
    chunks = ["Your ", "API_KEY ", "is safe."]
    for chunk in chunks:
        yield chunk
        await asyncio.sleep(0.1)

async def main():
    async for token in stream_chat():
        print(token, end="", flush=True)

asyncio.run(main())
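
One common way for a single decorator to serve both synchronous and asynchronous generators is to inspect the wrapped function once, at decoration time. The sketch below shows that pattern in isolation. It is an assumption about the general approach, not a description of how `llm_stream_processor` is implemented; `filtered` is a hypothetical name, and the per-chunk `transform` stands in for real filtering (it is not prefix-safe).

```python
import asyncio
import functools
import inspect
from typing import Callable


def filtered(transform: Callable[[str], str]):
    """Decorator factory that applies `transform` to every chunk a generator yields."""
    def decorator(fn):
        if inspect.isasyncgenfunction(fn):
            @functools.wraps(fn)
            async def async_wrapper(*args, **kwargs):
                async for chunk in fn(*args, **kwargs):
                    yield transform(chunk)
            return async_wrapper

        @functools.wraps(fn)
        def sync_wrapper(*args, **kwargs):
            for chunk in fn(*args, **kwargs):
                yield transform(chunk)
        return sync_wrapper
    return decorator


@filtered(str.upper)
def sync_stream():
    yield "sync "
    yield "chunks"


@filtered(str.upper)
async def async_stream():
    yield "async "
    await asyncio.sleep(0)
    yield "chunks"


async def collect():
    return [chunk async for chunk in async_stream()]


print(list(sync_stream()))     # ['SYNC ', 'CHUNKS']
print(asyncio.run(collect()))  # ['ASYNC ', 'CHUNKS']
```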

๐Ÿ—๏ธ Architecture

Token Generator (sync/async)
         โ”‚
         โ–ผ
@llm_stream_processor    โ† Decorator API
         โ”‚
         โ–ผ
   StreamProcessor       โ† Character-level engine
   โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
   โ”‚ Aho-Corasick โ”‚
   โ”‚  Automaton   โ”‚
   โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
         โ”‚
         โ–ผ
  Lazy Buffer + Callbacks
         โ”‚
         โ–ผ
   Re-packer (char/token/chunk)
         โ”‚
         โ–ผ
      Consumer

For detailed architecture, see docs/ARCHITECTURE.md.

🧪 Development

# Install in editable mode
pip install -e .

# Run tests
python -m pytest tests/ -v

# Run the example
python -m examples.example

🤝 Contributing

Contributions are welcome! Please read our Contributing Guide for details on:

  • Code of Conduct
  • Development setup
  • Submitting pull requests

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

🙏 Acknowledgments

  • Aho-Corasick algorithm for efficient multi-pattern matching
  • Inspired by the need for real-time LLM output sanitization in production systems
