# jsoncurrent

Parse streamed JSON data incrementally.

Python Emitter for the jsoncurrent patch protocol — stream structured JSON from your LLM backend incrementally.
LLM tokens → [Emitter] → patch stream → [Collector] → assembled object
JS/TS client and Node Emitter: https://github.com/richardantao/jsoncurrent-js
## Installation

```bash
pip install jsoncurrent
```
## The problem
LLMs generate JSON token by token. But if you try to parse incomplete JSON mid-stream, standard parsers throw.
jsoncurrent solves this with a patch protocol. The Emitter on your Python server parses raw tokens as they arrive and emits structured patch operations over SSE, WebSocket, or any transport you choose. The JS Collector on your client reconstructs the object incrementally.
```text
// What the LLM emits (incomplete, unparseable mid-stream):
{"title": "Quarterly Report", "sections": [{"heading": "Exec

// What jsoncurrent delivers to your client as it arrives:
{ path: 'title', value: 'Quarterly Report', op: 'add' }
{ path: 'sections', value: [], op: 'add' }
{ path: 'sections[0]', value: {}, op: 'add' }
{ path: 'sections[0].heading', value: 'Exec', op: 'add' }
{ path: 'sections[0].heading', value: 'utive Summary', op: 'append' }
```
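The failure mode is easy to reproduce with nothing but the standard library: feeding the truncated document from the example above to `json.loads` raises immediately.

```python
import json

# The truncated mid-stream document from the example above
partial = '{"title": "Quarterly Report", "sections": [{"heading": "Exec'

try:
    json.loads(partial)
except json.JSONDecodeError as e:
    # Standard parsers reject anything short of a complete document
    print(f"standard parser throws: {e.msg}")
```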
## Why a Python Emitter?
If your backend is Python — FastAPI, Flask, Django — client-side JavaScript parsers can't help you: by the time the stream reaches the browser, your server has already lost the chance to transform it. jsoncurrent produces the patch stream directly from your Python LLM backend.
Beyond the language boundary, the Emitter's middleware chain lets you intercept every patch before it hits the wire:
- Resolve `{{img:chart}}` placeholders to presigned S3 URLs
- Strip fields a given user has no permission to see
- Normalise inconsistent date formats from the model
- Inject values from databases or caches
## The wire format
Four operations. This is the entire protocol — identical across Python and JS implementations.
| op | Meaning | Example |
|---|---|---|
| `add` | Initialise or replace a value at a path | `{ path: 'title', value: 'Hello', op: 'add' }` |
| `append` | Concatenate a string delta | `{ path: 'title', value: ' World', op: 'append' }` |
| `insert` | Push a new element onto an array | `{ path: 'tags', value: 'news', op: 'insert' }` |
| `complete` | The value at this path is fully assembled | `{ path: 'title', value: 'Hello World', op: 'complete' }` |
Paths use dot-notation with array indices: `sections[0].heading`.

Patches are plain JSON-serialisable objects. How they travel is entirely up to you — SSE, WebSocket, HTTP streaming. The Emitter serialises each patch with `chunk.to_json()`; your client deserialises with `JSON.parse()`.
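The four operations are simple enough that their semantics fit in a few lines of plain Python. In production the reconstruction is done by the JS Collector on the client; the `apply_patch` helper below is purely an illustrative sketch of what each op means, not part of the library's API.

```python
import re

def _resolve(root, path):
    """Walk to the parent container of `path`, returning (container, key).

    'sections[0].heading' is tokenised as ['sections', 0, 'heading'].
    """
    tokens = []
    for part in path.split('.'):
        tokens.append(part.split('[', 1)[0])
        for idx in re.findall(r'\[(\d+)\]', part):
            tokens.append(int(idx))
    container = root
    for tok in tokens[:-1]:
        container = container[tok]
    return container, tokens[-1]

def apply_patch(root, patch):
    container, key = _resolve(root, patch['path'])
    if patch['op'] == 'add':
        if isinstance(container, list) and key == len(container):
            container.append(patch['value'])   # add a new array element
        else:
            container[key] = patch['value']    # initialise or replace
    elif patch['op'] == 'append':
        container[key] = container[key] + patch['value']  # string delta
    elif patch['op'] == 'insert':
        container[key].append(patch['value'])  # push onto an array
    # 'complete' carries the assembled snapshot; nothing to mutate

# Replaying the patch stream from the example above:
doc = {}
for p in [
    {'path': 'title', 'value': 'Quarterly Report', 'op': 'add'},
    {'path': 'sections', 'value': [], 'op': 'add'},
    {'path': 'sections[0]', 'value': {}, 'op': 'add'},
    {'path': 'sections[0].heading', 'value': 'Exec', 'op': 'add'},
    {'path': 'sections[0].heading', 'value': 'utive Summary', 'op': 'append'},
]:
    apply_patch(doc, p)
# doc == {'title': 'Quarterly Report', 'sections': [{'heading': 'Executive Summary'}]}
```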
### FastAPI

```python
import asyncio

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from jsoncurrent import Emitter
import anthropic

app = FastAPI()
client = anthropic.Anthropic()

@app.get('/stream')
async def stream():
    queue: asyncio.Queue = asyncio.Queue()
    emitter = Emitter()
    emitter.on('patch', lambda chunk: queue.put_nowait(f"data: {chunk.to_json()}\n\n"))
    emitter.on('complete', lambda: queue.put_nowait("data: [DONE]\n\n"))

    async def generate():
        with client.messages.stream(
            model="claude-opus-4-6",
            max_tokens=4096,
            messages=[{"role": "user", "content": "Generate a report as JSON..."}],
        ) as stream:
            for text in stream.text_stream:
                emitter.write(text)
                # Drain patches as they are produced so the client
                # sees them immediately, not after the model finishes
                while not queue.empty():
                    yield queue.get_nowait()
        emitter.flush()
        while not queue.empty():
            yield queue.get_nowait()

    return StreamingResponse(generate(), media_type="text/event-stream")
```
### Flask

```python
from flask import Flask, Response, stream_with_context
from jsoncurrent import Emitter
import anthropic

app = Flask(__name__)
client = anthropic.Anthropic()

@app.get('/stream')
def stream():
    def generate():
        emitter = Emitter()
        patches = []
        emitter.on('patch', patches.append)
        with client.messages.stream(
            model="claude-opus-4-6",
            max_tokens=4096,
            messages=[{"role": "user", "content": "Generate a report as JSON..."}],
        ) as stream:
            for text in stream.text_stream:
                emitter.write(text)
                for chunk in patches:
                    yield f"data: {chunk.to_json()}\n\n"
                patches.clear()
        emitter.flush()
        # flush() emits the final 'complete' patches — forward them too
        for chunk in patches:
            yield f"data: {chunk.to_json()}\n\n"
        yield "data: [DONE]\n\n"

    return Response(stream_with_context(generate()), mimetype="text/event-stream")
```
## Middleware

```python
from jsoncurrent import Emitter

emitter = Emitter()

def resolve_images(patch, next_fn):
    # Rewrite '{{img:<name>}}' placeholders to real URLs.
    # get_presigned_url is your own helper (e.g. a boto3 presigned-URL call).
    if patch.op == 'add' and isinstance(patch.value, str):
        if patch.value.startswith('{{img:'):
            filename = patch.value[6:-2]  # strip '{{img:' and '}}'
            patch = patch.replace(value=get_presigned_url(filename))
    next_fn(patch)

def strip_internal(patch, next_fn):
    # Drop any patch whose path touches an 'internal' field
    if 'internal' not in patch.path:
        next_fn(patch)

emitter.use(resolve_images)
emitter.use(strip_internal)
```
Middleware runs in registration order. Call `next_fn(patch)` to pass the patch through, call it multiple times to fan out, or return without calling it to drop the patch. Middleware receives all four ops, including `complete`.
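These chain semantics are easy to picture without the library. The standalone sketch below (not jsoncurrent's internals — just an illustration) dispatches a patch through a list of middleware functions, each of which decides whether and how often to call `next_fn`:

```python
def run_chain(middlewares, sink, patch):
    """Send `patch` through `middlewares` in registration order, ending at `sink`."""
    def dispatch(i, p):
        if i == len(middlewares):
            sink(p)  # end of chain: the patch goes on the wire
        else:
            middlewares[i](p, lambda q: dispatch(i + 1, q))
    dispatch(0, patch)

out = []

def drop_internal(p, next_fn):
    if 'internal' not in p['path']:
        next_fn(p)  # returning without calling next_fn drops the patch

def fan_out(p, next_fn):
    next_fn(p)  # calling next_fn more than once fans out downstream
    next_fn({**p, 'path': f"audit.{p['path']}"})

run_chain([drop_internal, fan_out], out.append, {'path': 'title', 'value': 'x', 'op': 'add'})
run_chain([drop_internal, fan_out], out.append, {'path': 'internal.id', 'value': 1, 'op': 'add'})
# out holds two patches ('title' and its 'audit.title' copy); the internal one was dropped
```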
## API reference

### Emitter

```python
from jsoncurrent import Emitter

emitter = Emitter(
    root="",          # namespace prefix for all emitted paths
    completions=True, # emit 'complete' patches — set False to suppress lifecycle signals
)

emitter.write(token: str)     # feed a raw LLM token
emitter.flush()               # end of stream — flushes, emits 'complete', resets
emitter.reset()               # reset without emitting 'complete'
emitter.use(fn: MiddlewareFn) # register middleware — chainable
emitter.on(event: str, fn)    # register event listener
emitter.off(event: str, fn)   # remove event listener
```
Events:

- `patch` — fires for each `StreamingChunk`; serialise with `chunk.to_json()`
- `complete` — fires when `flush()` is called
- `error` — fires on parse errors
### StreamingChunk

```python
from jsoncurrent.types import StreamingChunk

chunk.path      # str — dot-notation path, e.g. 'sections[0].heading'
chunk.value     # Any — patch payload; assembled snapshot for 'complete' patches
chunk.op        # str — 'add' | 'append' | 'insert' | 'complete'
chunk.to_json() # serialise to wire-format JSON string
chunk.replace(value=x)       # return a new chunk with the field replaced
StreamingChunk.from_json(s)  # deserialise from wire-format JSON string
```
## jsoncurrent-js
The JS/TS package — Collector, Node Emitter, and React hook. Patches from jsoncurrent-py are consumed by the JS Collector without any changes on the client side.
## See also
- jsonriver — client-side incremental JSON parsing for pure JS stacks where the server forwards the raw LLM stream unchanged and no server-side transformation is needed
- Anthropic streaming docs
- OpenAI streaming docs
## Contributing
For development and contribution guidelines, see CONTRIBUTING.md.
## License
MIT