OmniQ v1 - Redis+Lua queue Python SDK
Project description
OmniQ (Python)
OmniQ is a Redis + Lua, language-agnostic job queue. This package is the Python client for OmniQ v1.
Key ideas:
- Hybrid lanes: ungrouped jobs by default, optional grouped jobs (FIFO per group + per-group concurrency).
- Lease-based execution: workers reserve a job with a time-limited lease.
- Token-gated ACK/heartbeat:
reserve()returns alease_tokenthat must be used byheartbeat()andack_*(). - Pause / resume (flag-only): pausing a queue prevents new reserves; it does not move jobs or stop running jobs.
- Admin-safe operations: strict
remove,remove_batch,retry, andretry_batchoperations. - Handler-driven completion primitive:
check_completionfor parent/child workflows.
Core project / docs: https://github.com/not-empty/omniq
Install
pip install omniq
Quick start
Publish
from omniq.client import OmniqClient
uq = OmniqClient(
host="omniq-redis",
port=6379,
)
job_id = uq.publish(
queue="demo",
payload={"hello": "world"},
timeout_ms=30_000,
)
print("OK", job_id)
Consume
import time
from omniq.client import OmniqClient
def handler(ctx):
print("Waiting 2 seconds")
time.sleep(2)
print("Done")
uq = OmniqClient(
host="omniq-redis",
port=6379,
)
uq.consume(
queue="demo",
handler=handler,
verbose=True,
drain=False,
)
Client initialization
from omniq.client import OmniqClient
# Option A: host/port
uq = OmniqClient(host="localhost", port=6379, db=0)
# Option B: Redis URL
uq = OmniqClient(redis_url="redis://:password@localhost:6379/0")
Administrative Operations
These operations are strict and atomic (Lua-backed).
retry_failed()
Retry a single failed job (resets attempt=0 and moves back to
waiting).
uq.retry_failed(queue="demo", job_id="01ABC...")
- Only works if job
state == "failed". - Safe under grouping rules.
retry_failed_batch()
Retry up to 100 failed jobs atomically.
results = uq.retry_failed_batch(
queue="demo",
job_ids=["01A...", "01B...", "01C..."]
)
for job_id, status, reason in results:
print(job_id, status, reason)
- Max 100 jobs per call.
- Returns per-job result.
- Jobs not in
failedstate returnERR NOT_FAILED.
remove_job()
Remove a single non-active job from a specific lane.
uq.remove_job(
queue="demo",
job_id="01ABC...",
lane="failed", # wait | delayed | failed | completed | gwait
)
Rules:
- Cannot remove
activejobs. - Lane must match job state.
- Group safety is preserved.
remove_jobs_batch()
Remove up to 100 jobs from a specific lane.
results = uq.remove_jobs_batch(
queue="demo",
lane="failed",
job_ids=["01A...", "01B...", "01C..."]
)
for job_id, status, reason in results:
print(job_id, status, reason)
- Strict lane validation.
- Atomic per batch.
- Safe for grouped jobs.
Handler Context
Inside handler(ctx) you receive:
queuejob_idpayload_rawpayloadattemptlock_until_mslease_tokengidcheck_completion
check_completion (Parent / Child workflows)
A handler-driven primitive for parallel fan-out workflows.
No TTL. Cleanup occurs only when the counter reaches zero.
Parent Example
def parent_handler(ctx):
document_id = ctx.payload["document_id"]
pages = ctx.payload["pages"]
key = f"document:{document_id}"
ctx.check_completion.InitJobCounter(key, pages)
for p in range(1, pages + 1):
uq.publish(
queue="pages",
payload={
"document_id": document_id,
"page": p,
"completion_key": key,
},
)
Child Example
def page_handler(ctx):
key = ctx.payload["completion_key"]
# do work...
remaining = ctx.check_completion.JobDecrement(key)
if remaining == 0:
print("Last page finished.")
Properties:
- Idempotent decrement (safe under retries).
- No accidental completion.
- Cross-queue safe.
- Fully user-controlled business logic.
Grouped jobs (FIFO + concurrency)
uq.publish(queue="demo", payload={"i": 1}, gid="company:acme", group_limit=2)
uq.publish(queue="demo", payload={"i": 2}, gid="company:acme")
- FIFO inside group
- Groups run in parallel
- Concurrency limited per group
License
See the repository license.
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file omniq-1.4.0.tar.gz.
File metadata
- Download URL: omniq-1.4.0.tar.gz
- Upload date:
- Size: 31.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.14
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
e398af0cdeeef473a28384e486ccff33d9a9c681721c65f338322464d4b513e2
|
|
| MD5 |
7858e5490d50ea8d93d05c7359653a30
|
|
| BLAKE2b-256 |
8850cf82ba48d03a7e497fa4650d33d346e2a31b0052362e6c3f158a5c8b3f9e
|
File details
Details for the file omniq-1.4.0-py3-none-any.whl.
File metadata
- Download URL: omniq-1.4.0-py3-none-any.whl
- Upload date:
- Size: 41.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.14
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
ea4f82e0b1af5aa234454baf2ee1a49467104424cbfdd3fcf1a541cf45f54066
|
|
| MD5 |
0084e9e6dbadf84963f00f998f6a333c
|
|
| BLAKE2b-256 |
08f87d8f282c1d2d1f20037d680886007a7f5c7027f6eab2ac13b82dcca28c6c
|