Real-time SimPy control plane to dynamically update parameters and stream outputs via external systems like Kafka, Redis, or Postgres. Built for event-driven digital twins.

Dynamic DES

Real-time SimPy control plane for event-driven digital twins.

Dashboard Screenshot

Dynamic DES bridges the gap between static discrete-event simulations and the live world. It allows you to update simulation parameters (arrivals, service times, capacities) and stream telemetry via Kafka, Redis, or PostgreSQL without stopping the simulation. Beyond live streaming, it transforms static models into synchronized forecasting engines, enabling rapid historical data generation and future state prediction. Export compressed, chunked datasets (Parquet, JSONL) directly to local storage, AWS S3, Google Cloud Storage, Azure Blob, or SeaweedFS using PyArrow VFS, complete with strict schema drift prevention.

Key Features

  • ⚡ Real-Time Control: Synchronize SimPy with the system clock using DynamicRealtimeEnvironment.
  • 🔗 Dynamic Registry: Path-based updates (e.g., Line_A.arrival.rate) that take effect in running simulation logic as soon as they arrive.
  • 🚀 High Throughput: Reduces serialization and locking overhead with orjson and local batching.
  • 🛡️ Enterprise Ready: Native **kwargs passthrough for SASL, mTLS, OAuth, and AWS IAM Kafka clusters.
  • 📦 Pluggable Serialization: Stream lightweight JSON by default, or map specific ML topics to lazy-loaded Avro/Schema Registry serializers (Confluent & AWS Glue).
  • 🗄️ Data Lake Ingestion: Native PyArrow VFS integration for fast chunked writing (Parquet/JSONL) directly to object storage, with built-in schema inference and drift enforcement.
  • 🦆 Pydantic Duck-Typing: Seamlessly publish strictly-typed Pydantic V2 models straight from your simulation logic.
  • 🌍 Domain Agnostic: Suited to factory floors, crypto trading bots, or RPG state management.

Installation

Install the core library:

pip install dynamic-des

To include specific backends and enterprise features:

# For Kafka support
pip install "dynamic-des[kafka]"

# For Confluent Schema Registry (Avro)
pip install "dynamic-des[kafka,confluent]"

# For AWS Glue Schema Registry (Avro)
pip install "dynamic-des[kafka,glue]"

# For Data Lake Storage (Parquet & PyArrow VFS)
pip install "dynamic-des[parquet]"

# For all backends (Kafka, Redis, Postgres, Dashboard, Avro, Parquet)
pip install "dynamic-des[all]"

Quick Start: Zero-Setup Demos

Dynamic DES comes with built-in examples and infrastructure orchestration so you can see it in action immediately.

Run the local, dependency-free simulation:

ddes-local-example

Run the full Real-Time Digital Twin stack with Kafka and a live UI:

# Start the background Kafka cluster (requires Docker)
ddes-kafka-infra-up

# Open a new terminal and run the simulation
# Ctrl + C to stop
ddes-kafka-example

# Open a new terminal and start the control dashboard (opens in browser)
# Visit http://localhost:8080
# Ctrl + C to stop
ddes-kafka-dashboard

# Clean up the infrastructure when finished
ddes-kafka-infra-down

Building Your Own Simulation (Local Example)

The following snippet is a minimal local example. It initializes a production line, schedules an external capacity update, and streams telemetry to the console.

import logging
from dynamic_des import (
    CapacityConfig, ConsoleEgress, DistributionConfig,
    DynamicRealtimeEnvironment, DynamicResource, LocalIngress, SimParameter
)

logging.basicConfig(
    level=logging.INFO, format="%(levelname)s [%(asctime)s] %(message)s"
)
logger = logging.getLogger("local_example")

# 1. Define initial system state
params = SimParameter(
    sim_id="Line_A",
    arrival={"standard": DistributionConfig(dist="exponential", rate=1)},
    resources={"lathe": CapacityConfig(current_cap=1, max_cap=5)},
)

# 2. Setup Environment with Local Connectors
# Schedule capacity to jump from 1 to 3 at t=5s
ingress = LocalIngress([(5.0, "Line_A.resources.lathe.current_cap", 3)])
egress = ConsoleEgress()

env = DynamicRealtimeEnvironment(factor=1.0)
env.registry.register_sim_parameter(params)
env.setup_ingress([ingress])
env.setup_egress([egress])

# 3. Create Resource
res = DynamicResource(env, "Line_A", "lathe")

def telemetry_monitor(env: DynamicRealtimeEnvironment, res: DynamicResource):
    """Streams system health metrics every 2 seconds."""
    while True:
        env.publish_telemetry("Line_A.resources.lathe.capacity", res.capacity)
        yield env.timeout(2.0)


env.process(telemetry_monitor(env, res))

# 4. Run
print("Simulation started. Watch capacity change at t=5s...")
try:
    env.run(until=10.1)
finally:
    env.teardown()

What this does

  1. Registry Initialization: The SimParameter defines the initial state. The Registry flattens this into addressable paths (e.g., Line_A.resources.lathe.current_cap).
  2. Live Ingress: The LocalIngress simulates an external event (like a Kafka message) arriving 5 seconds into the run.
  3. Zero-Polling Update: The DynamicResource listens to the Registry. The moment the ingress updates the value, the resource automatically expands its internal token pool without any manual checking.
  4. Telemetry Egress: The ConsoleEgress prints system vitals to your terminal, mimicking a live dashboard feed.
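
The flattening described in step 1 can be illustrated with a short standalone sketch. This is not the library's Registry code: the `flatten` helper and the nested `state` dictionary are hypothetical stand-ins, and the exact paths that dynamic-des generates may differ.

```python
from typing import Any, Dict


def flatten(prefix: str, node: Any, out: Dict[str, Any]) -> Dict[str, Any]:
    """Recursively flatten nested dicts into dot-notation paths."""
    if isinstance(node, dict):
        for key, child in node.items():
            flatten(f"{prefix}.{key}" if prefix else key, child, out)
    else:
        out[prefix] = node
    return out


# Hypothetical nested state, loosely mirroring the SimParameter above.
state = {
    "Line_A": {
        "arrival": {"standard": {"dist": "exponential", "rate": 1}},
        "resources": {"lathe": {"current_cap": 1, "max_cap": 5}},
    }
}

paths = flatten("", state, {})
for path, value in sorted(paths.items()):
    print(path, "=", value)
```

Each leaf value ends up under a single string key, which is what lets an ingress message address one parameter without knowing the shape of the whole configuration.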

Data Egress JSON Schemas

To ensure strict data contracts with external consumers (like Kafka, Redis, or PostgreSQL), dynamic-des uses Pydantic to validate all outbound payloads. Each message takes one of two JSON structures, depending on the stream type:

Telemetry Stream

Used for scalar metrics like resource utilization, queue lengths, or simulation lag.

{
  "stream_type": "telemetry",
  "path_id": "Line_A.resources.lathe.utilization",
  "value": 85.5,
  "sim_ts": 120.5,
  "timestamp": "2023-10-25T14:30:00.000Z"
}

Event Stream

Used for discrete task lifecycle events (e.g., a part arriving, entering a queue, or finishing processing).

{
  "stream_type": "event",
  "key": "task-001",
  "value": {
    "status": "finished",
    "duration": 45.2,
    "path_id": "Line_A.service.lathe"
  },
  "sim_ts": 125.0,
  "timestamp": "2023-10-25T14:30:04.500Z"
}
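
On the consumer side, the two structures can be told apart by the stream_type field. The sketch below uses only the standard library and checks just the fields shown in the samples above; `parse_payload` is a hypothetical helper, not part of dynamic-des, and the real Pydantic models may enforce more than this.

```python
import json
from datetime import datetime
from typing import Any, Dict

# Required top-level fields per stream type, taken from the samples above.
REQUIRED = {
    "telemetry": {"path_id", "value", "sim_ts", "timestamp"},
    "event": {"key", "value", "sim_ts", "timestamp"},
}


def parse_payload(raw: str) -> Dict[str, Any]:
    """Parse one egress message and verify the expected fields are present."""
    msg = json.loads(raw)
    stream_type = msg.get("stream_type")
    if stream_type not in REQUIRED:
        raise ValueError(f"unknown stream_type: {stream_type!r}")
    missing = REQUIRED[stream_type] - msg.keys()
    if missing:
        raise ValueError(f"missing fields: {sorted(missing)}")
    # Replace the trailing "Z" so older Python versions can parse the timestamp.
    msg["timestamp"] = datetime.fromisoformat(msg["timestamp"].replace("Z", "+00:00"))
    return msg


telemetry = parse_payload(
    '{"stream_type": "telemetry", "path_id": "Line_A.resources.lathe.utilization",'
    ' "value": 85.5, "sim_ts": 120.5, "timestamp": "2023-10-25T14:30:00.000Z"}'
)
event = parse_payload(
    '{"stream_type": "event", "key": "task-001",'
    ' "value": {"status": "finished", "duration": 45.2, "path_id": "Line_A.service.lathe"},'
    ' "sim_ts": 125.0, "timestamp": "2023-10-25T14:30:04.500Z"}'
)
print(telemetry["value"], event["value"]["status"])
```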

More Examples

For more examples, including implementations using Kafka providers, please explore the examples folder.

Core Concepts

Dynamic DES is built on the Switchboard Pattern, decoupling data sourcing from simulation logic.

Switchboard Pattern

Instead of resources polling Kafka directly, the architecture is split into three layers:

  1. Connectors (Ingress/Egress): Background threads handle heavy I/O (Kafka, Redis).
  2. Registry (Switchboard): A centralized state manager that flattens data into dot-notation paths.
  3. Resources (SimPy Objects): Passive observers that "wake up" only when the Registry signals a change.
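
The relationship between the Registry and its observers can be sketched in plain Python. This is a toy illustration of the pattern, not the dynamic-des implementation: the `Switchboard` and `Lathe` classes are hypothetical, and the connector layer (background I/O threads) is reduced to direct calls to `update`.

```python
from collections import defaultdict
from typing import Any, Callable, DefaultDict, Dict, List, Optional

Callback = Callable[[str, Any], None]


class Switchboard:
    """Toy registry: stores values by path and notifies subscribers on change."""

    def __init__(self) -> None:
        self._values: Dict[str, Any] = {}
        self._subscribers: DefaultDict[str, List[Callback]] = defaultdict(list)

    def subscribe(self, path: str, callback: Callback) -> None:
        self._subscribers[path].append(callback)

    def update(self, path: str, value: Any) -> None:
        # Signal only when the stored value actually changes.
        if path in self._values and self._values[path] == value:
            return
        self._values[path] = value
        for callback in self._subscribers[path]:
            callback(path, value)


class Lathe:
    """Passive observer: it never polls, it only reacts to registry signals."""

    def __init__(self, board: Switchboard, path: str) -> None:
        self.capacity: Optional[int] = None
        self.history: List[int] = []
        board.subscribe(path, self._on_change)

    def _on_change(self, path: str, value: Any) -> None:
        self.capacity = value
        self.history.append(value)


PATH = "Line_A.resources.lathe.current_cap"
board = Switchboard()
lathe = Lathe(board, PATH)

board.update(PATH, 1)  # initial value
board.update(PATH, 3)  # what an ingress connector would do on a new message
board.update(PATH, 3)  # no change, so no signal
print(lathe.capacity, lathe.history)
```

Because the resource only holds a callback, swapping the data source (a local schedule, Kafka, Redis) does not touch the simulation logic.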

Event-Driven Capacity

Standard SimPy resources have static capacities. DynamicResource wraps a Container and a PriorityStore. When the Registry updates:

  • Growing: Extra tokens are added to the pool immediately.
  • Shrinking: The resource requests tokens back. If they are busy, it waits until they are released, ensuring no work-in-progress is lost.
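
The grow and shrink rules can be modeled without SimPy as simple token bookkeeping. The `TokenPool` class below is a hypothetical sketch, not how DynamicResource is implemented: waiting for busy tokens is represented by a `debt` counter that is paid off as tasks release.

```python
class TokenPool:
    """Toy capacity pool: tokens are either free or held by in-progress work."""

    def __init__(self, capacity: int) -> None:
        self.free = capacity  # tokens available right now
        self.busy = 0         # tokens held by running tasks
        self.debt = 0         # tokens still to be reclaimed after a shrink

    @property
    def capacity(self) -> int:
        return self.free + self.busy - self.debt

    def acquire(self) -> bool:
        if self.free == 0:
            return False
        self.free -= 1
        self.busy += 1
        return True

    def release(self) -> None:
        if self.busy == 0:
            raise RuntimeError("release without a matching acquire")
        self.busy -= 1
        if self.debt > 0:
            self.debt -= 1  # reclaim the token instead of returning it
        else:
            self.free += 1

    def resize(self, new_capacity: int) -> None:
        delta = new_capacity - self.capacity
        if delta >= 0:
            # Growing: cancel any pending reclaim first, then add free tokens.
            cancelled = min(delta, self.debt)
            self.debt -= cancelled
            self.free += delta - cancelled
        else:
            # Shrinking: take free tokens now, owe the rest until tasks finish.
            taken = min(-delta, self.free)
            self.free -= taken
            self.debt += -delta - taken


pool = TokenPool(1)
pool.resize(3)                       # grow: two extra tokens are free at once
started = [pool.acquire() for _ in range(3)]
pool.resize(1)                       # shrink while all three tokens are busy
blocked = pool.acquire()             # no free token, so new work must wait
pool.release()
pool.release()                       # both releases pay off the debt
pool.release()                       # the last release returns a free token
print(pool.capacity, pool.free, pool.busy, pool.debt)
```

No running task is interrupted by the shrink; the pool simply stops handing tokens back until it reaches the new capacity.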

High-Throughput Events

To handle high throughput, the EgressMixIn uses:

  • Batching: Pushing lists of events to the I/O thread to reduce lock contention.
  • orjson: Rust-powered serialization for maximum speed.
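
A rough sketch of the batching idea follows. It is an illustration under stated assumptions, not the EgressMixIn code: `BatchingEgress` is a hypothetical class, the standard-library json module stands in for orjson, and a plain list stands in for a Kafka or Redis client.

```python
import json
import queue
import threading
from typing import Any, Dict, List, Optional

Event = Dict[str, Any]


class BatchingEgress:
    """Buffers events locally and hands them to an I/O thread in batches."""

    def __init__(self, batch_size: int = 3) -> None:
        self.batch_size = batch_size
        self.sent: List[bytes] = []  # stand-in for a real sink
        self.batches = 0
        self._buffer: List[Event] = []
        self._queue: "queue.Queue[Optional[List[Event]]]" = queue.Queue()
        self._worker = threading.Thread(target=self._drain, daemon=True)
        self._worker.start()

    def publish(self, event: Event) -> None:
        self._buffer.append(event)
        if len(self._buffer) >= self.batch_size:
            self.flush()

    def flush(self) -> None:
        if self._buffer:
            # One queue operation (one lock acquisition) per batch, not per event.
            self._queue.put(self._buffer)
            self._buffer = []

    def close(self) -> None:
        self.flush()
        self._queue.put(None)  # sentinel tells the worker to stop
        self._worker.join()

    def _drain(self) -> None:
        while True:
            batch = self._queue.get()
            if batch is None:
                break
            self.batches += 1
            for event in batch:
                # orjson.dumps returns bytes; json.dumps is encoded to match.
                self.sent.append(json.dumps(event).encode("utf-8"))


egress = BatchingEgress(batch_size=3)
for i in range(7):
    egress.publish({"stream_type": "event", "key": f"task-{i:03d}", "sim_ts": float(i)})
egress.close()
print(len(egress.sent), "events in", egress.batches, "batches")
```

The simulation thread touches the shared queue once per batch, which is where the reduction in lock contention comes from.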

Documentation

For full documentation, architecture details, and API reference, visit: https://jaehyeon.me/dynamic-des/.

License

MIT
