Skip to main content

Rust-powered load generation task for DIMQ

Project description

DIMQ - Distributed In-Memory Queue

A general-purpose distributed task processing framework built on ZeroMQ. Workers connect to a central orchestrator, receive tasks, and return results. The orchestrator adaptively tunes per-worker parallelization to maximize throughput.

Architecture

                    +-------------------+
  Clients -------->| Orchestrator      |<-------- Workers
  (DEALER)         | (2x ROUTER)       |          (DEALER)
                   |                   |
  SUBMIT/STATUS/   | - FIFO task queue |  READY/HEARTBEAT/
  RESULT queries   | - Result storage  |  RESULT messages
                   | - Retry logic     |
                   | - Adaptive tuning |
                   +-------------------+

Orchestrator runs two ZMQ ROUTER sockets: one for workers (registration, heartbeats, task dispatch/results) and one for clients (task submission, status queries, result retrieval).

Workers connect via ZMQ DEALER sockets, register with their CPU count, and pull tasks. They run tasks concurrently (sync tasks in a thread pool, async tasks natively) and report results back. Workers handle timeout cancellation locally; the orchestrator owns all retry decisions.

Adaptive parallelization starts each worker's parallel task limit at its CPU count, then probes upward. If throughput plateaus or drops, it scales back by one and enters steady mode, re-probing periodically.

Tasks are plain Python functions with Pydantic-typed input and output. No base class or decorator needed -- the framework introspects types via inspect.

Prerequisites

  • Python 3.9+
  • uv
  • Rust toolchain (for the LoadTask extension only)

Setup

# Install Python dependencies
uv sync

# (Optional) Build the Rust LoadTask extension
cd dimq_load_task
uv tool run maturin develop --uv
cd ..

Running

Create a config file:

# config.yaml
endpoint: "tcp://0.0.0.0:5555"
client_endpoint: "tcp://0.0.0.0:5556"
heartbeat_interval_seconds: 5
heartbeat_timeout_missed: 3

tasks:
  - name: "my_app.tasks:process"
    max_retries: 3
    timeout_seconds: 30

Start the orchestrator and one or more workers:

# Terminal 1: start orchestrator
uv run dimq orchestrator --config config.yaml

# Terminal 2: start a worker (same machine)
uv run dimq worker --config config.yaml

# Terminal 3: start a worker on another machine (override endpoint)
uv run dimq worker --config config.yaml --endpoint tcp://orchestrator-host:5555

Submitting tasks programmatically

import zmq

ctx = zmq.Context()
sock = ctx.socket(zmq.DEALER)
sock.connect("tcp://localhost:5556")

# Submit a task
sock.send_multipart([
    b"SUBMIT",
    b"my_app.tasks:process",    # task type
    b"task-001",                 # task ID
    b'{"input_field": "value"}', # JSON payload matching the Pydantic input model
])

# Receive ACK
ack = sock.recv_multipart()  # [b"ACK", b"task-001"]

# Later, query the result
sock.send_multipart([b"RESULT", b"task-001"])
reply = sock.recv_multipart()
# [b"RESULT_REPLY", b"task-001", b"COMPLETED", b'{"output_field": "result"}']

Testing

# Run all tests (excludes e2e by default if Docker unavailable)
uv run pytest -v

# Run specific test modules
uv run pytest tests/test_orchestrator.py -v
uv run pytest tests/test_integration.py -v

# Run LoadTask tests (requires Rust extension to be built)
uv run pytest tests/test_load_task.py -v

# Run end-to-end Docker test (requires Docker daemon)
# Spins up 3 worker containers, submits load tasks, verifies adaptive tuning
uv run pytest tests/test_e2e_docker.py -v -s

Writing Custom Tasks

A task is a plain function with Pydantic-typed input and output. It can be sync or async.

# my_app/tasks.py
from pydantic import BaseModel

class ImageInput(BaseModel):
    url: str
    width: int
    height: int

class ImageOutput(BaseModel):
    thumbnail_path: str
    original_size_bytes: int

