Python SDK for the Emergent event-driven workflow platform
Project description
emergent-client
Python SDK for building event-driven workflows on the Emergent engine. Connect to a running engine over Unix IPC and publish or consume messages through three typed primitives: Source, Handler, and Sink.
from emergent import EmergentSink
async for msg in EmergentSink.messages("my_sink", ["timer.tick"]):
print(msg.payload)
Install
pip install emergent-client
Or with uv:
uv add emergent-client
Then import:
from emergent import EmergentSource, EmergentHandler, EmergentSink
Three Primitives
Every Emergent workflow is composed of Sources, Handlers, and Sinks. Each primitive has a single, well-defined role:
| Primitive | Subscribe | Publish | Role |
|---|---|---|---|
| Source | -- | Yes | Ingress -- bring data into the system |
| Handler | Yes | Yes | Processing -- transform, enrich, or route |
| Sink | Yes | -- | Egress -- persist, display, or forward data |
Quick Start
Sink -- consume messages
A Sink subscribes to message types and processes each one as it arrives.
EmergentSink.messages is a convenience method that connects, subscribes, and
yields messages in a single call:
from emergent import EmergentSink
async for msg in EmergentSink.messages("my_sink", ["timer.tick"]):
data = msg.payload_as(dict)
print(f"Tick #{data['sequence']}")
For explicit lifecycle control, connect and subscribe separately:
async with await EmergentSink.connect("my_sink") as sink:
async with await sink.subscribe(["timer.tick", "timer.filtered"]) as stream:
async for msg in stream:
print(msg.message_type, msg.payload)
Source -- publish messages
A Source publishes messages into the engine. It cannot subscribe:
from emergent import EmergentSource
async with await EmergentSource.connect("my_source") as source:
await source.publish("sensor.reading", {"value": 42.5, "unit": "celsius"})
Handler -- subscribe and publish
A Handler subscribes to incoming messages and publishes new ones. Use
caused_by to link output messages to the input that triggered them:
from emergent import EmergentHandler, create_message
async with await EmergentHandler.connect("order_processor") as handler:
async with await handler.subscribe(["order.created"]) as stream:
async for msg in stream:
await handler.publish(
create_message("order.processed")
.caused_by(msg.id)
.payload({"status": "ok"})
)
Publishing Messages
Every primitive that can publish supports three calling styles. All three produce the same result:
# Shorthand -- type string and payload dict
await source.publish("timer.tick", {"count": 1})
# MessageBuilder -- fluent API with auto-build
await source.publish(
create_message("timer.tick").payload({"count": 1})
)
# Full EmergentMessage object
await source.publish(message)
Building Messages
create_message returns a fluent builder for constructing immutable
EmergentMessage instances:
from emergent import create_message
msg = (
create_message("sensor.reading")
.payload({"value": 42.5, "unit": "celsius"})
.metadata({"sensor_id": "temp-01", "location": "room-a"})
.build()
)
Link messages into traceable chains with caused_by and correlated_with:
reply = (
create_message("order.confirmed")
.caused_by(original_msg.id)
.correlated_with(request_id)
.payload({"confirmed": True})
.build()
)
The builder sets id (TypeID format) and timestamp_ms automatically. Call
.build() explicitly when you need the message object, or pass the builder
directly to publish(), which calls .build() for you.
Subscribing to Messages
subscribe accepts a list or variadic arguments:
# List form
stream = await sink.subscribe(["timer.tick", "timer.filtered"])
# Variadic form
stream = await sink.subscribe("timer.tick", "timer.filtered")
Iterate over the returned MessageStream with async for:
async for msg in stream:
data = msg.payload_as(dict)
print(data["count"])
MessageStream implements AsyncIterator and the async context manager
protocol, so you can use async with for automatic cleanup:
async with await sink.subscribe(["timer.tick"]) as stream:
async for msg in stream:
print(msg.payload)
Typed payloads with Pydantic
payload_as validates dict payloads against Pydantic models automatically:
from pydantic import BaseModel
class SensorReading(BaseModel):
value: float
unit: str
async for msg in EmergentSink.messages("my_sink", ["sensor.reading"]):
reading = msg.payload_as(SensorReading)
print(f"{reading.value} {reading.unit}")
Resource Cleanup
All primitives implement the async context manager protocol. Use async with
for automatic cleanup (recommended), or call close() / disconnect()
manually:
# Automatic cleanup (recommended)
async with await EmergentSink.connect("my_sink") as sink:
...
# Manual cleanup
sink = await EmergentSink.connect("my_sink")
# ... use sink ...
await sink.disconnect()
The SDK subscribes to system.shutdown internally. When the Emergent engine
signals a graceful shutdown, active message streams close automatically.
Helper Functions
run_source, run_handler, and run_sink eliminate connection and
signal-handling boilerplate. Each helper connects, sets up SIGTERM/SIGINT
handlers, runs your callback, and disconnects on completion:
import asyncio
from emergent import run_source, run_handler, run_sink, create_message
# Source -- custom event loop with shutdown signal
async def timer_logic(source, shutdown_event):
count = 0
while not shutdown_event.is_set():
try:
await asyncio.wait_for(shutdown_event.wait(), timeout=3.0)
break
except asyncio.TimeoutError:
count += 1
await source.publish(
create_message("timer.tick").payload({"count": count})
)
await run_source("my_timer", timer_logic)
# Handler -- called once per message
async def process(msg, handler):
await handler.publish(
create_message("processed").caused_by(msg.id).payload({"done": True})
)
await run_handler("my_handler", ["raw.event"], process)
# Sink -- called once per message
async def consume(msg):
print(msg.payload)
await run_sink("my_sink", ["timer.tick"], consume)
The name argument is optional. When omitted or set to None, the helper reads
from the EMERGENT_NAME environment variable.
Error Handling
All SDK errors extend EmergentError and include a machine-readable code
property. Catch specific error types for precise control:
from emergent import (
EmergentSource,
SocketNotFoundError,
ConnectionError,
TimeoutError,
)
try:
source = await EmergentSource.connect("my_source")
except SocketNotFoundError as e:
print(f"Engine not running at: {e.socket_path}")
except TimeoutError as e:
print(f"Timed out after {e.timeout}s")
except ConnectionError as e:
print(f"Connection failed: {e}")
Error Types
| Error | Code | Extra Fields |
|---|---|---|
ConnectionError |
CONNECTION_FAILED |
|
SocketNotFoundError |
SOCKET_NOT_FOUND |
socket_path |
TimeoutError |
TIMEOUT |
timeout |
ProtocolError |
PROTOCOL_ERROR |
|
SubscriptionError |
SUBSCRIPTION_FAILED |
message_types |
PublishError |
PUBLISH_FAILED |
message_type |
DiscoveryError |
DISCOVERY_FAILED |
|
DisposedError |
DISPOSED |
|
ValidationError |
VALIDATION_ERROR |
field |
Message Shape
Every message flowing through Emergent follows the same envelope:
| Field | Type | Description |
|---|---|---|
id |
str |
Unique TypeID (msg_<uuidv7>) |
message_type |
str |
Routing key (e.g., "timer.tick") |
source |
str |
Name of the publishing primitive |
correlation_id |
str | None |
Links related messages |
causation_id |
str | None |
ID of the triggering message |
timestamp_ms |
int |
Creation time (Unix ms) |
payload |
Any |
User-defined data |
metadata |
dict[str, Any] | None |
Optional tracing/debug data |
Use msg.payload_as(MyModel) to validate and convert the payload to a typed
Pydantic model or any other type.
System Events
The Emergent engine broadcasts lifecycle events that your primitives can subscribe to:
| Event Pattern | Payload Type | Fired When |
|---|---|---|
system.started.<name> |
SystemEventPayload |
Primitive started |
system.stopped.<name> |
SystemEventPayload |
Primitive stopped |
system.error.<name> |
SystemEventPayload |
Primitive failed |
system.shutdown |
SystemShutdownPayload |
Engine shutting down |
Use the typed payload classes for safe access:
from emergent import SystemEventPayload
if msg.message_type.startswith("system.started."):
event = msg.payload_as(SystemEventPayload)
print(f"{event.name} ({event.kind}) started with PID {event.pid}")
if msg.message_type.startswith("system.error."):
event = msg.payload_as(SystemEventPayload)
if event.is_error():
print(f"{event.name} failed: {event.error}")
Requirements
- Python 3.12 or later
- A running Emergent engine with the
EMERGENT_SOCKETenvironment variable set
License
MIT OR Apache-2.0
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file emergent_client-0.7.2.tar.gz.
File metadata
- Download URL: emergent_client-0.7.2.tar.gz
- Upload date:
- Size: 60.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
f7370d431b3f14ea5c2ecab30b3a8dfb12321cf60f9ef4bc0d6e345e5c71f84f
|
|
| MD5 |
d0c914084beb94d217876299e30bc40b
|
|
| BLAKE2b-256 |
b439efe3ddc075064d5f0fbd1a62778084ac728c3a43e28bf08731ddc73ccd96
|
Provenance
The following attestation bundles were made for emergent_client-0.7.2.tar.gz:
Publisher:
workflow-pypi.yml on Govcraft/emergent
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
emergent_client-0.7.2.tar.gz -
Subject digest:
f7370d431b3f14ea5c2ecab30b3a8dfb12321cf60f9ef4bc0d6e345e5c71f84f - Sigstore transparency entry: 1093962004
- Sigstore integration time:
-
Permalink:
Govcraft/emergent@c59e0219fd590892a28af4a911241651a34aacf6 -
Branch / Tag:
refs/tags/v0.8.2 - Owner: https://github.com/Govcraft
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
workflow-pypi.yml@c59e0219fd590892a28af4a911241651a34aacf6 -
Trigger Event:
push
-
Statement type:
File details
Details for the file emergent_client-0.7.2-py3-none-any.whl.
File metadata
- Download URL: emergent_client-0.7.2-py3-none-any.whl
- Upload date:
- Size: 29.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
baa92db8d881146b0db4a031cf7a6cfc217677f545414dd7b40ec7b3ee13bda6
|
|
| MD5 |
137a27378872981d5e0dad1339bbecd6
|
|
| BLAKE2b-256 |
c0eeaa231601b52c5495788b8edd4b83c7403a15b6e344e38109ed9a91091f85
|
Provenance
The following attestation bundles were made for emergent_client-0.7.2-py3-none-any.whl:
Publisher:
workflow-pypi.yml on Govcraft/emergent
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
emergent_client-0.7.2-py3-none-any.whl -
Subject digest:
baa92db8d881146b0db4a031cf7a6cfc217677f545414dd7b40ec7b3ee13bda6 - Sigstore transparency entry: 1093962008
- Sigstore integration time:
-
Permalink:
Govcraft/emergent@c59e0219fd590892a28af4a911241651a34aacf6 -
Branch / Tag:
refs/tags/v0.8.2 - Owner: https://github.com/Govcraft
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
workflow-pypi.yml@c59e0219fd590892a28af4a911241651a34aacf6 -
Trigger Event:
push
-
Statement type: