Domain-agnostic Python framework for digital twin architectures
DT-Forge
A domain-agnostic Python framework for building digital twins of any physical or process asset — pumps, soil, plants, sales pipelines, manufacturing lines.
DT-Forge implements a six-layer Digital Twin architecture plus a cross-cutting connector subsystem for inter-twin communication and four collection-twin patterns for grouping twins together. You declare the sensors, write the asset-specific glue, and the framework provides the rest: storage, modelling, services, reactive control, multi-agent diagnosis, and autonomous decision-making.
Architecture
┌─────────────────────────────────────────────────────────┐
│ AUTONOMOUS LAYER (Layer 6) │
│ OODA loop · GoalPlanner · AutonomousOverseer (LLM) │
│ RL policy (SAC/TD3/PPO/A2C) · HumanNotifier │
├─────────────────────────────────────────────────────────┤
│ INTELLIGENT LAYER (Layer 5) │
│ MultiAgentSystem · LangChain DiagnosticAgent │
│ Neo4j knowledge graph · per-agent status / history │
├─────────────────────────────────────────────────────────┤
│ REACTIVE LAYER (Layer 4) │
│ ThresholdRuleEngine · MultiStateFSMRuleEngine │
│ simple-pid PIDController · PostgreSQL RuleRepository │
├─────────────────────────────────────────────────────────┤
│ SERVICES LAYER (Layer 3) │
│ Eclipse Ditto sync · FastAPI REST + SSE chat │
├─────────────────────────────────────────────────────────┤
│ SIMULATION & MODEL LAYER (Layer 2) │
│ scipy ODE · ONNX/sklearn surrogate · SimPy · Prophet │
├─────────────────────────────────────────────────────────┤
│ DATA LAYER (Layer 1) │
│ MQTT ingest · TelemetryRouter · DataManagementPipeline │
│ InfluxDB · MongoDB · Redis · MinIO · PostgreSQL │
│ ProvenanceLog · SessionStore · TextIngestor │
└─────────────────────────────────────────────────────────┘
Cross-cutting: Connector subsystem (MQTT · Ditto · HTTP API)
Collection twins (Aggregate · Collection ·
Composite · Network)
Every layer is optional — pick the ones your twin needs and skip the rest.
A monitoring-only twin needs Data + Network. A self-driving asset needs all
six. The CLI generates a docker-compose.yml for exactly the layers you ask
for.
Collection patterns
| Pattern | Use case |
|---|---|
| AggregateDT | Fleet of identical assets: fused state, shared control |
| CollectionDT | Batch monitoring, outlier detection, statistical comparison |
| CompositeDT | Hierarchical systems with boundary-condition exchange + swap |
| NetworkDT | Graph-topology systems, cascade risk, bottleneck detection |
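To make the "outlier detection" use case of a collection twin concrete, here is a minimal standalone sketch that flags members of a batch whose reading deviates strongly from the group median. It does not use or reproduce the CollectionDT API; the function name `find_outliers`, the modified z-score method, and the 3.5 cutoff are illustrative assumptions.

```python
from statistics import median


def find_outliers(readings: dict[str, float], threshold: float = 3.5) -> list[str]:
    """Return the IDs whose reading has a modified z-score above `threshold`.

    Hypothetical helper, not part of DT-Forge. It uses the median absolute
    deviation (MAD), which is robust to the very outliers being looked for.
    """
    values = list(readings.values())
    med = median(values)
    mad = median(abs(v - med) for v in values)
    if mad == 0:
        # At least half of the readings equal the median, so the score is undefined.
        return []
    return sorted(
        twin_id
        for twin_id, v in readings.items()
        if 0.6745 * abs(v - med) / mad > threshold
    )


# One temperature reading per twin in the batch (made-up values).
batch = {
    "pump_001": 41.0,
    "pump_002": 40.5,
    "pump_003": 41.3,
    "pump_004": 58.9,
    "pump_005": 40.8,
}
print(find_outliers(batch))  # ['pump_004']
```

A median-based score is used here instead of mean and standard deviation because a single faulty asset in a small fleet would otherwise inflate the spread and hide itself.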
Documentation
The guide/ directory is the authoritative manual — read it in order if you
are new to the framework.
| Step | File | Covers |
|---|---|---|
| 01 | guide/01_mental_model.md | The big picture |
| 02 | guide/02_your_first_twin.md | Docker + a minimal working twin |
| 03 | guide/03_config_and_sensors.md | TwinConfig, SensorFieldSpec, env vars |
| 04 | guide/04_data_layer.md | Storage backends, router, pipeline, sessions, text |
| 05 | guide/05_simulation_layer.md | Physics, surrogates, residuals |
| 06 | guide/06_services_layer.md | Ditto sync, FastAPI, SSE chat |
| 07 | guide/07_reactive_layer.md | Rules, FSM, PID, hot-reloading rule repository |
| 08 | guide/08_intelligent_layer.md | Knowledge graph, agents, MAS, escalation |
| 09 | guide/09_autonomous_layer.md | OODA, overseer, planner, RL |
| 10 | guide/10_connectors_and_collections.md | Multi-twin systems |
| 11 | guide/11_full_example.md | End-to-end worked example |
Worked implementations live under implementations/ — see
implementations/sdt/ for a multi-twin agricultural system that exercises the
full stack (BPDT + SDT + PDT + composite + RL).
The high-level design document is
DT_Forge_Framework_Design.md.
Quick start
1. Install
Requires Python 3.11+ and Docker.
pip install -e .
2. Configure
cp .env.example .env # then edit with your asset ID, LLM key, etc.
Configuration is driven by environment variables with the DT_ prefix and
double-underscore nested-field delimiter:
DT_ASSET_ID=pump_001
DT_ASSET_TYPE=centrifugal_pump
DT_ASSET_NAME="Plant A Pump"
DT_MQTT__BROKER=localhost
DT_MQTT__PORT=1883
DT_INFLUX__URL=http://localhost:8086
DT_INFLUX__TOKEN=my-token
DT_INFLUX__ORG=digital_twin
DT_INFLUX__BUCKET=asset_telemetry
DT_LLM__PROVIDER=anthropic # openai | anthropic | ollama
DT_LLM__MODEL=claude-sonnet-4-6
DT_LLM__API_KEY=sk-ant-...
See .env.example for the complete reference.
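The prefix and double-underscore convention maps flat environment variables onto nested configuration sections. The sketch below shows that mapping with the standard library only; it is an illustration of the naming scheme, not the framework's loader, and the helper name `nest_env` is hypothetical. (pydantic-settings, listed in the technology stack, offers `env_prefix` and `env_nested_delimiter` settings for this kind of mapping and would also coerce types.)

```python
def nest_env(environ, prefix="DT_", delimiter="__"):
    """Group prefixed variables into a nested dict (hypothetical helper).

    DT_ASSET_ID    -> {"asset_id": ...}
    DT_MQTT__PORT  -> {"mqtt": {"port": ...}}
    Values stay strings; no type coercion is attempted in this sketch.
    """
    config = {}
    for key, value in environ.items():
        if not key.startswith(prefix):
            continue  # ignore unrelated variables such as PATH
        parts = key[len(prefix):].lower().split(delimiter)
        node = config
        for part in parts[:-1]:
            node = node.setdefault(part, {})
        node[parts[-1]] = value
    return config


example_env = {
    "DT_ASSET_ID": "pump_001",
    "DT_MQTT__BROKER": "localhost",
    "DT_MQTT__PORT": "1883",
    "PATH": "/usr/bin",
}
print(nest_env(example_env))
# {'asset_id': 'pump_001', 'mqtt': {'broker': 'localhost', 'port': '1883'}}
```

In a real process you would pass `os.environ` instead of the example dictionary.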
3. Start the infrastructure you need
dtforge infra up writes a docker-compose.yml tuned for the layers you ask
for and runs docker compose up -d:
# Minimal monitoring twin (data + MQTT + Ditto sync):
dtforge infra up --layers data,network,services
# Add the intelligent layer (brings Neo4j up too):
dtforge infra up --layers data,network,services,intelligent
# Write the compose file without starting containers:
dtforge infra up --layers data,network,services --generate-only
docker compose up -d
Wait for everything to come up, then verify:
dtforge infra check --layers data,network,services,intelligent
Every service shows ✓ once the twin can reach it.
Containers started by infra up:
| Service | Host port | Layer needed | Purpose |
|---|---|---|---|
| Mosquitto | 1883, 9001 | network | MQTT broker + WebSocket |
| InfluxDB | 8086 | data / network | Time-series telemetry |
| MongoDB | 27017 | data / network | Events + provenance |
| Redis | 6379 | data / network | Cache, FSM state, sessions |
| MinIO | 9000, 9002 | data / network | Trained models, large objects |
| Eclipse Ditto | 8080 | services | Canonical twin state (nginx+auth) |
| Neo4j | 7474, 7687 | intelligent | Knowledge graph |
| Grafana | 3000 | always | Observability |
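The table can be read as a lookup from requested layers to containers. The following sketch encodes it that way so you can predict which services a given `--layers` value should bring up. It is not the generator used by `dtforge infra up`; in particular, reading "data / network" as "started when either layer is requested" is an assumption, and the names `SERVICE_LAYERS` and `services_for` are hypothetical.

```python
# Service -> layers that require it, transcribed from the table above.
SERVICE_LAYERS = {
    "Mosquitto": {"network"},
    "InfluxDB": {"data", "network"},
    "MongoDB": {"data", "network"},
    "Redis": {"data", "network"},
    "MinIO": {"data", "network"},
    "Eclipse Ditto": {"services"},
    "Neo4j": {"intelligent"},
}
ALWAYS_ON = {"Grafana"}  # listed as "always" in the table


def services_for(layers):
    """Return the sorted service names implied by the requested layers.

    Assumption: a service starts if ANY of its listed layers is requested.
    """
    requested = set(layers)
    selected = {
        name for name, needed in SERVICE_LAYERS.items() if needed & requested
    }
    return sorted(selected | ALWAYS_ON)


print(services_for(["data", "network", "services"]))
# ['Eclipse Ditto', 'Grafana', 'InfluxDB', 'MinIO', 'MongoDB', 'Mosquitto', 'Redis']
print(services_for(["intelligent"]))
# ['Grafana', 'Neo4j']
```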
Pre-built domains under implementations/ ship their own
docker-compose.yml tuned for that domain — use those instead of generating
from scratch when running an example.
4. Scaffold a new twin
dtforge init \
--asset-type centrifugal_pump \
--name "Plant A Pump" \
--asset-id pump_001
That writes:
| File | Purpose |
|---|---|
| twin.py | Twin class scaffold: edit build_layers() to wire it |
| .env | Environment configuration |
The scaffold imports the core layers (Data, Network ingest, Data management, Ditto sync, Reactive) and gives you a runnable starting point. Add the intelligent and autonomous layers as your twin grows.
5. Run
python twin.py # direct
# or:
dtforge run twin # via the CLI (loads twin.py, calls run_forever)
The FastAPI service exposes /health, /api/twin/state,
/api/twin/telemetry, /api/twin/events, and the SSE chat endpoint
/api/chat once the services layer is wired in.
Concrete example — a 30-line monitoring twin
import asyncio, logging
from dotenv import load_dotenv
from dt_forge.core.config import TwinConfig, SensorFieldSpec
from dt_forge.core.base import AbstractDigitalTwin
from dt_forge.core.lifecycle import TwinLifecycle
from dt_forge.data import InfluxAdapter, MongoAdapter, RedisAdapter
from dt_forge.data.writer import TelemetryRouter
from dt_forge.data.management import DataManagementPipeline
from dt_forge.network import MQTTIngestor
from dt_forge.reactive import ThresholdRuleEngine

load_dotenv()
logging.basicConfig(level=logging.INFO)

config = TwinConfig(sensor_fields=[
    SensorFieldSpec(name="temperature_c", nominal=25.0, noise_std=0.5,
                    warn_threshold=60.0, crit_threshold=75.0),
    SensorFieldSpec(name="pressure_bar", nominal=4.0, noise_std=0.05,
                    warn_threshold=3.0, crit_threshold=2.0,
                    threshold_direction="low"),
])

class PumpTwin(AbstractDigitalTwin):
    def build_layers(self):
        ts, doc, cache = InfluxAdapter(self.config), MongoAdapter(self.config), RedisAdapter(self.config)
        router = TelemetryRouter(self.config, self.bus,
                                 ts_store=ts, doc_store=doc, cache=cache)
        return {
            "data": router,
            "network": MQTTIngestor(self.config, self.bus, router=router),
            "data_mgmt": DataManagementPipeline(self.config, self.bus,
                                                ts_store=ts, cache=cache),
            "reactive": ThresholdRuleEngine(self.config, self.bus,
                                            ts_store=ts, cache=cache, doc_store=doc),
        }

if __name__ == "__main__":
    lc = TwinLifecycle(); lc.add(PumpTwin(config))
    asyncio.run(lc.run_forever())
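The two sensor specs above differ in `threshold_direction`: temperature alarms when it rises, pressure when it falls. The sketch below shows one plausible reading of those fields as a pure function. It is not the ThresholdRuleEngine implementation; the function name `classify`, the "ok"/"warn"/"crit" labels, the default direction "high", and the inclusive comparisons are all assumptions.

```python
def classify(value, warn, crit, direction="high"):
    """Map a reading to "ok", "warn" or "crit" (hypothetical helper).

    direction="high": larger values are worse (e.g. temperature).
    direction="low":  smaller values are worse (e.g. pressure).
    """
    if direction == "low":
        # Mirror the axis so that "worse" always means "larger".
        value, warn, crit = -value, -warn, -crit
    if value >= crit:
        return "crit"
    if value >= warn:
        return "warn"
    return "ok"


# temperature_c: warn at 60.0, critical at 75.0, higher is worse
print(classify(65.0, warn=60.0, crit=75.0))                   # warn
# pressure_bar: warn at 3.0, critical at 2.0, lower is worse
print(classify(2.5, warn=3.0, crit=2.0, direction="low"))     # warn
print(classify(1.8, warn=3.0, crit=2.0, direction="low"))     # crit
```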
Run a GenericSimulator to publish synthetic readings on the telemetry topic,
or point a real device at dt/pump_001/telemetry and the twin will route,
store, smooth, score health, and drive the FSM automatically.
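If you want to feed the twin without GenericSimulator, synthetic readings can be drawn from each field's `nominal` and `noise_std`. The sketch below builds one such message with the standard library. The flat JSON object keyed by sensor name is an assumption about the payload format, not something this README specifies, and `synthetic_payload` is a hypothetical helper; publishing the string to `dt/pump_001/telemetry` is left to whichever MQTT client you use.

```python
import json
import random

# (nominal, noise_std) per field, copied from the SensorFieldSpec values above.
FIELDS = {
    "temperature_c": (25.0, 0.5),
    "pressure_bar": (4.0, 0.05),
}
TOPIC = "dt/pump_001/telemetry"  # topic named in this README


def synthetic_payload(fields, rng):
    """Draw one Gaussian sample per field and serialise it as JSON.

    Assumed payload shape: {"<sensor name>": <float>, ...}.
    """
    sample = {
        name: round(rng.gauss(nominal, std), 3)
        for name, (nominal, std) in fields.items()
    }
    return json.dumps(sample)


rng = random.Random(42)  # seeded so repeated runs give the same sequence
payload = synthetic_payload(FIELDS, rng)
print(TOPIC, payload)
```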
The guide walks through adding simulation, knowledge graph, agents, and the OODA loop on top of this skeleton.
Extension points
The framework is built around protocols and abstract base classes. Replace any piece without touching the rest:
| Extension | Mechanism |
|---|---|
| Storage backend | Implement TimeSeriesStore / DocumentStore / CacheStore / ObjectStore protocol |
| Physics model | Subclass ODEModel or implement TwinModel |
| ML surrogate | Implement TwinModel with ONNX/sklearn/Torch |
| Discrete-event model | Wrap a generator with SimPyModel |
| Forecaster | Wrap any time-series model with ProphetForecaster |
| Custom reactive rule | Implement Rule.evaluate(readings) (sync) |
| N-state FSM | Subclass MultiStateFSMRuleEngine |
| Diagnostic agent tools | Override DiagnosticAgent._build_extra_tools() |
| LLM provider | Set DT_LLM__PROVIDER (openai/anthropic/ollama) |
| Goal-based assessment | Subclass GoalPlanner and override assess() |
| Strategic LLM decisions | Wire an AutonomousOverseer into OODALoop |
| RL policy | PolicyTrainer + PolicyDeployer (SB3) |
| Reward function | Pass reward_fn to GenericTwinEnv |
| Notification channel | Implement NotificationBackend (email/Slack/webhook) |
| Cross-twin transport | Implement ConnectorProtocol |
| New collection pattern | Subclass AbstractCollectionTwin |
| Domain session state | Subclass SessionContext, use SessionStore[T] |
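Protocol-based extension means a replacement only has to match the expected method shapes; it does not need to inherit from a framework class. The sketch below illustrates the idea with `typing.Protocol` and an in-memory store. The protocol name `TimeSeriesStoreLike` and its two methods are invented for illustration and are not the real TimeSeriesStore interface, which is documented in the guide.

```python
from typing import Optional, Protocol, runtime_checkable


@runtime_checkable
class TimeSeriesStoreLike(Protocol):
    """Hypothetical stand-in for a storage protocol (method names assumed)."""

    def write(self, measurement: str, fields: dict[str, float], ts: float) -> None: ...

    def latest(self, measurement: str) -> Optional[dict[str, float]]: ...


class InMemoryStore:
    """Satisfies the protocol structurally, with no inheritance involved."""

    def __init__(self) -> None:
        self._rows: dict[str, list[tuple[float, dict[str, float]]]] = {}

    def write(self, measurement: str, fields: dict[str, float], ts: float) -> None:
        self._rows.setdefault(measurement, []).append((ts, dict(fields)))

    def latest(self, measurement: str) -> Optional[dict[str, float]]:
        rows = self._rows.get(measurement)
        if not rows:
            return None
        # Pick the row with the newest timestamp, not the last one written.
        return max(rows, key=lambda row: row[0])[1]


store = InMemoryStore()
store.write("telemetry", {"temperature_c": 25.1}, ts=1.0)
store.write("telemetry", {"temperature_c": 25.4}, ts=2.0)
print(isinstance(store, TimeSeriesStoreLike))  # True
print(store.latest("telemetry"))               # {'temperature_c': 25.4}
```

An in-memory store like this is mainly useful in unit tests, where starting InfluxDB would be overkill.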
Technology stack
| Concern | Library |
|---|---|
| Messaging | Eclipse Mosquitto / paho-mqtt 2.x |
| Time-series | InfluxDB 2 (influxdb-client) |
| Documents | MongoDB (pymongo + motor) |
| Cache / pub-sub | Redis |
| Object storage | MinIO (S3 API) |
| Relational | PostgreSQL (asyncpg) — used by RuleRepository |
| Twin state | Eclipse Ditto |
| Knowledge graph | Neo4j 5 |
| Web API | FastAPI + Uvicorn + sse-starlette |
| Config | Pydantic v2 + pydantic-settings |
| State machine | transitions |
| PID control | simple-pid |
| Physics | SciPy (solve_ivp) |
| Surrogate / ML | ONNX Runtime + scikit-learn |
| Forecasting | Prophet |
| Discrete event | SimPy |
| Sentiment / NLP | vaderSentiment (swappable for any callable) |
| Agents | LangChain + langchain-classic |
| LLM providers | OpenAI · Anthropic · Ollama |
| RL | Stable-Baselines3 + Gymnasium |
| CLI | Click |
CLI reference
dtforge init --asset-type TYPE --name NAME --asset-id ID [--out DIR]
dtforge infra up --layers data,network,services[,intelligent] [--out FILE] [--generate-only]
dtforge infra check [--layers ...] # default: read .dtforge-layers
dtforge run [TWIN_MODULE] # default module: "twin"
dtforge train [--algorithm SAC|TD3|PPO|A2C] [--timesteps N]
[--env-module M] [--save NAME]
Run any command with --help for the full flag list.
Project layout
framework/
├── dt_forge/ ← the framework package (keep your own code out of here)
│ ├── core/ ← config, events, lifecycle, registries, base classes
│ ├── data/ ← storage adapters, router, management pipeline,
│ │ text ingestor, sessions, provenance
│ ├── simulation/ ← ODE, SimPy, surrogate, Prophet, runner
│ ├── services/ ← Ditto client + sync, FastAPI app + routes + SSE
│ ├── reactive/ ← rules, FSMs, PID, rule repository
│ ├── intelligent/ ← knowledge graph, agents, MAS
│ ├── autonomous/ ← OODA, overseer, planner, RL trainer/deployer
│ ├── network/ ← MQTT transport + ingestor
│ ├── connector/ ← MQTT/Ditto/HTTP cross-twin connectors
│ ├── collection/ ← four collection-twin patterns + orchestrator
│ ├── notifications/ ← human-notifier backends (email/Slack/webhook)
│ ├── infra/ ← docker-compose generator + health checks
│ └── cli/ ← `dtforge` CLI
├── guide/ ← step-by-step manual
├── implementations/ ← worked examples (sdt, biotic pod, …)
└── DT_Forge_Framework_Design.md ← design rationale
License
MIT