
Dynamic DES

Real-time SimPy control plane for event-driven digital twins.

Dashboard Screenshot

Dynamic DES bridges the gap between static discrete-event simulations and the live world. It allows you to update simulation parameters (arrivals, service times, capacities) and stream telemetry via Kafka, Redis, or PostgreSQL without stopping the simulation.

Key Features

  • ⚡ Real-Time Control: Synchronize SimPy with the system clock using DynamicRealtimeEnvironment.
  • 🔗 Dynamic Registry: Dynamic, path-based updates (e.g., Line_A.arrival.rate) that trigger instant logic changes.
  • 🚀 High Throughput: Local event batching and orjson serialization keep egress fast under load.
  • 🔋 Flexible Resources: DynamicResource provides prioritized queuing with graceful capacity shrinking.
  • 🔌 Modular Connectors: Plugin-based architecture for Kafka, Redis, Postgres and Local testing.
  • 📊 System Observability: Built-in lag monitoring to track simulation drift from real-world time, exposed via the telemetry stream.

Installation

Install the core library:

pip install dynamic-des

To include specific backends:

# For Kafka support
pip install "dynamic-des[kafka]"

# For Kafka and Dashboard support
pip install "dynamic-des[kafka,dashboard]"

# For all backends (Kafka, Redis, Postgres, Dashboard)
pip install "dynamic-des[all]"

Quick Start: Zero-Setup Demos

Dynamic DES comes with built-in examples and infrastructure orchestration so you can see it in action immediately.

Run the local, dependency-free simulation:

ddes-local-example

Run the full Real-Time Digital Twin stack with Kafka and a live UI:

# Start the background Kafka cluster (requires Docker)
ddes-kafka-infra-up

# Open a new terminal and run the simulation
# Ctrl + C to stop
ddes-kafka-example

# Open a new terminal and start the control dashboard (opens in browser)
# Visit http://localhost:8080
# Ctrl + C to stop
ddes-kafka-dashboard

# Clean up the infrastructure when finished
ddes-kafka-infra-down

Building Your Own Simulation (Local Example)

The following snippet shows a minimal end-to-end run. It initializes a production line, schedules an external capacity update, and streams telemetry to the console.

import logging
from dynamic_des import (
    CapacityConfig, ConsoleEgress, DistributionConfig,
    DynamicRealtimeEnvironment, DynamicResource, LocalIngress, SimParameter
)

logging.basicConfig(
    level=logging.INFO, format="%(levelname)s [%(asctime)s] %(message)s"
)
logger = logging.getLogger("local_example")

# 1. Define initial system state
params = SimParameter(
    sim_id="Line_A",
    arrival={"standard": DistributionConfig(dist="exponential", rate=1)},
    resources={"lathe": CapacityConfig(current_cap=1, max_cap=5)},
)

# 2. Setup Environment with Local Connectors
# Schedule capacity to jump from 1 to 3 at t=5s
ingress = LocalIngress([(5.0, "Line_A.resources.lathe.current_cap", 3)])
egress = ConsoleEgress()

env = DynamicRealtimeEnvironment(factor=1.0)
env.registry.register_sim_parameter(params)
env.setup_ingress([ingress])
env.setup_egress([egress])

# 3. Create Resource
res = DynamicResource(env, "Line_A", "lathe")

def telemetry_monitor(env: DynamicRealtimeEnvironment, res: DynamicResource):
    """Streams system health metrics every 2 seconds."""
    while True:
        env.publish_telemetry("Line_A.resources.lathe.capacity", res.capacity)
        yield env.timeout(2.0)


env.process(telemetry_monitor(env, res))

# 4. Run
print("Simulation started. Watch capacity change at t=5s...")
try:
    env.run(until=10.1)
finally:
    env.teardown()

What this does

  1. Registry Initialization: The SimParameter defines the initial state. The Registry flattens this into addressable paths (e.g., Line_A.resources.lathe.current_cap).
  2. Live Ingress: The LocalIngress simulates an external event (like a Kafka message) arriving 5 seconds into the run.
  3. Zero-Polling Update: The DynamicResource listens to the Registry. The moment the ingress updates the value, the resource automatically expands its internal token pool without any manual checking.
  4. Telemetry Egress: The ConsoleEgress prints system vitals to your terminal, mimicking a live dashboard feed.
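
The flattening in step 1 can be sketched with a small recursive helper. This is a toy illustration of the idea, not the library's internals: the `flatten` function and the nested-dict input are mine, but the resulting dot-notation paths match the ones shown above.

```python
def flatten(prefix, node, out=None):
    """Recursively flatten nested dicts into dot-notation paths."""
    if out is None:
        out = {}
    for key, value in node.items():
        path = f"{prefix}.{key}"
        if isinstance(value, dict):
            flatten(path, value, out)
        else:
            out[path] = value
    return out

# Mirrors the SimParameter structure from the example above.
state = {
    "arrival": {"standard": {"dist": "exponential", "rate": 1}},
    "resources": {"lathe": {"current_cap": 1, "max_cap": 5}},
}
paths = flatten("Line_A", state)
# paths now contains addressable entries such as
# "Line_A.resources.lathe.current_cap" -> 1
```

Once state is flat, an ingress message only needs a path string and a value, which is exactly the shape of the `LocalIngress` tuples in the snippet above.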

Data Egress JSON Schemas

To ensure strict data contracts with external consumers (like Kafka, Redis, or PostgreSQL), dynamic-des uses Pydantic to validate all outbound payloads. Users can expect two distinct JSON structures depending on the stream type:

Telemetry Stream

Used for scalar metrics like resource utilization, queue lengths, or simulation lag.

{
  "stream_type": "telemetry",
  "path_id": "Line_A.resources.lathe.utilization",
  "value": 85.5,
  "sim_ts": 120.5,
  "timestamp": "2023-10-25T14:30:00.000Z"
}

Event Stream

Used for discrete task lifecycle events (e.g., a part arriving, entering a queue, or finishing processing).

{
  "stream_type": "event",
  "key": "task-001",
  "value": {
    "status": "finished",
    "duration": 45.2,
    "path_id": "Line_A.service.lathe"
  },
  "sim_ts": 125.0,
  "timestamp": "2023-10-25T14:30:04.500Z"
}

More Examples

For more examples, including Kafka-based implementations, explore the examples folder.

Core Concepts

Dynamic DES is built on the Switchboard Pattern, decoupling data sourcing from simulation logic.

Switchboard Pattern

Instead of resources polling Kafka directly, the architecture is split into three layers:

  1. Connectors (Ingress/Egress): Background threads handle heavy I/O (Kafka, Redis).
  2. Registry (Switchboard): A centralized state manager that flattens data into dot-notation paths.
  3. Resources (SimPy Objects): Passive observers that "wake up" only when the Registry signals a change.

Event-Driven Capacity

Standard SimPy resources have static capacities. DynamicResource wraps a Container and a PriorityStore. When the Registry updates:

  • Growing: Extra tokens are added to the pool immediately.
  • Shrinking: The resource reclaims tokens. Tokens that are in use are retired only when their current holders release them, ensuring no work-in-progress is lost.
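
The grow/shrink semantics can be sketched without SimPy as a token pool that grows immediately but defers shrinking until busy tokens are released. `ToyTokenPool` is a sketch of the behavior described above, not the `DynamicResource` implementation:

```python
class ToyTokenPool:
    """Capacity pool that grows immediately and shrinks gracefully."""

    def __init__(self, capacity):
        self.free = capacity
        self.busy = 0
        self.pending_shrink = 0  # tokens to retire as holders release them

    def acquire(self):
        if self.free == 0:
            return False
        self.free -= 1
        self.busy += 1
        return True

    def release(self):
        self.busy -= 1
        if self.pending_shrink > 0:
            self.pending_shrink -= 1  # retire instead of returning to pool
        else:
            self.free += 1

    def set_capacity(self, new_cap):
        current = self.free + self.busy - self.pending_shrink
        delta = new_cap - current
        if delta >= 0:
            self.free += delta  # growing: add tokens immediately
        else:
            take = min(self.free, -delta)
            self.free -= take                        # shrink idle tokens now
            self.pending_shrink += (-delta) - take   # rest wait for release

pool = ToyTokenPool(3)
pool.acquire()
pool.acquire()            # 2 busy, 1 free
pool.set_capacity(1)      # retires the idle token now, defers one more
pool.release()            # released token is retired, not returned
```

After the release, the effective capacity has settled at 1, and the job that was still running was never interrupted.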

High-Throughput Events

To handle high throughput, the EgressMixIn uses:

  • Batching: Pushing lists of events to the I/O thread to reduce lock contention.
  • orjson: Rust-powered serialization for maximum speed.
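
The batching idea can be sketched with stdlib pieces: the simulation thread appends events to a local list and hands whole batches to an I/O worker over a queue, paying one lock operation per batch instead of per event. Here stdlib `json` stands in for orjson, an in-memory `sent` list stands in for a broker, and the class name is mine:

```python
import json
import queue
import threading

class ToyBatchEgress:
    """Buffer events locally; flush whole batches to an I/O thread."""

    def __init__(self, batch_size=100):
        self.batch_size = batch_size
        self._buffer = []
        self._outbox = queue.Queue()
        self.sent = []  # what the worker "wrote" (stands in for Kafka)
        self._worker = threading.Thread(target=self._drain, daemon=True)
        self._worker.start()

    def publish(self, event):
        self._buffer.append(event)
        if len(self._buffer) >= self.batch_size:
            self.flush()

    def flush(self):
        if self._buffer:
            # One queue operation per batch, not per event,
            # keeps lock contention off the simulation loop.
            self._outbox.put(self._buffer)
            self._buffer = []

    def _drain(self):
        while True:
            batch = self._outbox.get()
            if batch is None:
                break
            self.sent.append(json.dumps(batch))  # serialize once per batch

    def close(self):
        self.flush()
        self._outbox.put(None)  # sentinel: stop the worker
        self._worker.join()

egress = ToyBatchEgress(batch_size=2)
for i in range(5):
    egress.publish({"sim_ts": float(i), "value": i})
egress.close()  # two full batches plus one final partial flush
```

Swapping `json.dumps` for `orjson.dumps` and the `sent` list for a producer client gives the general shape of a real egress connector.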

Documentation

For full documentation, architecture details, and API reference, visit: https://jaehyeon.me/dynamic-des/.

License

MIT

Download files

Download the file for your platform.

Source Distribution

dynamic_des-0.3.0.tar.gz (29.5 kB)

Uploaded Source

Built Distribution


dynamic_des-0.3.0-py3-none-any.whl (45.0 kB)

Uploaded Python 3

File details

Details for the file dynamic_des-0.3.0.tar.gz.

File metadata

  • Download URL: dynamic_des-0.3.0.tar.gz
  • Size: 29.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for dynamic_des-0.3.0.tar.gz:

  • SHA256: 266d8bdcefa4090c2f768b0f2f95aacaedc02f49019cf4d9d94bbd411a2571df
  • MD5: d5888af8bac6ba119e2ee53bf8d5c1d6
  • BLAKE2b-256: f35329bdfa1792d0e3a9103db59e5fb91f1a6a93f6f143e4a831fa9ea4b38b54


Provenance

The following attestation bundles were made for dynamic_des-0.3.0.tar.gz:

Publisher: publish.yml on jaehyeon-kim/dynamic-des

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file dynamic_des-0.3.0-py3-none-any.whl.

File metadata

  • Download URL: dynamic_des-0.3.0-py3-none-any.whl
  • Size: 45.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for dynamic_des-0.3.0-py3-none-any.whl:

  • SHA256: d537a7a6aada5b5c863c2aaa7fc68ec84ccf0b291b167d456d1f7a388e1535ce
  • MD5: a46548087bdf335d1fdb157702b8b40c
  • BLAKE2b-256: a1250b630f99cbb39d8eca92ca9cec7c1cc41c7b7e0001258c5a07ba9299f2c7


Provenance

The following attestation bundles were made for dynamic_des-0.3.0-py3-none-any.whl:

Publisher: publish.yml on jaehyeon-kim/dynamic-des

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.
