Skip to main content

Lightweight Kahn-based ready queue for dependency-driven scheduling and workflows

Project description

kahn-queue (Python)

PyPI

Getting started

  • Python: 3.10+ recommended (venv + requirements-dev.txt; no version pinned in-repo).
  • Tests: make test-python, or cd python, create .venv, install requirements-dev.txt, then pytest.
  • Imports: Examples assume src is on PYTHONPATH (the test suite does this via tests/conftest.py). From the repo: PYTHONPATH=src or pip install -e . once packaging is configured.

Pieces

Piece Role
Dag / Dag.builder() Immutable DAG: add, connect, build().
KahnScheduler run(), signal_complete, signal_failed; is_finished (property); get_result()DagResult.
DagResult completed, failed, pruned — each a frozenset of node ids.
KahnQueue / DefaultKahnQueue / ConcurrentKahnQueue DefaultKahnQueue when queue is omitted; pass ConcurrentKahnQueue(dag) for concurrent pop / prune.
IllegalGraphException Invalid graphs (self-loop, cycle at build(), etc.).
NodeProgressTracker Optional per-node progress in [0, 1] (tracker module); not required for scheduling.

Examples

Single threaded queue

from dag import Dag
from scheduler import KahnScheduler

b = Dag.builder()
root = b.add("lint")
mid = b.add("compile")
leaf = b.add("test")
b.connect(root, mid).connect(mid, leaf)
dag = b.build()


def execute_node(node_id: int, sched: KahnScheduler[str]) -> None:
    try:
        run_step(dag[node_id])  # your step
        sched.signal_complete(node_id)
    except Exception:
        sched.signal_failed(node_id)

sched = KahnScheduler(dag, execute_node)
sched.run()

Concurrent queue

from dag import Dag
from kahnQueue.concurrent_kahn_queue import ConcurrentKahnQueue
from scheduler import KahnScheduler

b = Dag.builder()
a = b.add("a")
c = b.add("c")
jn = b.add("join")
b.connect(a, jn).connect(c, jn)
dag = b.build()


def execute_node(node_id: int, sched: KahnScheduler[str]) -> None:
    ...

sched = KahnScheduler(dag, execute_node, queue=ConcurrentKahnQueue(dag))
sched.run()

Result

result = sched.get_result()
# result.completed, result.failed, result.pruned — frozensets of ids
done = sched.is_finished

Manual KahnQueue

from dag import Dag
from kahnQueue.default_kahn_queue import DefaultKahnQueue

b = Dag.builder()
lint = b.add("lint")
compile = b.add("compile")
test = b.add("test")
b.connect(lint, compile).connect(compile, test)
dag = b.build()

q = DefaultKahnQueue(dag)

ready = list(q.peek())

while ready:
    id_ = ready.pop(0)

    # do work for `id_` (e.g. run_step(dag[id_]))

    promoted = q.pop(id_)
    ready.extend(promoted)

    # If a node fails, you can prune it (and its descendants):
    # q.prune(id_)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

kahn_queue-1.0.2.tar.gz (13.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

kahn_queue-1.0.2-py3-none-any.whl (10.8 kB view details)

Uploaded Python 3

File details

Details for the file kahn_queue-1.0.2.tar.gz.

File metadata

  • Download URL: kahn_queue-1.0.2.tar.gz
  • Upload date:
  • Size: 13.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.13

File hashes

Hashes for kahn_queue-1.0.2.tar.gz
Algorithm Hash digest
SHA256 f7a252739e594fb515fb05046d4e4084431e537ccf753cef60f8f7077593887f
MD5 1cca0d1735c9e613b8257b0eedb90765
BLAKE2b-256 73483ef3299c523eff9d2932ee4abda1692364bb45881f2ec25abd41e83a8419

See more details on using hashes here.

File details

Details for the file kahn_queue-1.0.2-py3-none-any.whl.

File metadata

  • Download URL: kahn_queue-1.0.2-py3-none-any.whl
  • Upload date:
  • Size: 10.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.13

File hashes

Hashes for kahn_queue-1.0.2-py3-none-any.whl
Algorithm Hash digest
SHA256 41aac2b280b0e392b3711b20c9cd0a227a8cd70b96950040262a5bf888f63d12
MD5 b3084bda7f4d5de6e2fa1944bc14807b
BLAKE2b-256 db92133554a029d5ab6cb993ffb8bb3df3f4d0aa76a4b7c4f60cfd3db02be047

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page