Real-time SimPy control plane to dynamically update parameters and stream outputs via external systems like Kafka, Redis, or Postgres. Built for event-driven digital twins.

Dynamic DES

Real-time SimPy control plane for event-driven digital twins.

Dynamic DES bridges the gap between static discrete-event simulations and the live world. It allows you to update simulation parameters (arrivals, service times, capacities) and stream telemetry via Kafka, Redis, or PostgreSQL without stopping the simulation. Beyond live streaming, it transforms static models into synchronized forecasting engines, enabling rapid historical data generation and future state prediction. Export compressed, chunked datasets (Parquet, JSONL) directly to local storage, AWS S3, Google Cloud Storage, Azure Blob, or SeaweedFS using PyArrow VFS, complete with strict schema drift prevention.

Key Features

  • ⚡ Real-Time Control: Synchronize SimPy with the system clock using DynamicRealtimeEnvironment.
  • 🔗 Dynamic Registry: Path-based updates (e.g., Line_A.arrival.rate) that take effect in the simulation logic as soon as they arrive.
  • 🚀 High Throughput: Uses orjson serialization and local batching to keep per-event overhead low.
  • 🛡️ Enterprise Ready: Native **kwargs passthrough for SASL, mTLS, OAuth, and AWS IAM Kafka clusters.
  • 📦 Pluggable Serialization: Stream lightweight JSON by default, or map specific ML topics to lazy-loaded Avro/Schema Registry serializers (Confluent & AWS Glue).
  • 🗄️ Data Lake Ingestion: Native PyArrow VFS integration for fast chunked writing (Parquet/JSONL) directly to object storage, with built-in schema inference and drift enforcement.
  • 🦆 Pydantic Duck-Typing: Seamlessly publish strictly-typed Pydantic V2 models straight from your simulation logic.
  • 🌍 Domain Agnostic: Perfect for factory floors, crypto trading bots, or RPG game state management.
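To make the real-time control idea concrete, here is a minimal sketch of how a simulation clock can be paced against the wall clock. It is not the library's implementation: the function name wall_clock_delay and the event times are hypothetical, and the only assumption carried over from the example further down is that factor is the number of real seconds per simulation time unit.

```python
import time

def wall_clock_delay(sim_now, sim_start, wall_start, factor, wall_now):
    """Return the seconds to wait so that sim time `sim_now` lines up with real time."""
    # Real-time target for this simulation instant (factor = real seconds per sim unit).
    target = wall_start + (sim_now - sim_start) * factor
    # Never wait a negative amount: if we are late, proceed immediately.
    return max(0.0, target - wall_now)

# Hypothetical event times, in simulation units.
event_times = [0.0, 0.01, 0.02]
factor = 1.0
wall_start = time.monotonic()

for t in event_times:
    time.sleep(wall_clock_delay(t, 0.0, wall_start, factor, time.monotonic()))

elapsed = time.monotonic() - wall_start
print(f"processed {len(event_times)} events in {elapsed:.3f}s of real time")
```

A factor below 1.0 would run the model faster than real time, which is one way to read the "rapid historical data generation" use case mentioned above.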

Installation

Install the core library:

pip install dynamic-des

To include specific backends and enterprise features:

# For Kafka support
pip install "dynamic-des[kafka]"

# For Confluent Schema Registry (Avro)
pip install "dynamic-des[kafka,confluent]"

# For AWS Glue Schema Registry (Avro)
pip install "dynamic-des[kafka,glue]"

# For Data Lake Storage (Parquet & PyArrow VFS)
pip install "dynamic-des[parquet]"

# For all backends (Kafka, Redis, Postgres, Dashboard, Avro, Parquet)
pip install "dynamic-des[all]"

Quick Start: Zero-Setup Demos

Dynamic DES comes with built-in examples and infrastructure orchestration so you can see it in action immediately.

Run the local, dependency-free simulation:

ddes-local-example

Run the full Real-Time Digital Twin stack with Kafka and a live UI:

# Start the background Kafka cluster (requires Docker)
ddes-kafka-infra-up

# Open a new terminal and run the simulation
# Ctrl + C to stop
ddes-kafka-example

# Open a new terminal and start the control dashboard (opens in browser)
# Visit http://localhost:8080
# Ctrl + C to stop
ddes-kafka-dashboard

# Clean up the infrastructure when finished
ddes-kafka-infra-down

Building Your Own Simulation (Local Example)

The following snippet is a minimal example: it initializes a production line, schedules an external capacity update, and streams telemetry to the console.

import logging
from dynamic_des import (
    CapacityConfig, ConsoleEgress, DistributionConfig,
    DynamicRealtimeEnvironment, DynamicResource, LocalIngress, SimParameter
)

logging.basicConfig(
    level=logging.INFO, format="%(levelname)s [%(asctime)s] %(message)s"
)
logger = logging.getLogger("local_example")

# 1. Define initial system state
params = SimParameter(
    sim_id="Line_A",
    arrival={"standard": DistributionConfig(dist="exponential", rate=1)},
    resources={"lathe": CapacityConfig(current_cap=1, max_cap=5)},
)

# 2. Setup Environment with Local Connectors
# Schedule capacity to jump from 1 to 3 at t=5s
ingress = LocalIngress([(5.0, "Line_A.resources.lathe.current_cap", 3)])
egress = ConsoleEgress()

env = DynamicRealtimeEnvironment(factor=1.0)
env.registry.register_sim_parameter(params)
env.setup_ingress([ingress])
env.setup_egress([egress])

# 3. Create Resource
res = DynamicResource(env, "Line_A", "lathe")

def telemetry_monitor(env: DynamicRealtimeEnvironment, res: DynamicResource):
    """Streams system health metrics every 2 seconds."""
    while True:
        env.publish_telemetry("Line_A.resources.lathe.capacity", res.capacity)
        yield env.timeout(2.0)


env.process(telemetry_monitor(env, res))

# 4. Run
logger.info("Simulation started. Watch capacity change at t=5s...")
try:
    env.run(until=10.1)
finally:
    env.teardown()

What this does

  1. Registry Initialization: The SimParameter defines the initial state. The Registry flattens this into addressable paths (e.g., Line_A.resources.lathe.current_cap).
  2. Live Ingress: The LocalIngress simulates an external event (like a Kafka message) arriving 5 seconds into the run.
  3. Zero-Polling Update: The DynamicResource listens to the Registry. The moment the ingress updates the value, the resource automatically expands its internal token pool without any manual checking.
  4. Telemetry Egress: The ConsoleEgress prints system vitals to your terminal, mimicking a live dashboard feed.
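The path flattening described in step 1 can be illustrated with plain dictionaries. This is a sketch only: the flatten helper and the dict layout are hypothetical stand-ins for SimParameter and the Registry, and the real path layout is whatever the library defines.

```python
def flatten(prefix, node):
    """Yield (dot_path, value) pairs for every leaf of a nested dict."""
    for key, value in node.items():
        path = f"{prefix}.{key}"
        if isinstance(value, dict):
            yield from flatten(path, value)
        else:
            yield path, value

# Hypothetical plain-dict stand-in for the SimParameter in the example above.
params = {
    "arrival": {"standard": {"dist": "exponential", "rate": 1}},
    "resources": {"lathe": {"current_cap": 1, "max_cap": 5}},
}

registry = dict(flatten("Line_A", params))

# The scheduled ingress update at t=5s targets exactly one flattened path.
registry["Line_A.resources.lathe.current_cap"] = 3

for path in sorted(registry):
    print(path, "=", registry[path])
```

Addressing every leaf by a single string is what lets an external message carry just a path and a value.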

Data Egress JSON Schemas

To ensure strict data contracts with external consumers (like Kafka, Redis, or PostgreSQL), dynamic-des uses Pydantic to validate all outbound payloads. Users can expect two distinct JSON structures depending on the stream type:

Telemetry Stream

Used for scalar metrics like resource utilization, queue lengths, or simulation lag.

{
  "stream_type": "telemetry",
  "path_id": "Line_A.resources.lathe.utilization",
  "value": 85.5,
  "sim_ts": 120.5,
  "timestamp": "2023-10-25T14:30:00.000Z"
}

Event Stream

Used for discrete task lifecycle events (e.g., a part arriving, entering a queue, or finishing processing).

{
  "stream_type": "event",
  "key": "task-001",
  "value": {
    "status": "finished",
    "duration": 45.2,
    "path_id": "Line_A.service.lathe"
  },
  "sim_ts": 125.0,
  "timestamp": "2023-10-25T14:30:04.500Z"
}
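On the consumer side, the two structures above can be told apart by stream_type. The sketch below uses only the standard library and checks for the fields shown in the samples. The parse_message helper is hypothetical and is not part of dynamic-des, which validates outbound payloads with Pydantic on the producer side.

```python
import json

# Field sets copied from the two sample payloads above.
REQUIRED = {
    "telemetry": {"stream_type", "path_id", "value", "sim_ts", "timestamp"},
    "event": {"stream_type", "key", "value", "sim_ts", "timestamp"},
}

def parse_message(raw):
    """Decode one message and check it carries the fields for its stream type."""
    msg = json.loads(raw)
    expected = REQUIRED.get(msg.get("stream_type"))
    if expected is None:
        raise ValueError(f"unknown stream_type: {msg.get('stream_type')!r}")
    missing = expected - set(msg)
    if missing:
        raise ValueError(f"missing fields: {sorted(missing)}")
    return msg

telemetry = parse_message(json.dumps({
    "stream_type": "telemetry",
    "path_id": "Line_A.resources.lathe.utilization",
    "value": 85.5,
    "sim_ts": 120.5,
    "timestamp": "2023-10-25T14:30:00.000Z",
}))

event = parse_message(json.dumps({
    "stream_type": "event",
    "key": "task-001",
    "value": {"status": "finished", "duration": 45.2,
              "path_id": "Line_A.service.lathe"},
    "sim_ts": 125.0,
    "timestamp": "2023-10-25T14:30:04.500Z",
}))

print(telemetry["path_id"], telemetry["value"])
print(event["key"], event["value"]["status"])
```

Note that value is a scalar in telemetry messages and an object in event messages, so consumers should branch on stream_type before reading it.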

More Examples

For more examples, including implementations using Kafka providers, please explore the examples folder.

Core Concepts

Dynamic DES is built on the Switchboard Pattern, decoupling data sourcing from simulation logic.

Switchboard Pattern

Instead of resources polling Kafka directly, the architecture is split into three layers:

  1. Connectors (Ingress/Egress): Background threads handle heavy I/O (Kafka, Redis).
  2. Registry (Switchboard): A centralized state manager that flattens data into dot-notation paths.
  3. Resources (SimPy Objects): Passive observers that "wake up" only when the Registry signals a change.
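The three layers can be sketched with the standard library alone. Everything here is hypothetical and much simpler than the real classes: a thread stands in for a Kafka or Redis connector, a small Registry class plays the switchboard, and a list's append method plays the resource that reacts to changes.

```python
import queue
import threading

class Registry:
    """Toy switchboard: stores values by dot path and notifies subscribers."""

    def __init__(self):
        self._values = {}
        self._subscribers = {}

    def subscribe(self, path, callback):
        self._subscribers.setdefault(path, []).append(callback)

    def update(self, path, value):
        self._values[path] = value
        # Observers are called only when their path changes; they never poll.
        for callback in self._subscribers.get(path, []):
            callback(value)

inbox = queue.Queue()

def connector():
    """Layer 1: stands in for a background Kafka/Redis consumer thread."""
    inbox.put(("Line_A.resources.lathe.current_cap", 3))
    inbox.put(None)  # sentinel: no more messages

registry = Registry()                      # Layer 2: the switchboard
seen = []                                  # Layer 3: a passive observer
registry.subscribe("Line_A.resources.lathe.current_cap", seen.append)

thread = threading.Thread(target=connector)
thread.start()
# The simulation side drains the inbox and applies updates on its own thread.
while (item := inbox.get()) is not None:
    registry.update(*item)
thread.join()

print(seen)  # [3]
```

Keeping I/O on one side of a queue means the simulation logic only ever touches the registry, never a network client.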

Event-Driven Capacity

Standard SimPy resources have static capacities. DynamicResource wraps a Container and a PriorityStore. When the Registry updates:

  • Growing: Extra tokens are added to the pool immediately.
  • Shrinking: The resource requests tokens back. If they are busy, it waits until they are released, ensuring no work-in-progress is lost.
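The grow and shrink rules can be illustrated with a toy token pool. This is a sketch under simplifying assumptions, not the DynamicResource implementation: it uses plain counters instead of a SimPy Container and PriorityStore, and a debt counter replaces the blocking wait for busy tokens. The TokenPool class and its method names are hypothetical.

```python
class TokenPool:
    """Toy capacity pool: grows at once, shrinks only as tokens become idle."""

    def __init__(self, capacity):
        self.free = capacity
        self.busy = 0
        self.debt = 0  # tokens still to be retired once they are released

    def acquire(self):
        if self.free == 0:
            return False
        self.free -= 1
        self.busy += 1
        return True

    def release(self):
        self.busy -= 1
        if self.debt > 0:
            self.debt -= 1      # retire the token instead of returning it
        else:
            self.free += 1

    def set_capacity(self, new_cap):
        delta = new_cap - (self.free + self.busy - self.debt)
        if delta >= 0:
            # Growing: cancel pending retirements first, then add free tokens.
            cancel = min(self.debt, delta)
            self.debt -= cancel
            self.free += delta - cancel
        else:
            # Shrinking: take idle tokens now, owe the rest until release.
            take = min(self.free, -delta)
            self.free -= take
            self.debt += -delta - take

pool = TokenPool(3)
for _ in range(3):
    pool.acquire()             # all three tokens are busy

pool.set_capacity(1)           # shrink while everything is in use
in_flight = pool.busy          # work in progress is untouched
for _ in range(3):
    pool.release()

print(in_flight, pool.free)    # 3 1
```

The three in-flight jobs finish normally, and only one token returns to the pool, which matches the "no work-in-progress is lost" behavior described above.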

High-Throughput Events

To handle high throughput, the EgressMixIn uses:

  • Batching: Pushing lists of events to the I/O thread to reduce lock contention.
  • orjson: Rust-based JSON serialization, faster than the standard library json module.
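A minimal sketch of the batching idea follows, assuming a simple size-based flush. The names publish, flush, and BATCH_SIZE are hypothetical and are not the EgressMixIn API. The standard library json module is used so the block runs without extra dependencies; orjson.dumps would be a drop-in here and returns bytes directly.

```python
import json
import queue
import threading

BATCH_SIZE = 3            # hypothetical flush threshold
outbox = queue.Queue()    # hand-off point between simulation and I/O thread
written = []              # stands in for a Kafka/Redis/Postgres sink
buffer = []

def io_worker():
    """Drain batches from the queue and serialize each one in a single call."""
    while True:
        batch = outbox.get()
        if batch is None:  # sentinel: shut down
            break
        written.append(json.dumps(batch).encode())

def publish(event):
    """Buffer locally; touch the shared queue only once per full batch."""
    buffer.append(event)
    if len(buffer) >= BATCH_SIZE:
        flush()

def flush():
    if buffer:
        outbox.put(list(buffer))
        buffer.clear()

worker = threading.Thread(target=io_worker)
worker.start()

for i in range(7):
    publish({"stream_type": "event", "key": f"task-{i:03d}", "sim_ts": float(i)})
flush()                   # push the final partial batch
outbox.put(None)
worker.join()

print(len(written), "batches for 7 events")
```

Seven events cross the thread boundary in three queue operations instead of seven, which is where the reduction in lock contention comes from.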

Documentation

For full documentation, architecture details, and API reference, visit: https://jaehyeon.me/dynamic-des/.

License

MIT