def resize(input: ImageInput) -> ImageOutput:
    # Your logic here
    return ImageOutput(
        thumbnail_path=f"/tmp/{input.width}x{input.height}.jpg",
        original_size_bytes=1024,
    )

# Async tasks work the same way
async def fetch_and_resize(input: ImageInput) -> ImageOutput:
    ...

Register it in your config:

tasks:
  - name: "my_app.tasks:resize"
    max_retries: 2
    timeout_seconds: 60
  - name: "my_app.tasks:fetch_and_resize"
    max_retries: 3
    timeout_seconds: 120

The framework uses inspect to automatically extract:

  • Input type from the first parameter's annotation
  • Output type from the return annotation
  • Whether the function is sync or async

Sync tasks run in a thread pool. Async tasks run natively in the event loop. If a task exceeds timeout_seconds, the worker cancels it and reports a timeout to the orchestrator, which handles retries.

LoadTask (Rust Extension)

A built-in stress-testing task implemented in Rust (pyo3). It creates configurable CPU, I/O, and memory pressure while releasing the GIL so Python threading works efficiently.

import dimq_load_task

result = dimq_load_task.run(
    duration_seconds=5.0,
    concurrency=4,       # number of CPU threads
    cpu_load=0.7,        # fraction of duration for CPU work (SHA-256 hashing)
    io_load=0.2,         # fraction for I/O (temp file writes)
    memory_mb=100,       # memory to allocate and touch
)
# result = {
#     "phases": [
#         {"type": "memory", "start_seconds": 0.0, "duration_seconds": 0.001},
#         {"type": "cpu", "start_seconds": 0.001, "duration_seconds": 3.5},
#         {"type": "io", "start_seconds": 3.501, "duration_seconds": 1.0},
#     ],
#     "total_duration_seconds": 4.501,
#     "peak_memory_mb": 100,
# }

Project Structure

DIMQ/
├── pyproject.toml
├── Dockerfile              # Multi-stage build for worker containers
├── docker-compose.yml      # 3 worker services for e2e testing
├── src/dimq/
│   ├── orchestrator.py     # ZMQ ROUTER, dispatch, heartbeat, retry
│   ├── worker.py           # ZMQ DEALER, task execution, timeout
│   ├── adaptive.py         # Throughput-based parallelization tuning
│   ├── task.py             # Task loading and introspection via inspect
│   ├── models.py           # Pydantic models (TaskRecord, DimqConfig, etc.)
│   ├── config.py           # YAML config loading
│   ├── cli.py              # CLI entry points
│   └── tasks/
│       └── load.py         # Pydantic wrapper for dimq_load_task
├── dimq_load_task/         # Rust extension (pyo3 + maturin)
│   ├── Cargo.toml
│   ├── pyproject.toml
│   └── src/lib.rs
├── e2e/
│   └── config.yaml         # Worker config for Docker containers
└── tests/
    ├── test_orchestrator.py
    ├── test_worker.py
    ├── test_adaptive.py
    ├── test_integration.py
    ├── test_e2e_docker.py   # Docker-based e2e test
    ├── test_load_task.py
    ├── test_tasks_load.py   # LoadTask wrapper tests
    ├── test_models.py
    ├── test_config.py
    ├── test_task.py
    └── sample_tasks.py      # Test fixtures

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dimq_load_task-0.1.2.tar.gz (10.2 kB view details)

Uploaded Source

Built Distributions

If you're not sure about the file name format, learn more about wheel file names.

dimq_load_task-0.1.2-pp310-pypy310_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (323.1 kB view details)

Uploaded PyPymanylinux: glibc 2.17+ ARM64

dimq_load_task-0.1.2-pp39-pypy39_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (323.1 kB view details)

Uploaded PyPymanylinux: glibc 2.17+ ARM64

dimq_load_task-0.1.2-pp38-pypy38_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (323.0 kB view details)

Uploaded PyPymanylinux: glibc 2.17+ ARM64

dimq_load_task-0.1.2-pp37-pypy37_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (326.1 kB view details)

Uploaded PyPymanylinux: glibc 2.17+ ARM64

