Streaming text replacement for AI token streams — handles partial matches across chunk boundaries
stream-replace
Streaming text replacement for AI token streams — correctly handles partial matches across chunk boundaries.
Install
pip install stream-replace
Quick Start
import re
from stream_replace import Replacer

r = Replacer([
    ("敏感词", "***"),                                          # string → string
    ("secret", lambda s: s[0] + "***"),                         # string → callable
    (re.compile(r"1[3-9]\d{9}"), "[PHONE]"),                    # regex → string
    (re.compile(r"<think>[\s\S]*?</think>"), ""),               # regex → remove
    (re.compile(r"(\d+)"), lambda m: str(int(m.group()) * 2)),  # regex → callable
])

for chunk in ai_stream:
    safe_text = r.feed(chunk)
    print(safe_text, end="")
print(r.flush(), end="")
Why?
AI models stream tokens incrementally. A word you want to replace may be split across chunks:
chunk 1: "hel"
chunk 2: "lo world"
Naive per-chunk replacement would miss "hello". stream-replace buffers just enough text at chunk boundaries to detect partial matches, while emitting safe text as early as possible.
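The failure mode can be reproduced with plain Python and no library at all. The snippet below is only an illustration of the problem; the replacement text "hi" is an arbitrary choice for the example.

```python
chunks = ["hel", "lo world"]

# Naive approach: run the replacement inside each chunk independently.
naive = "".join(chunk.replace("hello", "hi") for chunk in chunks)

# Reference result: run the replacement on the fully joined text.
expected = "".join(chunks).replace("hello", "hi")

print(naive)     # hello world  (the split match was missed)
print(expected)  # hi world
```

Waiting for the whole text would give the right answer but defeats streaming, which is why only the ambiguous tail of each chunk needs to be buffered.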
API
Replacer(rules)
Create a replacer with a list of (pattern, replacement) tuples.
| Pattern | Replacement | Description |
|---|---|---|
| str | str | Exact string replacement |
| str | callable(matched_str) → str | Dynamic string replacement |
| re.Pattern | str | Regex replacement (supports \1 backrefs) |
| re.Pattern | callable(re.Match) → str | Dynamic regex replacement |
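One way to picture the four combinations is to reduce each rule to a compiled pattern plus a function from `re.Match` to `str`. The sketch below does that with the standard `re` module only; `normalize_rule` is a hypothetical helper written for this illustration, and nothing here is taken from the library's internals.

```python
import re

def normalize_rule(pattern, replacement):
    """Return (compiled_pattern, fn), where fn maps re.Match -> str."""
    is_literal = isinstance(pattern, str)
    # Literal strings are escaped so that "a.b" only matches "a.b".
    compiled = re.compile(re.escape(pattern)) if is_literal else pattern

    if callable(replacement):
        if is_literal:
            # str rule + callable: the callable receives the matched text.
            return compiled, lambda m: replacement(m.group())
        # regex rule + callable: the callable receives the Match object.
        return compiled, replacement
    if is_literal:
        # str rule + str: insert the replacement verbatim, no backrefs.
        return compiled, lambda m: replacement
    # regex rule + str: expand backreferences such as \1.
    return compiled, lambda m: m.expand(replacement)

rules = [
    ("secret", lambda s: s[0] + "***"),
    (re.compile(r"(\d+)-(\d+)"), r"\2-\1"),
]
text = "secret 12-34"
for pat, rep in rules:
    compiled, fn = normalize_rule(pat, rep)
    text = compiled.sub(fn, text)
print(text)  # s*** 34-12
```

This applies the rules one after another on complete text, which is simpler than what a streaming replacer has to do; it is meant only to show how the callable and backreference variants differ.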
r.feed(chunk: str) → str
Process one incoming chunk. Returns text that is safe to emit (fully resolved, no pending partial matches).
r.flush() → str
Flush the internal buffer after the stream ends. Must be called once to get any remaining text.
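To make the feed/flush contract concrete, here is a minimal sketch for a single literal pattern. `TinyReplacer` is a hypothetical class written for this illustration, not the library's implementation; it only shows why `feed()` may return less text than it received and why `flush()` is needed at the end.

```python
class TinyReplacer:
    """Single-literal sketch of the feed/flush contract (illustrative only)."""

    def __init__(self, target: str, replacement: str) -> None:
        self.target = target
        self.replacement = replacement
        self.buffer = ""

    def _held_back(self, tail: str) -> int:
        # Length of the longest suffix of `tail` that is a proper prefix of target.
        for size in range(min(len(self.target) - 1, len(tail)), 0, -1):
            if tail.endswith(self.target[:size]):
                return size
        return 0

    def feed(self, chunk: str) -> str:
        text = self.buffer + chunk
        out = []
        pos = 0
        while True:
            idx = text.find(self.target, pos)
            if idx == -1:
                break
            out.append(text[pos:idx])      # text before the match
            out.append(self.replacement)   # the replacement itself
            pos = idx + len(self.target)
        tail = text[pos:]
        keep = self._held_back(tail)
        out.append(tail[:len(tail) - keep])    # safe part of the tail
        self.buffer = tail[len(tail) - keep:]  # possible partial match
        return "".join(out)

    def flush(self) -> str:
        # End of stream: whatever is buffered can no longer become a match.
        out, self.buffer = self.buffer, ""
        return out

t = TinyReplacer("hello", "hi")
first = t.feed("hel")        # "" because "hel" might become "hello"
second = t.feed("lo world")  # "hi world"
rest = t.flush()             # ""
```

If the stream ended after `"say he"`, `feed` would return `"say "` and the trailing `"he"` would only appear once `flush()` is called.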
r.reset()
Clear internal state so the replacer can be reused for another stream.
r.wrap(iterable) → Iterable[str]
Convenience wrapper for a sync chunk stream. Handles feed + flush automatically.
for text in r.wrap(chunks):
    print(text, end="")
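Conceptually, a wrapper like this is a generator that calls `feed()` for every chunk and `flush()` once at the end. The sketch below shows that shape using a stand-in object; `HoldLast` and the free function `wrap` are hypothetical names for this illustration, and skipping empty strings is an assumption rather than documented behavior.

```python
from typing import Iterable, Iterator

class HoldLast:
    """Hypothetical stand-in for a replacer: always holds back one character."""

    def __init__(self) -> None:
        self.buffer = ""

    def feed(self, chunk: str) -> str:
        text = self.buffer + chunk
        self.buffer = text[-1:]
        return text[:-1]

    def flush(self) -> str:
        out, self.buffer = self.buffer, ""
        return out

def wrap(replacer, chunks: Iterable[str]) -> Iterator[str]:
    for chunk in chunks:
        text = replacer.feed(chunk)
        if text:  # assumption: do not emit empty strings
            yield text
    tail = replacer.flush()  # emit whatever was still held back
    if tail:
        yield tail

pieces = list(wrap(HoldLast(), ["ab", "cd"]))
print(pieces)  # ['a', 'bc', 'd']
```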
r.wrap_async(async_iterable) → AsyncIterable[str]
Same as wrap, but for async iterables.
async for text in r.wrap_async(async_chunks):
    print(text, end="")
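The async variant has the same shape, written as an async generator. The sketch below is self-contained and runs under `asyncio`; `HoldLast`, `fake_stream`, and the free function `wrap_async` are hypothetical names for this illustration and are not the library's code.

```python
import asyncio
from typing import AsyncIterator

class HoldLast:
    """Hypothetical stand-in for a replacer: always holds back one character."""

    def __init__(self) -> None:
        self.buffer = ""

    def feed(self, chunk: str) -> str:
        text = self.buffer + chunk
        self.buffer = text[-1:]
        return text[:-1]

    def flush(self) -> str:
        out, self.buffer = self.buffer, ""
        return out

async def wrap_async(replacer, chunks) -> AsyncIterator[str]:
    async for chunk in chunks:
        text = replacer.feed(chunk)
        if text:  # assumption: do not emit empty strings
            yield text
    tail = replacer.flush()
    if tail:
        yield tail

async def fake_stream() -> AsyncIterator[str]:
    # Simulates a token stream that yields control between chunks.
    for chunk in ["ab", "cd"]:
        await asyncio.sleep(0)
        yield chunk

async def collect() -> list:
    return [text async for text in wrap_async(HoldLast(), fake_stream())]

result = asyncio.run(collect())
print(result)  # ['a', 'bc', 'd']
```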
Functional API
For one-off use without creating a Replacer instance:
from stream_replace import stream_replace, astream_replace
# sync
for text in stream_replace(chunks, [("hello", "world")]):
    print(text, end="")

# async
async for text in astream_replace(async_chunks, [("hello", "world")]):
    print(text, end="")
How It Works
- Buffer: Incoming chunks accumulate in an internal buffer.
- Match: On each feed(), the buffer is scanned for complete matches across all rules. The earliest match wins.
- Replace: Matched text is replaced; scanning continues from after the replacement.
- Hold back: After all matches, the buffer tail is checked for potential partial matches (a suffix that could be the start of a pattern). This tail is held back for the next feed().
- Flush: On flush(), the remaining buffer is processed without holding anything back.
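The Match and Replace steps can be sketched as a loop that, at each position, asks every rule for its next match and applies whichever starts first. This is a simplified illustration on a complete buffer with no hold-back; `replace_all` is a hypothetical function, rules are limited to (compiled pattern, replacement string) pairs, and breaking ties in favor of the rule listed first is an assumption of the sketch.

```python
import re

def replace_all(text, rules):
    """Apply rules left to right; the match with the earliest start wins."""
    out = []
    pos = 0
    while pos <= len(text):
        best = None
        for pattern, replacement in rules:
            m = pattern.search(text, pos)
            # Strict "<" keeps the first-listed rule on ties (an assumption).
            if m and (best is None or m.start() < best[0].start()):
                best = (m, replacement)
        if best is None:
            break
        m, replacement = best
        out.append(text[pos:m.start()])    # untouched text before the match
        out.append(m.expand(replacement))  # replacement, with \1-style backrefs
        if m.end() == m.start():
            # Empty match: copy one character so the loop always advances.
            out.append(text[m.start():m.start() + 1])
            pos = m.end() + 1
        else:
            pos = m.end()                  # continue after the matched text
    out.append(text[pos:])
    return "".join(out)

rules = [(re.compile("cat"), "dog"), (re.compile(r"(\d+)"), r"<\1>")]
print(replace_all("a cat has 9 lives", rules))  # a dog has <9> lives
```

Because scanning resumes after the matched text, overlapping candidates are resolved in favor of the earlier one: with rules for `ab` and `bc`, the input `abc` only triggers the `ab` rule.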
For regex rules, the library automatically extracts literal prefixes from the pattern (e.g., "<think>" from r"<think>[\s\S]*?</think>") to detect both partial prefix matches and open-but-unclosed matches spanning multiple chunks.
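As a rough illustration of literal-prefix extraction, the sketch below walks a pattern string and collects characters until it meets regex syntax. `literal_prefix` is a hypothetical helper, deliberately conservative (it gives up on any alternation and ignores flags such as `re.IGNORECASE`), and it is not how the library is known to do it.

```python
QUANTIFIERS = set("*+?{")
META = set(".^$*+?{}[]()|\\")

def literal_prefix(pattern: str) -> str:
    """Return the leading characters that every match must start with."""
    if "|" in pattern:
        return ""  # conservative: alternation may change the start
    prefix = []
    i = 0
    while i < len(pattern):
        ch = pattern[i]
        if ch == "\\" and i + 1 < len(pattern) and not pattern[i + 1].isalnum():
            prefix.append(pattern[i + 1])  # escaped punctuation is a literal
            i += 2
            continue
        if ch in META:
            if ch in QUANTIFIERS and prefix:
                prefix.pop()  # "ab*" only guarantees the leading "a"
            break
        prefix.append(ch)
        i += 1
    return "".join(prefix)

print(literal_prefix(r"<think>[\s\S]*?</think>"))  # <think>
print(literal_prefix(r"1[3-9]\d{9}"))              # 1
```

With a prefix such as `<think>` in hand, a buffer ending in `<thi` can be treated as a possible partial match, and a buffer that contains `<think>` without a complete regex match can be treated as open but unclosed.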
License
MIT
File details
Details for the file stream_replace-0.1.0.tar.gz.
File metadata
- Download URL: stream_replace-0.1.0.tar.gz
- Size: 7.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | a3862a92d895fe84f9eb132a0543c8d1622d485f5d4d6329d291a178b736fbf0 |
| MD5 | f3284afcda107797365d00269277c0e0 |
| BLAKE2b-256 | 1b4f45b3e2522f67e4193f48490c1ae277a81358251969ffc7f2a6b09c86c1bd |
Provenance
The following attestation bundles were made for stream_replace-0.1.0.tar.gz:
- Publisher: publish.yml on naaive/stream-replace
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: stream_replace-0.1.0.tar.gz
- Subject digest: a3862a92d895fe84f9eb132a0543c8d1622d485f5d4d6329d291a178b736fbf0
- Sigstore transparency entry: 1004780084
- Permalink: naaive/stream-replace@e0f150746dc1e7ff5f53c81ec8c256fab4e29373
- Branch / Tag: refs/tags/v0.1.0
- Owner: https://github.com/naaive
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@e0f150746dc1e7ff5f53c81ec8c256fab4e29373
- Trigger Event: release
File details
Details for the file stream_replace-0.1.0-py3-none-any.whl.
File metadata
- Download URL: stream_replace-0.1.0-py3-none-any.whl
- Size: 6.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 1c50e72fb983a9d808d2e726ab1412ee945f7e25af31891cbd8a7fa1ee69759a |
| MD5 | 43b28bb5a37512f31ec31c5ccab2b220 |
| BLAKE2b-256 | 346d621bc70f7cf99d75ae79bba03f54e34a5b424179fde3d3c612dd28c509fe |
Provenance
The following attestation bundles were made for stream_replace-0.1.0-py3-none-any.whl:
- Publisher: publish.yml on naaive/stream-replace
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: stream_replace-0.1.0-py3-none-any.whl
- Subject digest: 1c50e72fb983a9d808d2e726ab1412ee945f7e25af31891cbd8a7fa1ee69759a
- Sigstore transparency entry: 1004780091
- Permalink: naaive/stream-replace@e0f150746dc1e7ff5f53c81ec8c256fab4e29373
- Branch / Tag: refs/tags/v0.1.0
- Owner: https://github.com/naaive
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@e0f150746dc1e7ff5f53c81ec8c256fab4e29373
- Trigger Event: release