
Lightweight Kahn-based ready queue for dependency-driven scheduling and workflows


kahn-queue (Python)

Getting started

  • Python: 3.10+ recommended (venv + requirements-dev.txt; no version pinned in-repo).
  • Tests: make test-python, or cd python, create .venv, install requirements-dev.txt, then pytest.
  • Imports: Examples assume src is on PYTHONPATH (the test suite does this via tests/conftest.py). From the repo: PYTHONPATH=src or pip install -e . once packaging is configured.
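The path setup mentioned above can be sketched as follows. This is a guess at what a tests/conftest.py helper could look like, not the repository's actual file; the function name add_src_to_path and the assumption that tests/ sits next to src/ are illustrative.

```python
import sys
from pathlib import Path


def add_src_to_path(repo_root: Path) -> Path:
    """Prepend <repo_root>/src to sys.path (idempotent) and return that path."""
    src = (repo_root / "src").resolve()
    if str(src) not in sys.path:
        sys.path.insert(0, str(src))
    return src


# In tests/conftest.py one might call (hypothetical layout: tests/ next to src/):
# add_src_to_path(Path(__file__).resolve().parent.parent)
```

With this in place, `from dag import Dag` resolves during test runs without exporting PYTHONPATH by hand.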

Pieces

| Piece | Role |
| --- | --- |
| Dag / Dag.builder() | Immutable DAG: add, connect, build(). |
| KahnScheduler | run(), signal_complete, signal_failed; is_finished (property); get_result() → DagResult. |
| DagResult | completed, failed, pruned: each a frozenset of node ids. |
| KahnQueue / DefaultKahnQueue / ConcurrentKahnQueue | DefaultKahnQueue when queue is omitted; pass ConcurrentKahnQueue(dag) for concurrent pop / prune. |
| IllegalGraphException | Invalid graphs (self-loop, cycle at build(), etc.). |
| NodeProgressTracker | Optional per-node progress in [0, 1] (tracker module); not required for scheduling. |
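For readers new to the technique, here is a minimal, library-independent sketch of Kahn's algorithm, which the ready queue is named after. It is not kahn-queue's implementation: the function name kahn_order is made up for illustration, and ValueError stands in for IllegalGraphException.

```python
from collections import deque


def kahn_order(node_count: int, edges: list[tuple[int, int]]) -> list[int]:
    """Return one topological order of nodes 0..node_count-1, or raise on a cycle."""
    children: list[list[int]] = [[] for _ in range(node_count)]
    indegree = [0] * node_count
    for parent, child in edges:
        children[parent].append(child)
        indegree[child] += 1

    # Nodes with no unmet dependencies form the initial ready queue.
    ready = deque(i for i in range(node_count) if indegree[i] == 0)
    order: list[int] = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for child in children[node]:
            indegree[child] -= 1
            if indegree[child] == 0:  # last dependency satisfied
                ready.append(child)

    if len(order) != node_count:
        # Some nodes never reached in-degree 0, so they sit on or behind a cycle.
        raise ValueError("graph contains a cycle")
    return order
```

A self-loop is caught by the same check, because the node's in-degree can never drop to zero.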

Examples

Single-threaded queue

from dag import Dag
from scheduler import KahnScheduler

b = Dag.builder()
root = b.add("lint")
mid = b.add("compile")
leaf = b.add("test")
b.connect(root, mid).connect(mid, leaf)
dag = b.build()


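def run_step(step: str) -> None:
    """Placeholder for your own work (hypothetical helper, not part of the library)."""
    print(f"running {step}")


def execute_node(node_id: int, sched: KahnScheduler[str]) -> None:
    """Run one node, then report the outcome back to the scheduler."""
    try:
        run_step(dag[node_id])  # your step
        sched.signal_complete(node_id)
    except Exception:
        # Any exception from the step is reported as a failure.
        sched.signal_failed(node_id)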

sched = KahnScheduler(dag, execute_node)
sched.run()

Concurrent queue

from dag import Dag
from kahnQueue.concurrent_kahn_queue import ConcurrentKahnQueue
from scheduler import KahnScheduler

b = Dag.builder()
a = b.add("a")
c = b.add("c")
jn = b.add("join")
b.connect(a, jn).connect(c, jn)
dag = b.build()


def execute_node(node_id: int, sched: KahnScheduler[str]) -> None:
    ...

sched = KahnScheduler(dag, execute_node, queue=ConcurrentKahnQueue(dag))
sched.run()
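To show why a concurrent queue needs synchronized pop, here is a self-contained sketch that drives a dependency graph with a thread pool. It does not use ConcurrentKahnQueue; the function run_concurrently, its parameters, and the lock-protected in-degree list are assumptions made for illustration only.

```python
import threading
from collections.abc import Callable
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def run_concurrently(
    node_count: int,
    edges: list[tuple[int, int]],
    work: Callable[[int], None],
    max_workers: int = 4,
) -> list[int]:
    """Run work(node) for every node, starting a node only after all its parents."""
    children: list[list[int]] = [[] for _ in range(node_count)]
    indegree = [0] * node_count
    for parent, child in edges:
        children[parent].append(child)
        indegree[child] += 1

    lock = threading.Lock()
    finished: list[int] = []

    def run_node(node: int) -> list[int]:
        work(node)  # the step itself runs outside the lock
        promoted: list[int] = []
        with lock:  # bookkeeping must be atomic across workers
            finished.append(node)
            for child in children[node]:
                indegree[child] -= 1
                if indegree[child] == 0:
                    promoted.append(child)
        return promoted

    # Collect roots before submitting anything, so workers cannot race this scan.
    roots = [i for i in range(node_count) if indegree[i] == 0]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = {pool.submit(run_node, i) for i in roots}
        while pending:
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            for future in done:
                for child in future.result():
                    pending.add(pool.submit(run_node, child))
    return finished
```

For the join graph above, `run_concurrently(3, [(0, 2), (1, 2)], print)` may run the two roots in either order, but the join node always runs last.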

Result

result = sched.get_result()
# result.completed, result.failed, result.pruned — frozensets of ids
done = sched.is_finished
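Because the result exposes three frozensets, a small reporting helper is easy to write. The sketch below relies only on the three attribute names listed in this README; FakeResult is a hypothetical stand-in used so the example runs without the library, and summarize is not part of kahn-queue.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeResult:
    """Hypothetical stand-in with the three fields this README lists."""

    completed: frozenset[int]
    failed: frozenset[int]
    pruned: frozenset[int]


def summarize(result) -> str:
    """One-line status for any object exposing completed / failed / pruned."""
    status = "ok" if not result.failed and not result.pruned else "incomplete"
    return (
        f"{status}: {len(result.completed)} completed, "
        f"{len(result.failed)} failed, {len(result.pruned)} pruned"
    )
```

Passing the object returned by `sched.get_result()` to `summarize` should work the same way, assuming the attributes behave as described.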

Manual KahnQueue

from dag import Dag
from kahnQueue.default_kahn_queue import DefaultKahnQueue

b = Dag.builder()
lint = b.add("lint")
compile_ = b.add("compile")  # trailing underscore avoids shadowing the built-in compile()
test = b.add("test")
b.connect(lint, compile_).connect(compile_, test)
dag = b.build()

q = DefaultKahnQueue(dag)

ready = list(q.ready_ids())

while ready:
    id_ = ready.pop(0)

    # do work for `id_` (e.g. run_step(dag[id_]))

    promoted = q.pop(id_)
    ready.extend(promoted)

    # If a node fails, you can prune it (and its descendants):
    # q.prune(id_)
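The comment above says pruning removes a failed node together with its descendants. How the library computes that set is not stated here; the following is a plain graph-traversal sketch of the idea, with a hypothetical descendants helper and an adjacency dict standing in for the DAG.

```python
def descendants(children: dict[int, list[int]], start: int) -> set[int]:
    """All nodes reachable from `start`; in a DAG this never includes `start`."""
    seen: set[int] = set()
    stack = list(children.get(start, []))
    while stack:
        node = stack.pop()
        if node not in seen:
            seen.add(node)
            stack.extend(children.get(node, []))
    return seen


# lint(0) -> compile(1) -> test(2): if lint fails, compile and test can never run.
graph = {0: [1], 1: [2]}
skipped = descendants(graph, 0)
```

Here `skipped` holds nodes 1 and 2, which are the ones a prune of node 0 would be expected to drop.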

