Skip to main content

A lightweight framework for running functions concurrently across multiple threads while maintaining a defined execution order.

Project description

ci PyPI version

threaded-order

A lightweight Python framework for running functions concurrently across multiple threads while maintaining defined execution order. It lets you declare dependencies between tasks—so some run only after others complete—without complex orchestration code.

Ideal for dependency-aware test execution, build pipelines, and automation workflows that benefit from controlled concurrency.

Key Features

  • Concurrent task execution using Python threads
  • Dependency graph automatically determines order
  • Simple registration and decorator API
  • Shared thread-safe state: tasks can opt in to receive a shared state dict and read/write values across dependent tasks
  • Automatic result capture: each task’s return value is stored under state['results'][task_name]
  • Thread-safe logging, callbacks, and run summary
  • Graceful shutdown on interrupt

Installation

pip install threaded-order

Simple Example

graph

from threaded_order import Scheduler, ThreadProxyLogger
import time
import random

s = Scheduler(workers=3, setup_logging=True)
logger = ThreadProxyLogger()

def run(name):
    time.sleep(random.uniform(.5, 3.5))
    logger.info(f'{name} completed')

@s.dregister()
def a(): run('a')

@s.dregister(after=['a'])
def b(): run('b')

@s.dregister(after=['a'])
def c(): run('c')

@s.dregister(after=['c'])
def d(): run('d')

@s.dregister(after=['c'])
def e(): run('e')

@s.dregister(after=['b', 'd'])
def f(): run('f')

if __name__ == '__main__':
    s.on_scheduler_done(lambda s: print(f"Passed:{len(s['passed'])} Failed:{len(s['failed'])}"))
    s.start()

example4

Shared State Example

import json
from time import sleep
from threaded_order import Scheduler

s = Scheduler(workers=3, state={})

@s.dregister(with_state=True)
def load(state):
    state["x"] = 10; return "loaded"

@s.dregister(with_state=True)
def behave(state):
    sleep(3); return "behaved"

@s.dregister(after=["load"], with_state=True)
def compute(state):
    state["x"] += 5; return state["x"]

s.start()
print(json.dumps(s.state, indent=2))

Output:

{
  "results": {
    "load": "loaded",
    "compute": 15,
    "behave": "behaved"
  },
  "x": 15
}

ProgressBar Integration Example

Can be done by using the on_task_done callback. See example5

example5

See examples in examples folder. To run examples, follow instructions below to build and run the Docker container then execute:

API Overview

class Scheduler(workers=None, setup_logging=False, add_stream_handler=True)

Runs registered callables across multiple threads while respecting declared dependencies.

Core Methods

Method Description
register(obj, name, after=None) Register a callable for execution. after defines dependencies by name.
dregister(after=None) Decorator variant of register() for inline task definitions.
start() Start execution, respecting dependencies. Returns a summary dictionary.

Callbacks

All are optional and run on the scheduler thread (never worker threads).

Callback When Fired Signature
on_task_start(fn) Before a task starts (name)
on_task_run(fn) When tasks starts running on a thread (name, thread)
on_task_done(fn) After a task finishes (name, ok)
on_scheduler_start(fn) Before scheduler starts running tasks (meta)
on_scheduler_done(fn) After all tasks complete (summary)

Interrupt Handling

Press Ctrl-C during execution to gracefully cancel outstanding work:

  • Running tasks finish naturally or are marked as cancelled
  • Remaining queued tasks are discarded
  • Final summary reflects all results

Development

Clone the repository and ensure the latest version of Docker is installed on your development server.

Build the Docker image:

docker image build \
-t threaded-order:latest .

Run the Docker container:

docker container run \
--rm \
-it \
-v $PWD:/code \
threaded-order:latest \
bash

Execute the dev pipeline:

make dev

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

threaded_order-1.3.4.tar.gz (15.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

threaded_order-1.3.4-py3-none-any.whl (15.2 kB view details)

Uploaded Python 3

File details

Details for the file threaded_order-1.3.4.tar.gz.

File metadata

  • Download URL: threaded_order-1.3.4.tar.gz
  • Upload date:
  • Size: 15.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.12

File hashes

Hashes for threaded_order-1.3.4.tar.gz
Algorithm Hash digest
SHA256 06af6663a8fedc6ef5fa58f0b94bb0b76adb16034fdfc72cfbbb39fc9c7c9110
MD5 5d6d780d90a6b7bcca25400b6936126a
BLAKE2b-256 a59bde002dbbf8f847bbc5d4a7270991364e4211834740490a9e646416ccf38e

See more details on using hashes here.

File details

Details for the file threaded_order-1.3.4-py3-none-any.whl.

File metadata

  • Download URL: threaded_order-1.3.4-py3-none-any.whl
  • Upload date:
  • Size: 15.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.12

File hashes

Hashes for threaded_order-1.3.4-py3-none-any.whl
Algorithm Hash digest
SHA256 ecdc5112e2e4d1afd8fa7aa96e0af1456bc840586fed756f633ff8df3d258fb1
MD5 fb2c2f4a03098ae00704a0e80f1296f5
BLAKE2b-256 830b972feb61d96f56de2b53cbdcaa97b25c9d6afc82fce0cdd3609ae5bf8172

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page