Skip to main content

A package that sets up thread-groups to run methods concurrently that can run concurrently in a relatively simple and intuitive way.

Project description

threadgroup-lib

This package is used to define thread groups and register functions to run concurrently in that group

Usage

Using it is fairly simply and intuitive:

import threadgroup

first = threadgroup.ThreadGroup()
second = threadgroup.ThreadGroup()

@first.register()
def f1():
    pass

@first.register()
def f2():
    pass

@second.register()
def s1():
    pass

@second.register()
def s2():
    pass

first.execute()
assert first.executed
assert second.executed is False

second.execute()
assert second.executed

What we do here is to define two separate groups, first and second, then register which functions we want to run in each group

Using with varvault

Varvault can be leveraged to create some very interesting code that runs concurrently while also being very flexible and easy to maintain:

import time
import threadgroup
import varvault

class Keyring(varvault.Keyring):
    k1 = varvault.Key("k1", valid_type=str)
    k2 = varvault.Key("k2", valid_type=int)
    k3 = varvault.Key("k3", valid_type=float)


vault = varvault.create_vault(Keyring, "vault")
sleep = 0.5
init = threadgroup.ThreadGroup()
use = threadgroup.ThreadGroup()

@init.register()
@vault.vaulter(return_keys=Keyring.k1)
def set_k1():
    time.sleep(sleep)
    return "valid"

@init.register()
@vault.vaulter(return_keys=Keyring.k2)
def set_k2():
    time.sleep(sleep)
    return 1

@init.register()
@vault.vaulter(return_keys=Keyring.k3)
def set_k3():
    time.sleep(sleep)
    return 3.14

@use.register()
@vault.vaulter(input_keys=Keyring.k1)
def use_k1(k1=None):
    time.sleep(sleep)
    assert k1 == "valid"
    return k1

@use.register()
@vault.vaulter(input_keys=Keyring.k2)
def use_k2(k2=None):
    time.sleep(sleep)
    assert k2 == 1
    return k2

@use.register()
@vault.vaulter(input_keys=Keyring.k3)
def use_k3(k3=None):
    time.sleep(sleep)
    assert k3 == 3.14
    return k3

start = time.time()
init.execute()
assert sleep - 0.2 < time.time() - start < sleep + 0.2, "Took too long to run. Concurrency seems broken"
assert Keyring.k1 in vault and vault.get(Keyring.k1) == "valid"
assert Keyring.k2 in vault and vault.get(Keyring.k2) == 1
assert Keyring.k3 in vault and vault.get(Keyring.k3) == 3.14
assert init.executed
assert use.executed is False

start = time.time()
use.execute()
assert sleep - 0.2 < time.time() - start < sleep + 0.2, "Took too long to run. Concurrency seems broken"
results = use.get_results()

We define a varvault object called vault which we set the variables to. The functions set_k1, set_k2, set_k3 set each of the keys to the vault and they all run in the thread group init. We then use those variables in use_k1, use_k2, use_k3.

The benefit of using varvault together with thread-groups is that varvault already abstracts away input variables and return variables from functions, allowing for a decoupled flow that works extremely well to run in thread-groups that are executed in this fashion. Since functions are registered and then executed when execute() is called, supplying input variables to the functions and handling return variables from the functions is a bit of a hassle. Varvault handles this entirely, allowing the user to focus on other things and running things concurrently (as much as Python will allow).

Running without thread groups

It's entirely possible to use the library to just run functions concurrently. This is how you may use it:

import threadgroup
from typing import List

def f1(a1):
    assert a1 == 1
    return a1

def f2(a2):
    assert a2 == 2
    return a2

def f3(a1, kw1=None):
    assert a1 == 1
    assert kw1 is True
    return a1, kw1

functions: List[threadgroup.Function] = [threadgroup.create_function(f1, 1), threadgroup.create_function(f2, 2), threadgroup.create_function(f3, 1, kw1=True)]
result: threadgroup.ResultList[threadgroup.ResultStruct] = threadgroup.threaded_execution(functions)
assert f"{result}" == "[(function=f1; result=1), (function=f2; result=2), (function=f3; result=(1, True))]"

What we do here is to define three different functions that each take different arguments. We then build a list of functions with their arguments. We then run those functions concurrently by calling threadgroup.threaded_execution.

It's also possible to run the same function multiple times with different input parameters. This is how you'd do it:

import threadgroup
from typing import List

arg_values = [1, 2, 3, 4, 5]

def f1(a1):
    assert a1 in arg_values, f"{a1} not in {arg_values}"
    return a1

functions: List[threadgroup.Function] = [threadgroup.create_function(f1, arg_value) for arg_value in arg_values]
result: threadgroup.ResultList[threadgroup.ResultStruct] = threadgroup.threaded_execution(functions)
assert f"{result}" == "[(function=f1; result=1), (function=f1; result=2), (function=f1; result=3), (function=f1; result=4), (function=f1; result=5)]"

In this example we also create a list of functions, but this time we do it based on the input parameters in arg_values, allowing it to scale easily based on an iterable. We then do the same as previously and run the functions concurrently through threadgroup.threaded_execution.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

threadgroup-lib-1.2.1.tar.gz (12.8 kB view details)

Uploaded Source

Built Distribution

threadgroup_lib-1.2.1-py3-none-any.whl (9.8 kB view details)

Uploaded Python 3

File details

Details for the file threadgroup-lib-1.2.1.tar.gz.

File metadata

  • Download URL: threadgroup-lib-1.2.1.tar.gz
  • Upload date:
  • Size: 12.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.9.13

File hashes

Hashes for threadgroup-lib-1.2.1.tar.gz
Algorithm Hash digest
SHA256 a90d46e8ce2cbaf0e497f7805cda66114d51f76deb4b2b3a8aebdfaad15e5732
MD5 32c02e75f399ec07a0cb5d04278f09f6
BLAKE2b-256 75abd2c272527eec1809c5b9bdb9419153e835c26518ccb529cf126f7c51d8b8

See more details on using hashes here.

File details

Details for the file threadgroup_lib-1.2.1-py3-none-any.whl.

File metadata

File hashes

Hashes for threadgroup_lib-1.2.1-py3-none-any.whl
Algorithm Hash digest
SHA256 6f9024d6a169b6c90bc44895557d1a07ef451349981402695092ab8995bff607
MD5 403a404aa431a4bbaa56148b1718f656
BLAKE2b-256 161296039f295b6e433501cf29a9fd0197d2b5d876687678abda586d529db7bd

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page