threaded-order

A lightweight Python framework for running functions concurrently across multiple threads while maintaining a defined execution order. It lets you declare dependencies between tasks, so that some run only after others complete, without complex orchestration code.

Ideal for dependency-aware test execution, build pipelines, and automation workflows that benefit from controlled concurrency.

Key Features

  • Concurrent task execution using Python threads
  • Dependency graph automatically determines order
  • Simple registration and decorator API
  • Shared thread-safe state: tasks can opt in to receive a shared state dict and read/write values across dependent tasks
  • Automatic result capture: each task’s return value is stored under state['results'][task_name]
  • Thread-safe logging, callbacks, and run summary
  • Graceful shutdown on interrupt
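
To illustrate how a dependency graph can determine execution order, here is a minimal sketch built only on the standard library (`graphlib` and `concurrent.futures`, Python 3.9+). It is not the implementation used by threaded-order; the function name `run_in_order` and its parameters are hypothetical and exist only for this illustration.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from graphlib import TopologicalSorter


def run_in_order(tasks, deps, workers=3):
    """Run callables concurrently, starting each one only after its prerequisites.

    tasks: dict mapping task name -> zero-argument callable (hypothetical shape)
    deps:  dict mapping task name -> list of prerequisite task names
    Returns the task names in the order they were observed to finish.
    """
    sorter = TopologicalSorter({name: deps.get(name, ()) for name in tasks})
    sorter.prepare()  # raises graphlib.CycleError on circular dependencies
    finished = []
    pending = {}  # future -> task name
    with ThreadPoolExecutor(max_workers=workers) as pool:
        while sorter.is_active():
            # Submit every task whose prerequisites have all completed.
            for name in sorter.get_ready():
                pending[pool.submit(tasks[name])] = name
            # Block until at least one running task finishes.
            done, _ = wait(pending, return_when=FIRST_COMPLETED)
            for future in done:
                name = pending.pop(future)
                future.result()  # re-raise any exception from the task
                finished.append(name)
                sorter.done(name)  # unlocks tasks that depend on this one
    return finished


if __name__ == "__main__":
    names = "abcdef"
    graph = {"b": ["a"], "c": ["a"], "d": ["c"], "e": ["c"], "f": ["b", "d"]}
    print(run_in_order({n: (lambda: None) for n in names}, graph))
```

Tasks with no unmet prerequisites run in parallel, so the exact finishing order can vary between runs; only the dependency constraints are guaranteed.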

Installation

pip install threaded-order

Simple Example

from threaded_order import Scheduler, ThreadProxyLogger
import time
import random

s = Scheduler(workers=3, setup_logging=True)
logger = ThreadProxyLogger()

def run(name):
    time.sleep(random.uniform(.5, 3.5))
    logger.info(f'{name} completed')

@s.dregister()
def a(): run('a')

@s.dregister(after=['a'])
def b(): run('b')

@s.dregister(after=['a'])
def c(): run('c')

@s.dregister(after=['c'])
def d(): run('d')

@s.dregister(after=['c'])
def e(): run('e')

@s.dregister(after=['b', 'd'])
def f(): run('f')

if __name__ == '__main__':
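    # The callback receives the run summary; name it so it does not shadow the scheduler `s`.
    s.on_scheduler_done(lambda summary: print(f"Passed:{len(summary['passed'])} Failed:{len(summary['failed'])}"))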
    s.start()

Shared State Example

import json
from time import sleep
from threaded_order import Scheduler

s = Scheduler(workers=3, state={})

@s.dregister(with_state=True)
def load(state):
    state["x"] = 10
    return "loaded"

@s.dregister(with_state=True)
def behave(state):
    sleep(3)
    return "behaved"

@s.dregister(after=["load"], with_state=True)
def compute(state):
    state["x"] += 5
    return state["x"]

s.start()
print(json.dumps(s.state, indent=2))

Output:

{
  "results": {
    "load": "loaded",
    "compute": 15,
    "behave": "behaved"
  },
  "x": 15
}
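
Because tasks that opt in to shared state are plain functions taking a dict, they can be exercised without a scheduler. The sketch below emulates by hand the documented layout, in which each return value is stored under `state['results'][task_name]`. It does not run the Scheduler itself, and the `report` task is a hypothetical addition for illustration.

```python
def load(state):
    state["x"] = 10
    return "loaded"


def report(state):
    # Hypothetical downstream task: reads the upstream task's captured
    # return value from the documented state['results'] mapping.
    return f"load -> {state['results']['load']}, x={state['x']}"


# Emulate the documented result capture by hand (assumption: the scheduler
# stores each return value under state['results'][task_name]).
state = {"results": {}}
state["results"]["load"] = load(state)
state["results"]["report"] = report(state)

print(state["results"]["report"])
```

With the real package, `report` would presumably be registered with `after=["load"]` and `with_state=True` so that `load` has finished before it reads the result.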

ProgressBar Integration Example

A progress bar can be driven from the on_task_done callback, which fires after each task finishes. See example5.
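
As a rough idea of what such a callback might look like, the following sketch implements a text progress bar as a callable that accepts the documented `(name, ok)` arguments. The class `TextProgress` and its parameters are hypothetical, and this is not the code from example5.

```python
import sys


class TextProgress:
    """Minimal text progress bar driven by (name, ok) completion events."""

    def __init__(self, total, width=20, stream=sys.stdout):
        self.total = total      # number of registered tasks (supplied by the caller)
        self.width = width      # bar width in characters
        self.stream = stream
        self.done = 0
        self.failed = 0

    def render(self):
        filled = self.width * self.done // self.total
        bar = "#" * filled + "." * (self.width - filled)
        return f"[{bar}] {self.done}/{self.total}"

    def __call__(self, name, ok):
        # Callbacks are documented to run on the scheduler thread,
        # so this sketch does not take a lock around the counters.
        self.done += 1
        if not ok:
            self.failed += 1
        self.stream.write("\r" + self.render())
        self.stream.flush()


# Assumed wiring with the real package (not executed here):
#   s.on_task_done(TextProgress(total=6))
```

A third-party progress bar library could be substituted for `render()`; the only requirement taken from the callback table is the `(name, ok)` signature.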

More examples are in the examples folder. To run them, follow the instructions under Development to build and run the Docker container, then execute:

API Overview

class Scheduler(workers=None, setup_logging=False, add_stream_handler=True)

Runs registered callables across multiple threads while respecting declared dependencies.

Core Methods

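| Method | Description |
| --- | --- |
| register(obj, name, after=None) | Register a callable for execution. after defines dependencies by name. |
| dregister(after=None) | Decorator variant of register() for inline task definitions. As in the shared state example, it also accepts with_state=True to pass the shared state dict to the task. |
| start() | Start execution, respecting dependencies. Returns a summary dictionary. |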
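
The non-decorator form can be useful when tasks are defined elsewhere. The sketch below follows the `register(obj, name, after=None)` signature from the table. The task functions and the `RecordingScheduler` stand-in are hypothetical; the stand-in exists only so the snippet runs without the package installed.

```python
def extract():
    return "raw"


def transform():
    return "clean"


def publish():
    return "sent"


def register_pipeline(scheduler):
    """Register three tasks so they run strictly one after another."""
    scheduler.register(extract, "extract")
    scheduler.register(transform, "transform", after=["extract"])
    scheduler.register(publish, "publish", after=["transform"])
    return scheduler


class RecordingScheduler:
    """Stand-in that only records register() calls (not part of threaded-order)."""

    def __init__(self):
        self.calls = []

    def register(self, obj, name, after=None):
        self.calls.append((name, list(after or [])))


recorder = register_pipeline(RecordingScheduler())
print(recorder.calls)
```

With the real package, the same helper would presumably be called as `register_pipeline(Scheduler(workers=2)).start()`.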

Callbacks

All are optional and run on the scheduler thread (never worker threads).

| Callback | When Fired | Signature |
| --- | --- | --- |
| on_task_start(fn) | Before a task starts | (name) |
| on_task_run(fn) | When a task starts running on a thread | (name, thread) |
| on_task_done(fn) | After a task finishes | (name, ok) |
| on_scheduler_start(fn) | Before the scheduler starts running tasks | (meta) |
| on_scheduler_done(fn) | After all tasks complete | (summary) |
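
As one possible use of these hooks, the sketch below measures how long each task runs by pairing the `(name, thread)` and `(name, ok)` signatures listed above. The `TaskTimer` class is hypothetical, and it assumes `ok` is truthy when the task succeeded.

```python
import time


class TaskTimer:
    """Collect per-task run durations from run/done callback events."""

    def __init__(self, clock=time.monotonic):
        self.clock = clock      # injectable so the timer can be tested
        self.started = {}       # task name -> start timestamp
        self.durations = {}     # task name -> elapsed seconds
        self.failures = []      # names of tasks reported as not ok

    def on_run(self, name, thread):
        # Matches the documented on_task_run signature: (name, thread).
        self.started[name] = self.clock()

    def on_done(self, name, ok):
        # Matches the documented on_task_done signature: (name, ok).
        self.durations[name] = self.clock() - self.started.pop(name)
        if not ok:
            self.failures.append(name)


# Assumed wiring with the real package (not executed here):
#   timer = TaskTimer()
#   s.on_task_run(timer.on_run)
#   s.on_task_done(timer.on_done)
```

Since the callbacks are stated to run on the scheduler thread rather than on worker threads, the dictionaries above are updated from a single thread.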

Interrupt Handling

Press Ctrl-C during execution to gracefully cancel outstanding work:

  • Running tasks finish naturally or are marked as cancelled
  • Remaining queued tasks are discarded
  • Final summary reflects all results
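
The behavior described above can be approximated with the standard library alone. The following sketch is not threaded-order's implementation; `cancel_outstanding` and `demo` are hypothetical names, and a raised `KeyboardInterrupt` stands in for the user pressing Ctrl-C.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def cancel_outstanding(futures):
    """Cancel queued futures; return (cancelled names, names still running or done)."""
    # Future.cancel() returns True only if the call had not started yet.
    cancelled = [name for name, future in futures.items() if future.cancel()]
    remaining = [name for name in futures if name not in cancelled]
    return cancelled, remaining


def demo():
    started = threading.Event()
    release = threading.Event()

    def task(name):
        started.set()            # signal that a task is now running
        release.wait(timeout=5)  # simulate work until released
        return name

    with ThreadPoolExecutor(max_workers=1) as pool:
        futures = {name: pool.submit(task, name) for name in ("a", "b", "c")}
        started.wait(timeout=5)  # 'a' is running; 'b' and 'c' are still queued
        try:
            raise KeyboardInterrupt  # stand-in for Ctrl-C
        except KeyboardInterrupt:
            cancelled, running = cancel_outstanding(futures)
        finally:
            release.set()        # let the running task finish naturally
    return cancelled, running, futures["a"].result()


if __name__ == "__main__":
    print(demo())
```

The single worker keeps the example deterministic: the first task is already running when the interrupt arrives, so it completes, while the two queued tasks are discarded.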

Development

Clone the repository and ensure the latest version of Docker is installed on your development server.

Build the Docker image:

docker image build \
-t threaded-order:latest .

Run the Docker container:

docker container run \
--rm \
-it \
-v $PWD:/code \
threaded-order:latest \
bash

Execute the dev pipeline:

make dev
