Python SDK for the Emergent event-driven workflow platform

emergent-client

Python SDK for building event-driven workflows on the Emergent engine. Connect to a running engine over Unix IPC and publish or consume messages through three typed primitives: Source, Handler, and Sink.

from emergent import EmergentSink

async for msg in EmergentSink.messages("my_sink", ["timer.tick"]):
    print(msg.payload)

Install

pip install emergent-client

Or with uv:

uv add emergent-client

Then import:

from emergent import EmergentSource, EmergentHandler, EmergentSink

Three Primitives

Every Emergent workflow is composed of Sources, Handlers, and Sinks. Each primitive has a single, well-defined role:

| Primitive | Subscribe | Publish | Role                                        |
|-----------|-----------|---------|---------------------------------------------|
| Source    | --        | Yes     | Ingress -- bring data into the system       |
| Handler   | Yes       | Yes     | Processing -- transform, enrich, or route   |
| Sink      | Yes       | --      | Egress -- persist, display, or forward data |

Quick Start

Sink -- consume messages

A Sink subscribes to message types and processes each one as it arrives. EmergentSink.messages is a convenience method that connects, subscribes, and yields messages in a single call:

from emergent import EmergentSink

async for msg in EmergentSink.messages("my_sink", ["timer.tick"]):
    data = msg.payload_as(dict)
    print(f"Tick #{data['sequence']}")

For explicit lifecycle control, connect and subscribe separately:

async with await EmergentSink.connect("my_sink") as sink:
    async with await sink.subscribe(["timer.tick", "timer.filtered"]) as stream:
        async for msg in stream:
            print(msg.message_type, msg.payload)

Source -- publish messages

A Source publishes messages into the engine. It cannot subscribe:

from emergent import EmergentSource

async with await EmergentSource.connect("my_source") as source:
    await source.publish("sensor.reading", {"value": 42.5, "unit": "celsius"})

Handler -- subscribe and publish

A Handler subscribes to incoming messages and publishes new ones. Use caused_by to link output messages to the input that triggered them:

from emergent import EmergentHandler, create_message

async with await EmergentHandler.connect("order_processor") as handler:
    async with await handler.subscribe(["order.created"]) as stream:
        async for msg in stream:
            await handler.publish(
                create_message("order.processed")
                .caused_by(msg.id)
                .payload({"status": "ok"})
            )

Publishing Messages

Every primitive that can publish supports three calling styles. All three produce the same result:

# Shorthand -- type string and payload dict
await source.publish("timer.tick", {"count": 1})

# MessageBuilder -- fluent API with auto-build
await source.publish(
    create_message("timer.tick").payload({"count": 1})
)

# Full EmergentMessage object
await source.publish(message)

Building Messages

create_message returns a fluent builder for constructing immutable EmergentMessage instances:

from emergent import create_message

msg = (
    create_message("sensor.reading")
    .payload({"value": 42.5, "unit": "celsius"})
    .metadata({"sensor_id": "temp-01", "location": "room-a"})
    .build()
)

Link messages into traceable chains with caused_by and correlated_with:

reply = (
    create_message("order.confirmed")
    .caused_by(original_msg.id)
    .correlated_with(request_id)
    .payload({"confirmed": True})
    .build()
)

The builder sets id (TypeID format) and timestamp_ms automatically. Call .build() explicitly when you need the message object, or pass the builder directly to publish(), which calls .build() for you.
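The fluent pattern behind this can be sketched with a stand-in builder (for illustration only; `create_message` in the SDK has its own implementation and generates TypeID-format ids from UUIDv7):

```python
import time
import uuid

# Stand-in mirroring the documented behavior: id and timestamp_ms are
# set automatically, and each setter returns self to enable chaining.
class MessageBuilder:
    def __init__(self, message_type):
        self._msg = {
            "message_type": message_type,
            "id": f"msg_{uuid.uuid4().hex}",  # SDK uses TypeID (UUIDv7)
            "timestamp_ms": int(time.time() * 1000),
        }

    def payload(self, data):
        self._msg["payload"] = data
        return self  # returning self is what makes the chain work

    def build(self):
        return dict(self._msg)

msg = MessageBuilder("timer.tick").payload({"count": 1}).build()
print(msg["message_type"], msg["payload"])  # timer.tick {'count': 1}
```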

Subscribing to Messages

subscribe accepts message types either as a single list or as variadic string arguments:

# List form
stream = await sink.subscribe(["timer.tick", "timer.filtered"])

# Variadic form
stream = await sink.subscribe("timer.tick", "timer.filtered")

Iterate over the returned MessageStream with async for:

async for msg in stream:
    data = msg.payload_as(dict)
    print(data["count"])

MessageStream implements AsyncIterator and the async context manager protocol, so you can use async with for automatic cleanup:

async with await sink.subscribe(["timer.tick"]) as stream:
    async for msg in stream:
        print(msg.payload)

Typed payloads with Pydantic

payload_as validates dict payloads against Pydantic models automatically:

from pydantic import BaseModel

class SensorReading(BaseModel):
    value: float
    unit: str

async for msg in EmergentSink.messages("my_sink", ["sensor.reading"]):
    reading = msg.payload_as(SensorReading)
    print(f"{reading.value} {reading.unit}")

Resource Cleanup

All primitives implement the async context manager protocol. Use async with for automatic cleanup (recommended), or call close() / disconnect() manually:

# Automatic cleanup (recommended)
async with await EmergentSink.connect("my_sink") as sink:
    ...

# Manual cleanup
sink = await EmergentSink.connect("my_sink")
# ... use sink ...
await sink.disconnect()

The SDK subscribes to system.shutdown internally. When the Emergent engine signals a graceful shutdown, active message streams close automatically.

Helper Functions

run_source, run_handler, and run_sink eliminate connection and signal-handling boilerplate. Each helper connects, sets up SIGTERM/SIGINT handlers, runs your callback, and disconnects on completion:

import asyncio
from emergent import run_source, run_handler, run_sink, create_message

# Source -- custom event loop with shutdown signal
async def timer_logic(source, shutdown_event):
    count = 0
    while not shutdown_event.is_set():
        try:
            await asyncio.wait_for(shutdown_event.wait(), timeout=3.0)
            break
        except asyncio.TimeoutError:
            count += 1
            await source.publish(
                create_message("timer.tick").payload({"count": count})
            )

await run_source("my_timer", timer_logic)

# Handler -- called once per message
async def process(msg, handler):
    await handler.publish(
        create_message("processed").caused_by(msg.id).payload({"done": True})
    )

await run_handler("my_handler", ["raw.event"], process)

# Sink -- called once per message
async def consume(msg):
    print(msg.payload)

await run_sink("my_sink", ["timer.tick"], consume)

The name argument is optional. When omitted or set to None, the helper reads from the EMERGENT_NAME environment variable.
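The fallback can be sketched as follows (resolve_name is a hypothetical helper shown for illustration; the real run_* helpers resolve the name internally):

```python
import os

def resolve_name(name=None):
    # Mirrors the documented behavior: an explicit name wins,
    # otherwise the EMERGENT_NAME environment variable is consulted.
    resolved = name or os.environ.get("EMERGENT_NAME")
    if resolved is None:
        raise ValueError("no name given and EMERGENT_NAME is not set")
    return resolved

os.environ["EMERGENT_NAME"] = "my_sink"
print(resolve_name())         # my_sink  (falls back to the env var)
print(resolve_name("other"))  # other    (explicit name wins)
```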

Error Handling

All SDK errors extend EmergentError and include a machine-readable code property. Catch specific error types for precise control:

from emergent import (
    EmergentSource,
    SocketNotFoundError,
    ConnectionError,
    TimeoutError,
)

try:
    source = await EmergentSource.connect("my_source")
except SocketNotFoundError as e:
    print(f"Engine not running at: {e.socket_path}")
except TimeoutError as e:
    print(f"Timed out after {e.timeout}s")
except ConnectionError as e:
    print(f"Connection failed: {e}")

Error Types

| Error               | Code                | Extra Fields  |
|---------------------|---------------------|---------------|
| ConnectionError     | CONNECTION_FAILED   | --            |
| SocketNotFoundError | SOCKET_NOT_FOUND    | socket_path   |
| TimeoutError        | TIMEOUT             | timeout       |
| ProtocolError       | PROTOCOL_ERROR      | --            |
| SubscriptionError   | SUBSCRIPTION_FAILED | message_types |
| PublishError        | PUBLISH_FAILED      | message_type  |
| DiscoveryError      | DISCOVERY_FAILED    | --            |
| DisposedError       | DISPOSED            | field         |
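Because every error carries a code, a single except clause can still branch on the error category. A minimal sketch of the pattern, using stand-in classes that mirror the table above (not the SDK's actual definitions):

```python
# Stand-ins for illustration; the real classes live in the emergent package.
class EmergentError(Exception):
    code = "EMERGENT_ERROR"

class SocketNotFoundError(EmergentError):
    code = "SOCKET_NOT_FOUND"

    def __init__(self, socket_path):
        super().__init__(f"no engine socket at {socket_path}")
        self.socket_path = socket_path

def describe(exc):
    # One generic except EmergentError clause, branching on the
    # machine-readable code rather than the concrete type.
    try:
        raise exc
    except EmergentError as e:
        return e.code

print(describe(SocketNotFoundError("/tmp/engine.sock")))  # SOCKET_NOT_FOUND
```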

Message Shape

Every message flowing through Emergent follows the same envelope:

| Field          | Type                        | Description                      |
|----------------|-----------------------------|----------------------------------|
| id             | str                         | Unique TypeID (msg_<uuidv7>)     |
| message_type   | str                         | Routing key (e.g., "timer.tick") |
| source         | str                         | Name of the publishing primitive |
| correlation_id | str or None                 | Links related messages           |
| causation_id   | str or None                 | ID of the triggering message     |
| timestamp_ms   | int                         | Creation time (Unix ms)          |
| payload        | Any                         | User-defined data                |
| metadata       | dict[str, Any] or None      | Optional tracing/debug data      |

Use msg.payload_as(MyModel) to validate and convert the payload to a typed Pydantic model or any other type.

System Events

The Emergent engine broadcasts lifecycle events that your primitives can subscribe to:

| Event Pattern          | Payload Type          | Fired When           |
|------------------------|-----------------------|----------------------|
| system.started.<name>  | SystemEventPayload    | Primitive started    |
| system.stopped.<name>  | SystemEventPayload    | Primitive stopped    |
| system.error.<name>    | SystemEventPayload    | Primitive failed     |
| system.shutdown        | SystemShutdownPayload | Engine shutting down |

Use the typed payload classes for safe access:

from emergent import SystemEventPayload

if msg.message_type.startswith("system.started."):
    event = msg.payload_as(SystemEventPayload)
    print(f"{event.name} ({event.kind}) started with PID {event.pid}")

if msg.message_type.startswith("system.error."):
    event = msg.payload_as(SystemEventPayload)
    if event.is_error():
        print(f"{event.name} failed: {event.error}")

Requirements

  • Python 3.12 or later
  • A running Emergent engine with the EMERGENT_SOCKET environment variable set
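For example, before launching a primitive, point the SDK at the engine's socket (the path below is hypothetical; use whatever path your engine reports):

```shell
# Example socket path -- substitute the one your engine actually exposes.
export EMERGENT_SOCKET=/tmp/emergent/engine.sock
```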

License

MIT OR Apache-2.0
