Distributed In-Memory Queue
DIMQ - Distributed In-Memory Queue
A general-purpose distributed task processing framework built on ZeroMQ. Workers connect to a central orchestrator, receive tasks, and return results. The orchestrator adaptively tunes per-worker parallelization to maximize throughput.
Architecture
                 +-------------------+
Clients -------->|    Orchestrator   |<-------- Workers
(DEALER)         |    (2x ROUTER)    |          (DEALER)
                 |                   |
SUBMIT/STATUS/   | - FIFO task queue |  READY/HEARTBEAT/
RESULT queries   | - Result storage  |  RESULT messages
                 | - Retry logic     |
                 | - Adaptive tuning |
                 +-------------------+
Orchestrator runs two ZMQ ROUTER sockets: one for workers (registration, heartbeats, task dispatch/results) and one for clients (task submission, status queries, result retrieval).
Workers connect via ZMQ DEALER sockets, register with their CPU count, and pull tasks. They run tasks concurrently (sync tasks in a thread pool, async tasks natively) and report results back. Workers handle timeout cancellation locally; the orchestrator owns all retry decisions.
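Because the orchestrator owns retries, it needs to decide what happens when a worker reports a failure or timeout. The sketch below shows one way a FIFO queue with per-task-type max_retries could work. It is a hypothetical illustration, not DIMQ's code: QueuedTask and RetryQueue are invented names, and two behaviours are assumptions. The first is that max_retries counts retries after the first attempt. The second is that a retried task goes to the back of the queue.

```python
from collections import deque
from dataclasses import dataclass
from typing import Deque, Dict, Optional


@dataclass
class QueuedTask:
    # Hypothetical record; the real TaskRecord in models.py may differ.
    task_id: str
    task_type: str
    payload: bytes
    attempts: int = 0  # number of times the task has been dispatched


class RetryQueue:
    """FIFO queue that requeues failed or timed-out tasks up to max_retries."""

    def __init__(self, max_retries: Dict[str, int]) -> None:
        self.max_retries = max_retries  # per task type, as in the tasks config
        self.pending: Deque[QueuedTask] = deque()
        self.failed: Dict[str, QueuedTask] = {}

    def submit(self, task: QueuedTask) -> None:
        self.pending.append(task)

    def next_task(self) -> Optional[QueuedTask]:
        """Pop the oldest pending task for dispatch to a worker."""
        if not self.pending:
            return None
        task = self.pending.popleft()
        task.attempts += 1
        return task

    def on_failure(self, task: QueuedTask) -> bool:
        """Handle a failure or timeout reported by a worker; True if requeued."""
        # Assumption: one initial attempt plus up to max_retries retries.
        if task.attempts <= self.max_retries.get(task.task_type, 0):
            self.pending.append(task)  # assumption: retries go to the back
            return True
        self.failed[task.task_id] = task
        return False
```

With max_retries: 3, this sketch would dispatch a task at most four times before marking it failed.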
Adaptive parallelization starts each worker's parallel task limit at its CPU count, then probes upward. If throughput plateaus or drops, it scales back by one and enters steady mode, re-probing periodically.
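The probe/steady cycle can be modelled as a small state machine that receives one throughput measurement per interval. The following is a minimal sketch of that idea, not the contents of adaptive.py. AdaptiveLimit, plateau_tolerance, and reprobe_after are hypothetical. Treating a gain under 5% as a plateau is likewise an assumption.

```python
from typing import Optional


class AdaptiveLimit:
    """Per-worker parallel task limit tuned from observed throughput."""

    def __init__(self, cpu_count: int, plateau_tolerance: float = 0.05,
                 reprobe_after: int = 10) -> None:
        self.limit = cpu_count          # start at the worker's CPU count
        self.mode = "probing"
        self.best: Optional[float] = None
        self.tolerance = plateau_tolerance
        self.reprobe_after = reprobe_after
        self._steady_ticks = 0

    def observe(self, throughput: float) -> int:
        """Feed throughput (tasks/sec) measured at the current limit; return the new limit."""
        if self.mode == "probing":
            if self.best is None or throughput > self.best * (1 + self.tolerance):
                # Meaningful improvement: remember it and probe one higher.
                self.best = throughput
                self.limit += 1
            else:
                # Plateau or drop: scale back by one and settle.
                self.limit = max(1, self.limit - 1)
                self.mode = "steady"
                self._steady_ticks = 0
        else:
            self._steady_ticks += 1
            if self._steady_ticks >= self.reprobe_after:
                # Periodic re-probe from the current steady throughput.
                self.mode = "probing"
                self.best = throughput
                self.limit += 1
        return self.limit
```

Comparing against the previous best with a tolerance, rather than requiring any increase at all, keeps measurement noise from pushing the limit up indefinitely.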
Tasks are plain Python functions with Pydantic-typed input and output. No base class or decorator is needed; the framework introspects types via the inspect module.
Prerequisites
- Python 3.9+
- uv
- Rust toolchain (for the LoadTask extension only)
Setup
# Install Python dependencies
uv sync
# (Optional) Build the Rust LoadTask extension
cd dimq_load_task
uv tool run maturin develop --uv
cd ..
Running
Create a config file:
# config.yaml
endpoint: "tcp://0.0.0.0:5555"
client_endpoint: "tcp://0.0.0.0:5556"
heartbeat_interval_seconds: 5
heartbeat_timeout_missed: 3
tasks:
- name: "my_app.tasks:process"
max_retries: 3
timeout_seconds: 30
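The config does not spell out how the two heartbeat settings interact. One natural reading is that a worker is treated as lost after it misses heartbeat_timeout_missed consecutive heartbeats. With the values above, that means 5 * 3 = 15 seconds of silence. The sketch below assumes that reading. HeartbeatTracker and its methods are hypothetical and are not DIMQ's API.

```python
import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class HeartbeatTracker:
    interval_seconds: float = 5.0   # heartbeat_interval_seconds
    timeout_missed: int = 3         # heartbeat_timeout_missed
    last_seen: Dict[str, float] = field(default_factory=dict)

    @property
    def deadline_seconds(self) -> float:
        # Assumption: a worker is dead after this many seconds without a heartbeat.
        return self.interval_seconds * self.timeout_missed

    def beat(self, worker_id: str, now: Optional[float] = None) -> None:
        """Record a heartbeat (or any message) from a worker."""
        self.last_seen[worker_id] = time.monotonic() if now is None else now

    def expired(self, now: Optional[float] = None) -> List[str]:
        """Return and forget workers whose last heartbeat is older than the deadline."""
        now = time.monotonic() if now is None else now
        dead = [w for w, t in self.last_seen.items() if now - t > self.deadline_seconds]
        for w in dead:
            del self.last_seen[w]
        return dead
```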
Start the orchestrator and one or more workers:
# Terminal 1: start orchestrator
uv run dimq orchestrator --config config.yaml
# Terminal 2: start a worker (same machine)
uv run dimq worker --config config.yaml
# Terminal 3: start a worker on another machine (override endpoint)
uv run dimq worker --config config.yaml --endpoint tcp://orchestrator-host:5555
Submitting tasks programmatically
import zmq
ctx = zmq.Context()
sock = ctx.socket(zmq.DEALER)
sock.connect("tcp://localhost:5556")
# Submit a task
sock.send_multipart([
b"SUBMIT",
b"my_app.tasks:process", # task type
b"task-001", # task ID
b'{"input_field": "value"}', # JSON payload matching the Pydantic input model
])
# Receive ACK
ack = sock.recv_multipart() # [b"ACK", b"task-001"]
# Later, query the result
sock.send_multipart([b"RESULT", b"task-001"])
reply = sock.recv_multipart()
# [b"RESULT_REPLY", b"task-001", b"COMPLETED", b'{"output_field": "result"}']
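The submit-then-query exchange above can be wrapped in two small helpers. This is a sketch built only on the frame layouts shown in the example. The helper names are hypothetical. It assumes the RESULT_REPLY payload frame is present only once the task is done. It treats only COMPLETED as terminal because other status names are not documented here, so pass any others your deployment uses. `sock` can be any object with pyzmq-style `send_multipart`/`recv_multipart`, such as the DEALER socket above.

```python
import json
import time
from typing import Any, Sequence, Tuple


def submit_task(sock: Any, task_type: str, task_id: str, payload: dict) -> None:
    """Send a SUBMIT message and check the orchestrator's ACK."""
    tid = task_id.encode()
    sock.send_multipart([b"SUBMIT", task_type.encode(), tid, json.dumps(payload).encode()])
    reply = sock.recv_multipart()
    if reply != [b"ACK", tid]:
        raise RuntimeError(f"unexpected SUBMIT reply: {reply!r}")


def wait_for_result(sock: Any, task_id: str, poll_interval: float = 0.5,
                    max_attempts: int = 20,
                    terminal_statuses: Sequence[str] = ("COMPLETED",)) -> Tuple[str, Any]:
    """Poll with RESULT queries until the task reaches a terminal status."""
    tid = task_id.encode()
    for _ in range(max_attempts):
        sock.send_multipart([b"RESULT", tid])
        frames = sock.recv_multipart()
        # Layout from the example: [b"RESULT_REPLY", task_id, status, payload]
        if len(frames) < 3 or frames[0] != b"RESULT_REPLY" or frames[1] != tid:
            raise ValueError(f"unexpected RESULT reply: {frames!r}")
        status = frames[2].decode()
        if status in terminal_statuses:
            payload = json.loads(frames[3]) if len(frames) > 3 and frames[3] else None
            return status, payload
        time.sleep(poll_interval)
    raise TimeoutError(f"task {task_id} not finished after {max_attempts} polls")
```

Usage against the socket from the example: `submit_task(sock, "my_app.tasks:process", "task-001", {"input_field": "value"})`, then `status, output = wait_for_result(sock, "task-001")`.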
Testing
# Run all tests (the e2e Docker test is skipped if Docker is unavailable)
uv run pytest -v
# Run specific test modules
uv run pytest tests/test_orchestrator.py -v
uv run pytest tests/test_integration.py -v
# Run LoadTask tests (requires Rust extension to be built)
uv run pytest tests/test_load_task.py -v
# Run end-to-end Docker test (requires Docker daemon)
# Spins up 3 worker containers, submits load tasks, verifies adaptive tuning
uv run pytest tests/test_e2e_docker.py -v -s
Writing Custom Tasks
A task is a plain function with Pydantic-typed input and output. It can be sync or async.
# my_app/tasks.py
from pydantic import BaseModel
class ImageInput(BaseModel):
url: str
width: int
height: int
class ImageOutput(BaseModel):
thumbnail_path: str
original_size_bytes: int
def resize(input: ImageInput) -> ImageOutput:
# Your logic here
return ImageOutput(
thumbnail_path=f"/tmp/{input.width}x{input.height}.jpg",
original_size_bytes=1024,
)
# Async tasks work the same way
async def fetch_and_resize(input: ImageInput) -> ImageOutput:
...
Register it in your config:
tasks:
- name: "my_app.tasks:resize"
max_retries: 2
timeout_seconds: 60
- name: "my_app.tasks:fetch_and_resize"
max_retries: 3
timeout_seconds: 120
The framework uses inspect to automatically extract:
- Input type from the first parameter's annotation
- Output type from the return annotation
- Whether the function is sync or async
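The introspection steps above map onto a handful of standard-library calls. This is a minimal sketch that assumes the "module:function" naming used in the config. It is not the code in task.py, and load_callable, introspect, and TaskInfo are hypothetical names. Pydantic models are ordinary classes, so the same calls return them as the input and output types.

```python
import importlib
import inspect
import typing
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class TaskInfo:
    fn: Callable[..., Any]
    input_type: Any
    output_type: Any
    is_async: bool


def load_callable(spec: str) -> Callable[..., Any]:
    """Resolve a "package.module:function" string, as used in the tasks config."""
    module_name, _, attr = spec.partition(":")
    if not module_name or not attr:
        raise ValueError(f"expected 'module:function', got {spec!r}")
    return getattr(importlib.import_module(module_name), attr)


def introspect(fn: Callable[..., Any]) -> TaskInfo:
    params = list(inspect.signature(fn).parameters.values())
    if not params:
        raise TypeError(f"{fn.__name__} must take an input parameter")
    # get_type_hints also resolves string annotations (from __future__ import annotations).
    hints = typing.get_type_hints(fn)
    input_type = hints.get(params[0].name)   # first parameter's annotation
    output_type = hints.get("return")        # return annotation
    if input_type is None or output_type is None:
        raise TypeError(f"{fn.__name__} needs input and return annotations")
    return TaskInfo(fn, input_type, output_type, inspect.iscoroutinefunction(fn))
```

For the example above, `introspect(load_callable("my_app.tasks:resize"))` would be expected to report ImageInput, ImageOutput, and is_async=False.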
Sync tasks run in a thread pool. Async tasks run natively in the event loop. If a task exceeds timeout_seconds, the worker cancels it and reports a timeout to the orchestrator, which handles retries.
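The sync/async split and the timeout can be sketched with asyncio and a thread pool. run_task is a hypothetical helper. Only the COMPLETED status appears in this README, and the TIMEOUT and ERROR strings are assumptions. Python cannot forcibly stop a thread. On timeout, `asyncio.wait_for` stops waiting for a sync task, but its thread keeps running until the function returns. An async task, by contrast, is actually cancelled.

```python
import asyncio
import inspect
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Tuple


async def run_task(fn: Callable[[Any], Any], arg: Any, timeout_seconds: float,
                   pool: ThreadPoolExecutor) -> Tuple[str, Any]:
    """Run a sync task in the pool or an async task on the loop, with a timeout."""
    if inspect.iscoroutinefunction(fn):
        awaitable = fn(arg)  # coroutine runs natively on the event loop
    else:
        loop = asyncio.get_running_loop()
        awaitable = loop.run_in_executor(pool, fn, arg)  # sync work in a thread
    try:
        return "COMPLETED", await asyncio.wait_for(awaitable, timeout_seconds)
    except asyncio.TimeoutError:
        # Reported back so the orchestrator can decide whether to retry.
        return "TIMEOUT", None
    except Exception as exc:
        return "ERROR", repr(exc)
```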
LoadTask (Rust Extension)
A built-in stress-testing task implemented in Rust (pyo3). It generates configurable CPU, I/O, and memory pressure and releases the GIL while running, so other Python threads are not blocked.
import dimq_load_task
result = dimq_load_task.run(
duration_seconds=5.0,
concurrency=4, # number of CPU threads
cpu_load=0.7, # fraction of duration for CPU work (SHA-256 hashing)
io_load=0.2, # fraction for I/O (temp file writes)
memory_mb=100, # memory to allocate and touch
)
# result = {
# "phases": [
# {"type": "memory", "start_seconds": 0.0, "duration_seconds": 0.001},
# {"type": "cpu", "start_seconds": 0.001, "duration_seconds": 3.5},
# {"type": "io", "start_seconds": 3.501, "duration_seconds": 1.0},
# ],
# "total_duration_seconds": 4.501,
# "peak_memory_mb": 100,
# }
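In the sample output, each load fraction appears to scale duration_seconds. With duration 5.0, cpu_load 0.7 gives a 3.5 s CPU phase and io_load 0.2 gives a 1.0 s I/O phase. The remaining 0.5 s is not spent in any phase, which is why total_duration_seconds is 4.501 rather than 5.0. The helper below is hypothetical and is not part of dimq_load_task. It infers that mapping from the sample, and its range checks are an assumption about valid inputs.

```python
from typing import Dict


def planned_phase_seconds(duration_seconds: float, cpu_load: float,
                          io_load: float) -> Dict[str, float]:
    """Estimate CPU and I/O phase lengths, assuming each fraction scales duration_seconds."""
    if not (0.0 <= cpu_load <= 1.0 and 0.0 <= io_load <= 1.0):
        raise ValueError("cpu_load and io_load must be in [0, 1]")
    if cpu_load + io_load > 1.0:
        raise ValueError("cpu_load + io_load must not exceed 1.0")
    cpu = duration_seconds * cpu_load
    io = duration_seconds * io_load
    # Time not assigned to either phase (the memory phase is separate and short).
    return {"cpu": cpu, "io": io, "unallocated": duration_seconds - cpu - io}
```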
Project Structure
DIMQ/
├── pyproject.toml
├── Dockerfile # Multi-stage build for worker containers
├── docker-compose.yml # 3 worker services for e2e testing
├── src/dimq/
│ ├── orchestrator.py # ZMQ ROUTER, dispatch, heartbeat, retry
│ ├── worker.py # ZMQ DEALER, task execution, timeout
│ ├── adaptive.py # Throughput-based parallelization tuning
│ ├── task.py # Task loading and introspection via inspect
│ ├── models.py # Pydantic models (TaskRecord, DimqConfig, etc.)
│ ├── config.py # YAML config loading
│ ├── cli.py # CLI entry points
│ └── tasks/
│ └── load.py # Pydantic wrapper for dimq_load_task
├── dimq_load_task/ # Rust extension (pyo3 + maturin)
│ ├── Cargo.toml
│ ├── pyproject.toml
│ └── src/lib.rs
├── e2e/
│ └── config.yaml # Worker config for Docker containers
└── tests/
├── test_orchestrator.py
├── test_worker.py
├── test_adaptive.py
├── test_integration.py
├── test_e2e_docker.py # Docker-based e2e test
├── test_load_task.py
├── test_tasks_load.py # LoadTask wrapper tests
├── test_models.py
├── test_config.py
├── test_task.py
└── sample_tasks.py # Test fixtures
Download files
File details
Details for the file dimq-0.1.3.tar.gz.
File metadata
- Download URL: dimq-0.1.3.tar.gz
- Upload date:
- Size: 102.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 7e1a065746fc9423afeaf985b9b01835c5c7ece4017143997def2f635e23a245 |
| MD5 | 64912a709d31d92602f42dbfb955cee6 |
| BLAKE2b-256 | cadcb07f451a83d1a6d47e547900b6cdc4218d3ccc6170a8873398673c44e32e |
Provenance
The following attestation bundles were made for dimq-0.1.3.tar.gz:
- Publisher: ci.yml on walnutgeek/dimq
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: dimq-0.1.3.tar.gz
- Subject digest: 7e1a065746fc9423afeaf985b9b01835c5c7ece4017143997def2f635e23a245
- Sigstore transparency entry: 1098912139
- Sigstore integration time:
- Permalink: walnutgeek/dimq@0426a66a6b2c5836b47fa5d7a4199a58b2d6aea2
- Branch / Tag: refs/tags/v0.1.3
- Owner: https://github.com/walnutgeek
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: ci.yml@0426a66a6b2c5836b47fa5d7a4199a58b2d6aea2
- Trigger Event: push
File details
Details for the file dimq-0.1.3-py3-none-any.whl.
File metadata
- Download URL: dimq-0.1.3-py3-none-any.whl
- Upload date:
- Size: 14.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 607531f9fb5db949b5ec5b4879df85c605dab4bc5417ff83644df7da2c92f859 |
| MD5 | df26350ccc954409c486158368289d17 |
| BLAKE2b-256 | 875f597194c882f875904a834f2b86a73b6db8a0199b64603b16c97fcb37a40a |
Provenance
The following attestation bundles were made for dimq-0.1.3-py3-none-any.whl:
- Publisher: ci.yml on walnutgeek/dimq
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: dimq-0.1.3-py3-none-any.whl
- Subject digest: 607531f9fb5db949b5ec5b4879df85c605dab4bc5417ff83644df7da2c92f859
- Sigstore transparency entry: 1098912191
- Sigstore integration time:
- Permalink: walnutgeek/dimq@0426a66a6b2c5836b47fa5d7a4199a58b2d6aea2
- Branch / Tag: refs/tags/v0.1.3
- Owner: https://github.com/walnutgeek
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: ci.yml@0426a66a6b2c5836b47fa5d7a4199a58b2d6aea2
- Trigger Event: push