Distributed In-Memory Queue

DIMQ - Distributed In-Memory Queue

A general-purpose distributed task processing framework built on ZeroMQ. Workers connect to a central orchestrator, receive tasks, and return results. The orchestrator adaptively tunes per-worker parallelization to maximize throughput.

Architecture

                   +-------------------+
  Clients -------->| Orchestrator      |<-------- Workers
  (DEALER)         | (2x ROUTER)       |          (DEALER)
                   |                   |
  SUBMIT/STATUS/   | - FIFO task queue |  READY/HEARTBEAT/
  RESULT queries   | - Result storage  |  RESULT messages
                   | - Retry logic     |
                   | - Adaptive tuning |
                   +-------------------+

The orchestrator runs two ZMQ ROUTER sockets: one for workers (registration, heartbeats, task dispatch, and results) and one for clients (task submission, status queries, and result retrieval).

Workers connect via ZMQ DEALER sockets, register with their CPU count, and pull tasks. They run tasks concurrently (sync tasks in a thread pool, async tasks natively) and report results back. Workers handle timeout cancellation locally; the orchestrator owns all retry decisions.

Adaptive parallelization sets each worker's initial parallel-task limit to its CPU count, then probes upward. If throughput plateaus or drops, the limit is scaled back by one and the worker enters steady mode, re-probing periodically.
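
The probing loop described above can be pictured as a small state machine. This is a hypothetical sketch, not the code in adaptive.py: the class name AdaptiveLimit, the one-step probe increment, the 2% plateau tolerance, and the reprobe_every sample count are all assumptions.

```python
from dataclasses import dataclass, field


@dataclass
class AdaptiveLimit:
    """Hypothetical tuner for one worker's parallel-task limit (not dimq's adaptive.py)."""

    cpu_count: int
    tolerance: float = 0.02  # assumed: a gain under 2% counts as a plateau
    reprobe_every: int = 10  # assumed: steady-mode samples between re-probes
    limit: int = field(init=False, default=0)
    mode: str = field(init=False, default="probing")
    best_throughput: float = field(init=False, default=0.0)
    steady_samples: int = field(init=False, default=0)

    def __post_init__(self) -> None:
        self.limit = self.cpu_count  # start at the worker's CPU count

    def observe(self, throughput: float) -> int:
        """Feed one throughput sample (tasks/s) measured at the current limit."""
        if self.mode == "probing":
            if throughput > self.best_throughput * (1 + self.tolerance):
                # Still improving: remember this level and probe one step higher.
                self.best_throughput = throughput
                self.limit += 1
            else:
                # Plateau or drop: back off by one and settle into steady mode.
                self.limit = max(1, self.limit - 1)
                self.mode = "steady"
                self.steady_samples = 0
        else:
            self.steady_samples += 1
            if self.steady_samples >= self.reprobe_every:
                # Periodic re-probe, using the steady throughput as the baseline.
                self.mode = "probing"
                self.best_throughput = throughput
                self.limit += 1
        return self.limit
```

For example, with cpu_count=4, samples of 100, 120 and then 121 tasks/s move the limit to 5, then 6, then back to 5 in steady mode, because 121 is within the assumed 2% tolerance of 120.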

Tasks are plain Python functions with Pydantic-typed input and output. No base class or decorator is needed; the framework introspects the types with Python's inspect module.

Prerequisites

  • Python 3.9+
  • uv
  • Rust toolchain (for the LoadTask extension only)

Setup

# Install Python dependencies
uv sync

# (Optional) Build the Rust LoadTask extension
cd dimq_load_task
uv tool run maturin develop --uv
cd ..

Running

Create a config file:

# config.yaml
endpoint: "tcp://0.0.0.0:5555"
client_endpoint: "tcp://0.0.0.0:5556"
heartbeat_interval_seconds: 5
heartbeat_timeout_missed: 3

tasks:
  - name: "my_app.tasks:process"
    max_retries: 3
    timeout_seconds: 30
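
One plausible reading of the two heartbeat settings is that a worker is declared dead once heartbeat_interval_seconds × heartbeat_timeout_missed seconds (5 × 3 = 15 s with the values above) pass without a READY or HEARTBEAT message. The sketch below implements that check; the HeartbeatTracker class and the exact expiry rule are assumptions, not dimq's orchestrator code.

```python
import time
from typing import Callable, Dict, List, Optional


class HeartbeatTracker:
    """Hypothetical liveness check driven by the two heartbeat settings."""

    def __init__(
        self,
        interval_seconds: float,
        missed_allowed: int,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        # Assumed rule: a worker expires after `missed_allowed` silent intervals.
        self.timeout = interval_seconds * missed_allowed
        self.clock = clock
        self.last_seen: Dict[bytes, float] = {}

    def beat(self, worker_id: bytes) -> None:
        """Record a READY or HEARTBEAT message from a worker."""
        self.last_seen[worker_id] = self.clock()

    def expired(self, now: Optional[float] = None) -> List[bytes]:
        """Return (and forget) workers whose last heartbeat is older than the timeout."""
        now = self.clock() if now is None else now
        dead = [w for w, t in self.last_seen.items() if now - t > self.timeout]
        for w in dead:
            del self.last_seen[w]
        return dead
```

Under this reading, tasks that were running on an expired worker would be handed to the orchestrator's retry logic; that step is likewise an assumption.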

Start the orchestrator and one or more workers:

# Terminal 1: start orchestrator
uv run dimq orchestrator --config config.yaml

# Terminal 2: start a worker (same machine)
uv run dimq worker --config config.yaml

# Terminal 3: start a worker on another machine (override endpoint)
uv run dimq worker --config config.yaml --endpoint tcp://orchestrator-host:5555

Submitting tasks programmatically

import zmq

ctx = zmq.Context()
sock = ctx.socket(zmq.DEALER)
sock.connect("tcp://localhost:5556")

# Submit a task
sock.send_multipart([
    b"SUBMIT",
    b"my_app.tasks:process",    # task type
    b"task-001",                 # task ID
    b'{"input_field": "value"}', # JSON payload matching the Pydantic input model
])

# Receive ACK
ack = sock.recv_multipart()  # [b"ACK", b"task-001"]

# Later, query the result
sock.send_multipart([b"RESULT", b"task-001"])
reply = sock.recv_multipart()
# [b"RESULT_REPLY", b"task-001", b"COMPLETED", b'{"output_field": "result"}']
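
Because the client protocol is just lists of byte frames, small helpers can keep the framing in one place. The functions below are hypothetical conveniences, not part of dimq. They encode the SUBMIT and RESULT requests and decode the ACK and RESULT_REPLY layouts exactly as shown above, and they use only the standard library.

```python
import json
from typing import Any, List, NamedTuple, Optional


class ResultReply(NamedTuple):
    task_id: str
    status: str
    output: Optional[Any]  # decoded JSON body, if the reply carried one


def submit_frames(task_type: str, task_id: str, payload: dict) -> List[bytes]:
    """Build a SUBMIT request: [SUBMIT, task type, task ID, JSON payload]."""
    return [b"SUBMIT", task_type.encode(), task_id.encode(), json.dumps(payload).encode()]


def result_query_frames(task_id: str) -> List[bytes]:
    """Build a RESULT query: [RESULT, task ID]."""
    return [b"RESULT", task_id.encode()]


def parse_ack(frames: List[bytes]) -> str:
    """Check an ACK reply and return the acknowledged task ID."""
    if len(frames) != 2 or frames[0] != b"ACK":
        raise ValueError(f"unexpected reply: {frames!r}")
    return frames[1].decode()


def parse_result_reply(frames: List[bytes]) -> ResultReply:
    """Decode [RESULT_REPLY, task ID, status, optional JSON body]."""
    if len(frames) < 3 or frames[0] != b"RESULT_REPLY":
        raise ValueError(f"unexpected reply: {frames!r}")
    # Assumption: a reply for an unfinished task may omit the body or leave it empty.
    body = frames[3] if len(frames) > 3 and frames[3] else None
    output = json.loads(body) if body is not None else None
    return ResultReply(frames[1].decode(), frames[2].decode(), output)
```

With the socket from the example above, a submission becomes sock.send_multipart(submit_frames("my_app.tasks:process", "task-001", {"input_field": "value"})) followed by parse_ack(sock.recv_multipart()).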

Testing

# Run all tests (the Docker e2e test is skipped when Docker is unavailable)
uv run pytest -v

# Run specific test modules
uv run pytest tests/test_orchestrator.py -v
uv run pytest tests/test_integration.py -v

# Run LoadTask tests (requires Rust extension to be built)
uv run pytest tests/test_load_task.py -v

# Run end-to-end Docker test (requires Docker daemon)
# Spins up 3 worker containers, submits load tasks, verifies adaptive tuning
uv run pytest tests/test_e2e_docker.py -v -s

Writing Custom Tasks

A task is a plain function with Pydantic-typed input and output. It can be sync or async.

# my_app/tasks.py
from pydantic import BaseModel

class ImageInput(BaseModel):
    url: str
    width: int
    height: int

class ImageOutput(BaseModel):
    thumbnail_path: str
    original_size_bytes: int

def resize(input: ImageInput) -> ImageOutput:
    # Your logic here
    return ImageOutput(
        thumbnail_path=f"/tmp/{input.width}x{input.height}.jpg",
        original_size_bytes=1024,
    )

# Async tasks work the same way
async def fetch_and_resize(input: ImageInput) -> ImageOutput:
    ...

Register it in your config:

tasks:
  - name: "my_app.tasks:resize"
    max_retries: 2
    timeout_seconds: 60
  - name: "my_app.tasks:fetch_and_resize"
    max_retries: 3
    timeout_seconds: 120

The framework uses inspect to automatically extract:

  • Input type from the first parameter's annotation
  • Output type from the return annotation
  • Whether the function is sync or async
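
A minimal sketch of that introspection, assuming the first parameter and the return value are both annotated. inspect_task and TaskSpec are hypothetical names (dimq's own logic lives in task.py and may differ), and any class works as the input or output type, so the sketch runs with the standard library alone.

```python
import inspect
import typing
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class TaskSpec:
    func: Callable[..., Any]
    input_type: type
    output_type: type
    is_async: bool


def inspect_task(func: Callable[..., Any]) -> TaskSpec:
    """Derive input type, output type, and sync/async flavour from a task function."""
    params = list(inspect.signature(func).parameters.values())
    if not params:
        raise TypeError(f"{func.__qualname__} must take one input argument")
    # get_type_hints also resolves string annotations (e.g. `from __future__ import annotations`).
    hints = typing.get_type_hints(func)
    input_type = hints.get(params[0].name)
    output_type = hints.get("return")
    if input_type is None or output_type is None:
        raise TypeError(f"{func.__qualname__} needs input and return annotations")
    return TaskSpec(func, input_type, output_type, inspect.iscoroutinefunction(func))
```

Applied to the resize example above, this would report ImageInput, ImageOutput, and is_async=False; for fetch_and_resize, is_async would be True.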

Sync tasks run in a thread pool. Async tasks run natively in the event loop. If a task exceeds timeout_seconds, the worker cancels it and reports a timeout to the orchestrator, which handles retries.
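
That execution rule (sync functions in a thread pool, coroutines on the event loop, both bounded by timeout_seconds) can be sketched with asyncio alone. run_with_timeout and its ("ok" | "timeout" | "error", value) result are hypothetical; the real worker reports outcomes to the orchestrator over ZMQ. One caveat applies to any asyncio-based design like this sketch: Python cannot forcibly stop a thread, so on timeout the awaiting side gives up, but a sync function keeps running in its pool thread until it returns.

```python
import asyncio
import inspect
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Optional, Tuple

_pool = ThreadPoolExecutor()  # assumed: one shared pool for sync tasks


async def run_with_timeout(
    func: Callable[[Any], Any], arg: Any, timeout_seconds: float
) -> Tuple[str, Optional[Any]]:
    """Run one task and classify the outcome as ok, timeout, or error."""
    if inspect.iscoroutinefunction(func):
        work = func(arg)  # async task: runs natively on this event loop
    else:
        loop = asyncio.get_running_loop()
        work = loop.run_in_executor(_pool, func, arg)  # sync task: thread pool
    try:
        return "ok", await asyncio.wait_for(work, timeout_seconds)
    except asyncio.TimeoutError:
        # wait_for cancels a coroutine; a pool thread is merely abandoned.
        return "timeout", None
    except Exception as exc:  # task raised: report it; retry is the orchestrator's call
        return "error", repr(exc)
```

A worker built this way would forward "timeout" and "error" outcomes to the orchestrator, which decides whether to retry within max_retries.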

LoadTask (Rust Extension)

A built-in stress-testing task implemented in Rust (pyo3). It generates configurable CPU, I/O, and memory pressure and releases the GIL while it runs, so other Python threads are not blocked.

import dimq_load_task

result = dimq_load_task.run(
    duration_seconds=5.0,
    concurrency=4,       # number of CPU threads
    cpu_load=0.7,        # fraction of duration for CPU work (SHA-256 hashing)
    io_load=0.2,         # fraction for I/O (temp file writes)
    memory_mb=100,       # memory to allocate and touch
)
# result = {
#     "phases": [
#         {"type": "memory", "start_seconds": 0.0, "duration_seconds": 0.001},
#         {"type": "cpu", "start_seconds": 0.001, "duration_seconds": 3.5},
#         {"type": "io", "start_seconds": 3.501, "duration_seconds": 1.0},
#     ],
#     "total_duration_seconds": 4.501,
#     "peak_memory_mb": 100,
# }
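
Reading the sample result, the CPU and I/O phases appear to last cpu_load × duration_seconds (0.7 × 5.0 = 3.5 s) and io_load × duration_seconds (0.2 × 5.0 = 1.0 s), running back to back after a brief memory phase, with the unassigned 10% of the budget apparently not padded out; that would explain why total_duration_seconds is 4.501 rather than 5.0. The sketch below reproduces that arithmetic. It is an inference from the example output, not the extension's code, and plan_phases is a hypothetical name.

```python
from typing import Any, Dict, List


def plan_phases(duration_seconds: float, cpu_load: float, io_load: float) -> List[Dict[str, Any]]:
    """Hypothetical CPU/I/O schedule inferred from the sample LoadTask result."""
    # Assumption: both fractions share one time budget, so together they may not exceed 1.
    if cpu_load < 0 or io_load < 0 or cpu_load + io_load > 1:
        raise ValueError("cpu_load and io_load must be fractions summing to at most 1")
    plan: List[Dict[str, Any]] = []
    start = 0.0  # memory setup treated as instantaneous (0.001 s in the sample)
    for kind, fraction in (("cpu", cpu_load), ("io", io_load)):
        duration = fraction * duration_seconds
        plan.append({"type": kind, "start_seconds": start, "duration_seconds": duration})
        start += duration  # phases run back to back
    return plan
```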

Project Structure

DIMQ/
├── pyproject.toml
├── Dockerfile              # Multi-stage build for worker containers
├── docker-compose.yml      # 3 worker services for e2e testing
├── src/dimq/
│   ├── orchestrator.py     # ZMQ ROUTER, dispatch, heartbeat, retry
│   ├── worker.py           # ZMQ DEALER, task execution, timeout
│   ├── adaptive.py         # Throughput-based parallelization tuning
│   ├── task.py             # Task loading and introspection via inspect
│   ├── models.py           # Pydantic models (TaskRecord, DimqConfig, etc.)
│   ├── config.py           # YAML config loading
│   ├── cli.py              # CLI entry points
│   └── tasks/
│       └── load.py         # Pydantic wrapper for dimq_load_task
├── dimq_load_task/         # Rust extension (pyo3 + maturin)
│   ├── Cargo.toml
│   ├── pyproject.toml
│   └── src/lib.rs
├── e2e/
│   └── config.yaml         # Worker config for Docker containers
└── tests/
    ├── test_orchestrator.py
    ├── test_worker.py
    ├── test_adaptive.py
    ├── test_integration.py
    ├── test_e2e_docker.py   # Docker-based e2e test
    ├── test_load_task.py
    ├── test_tasks_load.py   # LoadTask wrapper tests
    ├── test_models.py
    ├── test_config.py
    ├── test_task.py
    └── sample_tasks.py      # Test fixtures

Download files

  • dimq-0.1.3.tar.gz (source distribution, 102.6 kB)
    SHA256 7e1a065746fc9423afeaf985b9b01835c5c7ece4017143997def2f635e23a245
    MD5 64912a709d31d92602f42dbfb955cee6
    BLAKE2b-256 cadcb07f451a83d1a6d47e547900b6cdc4218d3ccc6170a8873398673c44e32e
  • dimq-0.1.3-py3-none-any.whl (built distribution, Python 3, 14.1 kB)
    SHA256 607531f9fb5db949b5ec5b4879df85c605dab4bc5417ff83644df7da2c92f859
    MD5 df26350ccc954409c486158368289d17
    BLAKE2b-256 875f597194c882f875904a834f2b86a73b6db8a0199b64603b16c97fcb37a40a

Both files were uploaded with twine 6.1.0 (CPython 3.13.7) via Trusted Publishing, with provenance attestations from the ci.yml workflow on walnutgeek/dimq.