Parse streamed JSON data incrementally.

jsoncurrent

Python Emitter for the jsoncurrent patch protocol — stream structured JSON from your LLM backend incrementally.

LLM tokens → [Emitter] → patch stream → [Collector] → assembled object

JS/TS client and Node Emitter: https://github.com/richardantao/jsoncurrent-js

Installation

pip install jsoncurrent

The problem

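LLMs generate JSON token by token, so for most of the stream the output is an incomplete document. Standard parsers such as Python's json.loads or JavaScript's JSON.parse throw on incomplete input, which means nothing can be rendered until the final token arrives.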

jsoncurrent solves this with a patch protocol. The Emitter on your Python server parses raw tokens as they arrive and emits structured patch operations over SSE, WebSocket, or any transport you choose. The JS Collector on your client reconstructs the object incrementally.

// What the LLM emits (incomplete, unparseable mid-stream):
{"title": "Quarterly Report", "sections": [{"heading": "Exec

// What jsoncurrent delivers to your client as it arrives:
{ path: 'title',               value: 'Quarterly Report', op: 'add'    }
{ path: 'sections',            value: [],                 op: 'add'    }
{ path: 'sections[0]',         value: {},                 op: 'add'    }
{ path: 'sections[0].heading', value: 'Exec',             op: 'add'    }
{ path: 'sections[0].heading', value: 'utive Summary',    op: 'append' }
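To make the reconstruction step concrete, here is a minimal Python sketch that replays the five patches above into a plain dict. It is an illustration only, not the Collector shipped in jsoncurrent-js: parse_path and apply_patch are hypothetical helpers, patches are modelled as plain dicts, and the rule that an add at the next free array index appends is an assumption inferred from the example.

```python
import re

# Matches either a key segment ("sections") or an index segment ("[0]").
_SEGMENT = re.compile(r"([^.\[\]]+)|\[(\d+)\]")


def parse_path(path):
    """Split 'sections[0].heading' into ['sections', 0, 'heading']."""
    return [int(idx) if idx else key for key, idx in _SEGMENT.findall(path)]


def apply_patch(root, patch):
    """Apply one patch dict to `root` (a dict) in place."""
    op = patch["op"]
    if op == "complete":
        return  # lifecycle signal only; the value is already assembled
    *parents, last = parse_path(patch["path"])
    target = root
    for segment in parents:
        target = target[segment]
    if op == "add":
        if isinstance(target, list) and last == len(target):
            target.append(patch["value"])  # assumed: first write to a new slot
        else:
            target[last] = patch["value"]
    elif op == "append":
        target[last] += patch["value"]  # string delta
    elif op == "insert":
        target[last].append(patch["value"])  # push onto the array at this path
    else:
        raise ValueError(f"unknown op: {op}")


patches = [
    {"path": "title", "value": "Quarterly Report", "op": "add"},
    {"path": "sections", "value": [], "op": "add"},
    {"path": "sections[0]", "value": {}, "op": "add"},
    {"path": "sections[0].heading", "value": "Exec", "op": "add"},
    {"path": "sections[0].heading", "value": "utive Summary", "op": "append"},
]

report = {}
for patch in patches:
    apply_patch(report, patch)

print(report)
# {'title': 'Quarterly Report', 'sections': [{'heading': 'Executive Summary'}]}
```

The sketch does not handle an empty path (a patch addressed to the root), which a real collector would need to consider.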

Why a Python Emitter?

If your backend is Python — FastAPI, Flask, Django — there is no client-side option for structured JSON streaming. jsoncurrent is the only way to produce a consumable patch stream from a Python LLM backend.

Beyond the language boundary, the Emitter's middleware chain lets you intercept every patch before it hits the wire:

  • Resolve {{img:chart}} placeholders to presigned S3 URLs
  • Strip fields a given user has no permission to see
  • Normalise inconsistent date formats from the model
  • Inject values from databases or caches

The wire format

Four operations. This is the entire protocol — identical across Python and JS implementations.

op        Meaning                                     Example
add       Initialise or replace a value at a path     { path: 'title', value: 'Hello', op: 'add' }
append    Concatenate a string delta                  { path: 'title', value: ' World', op: 'append' }
insert    Push a new element onto an array            { path: 'tags', value: 'news', op: 'insert' }
complete  The value at this path is fully assembled   { path: 'title', value: 'Hello World', op: 'complete' }

Paths use dot-notation with array indices: sections[0].heading.

Patches are plain JSON-serialisable objects. How they travel is entirely up to you — SSE, WebSocket, HTTP streaming. The Emitter serialises each patch with chunk.to_json(); your client deserialises with JSON.parse().
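As one possible transport framing, the sketch below wraps a patch in a Server-Sent Events message and unwraps it again. It uses plain dicts and the standard json module rather than the library's chunk.to_json(); to_sse and from_sse are hypothetical helper names, and the [DONE] sentinel follows the convention used in the framework examples in this README.

```python
import json

DONE = "[DONE]"  # end-of-stream sentinel, as used in the examples in this README


def to_sse(patch):
    """Frame one patch dict as a single SSE message."""
    # json.dumps escapes newlines inside strings, so the payload stays on one line.
    return f"data: {json.dumps(patch)}\n\n"


def from_sse(message):
    """Recover the patch dict from one SSE message, or None for the sentinel."""
    payload = message.strip()
    if not payload.startswith("data: "):
        raise ValueError("not an SSE data line")
    payload = payload[len("data: "):]
    if payload == DONE:
        return None
    return json.loads(payload)


patch = {"path": "title", "value": "Hello\nWorld", "op": "add"}
message = to_sse(patch)
assert from_sse(message) == patch
assert from_sse(f"data: {DONE}\n\n") is None
```

Because each patch serialises to a single line, one data: field per message is enough and the client never has to reassemble multi-line payloads.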


FastAPI

import asyncio

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from jsoncurrent import Emitter
import anthropic

app = FastAPI()
# Async client, so waiting on the model does not block the event loop.
client = anthropic.AsyncAnthropic()

@app.get('/stream')
async def stream():
    queue = asyncio.Queue()

    emitter = Emitter()
    emitter.on('patch', lambda chunk: queue.put_nowait(f"data: {chunk.to_json()}\n\n"))
    emitter.on('complete', lambda: queue.put_nowait("data: [DONE]\n\n"))

    async def generate():
        async with client.messages.stream(
            model="claude-opus-4-6",
            max_tokens=4096,
            messages=[{"role": "user", "content": "Generate a report as JSON..."}],
        ) as stream:
            async for text in stream.text_stream:
                emitter.write(text)
                # Send the patches produced by this token straight away.
                while not queue.empty():
                    yield await queue.get()
        emitter.flush()

        # Send whatever flush() emitted, including the [DONE] sentinel.
        while not queue.empty():
            yield await queue.get()

    return StreamingResponse(generate(), media_type="text/event-stream")

Flask

from flask import Flask, Response, stream_with_context
from jsoncurrent import Emitter
import anthropic

app = Flask(__name__)
client = anthropic.Anthropic()

@app.get('/stream')
def stream():
    def generate():
        emitter = Emitter()

        patches = []
        emitter.on('patch', patches.append)

        with client.messages.stream(
            model="claude-opus-4-6",
            max_tokens=4096,
            messages=[{"role": "user", "content": "Generate a report as JSON..."}],
        ) as stream:
            for text in stream.text_stream:
                emitter.write(text)
                for chunk in patches:
                    yield f"data: {chunk.to_json()}\n\n"
                patches.clear()

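        emitter.flush()
        # flush() may emit final patches; send them before the sentinel.
        for chunk in patches:
            yield f"data: {chunk.to_json()}\n\n"
        yield "data: [DONE]\n\n"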

    return Response(stream_with_context(generate()), mimetype="text/event-stream")

Middleware

from jsoncurrent import Emitter

emitter = Emitter()

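def resolve_images(patch, next_fn):
    # Swap a complete {{img:name}} placeholder for a presigned URL.
    # get_presigned_url is your own helper; it is not part of jsoncurrent.
    if patch.op == 'add' and isinstance(patch.value, str):
        if patch.value.startswith('{{img:') and patch.value.endswith('}}'):
            filename = patch.value[6:-2]  # text between '{{img:' and '}}'
            patch = patch.replace(value=get_presigned_url(filename))
    next_fn(patch)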

def strip_internal(patch, next_fn):
    if 'internal' not in patch.path:
        next_fn(patch)

emitter.use(resolve_images)
emitter.use(strip_internal)

Middleware runs in registration order. Call next_fn(patch) to pass the patch through, call it multiple times to fan out, or return without calling it to drop the patch. Middleware receives all four ops, including complete.
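The three behaviours described above (pass through, fan out, drop) can be seen in a small standalone sketch. This is not the Emitter's internal implementation: run_chain is a hypothetical dispatcher written for illustration, and patches are plain dicts instead of StreamingChunk objects.

```python
def run_chain(middlewares, patch, sink):
    """Pass `patch` through `middlewares` in order; survivors reach `sink`."""

    def dispatch(index, current):
        if index == len(middlewares):
            sink(current)  # end of the chain: hand the patch to the transport
            return
        # Each middleware gets a next_fn that continues with the following one.
        middlewares[index](current, lambda nxt: dispatch(index + 1, nxt))

    dispatch(0, patch)


def drop_internal(patch, next_fn):
    # Drop: returning without calling next_fn removes the patch from the stream.
    if "internal" not in patch["path"]:
        next_fn(patch)


def duplicate_titles(patch, next_fn):
    # Pass through, then fan out a second copy under another path.
    next_fn(patch)
    if patch["path"] == "title":
        next_fn({**patch, "path": "headline"})


out = []
chain = [drop_internal, duplicate_titles]
run_chain(chain, {"path": "title", "value": "Hi", "op": "add"}, out.append)
run_chain(chain, {"path": "internal.notes", "value": "x", "op": "add"}, out.append)

print([p["path"] for p in out])  # ['title', 'headline']
```

Order matters: because drop_internal runs first, duplicate_titles never sees the internal patch at all.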


API reference

Emitter

from jsoncurrent import Emitter

emitter = Emitter(
    root="",          # namespace prefix for all emitted paths
    completions=True  # emit complete patches — set False to suppress lifecycle signals
)

emitter.write(token: str)           # feed a raw LLM token
emitter.flush()                     # end of stream — flushes, emits 'complete', resets
emitter.reset()                     # reset without emitting 'complete'
emitter.use(fn: MiddlewareFn)       # register middleware — chainable
emitter.on(event: str, fn)          # register event listener
emitter.off(event: str, fn)         # remove event listener

Events:

  • patch — fires for each StreamingChunk; serialise with chunk.to_json()
  • complete — fires when flush() is called
  • error — fires on parse errors

StreamingChunk

from jsoncurrent.types import StreamingChunk

chunk.path   # str  — dot-notation path e.g. 'sections[0].heading'
chunk.value  # Any  — patch payload; assembled snapshot for 'complete' patches
chunk.op     # str  — 'add' | 'append' | 'insert' | 'complete'

chunk.to_json()              # serialize to wire format JSON string
chunk.replace(value=x)       # return new chunk with field replaced
StreamingChunk.from_json(s)  # deserialize from wire format JSON string
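For unit-testing middleware without a live Emitter, a stand-in with the same three fields and methods can be handy. The class below is a hypothetical test double, not the library's StreamingChunk; it assumes the wire format is a JSON object with exactly the keys path, value, and op, as shown in the wire format section.

```python
import json
from dataclasses import dataclass, replace as dc_replace
from typing import Any


@dataclass(frozen=True)
class FakeChunk:
    """Test double mirroring the documented StreamingChunk interface."""

    path: str
    value: Any
    op: str

    def to_json(self):
        # Assumed wire shape: {"path": ..., "value": ..., "op": ...}
        return json.dumps({"path": self.path, "value": self.value, "op": self.op})

    def replace(self, **changes):
        # Return a new chunk; the original is left untouched.
        return dc_replace(self, **changes)

    @classmethod
    def from_json(cls, s):
        data = json.loads(s)
        return cls(path=data["path"], value=data["value"], op=data["op"])


chunk = FakeChunk(path="title", value="Hello", op="add")
updated = chunk.replace(value="Hello World")

assert chunk.value == "Hello"  # original unchanged
assert updated.value == "Hello World"
assert FakeChunk.from_json(updated.to_json()) == updated
```

Making the stand-in frozen mirrors the replace-returns-a-new-chunk style of the documented API, so a middleware that tries to mutate a chunk in place fails loudly in tests.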

jsoncurrent-js

The JS/TS package — Collector, Node Emitter, and React hook. Patches from jsoncurrent-py are consumed by the JS Collector without any changes on the client side.


Contributing

For development and contribution guidelines, see CONTRIBUTING.md.


License

MIT
