stream-replace

Streaming text replacement for AI token streams — correctly handles partial matches across chunk boundaries.

Install

pip install stream-replace

Quick Start

import re
from stream_replace import Replacer

r = Replacer([
    ("敏感词", "***"),                                        # string → string ("敏感词" is Chinese for "sensitive word")
    ("secret", lambda s: s[0] + "***"),                       # string → callable
    (re.compile(r"1[3-9]\d{9}"), "[PHONE]"),                  # regex  → string
    (re.compile(r"<think>[\s\S]*?</think>"), ""),              # regex  → remove
    (re.compile(r"(\d+)"), lambda m: str(int(m.group()) * 2)), # regex  → callable
])

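# ai_stream is any iterable of text chunks, e.g. tokens from a model API
for chunk in ai_stream:
    safe_text = r.feed(chunk)   # only text with no pending partial match
    print(safe_text, end="")

print(r.flush(), end="")        # emit whatever is still buffered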

Why?

AI models stream tokens incrementally. A word you want to replace may be split across chunks:

chunk 1: "hel"
chunk 2: "lo world"

Replacing within each chunk independently misses "hello", because neither chunk contains the whole word. stream-replace buffers just enough text at chunk boundaries to detect partial matches, and emits everything else as early as possible.
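
To make the problem concrete, here is a small standard-library illustration (it does not use stream-replace). The replacement text "hi" is chosen only for the example.

```python
chunks = ["hel", "lo world"]

# Per-chunk replacement: neither chunk contains "hello", so nothing changes.
naive = "".join(chunk.replace("hello", "hi") for chunk in chunks)

# Replacement on the joined text works, but requires waiting for the whole stream.
joined = "".join(chunks).replace("hello", "hi")

print(naive)   # hello world
print(joined)  # hi world
```

A streaming replacer aims for the second result without giving up incremental output.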

API

Replacer(rules)

Create a replacer with a list of (pattern, replacement) tuples.

Pattern    | Replacement                 | Description
str        | str                         | Exact string replacement
str        | callable(matched_str) → str | Dynamic string replacement
re.Pattern | str                         | Regex replacement (supports \1 backrefs)
re.Pattern | callable(re.Match) → str    | Dynamic regex replacement
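
The four combinations can be pictured as one dispatch step. The helper below, apply_replacement, is a hypothetical name used for illustration and is not part of the library; it assumes that backreferences in string replacements behave like the standard re.Match.expand.

```python
import re

def apply_replacement(matched, replacement):
    """Hypothetical helper: resolve one rule's replacement for one match.

    `matched` is a str for string rules and an re.Match for regex rules.
    """
    if callable(replacement):
        # Callables receive the matched str or the re.Match object.
        return replacement(matched)
    if isinstance(matched, re.Match):
        # String templates for regex rules may contain \1-style backrefs.
        return matched.expand(replacement)
    return replacement

m = re.search(r"(\d+)-(\d+)", "10-20")
print(apply_replacement("secret", lambda s: s[0] + "***"))  # s***
print(apply_replacement(m, r"\2-\1"))                       # 20-10
```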

r.feed(chunk: str) → str

Process one incoming chunk. Returns text that is safe to emit (fully resolved, no pending partial matches).

r.flush() → str

Flush the internal buffer after the stream ends. Call it once after the last feed(); otherwise any text still held back for a possible partial match is never emitted.

r.reset()

Clear internal state so the replacer can be reused for another stream.

r.wrap(iterable) → Iterable[str]

Convenience wrapper for a sync chunk stream. Handles feed + flush automatically.

for text in r.wrap(chunks):
    print(text, end="")
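
Conceptually, wrap is feed in a loop followed by one flush. The sketch below shows that shape with a hypothetical stand-in class (HoldLastChar) so it runs without the library. Whether the real wrap skips empty strings is an assumption.

```python
from typing import Iterable, Iterator

class HoldLastChar:
    """Hypothetical stand-in with a feed/flush interface: always holds back one character."""

    def __init__(self) -> None:
        self.buffer = ""

    def feed(self, chunk: str) -> str:
        data = self.buffer + chunk
        self.buffer = data[-1:]   # keep the last character for the next call
        return data[:-1]

    def flush(self) -> str:
        data, self.buffer = self.buffer, ""
        return data

def wrap_sketch(replacer, chunks: Iterable[str]) -> Iterator[str]:
    """Feed every chunk, then flush once at the end."""
    for chunk in chunks:
        text = replacer.feed(chunk)
        if text:                  # assumption: empty results are not yielded
            yield text
    tail = replacer.flush()
    if tail:
        yield tail

print(list(wrap_sketch(HoldLastChar(), ["ab", "c"])))  # ['a', 'b', 'c']
```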

r.wrap_async(async_iterable) → AsyncIterable[str]

Same as wrap, but for async iterables.

async for text in r.wrap_async(async_chunks):
    print(text, end="")
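
The async variant presumably has the same feed-then-flush shape, driven by async for. The following is a runnable sketch under that assumption; HoldLastChar, fake_stream, and wrap_async_sketch are hypothetical names, and the library is not imported.

```python
import asyncio
from typing import AsyncIterable, AsyncIterator

class HoldLastChar:
    """Hypothetical stand-in replacer: always holds back one character."""

    def __init__(self) -> None:
        self.buffer = ""

    def feed(self, chunk: str) -> str:
        data = self.buffer + chunk
        self.buffer = data[-1:]
        return data[:-1]

    def flush(self) -> str:
        data, self.buffer = self.buffer, ""
        return data

async def wrap_async_sketch(replacer, chunks: AsyncIterable[str]) -> AsyncIterator[str]:
    """Async generator: feed each awaited chunk, flush when the source is exhausted."""
    async for chunk in chunks:
        text = replacer.feed(chunk)
        if text:
            yield text
    tail = replacer.flush()
    if tail:
        yield tail

async def fake_stream() -> AsyncIterator[str]:
    """Simulated token stream that yields control between chunks."""
    for chunk in ["he", "llo"]:
        await asyncio.sleep(0)
        yield chunk

async def collect() -> list:
    return [text async for text in wrap_async_sketch(HoldLastChar(), fake_stream())]

result = asyncio.run(collect())
print(result)  # ['h', 'ell', 'o']
```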

Functional API

For one-off use without creating a Replacer instance:

from stream_replace import stream_replace, astream_replace

# sync
for text in stream_replace(chunks, [("hello", "world")]):
    print(text, end="")

# async
async for text in astream_replace(async_chunks, [("hello", "world")]):
    print(text, end="")

How It Works

  1. Buffer: Incoming chunks accumulate in an internal buffer.
  2. Match: On each feed(), the buffer is scanned for complete matches across all rules. The earliest match wins.
  3. Replace: Matched text is replaced; scanning continues from after the replacement.
  4. Hold back: After all matches, the buffer tail is checked for potential partial matches (a suffix that could be the start of a pattern). This tail is held back for the next feed().
  5. Flush: On flush(), the remaining buffer is processed without holding anything back.
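
The five steps can be sketched for plain string rules. LiteralReplacer below is a hypothetical stand-in written for illustration, not the library's implementation. It assumes non-empty string patterns, ignores regex rules, and breaks ties in favor of the rule listed first.

```python
class LiteralReplacer:
    """Illustrative sketch: string rules only, not the library's code."""

    def __init__(self, rules):
        self.rules = list(rules)   # [(pattern, replacement), ...]
        self.buffer = ""

    def _partial_start(self, buf, pos):
        """Smallest index >= pos whose tail is a proper prefix of some pattern."""
        longest = max((len(p) for p, _ in self.rules), default=0)
        for j in range(max(pos, len(buf) - longest + 1), len(buf)):
            tail = buf[j:]
            if any(len(p) > len(tail) and p.startswith(tail) for p, _ in self.rules):
                return j
        return None

    def _process(self, final):
        buf, pos, out = self.buffer, 0, []
        while True:
            # Step 2: earliest complete match across all rules.
            hits = [(i, pat, rep) for pat, rep in self.rules
                    if (i := buf.find(pat, pos)) != -1]
            hit = min(hits, key=lambda h: h[0], default=None)
            # Step 4: a possible match that starts earlier must be waited for.
            partial = None if final else self._partial_start(buf, pos)
            if hit is None or (partial is not None and partial < hit[0]):
                break
            i, pat, rep = hit
            out.append(buf[pos:i])
            # Step 3: replace, then continue scanning after the match.
            out.append(rep(pat) if callable(rep) else rep)
            pos = i + len(pat)
        cut = len(buf) if partial is None else partial
        out.append(buf[pos:cut])
        self.buffer = buf[cut:]    # held back until the next feed()
        return "".join(out)

    def feed(self, chunk):
        self.buffer += chunk       # Step 1: accumulate
        return self._process(final=False)

    def flush(self):
        return self._process(final=True)   # Step 5: nothing is held back

r = LiteralReplacer([("hello", "hi")])
print(repr(r.feed("hel")))       # '' (held back: could become "hello")
print(repr(r.feed("lo world")))  # 'hi world'
print(repr(r.flush()))           # ''
```

Checking for an earlier partial match before accepting a complete one keeps the "earliest match wins" rule intact when patterns overlap.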

For regex rules, the library automatically extracts literal prefixes from the pattern (e.g., "<think>" from r"<think>[\s\S]*?</think>") to detect both partial prefix matches and open-but-unclosed matches spanning multiple chunks.
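
One way to picture the prefix idea is shown below. Both functions (literal_prefix and hold_index) are hypothetical and deliberately conservative; they are not the library's code. The sketch assumes patterns compiled without flags, and that complete matches have already been replaced before hold_index is called.

```python
import re

_META = set(".^$*+?{}[]\\|()")

def literal_prefix(pattern: str) -> str:
    """Leading run of plain characters that every match must start with."""
    if "|" in pattern:
        return ""                  # alternation: no single guaranteed prefix
    end = 0
    while end < len(pattern) and pattern[end] not in _META:
        end += 1
    # A following *, ? or { may make the last literal optional, so drop it.
    if end < len(pattern) and pattern[end] in "*?{":
        end -= 1
    return pattern[:max(end, 0)]

def hold_index(buffer: str, rx: re.Pattern) -> int:
    """Index from which to hold back; len(buffer) means hold nothing."""
    prefix = literal_prefix(rx.pattern)
    if not prefix:
        return len(buffer)
    # Open but unclosed: the full prefix is present, the rest has not arrived.
    start = buffer.find(prefix)
    if start != -1:
        return start
    # Partial prefix: the buffer ends with the first k characters of the prefix.
    for k in range(min(len(prefix) - 1, len(buffer)), 0, -1):
        if buffer.endswith(prefix[:k]):
            return len(buffer) - k
    return len(buffer)

think = re.compile(r"<think>[\s\S]*?</think>")
print(literal_prefix(think.pattern))       # <think>
print(hold_index("Hi <thi", think))        # 3
print(hold_index("Hi <think>hmm", think))  # 3
print(hold_index("Hi there", think))       # 8
```

This sketch holds text whenever the prefix appears, even if the rest of the pattern can no longer match, so a production implementation would need a tighter check to avoid buffering until flush().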

License

MIT

