emergent-client

Python SDK for building event-driven workflows on the Emergent engine. Connect to a running engine over Unix IPC and publish or consume messages through three typed primitives: Source, Handler, and Sink.

from emergent import EmergentSink

async for msg in EmergentSink.messages("my_sink", ["timer.tick"]):
    print(msg.payload)

Install

pip install emergent-client

Or with uv:

uv add emergent-client

Then import:

from emergent import EmergentSource, EmergentHandler, EmergentSink

Three Primitives

Every Emergent workflow is composed of Sources, Handlers, and Sinks. Each primitive has a single, well-defined role:

| Primitive | Subscribe | Publish | Role |
|-----------|-----------|---------|------|
| Source    | --        | Yes     | Ingress -- bring data into the system |
| Handler   | Yes       | Yes     | Processing -- transform, enrich, or route |
| Sink      | Yes       | --      | Egress -- persist, display, or forward data |
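The division of labor can be illustrated without the SDK at all. The stdlib-only sketch below wires a source, a handler, and a sink together with asyncio queues; the queues stand in for the engine's routing, and none of the names here are Emergent API:

```python
import asyncio

results = []

async def source(out: asyncio.Queue) -> None:
    # Ingress: bring data into the system
    for i in range(3):
        await out.put({"count": i})
    await out.put(None)  # end-of-stream marker

async def handler(inp: asyncio.Queue, out: asyncio.Queue) -> None:
    # Processing: transform each message and forward it
    while (msg := await inp.get()) is not None:
        await out.put({"count": msg["count"] * 2})
    await out.put(None)

async def sink(inp: asyncio.Queue) -> None:
    # Egress: consume and record the final messages
    while (msg := await inp.get()) is not None:
        results.append(msg["count"])

async def main() -> None:
    raw: asyncio.Queue = asyncio.Queue()        # Source -> Handler channel
    processed: asyncio.Queue = asyncio.Queue()  # Handler -> Sink channel
    await asyncio.gather(source(raw), handler(raw, processed), sink(processed))

asyncio.run(main())
print(results)  # [0, 2, 4]
```

With the SDK, the engine replaces the queues and each role runs as its own process; the shape of the data flow is the same.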

Quick Start

Sink -- consume messages

A Sink subscribes to message types and processes each one as it arrives. EmergentSink.messages is a convenience method that connects, subscribes, and yields messages in a single call:

from emergent import EmergentSink

async for msg in EmergentSink.messages("my_sink", ["timer.tick"]):
    data = msg.payload_as(dict)
    print(f"Tick #{data['sequence']}")

For explicit lifecycle control, connect and subscribe separately:

async with await EmergentSink.connect("my_sink") as sink:
    async with await sink.subscribe(["timer.tick", "timer.filtered"]) as stream:
        async for msg in stream:
            print(msg.message_type, msg.payload)

Source -- publish messages

A Source publishes messages into the engine. It cannot subscribe:

from emergent import EmergentSource

async with await EmergentSource.connect("my_source") as source:
    await source.publish("sensor.reading", {"value": 42.5, "unit": "celsius"})

Handler -- subscribe and publish

A Handler subscribes to incoming messages and publishes new ones. Use caused_by to link output messages to the input that triggered them:

from emergent import EmergentHandler, create_message

async with await EmergentHandler.connect("order_processor") as handler:
    async with await handler.subscribe(["order.created"]) as stream:
        async for msg in stream:
            await handler.publish(
                create_message("order.processed")
                .caused_by(msg.id)
                .payload({"status": "ok"})
            )

Publishing Messages

Every primitive that can publish supports three calling styles. All three produce the same result:

# Shorthand -- type string and payload dict
await source.publish("timer.tick", {"count": 1})

# MessageBuilder -- fluent API with auto-build
await source.publish(
    create_message("timer.tick").payload({"count": 1})
)

# Full EmergentMessage object
await source.publish(message)

Streaming Publish

Publish a collection or async stream of messages. Each message is sent individually so subscribers begin consuming immediately. Both methods return the count of successfully published messages and stop on the first error.

# From a list or any Iterable
messages = [
    create_message("record.imported").payload(record)
    for record in records
]
count = await source.publish_all(messages)

# From an async generator or any AsyncIterable
async def generate_messages():
    for i in range(100):
        yield create_message("batch.item").payload({"index": i})

count = await source.publish_stream(generate_messages())

Both publish_all and publish_stream are available on EmergentSource and EmergentHandler.
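The stop-on-first-error contract can be pictured with a stdlib-only sketch. This illustrates the documented semantics rather than the SDK's implementation; `publish_stream_sketch` and `fake_publish` are hypothetical names:

```python
import asyncio
from typing import AsyncIterable, Awaitable, Callable

async def publish_stream_sketch(
    messages: AsyncIterable[dict],
    publish: Callable[[dict], Awaitable[None]],
) -> int:
    # Send each message as it is produced; stop at the first failure
    # and report how many were published successfully.
    count = 0
    async for msg in messages:
        try:
            await publish(msg)
        except Exception:
            break
        count += 1
    return count

# Demo: the third publish fails, so only two messages count.
sent: list[dict] = []

async def fake_publish(msg: dict) -> None:
    if msg["index"] == 2:
        raise RuntimeError("broker unavailable")
    sent.append(msg)

async def gen():
    for i in range(5):
        yield {"index": i}

count = asyncio.run(publish_stream_sketch(gen(), fake_publish))
print(count)  # 2
```

Because each message is sent as soon as the generator yields it, a failure partway through leaves earlier messages already delivered to subscribers.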

Building Messages

create_message returns a fluent builder for constructing immutable EmergentMessage instances:

from emergent import create_message

msg = (
    create_message("sensor.reading")
    .payload({"value": 42.5, "unit": "celsius"})
    .metadata({"sensor_id": "temp-01", "location": "room-a"})
    .build()
)

Link messages into traceable chains with caused_by and correlated_with:

reply = (
    create_message("order.confirmed")
    .caused_by(original_msg.id)
    .correlated_with(request_id)
    .payload({"confirmed": True})
    .build()
)

The builder sets id (TypeID format) and timestamp_ms automatically. Call .build() explicitly when you need the message object, or pass the builder directly to publish(), which calls .build() for you.

Subscribing to Messages

subscribe accepts a list or variadic arguments:

# List form
stream = await sink.subscribe(["timer.tick", "timer.filtered"])

# Variadic form
stream = await sink.subscribe("timer.tick", "timer.filtered")

Iterate over the returned MessageStream with async for:

async for msg in stream:
    data = msg.payload_as(dict)
    print(data["count"])

MessageStream implements AsyncIterator and the async context manager protocol, so you can use async with for automatic cleanup:

async with await sink.subscribe(["timer.tick"]) as stream:
    async for msg in stream:
        print(msg.payload)

Typed payloads with Pydantic

payload_as validates dict payloads against Pydantic models automatically:

from pydantic import BaseModel

class SensorReading(BaseModel):
    value: float
    unit: str

async for msg in EmergentSink.messages("my_sink", ["sensor.reading"]):
    reading = msg.payload_as(SensorReading)
    print(f"{reading.value} {reading.unit}")

Resource Cleanup

All primitives implement the async context manager protocol. Use async with for automatic cleanup (recommended), or call close() / disconnect() manually:

# Automatic cleanup (recommended)
async with await EmergentSink.connect("my_sink") as sink:
    ...

# Manual cleanup
sink = await EmergentSink.connect("my_sink")
# ... use sink ...
await sink.disconnect()

The SDK subscribes to system.shutdown internally. When the Emergent engine signals a graceful shutdown, active message streams close automatically.

Helper Functions

run_source, run_handler, and run_sink eliminate connection and signal-handling boilerplate. Each helper connects, sets up SIGTERM/SIGINT handlers, runs your callback, and disconnects on completion:

import asyncio
from emergent import run_source, run_handler, run_sink, create_message

# Source -- custom event loop with shutdown signal
async def timer_logic(source, shutdown_event):
    count = 0
    while not shutdown_event.is_set():
        try:
            await asyncio.wait_for(shutdown_event.wait(), timeout=3.0)
            break
        except asyncio.TimeoutError:
            count += 1
            await source.publish(
                create_message("timer.tick").payload({"count": count})
            )

await run_source("my_timer", timer_logic)

# Handler -- called once per message
async def process(msg, handler):
    await handler.publish(
        create_message("processed").caused_by(msg.id).payload({"done": True})
    )

await run_handler("my_handler", ["raw.event"], process)

# Sink -- called once per message
async def consume(msg):
    print(msg.payload)

await run_sink("my_sink", ["timer.tick"], consume)

The name argument is optional. When omitted or set to None, the helper reads from the EMERGENT_NAME environment variable.

Error Handling

All SDK errors extend EmergentError and include a machine-readable code property. Catch specific error types for precise control:

from emergent import (
    EmergentSource,
    SocketNotFoundError,
    ConnectionError,
    TimeoutError,
)

try:
    source = await EmergentSource.connect("my_source")
except SocketNotFoundError as e:
    print(f"Engine not running at: {e.socket_path}")
except TimeoutError as e:
    print(f"Timed out after {e.timeout}s")
except ConnectionError as e:
    print(f"Connection failed: {e}")

Error Types

| Error | Code | Extra Fields |
|-------|------|--------------|
| ConnectionError | CONNECTION_FAILED | |
| SocketNotFoundError | SOCKET_NOT_FOUND | socket_path |
| TimeoutError | TIMEOUT | timeout |
| ProtocolError | PROTOCOL_ERROR | |
| SubscriptionError | SUBSCRIPTION_FAILED | message_types |
| PublishError | PUBLISH_FAILED | message_type |
| DiscoveryError | DISCOVERY_FAILED | |
| DisposedError | DISPOSED | |
| ValidationError | VALIDATION_ERROR | field |

Message Shape

Every message flowing through Emergent follows the same envelope:

| Field | Type | Description |
|-------|------|-------------|
| id | str | Unique TypeID (msg_&lt;uuidv7&gt;) |
| message_type | str | Routing key (e.g., "timer.tick") |
| source | str | Name of the publishing primitive |
| correlation_id | str \| None | Links related messages |
| causation_id | str \| None | ID of the triggering message |
| timestamp_ms | int | Creation time (Unix ms) |
| payload | Any | User-defined data |
| metadata | dict[str, Any] \| None | Optional tracing/debug data |

Use msg.payload_as(MyModel) to validate and convert the payload to a typed Pydantic model or any other type.

System Events

The Emergent engine broadcasts lifecycle events that your primitives can subscribe to:

| Event Pattern | Payload Type | Fired When |
|---------------|--------------|------------|
| system.started.&lt;name&gt; | SystemEventPayload | Primitive started |
| system.stopped.&lt;name&gt; | SystemEventPayload | Primitive stopped |
| system.error.&lt;name&gt; | SystemEventPayload | Primitive failed |
| system.shutdown | SystemShutdownPayload | Engine shutting down |

Use the typed payload classes for safe access:

from emergent import SystemEventPayload

if msg.message_type.startswith("system.started."):
    event = msg.payload_as(SystemEventPayload)
    print(f"{event.name} ({event.kind}) started with PID {event.pid}")

if msg.message_type.startswith("system.error."):
    event = msg.payload_as(SystemEventPayload)
    if event.is_error():
        print(f"{event.name} failed: {event.error}")

Requirements

  • Python 3.12 or later
  • A running Emergent engine with the EMERGENT_SOCKET environment variable set

License

MIT OR Apache-2.0
