Load & Stress Automation Framework

LoadDensity

Multi-protocol load & stress automation: Locust + WebSocket + gRPC + MQTT + raw sockets, behind one JSON-driven action executor with batteries included.

繁體中文 | 简体中文


LoadDensity (je_load_density) started as a Locust wrapper and grew into a full multi-protocol load framework: HTTP, FastHttp, WebSocket, gRPC, MQTT, and raw TCP/UDP user templates behind one JSON-driven action executor, plus modules for parameterised data, scenario flow, reports, observability, distributed runners, recording, persistent storage, and an MCP control surface so Claude can drive load tests end-to-end. Every executor command has a deterministic name (LD_*) and a single dispatch point, so an action JSON can mix protocols, exporters, and reports in the same script.

Optional dependencies, opt-in install — every protocol driver and exporter ships behind a pip install je_load_density[<extra>] extra. The base install footprint is unchanged for users who only need HTTP load testing.

Table of Contents

Highlights

  • One executor, twelve user templates. HTTP, FastHttp, Async HTTP/2 (httpx), WebSocket, SSE, gRPC (unary + server/client/bidi streaming), MQTT, raw TCP/UDP, SQL (SQLAlchemy), Redis, Kafka, MongoDB — all dispatched from the same LD_start_test command through a user_detail_dict["user"] key.
  • Action JSON as a contract. Every command resolves through Executor.event_dict; the action list is the same whether it is hand-authored, generated by HAR import, sent over the control socket, or driven by an MCP tool.
  • Parameter resolver everywhere. ${var.NAME}, ${env.NAME}, ${csv.SOURCE.COL}, ${db.SOURCE.COL}, ${faker.method}, plus built-in ${uuid()}, ${now()}, ${randint(min,max)} helpers; values extracted from one response can feed the next task's URL, headers, body, or assertions.
  • Scenario flow without Python. Declare tasks as sequence (default), weighted, or conditional with run_if / skip_if predicates; per-task think_time, throttle.rps, and retry ({transient, flaky, permanent} budgets) control pacing & resilience without writing wait loops.
  • Built-in load shapes. load_shape="stages"|"spike"|"soak" with a JSON shape_config — no Locust subclass required.
  • Production-grade reliability. Adaptive retry with exponential backoff + jitter + per-error-class budgets, sliding-window failure budget / circuit breaker, process supervisor with hard-timeout watchdog, in-process network conditioner (latency / jitter / loss).
  • SLA gates + regression diff. LD_assert_sla fails CI when latency / failure-rate / request-count rules breach; LD_diff_runs compares two SQLite-persisted runs and flags per-name regressions over a tolerance.
  • Seven report formats. HTML, JSON, XML, CSV, JUnit XML, percentile-summary JSON, plus optional matplotlib chart reports (latency-over-time + RPS-over-time PNGs via [charts] extra).
  • Four live exporters. Prometheus HTTP endpoint, InfluxDB line-protocol UDP/HTTP sink, OpenTelemetry OTLP gRPC exporter, Datadog DogStatsD UDP sink — all lazily imported and gated by the matching install extra.
  • Live web dashboard. start_dashboard() boots a stdlib HTTP + SSE server that streams running RPS / avg / p95 / failure counts to any browser, per-name table included.
  • Slack + Teams notifiers. Block Kit + MessageCard summary posters keyed off the build_summary output (LD_post_slack_summary, LD_post_teams_summary).
  • Assertions + extractors. status_code, contains, not_contains, json_path, header assertions run under Locust's catch_response; extractors with sources json_path / header / status_code write back into the parameter resolver.
  • Distributed runners. runner_mode="master" / "worker" for cross-machine load with the same start_test API; master waits up to 60 s for the configured worker count before ramping.
  • Six importers. HAR (browser traffic), Postman v2.1 collections, OpenAPI 3.x specs, standalone cURL commands, k6 scripts, and JMeter JMX plans — each converts to action JSON or a single task ready for LD_start_test.
  • Auth helpers. Stdlib OAuth2 client (client_credentials / password / refresh with token cache), JWT signer (HS256/384/512 + RS256/384/512), AWS SigV4 request signer, plus mTLS client-cert support on every HTTP user template via task["cert"].
  • Persistent records. Optional SQLite sink with runs / records / metadata schema, indexed for cross-run regression checks; works against an empty file out of the box.
  • MCP server. python -m je_load_density.mcp_server exposes 11 tools so Claude (Desktop, Code, any MCP client) can run tests, manage projects, and pull reports without leaving chat.
  • Action JSON tooling. Built-in linter (LD_lint_action), JSON Schema exporter (LD_export_schema), GitHub Actions annotation emitter (LD_emit_github_annotations), stdlib LSP server (python -m je_load_density.action_lsp), composite GitHub Action wrapper (action.yml), pre-commit hook, and VS Code extension skeleton — editor + CI integration end-to-end.
  • Hardened control socket. 4-byte big-endian length-prefix framing (1 MiB cap), optional TLS via ssl.create_default_context, shared-secret token via env var or arg, plus a backwards-compatible legacy mode for downstream tools such as PyBreeze.
  • Safe executor. eval, exec, compile, __import__, breakpoint, open, and input are explicitly blocked from action JSON — only LD_* commands and a curated set of safe builtins (print, len, range, …) are dispatchable.
  • Live GUI. Optional PySide6 front-end with a live stats panel (RPS / avg / p95 / failures), translated to English, Traditional Chinese, Japanese, and Korean.
  • CLI subcommands. run / run-dir / run-str / init / serve. Legacy -e/-d/-c/--execute_str single-flag form is preserved for downstream tools.
  • Cross-platform. Windows 10/11, macOS, Ubuntu/Linux, Raspberry Pi (3B+ and later) on Python 3.10+.

Installation

Stable:

pip install je_load_density

Pulls in Locust and defusedxml — nothing else.

Optional extras

Install only the slices you use:

| Extra | Adds |
| --- | --- |
| gui | PySide6 + qt-material (graphical front-end) |
| websocket | websocket-client (WebSocket user template) |
| grpc | grpcio + protobuf (gRPC user template) |
| mqtt | paho-mqtt (MQTT user template) |
| redis | redis (Redis user template) |
| kafka | kafka-python (Kafka user template) |
| sql | sqlalchemy (SQL user template + ${db.*} placeholder) |
| mongo | pymongo (MongoDB user template) |
| http2 | httpx[http2] (Async HTTP/2 user template) |
| auth | cryptography (RS256/384/512 JWT signing) |
| reliability | psutil (ProcessSupervisor) |
| prometheus | prometheus-client (Prometheus exporter) |
| opentelemetry | OpenTelemetry SDK + OTLP gRPC exporter |
| metrics | prometheus + opentelemetry bundle |
| charts | matplotlib (chart-rendering reports) |
| yaml | pyyaml (OpenAPI YAML loading) |
| faker | Faker (powers ${faker.method} placeholders) |
| mcp | mcp SDK (drives the MCP server) |
| all | Everything above |

pip install "je_load_density[gui]"
pip install "je_load_density[mqtt,grpc,websocket]"
pip install "je_load_density[metrics]"
pip install "je_load_density[mcp]"
pip install "je_load_density[all]"

Development install

git clone https://github.com/Integration-Automation/LoadDensity.git
cd LoadDensity
pip install -e ".[all]"
pip install -r requirements.txt

Hard requirements: Python 3.10+, locust, defusedxml.

Architecture

System overview

flowchart LR
  subgraph Authoring
    A1["Action JSON files"]
    A2["Programmatic start_test"]
    A3["HAR → action JSON"]
    A4["MCP / Claude"]
  end

  subgraph Core
    EXE["Action Executor<br/>event_dict (LD_*)"]
    RES["Parameter Resolver<br/>${var} / ${env} / ${csv} / ${faker}"]
    REC["test_record_instance"]
  end

  subgraph Runners
    LOC["Locust local"]
    MAS["Locust master"]
    WRK["Locust worker"]
  end

  subgraph Templates
    HTTP["HTTP / FastHttp"]
    WS["WebSocket"]
    GRPC["gRPC"]
    MQTT["MQTT"]
    SOCK["Raw TCP/UDP"]
  end

  subgraph Outputs
    REP["Reports<br/>HTML/JSON/XML/CSV/JUnit/Summary"]
    EXP["Exporters<br/>Prometheus · InfluxDB · OTel"]
    SQL["SQLite persistence"]
  end

  A1 --> EXE
  A2 --> EXE
  A3 --> A1
  A4 --> EXE
  EXE --> RES
  EXE --> LOC
  EXE --> MAS
  EXE --> WRK
  LOC --> HTTP & WS & GRPC & MQTT & SOCK
  MAS --> WRK
  WRK --> HTTP & WS & GRPC & MQTT & SOCK
  HTTP & WS & GRPC & MQTT & SOCK --> REC
  REC --> REP
  REC --> EXP
  REC --> SQL

Action lifecycle

flowchart LR
  IN["Action<br/>[cmd, args_or_kwargs]"] --> DISP["event_dict[cmd]"]
  DISP -- "LD_start_test" --> SEED["Seed resolver from<br/>variables / csv_sources"]
  SEED --> PICK["Pick user template<br/>(_USER_REGISTRY)"]
  PICK --> ENV["prepare_env<br/>(local / master / worker)"]
  ENV --> RUN["Locust runner ticks"]
  RUN --> EXPAND["Parameter resolver<br/>expands ${...} per task"]
  EXPAND --> EXEC["execute_task<br/>(per-protocol request)"]
  EXEC -- response --> ASSERT["assertions + extractors"]
  ASSERT --> EVT["Locust request event"]
  EVT --> REC["test_record_instance.append"]
  DISP -- "LD_generate_*_report" --> RREAD["Read from test_record_instance"]
  RREAD --> OUT["Report file(s)"]

User dispatch

flowchart TB
  CMD["start_test(user_detail_dict={...})"] --> KEY{"user key?"}
  KEY -- "fast_http_user (default)" --> FH["FastHttpUserWrapper<br/>(geventhttpclient)"]
  KEY -- "http_user" --> H["HttpUserWrapper<br/>(requests)"]
  KEY -- "websocket_user" --> WS["WebSocketUserWrapper<br/>(websocket-client)"]
  KEY -- "grpc_user" --> G["GrpcUserWrapper<br/>(grpcio + importlib lookup)"]
  KEY -- "mqtt_user" --> M["MqttUserWrapper<br/>(paho-mqtt)"]
  KEY -- "socket_user" --> S["SocketUserWrapper<br/>(stdlib TCP / UDP)"]
  FH & H & WS & G & M & S --> SC["scenario_runner<br/>(sequence / weighted / conditional)"]
  SC --> RX["request_executor.execute_task"]

Module map

je_load_density/
├── __init__.py                       # Public API re-exports
├── __main__.py                       # CLI: run / run-dir / run-str / init / serve
├── gui/                              # Optional PySide6 front-end
│   ├── language_wrapper/             # En / zh-TW / Ja / Ko translations
│   ├── load_density_gui_thread.py    # Worker thread for non-blocking starts
│   ├── log_to_ui_filter.py           # Forward logger records to the UI pane
│   ├── main_widget.py                # Form-based test configurator
│   ├── main_window.py                # PySide6 main window shell
│   └── stats_panel.py                # Live RPS / avg / p95 / failures panel
├── mcp_server/                       # MCP server (11 tools for Claude)
│   ├── __main__.py
│   └── server.py
├── utils/
│   ├── callback/                     # callback_executor (post-action callbacks)
│   ├── exception/                    # LoadDensity* exception hierarchy + tags
│   ├── executor/                     # Executor class · event_dict · safe builtins
│   ├── file_process/                 # Directory walker · project scaffolder
│   ├── generate_report/              # HTML / JSON / XML / CSV / JUnit / Summary
│   ├── get_data_structure/           # API data helper (legacy)
│   ├── json/                         # JSON read/write · placeholder normaliser
│   ├── logging/                      # Configured load_density_logger
│   ├── metrics/                      # Prometheus · InfluxDB · OpenTelemetry sinks
│   ├── package_manager/              # Dynamic package loader (LD_add_package_*)
│   ├── parameterization/             # ParameterResolver + CSV / faker sources
│   ├── project/                      # Project template + create_project_dir
│   ├── recording/                    # HAR → action JSON converter
│   ├── socket_server/                # Length-framed TCP control plane (+TLS+token)
│   ├── test_record/                  # In-memory record list + SQLite persistence
│   └── xml/                          # defusedxml-backed XML helpers
└── wrapper/
    ├── create_locust_env/            # prepare_env / create_env (local/master/worker)
    ├── event/                        # request_hook (binds Locust events → records)
    ├── proxy/                        # Per-protocol task store (locust_wrapper_proxy)
    │   └── user/                     # fast_http / http / websocket / grpc / mqtt / socket
    ├── start_wrapper/                # start_test dispatcher (_USER_REGISTRY)
    └── user_template/                # Locust user classes + scenario_runner + request_executor
load_density_driver/                  # Standalone driver builds
test/                                 # pytest test suite
docs/                                 # Sphinx documentation (En / Zh / API)

Quick Start

HTTP load test in Python

from je_load_density import start_test

start_test(
    user_detail_dict={"user": "fast_http_user"},
    user_count=50,
    spawn_rate=10,
    test_time=30,
    variables={"base": "https://httpbin.org"},
    tasks=[
        {"method": "get",  "request_url": "${var.base}/get"},
        {"method": "post", "request_url": "${var.base}/post",
         "json": {"hello": "world"},
         "assertions": [{"type": "status_code", "value": 200}]},
    ],
)

Action JSON

{"load_density": [
  ["LD_register_variables", {"variables": {"base": "https://httpbin.org"}}],
  ["LD_start_test", {
    "user_detail_dict": {"user": "fast_http_user"},
    "user_count": 20, "spawn_rate": 10, "test_time": 30,
    "tasks": [
      {"method": "get",  "request_url": "${var.base}/get"},
      {"method": "post", "request_url": "${var.base}/post",
       "json": {"hello": "world"}}
    ]
  }],
  ["LD_generate_summary_report", {"report_name": "smoke"}]
]}

Run via the CLI:

python -m je_load_density run smoke.json

Action shapes

["command"]                                    # no args
["command", {"key": "value"}]                  # kwargs
["command", [arg1, arg2]]                      # positional

The top-level document is either a bare list or a {"load_density": [...]} wrapper.
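The three action shapes and the two document forms can be normalised in a few lines. The sketch below is illustrative only: unwrap_document and split_action are hypothetical helper names, not part of the LoadDensity API, and the real executor may validate more strictly.

```python
from typing import Any


def unwrap_document(document: Any) -> list:
    """Return the action list from a bare list or a {"load_density": [...]} wrapper."""
    if isinstance(document, dict):
        return document["load_density"]
    if isinstance(document, list):
        return document
    raise TypeError("action document must be a list or a dict wrapper")


def split_action(action: list) -> tuple[str, list, dict]:
    """Split one action into (command, positional args, keyword args)."""
    if not action or not isinstance(action[0], str):
        raise ValueError(f"malformed action: {action!r}")
    if len(action) == 1:
        return action[0], [], {}          # ["command"]
    payload = action[1]
    if isinstance(payload, dict):
        return action[0], [], payload     # ["command", {...}]
    if isinstance(payload, list):
        return action[0], payload, {}     # ["command", [...]]
    raise ValueError(f"unsupported argument payload: {payload!r}")
```

Splitting every shape into the same (command, args, kwargs) triple keeps the dispatch step independent of how the action was written.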

Recipes

Short copy-paste snippets covering the most common needs. Each works as either a Python start_test call or the LD_start_test action.

| Recipe | Demonstrates |
| --- | --- |
| HTTP smoke | fast_http_user + status_code assertion + summary report. |
| Auth flow | Extract a token from the login response, reuse it via a ${var.auth} header on protected calls. |
| Weighted mix | mode: "weighted" with weight per task to skew traffic toward hot endpoints. |
| WebSocket echo | websocket_user connect → sendrecv → close with expect substring assertion. |
| gRPC unary | grpc_user with stub_path / request_path + metadata tuple list + per-call timeout. |
| MQTT pub/sub | mqtt_user connect → subscribe → publish → disconnect against a local broker. |
| Raw TCP/UDP | socket_user with payload (text or hex:…) and expect_substring. |
| Distributed run | One runner_mode="master" + N runner_mode="worker" processes against the same action JSON. |
| HAR replay | LD_load_har, then LD_har_to_action_json with regex include / exclude. |
| Metrics export | LD_start_prometheus_exporter, LD_start_influxdb_sink, LD_start_opentelemetry_exporter. |
| Persist results | LD_persist_records to SQLite with label + metadata, then LD_list_runs for trend. |
| MCP-driven | Wire Claude to python -m je_load_density.mcp_server and call run_test / generate_reports. |
| SLA gate | LD_assert_sla with latency_p95 / failure_rate rules to fail CI on regression. |
| Spike shape | load_shape="spike" + shape_config to drive baseline → spike → baseline ramp. |
| Think time + throttle | task["think_time"] and task["throttle"]={"rps":...} to pace traffic. |
| Postman / OpenAPI / cURL | LD_postman_to_action_json / LD_openapi_to_action_json / LD_curl_to_task for one-shot imports. |
| Redis / Kafka / SQL | Use user_detail_dict={"user": "redis_user"} etc. with protocol-specific task fields. |

Pair the table with the dedicated chapter (see Table of Contents) for the full parameter surface.

Core API

from je_load_density import (
    start_test, prepare_env, create_env,
    execute_action, execute_files, executor, add_command_to_executor,
    test_record_instance, locust_wrapper_proxy,
    register_variable, register_variables,
    register_csv_source, register_csv_sources,
    parameter_resolver, resolve,
    har_to_action_json, har_to_tasks, load_har,
    persist_records, list_runs, fetch_run_records,
    start_prometheus_exporter, stop_prometheus_exporter,
    start_influxdb_sink, stop_influxdb_sink,
    start_opentelemetry_exporter, stop_opentelemetry_exporter,
    start_load_density_socket_server,
    generate_html_report, generate_json_report, generate_xml_report,
    generate_csv_report, generate_junit_report, generate_summary_report,
    build_summary,
    create_project_dir, callback_executor, read_action_json,
)

The full public surface lives in __all__ at je_load_density/__init__.py.

Action Executor

The action executor maps a string command name to a Python callable. Every backend, exporter, and report helper registers under event_dict.

Built-in LD_* commands

| Group | Commands |
| --- | --- |
| Core | LD_start_test, LD_execute_action, LD_execute_files, LD_add_package_to_executor, LD_start_socket_server |
| Reports | LD_generate_html(_report), LD_generate_json(_report), LD_generate_xml(_report), LD_generate_csv_report, LD_generate_junit_report, LD_generate_summary_report, LD_generate_chart_report, LD_summary |
| Persistence | LD_persist_records, LD_list_runs, LD_fetch_run_records, LD_clear_records |
| Parameters | LD_register_variable(s), LD_register_csv_source(s), LD_register_db_source(s), LD_clear_resolver |
| Recording | LD_load_har, LD_har_to_*, LD_postman_to_*, LD_openapi_to_*, LD_curl_to_task, LD_k6_script_to_*, LD_jmeter_to_* |
| Metrics | LD_start/stop_prometheus_exporter, LD_start/stop_influxdb_sink, LD_start/stop_opentelemetry_exporter, LD_start/stop_statsd_sink |
| Quality / DX | LD_lint_action, LD_lint_action_file, LD_export_schema, LD_emit_github_annotations |
| SLA / regression | LD_evaluate_sla, LD_assert_sla, LD_diff_runs |
| Reliability | LD_install_failure_budget, LD_uninstall_failure_budget, LD_install_network_conditioner, LD_uninstall_network_conditioner |
| Dashboard / notify | LD_start_dashboard, LD_stop_dashboard, LD_post_slack_summary, LD_post_teams_summary |

Safe Python built-ins (print, len, range, …) are also accepted; eval, exec, compile, __import__, breakpoint, open, and input are explicitly blocked.

Custom commands

from je_load_density import add_command_to_executor

def slack_notify(message: str) -> None:
    ...

add_command_to_executor({"LD_slack_notify": slack_notify})

User Templates

Every template registers under start_test via user_detail_dict={"user": "<key>"}. Tasks share the same shape across HTTP, WebSocket, gRPC, MQTT, and raw socket users; only the protocol-specific fields differ.

HTTP / FastHttp

start_test(
    user_detail_dict={"user": "fast_http_user"},
    user_count=50, spawn_rate=10, test_time=60,
    variables={"base": "https://api.example.com"},
    tasks=[
        {"method": "post", "request_url": "${var.base}/login",
         "json": {"email": "u@example.com", "password": "secret"},
         "extract": [{"var": "auth", "from": "json_path", "path": "data.token"}]},
        {"method": "get", "request_url": "${var.base}/profile",
         "headers": {"Authorization": "Bearer ${var.auth}"},
         "assertions": [{"type": "status_code", "value": 200}]},
    ],
)

fast_http_user is the default; http_user swaps the client for requests-style synchronous calls when third-party adapters require it.

WebSocket

pip install "je_load_density[websocket]"

start_test(
    user_detail_dict={"user": "websocket_user"},
    user_count=10, spawn_rate=5, test_time=60,
    tasks=[
        {"method": "connect", "request_url": "wss://echo.example.com/socket"},
        {"method": "sendrecv", "payload": '{"ping": 1}', "expect": "pong"},
        {"method": "close"},
    ],
)

gRPC

pip install "je_load_density[grpc]"

start_test(
    user_detail_dict={"user": "grpc_user"},
    user_count=20, spawn_rate=5, test_time=60,
    tasks=[{
        "name": "say_hello",
        "target": "localhost:50051",
        "stub_path": "pkg.greeter_pb2_grpc.GreeterStub",
        "request_path": "pkg.greeter_pb2.HelloRequest",
        "method": "SayHello",
        "payload": {"name": "world"},
        "metadata": [["x-token", "abc"]],
        "timeout": 5,
    }],
)

stub_path and request_path are validated against a strict identifier regex before importlib.import_module, so traversal-style attacks are rejected.
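The validate-then-import pattern can be sketched as follows. The regex and the resolve_dotted name are assumptions for illustration; the library's actual pattern is not shown in this document and may differ.

```python
import importlib
import re
from typing import Any

# Assumed pattern: two or more Python identifiers joined by dots, nothing else.
_DOTTED = re.compile(r"[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)+")


def resolve_dotted(path: str) -> Any:
    """Import "pkg.module.Attr" only if every segment is a plain identifier."""
    if not _DOTTED.fullmatch(path):
        raise ValueError(f"rejected dotted path: {path!r}")
    module_name, _, attribute = path.rpartition(".")
    module = importlib.import_module(module_name)
    return getattr(module, attribute)
```

Checking the whole string with fullmatch before calling importlib means slashes, spaces, semicolons, and trailing newlines never reach the import machinery.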

MQTT

pip install "je_load_density[mqtt]"

start_test(
    user_detail_dict={"user": "mqtt_user"},
    user_count=10, spawn_rate=5, test_time=60,
    tasks=[
        {"method": "connect",   "broker": "127.0.0.1:1883"},
        {"method": "subscribe", "topic":  "telemetry/in", "qos": 1},
        {"method": "publish",   "topic":  "telemetry/out", "payload": "ping", "qos": 1},
        {"method": "disconnect"},
    ],
)

Raw TCP / UDP

Stdlib only; nothing to install.

start_test(
    user_detail_dict={"user": "socket_user"},
    user_count=20, spawn_rate=5, test_time=60,
    tasks=[
        {"protocol": "tcp", "target": "127.0.0.1:9000",
         "payload": "PING\n", "expect_bytes": 64,
         "expect_substring": "PONG"},
        {"protocol": "udp", "target": "127.0.0.1:9000",
         "payload": "hex:DEADBEEF", "expect_bytes": 4},
    ],
)

Parameter Resolver

Placeholders are expanded automatically on every task:

| Placeholder | Resolves to |
| --- | --- |
| ${var.NAME} | Value passed to register_variable(s) |
| ${env.NAME} | Environment variable NAME |
| ${csv.SOURCE.COL} | Next row from CSV source SOURCE (cycles by default) |
| ${faker.METHOD} | Faker().METHOD() (lazy import) |
| ${uuid()} | New UUID 4 string |
| ${now()} | Local ISO-8601 timestamp (seconds) |
| ${randint(min, max)} | Cryptographically-strong random int |

from je_load_density import register_variable, register_csv_source

register_variable("base", "https://api.example.com")
register_csv_source("users", "users.csv")

Or from action JSON:

["LD_register_variables", {"variables": {"base": "https://api.example.com"}}]
["LD_register_csv_sources", {"sources": [{"name": "users", "file_path": "users.csv"}]}]

Unknown placeholders are left in place so missing data is visible during a dry run.

Scenario Modes

{
  "mode": "weighted",
  "tasks": [
    {"method": "get", "request_url": "/products", "weight": 3},
    {"method": "get", "request_url": "/expensive", "weight": 1}
  ]
}

| Mode | Behaviour |
| --- | --- |
| sequence | Run every task in order each tick (default) |
| weighted | Pick one task per tick by weight |
| conditional | Use run_if / skip_if predicates evaluated against the parameter resolver |

Predicates: bool, "${var.x}", {"equals": [a,b]}, {"not_equals": [a,b]}, {"in": [needle, haystack]}, {"truthy": value}.
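As a rough model of the weighted and conditional modes, the sketch below picks one task by weight and evaluates the dictionary-style predicates. The function names are hypothetical, a missing weight is assumed to mean 1, and "${var.x}" string predicates are left out because they depend on the resolver.

```python
import random
from typing import Any


def pick_weighted(tasks: list[dict], rng: random.Random) -> dict:
    """Choose one task per tick, proportionally to its weight (assumed default 1)."""
    weights = [task.get("weight", 1) for task in tasks]
    return rng.choices(tasks, weights=weights, k=1)[0]


def evaluate(predicate: Any) -> bool:
    """Evaluate a bool or a single-key predicate dict."""
    if isinstance(predicate, bool):
        return predicate
    if isinstance(predicate, dict) and len(predicate) == 1:
        (name, operand), = predicate.items()
        if name == "equals":
            return operand[0] == operand[1]
        if name == "not_equals":
            return operand[0] != operand[1]
        if name == "in":
            return operand[0] in operand[1]
        if name == "truthy":
            return bool(operand)
    raise ValueError(f"unsupported predicate: {predicate!r}")


def should_run(task: dict) -> bool:
    """A task runs when run_if holds (if given) and skip_if does not."""
    if "run_if" in task and not evaluate(task["run_if"]):
        return False
    if "skip_if" in task and evaluate(task["skip_if"]):
        return False
    return True
```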

Assertions & Extractors

Both run under Locust's catch_response; failed assertions surface in every report.

{
  "method": "post",
  "request_url": "${var.base}/login",
  "json": {"email": "u@example.com", "password": "secret"},
  "assertions": [
    {"type": "status_code", "value": 200},
    {"type": "json_path", "path": "data.role", "value": "admin"}
  ],
  "extract": [
    {"var": "auth_token", "from": "json_path", "path": "data.token"},
    {"var": "request_id", "from": "header",    "name": "X-Request-Id"}
  ]
}

Assertion types: status_code, contains, not_contains, json_path, header. Extractor sources: json_path, header, status_code.
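A simplified model of how these rules could be evaluated against a response is sketched below. It assumes a dotted json_path syntax (as in "data.token") and a "name" field on header assertions; check and extract are hypothetical names, and the real implementation runs inside Locust's catch_response rather than on plain values.

```python
from typing import Any


def json_path(document: Any, path: str) -> Any:
    """Walk a dotted path such as "data.token"; integer parts index into lists."""
    current = document
    for part in path.split("."):
        current = current[int(part)] if isinstance(current, list) else current[part]
    return current


def check(assertions: list[dict], status_code: int, headers: dict,
          body_text: str, body_json: Any) -> list[str]:
    """Return one message per failed assertion; an empty list means all passed."""
    failures = []
    for rule in assertions:
        kind, expected = rule["type"], rule.get("value")
        if kind == "status_code":
            ok = status_code == expected
        elif kind == "contains":
            ok = expected in body_text
        elif kind == "not_contains":
            ok = expected not in body_text
        elif kind == "json_path":
            try:
                ok = json_path(body_json, rule["path"]) == expected
            except (KeyError, IndexError, TypeError, ValueError):
                ok = False
        elif kind == "header":
            ok = headers.get(rule["name"]) == expected
        else:
            ok = False  # unknown assertion types fail closed
        if not ok:
            failures.append(f"{kind} assertion failed: expected {expected!r}")
    return failures


def extract(extractors: list[dict], status_code: int, headers: dict, body_json: Any) -> dict:
    """Collect values that a resolver could later expose as ${var.NAME}."""
    values = {}
    for rule in extractors:
        if rule["from"] == "json_path":
            values[rule["var"]] = json_path(body_json, rule["path"])
        elif rule["from"] == "header":
            values[rule["var"]] = headers.get(rule["name"])
        elif rule["from"] == "status_code":
            values[rule["var"]] = status_code
    return values
```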

Reports

Six formats are generated from test_record_instance (chart reports are separate and need the optional [charts] extra):

from je_load_density import (
    generate_html_report, generate_json_report, generate_xml_report,
    generate_csv_report, generate_junit_report, generate_summary_report,
)

generate_html_report("report")           # report.html
generate_json_report("report")           # report_success.json + report_failure.json
generate_xml_report("report")            # report_success.xml  + report_failure.xml
generate_csv_report("report")            # report.csv
generate_junit_report("report-junit")    # report-junit.xml (CI)
generate_summary_report("report-sum")    # totals + per-name p50/p90/p95/p99

| Format | Output | Files |
| --- | --- | --- |
| HTML | <base>.html (success + failure table, colour-coded) | single |
| JSON | <base>_success.json + <base>_failure.json | split |
| XML | <base>_success.xml + <base>_failure.xml | split |
| CSV | <base>.csv | single |
| JUnit | <base>-junit.xml (CI-native) | single |
| Summary | <base>.json (per-name p50/p90/p95/p99) | single |
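For readers curious how a per-name percentile summary can be derived from the record list, here is a minimal sketch using the nearest-rank method. The percentile method LoadDensity actually uses is not stated in this document, so treat nearest-rank as an assumption; summarize is a hypothetical name, while the name and response_time_ms keys follow the Test Record section.

```python
import math
from collections import defaultdict


def percentile(sorted_values: list[float], pct: int) -> float:
    """Nearest-rank percentile of an already sorted, non-empty list."""
    rank = max(1, math.ceil(pct * len(sorted_values) / 100))
    return sorted_values[rank - 1]


def summarize(records: list[dict]) -> dict:
    """Group latencies by request name and compute p50/p90/p95/p99 per group."""
    by_name = defaultdict(list)
    for record in records:
        by_name[record["name"]].append(record["response_time_ms"])
    summary = {}
    for name, latencies in by_name.items():
        latencies.sort()
        summary[name] = {f"p{p}": percentile(latencies, p) for p in (50, 90, 95, 99)}
        summary[name]["count"] = len(latencies)
    return summary
```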

Observability

from je_load_density import (
    start_prometheus_exporter, start_influxdb_sink, start_opentelemetry_exporter,
)

start_prometheus_exporter(port=9646, addr="127.0.0.1")
start_influxdb_sink(transport="udp", host="influxdb", port=8089)
start_opentelemetry_exporter(endpoint="http://otel-collector:4317",
                             service_name="loaddensity")

| Sink | Metrics |
| --- | --- |
| Prometheus | loaddensity_requests_total, loaddensity_request_latency_ms, loaddensity_response_bytes |
| InfluxDB | loaddensity_request line-protocol points (UDP or HTTP) |
| OTel | loaddensity.requests, loaddensity.request.latency, loaddensity.response.size |

All three are loaded lazily and gated by the matching install extra.

Distributed Master / Worker

# master
start_test(
    user_detail_dict={"user": "fast_http_user"},
    runner_mode="master",
    master_bind_host="0.0.0.0", master_bind_port=5557,
    expected_workers=4,
    web_ui_dict={"host": "0.0.0.0", "port": 8089},
    user_count=400, spawn_rate=40, test_time=600,
    tasks=[...],
)

# worker
start_test(
    user_detail_dict={"user": "fast_http_user"},
    runner_mode="worker",
    master_host="10.0.0.10", master_port=5557,
    tasks=[...],
)

The master waits up to 60 s for expected_workers workers to register before starting the load ramp.

HAR Record / Replay

from je_load_density import load_har, har_to_action_json

har = load_har("recording.har")
action_json = har_to_action_json(
    har,
    user="fast_http_user",
    user_count=20, spawn_rate=10, test_time=120,
    include=[r"api\.example\.com"],
    exclude=[r"\.svg$"],
)

Captures from Chrome / Firefox DevTools, mitmproxy, Charles, etc. all work. Status codes flow through as status_code assertions on every generated task.
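The include / exclude filters behave like an allow-list followed by a deny-list. A minimal sketch, assuming re.search semantics and that an empty include list admits everything (keep_url is a hypothetical helper, not the library's function):

```python
import re


def keep_url(url: str, include: list[str], exclude: list[str]) -> bool:
    """Keep a HAR entry when it matches some include pattern and no exclude pattern."""
    if include and not any(re.search(pattern, url) for pattern in include):
        return False
    return not any(re.search(pattern, url) for pattern in exclude)
```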

Persistent Records (SQLite)

from je_load_density import persist_records, list_runs, fetch_run_records

run_id = persist_records(
    "loadtests.db",
    label="checkout-2026-04-28",
    metadata={"branch": "dev", "commit": "abc1234"},
)
for row in list_runs("loadtests.db", limit=10):
    print(row)

Schema is created lazily; an empty file is fine. Indexes on run_id and name keep cross-run queries fast.

MCP Server (for Claude)

pip install "je_load_density[mcp]"
python -m je_load_density.mcp_server

Wire it into Claude Desktop / Code:

{
  "mcpServers": {
    "loaddensity": {
      "command": "python",
      "args": ["-m", "je_load_density.mcp_server"]
    }
  }
}

Eleven tools are exposed: run_test, run_action_json, create_project, list_executor_commands, import_har, generate_reports, summary, persist_records, list_runs, fetch_run, clear_records.

Hardened Control Socket

python -m je_load_density serve \
    --host 0.0.0.0 --port 9940 --framed \
    --token "$LOAD_DENSITY_SOCKET_TOKEN" \
    --tls-cert /etc/loaddensity/server.crt \
    --tls-key /etc/loaddensity/server.key

  • 4-byte big-endian length-prefixed frames (1 MiB cap)
  • Optional TLS (cert/key on disk; ssl.create_default_context, TLS 1.2+ minimum)
  • Shared-secret token compared with hmac.compare_digest; once configured, all payloads must use {"token": "...", "command": [...]} and may set "op": "quit" to stop the server
  • Token also reads from the LOAD_DENSITY_SOCKET_TOKEN env var
  • Legacy unauthenticated mode preserved for backwards compatibility
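The framing and token rules above can be modelled with the standard library alone. The sketch below is one way a client or test harness might build and parse frames; encode_frame, decode_frame, and authorize are hypothetical names, and the server's real parsing code may differ in detail.

```python
import hmac
import struct
from typing import Optional

MAX_FRAME = 1024 * 1024  # 1 MiB cap on the payload


def encode_frame(payload: bytes) -> bytes:
    """Prefix the payload with its length as a 4-byte big-endian unsigned int."""
    if len(payload) > MAX_FRAME:
        raise ValueError("payload exceeds the 1 MiB frame cap")
    return struct.pack(">I", len(payload)) + payload


def decode_frame(buffer: bytes) -> tuple[Optional[bytes], bytes]:
    """Return (payload, remainder), or (None, buffer) when more bytes are needed."""
    if len(buffer) < 4:
        return None, buffer
    (length,) = struct.unpack(">I", buffer[:4])
    if length > MAX_FRAME:
        raise ValueError("declared frame length exceeds the 1 MiB cap")
    if len(buffer) < 4 + length:
        return None, buffer
    return buffer[4:4 + length], buffer[4 + length:]


def authorize(message: dict, expected_token: str) -> bool:
    """Constant-time comparison of the supplied token against the shared secret."""
    supplied = str(message.get("token", ""))
    return hmac.compare_digest(supplied.encode("utf-8"), expected_token.encode("utf-8"))
```

Rejecting an oversized length before reading the body is what keeps a hostile peer from forcing a large allocation with a four-byte header.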

GUI

pip install "je_load_density[gui]"

import sys
from PySide6.QtWidgets import QApplication
from je_load_density.gui.main_window import LoadDensityUI

app = QApplication(sys.argv)
window = LoadDensityUI()
window.show()
sys.exit(app.exec())

The GUI ships English, Traditional Chinese, Japanese, and Korean translations and a live stats panel that polls test_record_instance once a second (RPS, average / p95 latency, failure count).

CLI Usage

python -m je_load_density run FILE              # execute one action JSON file
python -m je_load_density run-dir DIR           # execute every .json in DIR
python -m je_load_density run-str JSON          # execute an inline JSON string
python -m je_load_density init PATH             # scaffold a project skeleton
python -m je_load_density serve [--host ...]    # start the control socket

Legacy single-flag form (-e/-d/-c/--execute_str) is still accepted for backwards compatibility with downstream tools.

Test Record

test_record_instance.test_record_list and error_record_list collect every request with Method, test_url, name, status_code, response_time_ms, response_length, and (for failures) error. Reports and the SQLite sink read directly from these lists.

Exception Handling

LoadDensityTestException
├── LoadDensityTestJsonException
├── LoadDensityGenerateJsonReportException
├── LoadDensityTestExecuteException
├── LoadDensityAssertException
├── LoadDensityHTMLException
├── LoadDensityAddCommandException
├── XMLException → XMLTypeException
└── CallbackExecutorException

All custom exceptions inherit from LoadDensityTestException; catching that one class covers the public surface.

Logging

LoadDensity exposes a single configured logger (load_density_logger) under je_load_density.utils.logging.loggin_instance. Hook it into your existing log infrastructure with the standard logging module APIs.

Supported Platforms

| Platform | Status |
| --- | --- |
| Windows 10 / 11 | Fully supported |
| macOS | Fully supported |
| Ubuntu / Linux | Fully supported |
| Raspberry Pi | Tested on 3B+ and later |

Python 3.10+ required.

SLA Gates & Regression Diff

from je_load_density import assert_sla, build_summary, diff_runs

assert_sla([
    {"type": "failure_rate", "value": 0.02},
    {"type": "latency_p95", "value": 800},
    {"type": "latency_p95", "name": "/checkout", "value": 500},
    {"type": "requests", "op": "gte", "value": 1000},
], summary=build_summary())

report = diff_runs("loadtests.db",
                   baseline_run_id=42, current_run_id=43,
                   tolerance=0.10)
if report["has_regressions"]:
    raise SystemExit(report["regressions"])

Supported rule types: latency_p50 / _p90 / _p95 / _p99, latency_mean, failure_rate, requests. op is one of lt, lte (default), gt, gte. Per-endpoint rules pass name.
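To illustrate how such rules can be checked, the sketch below evaluates a rule list against a summary dictionary. The summary layout ("totals" and "by_name" keys), the evaluate_sla_rules name, and the lte default are all assumptions made for this example; the real build_summary output may be shaped differently.

```python
import operator

_OPS = {"lt": operator.lt, "lte": operator.le, "gt": operator.gt, "gte": operator.ge}


def evaluate_sla_rules(rules: list[dict], summary: dict, default_op: str = "lte") -> list[str]:
    """Return a human-readable breach message for every rule that does not hold."""
    breaches = []
    for rule in rules:
        if "name" in rule:
            scope = summary["by_name"].get(rule["name"], {})
        else:
            scope = summary["totals"]
        metric = rule["type"]
        if metric not in scope:
            breaches.append(f"{metric}: no data for {rule.get('name', 'totals')}")
            continue
        op_name = rule.get("op", default_op)  # assumed default comparison
        if not _OPS[op_name](scope[metric], rule["value"]):
            breaches.append(
                f"{metric} {rule.get('name', 'totals')}: "
                f"{scope[metric]} is not {op_name} {rule['value']}"
            )
    return breaches
```

A CI gate would then fail the job whenever the returned list is non-empty.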

Load Shapes

start_test(
    user_detail_dict={"user": "fast_http_user"},
    load_shape="spike",
    shape_config={"baseline_users": 20, "spike_users": 200,
                  "spawn_rate": 50, "pre_seconds": 30,
                  "spike_seconds": 30, "post_seconds": 30},
    tasks=[...],
)

Built-ins: "stages" (list of {duration, users, spawn_rate}), "spike", "soak". All return Locust LoadTestShape subclasses behind the scenes.
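A Locust LoadTestShape decides the target load through a tick() method that returns (user_count, spawn_rate), or None to stop the run. The pure function below sketches what a spike shape's tick could compute from the shape_config keys used above; spike_tick is a hypothetical name and the built-in shape's exact boundaries may differ.

```python
from typing import Optional


def spike_tick(run_time: float, config: dict) -> Optional[tuple[int, int]]:
    """Map elapsed seconds to (users, spawn_rate); None ends the test."""
    pre_end = config["pre_seconds"]
    spike_end = pre_end + config["spike_seconds"]
    post_end = spike_end + config["post_seconds"]
    if run_time < pre_end:
        return config["baseline_users"], config["spawn_rate"]   # warm-up baseline
    if run_time < spike_end:
        return config["spike_users"], config["spawn_rate"]      # spike window
    if run_time < post_end:
        return config["baseline_users"], config["spawn_rate"]   # recovery baseline
    return None
```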

Think Time & Throttle

[
  {"method": "get", "request_url": "${var.base}/home",
   "think_time": {"min": 0.5, "max": 1.5}},
  {"method": "get", "request_url": "${var.base}/checkout",
   "throttle": {"key": "checkout", "rps": 25, "burst": 5}}
]

Both controls are per-task and resolved before the request fires. Throttle buckets are shared across users by key.
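A throttle with rps and burst settings is commonly built as a token bucket, and sharing by key amounts to a registry of buckets. The sketch below assumes that design; TokenBucket and bucket_for are illustrative names, and the clock is passed in explicitly so the behaviour is easy to test.

```python
class TokenBucket:
    """Refill at `rps` tokens per second, holding at most `burst` tokens."""

    def __init__(self, rps: float, burst: int, now: float) -> None:
        self.rate = float(rps)
        self.capacity = float(max(burst, 1))
        self.tokens = self.capacity
        self.updated = now

    def try_acquire(self, now: float) -> bool:
        """Take one token if available; callers that get False should wait."""
        elapsed = max(0.0, now - self.updated)
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.updated = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False


_BUCKETS: dict[str, TokenBucket] = {}


def bucket_for(key: str, rps: float, burst: int, now: float) -> TokenBucket:
    """Return the shared bucket for `key`, creating it on first use."""
    if key not in _BUCKETS:
        _BUCKETS[key] = TokenBucket(rps, burst, now)
    return _BUCKETS[key]
```

In a real run the `now` argument would come from time.monotonic(), and every simulated user throttled under the same key would draw from the same bucket.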

Importers

from je_load_density import (
    load_har, har_to_action_json,
    load_postman_collection, postman_to_action_json,
    load_openapi, openapi_to_action_json,
    curl_to_task,
)

action_a = har_to_action_json(load_har("recording.har"))
action_b = postman_to_action_json(load_postman_collection("collection.json"))
action_c = openapi_to_action_json(load_openapi("openapi.yaml"))
task     = curl_to_task("curl -X POST https://api/login -d '{\"x\":1}'")

OpenAPI substitutes {param} path segments with ${var.param} so the caller can supply values via register_variables.
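The path rewrite described above is a single substitution. A minimal sketch, assuming parameter names are plain identifiers (path_to_template is a hypothetical helper):

```python
import re

_PATH_PARAM = re.compile(r"\{([A-Za-z_][A-Za-z0-9_]*)\}")


def path_to_template(path: str) -> str:
    """Rewrite OpenAPI "{param}" segments as "${var.param}" placeholders."""
    return _PATH_PARAM.sub(r"${var.\1}", path)
```

After the rewrite, registering a variable named id is enough for /users/{id} to resolve at request time.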

Action JSON Linter, Schema & LSP

from je_load_density import lint_action, export_schema

findings = lint_action({"load_density": [["LD_typo"]]})
# [{'rule': 'unknown-command', 'severity': 'error', ...}]

export_schema("docs/reference/loaddensity-action-schema.json")

Stdlib LSP for editor integration:

python -m je_load_density.action_lsp   # or: loaddensity-lsp

textDocument/completion returns every LD_* command; publishDiagnostics runs the linter on every change.

GitHub Actions Annotations

from je_load_density import emit_github_annotations

emit_github_annotations(title="LoadDensity")
# ::error title=LoadDensity::GET /checkout (HTTP 500): timeout

One ::error:: line per failure record; reviewers see them inline in the PR Files Changed view.
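
GitHub workflow commands are plain lines written to stdout, with a few characters percent-escaped. The sketch below formats one such line following GitHub's documented escaping rules; `format_error` is a hypothetical helper, not the body of `emit_github_annotations`.

```python
def _escape_data(value):
    """Escape the message part of a workflow command."""
    return value.replace("%", "%25").replace("\r", "%0D").replace("\n", "%0A")


def _escape_property(value):
    """Escape a property value such as the title (also escapes ':' and ',')."""
    return _escape_data(value).replace(":", "%3A").replace(",", "%2C")


def format_error(title, message):
    """Build one ::error line for a single failure record."""
    return f"::error title={_escape_property(title)}::{_escape_data(message)}"


line = format_error("LoadDensity", "GET /checkout (HTTP 500): timeout")
print(line)
```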

Examples & Local Lab

  • examples/ ships 12 runnable recipes (smoke, auth flow, weighted mix, WebSocket, MQTT, Redis, spike shape, SLA gates, HAR / Postman / OpenAPI imports).
  • docker/ brings up httpbin, Mosquitto (MQTT), Redis, Kafka, and Prometheus with one docker compose up -d.

Reliability

from je_load_density import (
    AdaptiveRetryPolicy, run_with_retry,
    install_failure_budget, install_network_conditioner,
    with_watchdog,
)

# Adaptive retry — exponential backoff + jitter + per-error-class budget
policy = AdaptiveRetryPolicy(transient_budget=5, flaky_budget=2,
                              base_delay=0.1, max_delay=2.0)
run_with_retry(lambda: do_request(), policy=policy)

# Per-task retry (declarative)
# task["retry"] = {"transient": 3, "flaky": 1, "base_delay": 0.2}

# Failure budget — abort the run when 5% of the last 30s fail
install_failure_budget(threshold=0.05, window_seconds=30,
                       runner_quit_callback=lambda: env.runner.quit())

# Network conditioner — inject latency / jitter / loss
install_network_conditioner(latency_ms=50, jitter_ms=20, loss_rate=0.01,
                             name_filter="/checkout")

# Watchdog — hard-kill a hung CI run
with_watchdog(lambda: execute_action(action_json), timeout_seconds=600)
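
The failure budget ("abort when 5% of the last 30s fail") can be understood as a sliding-window failure ratio. The class below is a minimal sketch of that idea under stated assumptions: `FailureBudget`, `record`, and `exceeded` are hypothetical names, timestamps are passed in explicitly, and the threshold is treated as exclusive. LoadDensity's actual hook may differ.

```python
from collections import deque


class FailureBudget:
    """Track request outcomes and flag when the recent failure ratio is too high."""

    def __init__(self, threshold, window_seconds):
        self.threshold = threshold
        self.window_seconds = window_seconds
        self.events = deque()  # (timestamp, failed) pairs, oldest first

    def record(self, timestamp, failed):
        """Store one outcome and drop everything older than the window."""
        self.events.append((timestamp, failed))
        cutoff = timestamp - self.window_seconds
        while self.events and self.events[0][0] < cutoff:
            self.events.popleft()

    def exceeded(self):
        """Return True when the failure ratio in the window is above the threshold."""
        if not self.events:
            return False
        failures = sum(1 for _, failed in self.events if failed)
        return failures / len(self.events) > self.threshold


budget = FailureBudget(threshold=0.05, window_seconds=30)
```

A runner would call `record()` from its request hook and invoke the quit callback the first time `exceeded()` returns `True`.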

Live Dashboard

from je_load_density import start_dashboard

start_dashboard(host="127.0.0.1", port=8765, refresh_seconds=1.0)
# open http://127.0.0.1:8765 → /events streams JSON snapshots via SSE
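
Server-Sent Events are a plain-text protocol: each message is one or more `data:` lines followed by a blank line. The sketch below shows how a JSON snapshot could be framed that way. `sse_frame` and the snapshot fields are assumptions for illustration; the dashboard's real payload is not documented here.

```python
import json


def sse_frame(snapshot):
    """Serialize a snapshot dict as one SSE message (data line plus blank line)."""
    payload = json.dumps(snapshot, separators=(",", ":"))
    return f"data: {payload}\n\n"


# Hypothetical snapshot; field names are illustrative only.
frame = sse_frame({"rps": 12, "failures": 0})
```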

Slack / Teams / StatsD

from je_load_density import (
    post_slack_summary, post_teams_summary, start_statsd_sink,
)

start_statsd_sink(host="dogstatsd", port=8125, prefix="loaddensity")
post_slack_summary("https://hooks.slack.com/services/...")
post_teams_summary("https://outlook.office.com/webhook/...")
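
For orientation, StatsD metrics are single text lines of the form `name:value|type`, normally sent over UDP. The sketch below builds such lines with the `loaddensity` prefix used above. The metric names (`requests`, `latency`) and both helpers are hypothetical; which metrics the sink actually emits is not specified here.

```python
import socket


def statsd_line(prefix, name, value, metric_type):
    """Format one StatsD datagram, e.g. 'loaddensity.requests:1|c'."""
    return f"{prefix}.{name}:{value}|{metric_type}"


def send_statsd(line, host="127.0.0.1", port=8125):
    """Fire-and-forget UDP send; StatsD does not acknowledge datagrams."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(line.encode("ascii"), (host, port))


counter = statsd_line("loaddensity", "requests", 1, "c")    # counter increment
timing = statsd_line("loaddensity", "latency", 183, "ms")   # timing in milliseconds
```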

Auth

from je_load_density import (
    OAuth2Client, sign_jwt, sign_aws_request,
)

client = OAuth2Client("https://idp/token", "id", "secret", scope="read:x")
token = client.get_client_credentials()  # cached for the lifetime of expires_in

jwt = sign_jwt({"sub": "alice"}, secret="topsecret",
                algorithm="HS256", expires_in_seconds=300)

aws_headers = sign_aws_request(
    method="GET",
    url="https://s3.amazonaws.com/mybucket/key",
    region="us-east-1", service="s3",
    access_key="AK", secret_key="sk",
)
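
For readers unfamiliar with what HS256 signing involves, the following stdlib-only sketch builds and verifies a compact JWT. It illustrates the standard algorithm (base64url-encoded header and claims, HMAC-SHA256 signature) and is not the body of `sign_jwt`. `sign_hs256` and `verify_hs256` are hypothetical names, and expiry handling is left out.

```python
import base64
import hashlib
import hmac
import json


def _b64url(raw):
    """Base64url-encode bytes without padding, as compact JWTs require."""
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def _hmac_sha256(secret, signing_input):
    return hmac.new(secret.encode("utf-8"), signing_input.encode("ascii"),
                    hashlib.sha256).digest()


def sign_hs256(claims, secret):
    """Return header.payload.signature for the given claims."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, claims)
    )
    return f"{signing_input}.{_b64url(_hmac_sha256(secret, signing_input))}"


def verify_hs256(token, secret):
    """Recompute the signature and compare it in constant time."""
    signing_input, _, signature = token.rpartition(".")
    expected = _b64url(_hmac_sha256(secret, signing_input))
    return hmac.compare_digest(expected, signature)


token = sign_hs256({"sub": "alice"}, "topsecret")
```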

mTLS:

{"method": "get", "request_url": "https://mtls.api/x",
 "cert": ["/etc/ssl/client.pem", "/etc/ssl/key.pem"]}

k6 / JMeter Importers

from je_load_density import (
    load_k6_script, k6_script_to_action_json,
    load_jmeter_jmx, jmeter_to_action_json,
)

action = k6_script_to_action_json(load_k6_script("script.js"))
action = jmeter_to_action_json(load_jmeter_jmx("plan.jmx"))

Combined with the existing HAR / Postman / OpenAPI / cURL importers, LoadDensity can import test definitions from six common source formats: HAR, Postman, OpenAPI, cURL, k6, and JMeter.
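
As a rough picture of what a JMeter import involves, the sketch below pulls HTTP samplers out of a JMX document and maps them to task dictionaries with the `method` / `request_url` keys used elsewhere in this README. `jmx_to_tasks` is a hypothetical helper and the mapping is an assumption; the real importer may cover more sampler properties (headers, bodies, ports).

```python
import xml.etree.ElementTree as ET


def jmx_to_tasks(jmx_text):
    """Convert each HTTPSamplerProxy element into a {method, request_url} task."""
    root = ET.fromstring(jmx_text)
    tasks = []
    for sampler in root.iter("HTTPSamplerProxy"):
        # JMX stores sampler settings as <stringProp name="...">value</stringProp>.
        props = {p.get("name"): (p.text or "") for p in sampler.findall("stringProp")}
        protocol = props.get("HTTPSampler.protocol") or "http"
        domain = props.get("HTTPSampler.domain", "")
        path = props.get("HTTPSampler.path", "")
        method = props.get("HTTPSampler.method") or "GET"
        tasks.append({"method": method.lower(),
                      "request_url": f"{protocol}://{domain}{path}"})
    return tasks


# Minimal hand-written JMX fragment for illustration.
SAMPLE_JMX = """
<jmeterTestPlan>
  <hashTree>
    <HTTPSamplerProxy testname="Home">
      <stringProp name="HTTPSampler.domain">example.com</stringProp>
      <stringProp name="HTTPSampler.protocol">https</stringProp>
      <stringProp name="HTTPSampler.path">/home</stringProp>
      <stringProp name="HTTPSampler.method">GET</stringProp>
    </HTTPSamplerProxy>
  </hashTree>
</jmeterTestPlan>
"""

tasks = jmx_to_tasks(SAMPLE_JMX)
```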

GitHub Action & pre-commit

# .github/workflows/load.yml
- uses: ./   # or: Integration-Automation/LoadDensity@v1
  with:
    action-file: actions/smoke.json
    extras: "metrics,websocket"
    fail-on-error: "true"

# .pre-commit-config.yaml
- repo: https://github.com/Integration-Automation/LoadDensity
  rev: v1.0.0
  hooks:
    - id: loaddensity-lint

VS Code Extension

editors/vscode/ ships a minimal extension that launches python -m je_load_density.action_lsp over stdio for completion + diagnostics. Build with npm install && npm run package and install the resulting .vsix.

License

MIT — see LICENSE.

Copyright (c) 2022~2026 JE-Chen
