
Domain-agnostic Python framework for digital twin architectures


DT-Forge

A domain-agnostic Python framework for building digital twins of any physical or process asset — pumps, soil, plants, sales pipelines, manufacturing lines.

DT-Forge implements a six-layer Digital Twin architecture plus a cross-cutting connector subsystem for inter-twin communication and four collection-twin patterns for grouping twins together. You declare the sensors, write the asset-specific glue, and the framework provides the rest: storage, modelling, services, reactive control, multi-agent diagnosis, and autonomous decision-making.

Architecture

┌─────────────────────────────────────────────────────────┐
│              AUTONOMOUS LAYER (Layer 6)                 │
│  OODA loop · GoalPlanner · AutonomousOverseer (LLM)     │
│  RL policy (SAC/TD3/PPO/A2C) · HumanNotifier            │
├─────────────────────────────────────────────────────────┤
│             INTELLIGENT LAYER (Layer 5)                 │
│  MultiAgentSystem · LangChain DiagnosticAgent           │
│  Neo4j knowledge graph · per-agent status / history     │
├─────────────────────────────────────────────────────────┤
│              REACTIVE LAYER (Layer 4)                   │
│  ThresholdRuleEngine · MultiStateFSMRuleEngine          │
│  simple-pid PIDController · PostgreSQL RuleRepository   │
├─────────────────────────────────────────────────────────┤
│              SERVICES LAYER (Layer 3)                   │
│  Eclipse Ditto sync · FastAPI REST + SSE chat           │
├─────────────────────────────────────────────────────────┤
│         SIMULATION & MODEL LAYER (Layer 2)              │
│  scipy ODE · ONNX/sklearn surrogate · SimPy · Prophet   │
├─────────────────────────────────────────────────────────┤
│                 DATA LAYER (Layer 1)                    │
│  MQTT ingest · TelemetryRouter · DataManagementPipeline │
│  InfluxDB · MongoDB · Redis · MinIO · PostgreSQL        │
│  ProvenanceLog · SessionStore · TextIngestor            │
└─────────────────────────────────────────────────────────┘

    Cross-cutting:  Connector subsystem  (MQTT · Ditto · HTTP API)
                    Collection twins     (Aggregate · Collection ·
                                          Composite · Network)

Every layer is optional: pick the ones your twin needs and skip the rest. A monitoring-only twin needs just the Data layer plus MQTT ingest (the network layer in CLI terms); a self-driving asset needs all six. The CLI generates a docker-compose.yml containing only the services those layers require.

Collection patterns

| Pattern | Use case |
|---|---|
| AggregateDT | Fleet of identical assets: fused state, shared control |
| CollectionDT | Batch monitoring, outlier detection, statistical comparison |
| CompositeDT | Hierarchical systems with boundary-condition exchange + swap |
| NetworkDT | Graph-topology systems, cascade risk, bottleneck detection |
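As a rough illustration of the statistical comparison a CollectionDT is described as doing, the sketch below flags batch members whose reading sits far from the batch median. It uses a median/MAD robust z-score with a common rule-of-thumb cutoff of 3.5; that technique, the function name flag_outliers, and the dict-of-readings input are all hypothetical choices for this example, not DT-Forge's API.

```python
import statistics


def flag_outliers(readings: dict[str, float], threshold: float = 3.5) -> list[str]:
    """Return IDs of twins whose reading is a robust-z outlier in the batch.

    Hypothetical helper: the median and median absolute deviation (MAD) are
    less distorted by the outliers themselves than the mean and stdev.
    """
    values = list(readings.values())
    if len(values) < 3:
        return []  # too few members for a meaningful comparison
    median = statistics.median(values)
    mad = statistics.median(abs(v - median) for v in values)
    if mad == 0:
        # At least half the members equal the median exactly; flag the rest.
        return sorted(k for k, v in readings.items() if v != median)
    # 0.6745 scales MAD so the score is comparable to a standard z-score
    # when the data are roughly normal.
    return sorted(
        twin_id
        for twin_id, v in readings.items()
        if abs(0.6745 * (v - median) / mad) > threshold
    )


batch = {"pump_001": 61.2, "pump_002": 60.8, "pump_003": 61.5,
         "pump_004": 60.9, "pump_005": 74.0}
print(flag_outliers(batch))
```

Using the median rather than the mean keeps one failing pump from dragging the baseline toward itself and hiding its own anomaly.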

Documentation

The guide/ directory is the authoritative manual — read it in order if you are new to the framework.

| Step | File | Covers |
|---|---|---|
| 01 | guide/01_mental_model.md | The big picture |
| 02 | guide/02_your_first_twin.md | Docker + a minimal working twin |
| 03 | guide/03_config_and_sensors.md | TwinConfig, SensorFieldSpec, env vars |
| 04 | guide/04_data_layer.md | Storage backends, router, pipeline, sessions, text |
| 05 | guide/05_simulation_layer.md | Physics, surrogates, residuals |
| 06 | guide/06_services_layer.md | Ditto sync, FastAPI, SSE chat |
| 07 | guide/07_reactive_layer.md | Rules, FSM, PID, hot-reloading rule repository |
| 08 | guide/08_intelligent_layer.md | Knowledge graph, agents, MAS, escalation |
| 09 | guide/09_autonomous_layer.md | OODA, overseer, planner, RL |
| 10 | guide/10_connectors_and_collections.md | Multi-twin systems |
| 11 | guide/11_full_example.md | End-to-end worked example |

Worked implementations live under implementations/ — see implementations/sdt/ for a multi-twin agricultural system that exercises the full stack (BPDT + SDT + PDT + composite + RL).

The high-level design document is DT_Forge_Framework_Design.md.

Quick start

1. Install

Requires Python 3.11+ and Docker. From the root of a source checkout:

pip install -e .

2. Configure

cp .env.example .env   # then edit with your asset ID, LLM key, etc.

Configuration comes from environment variables with the DT_ prefix; a double underscore (__) separates a nested section from its field, so DT_MQTT__PORT sets the MQTT port:

DT_ASSET_ID=pump_001
DT_ASSET_TYPE=centrifugal_pump
DT_ASSET_NAME="Plant A Pump"

DT_MQTT__BROKER=localhost
DT_MQTT__PORT=1883

DT_INFLUX__URL=http://localhost:8086
DT_INFLUX__TOKEN=my-token
DT_INFLUX__ORG=digital_twin
DT_INFLUX__BUCKET=asset_telemetry

DT_LLM__PROVIDER=anthropic        # openai | anthropic | ollama
DT_LLM__MODEL=claude-sonnet-4-6
DT_LLM__API_KEY=sk-ant-...

See .env.example for the complete reference.
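To make the naming convention concrete, here is a stdlib-only sketch of how DT_-prefixed variables with __ delimiters group into nested settings. DT-Forge itself relies on pydantic-settings for this (see the technology stack); nest_env is a hypothetical helper that only illustrates the convention. It lowercases keys and leaves every value as a string, whereas pydantic would also validate and coerce types.

```python
from collections.abc import Mapping
from typing import Any


def nest_env(environ: Mapping[str, str], prefix: str = "DT_",
             delimiter: str = "__") -> dict[str, Any]:
    """Group prefixed env vars into a nested dict (hypothetical helper).

    DT_MQTT__PORT=1883 becomes {"mqtt": {"port": "1883"}}; values stay strings.
    """
    settings: dict[str, Any] = {}
    for key, value in environ.items():
        if not key.startswith(prefix):
            continue  # ignore variables that don't belong to the twin
        # A single underscore (ASSET_ID) is part of a name; only "__" nests.
        path = key[len(prefix):].lower().split(delimiter)
        node = settings
        for part in path[:-1]:
            node = node.setdefault(part, {})
        node[path[-1]] = value
    return settings


env = {
    "DT_ASSET_ID": "pump_001",
    "DT_MQTT__BROKER": "localhost",
    "DT_MQTT__PORT": "1883",
    "DT_LLM__PROVIDER": "anthropic",
    "HOME": "/root",
}
print(nest_env(env))
```

In practice you would pass os.environ (after load_dotenv()) instead of a literal dict.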

3. Start the infrastructure you need

dtforge infra up writes a docker-compose.yml tuned for the layers you ask for and runs docker compose up -d:

# Minimal monitoring twin (data + MQTT + Ditto sync):
dtforge infra up --layers data,network,services

# Add the intelligent layer (brings Neo4j up too):
dtforge infra up --layers data,network,services,intelligent

# Write the compose file without starting containers:
dtforge infra up --layers data,network,services --generate-only
docker compose up -d

Wait for everything to come up, then verify:

dtforge infra check --layers data,network,services,intelligent

Each service is reported as reachable once the twin can connect to it.

Containers started by infra up:

| Service | Host port(s) | Layer needed | Purpose |
|---|---|---|---|
| Mosquitto | 1883, 9001 | network | MQTT broker + WebSocket |
| InfluxDB | 8086 | data / network | Time-series telemetry |
| MongoDB | 27017 | data / network | Events + provenance |
| Redis | 6379 | data / network | Cache, FSM state, sessions |
| MinIO | 9000, 9002 | data / network | Trained models, large objects |
| Eclipse Ditto | 8080 | services | Canonical twin state (nginx+auth) |
| Neo4j | 7474, 7687 | intelligent | Knowledge graph |
| Grafana | 3000 | always | Observability |

Pre-built domains under implementations/ ship their own docker-compose.yml tuned for that domain — use those instead of generating from scratch when running an example.

4. Scaffold a new twin

dtforge init \
  --asset-type centrifugal_pump \
  --name "Plant A Pump" \
  --asset-id pump_001

That writes:

| File | Purpose |
|---|---|
| twin.py | Twin class scaffold: edit build_layers() to wire it |
| .env | Environment configuration |

The scaffold imports the core layers (Data, Network ingest, Data management, Ditto sync, Reactive) and gives you a runnable starting point. Add the intelligent and autonomous layers as your twin grows.

5. Run

python twin.py            # direct
# or:
dtforge run twin          # via the CLI (loads twin.py, calls run_forever)

The FastAPI service exposes /health, /api/twin/state, /api/twin/telemetry, /api/twin/events, and the SSE chat endpoint /api/chat once the services layer is wired in.
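The /api/chat endpoint streams Server-Sent Events. This README does not document the request body or the shape of each event's payload, so the sketch below stops at the wire format: a small stdlib parser that follows the SSE rules from the HTML specification (data: lines accumulate, a blank line dispatches the event, lines starting with : are comments) and turns decoded text lines into (event, data) pairs. It is a generic client-side helper, not part of DT-Forge.

```python
from collections.abc import Iterable, Iterator


def iter_sse_events(lines: Iterable[str]) -> Iterator[tuple[str, str]]:
    """Yield (event_type, data) pairs from decoded SSE text lines."""
    event_type = ""
    data_lines: list[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if not line:
            # Blank line: dispatch the buffered event if it carries data.
            if data_lines:
                yield event_type or "message", "\n".join(data_lines)
            event_type, data_lines = "", []
            continue
        if line.startswith(":"):
            continue  # comment or keep-alive line
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # one leading space is not part of the value
        if field == "event":
            event_type = value
        elif field == "data":
            data_lines.append(value)
        # "id" and "retry" fields are ignored in this sketch.


stream = [
    ": keep-alive\n",
    "data: first line\n",
    "data: second line\n",
    "\n",
    "event: done\n",
    "data: [end]\n",
    "\n",
]
for event, data in iter_sse_events(stream):
    print(event, repr(data))
```

With any HTTP client that can iterate a streaming response line by line, you would feed the decoded lines from /api/chat into this generator; the expected request body is covered in guide/06_services_layer.md.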

Concrete example — a 30-line monitoring twin

import asyncio
import logging
from dotenv import load_dotenv
from dt_forge.core.config import TwinConfig, SensorFieldSpec
from dt_forge.core.base import AbstractDigitalTwin
from dt_forge.core.lifecycle import TwinLifecycle
from dt_forge.data import InfluxAdapter, MongoAdapter, RedisAdapter
from dt_forge.data.writer import TelemetryRouter
from dt_forge.data.management import DataManagementPipeline
from dt_forge.network import MQTTIngestor
from dt_forge.reactive import ThresholdRuleEngine

load_dotenv()
logging.basicConfig(level=logging.INFO)

config = TwinConfig(sensor_fields=[
    SensorFieldSpec(name="temperature_c", nominal=25.0, noise_std=0.5,
                    warn_threshold=60.0, crit_threshold=75.0),
    SensorFieldSpec(name="pressure_bar", nominal=4.0, noise_std=0.05,
                    warn_threshold=3.0, crit_threshold=2.0,
                    threshold_direction="low"),
])

class PumpTwin(AbstractDigitalTwin):
    def build_layers(self):
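        # One adapter per storage backend, shared by every layer below
        ts = InfluxAdapter(self.config)     # time-series telemetry
        doc = MongoAdapter(self.config)     # events + provenance
        cache = RedisAdapter(self.config)   # cache, FSM state, sessions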
        router = TelemetryRouter(self.config, self.bus,
                                 ts_store=ts, doc_store=doc, cache=cache)
        return {
            "data":      router,
            "network":   MQTTIngestor(self.config, self.bus, router=router),
            "data_mgmt": DataManagementPipeline(self.config, self.bus,
                                                ts_store=ts, cache=cache),
            "reactive":  ThresholdRuleEngine(self.config, self.bus,
                                              ts_store=ts, cache=cache, doc_store=doc),
        }

if __name__ == "__main__":
    lc = TwinLifecycle()
    lc.add(PumpTwin(config))
    asyncio.run(lc.run_forever())
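The SensorFieldSpec entries above pair warn_threshold and crit_threshold with a threshold_direction. One plausible reading, sketched below, is that "high" fields alarm when a value rises to a threshold and "low" fields when it falls to one. The Severity names, the inclusive comparisons, and "high" as the default (inferred only from temperature_c setting no direction) are assumptions for illustration, not ThresholdRuleEngine's actual logic.

```python
import operator
from enum import Enum


class Severity(Enum):
    """Hypothetical severity labels; DT-Forge's own names may differ."""
    NORMAL = "normal"
    WARNING = "warning"
    CRITICAL = "critical"


# "high": larger is worse (temperature_c); "low": smaller is worse (pressure_bar).
_BREACHED = {"high": operator.ge, "low": operator.le}


def classify(value: float, warn: float, crit: float,
             direction: str = "high") -> Severity:
    """Map one reading to a severity. Inclusive comparisons are an assumption."""
    try:
        breached = _BREACHED[direction]
    except KeyError:
        raise ValueError(f"unknown threshold_direction: {direction!r}") from None
    if breached(value, crit):  # check the more severe threshold first
        return Severity.CRITICAL
    if breached(value, warn):
        return Severity.WARNING
    return Severity.NORMAL


# Thresholds copied from the TwinConfig above.
print(classify(62.0, warn=60.0, crit=75.0))                # temperature_c reading
print(classify(1.8, warn=3.0, crit=2.0, direction="low"))  # pressure_bar reading
```

Checking the critical threshold before the warning one matters: for a "low" field like pressure_bar, 1.8 bar breaches both, and it should report the worse of the two.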

Run a GenericSimulator to publish synthetic readings on the telemetry topic, or point a real device at dt/pump_001/telemetry and the twin will route, store, smooth, score health, and drive the FSM automatically.
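If you want to exercise the twin before GenericSimulator or real hardware is in place, the sketch below generates readings the way the SensorFieldSpec fields suggest (nominal value plus Gaussian noise with standard deviation noise_std) and builds a topic and payload. The flat JSON payload with a timestamp key is a hypothetical format, since this README does not say what TelemetryRouter expects, and generalising the dt/pump_001/telemetry topic to other asset IDs is likewise an assumption.

```python
import json
import random
import time

# (name, nominal, noise_std) mirroring the SensorFieldSpec entries above.
FIELDS = [("temperature_c", 25.0, 0.5), ("pressure_bar", 4.0, 0.05)]


def telemetry_topic(asset_id: str) -> str:
    # Assumed generalisation of dt/pump_001/telemetry to any asset ID.
    return f"dt/{asset_id}/telemetry"


def synthetic_payload(rng: random.Random) -> str:
    """One JSON reading: nominal + N(0, noise_std) per field (hypothetical format)."""
    reading: dict[str, float] = {
        name: round(rng.gauss(nominal, std), 3) for name, nominal, std in FIELDS
    }
    reading["timestamp"] = time.time()
    return json.dumps(reading)


rng = random.Random(42)  # seeded so the noise sequence is repeatable
print(telemetry_topic("pump_001"), synthetic_payload(rng))
```

To actually send readings, a paho-mqtt client connected to the Mosquitto broker on port 1883 can call client.publish(topic, payload) in a loop; check guide/04_data_layer.md for the payload shape the router really accepts.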

The guide walks through adding simulation, knowledge graph, agents, and the OODA loop on top of this skeleton.

Extension points

The framework is built around protocols and abstract base classes. Replace any piece without touching the rest:

| Extension | Mechanism |
|---|---|
| Storage backend | Implement the TimeSeriesStore / DocumentStore / CacheStore / ObjectStore protocol |
| Physics model | Subclass ODEModel or implement TwinModel |
| ML surrogate | Implement TwinModel with ONNX/sklearn/Torch |
| Discrete-event model | Wrap a generator with SimPyModel |
| Forecaster | Wrap any time-series model with ProphetForecaster |
| Custom reactive rule | Implement Rule.evaluate(readings) (synchronous) |
| N-state FSM | Subclass MultiStateFSMRuleEngine |
| Diagnostic agent tools | Override DiagnosticAgent._build_extra_tools() |
| LLM provider | Set DT_LLM__PROVIDER (openai/anthropic/ollama) |
| Goal-based assessment | Subclass GoalPlanner and override assess() |
| Strategic LLM decisions | Wire an AutonomousOverseer into OODALoop |
| RL policy | PolicyTrainer + PolicyDeployer (SB3) |
| Reward function | Pass reward_fn to GenericTwinEnv |
| Notification channel | Implement NotificationBackend (email/Slack/webhook) |
| Cross-twin transport | Implement ConnectorProtocol |
| New collection pattern | Subclass AbstractCollectionTwin |
| Domain session state | Subclass SessionContext, use SessionStore[T] |
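Because extension points are expressed as protocols, a replacement only needs the right methods; it does not have to inherit from a framework class. The sketch below shows that idea for a custom reactive rule. The table confirms only that Rule.evaluate(readings) is synchronous; the Protocol definition here, the dict-of-floats readings, the list-of-messages return value, and RateOfRiseRule itself are hypothetical, so check guide/07_reactive_layer.md for the real signature.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class Rule(Protocol):
    """Hypothetical stand-in for DT-Forge's Rule protocol."""

    def evaluate(self, readings: dict[str, float]) -> list[str]:
        """Synchronously inspect one set of readings; return alert messages."""
        ...


class RateOfRiseRule:
    """Alerts when a field rises by more than max_step between calls.

    Satisfies Rule structurally; it never subclasses it.
    """

    def __init__(self, field: str, max_step: float) -> None:
        self.field = field
        self.max_step = max_step
        self._last: float | None = None

    def evaluate(self, readings: dict[str, float]) -> list[str]:
        value = readings.get(self.field)
        if value is None:
            return []  # field absent from this batch; nothing to compare
        previous, self._last = self._last, value
        if previous is not None and value - previous > self.max_step:
            return [f"{self.field} rose {value - previous:.1f} in one step"]
        return []


rule = RateOfRiseRule("temperature_c", max_step=5.0)
print(isinstance(rule, Rule))  # structural check enabled by runtime_checkable
for reading in (25.0, 27.0, 35.0):
    print(rule.evaluate({"temperature_c": reading}))
```

Note that runtime_checkable only verifies that an evaluate attribute exists, not its signature; a static type checker is what enforces the full protocol.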

Technology stack

| Concern | Library |
|---|---|
| Messaging | Eclipse Mosquitto / paho-mqtt 2.x |
| Time-series | InfluxDB 2 (influxdb-client) |
| Documents | MongoDB (pymongo + motor) |
| Cache / pub-sub | Redis |
| Object storage | MinIO (S3 API) |
| Relational | PostgreSQL (asyncpg), used by RuleRepository |
| Twin state | Eclipse Ditto |
| Knowledge graph | Neo4j 5 |
| Web API | FastAPI + Uvicorn + sse-starlette |
| Config | Pydantic v2 + pydantic-settings |
| State machine | transitions |
| PID control | simple-pid |
| Physics | SciPy (solve_ivp) |
| Surrogate / ML | ONNX Runtime + scikit-learn |
| Forecasting | Prophet |
| Discrete event | SimPy |
| Sentiment / NLP | vaderSentiment (swappable for any callable) |
| Agents | LangChain + langchain-classic |
| LLM providers | OpenAI · Anthropic · Ollama |
| RL | Stable-Baselines3 + Gymnasium |
| CLI | Click |
CLI Click

CLI reference

dtforge init        --asset-type TYPE --name NAME --asset-id ID [--out DIR]
dtforge infra up    --layers data,network,services[,intelligent] [--out FILE] [--generate-only]
dtforge infra check [--layers ...]              # default: read .dtforge-layers
dtforge run         [TWIN_MODULE]               # default module: "twin"
dtforge train       [--algorithm SAC|TD3|PPO|A2C] [--timesteps N]
                    [--env-module M] [--save NAME]

Run any command with --help for the full flag list.

Project layout

framework/
├── dt_forge/           ← the framework package (keep your own code out of it)
│   ├── core/           ← config, events, lifecycle, registries, base classes
│   ├── data/           ← storage adapters, router, management pipeline,
│   │                     text ingestor, sessions, provenance
│   ├── simulation/     ← ODE, SimPy, surrogate, Prophet, runner
│   ├── services/       ← Ditto client + sync, FastAPI app + routes + SSE
│   ├── reactive/       ← rules, FSMs, PID, rule repository
│   ├── intelligent/    ← knowledge graph, agents, MAS
│   ├── autonomous/     ← OODA, overseer, planner, RL trainer/deployer
│   ├── network/        ← MQTT transport + ingestor
│   ├── connector/      ← MQTT/Ditto/HTTP cross-twin connectors
│   ├── collection/     ← four collection-twin patterns + orchestrator
│   ├── notifications/  ← human-notifier backends (email/Slack/webhook)
│   ├── infra/          ← docker-compose generator + health checks
│   └── cli/            ← `dtforge` CLI
├── guide/              ← step-by-step manual
├── implementations/    ← worked examples (sdt, biotic pod, …)
└── DT_Forge_Framework_Design.md   ← design rationale

License

MIT



Download files


Source Distribution

dt_forge-0.4.2.tar.gz (86.3 kB)

Built Distribution


dt_forge-0.4.2-py3-none-any.whl (114.0 kB)

File details

Details for the file dt_forge-0.4.2.tar.gz.

File metadata

  • Download URL: dt_forge-0.4.2.tar.gz
  • Upload date:
  • Size: 86.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.7

File hashes

Hashes for dt_forge-0.4.2.tar.gz
Algorithm Hash digest
SHA256 2b9f2405cd81464a5c13676088a06128c8d69c881437965db36f75d021bd1642
MD5 f5611ac2f68ea0852b6f02d814201662
BLAKE2b-256 694aaf8742c630bff57bc33b38a50fa82fc626ffe3f0ff36594f49673edba2a8


File details

Details for the file dt_forge-0.4.2-py3-none-any.whl.

File metadata

  • Download URL: dt_forge-0.4.2-py3-none-any.whl
  • Upload date:
  • Size: 114.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.7

File hashes

Hashes for dt_forge-0.4.2-py3-none-any.whl
Algorithm Hash digest
SHA256 f118d5a273e892a9c5506d21d8aeaee49ebb957c263fbffda6757413ae9d2750
MD5 aa1036c6d55d50d27c2c0c695fe98b3a
BLAKE2b-256 ed204673908000bcbcc99f41ee3d74cb37a3abe6ea9aeaa1795b5fc5ebebf088

