# 🚀 LLM Stream Processor

A callback-driven, prefix-safe, lazy LLM stream sanitization library.

Real-time filtering, redaction, and control of streaming LLM outputs with microsecond-level overhead.
## ✨ Features
- 🔒 Prefix-Safe Pattern Matching – uses an Aho-Corasick automaton to ensure no partial sensitive content leaks before a full match is confirmed
- ⚡ Ultra-Low Latency – targets <5 μs per-token overhead, designed for real-time streaming
- 🔄 Sync & Async Support – works seamlessly with both synchronous and asynchronous LLM SDKs
- 🎯 Flexible Actions – PASS, DROP, REPLACE, HALT, or CONTINUE_DROP/PASS based on pattern matches
- 📊 History Tracking – optional input/output/action history for debugging and analytics
- 🔧 Runtime Updates – dynamically register/deregister patterns without restarting streams
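Prefix safety matters because a keyword often arrives split across chunks. A minimal, self-contained sketch of the idea (not the library's implementation): hold back any emitted tail that is still a prefix of a registered keyword, so nothing leaks before a match is either confirmed or ruled out.

```python
KEYWORDS = {"secret"}
MAX_HOLD = max(map(len, KEYWORDS)) - 1

def prefix_safe_filter(chunks, keywords=KEYWORDS):
    """Yield sanitized text, holding back any tail that could still
    grow into a keyword so partial matches never leak."""
    buf = ""
    for chunk in chunks:
        buf += chunk
        for kw in keywords:                      # redact completed keywords
            buf = buf.replace(kw, "[REDACTED]")
        hold = 0                                 # longest tail that is a keyword prefix
        for k in range(min(len(buf), MAX_HOLD), 0, -1):
            if any(kw.startswith(buf[-k:]) for kw in keywords):
                hold = k
                break
        yield buf[:len(buf) - hold]
        buf = buf[len(buf) - hold:]
    if buf:                                      # flush whatever remains at end-of-stream
        yield buf

print("".join(prefix_safe_filter(["The sec", "ret is out"])))
# The [REDACTED] is out
```

Note that `"sec"` at the end of the first chunk is withheld until the second chunk resolves it; a naive chunk-by-chunk filter would have emitted it immediately.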
## 📦 Installation

```bash
pip install llm-stream-processor
```

For development:

```bash
git clone https://github.com/DevOpRohan/llm_stream_processor.git
cd llm_stream_processor
pip install -e .
```
## 🚀 Quickstart

```python
from stream_processor import KeywordRegistry, llm_stream_processor, replace, halt

# Create a registry and register pattern callbacks
reg = KeywordRegistry()
reg.register("secret", lambda ctx: replace("[REDACTED]"))
reg.register("STOP", halt)  # Halt the stream on this keyword

@llm_stream_processor(reg, yield_mode="token")
def generate_response():
    yield "The secret password is hidden. "
    yield "Do not STOP here."
    yield "This won't be seen."

# Consume the filtered stream
for token in generate_response():
    print(token, end="", flush=True)

# Output: The [REDACTED] password is hidden. Do not
```
## 📚 API Reference

### Core Classes

| Class | Description |
|---|---|
| `KeywordRegistry` | Registers/deregisters keywords and their callbacks; compiles to an Aho-Corasick automaton |
| `StreamProcessor` | Low-level processor for character-by-character filtering |
| `ActionContext` | Context passed to callbacks with keyword, buffer, position, and history |
| `StreamHistory` | Tracks input/output/actions for debugging |
### Decorator

```python
@llm_stream_processor(registry, yield_mode='token', record_history=True)
```

| Parameter | Options | Description |
|---|---|---|
| `registry` | `KeywordRegistry` | Registry with registered patterns |
| `yield_mode` | `'char'`, `'token'`, `'chunk:N'` | Output mode: per-character, per-token, or N-character chunks |
| `record_history` | `True`/`False` | Enable/disable history tracking |
### Action Helpers

| Function | Description |
|---|---|
| `drop()` | Remove the matched keyword from the output |
| `replace(text)` | Replace the matched keyword with custom text |
| `halt()` | Immediately abort the stream |
| `passthrough()` | Leave the matched keyword unchanged (no-op) |
| `continuous_drop()` | Start dropping all content until `continuous_pass` |
| `continuous_pass()` | Resume normal output after `continuous_drop` |
## 🎯 Use Cases

### PII Redaction

```python
from stream_processor import KeywordRegistry, llm_stream_processor, replace

reg = KeywordRegistry()

# Redact email-like patterns (register common domains)
for domain in ["@gmail.com", "@yahoo.com", "@outlook.com"]:
    reg.register(domain, lambda ctx: replace("@[REDACTED]"))

# Redact SSN patterns
reg.register("SSN:", lambda ctx: replace("SSN: [REDACTED]"))
```
### Content Moderation (Drop Segments)

```python
from stream_processor import KeywordRegistry, llm_stream_processor, continuous_drop, continuous_pass

reg = KeywordRegistry()

# Drop internal "thinking" blocks
reg.register("<think>", continuous_drop)
reg.register("</think>", continuous_pass)

@llm_stream_processor(reg, yield_mode="token")
def llm_stream():
    yield "Hello! <think>internal reasoning here</think>Here's my response."

print("".join(llm_stream()))
# Output: Hello! </think>Here's my response.
```
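Conceptually, the `continuous_drop`/`continuous_pass` pair is a state machine that toggles a "dropping" flag on the two markers. A simplified, non-streaming sketch of that idea follows; marker handling details (such as whether the closing marker itself is emitted) may differ from the library's behaviour shown above.

```python
def strip_spans(text, start="<think>", end="</think>"):
    """Drop everything between start and end markers by toggling a flag."""
    out, i, dropping = [], 0, False
    while i < len(text):
        if not dropping and text.startswith(start, i):
            dropping, i = True, i + len(start)    # enter drop mode
        elif dropping and text.startswith(end, i):
            dropping, i = False, i + len(end)     # resume passing
        else:
            if not dropping:
                out.append(text[i])
            i += 1
    return "".join(out)

print(strip_spans("Hello! <think>internal reasoning here</think>Here's my response."))
# Hello! Here's my response.
```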
### Async Streaming (OpenAI Pattern)

```python
import asyncio
from stream_processor import KeywordRegistry, llm_stream_processor, replace

reg = KeywordRegistry()
reg.register("API_KEY", lambda ctx: replace("[HIDDEN]"))

@llm_stream_processor(reg, yield_mode="token")
async def stream_chat():
    # Simulating async LLM response chunks
    chunks = ["Your ", "API_KEY ", "is safe."]
    for chunk in chunks:
        yield chunk
        await asyncio.sleep(0.1)

async def main():
    async for token in stream_chat():
        print(token, end="", flush=True)

asyncio.run(main())
```
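Any such async wrapper has to buffer across `await` boundaries, because a keyword can be split between two chunks. A coarse, self-contained sketch of that requirement (not the library's code, which uses the Aho-Corasick automaton instead of this fixed hold-back):

```python
import asyncio

async def redact_stream(agen, keyword="API_KEY", replacement="[HIDDEN]"):
    """Hold back len(keyword) - 1 characters so a keyword split across
    chunks is still redacted before anything is emitted."""
    hold = len(keyword) - 1
    buf = ""
    async for chunk in agen:
        buf = (buf + chunk).replace(keyword, replacement)
        if len(buf) > hold:
            yield buf[:len(buf) - hold]
            buf = buf[len(buf) - hold:]
    if buf:                           # flush the held tail at end-of-stream
        yield buf

async def demo():
    async def chunks():
        # "API_KEY" is deliberately split across two chunks
        for c in ["Your API_", "KEY is safe."]:
            yield c
    return "".join([t async for t in redact_stream(chunks())])

print(asyncio.run(demo()))  # Your [HIDDEN] is safe.
```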
## 🏗️ Architecture

```
Token Generator (sync/async)
            │
            ▼
  @llm_stream_processor      ← Decorator API
            │
            ▼
      StreamProcessor        ← Character-level engine
      ┌──────────────┐
      │ Aho-Corasick │
      │  Automaton   │
      └──────────────┘
            │
            ▼
  Lazy Buffer + Callbacks
            │
            ▼
Re-packer (char/token/chunk)
            │
            ▼
         Consumer
```

For detailed architecture, see docs/ARCHITECTURE.md.
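The re-packer stage can be pictured as a small regrouping generator. This is a conceptual sketch assuming the engine emits individual characters, not the library's code; token mode (splitting on whitespace boundaries) is omitted for brevity.

```python
def repack(chars, mode="char"):
    """Regroup a sanitized character stream into the requested granularity."""
    if mode == "char":
        yield from chars
        return
    if mode.startswith("chunk:"):
        size = int(mode.split(":", 1)[1])
        buf = []
        for c in chars:
            buf.append(c)
            if len(buf) == size:
                yield "".join(buf)
                buf = []
        if buf:
            yield "".join(buf)   # flush the final partial chunk

print(list(repack("hello world", mode="chunk:4")))  # ['hell', 'o wo', 'rld']
```

Keeping re-packing separate from matching is what lets `yield_mode` change the output shape without touching the character-level filtering engine.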
## 🧪 Development

```bash
# Install in editable mode
pip install -e .

# Run tests
python -m pytest tests/ -v

# Run the example
python -m examples.example
```
## 📖 Documentation
- Problem Statement: docs/PROBLEM.md
- Architecture & Design: docs/ARCHITECTURE.md
- Contributing Guide: CONTRIBUTING.md
## 🤝 Contributing
Contributions are welcome! Please read our Contributing Guide for details on:
- Code of Conduct
- Development setup
- Submitting pull requests
## 📄 License
This project is licensed under the MIT License - see the LICENSE file for details.
## 🙏 Acknowledgments
- Aho-Corasick algorithm for efficient multi-pattern matching
- Inspired by the need for real-time LLM output sanitization in production systems