dimq_load_task-0.1.2-cp39-abi3-win_amd64.whl (172.1 kB view details)

Uploaded CPython 3.9+Windows x86-64

dimq_load_task-0.1.2-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (322.9 kB view details)

Uploaded CPython 3.9+manylinux: glibc 2.17+ ARM64

dimq_load_task-0.1.2-cp39-abi3-macosx_11_0_arm64.whl (281.3 kB view details)

Uploaded CPython 3.9+macOS 11.0+ ARM64

dimq_load_task-0.1.2-cp39-abi3-macosx_10_12_x86_64.whl (289.1 kB view details)

Uploaded CPython 3.9+macOS 10.12+ x86-64

dimq_load_task-0.1.2-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (332.8 kB view details)

Uploaded CPython 3.8manylinux: glibc 2.17+ x86-64

File details

Details for the file dimq_load_task-0.1.2.tar.gz.

File metadata

  • Download URL: dimq_load_task-0.1.2.tar.gz
  • Upload date:
  • Size: 10.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for dimq_load_task-0.1.2.tar.gz
Algorithm Hash digest
SHA256 b8ac0f0965f63ac98737c54e84bdd5ae17227964e905101fb8f88a410a31e030
MD5 017ec58ad7b30e937b5b9fc84fd8c5c8
BLAKE2b-256 2b918a5c5a104c3385470c6ed12d27d30c185ef3adab5fd2e552ef46612833b4

See more details on using hashes here.

Provenance

The following attestation bundles were made for dimq_load_task-0.1.2.tar.gz:

Publisher: ci.yml on walnutgeek/dimq

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file dimq_load_task-0.1.2-pp310-pypy310_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl.

File metadata

File hashes

Hashes for dimq_load_task-0.1.2-pp310-pypy310_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl
Algorithm Hash digest
SHA256 d588a45965202a4458f9ec68f8786b70fd7143acee8868b44281eb68e366bd95
MD5 2ea74d9996ffc98901355aabaa934740
BLAKE2b-256 3ffe81f45917be826d163d6a55eaee67c1cee9a2a56fec6d59d1cfee7d1ef19a

See more details on using hashes here.

Provenance

The following attestation bundles were made for dimq_load_task-0.1.2-pp310-pypy310_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl:

Publisher: ci.yml on walnutgeek/dimq

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file dimq_load_task-0.1.2-pp39-pypy39_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl.

File metadata

File hashes

Hashes for dimq_load_task-0.1.2-pp39-pypy39_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl
Algorithm Hash digest
SHA256 07cf032d95c6056d451df7aa2074acab0d4ea0cd2b48e68d59e98c766982d586
MD5 c0a2c733f4782e46a76332d3b3e03bfe
BLAKE2b-256 9ca124f31af0661eaf92a1678d2d40cf0ac088ccbf6fa4b7b39edb10671994cf

See more details on using hashes here.

Provenance

The following attestation bundles were made for dimq_load_task-0.1.2-pp39-pypy39_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl:

Publisher: ci.yml on walnutgeek/dimq

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file dimq_load_task-0.1.2-pp38-pypy38_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl.

File metadata

File hashes

Hashes for dimq_load_task-0.1.2-pp38-pypy38_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl
Algorithm Hash digest
SHA256 ada44c9b539f256dbc4ecdb7e9cb464893d8cd1ffc05dac8e5e11922012bdfa6
MD5 fb25886d2e63d9df3a35db3997356c55
BLAKE2b-256 e72571d86555b8d7c457a040099103d2b1f665dce0d42be261440172bca9b6cb

See more details on using hashes here.

Provenance

The following attestation bundles were made for dimq_load_task-0.1.2-pp38-pypy38_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl:

Publisher: ci.yml on walnutgeek/dimq

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file dimq_load_task-0.1.2-pp37-pypy37_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl.

File metadata

File hashes

Hashes for dimq_load_task-0.1.2-pp37-pypy37_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl
Algorithm Hash digest
SHA256 edc572d45a09906e0bb45cb1519309bf9157ad3af2009a06cc980b434572e344
MD5 cb49852b6bbc65ec4be2aa6a7aeee0c5
BLAKE2b-256 1f98337e0d7ad689e6a8ee962422644558c4ca2bf98a3b6b67a2c1641bf43442

