JMux: A Python package for demultiplexing a JSON string into multiple awaitable variables.

Project description

JMux is a powerful Python package that allows you to demultiplex a JSON stream into multiple awaitable variables. It is specifically designed for asynchronous applications that interact with Large Language Models (LLMs) using libraries like litellm. When an LLM streams a JSON response, JMux enables you to parse and use parts of the JSON object before the complete response has been received, significantly improving responsiveness.

Inspiration

This package is inspired by the snapshot streaming technique shown in Apple's WWDC25 session "Meet the Foundation Models framework".

Features

  • Asynchronous by Design: Built on top of anyio, JMux supports both asyncio and trio backends, making it perfect for modern, high-performance Python applications.
  • Python 3.10+: Supports Python 3.10 and newer versions.
  • Pydantic Integration: Validate your JMux classes against Pydantic models to ensure type safety and consistency.
  • Code Generation: Automatically generate JMux classes from StreamableBaseModel subclasses using the jmux generate CLI command.
  • Awaitable and Streamable Sinks: Use AwaitableValue for single values and StreamableValues for streams of values.
  • Robust Error Handling: JMux provides a comprehensive set of exceptions to handle parsing errors and other issues.
  • Lightweight: JMux has only a few external dependencies, making it easy to integrate into any project.

Installation

You can install JMux from PyPI using pip:

pip install jmux

Code Generation

JMux provides a CLI tool to automatically generate JMux classes from Pydantic models. Instead of manually writing both a Pydantic model and a corresponding JMux class, you can define your models using StreamableBaseModel and let JMux generate the demultiplexer classes for you.

Defining Models with StreamableBaseModel

Use StreamableBaseModel as your base class instead of pydantic.BaseModel:

from typing import Annotated
from jmux import StreamableBaseModel, Streamed

class LlmResponse(StreamableBaseModel):
    thought: str
    tool_code: Annotated[str, Streamed]
    tags: list[str]

Type Mappings

The generator converts your model fields to JMux types as follows:

Model Field Type            Generated JMux Type
--------------------------  -------------------------------
str, int, float, bool       AwaitableValue[T]
Enum                        AwaitableValue[EnumType]
T | None                    AwaitableValue[T | None]
list[T]                     StreamableValues[T]
Annotated[str, Streamed]    StreamableValues[str]
Nested StreamableBaseModel  AwaitableValue[NestedModelJMux]

The Streamed marker is useful when you want to stream a string field character-by-character (e.g., for real-time display of LLM output) rather than awaiting the complete value.
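
The generator presumably distinguishes Streamed fields by inspecting Annotated metadata. The standard-library mechanism for that looks like the sketch below; the Streamed stand-in class and LlmResponse model here are illustrative, not JMux internals:

```python
from typing import Annotated, get_args, get_origin, get_type_hints

class Streamed:
    """Stand-in marker for jmux.Streamed (illustration only)."""

class LlmResponse:
    thought: str
    tool_code: Annotated[str, Streamed]

def is_streamed(tp) -> bool:
    # Annotated[...] exposes its metadata via get_args(); the marker class
    # shows up alongside the underlying type.
    return get_origin(tp) is Annotated and Streamed in get_args(tp)

# include_extras=True preserves the Annotated wrapper in the hints.
hints = get_type_hints(LlmResponse, include_extras=True)
print(is_streamed(hints["thought"]))    # False -> AwaitableValue[str]
print(is_streamed(hints["tool_code"]))  # True  -> StreamableValues[str]
```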

Using the CLI

Run the jmux generate command to scan your codebase and generate JMux classes:

jmux generate --root <directory>

This will:

  1. Recursively scan <directory> for Python files containing StreamableBaseModel subclasses
  2. Generate corresponding JMux classes with the suffix JMux (e.g., LlmResponse becomes LlmResponseJMux)
  3. Write the generated code to src/jmux/generated/__init__.py

Example

Given this model:

from typing import Annotated
from jmux import StreamableBaseModel, Streamed

class LlmResponse(StreamableBaseModel):
    thought: str
    tool_code: Annotated[str, Streamed]

Running jmux generate produces:

from jmux.awaitable import AwaitableValue, StreamableValues
from jmux.demux import JMux

class LlmResponseJMux(JMux):
    thought: AwaitableValue[str]
    tool_code: StreamableValues[str]

You can then import and use the generated class:

from jmux.generated import LlmResponseJMux

jmux_instance = LlmResponseJMux()

Usage with LLMs (e.g., litellm)

The primary use case for jmux is to process streaming JSON responses from LLMs. This allows you to react to parts of the data as they arrive, rather than waiting for the entire JSON object to be transmitted. Note that the order in which the Pydantic model declares its fields determines which sink is filled first.
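
A quick way to see why declaration order matters: JSON objects stream their keys in the order they were written, and Python dicts preserve that order, so the first-declared field is the first one whose sink can complete. A minimal illustration using only the standard library (no JMux involved):

```python
import json

# Keys arrive in the order they appear in the JSON text, so a sink for
# "thought" can complete long before "tool_code" has finished streaming.
json_text = '{"thought": "plan first", "tool_code": "print(42)"}'
arrival_order = list(json.loads(json_text).keys())
print(arrival_order)  # ['thought', 'tool_code']
```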

Using Code Generation (Recommended)

The easiest way to use JMux with LLMs is to define your models using StreamableBaseModel and generate JMux classes automatically:

from typing import Annotated
from jmux import StreamableBaseModel, Streamed

class LlmResponse(StreamableBaseModel): # Use `StreamableBaseModel` so that the CLI can find the `pydantic` models to parse
    thought: str
    tool_code: Annotated[str, Streamed]

Then run jmux generate --root . to generate the LlmResponseJMux class. You can then use it directly:

import anyio
from jmux.generated import LlmResponseJMux

async def mock_llm_stream():
    json_stream = '{"thought": "I need to write some code.", "tool_code": "print(\'Hello, World!\')"}'
    for char in json_stream:
        yield char
        await anyio.sleep(0.01)

async def process_llm_response():
    jmux_instance = LlmResponseJMux()

    async def feed_stream():
        async for chunk in mock_llm_stream():
            await jmux_instance.feed_chunks(chunk)

    async def consume_thought():
        thought = await jmux_instance.thought
        print(f"LLM's thought received: '{thought}'")

    async def consume_tool_code():
        print("Receiving tool code...")
        full_code = ""
        async for code_fragment in jmux_instance.tool_code:
            full_code += code_fragment
            print(f"  -> Received fragment: {code_fragment}")
        print(f"Full tool code received: {full_code}")

    async with anyio.create_task_group() as tg:
        tg.start_soon(feed_stream)
        tg.start_soon(consume_thought)
        tg.start_soon(consume_tool_code)

if __name__ == "__main__":
    anyio.run(process_llm_response, backend="asyncio")

Manual Approach

If you prefer more control, you can manually define both the Pydantic model and the JMux class:

import anyio
from pydantic import BaseModel
from jmux import JMux, AwaitableValue, StreamableValues
# litellm is used conceptually here
# from litellm import acompletion

# 1. Define the Pydantic model for the expected JSON response
class LlmResponse(BaseModel): # No need to use `StreamableBaseModel` here, since it is only used for detection purposes
    thought: str # **This property is filled first**
    tool_code: str

# 2. Define the corresponding JMux class
class LlmResponseMux(JMux):
    thought: AwaitableValue[str]
    tool_code: StreamableValues[str] # Stream the code as it's generated

# 3. Validate that the JMux class matches the Pydantic model
LlmResponseMux.assert_conforms_to(LlmResponse)

# A mock function that simulates a streaming LLM call
async def mock_llm_stream():
    json_stream = '{"thought": "I need to write some code.", "tool_code": "print(\'Hello, World!\')"}'
    for char in json_stream:
        yield char
        await anyio.sleep(0.01) # Simulate network latency

# Main function to orchestrate the call and processing
async def process_llm_response():
    jmux_instance = LlmResponseMux()

    # This task will consume the LLM stream and feed it to jmux
    async def feed_stream():
        async for chunk in mock_llm_stream():
            await jmux_instance.feed_chunks(chunk)

    # These tasks will consume the demultiplexed data from jmux
    async def consume_thought():
        thought = await jmux_instance.thought
        print(f"LLM's thought received: '{thought}'")
        # You can act on the thought immediately
        # without waiting for the tool_code to finish streaming.

    async def consume_tool_code():
        print("Receiving tool code...")
        full_code = ""
        async for code_fragment in jmux_instance.tool_code:
            full_code += code_fragment
            print(f"  -> Received fragment: {code_fragment}")
        print(f"Full tool code received: {full_code}")

    # Run all tasks concurrently using anyio task group
    async with anyio.create_task_group() as tg:
        tg.start_soon(feed_stream)
        tg.start_soon(consume_thought)
        tg.start_soon(consume_tool_code)

# Run with asyncio backend
if __name__ == "__main__":
    anyio.run(process_llm_response, backend="asyncio")
    # Or use trio: anyio.run(process_llm_response, backend="trio")

Example Implementation

def create_json_streaming_completion[T: BaseModel, J: IJsonDemuxer](
    self,
    messages: List[ILlmMessage],
    ReturnType: Type[T],
    JMux: Type[J],
    retries: int = 3,
) -> StreamResponseTuple[T, J]:
    try:
        JMux.assert_conforms_to(ReturnType)
        litellm_messages = self._convert_messages(messages)
        jmux_instance: J = JMux()

        async def stream_feeding_llm_call() -> T:
            nonlocal jmux_instance
            buffer = ""
            stream: CustomStreamWrapper = await self._router.acompletion(  # see litellm `router`
                model=self._internal_model_name.value,
                messages=litellm_messages,
                stream=True,
                num_retries=retries,
                response_format=ReturnType,
                **self._maybe_google_credentials_param,
                **self._model_params.model_dump(exclude_none=True),
                **self._additional_params,
            )

            async for chunk in stream:
                content_fragment: str | None = None

                tool_calls = chunk.choices[0].delta.tool_calls
                if tool_calls:
                    content_fragment = tool_calls[0].function.arguments
                elif chunk.choices[0].delta.content:
                    content_fragment = chunk.choices[0].delta.content

                if content_fragment:
                    try:
                        buffer += content_fragment
                        await jmux_instance.feed_chunks(content_fragment)
                    except Exception as e:
                        logger.warning(f"error in JMux feed_chunks: {e}")
                        raise

            return ReturnType.model_validate_json(buffer)

        awaitable_llm_result = create_task(stream_feeding_llm_call())
        return (awaitable_llm_result, jmux_instance)
    except Exception as e:
        logger.warning(f"error in create_json_streaming_completion: {e}")
        raise

The code above shows an example implementation that uses a litellm router for acompletion.

You can either await awaitable_llm_result if you need the full result, or use await jmux_instance.your_awaitable_value or async for ele in jmux_instance.your_streamable_values to access partial results.
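
Both consumption patterns can be sketched with plain asyncio futures standing in for the awaitable LLM result and a JMux sink. The names `thought` and `full_result` are illustrative stand-ins, not JMux APIs:

```python
import asyncio

async def main():
    # Stand-ins: `thought` mimics jmux_instance.thought, `full_result`
    # mimics awaitable_llm_result from the example above.
    thought = asyncio.get_running_loop().create_future()
    full_result = asyncio.get_running_loop().create_future()

    async def producer():
        await asyncio.sleep(0)       # "thought" arrives early in the stream
        thought.set_result("plan first")
        await asyncio.sleep(0.01)    # the rest of the JSON is still streaming
        full_result.set_result({"thought": "plan first", "tool_code": "print(42)"})

    task = asyncio.ensure_future(producer())
    t = await thought                # partial result, available immediately
    assert not full_result.done()   # the full result is still pending here
    r = await full_result            # ...or wait for the complete object
    await task
    return t, r

t, r = asyncio.run(main())
print(t)               # plan first
print(r["tool_code"])  # print(42)
```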

Basic Usage

Here is a simple example of how to use JMux to parse a JSON stream:

import anyio
from enum import Enum
from types import NoneType
from pydantic import BaseModel

from jmux import JMux, AwaitableValue, StreamableValues

# 1. Define your JMux class
class SObject(JMux):
    class SNested(JMux):
        key_str: AwaitableValue[str]

    class SEnum(Enum):
        VALUE1 = "value1"
        VALUE2 = "value2"

    key_str: AwaitableValue[str]
    key_int: AwaitableValue[int]
    key_float: AwaitableValue[float]
    key_bool: AwaitableValue[bool]
    key_none: AwaitableValue[NoneType]
    key_stream: StreamableValues[str]
    key_enum: AwaitableValue[SEnum]
    key_nested: AwaitableValue[SNested]

# 2. (Optional) Define a Pydantic model for validation
class PObject(BaseModel):
    class PNested(BaseModel):
        key_str: str

    class PEnum(Enum):
        VALUE1 = "value1"
        VALUE2 = "value2"

    key_str: str
    key_int: int
    key_float: float
    key_bool: bool
    key_none: NoneType
    key_stream: str
    key_enum: PEnum
    key_nested: PNested

# 3. Validate the JMux class against the Pydantic model
SObject.assert_conforms_to(PObject)

# 4. Create an instance of your JMux class
s_object = SObject()

# 5. Feed the JSON stream to the JMux instance
async def main():
    json_stream = '{"key_str": "hello", "key_int": 42, "key_float": 3.14, "key_bool": true, "key_none": null, "key_stream": "world", "key_enum": "value1", "key_nested": {"key_str": "nested"}}'

    async def produce():
        for char in json_stream:
            await s_object.feed_char(char)

    async def consume():
        key_str = await s_object.key_str
        print(f"key_str: {key_str}")

        key_int = await s_object.key_int
        print(f"key_int: {key_int}")

        key_float = await s_object.key_float
        print(f"key_float: {key_float}")

        key_bool = await s_object.key_bool
        print(f"key_bool: {key_bool}")

        key_none = await s_object.key_none
        print(f"key_none: {key_none}")

        key_stream = ""
        async for char in s_object.key_stream:
            key_stream += char
        print(f"key_stream: {key_stream}")

        key_enum = await s_object.key_enum
        print(f"key_enum: {key_enum}")

        key_nested = await s_object.key_nested
        nested_key_str = await key_nested.key_str
        print(f"nested_key_str: {nested_key_str}")

    async with anyio.create_task_group() as tg:
        tg.start_soon(produce)
        tg.start_soon(consume)

if __name__ == "__main__":
    anyio.run(main, backend="asyncio")
    # Or use trio: anyio.run(main, backend="trio")

API Reference

Abstract Class jmux.JMux

The abstract base class for creating JSON demultiplexers.

JMux.assert_conforms_to(pydantic_model: Type[BaseModel]) -> None

Asserts that the JMux class conforms to a given Pydantic model.

async JMux.feed_char(ch: str) -> None

Feeds a character to the JMux parser.

async JMux.feed_chunks(chunks: str) -> None

Feeds a string of characters to the JMux parser.
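
Conceptually (this is a toy stand-in, not JMux's actual implementation), feeding a chunk behaves like feeding each of its characters in order:

```python
import asyncio

class ToyParser:
    """Toy stand-in for a JMux parser (illustration only)."""

    def __init__(self) -> None:
        self.seen: list[str] = []

    async def feed_char(self, ch: str) -> None:
        assert len(ch) == 1, "feed_char expects a single character"
        self.seen.append(ch)

    async def feed_chunks(self, chunks: str) -> None:
        # Equivalent to calling feed_char once per character.
        for ch in chunks:
            await self.feed_char(ch)

async def main() -> str:
    p = ToyParser()
    await p.feed_chunks('{"a": 1}')
    return "".join(p.seen)

print(asyncio.run(main()))  # {"a": 1}
```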

Class jmux.AwaitableValue[T]

A class that represents a value that will be available in the future. You are awaiting the full value and do not get partial results.

Allowed types (each can be combined with Optional):

  • int, float, str, bool, NoneType
  • JMux
  • Enum

In all cases, the corresponding field on the pydantic.BaseModel must not be a list.

Class jmux.StreamableValues[T]

A class that represents a stream of values that can be asynchronously iterated over.

Allowed types are listed below; each must be wrapped in a list on the corresponding Pydantic model:

  • int, float, str, bool, NoneType
  • JMux
  • Enum

Additionally, str is supported without being wrapped in a list. This lets you stream a string directly to a sink, character by character.

Class jmux.StreamableBaseModel

A Pydantic BaseModel subclass used for defining models that can be automatically converted to JMux classes via the jmux generate CLI command.

from jmux import StreamableBaseModel

class MyModel(StreamableBaseModel):
    name: str
    age: int

Class jmux.Streamed

A marker class used with typing.Annotated to indicate that a string field should be streamed character-by-character rather than awaited as a complete value.

from typing import Annotated
from jmux import StreamableBaseModel, Streamed

class MyModel(StreamableBaseModel):
    content: Annotated[str, Streamed]

When generating JMux classes, fields annotated with Streamed will be converted to StreamableValues[str] instead of AwaitableValue[str].

License

This project is licensed under the terms of the MIT license. See the LICENSE file for details.

Contributions

This repository was created recently, and so far I am the only developer working on it. If you want to contribute, reach out via johannes@unruh.ai or johannes.a.unruh@gmail.com.

If you have suggestions or find any errors in my implementation, feel free to create an issue or also reach out via email.

Download files

Download the file for your platform.

Source Distribution

jmux-0.1.0.tar.gz (45.7 kB)

Uploaded Source

Built Distribution


jmux-0.1.0-py3-none-any.whl (23.9 kB)

Uploaded Python 3

File details

Details for the file jmux-0.1.0.tar.gz.

File metadata

  • Download URL: jmux-0.1.0.tar.gz
  • Upload date:
  • Size: 45.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for jmux-0.1.0.tar.gz
Algorithm Hash digest
SHA256 8767dee3eaa4026ab3a1a32d0f0a3f0a2fc679e5dc4ffb4623f5c55265add0f1
MD5 9e6a710baf24d0fb2a85ebe49beddfdb
BLAKE2b-256 9509c5515fe26a3c968bd3e539ee092b801305b47c5b5d29f2b82400ab606f64


Provenance

The following attestation bundles were made for jmux-0.1.0.tar.gz:

Publisher: ci.yml on jaunruh/jmux

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file jmux-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: jmux-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 23.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for jmux-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 9fdecd282929b2f9db5bcd2bd3dc8d2dc2939fc0b51d8846bd0bf51f5688a6f6
MD5 90cabc7fde58bc42fb335dda36797edc
BLAKE2b-256 1e6e526f2daa40f23075291672e2b77f925ac200e84f3430ed03e795bc30183d


Provenance

The following attestation bundles were made for jmux-0.1.0-py3-none-any.whl:

Publisher: ci.yml on jaunruh/jmux

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.
