DIMQ - Distributed In-Memory Queue

A general-purpose distributed task processing framework built on ZeroMQ. Workers connect to a central orchestrator, receive tasks, and return results. The orchestrator adaptively tunes per-worker parallelization to maximize throughput.

Architecture

                   +-------------------+
  Clients -------->| Orchestrator      |<-------- Workers
  (DEALER)         | (2x ROUTER)       |          (DEALER)
                   |                   |
  SUBMIT/STATUS/   | - FIFO task queue |  READY/HEARTBEAT/
  RESULT queries   | - Result storage  |  RESULT messages
                   | - Retry logic     |
                   | - Adaptive tuning |
                   +-------------------+

The orchestrator runs two ZMQ ROUTER sockets: one for workers (registration, heartbeats, task dispatch, and results) and one for clients (task submission, status queries, and result retrieval).

Workers connect via ZMQ DEALER sockets, register with their CPU count, and pull tasks. They run tasks concurrently (sync tasks in a thread pool, async tasks natively) and report results back. Workers handle timeout cancellation locally; the orchestrator owns all retry decisions.
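
To make the split of responsibilities concrete, here is a minimal sketch of how an orchestrator-side FIFO queue with retry accounting could look. It is not DIMQ's actual implementation: the names PendingTask and RetryQueue are hypothetical, and it assumes that max_retries counts extra attempts after the first one and that a retried task goes to the back of the queue.

```python
from collections import deque
from dataclasses import dataclass


@dataclass
class PendingTask:
    task_id: str
    max_retries: int
    attempts: int = 0  # how many times the task has been dispatched


class RetryQueue:
    """Hypothetical FIFO queue that re-enqueues failed tasks until retries run out."""

    def __init__(self):
        self._queue = deque()
        self.failed = []  # task IDs that exhausted their retries

    def submit(self, task_id, max_retries):
        self._queue.append(PendingTask(task_id, max_retries))

    def next_task(self):
        """Pop the oldest task and count the dispatch, or return None if empty."""
        if not self._queue:
            return None
        task = self._queue.popleft()
        task.attempts += 1
        return task

    def report_failure(self, task):
        """Return True if the task was re-queued, False if it failed for good."""
        # The first dispatch is not a retry, so max_retries further attempts are allowed.
        if task.attempts <= task.max_retries:
            self._queue.append(task)
            return True
        self.failed.append(task.task_id)
        return False
```

A worker in this model only reports "failed" or "timed out"; whether the task runs again is decided entirely by report_failure on the orchestrator side.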

Adaptive parallelization starts each worker's parallel task limit at its CPU count, then probes upward. If throughput plateaus or drops, it scales back by one and enters steady mode, re-probing periodically.
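
The probe-then-settle behaviour described above can be sketched as a small state machine. This is an illustration under stated assumptions, not the code in adaptive.py: the class name AdaptiveLimit, the reprobe_every and tolerance parameters, and the "one observation per measurement window" model are all hypothetical.

```python
class AdaptiveLimit:
    """Hypothetical controller for one worker's parallel task limit."""

    def __init__(self, cpu_count, reprobe_every=10, tolerance=0.02):
        self.limit = cpu_count            # start at the worker's CPU count
        self.mode = "probing"
        self.best_throughput = 0.0
        self.reprobe_every = reprobe_every  # steady windows before probing again
        self.tolerance = tolerance          # relative gain that counts as improvement
        self._steady_windows = 0

    def observe(self, throughput):
        """Feed the throughput measured at the current limit; return the next limit."""
        if self.mode == "probing":
            if throughput > self.best_throughput * (1 + self.tolerance):
                # Still improving: remember the gain and probe one step higher.
                self.best_throughput = throughput
                self.limit += 1
            else:
                # Plateau or drop: scale back by one and settle.
                self.limit = max(1, self.limit - 1)
                self.mode = "steady"
                self._steady_windows = 0
        else:
            self._steady_windows += 1
            if self._steady_windows >= self.reprobe_every:
                # Periodic re-probe: use the current throughput as the new baseline.
                self.mode = "probing"
                self.best_throughput = throughput
                self.limit += 1
        return self.limit
```

For example, a 4-CPU worker that measures 100, then 120, then 121 tasks per second would move from a limit of 4 to 5 to 6, then fall back to 5 and hold there until the next re-probe.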

Tasks are plain Python functions with Pydantic-typed input and output. No base class or decorator needed -- the framework introspects types via inspect.

Prerequisites

  • Python 3.9+
  • uv
  • Rust toolchain (for the LoadTask extension only)

Setup

# Install Python dependencies
uv sync

# (Optional) Build the Rust LoadTask extension
cd dimq_load_task
uv tool run maturin develop --uv
cd ..

Running

Create a config file:

# config.yaml
endpoint: "tcp://0.0.0.0:5555"
client_endpoint: "tcp://0.0.0.0:5556"
heartbeat_interval_seconds: 5
heartbeat_timeout_missed: 3

tasks:
  - name: "my_app.tasks:process"
    max_retries: 3
    timeout_seconds: 30
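
One plausible reading of the two heartbeat settings is that a worker is treated as lost once heartbeat_timeout_missed consecutive intervals pass without a heartbeat, which is 5 × 3 = 15 seconds with the values above. The sketch below illustrates that reading only; HeartbeatMonitor is a hypothetical name and the exact rule DIMQ applies may differ.

```python
import time


class HeartbeatMonitor:
    """Hypothetical liveness tracker based on the two heartbeat settings."""

    def __init__(self, interval_seconds=5.0, timeout_missed=3):
        # Assumption: a worker is lost after this many seconds of silence.
        self.deadline_seconds = interval_seconds * timeout_missed
        self._last_seen = {}

    def beat(self, worker_id, now=None):
        """Record a heartbeat; `now` can be injected to make tests deterministic."""
        self._last_seen[worker_id] = time.monotonic() if now is None else now

    def dead_workers(self, now=None):
        """Return the IDs of workers whose last heartbeat is older than the deadline."""
        now = time.monotonic() if now is None else now
        return [
            worker_id
            for worker_id, seen in self._last_seen.items()
            if now - seen > self.deadline_seconds
        ]
```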

Start the orchestrator and one or more workers:

# Terminal 1: start orchestrator
uv run dimq orchestrator --config config.yaml

# Terminal 2: start a worker (same machine)
uv run dimq worker --config config.yaml

# Terminal 3: start a worker on another machine (override endpoint)
uv run dimq worker --config config.yaml --endpoint tcp://orchestrator-host:5555

Submitting tasks programmatically

import zmq

ctx = zmq.Context()
sock = ctx.socket(zmq.DEALER)
sock.connect("tcp://localhost:5556")

# Submit a task
sock.send_multipart([
    b"SUBMIT",
    b"my_app.tasks:process",    # task type
    b"task-001",                 # task ID
    b'{"input_field": "value"}', # JSON payload matching the Pydantic input model
])

# Receive ACK
ack = sock.recv_multipart()  # [b"ACK", b"task-001"]

# Later, query the result
sock.send_multipart([b"RESULT", b"task-001"])
reply = sock.recv_multipart()
# [b"RESULT_REPLY", b"task-001", b"COMPLETED", b'{"output_field": "result"}']
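
The frame layouts in the example above can be wrapped in two small helpers so that callers work with Python values instead of raw byte frames. The helper names build_submit and parse_result_reply are hypothetical, and the sketch assumes replies follow the four-frame shape shown above; replies for tasks that are not yet completed may look different.

```python
import json


def build_submit(task_type, task_id, payload):
    """Build the SUBMIT frames: command, task type, task ID, JSON payload."""
    return [
        b"SUBMIT",
        task_type.encode("utf-8"),
        task_id.encode("utf-8"),
        json.dumps(payload).encode("utf-8"),
    ]


def parse_result_reply(frames):
    """Split a RESULT_REPLY into (task_id, status, decoded body or None)."""
    kind, task_id, status, *rest = frames
    if kind != b"RESULT_REPLY":
        raise ValueError(f"unexpected reply type: {kind!r}")
    # Assumption: the body frame is JSON when present; treat a missing frame as None.
    body = json.loads(rest[0]) if rest else None
    return task_id.decode("utf-8"), status.decode("utf-8"), body
```

With these, the submission becomes sock.send_multipart(build_submit("my_app.tasks:process", "task-001", {"input_field": "value"})), and the reply can be unpacked with parse_result_reply(sock.recv_multipart()).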

Testing

# Run all tests (the e2e test is skipped when Docker is unavailable)
uv run pytest -v

# Run specific test modules
uv run pytest tests/test_orchestrator.py -v
uv run pytest tests/test_integration.py -v

# Run LoadTask tests (requires Rust extension to be built)
uv run pytest tests/test_load_task.py -v

# Run end-to-end Docker test (requires Docker daemon)
# Spins up 3 worker containers, submits load tasks, verifies adaptive tuning
uv run pytest tests/test_e2e_docker.py -v -s

Writing Custom Tasks

A task is a plain function with Pydantic-typed input and output. It can be sync or async.

# my_app/tasks.py
from pydantic import BaseModel

class ImageInput(BaseModel):
    url: str
    width: int
    height: int

class ImageOutput(BaseModel):
    thumbnail_path: str
    original_size_bytes: int

def resize(input: ImageInput) -> ImageOutput:
    # Your logic here
    return ImageOutput(
        thumbnail_path=f"/tmp/{input.width}x{input.height}.jpg",
        original_size_bytes=1024,
    )

# Async tasks work the same way
async def fetch_and_resize(input: ImageInput) -> ImageOutput:
    ...

Register it in your config:

tasks:
  - name: "my_app.tasks:resize"
    max_retries: 2
    timeout_seconds: 60
  - name: "my_app.tasks:fetch_and_resize"
    max_retries: 3
    timeout_seconds: 120
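
Task names use a "module:function" form. A name like that can be resolved with the standard library alone, roughly as follows. This is a sketch of the general technique; load_task is a hypothetical helper, not necessarily what task.py does.

```python
import importlib


def load_task(name):
    """Resolve a "package.module:function" string to the callable it names."""
    module_name, sep, func_name = name.partition(":")
    if not sep or not module_name or not func_name:
        raise ValueError(f"expected 'module:function', got {name!r}")
    module = importlib.import_module(module_name)
    func = getattr(module, func_name)
    if not callable(func):
        raise TypeError(f"{name!r} does not refer to a callable")
    return func
```

The module must be importable from the worker's environment, so my_app has to be installed or on the Python path wherever a worker runs.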

The framework uses inspect to automatically extract:

  • Input type from the first parameter's annotation
  • Output type from the return annotation
  • Whether the function is sync or async
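
The three items above can all be read from a function with the standard library. The sketch below shows one way to do it; TaskSpec and describe_task are hypothetical names, and plain classes stand in for the Pydantic models so the example has no dependencies.

```python
import inspect
from dataclasses import dataclass
from typing import get_type_hints


@dataclass
class TaskSpec:
    input_type: type
    output_type: type
    is_async: bool


def describe_task(func):
    """Extract input type, output type, and sync/async flag from a task function."""
    hints = get_type_hints(func)  # also resolves string annotations
    params = list(inspect.signature(func).parameters.values())
    if not params:
        raise TypeError(f"{func.__name__} must take one input parameter")
    first = params[0].name
    if first not in hints or "return" not in hints:
        raise TypeError(f"{func.__name__} must annotate its input and return type")
    return TaskSpec(hints[first], hints["return"], inspect.iscoroutinefunction(func))


# Stand-ins for Pydantic models, used only for illustration.
class In:
    pass


class Out:
    pass


def resize(input: In) -> Out:
    return Out()


async def fetch_and_resize(input: In) -> Out:
    return Out()
```

Calling describe_task(resize) yields a spec with is_async set to False, while describe_task(fetch_and_resize) reports True with the same input and output types.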

Sync tasks run in a thread pool. Async tasks run natively in the event loop. If a task exceeds timeout_seconds, the worker cancels it and reports a timeout to the orchestrator, which handles retries.
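
A minimal sketch of this execution model with asyncio is shown below. It is an illustration rather than the code in worker.py: run_task is a hypothetical helper and the "TIMEOUT" status string is an assumption. Note that in this sketch a timed-out sync task is only abandoned, because a running thread cannot be forcibly stopped; an async task is cancelled at its next await.

```python
import asyncio
import inspect


async def run_task(func, arg, timeout_seconds):
    """Run a sync or async task with a timeout; return (status, result)."""
    if inspect.iscoroutinefunction(func):
        # Async tasks run directly on the event loop.
        awaitable = func(arg)
    else:
        # Sync tasks run in the loop's default thread pool.
        loop = asyncio.get_running_loop()
        awaitable = loop.run_in_executor(None, func, arg)
    try:
        result = await asyncio.wait_for(awaitable, timeout_seconds)
    except asyncio.TimeoutError:
        # The worker would report this to the orchestrator, which decides on retries.
        return "TIMEOUT", None
    return "COMPLETED", result
```

A caller could drive it with asyncio.run(run_task(my_task, my_input, 30)), where my_task and my_input stand for a loaded task function and its validated input.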

LoadTask (Rust Extension)

A built-in stress-testing task implemented in Rust (pyo3). It creates configurable CPU, I/O, and memory pressure and releases the GIL while it runs, so other Python threads are not blocked.

import dimq_load_task

result = dimq_load_task.run(
    duration_seconds=5.0,
    concurrency=4,       # number of CPU threads
    cpu_load=0.7,        # fraction of duration for CPU work (SHA-256 hashing)
    io_load=0.2,         # fraction for I/O (temp file writes)
    memory_mb=100,       # memory to allocate and touch
)
# result = {
#     "phases": [
#         {"type": "memory", "start_seconds": 0.0, "duration_seconds": 0.001},
#         {"type": "cpu", "start_seconds": 0.001, "duration_seconds": 3.5},
#         {"type": "io", "start_seconds": 3.501, "duration_seconds": 1.0},
#     ],
#     "total_duration_seconds": 4.501,
#     "peak_memory_mb": 100,
# }
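
The sample output is consistent with each load fraction being multiplied by duration_seconds: 0.7 × 5.0 = 3.5 seconds of CPU work and 0.2 × 5.0 = 1.0 second of I/O. The helper below captures that arithmetic. It is inferred from the example numbers only, and expected_phase_seconds is a hypothetical name, not part of dimq_load_task.

```python
def expected_phase_seconds(duration_seconds, cpu_load, io_load):
    """Estimate per-phase durations, assuming each fraction scales the total duration."""
    if cpu_load < 0 or io_load < 0 or cpu_load + io_load > 1:
        raise ValueError("load fractions must be non-negative and sum to at most 1")
    return {
        "cpu": duration_seconds * cpu_load,
        "io": duration_seconds * io_load,
    }
```

Under this reading, the remaining 10% of the requested duration in the example is not spent in either phase, which matches the reported total of about 4.5 seconds.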

Project Structure

DIMQ/
├── pyproject.toml
├── Dockerfile              # Multi-stage build for worker containers
├── docker-compose.yml      # 3 worker services for e2e testing
├── src/dimq/
│   ├── orchestrator.py     # ZMQ ROUTER, dispatch, heartbeat, retry
│   ├── worker.py           # ZMQ DEALER, task execution, timeout
│   ├── adaptive.py         # Throughput-based parallelization tuning
│   ├── task.py             # Task loading and introspection via inspect
│   ├── models.py           # Pydantic models (TaskRecord, DimqConfig, etc.)
│   ├── config.py           # YAML config loading
│   ├── cli.py              # CLI entry points
│   └── tasks/
│       └── load.py         # Pydantic wrapper for dimq_load_task
├── dimq_load_task/         # Rust extension (pyo3 + maturin)
│   ├── Cargo.toml
│   ├── pyproject.toml
│   └── src/lib.rs
├── e2e/
│   └── config.yaml         # Worker config for Docker containers
└── tests/
    ├── test_orchestrator.py
    ├── test_worker.py
    ├── test_adaptive.py
    ├── test_integration.py
    ├── test_e2e_docker.py   # Docker-based e2e test
    ├── test_load_task.py
    ├── test_tasks_load.py   # LoadTask wrapper tests
    ├── test_models.py
    ├── test_config.py
    ├── test_task.py
    └── sample_tasks.py      # Test fixtures
