Rust-powered load generation task for DIMQ

DIMQ - Distributed In-Memory Queue

A general-purpose distributed task processing framework built on ZeroMQ. Workers connect to a central orchestrator, receive tasks, and return results. The orchestrator adaptively tunes per-worker parallelization to maximize throughput.

Architecture

                   +-------------------+
  Clients -------->| Orchestrator      |<-------- Workers
  (DEALER)         | (2x ROUTER)       |          (DEALER)
                   |                   |
  SUBMIT/STATUS/   | - FIFO task queue |  READY/HEARTBEAT/
  RESULT queries   | - Result storage  |  RESULT messages
                   | - Retry logic     |
                   | - Adaptive tuning |
                   +-------------------+

Orchestrator runs two ZMQ ROUTER sockets: one for workers (registration, heartbeats, task dispatch/results) and one for clients (task submission, status queries, result retrieval).

Workers connect via ZMQ DEALER sockets, register with their CPU count, and pull tasks. They run tasks concurrently (sync tasks in a thread pool, async tasks natively) and report results back. Workers handle timeout cancellation locally; the orchestrator owns all retry decisions.
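
The split of responsibilities above (workers report failures, the orchestrator decides on retries) can be pictured with a small in-memory queue. This is only a sketch: `QueuedTask`, `RetryQueue`, and the choice to requeue at the back of the FIFO are assumptions made for illustration, not DIMQ's actual classes or policy.

```python
from collections import deque
from dataclasses import dataclass
from typing import Optional


@dataclass
class QueuedTask:
    """Hypothetical record; DIMQ's real TaskRecord model may differ."""
    task_id: str
    max_retries: int
    attempts: int = 0


class RetryQueue:
    """FIFO queue where the queue owner, not the worker, decides on retries."""

    def __init__(self) -> None:
        self.pending: deque = deque()
        self.failed: list = []

    def submit(self, task: QueuedTask) -> None:
        self.pending.append(task)

    def next_task(self) -> Optional[QueuedTask]:
        """Pop the oldest task and count the dispatch as one attempt."""
        if not self.pending:
            return None
        task = self.pending.popleft()
        task.attempts += 1
        return task

    def report_failure(self, task: QueuedTask) -> bool:
        """Requeue while retries remain; return True if the task was requeued."""
        if task.attempts <= task.max_retries:
            self.pending.append(task)  # assumed: retries go to the back of the queue
            return True
        self.failed.append(task.task_id)
        return False
```

Under this reading, a task with max_retries: 3 is dispatched at most four times: the first attempt plus three retries.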

Adaptive parallelization starts each worker's parallel task limit at its CPU count, then probes upward one step at a time. If throughput plateaus or drops, the orchestrator scales the limit back by one and enters steady mode, re-probing periodically.
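
The probe-then-steady behaviour can be sketched as a small state machine. The class name `AdaptiveLimit`, the 5% gain threshold, and the re-probe interval are assumptions chosen for illustration; the real logic lives in adaptive.py and may use different signals and constants.

```python
from dataclasses import dataclass


@dataclass
class AdaptiveLimit:
    """Hypothetical tuner: probe upward, back off by one when gains stop."""
    cpu_count: int
    min_gain: float = 0.05   # assumed: relative gain required to keep probing
    reprobe_after: int = 10  # assumed: steady windows before probing again

    def __post_init__(self) -> None:
        self.limit = self.cpu_count  # start at the worker's CPU count
        self.mode = "probing"
        self.best = None             # throughput seen at the previous limit
        self.steady_windows = 0

    def observe(self, throughput: float) -> int:
        """Feed one measurement window; return the limit for the next window."""
        if self.mode == "probing":
            if self.best is None or throughput > self.best * (1 + self.min_gain):
                self.best = throughput
                self.limit += 1                      # still improving: go one higher
            else:
                self.limit = max(1, self.limit - 1)  # plateau or drop: back off by one
                self.mode = "steady"
                self.steady_windows = 0
        else:
            self.steady_windows += 1
            if self.steady_windows >= self.reprobe_after:
                self.mode = "probing"                # periodic re-probe
                self.best = throughput
                self.limit += 1
        return self.limit
```

For a 4-CPU worker, throughput readings of 100, 120, and 121 tasks per window would move the limit 4 → 5 → 6 → 5 in this sketch, because the last step gained less than 5%.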

Tasks are plain Python functions with Pydantic-typed input and output. No base class or decorator is needed: the framework reads the type annotations with the standard-library inspect module.

Prerequisites

  • Python 3.9+
  • uv
  • Rust toolchain (for the LoadTask extension only)

Setup

# Install Python dependencies
uv sync

# (Optional) Build the Rust LoadTask extension
cd dimq_load_task
uv tool run maturin develop --uv
cd ..

Running

Create a config file:

# config.yaml
endpoint: "tcp://0.0.0.0:5555"
client_endpoint: "tcp://0.0.0.0:5556"
heartbeat_interval_seconds: 5
heartbeat_timeout_missed: 3

tasks:
  - name: "my_app.tasks:process"
    max_retries: 3
    timeout_seconds: 30
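
One plausible reading of the two heartbeat settings is that a worker counts as lost once it has been silent for longer than heartbeat_interval_seconds × heartbeat_timeout_missed, which is 15 seconds with the values above. The sketch below encodes that reading; the function name and the strict "greater than" comparison are assumptions, not confirmed orchestrator behaviour.

```python
HEARTBEAT_INTERVAL_SECONDS = 5  # from config.yaml
HEARTBEAT_TIMEOUT_MISSED = 3    # from config.yaml


def expired_workers(
    last_seen: dict,
    now: float,
    interval: float = HEARTBEAT_INTERVAL_SECONDS,
    missed: int = HEARTBEAT_TIMEOUT_MISSED,
) -> list:
    """Return IDs of workers silent for more than interval * missed seconds.

    last_seen maps a worker ID to the timestamp of its latest heartbeat.
    """
    deadline = interval * missed
    return sorted(w for w, seen in last_seen.items() if now - seen > deadline)
```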

Start the orchestrator and one or more workers:

# Terminal 1: start orchestrator
uv run dimq orchestrator --config config.yaml

# Terminal 2: start a worker (same machine)
uv run dimq worker --config config.yaml

# Terminal 3: start a worker on another machine (override endpoint)
uv run dimq worker --config config.yaml --endpoint tcp://orchestrator-host:5555

Submitting tasks programmatically

import zmq

ctx = zmq.Context()
sock = ctx.socket(zmq.DEALER)
sock.connect("tcp://localhost:5556")

# Submit a task
sock.send_multipart([
    b"SUBMIT",
    b"my_app.tasks:process",    # task type
    b"task-001",                 # task ID
    b'{"input_field": "value"}', # JSON payload matching the Pydantic input model
])

# Receive ACK
ack = sock.recv_multipart()  # [b"ACK", b"task-001"]

# Later, query the result
sock.send_multipart([b"RESULT", b"task-001"])
reply = sock.recv_multipart()
# [b"RESULT_REPLY", b"task-001", b"COMPLETED", b'{"output_field": "result"}']
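
The frame layout in this example can be wrapped in two small helpers so that callers work with strings and dicts instead of raw bytes. `build_submit` and `parse_result_reply` are hypothetical names, and the parser assumes replies follow the four-frame shape shown above; replies for pending or failed tasks may look different.

```python
import json
from typing import Optional


def build_submit(task_type: str, task_id: str, payload: dict) -> list:
    """Frames for a SUBMIT request, in the order used in the example above."""
    return [
        b"SUBMIT",
        task_type.encode(),
        task_id.encode(),
        json.dumps(payload).encode(),
    ]


def parse_result_reply(frames: list) -> tuple:
    """Split a RESULT_REPLY into (task_id, status, output dict or None)."""
    if len(frames) < 3 or frames[0] != b"RESULT_REPLY":
        raise ValueError(f"unexpected reply: {frames!r}")
    task_id = frames[1].decode()
    status = frames[2].decode()
    # The output frame is treated as optional, since only the COMPLETED
    # shape is documented.
    output: Optional[dict] = None
    if len(frames) > 3 and frames[3]:
        output = json.loads(frames[3])
    return task_id, status, output
```

With these helpers, the submission becomes `sock.send_multipart(build_submit("my_app.tasks:process", "task-001", {"input_field": "value"}))`.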

Testing

# Run all tests (e2e tests are skipped when Docker is unavailable)
uv run pytest -v

# Run specific test modules
uv run pytest tests/test_orchestrator.py -v
uv run pytest tests/test_integration.py -v

# Run LoadTask tests (requires Rust extension to be built)
uv run pytest tests/test_load_task.py -v

# Run end-to-end Docker test (requires Docker daemon)
# Spins up 3 worker containers, submits load tasks, verifies adaptive tuning
uv run pytest tests/test_e2e_docker.py -v -s

Writing Custom Tasks

A task is a plain function with Pydantic-typed input and output. It can be sync or async.

# my_app/tasks.py
from pydantic import BaseModel

class ImageInput(BaseModel):
    url: str
    width: int
    height: int

class ImageOutput(BaseModel):
    thumbnail_path: str
    original_size_bytes: int

def resize(input: ImageInput) -> ImageOutput:
    # Your logic here
    return ImageOutput(
        thumbnail_path=f"/tmp/{input.width}x{input.height}.jpg",
        original_size_bytes=1024,
    )

# Async tasks work the same way
async def fetch_and_resize(input: ImageInput) -> ImageOutput:
    ...

Register the tasks in your config:

tasks:
  - name: "my_app.tasks:resize"
    max_retries: 2
    timeout_seconds: 60
  - name: "my_app.tasks:fetch_and_resize"
    max_retries: 3
    timeout_seconds: 120

The framework uses inspect to automatically extract:

  • Input type from the first parameter's annotation
  • Output type from the return annotation
  • Whether the function is sync or async
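
The three lookups listed above map directly onto standard-library calls. The sketch below shows one way to do it; `TaskSpec` and `describe_task` are hypothetical names, not the API of task.py, and the function works with any annotated callable, so the types need not be Pydantic models.

```python
import inspect
from dataclasses import dataclass


@dataclass
class TaskSpec:
    """Hypothetical container for what introspection finds."""
    input_type: type
    output_type: type
    is_async: bool


def describe_task(func) -> TaskSpec:
    """Read input type, output type, and sync/async from a task function."""
    sig = inspect.signature(func)
    params = list(sig.parameters.values())
    if not params or params[0].annotation is inspect.Parameter.empty:
        raise TypeError(f"{func.__name__}: first parameter needs a type annotation")
    if sig.return_annotation is inspect.Signature.empty:
        raise TypeError(f"{func.__name__}: return type annotation is missing")
    return TaskSpec(
        input_type=params[0].annotation,        # first parameter's annotation
        output_type=sig.return_annotation,      # return annotation
        is_async=inspect.iscoroutinefunction(func),
    )
```

Applied to the resize function above, `describe_task(resize)` would report ImageInput, ImageOutput, and is_async=False. Modules that use `from __future__ import annotations` store annotations as strings, so a real loader would likely need to resolve them with typing.get_type_hints.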

Sync tasks run in a thread pool. Async tasks run natively in the event loop. If a task exceeds timeout_seconds, the worker cancels it and reports a timeout to the orchestrator, which handles retries.
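
A worker-side runner along these lines could dispatch on the function kind and enforce the timeout with asyncio. This is a minimal sketch: `run_with_timeout` and the "TIMEOUT" status string are assumptions, and the actual worker.py may manage its own thread pool rather than using asyncio.to_thread.

```python
import asyncio
import inspect


async def run_with_timeout(func, arg, timeout_seconds: float):
    """Run a sync or async task and return a (status, result) pair."""
    if inspect.iscoroutinefunction(func):
        awaitable = func(arg)                     # async: runs in the event loop
    else:
        awaitable = asyncio.to_thread(func, arg)  # sync: runs in a worker thread
    try:
        result = await asyncio.wait_for(awaitable, timeout_seconds)
    except asyncio.TimeoutError:
        # Hypothetical status name; the retry decision is left to the orchestrator.
        return "TIMEOUT", None
    return "COMPLETED", result
```

One design caveat: cancelling an async task stops it at its next await, but a sync function already running in a thread cannot be interrupted. In this sketch the caller stops waiting while the thread runs to completion in the background.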

LoadTask (Rust Extension)

A built-in stress-testing task implemented in Rust (pyo3). It creates configurable CPU, I/O, and memory pressure and releases the GIL while it runs, so other Python threads are not blocked.

import dimq_load_task

result = dimq_load_task.run(
    duration_seconds=5.0,
    concurrency=4,       # number of CPU threads
    cpu_load=0.7,        # fraction of duration for CPU work (SHA-256 hashing)
    io_load=0.2,         # fraction for I/O (temp file writes)
    memory_mb=100,       # memory to allocate and touch
)
# result = {
#     "phases": [
#         {"type": "memory", "start_seconds": 0.0, "duration_seconds": 0.001},
#         {"type": "cpu", "start_seconds": 0.001, "duration_seconds": 3.5},
#         {"type": "io", "start_seconds": 3.501, "duration_seconds": 1.0},
#     ],
#     "total_duration_seconds": 4.501,
#     "peak_memory_mb": 100,
# }
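
To check that a run matched the requested load mix, the returned phases can be reduced to per-type fractions of the total duration. The helper below is a sketch that assumes the result is a plain dict shaped like the sample above; `phase_fractions` is a hypothetical name and not part of dimq_load_task.

```python
def phase_fractions(result: dict) -> dict:
    """Share of total_duration_seconds spent in each phase type."""
    total = result["total_duration_seconds"]
    if total <= 0:
        return {}
    fractions: dict = {}
    for phase in result["phases"]:
        kind = phase["type"]
        # Sum durations in case a phase type appears more than once.
        fractions[kind] = fractions.get(kind, 0.0) + phase["duration_seconds"] / total
    return fractions
```

For the sample result, the cpu share is 3.5 / 4.501, or roughly 0.78 of the measured time. That is higher than the requested cpu_load of 0.7 because the run took 4.501 seconds in total rather than the full 5.0.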

Project Structure

DIMQ/
├── pyproject.toml
├── Dockerfile              # Multi-stage build for worker containers
├── docker-compose.yml      # 3 worker services for e2e testing
├── src/dimq/
│   ├── orchestrator.py     # ZMQ ROUTER, dispatch, heartbeat, retry
│   ├── worker.py           # ZMQ DEALER, task execution, timeout
│   ├── adaptive.py         # Throughput-based parallelization tuning
│   ├── task.py             # Task loading and introspection via inspect
│   ├── models.py           # Pydantic models (TaskRecord, DimqConfig, etc.)
│   ├── config.py           # YAML config loading
│   ├── cli.py              # CLI entry points
│   └── tasks/
│       └── load.py         # Pydantic wrapper for dimq_load_task
├── dimq_load_task/         # Rust extension (pyo3 + maturin)
│   ├── Cargo.toml
│   ├── pyproject.toml
│   └── src/lib.rs
├── e2e/
│   └── config.yaml         # Worker config for Docker containers
└── tests/
    ├── test_orchestrator.py
    ├── test_worker.py
    ├── test_adaptive.py
    ├── test_integration.py
    ├── test_e2e_docker.py   # Docker-based e2e test
    ├── test_load_task.py
    ├── test_tasks_load.py   # LoadTask wrapper tests
    ├── test_models.py
    ├── test_config.py
    ├── test_task.py
    └── sample_tasks.py      # Test fixtures
