Real-time SimPy control plane to dynamically update parameters and stream outputs via external systems like Kafka, Redis, or Postgres. Built for event-driven digital twins.
Dynamic DES
Real-time SimPy control plane for event-driven digital twins.
Dynamic DES bridges the gap between static discrete-event simulations and the live world. It allows you to update simulation parameters (arrivals, service times, capacities) and stream telemetry via Kafka, Redis, or PostgreSQL without stopping the simulation.
Key Features
- ⚡ Real-Time Control: Synchronize SimPy with the system clock using DynamicRealtimeEnvironment.
- 🔗 Dynamic Registry: Dynamic, path-based updates (e.g., Line_A.arrival.rate) that trigger instant logic changes.
- 🚀 High Throughput: Optimized to handle high throughput using orjson and local batching.
- 🔋 Flexible Resources: DynamicResource provides prioritized queuing with graceful capacity shrinking.
- 🔌 Modular Connectors: Plugin-based architecture for Kafka, Redis, Postgres and Local testing.
- 📊 System Observability: Built-in lag monitoring to track simulation drift from real-world time, exposed via the telemetry stream.
Installation
Install the core library:
pip install dynamic-des
To include specific backends:
# For Kafka support
pip install "dynamic-des[kafka]"
# For Kafka and Dashboard support
pip install "dynamic-des[kafka,dashboard]"
# For all backends (Kafka, Redis, Postgres, Dashboard)
pip install "dynamic-des[all]"
Quick Start: Zero-Setup Demos
Dynamic DES comes with built-in examples and infrastructure orchestration so you can see it in action immediately.
Run the local, dependency-free simulation:
ddes-local-example
Run the full Real-Time Digital Twin stack with Kafka and a live UI:
# Start the background Kafka cluster (requires Docker)
ddes-kafka-infra-up
# Open a new terminal and run the simulation
# Ctrl + C to stop
ddes-kafka-example
# Open a new terminal and start the control dashboard (opens in browser)
# Visit http://localhost:8080
# Ctrl + C to stop
ddes-kafka-dashboard
# Clean up the infrastructure when finished
ddes-kafka-infra-down
Building Your Own Simulation (Local Example)
The following snippet initializes a production line, schedules an external capacity update, and streams telemetry to the console.
import logging
from dynamic_des import (
    CapacityConfig, ConsoleEgress, DistributionConfig,
    DynamicRealtimeEnvironment, DynamicResource, LocalIngress, SimParameter
)

logging.basicConfig(
    level=logging.INFO, format="%(levelname)s [%(asctime)s] %(message)s"
)
logger = logging.getLogger("local_example")
# 1. Define initial system state
params = SimParameter(
    sim_id="Line_A",
    arrival={"standard": DistributionConfig(dist="exponential", rate=1)},
    resources={"lathe": CapacityConfig(current_cap=1, max_cap=5)},
)
# 2. Setup Environment with Local Connectors
# Schedule capacity to jump from 1 to 3 at t=5s
ingress = LocalIngress([(5.0, "Line_A.resources.lathe.current_cap", 3)])
egress = ConsoleEgress()
env = DynamicRealtimeEnvironment(factor=1.0)
env.registry.register_sim_parameter(params)
env.setup_ingress([ingress])
env.setup_egress([egress])
# 3. Create Resource
res = DynamicResource(env, "Line_A", "lathe")
def telemetry_monitor(env: DynamicRealtimeEnvironment, res: DynamicResource):
    """Streams system health metrics every 2 seconds."""
    while True:
        env.publish_telemetry("Line_A.resources.lathe.capacity", res.capacity)
        yield env.timeout(2.0)

env.process(telemetry_monitor(env, res))
# 4. Run
print("Simulation started. Watch capacity change at t=5s...")
try:
    env.run(until=10.1)
finally:
    env.teardown()
What this does
- Registry Initialization: The SimParameter defines the initial state. The Registry flattens this into addressable paths (e.g., Line_A.resources.lathe.current_cap).
- Live Ingress: The LocalIngress simulates an external event (like a Kafka message) arriving 5 seconds into the run.
- Zero-Polling Update: The DynamicResource listens to the Registry. The moment the ingress updates the value, the resource automatically expands its internal token pool without any manual checking.
- Telemetry Egress: The ConsoleEgress prints system vitals to your terminal, mimicking a live dashboard feed.
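To make the path flattening concrete, here is a minimal sketch in plain Python. It is not the library's Registry code: the flatten helper and the plain-dict stand-in for SimParameter are hypothetical, and the real library may lay out its paths differently.

```python
from typing import Any


def flatten(prefix: str, node: Any) -> dict[str, Any]:
    """Flatten nested dicts into {dot.path: leaf_value} pairs."""
    if not isinstance(node, dict):
        return {prefix: node}
    flat: dict[str, Any] = {}
    for key, child in node.items():
        flat.update(flatten(f"{prefix}.{key}", child))
    return flat


# Hypothetical plain-dict stand-in for the SimParameter used in the example.
line_a = {
    "arrival": {"standard": {"dist": "exponential", "rate": 1}},
    "resources": {"lathe": {"current_cap": 1, "max_cap": 5}},
}

paths = flatten("Line_A", line_a)
for path, value in paths.items():
    print(path, "=", value)
```

Each leaf ends up under a single string key, which is what lets an ingress message address one value such as Line_A.resources.lathe.current_cap without knowing the surrounding structure.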
Data Egress JSON Schemas
To ensure strict data contracts with external consumers (like Kafka, Redis, or PostgreSQL), dynamic-des uses Pydantic to validate all outbound payloads. Users can expect two distinct JSON structures depending on the stream type:
Telemetry Stream
Used for scalar metrics like resource utilization, queue lengths, or simulation lag.
{
  "stream_type": "telemetry",
  "path_id": "Line_A.resources.lathe.utilization",
  "value": 85.5,
  "sim_ts": 120.5,
  "timestamp": "2023-10-25T14:30:00.000Z"
}
Event Stream
Used for discrete task lifecycle events (e.g., a part arriving, entering a queue, or finishing processing).
{
  "stream_type": "event",
  "key": "task-001",
  "value": {
    "status": "finished",
    "duration": 45.2,
    "path_id": "Line_A.service.lathe"
  },
  "sim_ts": 125.0,
  "timestamp": "2023-10-25T14:30:04.500Z"
}
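On the consumer side, the stream_type field is enough to tell the two payload shapes apart. The sketch below mirrors the two structures with standard-library dataclasses and dispatches on that field. The library itself validates with Pydantic; the class names, parse_payload, and utc_now_iso here are hypothetical and exist only for illustration.

```python
import json
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import Any


def utc_now_iso() -> str:
    """Wall-clock UTC timestamp in the millisecond 'Z' form used in the samples."""
    now = datetime.now(timezone.utc)
    return now.isoformat(timespec="milliseconds").replace("+00:00", "Z")


@dataclass
class TelemetryPayload:  # hypothetical mirror of the telemetry structure
    path_id: str
    value: float
    sim_ts: float
    timestamp: str
    stream_type: str = "telemetry"


@dataclass
class EventPayload:  # hypothetical mirror of the event structure
    key: str
    value: dict[str, Any]
    sim_ts: float
    timestamp: str
    stream_type: str = "event"


def parse_payload(raw: str):
    """Decode one JSON message and pick the payload class from stream_type."""
    data = json.loads(raw)
    kind = data.get("stream_type")
    if kind == "telemetry":
        return TelemetryPayload(**data)
    if kind == "event":
        return EventPayload(**data)
    raise ValueError(f"unknown stream_type: {kind!r}")


telemetry_json = json.dumps(asdict(TelemetryPayload(
    path_id="Line_A.resources.lathe.utilization",
    value=85.5,
    sim_ts=120.5,
    timestamp=utc_now_iso(),
)))
parsed = parse_payload(telemetry_json)
print(type(parsed).__name__, parsed.path_id, parsed.value)
```

Unknown keys raise a TypeError in the dataclass constructor, which gives a rough approximation of a strict contract; a Pydantic model would additionally check field types.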
More Examples
For more examples, including implementations using Kafka providers, please explore the examples folder.
Core Concepts
Dynamic DES is built on the Switchboard Pattern, decoupling data sourcing from simulation logic.
Switchboard Pattern
Instead of resources polling Kafka directly, the architecture is split into three layers:
- Connectors (Ingress/Egress): Background threads handle heavy I/O (Kafka, Redis).
- Registry (Switchboard): A centralized state manager that flattens data into dot-notation paths.
- Resources (SimPy Objects): Passive observers that "wake up" only when the Registry signals a change.
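The three layers can be sketched with a small observer registry. This is a single-threaded illustration under stated assumptions, not the library's Registry: MiniRegistry, subscribe, and update are hypothetical names, and the real connectors run in background threads and signal SimPy objects rather than calling plain callbacks.

```python
from collections import defaultdict
from typing import Any, Callable

Listener = Callable[[str, Any], None]


class MiniRegistry:
    """Hypothetical switchboard: stores values by dot path and notifies listeners."""

    def __init__(self) -> None:
        self._values: dict[str, Any] = {}
        self._listeners: dict[str, list[Listener]] = defaultdict(list)

    def subscribe(self, path: str, listener: Listener) -> None:
        """Resource layer: register interest in exactly one path."""
        self._listeners[path].append(listener)

    def update(self, path: str, value: Any) -> None:
        """Connector layer: store a value and wake only that path's listeners."""
        if self._values.get(path) == value:
            return  # unchanged value, nobody is woken
        self._values[path] = value
        for listener in self._listeners[path]:
            listener(path, value)


registry = MiniRegistry()
seen: list[tuple[str, Any]] = []

# A "resource" that only cares about the lathe capacity.
registry.subscribe(
    "Line_A.resources.lathe.current_cap",
    lambda path, value: seen.append((path, value)),
)

registry.update("Line_A.resources.lathe.current_cap", 3)  # listener fires
registry.update("Line_A.arrival.standard.rate", 2)        # no listener on this path
registry.update("Line_A.resources.lathe.current_cap", 3)  # unchanged, ignored

print(seen)
```

The resource never talks to Kafka or Redis; it sees only path and value, so connectors can be swapped without touching simulation logic.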
Event-Driven Capacity
Standard SimPy resources have static capacities. DynamicResource wraps a Container and a PriorityStore. When the Registry updates the capacity:
- Growing: Extra tokens are added to the pool immediately.
- Shrinking: The resource requests tokens back. If they are busy, it waits until they are released, ensuring no work-in-progress is lost.
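The grow and shrink rules can be illustrated with plain bookkeeping. This sketch does not use SimPy and is not how DynamicResource is implemented; TokenPool and its attributes are hypothetical, and priority queuing is left out. It only shows the idea that a shrink takes idle tokens first and retires busy ones as they are released.

```python
class TokenPool:
    """Hypothetical capacity pool: grows at once, shrinks without interrupting work."""

    def __init__(self, capacity: int) -> None:
        self.capacity = capacity    # current target capacity
        self.idle = capacity        # tokens free to hand out
        self.busy = 0               # tokens held by in-progress work
        self.pending_removal = 0    # busy tokens to retire on release

    def acquire(self) -> bool:
        """Take a token if one is idle; return False when the pool is exhausted."""
        if self.idle == 0:
            return False
        self.idle -= 1
        self.busy += 1
        return True

    def release(self) -> None:
        """Return a token; retire it instead if a shrink is still owed."""
        if self.busy == 0:
            raise RuntimeError("release without a matching acquire")
        self.busy -= 1
        if self.pending_removal > 0:
            self.pending_removal -= 1
        else:
            self.idle += 1

    def set_capacity(self, new_capacity: int) -> None:
        if new_capacity < 0:
            raise ValueError("capacity must be non-negative")
        delta = new_capacity - self.capacity
        self.capacity = new_capacity
        if delta >= 0:
            # Growing: cancel owed removals first, then add idle tokens.
            cancelled = min(delta, self.pending_removal)
            self.pending_removal -= cancelled
            self.idle += delta - cancelled
        else:
            # Shrinking: take idle tokens now, defer the rest until release.
            from_idle = min(-delta, self.idle)
            self.idle -= from_idle
            self.pending_removal += -delta - from_idle


pool = TokenPool(3)
for _ in range(3):
    pool.acquire()              # all three tokens are now busy

pool.set_capacity(1)            # nothing idle, so two removals are deferred
pool.release()
pool.release()                  # both released tokens are retired
state_after_shrink = (pool.idle, pool.busy, pool.pending_removal)

pool.set_capacity(2)            # growing adds one idle token immediately
state_after_grow = (pool.idle, pool.busy, pool.pending_removal)
print(state_after_shrink, state_after_grow)
```

Throughout, idle + busy - pending_removal equals the target capacity, so the one task still running after the shrink keeps its token until it finishes.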
High-Throughput Events
To handle high throughput, the EgressMixIn uses:
- Batching: Pushing lists of events to the I/O thread to reduce lock contention.
- orjson: Rust-powered serialization for maximum speed.
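The batching idea can be sketched with the standard library alone. This is an assumption-laden illustration, not the EgressMixIn: BatchingEgress, publish, flush, and close are hypothetical names, and the standard json module stands in for orjson so the snippet has no dependencies (orjson.dumps returns bytes directly).

```python
import json
import queue
import threading
from typing import Any


class BatchingEgress:
    """Hypothetical egress: buffers events locally, hands whole lists to an I/O thread."""

    def __init__(self, batch_size: int = 100) -> None:
        self._batch_size = batch_size
        self._buffer: list[dict[str, Any]] = []
        self._queue = queue.Queue()      # carries lists of events, or None to stop
        self.batches_enqueued = 0
        self.sent: list[bytes] = []      # stand-in for a Kafka/Redis client
        self._worker = threading.Thread(target=self._drain, daemon=True)
        self._worker.start()

    def publish(self, event: dict[str, Any]) -> None:
        """Called from the simulation thread; touches the queue once per batch."""
        self._buffer.append(event)
        if len(self._buffer) >= self._batch_size:
            self.flush()

    def flush(self) -> None:
        if self._buffer:
            self._queue.put(self._buffer)
            self._buffer = []
            self.batches_enqueued += 1

    def close(self) -> None:
        """Flush the remainder, stop the worker, and wait for it to finish."""
        self.flush()
        self._queue.put(None)
        self._worker.join()

    def _drain(self) -> None:
        """I/O thread: serialize and 'send' each event of every batch."""
        while True:
            batch = self._queue.get()
            if batch is None:
                break
            for event in batch:
                self.sent.append(json.dumps(event).encode("utf-8"))


egress = BatchingEgress(batch_size=3)
for i in range(7):
    egress.publish({"stream_type": "event", "key": f"task-{i:03d}", "sim_ts": float(i)})
egress.close()

print(len(egress.sent), "events sent in", egress.batches_enqueued, "batches")
```

Seven events cross the thread boundary in three queue operations instead of seven, which is where the reduction in lock contention comes from.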
Documentation
For full documentation, architecture details, and API reference, visit: https://jaehyeon.me/dynamic-des/.
License
MIT