
This library helps you execute a set of functions arranged in a Directed Acyclic Graph (DAG) dependency structure in parallel in a production environment.


tawazi

Supported Python versions: 3.7, 3.8, 3.9, 3.10


Introduction

The tawazi library enables parallel execution of functions in a DAG dependency structure. The library aims to be:

  • Stable, robust, and well tested
  • Lightweight
  • Thread-safe
  • Low on dependencies (ideally none)
  • Compatible with legacy Python versions (planned)
  • Compatible with PyPy (planned)

In the context of tawazi, the computation sequence to be run in parallel is referred to as a DAG, and the functions that it runs are called ExecNodes.
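To make the idea of running functions in a DAG dependency structure concrete, here is a minimal sketch that uses only the standard library. It is not tawazi's implementation: `run_dag`, `funcs`, and `deps` are hypothetical names chosen for illustration. Each function is submitted to a thread pool as soon as all of its dependencies have produced a result.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from time import sleep


def run_dag(funcs, deps, max_concurrency=2):
    """Hypothetical helper: run each function once its dependencies are done.

    funcs maps a node name to a callable.
    deps maps a node name to the names whose results it receives as arguments.
    """
    results = {}
    pending = set(funcs)
    running = {}  # future -> node name
    with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
        while pending or running:
            # submit every pending node whose dependencies all have results
            ready = sorted(
                n for n in pending if all(d in results for d in deps.get(n, ()))
            )
            for name in ready:
                args = [results[d] for d in deps.get(name, ())]
                running[pool.submit(funcs[name], *args)] = name
                pending.remove(name)
            if not running:
                raise ValueError("dependency cycle or unknown dependency")
            # block until at least one running node finishes, then store results
            done, _ = wait(list(running), return_when=FIRST_COMPLETED)
            for future in done:
                results[running.pop(future)] = future.result()
    return results


def a():
    sleep(0.2)
    return "A"


def b():
    sleep(0.2)
    return "B"


def c(a, b):
    return f"{a} + {b} = C"


# 'a' and 'b' have no dependencies, so they run concurrently; 'c' waits for both
results = run_dag({"a": a, "b": b, "c": c}, {"c": ["a", "b"]})
print(results["c"])
```

The sketch ignores priorities and sequential nodes; it only shows why independent nodes can overlap in time while dependent ones must wait.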

This library supports:

  • Limiting the number of threads
  • Assigning a priority to each ExecNode
  • Choosing, per ExecNode, whether it may run in parallel with other ExecNodes
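The three options above interact when a scheduler decides which node to start next. The following is a sketch of one possible selection rule, not tawazi's actual scheduler. `Node` and `pick_next` are hypothetical names, and the sketch assumes that a larger `priority` value means "run earlier" and that a sequential node must run alone.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Node:
    """Hypothetical stand-in for an ExecNode's scheduling attributes."""

    name: str
    priority: int = 0
    is_sequential: bool = False


def pick_next(ready, running, max_concurrency):
    """Return the node to start next, or None if nothing may start yet."""
    if len(running) >= max_concurrency:
        return None  # thread limit reached
    if any(node.is_sequential for node in running):
        return None  # assumption: a sequential node runs alone
    if not ready:
        return None
    # assumption: the highest priority value is served first
    best = max(ready, key=lambda node: node.priority)
    if best.is_sequential and running:
        return None  # wait for the running nodes to finish first
    return best


node_a = Node("a", priority=1)
node_b = Node("b", priority=10, is_sequential=True)

# 'b' has the higher priority and nothing is running, so it starts first
first = pick_next([node_a, node_b], running=[], max_concurrency=2)
# while sequential 'b' runs, 'a' has to wait even though a thread is free
blocked = pick_next([node_a], running=[node_b], max_concurrency=2)
print(first.name, blocked)
```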

Note: The library is still under active development. Contributions are welcome.

Usage

# type: ignore
from time import sleep, time
from tawazi import op, to_dag

@op
def a():
    print("Function 'a' is running", flush=True)
    sleep(1)
    return "A"

@op
def b():
    print("Function 'b' is running", flush=True)
    sleep(1)
    return "B"

@op
def c(a, b):
    print("Function 'c' is running", flush=True)
    print(f"Function 'c' received {a} from 'a' & {b} from 'b'", flush=True)
    return f"{a} + {b} = C"

@to_dag(max_concurrency=2)
def deps_describer():
    result_a = a()
    result_b = b()
    result_c = c(result_a, result_b)

if __name__ == "__main__":
    t0 = time()
    # executing the dag takes a single line of code
    deps_describer().execute()
    execution_time = time() - t0
    # 'a' and 'b' each sleep for 1 second but run in parallel
    assert execution_time < 1.5
    print(f"Graph execution took {execution_time:.2f} seconds")

Advanced Usage

# type: ignore
from time import sleep, time
from tawazi import op, to_dag

@op
def a():
    print("Function 'a' is running", flush=True)
    sleep(1)
    return "A"

# optionally configure each op using the decorator:
# is_sequential = True to prevent op from running in parallel with other ops
# priority to choose the op in the next execution phase
# argument_name to choose the name of the argument that will be used
@op(is_sequential=True, priority=10, argument_name="arg_b")
def b():
    print("Function 'b' is running", flush=True)
    sleep(1)
    return "B"

@op
def c(a, arg_b):
    print("Function 'c' is running", flush=True)
    print(f"Function 'c' received {a} from 'a' & {arg_b} from 'b'", flush=True)
    return f"{a} + {arg_b} = C"

# optionally customize the DAG
@to_dag(max_concurrency=2, behavior="strict")
def deps_describer():
    result_a = a()
    result_b = b()
    result_c = c(result_a, result_b)

if __name__ == "__main__":
    t0 = time()
    # the dag instance is reusable.
    # This is recommended if you want to do the same computation multiple times
    dag = deps_describer()
    results_1 = dag.execute()
    execution_time = time() - t0
    print(f"Graph execution took {execution_time:.2f} seconds")

    # debugging the code using `dag.safe_execute()` is easier
    # because the execution doesn't go through the thread pool
    results_2 = dag.safe_execute()

    # you can look through the results of each operation like this:
    for my_op in [a, b, c]:
        assert results_1[my_op.id].result == results_2[my_op.id].result

Name explanation

The library's name is inspired by the Arabic word تَوَازٍ, which means parallel.

Building the doc

The general documentation has no dedicated hosting at the moment, so it has to be built and viewed locally (on your machine). Expect future developments on this side as well. To build it, run mkdocs build at the root of the repository; to preview it in a browser, run mkdocs serve.

Future developments

This library is still in development. Breaking changes are expected.
