Domain-agnostic Python framework for digital twin architectures

DT-Forge

A domain-agnostic Python framework for building digital twins of any physical or process asset — pumps, soil, plants, sales pipelines, manufacturing lines.

DT-Forge implements a six-layer Digital Twin architecture plus a cross-cutting connector subsystem for inter-twin communication and four collection-twin patterns for grouping twins together. You declare the sensors, write the asset-specific glue, and the framework provides the rest: storage, modelling, services, reactive control, multi-agent diagnosis, and autonomous decision-making.

Architecture

┌─────────────────────────────────────────────────────────┐
│              AUTONOMOUS LAYER (Layer 6)                 │
│  OODA loop · GoalPlanner · AutonomousOverseer (LLM)     │
│  RL policy (SAC/TD3/PPO/A2C) · HumanNotifier            │
├─────────────────────────────────────────────────────────┤
│             INTELLIGENT LAYER (Layer 5)                 │
│  MultiAgentSystem · LangChain DiagnosticAgent           │
│  Neo4j knowledge graph · per-agent status / history     │
├─────────────────────────────────────────────────────────┤
│              REACTIVE LAYER (Layer 4)                   │
│  ThresholdRuleEngine · MultiStateFSMRuleEngine          │
│  simple-pid PIDController · PostgreSQL RuleRepository   │
├─────────────────────────────────────────────────────────┤
│              SERVICES LAYER (Layer 3)                   │
│  Eclipse Ditto sync · FastAPI REST + SSE chat           │
├─────────────────────────────────────────────────────────┤
│         SIMULATION & MODEL LAYER (Layer 2)              │
│  scipy ODE · ONNX/sklearn surrogate · SimPy · Prophet   │
├─────────────────────────────────────────────────────────┤
│                 DATA LAYER (Layer 1)                    │
│  MQTT ingest · TelemetryRouter · DataManagementPipeline │
│  InfluxDB · MongoDB · Redis · MinIO · PostgreSQL        │
│  ProvenanceLog · SessionStore · TextIngestor            │
└─────────────────────────────────────────────────────────┘

    Cross-cutting:  Connector subsystem  (MQTT · Ditto · HTTP API)
                    Collection twins     (Aggregate · Collection ·
                                          Composite · Network)

Every layer is optional — pick the ones your twin needs and skip the rest. A monitoring-only twin needs Data + Network. A self-driving asset needs all six. The bundled dtforge CLI generates a docker-compose.yml for exactly the layers you ask for.

Collection patterns

Pattern        Use case
AggregateDT    Fleet of identical assets: fused state, shared control
CollectionDT   Batch monitoring, outlier detection, statistical comparison
CompositeDT    Hierarchical systems with boundary-condition exchange + swap
NetworkDT      Graph-topology systems, cascade risk, bottleneck detection
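
As an illustration of the kind of batch comparison a CollectionDT is meant for, here is a minimal standalone sketch of outlier detection across a fleet. It does not use the DT-Forge API: the function name `find_outliers`, the sample readings, and the choice of a modified z-score based on the median absolute deviation are all assumptions made for this example, and the framework may use a different method.

```python
from statistics import median


def find_outliers(readings: dict[str, float], limit: float = 3.5) -> list[str]:
    """Return asset IDs whose reading has a modified z-score above `limit`.

    The modified z-score uses the median and the median absolute deviation
    (MAD), which stay stable even when one asset is far off the rest.
    """
    values = list(readings.values())
    if len(values) < 3:
        return []  # too few assets to say anything meaningful
    centre = median(values)
    mad = median(abs(v - centre) for v in values)
    if mad == 0:
        return []  # all readings (nearly) identical, nothing to flag
    return [
        asset_id
        for asset_id, value in readings.items()
        if 0.6745 * abs(value - centre) / mad > limit
    ]


# Hypothetical bearing temperatures (deg C) for five identical pumps.
batch = {
    "pump_001": 41.0,
    "pump_002": 40.5,
    "pump_003": 41.2,
    "pump_004": 40.8,
    "pump_005": 63.0,
}
print(find_outliers(batch))
```

A median-based score is used here because a single hot asset in a small batch inflates the ordinary standard deviation enough to hide itself.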

Installation

Requires Python 3.11+ and Docker (for the backing infrastructure).

pip install dt-forge

This installs the framework plus the dtforge CLI.

Quick start

1. Configure

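Configuration is driven by environment variables. Every variable starts with the DT_ prefix, and a double underscore (__) separates a nested settings group from its field, so DT_MQTT__PORT sets the port field of the mqtt group. Put the variables in a .env file in your project directory: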

DT_ASSET_ID=pump_001
DT_ASSET_TYPE=centrifugal_pump
DT_ASSET_NAME="Plant A Pump"

DT_MQTT__BROKER=localhost
DT_MQTT__PORT=1883

DT_INFLUX__URL=http://localhost:8086
DT_INFLUX__TOKEN=my-token
DT_INFLUX__ORG=digital_twin
DT_INFLUX__BUCKET=asset_telemetry

DT_MONGO__URI=mongodb://admin:password@localhost:27017
DT_REDIS__URL=redis://localhost:6379
DT_DITTO__URL=http://localhost:8080

# Optional — for the intelligent layer
DT_LLM__PROVIDER=anthropic        # openai | anthropic | ollama
DT_LLM__MODEL=claude-sonnet-4-6
DT_LLM__API_KEY=sk-ant-...
DT_NEO4J__URI=bolt://localhost:7687

2. Start the infrastructure you need

dtforge infra up writes a docker-compose.yml tuned for the layers you ask for and runs docker compose up -d:

# Minimal monitoring twin (data + MQTT + Ditto sync):
dtforge infra up --layers data,network,services

# Add the intelligent layer (also brings Neo4j up):
dtforge infra up --layers data,network,services,intelligent

# Write the compose file without starting containers:
dtforge infra up --layers data,network,services --generate-only
docker compose up -d

Wait for everything to come up, then verify:

dtforge infra check --layers data,network,services,intelligent

Each service is reported as reachable once the twin can connect to it.

Containers started by infra up:

Service         Host port    Layer needed     Purpose
Mosquitto       1883, 9001   network          MQTT broker + WebSocket
InfluxDB        8086         data / network   Time-series telemetry
MongoDB         27017        data / network   Events + provenance
Redis           6379         data / network   Cache, FSM state, sessions
MinIO           9000, 9002   data / network   Trained models, large objects
Eclipse Ditto   8080         services         Canonical twin state (nginx+auth)
Neo4j           7474, 7687   intelligent      Knowledge graph
Grafana         3000         always           Observability
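
If you want to probe the ports yourself, independent of dtforge infra check, the table above can be turned into a small script. This is a hedged sketch and not how the CLI is implemented: `SERVICES`, `services_for`, and `port_open` are names invented here, and reading "data / network" as "started when either layer is requested" is an assumption.

```python
import socket

# Transcribed from the table above. A layer set of None means "always started".
SERVICES = {
    "Mosquitto": ([1883, 9001], {"network"}),
    "InfluxDB": ([8086], {"data", "network"}),
    "MongoDB": ([27017], {"data", "network"}),
    "Redis": ([6379], {"data", "network"}),
    "MinIO": ([9000, 9002], {"data", "network"}),
    "Eclipse Ditto": ([8080], {"services"}),
    "Neo4j": ([7474, 7687], {"intelligent"}),
    "Grafana": ([3000], None),
}


def services_for(layers: set[str]) -> list[str]:
    """List the services expected for the requested layers, in table order."""
    return [
        name
        for name, (_ports, needed) in SERVICES.items()
        if needed is None or needed & layers
    ]


def port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


print(services_for({"data", "network", "services"}))
```

Calling `port_open("localhost", port)` for every port of every listed service gives a quick pass/fail view. A successful TCP connect only shows that something is listening, not that the service is healthy.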

3. Scaffold a new twin

dtforge init \
  --asset-type centrifugal_pump \
  --name "Plant A Pump" \
  --asset-id pump_001

That writes:

File      Purpose
twin.py   Twin class scaffold; edit build_layers() to wire it
.env      Environment configuration

The scaffold imports the core layers (Data, Network ingest, Data management, Ditto sync, Reactive) and gives you a runnable starting point. Add the intelligent and autonomous layers as your twin grows.

4. Run

python twin.py            # direct
# or:
dtforge run twin          # via the CLI (loads twin.py, calls run_forever)

The FastAPI service exposes /health, /api/twin/state, /api/twin/telemetry, /api/twin/events, and the SSE chat endpoint /api/chat once the services layer is wired in.
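
A small client for the REST endpoints listed above might look like the following. Only the paths come from this README. The base URL `http://localhost:8000` (Uvicorn's default port) and the assumption that each endpoint returns JSON are guesses, and `endpoint_urls` and `get_json` are helper names made up for the sketch.

```python
import json
from urllib.parse import urljoin
from urllib.request import urlopen

# Paths taken from the README; the SSE chat endpoint is left out because
# it streams events rather than returning a single document.
ENDPOINTS = (
    "/health",
    "/api/twin/state",
    "/api/twin/telemetry",
    "/api/twin/events",
)


def endpoint_urls(base_url: str) -> dict[str, str]:
    """Build the absolute URL for each documented endpoint."""
    return {path: urljoin(base_url, path) for path in ENDPOINTS}


def get_json(url: str, timeout: float = 5.0):
    """Fetch a URL and decode the body as JSON (assumed response format)."""
    with urlopen(url, timeout=timeout) as response:
        return json.load(response)


urls = endpoint_urls("http://localhost:8000")  # assumed host and port
print(urls["/health"])
```

With a twin running, `get_json(urls["/health"])` would then fetch the health document.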

A minimal monitoring twin

import asyncio
import logging

from dotenv import load_dotenv

from dt_forge.core.base import AbstractDigitalTwin
from dt_forge.core.config import SensorFieldSpec, TwinConfig
from dt_forge.core.lifecycle import TwinLifecycle
from dt_forge.data import InfluxAdapter, MongoAdapter, RedisAdapter
from dt_forge.data.management import DataManagementPipeline
from dt_forge.data.writer import TelemetryRouter
from dt_forge.network import MQTTIngestor
from dt_forge.reactive import ThresholdRuleEngine

# Load the DT_* variables from .env into the environment.
load_dotenv()
logging.basicConfig(level=logging.INFO)

# Declare the sensors. Temperature alarms on high values (the default),
# pressure on low values, hence warn_threshold > crit_threshold there.
config = TwinConfig(sensor_fields=[
    SensorFieldSpec(name="temperature_c", nominal=25.0, noise_std=0.5,
                    warn_threshold=60.0, crit_threshold=75.0),
    SensorFieldSpec(name="pressure_bar", nominal=4.0, noise_std=0.05,
                    warn_threshold=3.0, crit_threshold=2.0,
                    threshold_direction="low"),
])


class PumpTwin(AbstractDigitalTwin):
    def build_layers(self):
        # Storage adapters shared by the layers below.
        ts = InfluxAdapter(self.config)
        doc = MongoAdapter(self.config)
        cache = RedisAdapter(self.config)
        router = TelemetryRouter(self.config, self.bus,
                                 ts_store=ts, doc_store=doc, cache=cache)
        # Layer name -> layer instance.
        return {
            "data":      router,
            "network":   MQTTIngestor(self.config, self.bus, router=router),
            "data_mgmt": DataManagementPipeline(self.config, self.bus,
                                                ts_store=ts, cache=cache),
            "reactive":  ThresholdRuleEngine(self.config, self.bus,
                                             ts_store=ts, cache=cache, doc_store=doc),
        }


if __name__ == "__main__":
    lifecycle = TwinLifecycle()
    lifecycle.add(PumpTwin(config))
    asyncio.run(lifecycle.run_forever())

Point a device (or dt_forge.physical.simulator.GenericSimulator) at dt/pump_001/telemetry and the twin will route, store, smooth, score health, and drive the FSM automatically. Add a simulation model, a knowledge graph, a LangChain agent, and an OODA loop on top of this skeleton when you need them.

Extension points

The framework is built around protocols and abstract base classes. Replace any piece without touching the rest:

Extension                 Mechanism
Storage backend           Implement the TimeSeriesStore / DocumentStore / CacheStore / ObjectStore protocol
Physics model             Subclass ODEModel or implement TwinModel
ML surrogate              Implement TwinModel with ONNX/sklearn/Torch
Discrete-event model      Wrap a generator with SimPyModel
Forecaster                Wrap any time-series model with ProphetForecaster
Custom reactive rule      Implement Rule.evaluate(readings) (sync)
N-state FSM               Subclass MultiStateFSMRuleEngine
Diagnostic agent tools    Override DiagnosticAgent._build_extra_tools()
LLM provider              Set DT_LLM__PROVIDER (openai/anthropic/ollama)
Goal-based assessment     Subclass GoalPlanner and override assess()
Strategic LLM decisions   Wire an AutonomousOverseer into OODALoop
RL policy                 PolicyTrainer + PolicyDeployer (SB3)
Reward function           Pass reward_fn to GenericTwinEnv
Notification channel      Implement NotificationBackend (email/Slack/webhook)
Cross-twin transport      Implement ConnectorProtocol
New collection pattern    Subclass AbstractCollectionTwin
Domain session state      Subclass SessionContext, use SessionStore[T]
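
As an example of the protocol-based style, the sketch below shows what a custom reactive rule could look like. It is standalone and does not import dt_forge. The `Rule` protocol here is a stand-in with an assumed signature (the table only says that `evaluate(readings)` is synchronous), and the string return values, the inclusive comparisons, and the class name `DirectionalThresholdRule` are illustrative choices rather than the framework's contract.

```python
from dataclasses import dataclass
from typing import Optional, Protocol


class Rule(Protocol):
    """Stand-in for the framework's Rule interface (assumed signature)."""

    def evaluate(self, readings: dict[str, float]) -> Optional[str]: ...


@dataclass
class DirectionalThresholdRule:
    """Flag a field that crosses its warning or critical threshold.

    direction="high" alarms when the value rises to a threshold,
    direction="low" when it falls to one.
    """

    field: str
    warn: float
    crit: float
    direction: str = "high"

    def evaluate(self, readings: dict[str, float]) -> Optional[str]:
        value = readings.get(self.field)
        if value is None:
            return None  # field missing from this sample
        if self.direction == "low":
            is_crit, is_warn = value <= self.crit, value <= self.warn
        else:
            is_crit, is_warn = value >= self.crit, value >= self.warn
        if is_crit:
            return "critical"
        if is_warn:
            return "warning"
        return None


# Same thresholds as the pump example earlier in this README.
pressure_rule: Rule = DirectionalThresholdRule("pressure_bar", warn=3.0, crit=2.0, direction="low")
temperature_rule: Rule = DirectionalThresholdRule("temperature_c", warn=60.0, crit=75.0)

sample = {"temperature_c": 62.0, "pressure_bar": 1.8}
print(temperature_rule.evaluate(sample), pressure_rule.evaluate(sample))
```

Because the rule satisfies the protocol structurally, it needs no base class. The same pattern applies to the storage protocols listed in the table.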

Technology stack

Concern            Library
Messaging          Eclipse Mosquitto / paho-mqtt 2.x
Time-series        InfluxDB 2 (influxdb-client)
Documents          MongoDB (pymongo + motor)
Cache / pub-sub    Redis
Object storage     MinIO (S3 API)
Relational         PostgreSQL (asyncpg), used by RuleRepository
Twin state         Eclipse Ditto
Knowledge graph    Neo4j 5
Web API            FastAPI + Uvicorn + sse-starlette
Config             Pydantic v2 + pydantic-settings
State machine      transitions
PID control        simple-pid
Physics            SciPy (solve_ivp)
Surrogate / ML     ONNX Runtime + scikit-learn
Forecasting        Prophet
Discrete event     SimPy
Sentiment / NLP    vaderSentiment (swappable for any callable)
Agents             LangChain + langchain-classic
LLM providers      OpenAI · Anthropic · Ollama
RL                 Stable-Baselines3 + Gymnasium
CLI                Click

CLI reference

dtforge init        --asset-type TYPE --name NAME --asset-id ID [--out DIR]
dtforge infra up    --layers data,network,services[,intelligent] [--out FILE] [--generate-only]
dtforge infra check [--layers ...]              # default: read .dtforge-layers
dtforge run         [TWIN_MODULE]               # default module: "twin"
dtforge train       [--algorithm SAC|TD3|PPO|A2C] [--timesteps N]
                    [--env-module M] [--save NAME]

Run any command with --help for the full flag list.

License

MIT