See more details on using hashes here.

Provenance

The following attestation bundles were made for dimq_load_task-0.1.2-pp37-pypy37_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl:

Publisher: ci.yml on walnutgeek/dimq

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file dimq_load_task-0.1.2-cp39-abi3-win_amd64.whl.

File metadata

File hashes

Hashes for dimq_load_task-0.1.2-cp39-abi3-win_amd64.whl
Algorithm Hash digest
SHA256 dd894e40cd8e3a0203fb30b932d33b81e20b69cab4cf19f5cab449580be4a387
MD5 68479111b22f9c96ee4a0e0da6b63f32
BLAKE2b-256 2225d22f0c405b4426e7caed2a0d41c2f20f846e027261daff5709ee00041e4f

See more details on using hashes here.

Provenance

The following attestation bundles were made for dimq_load_task-0.1.2-cp39-abi3-win_amd64.whl:

Publisher: ci.yml on walnutgeek/dimq

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file dimq_load_task-0.1.2-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl.

File metadata

File hashes

Hashes for dimq_load_task-0.1.2-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl
Algorithm Hash digest
SHA256 ce2fd6b37ecaf9ca5bbda629f9c3c80f40284640ddd3fcb6dd724671c2ebaf64
MD5 53b2a2bccdf0afb278d060bef2864662
BLAKE2b-256 754fc8c7021715ff1b2fb6cfb67e2b6b46daf324f1429853bca976f995fc059b

See more details on using hashes here.

Provenance

The following attestation bundles were made for dimq_load_task-0.1.2-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl:

Publisher: ci.yml on walnutgeek/dimq

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file dimq_load_task-0.1.2-cp39-abi3-macosx_11_0_arm64.whl.

File metadata

File hashes

Hashes for dimq_load_task-0.1.2-cp39-abi3-macosx_11_0_arm64.whl
Algorithm Hash digest
SHA256 7eef58aa3b1e66542c215e480bedda7ef83879312796d3f4a69bf54599f820bd
MD5 9975af5740a7274e8219a05c0f03e39c
BLAKE2b-256 0d72fd6ea6965290e20cdab3c83866130b820a406a0e1b1e1b8a7764d8b51912

See more details on using hashes here.

Provenance

The following attestation bundles were made for dimq_load_task-0.1.2-cp39-abi3-macosx_11_0_arm64.whl:

Publisher: ci.yml on walnutgeek/dimq

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file dimq_load_task-0.1.2-cp39-abi3-macosx_10_12_x86_64.whl.

File metadata

File hashes

Hashes for dimq_load_task-0.1.2-cp39-abi3-macosx_10_12_x86_64.whl
Algorithm Hash digest
SHA256 29e66dd47b1e1bd44f57bf7941bbd7ff7bd6114e5ca3aa59296cdce038a06bf5
MD5 8a1d6e77aec8f4ce05c321db9ae14be2
BLAKE2b-256 f3b404b680ba72ac84981ddda0c402c99490ef7d0e8828946597a0403fae8dee

See more details on using hashes here.

Provenance

The following attestation bundles were made for dimq_load_task-0.1.2-cp39-abi3-macosx_10_12_x86_64.whl:

Publisher: ci.yml on walnutgeek/dimq

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file dimq_load_task-0.1.2-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.

File metadata

File hashes

Hashes for dimq_load_task-0.1.2-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
Algorithm Hash digest
SHA256 93c05bc3c23fc5e7c60c10f604bcff6bd3a69c88ed327f5ddaf433fafb3dc025
MD5 43663def9b84025fec94aa03f1c97288
BLAKE2b-256 7b517304fbd4d75509716f59584fbb1d1a6433297a6e21ce167231d37afb05c6

See more details on using hashes here.

Provenance

The following attestation bundles were made for dimq_load_task-0.1.2-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl:

Publisher: ci.yml on walnutgeek/dimq

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page