
Parallel Stream Task for Python

Project description

Streamtask

Streamtask is a lightweight Python framework for parallelizing computationally intensive pipelines. It is similar to MapReduce, but more lightweight. Each module in the pipeline is parallelized with its own given number of workers, so that modules running at different speeds can be balanced against each other. This improves performance especially when the pipeline contains heavy I/O operations.
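To make the per-module parallelism concrete, here is a minimal sketch of the general idea using only the standard library. Each stage gets its own pool of workers, and consecutive stages are connected by queues, so a slow stage can be given more workers than a fast one. This is not Streamtask's implementation. `run_pipeline`, `slow_add` and `slow_inc` are hypothetical names, and the sketch uses threads (adequate for I/O-bound stages such as the `time.sleep` calls in the example below), whereas Streamtask may use processes.

```python
import queue
import threading
import time
from typing import Any, Callable, Iterable, List, Tuple

_STOP = object()  # sentinel marking the end of the stream for one worker


def _stage_worker(func: Callable[[Any], Any], in_q: queue.Queue, out_q: queue.Queue) -> None:
    # Take an item, process it, and hand the result to the next stage
    while True:
        item = in_q.get()
        if item is _STOP:
            break
        out_q.put(func(item))


def run_pipeline(data: Iterable[Any],
                 stages: List[Tuple[Callable[[Any], Any], int]]) -> List[Any]:
    """Run `data` through `stages`, each given as (func, n_workers).

    Results come back in completion order, not input order.
    No error handling: if a stage function raises, this call blocks.
    """
    queues = [queue.Queue() for _ in range(len(stages) + 1)]
    workers = []
    for i, (func, n_workers) in enumerate(stages):
        group = [threading.Thread(target=_stage_worker,
                                  args=(func, queues[i], queues[i + 1]))
                 for _ in range(n_workers)]
        for t in group:
            t.start()
        workers.append(group)

    count = 0
    for item in data:
        queues[0].put(item)
        count += 1

    # Every input yields exactly one output, so collecting `count` results
    # means all stages have finished their work
    results = [queues[-1].get() for _ in range(count)]

    # Shut each stage down with one sentinel per worker thread
    for i, group in enumerate(workers):
        for _ in group:
            queues[i].put(_STOP)
        for t in group:
            t.join()
    return results


def slow_add(n):
    time.sleep(0.01)  # simulated I/O wait
    return n + 0.5


def slow_inc(n):
    time.sleep(0.02)  # slower stage, so it gets more workers below
    return n + 1


if __name__ == "__main__":
    out = run_pipeline(range(20), [(slow_add, 2), (slow_inc, 4)])
    print(sorted(out))
```

Giving the slower `slow_inc` stage twice as many workers keeps it from becoming the bottleneck, which is the kind of balancing the description refers to.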

Example

Suppose we want to process data through a pipeline of three blocks, f1, f2 and f3. The following code parallelizes the processing. Instead of using a generator module such as f1 as the source, the input can also be supplied directly as a data list, or as a file name to be read as a stream, via add_data.

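import time

from streamtask import StreamTask  # import path assumed from the package name

def f1(total):
    # Generator source: yields 0, 2, 4, ... one item at a time
    for i in range(total):
        time.sleep(0.002)
        yield i * 2

def f2(n, add, third = 0.01):
    # Middle stage: `add` and `third` are supplied through add_module below
    time.sleep(0.02)
    return n + add + third

def f3_the_final(n):
    # Final stage
    time.sleep(0.03)
    return n + 1

if __name__ == "__main__":
    total = 100000
    stk = StreamTask(total = total)
    # Feed the input from an iterable in batches of 10 (reading a file as a stream is also supported)
    stk.add_data(data = range(total), batch_size=10)
    # Alternatively, use the generator f1 as the source instead of add_data:
    # stk.add_module(f1, 1, total = total)
    # Run f2 with 2 workers, passing add=0.5 and third=0.02
    stk.add_module(f2, 2, args = [0.5], third = 0.02)
    # Run f3_the_final with 2 workers
    stk.add_module(f3_the_final, 2)
    stk.run(parallel = True)
    stk.join()
    res = stk.get_results()
    print(res)

Example of the progress output printed while a pipeline runs: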
stream_reader (1/1):  79%|██████████████████████████████████████████████▋            | 7923/10000 [00:05<00:01, 1567.68it/s]
f2 (2/2):  29%|████████████████████▉                                                   | 1927/6635 [00:04<00:09, 476.72it/s]
f3_the_final (2/2): 100%|█████████████████████████████████████████████████████████████▉| 1927/1928 [00:04<00:00, 476.55it/s]
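As a sanity check on the example, the same chain can be computed sequentially: each input n passes through f2 (with add=0.5 and third=0.02) and then f3_the_final, so it should come out as n + 1.52 up to floating-point rounding. The sketch below assumes that `get_results()` returns a flat list of final outputs whose order may differ from the input order; that return format is an assumption, and `expected_results` and `results_match` are hypothetical helpers, not part of Streamtask.

```python
import math
from typing import Iterable, List


def f2(n, add, third=0.01):
    # Same arithmetic as the example's f2, without the simulated delay
    return n + add + third


def f3_the_final(n):
    # Same arithmetic as the example's final stage
    return n + 1


def expected_results(data: Iterable[float], add: float = 0.5,
                     third: float = 0.02) -> List[float]:
    # Sequential reference for add_module(f2, ..., args=[0.5], third=0.02) -> f3_the_final
    return [f3_the_final(f2(n, add, third)) for n in data]


def results_match(actual: List[float], expected: List[float]) -> bool:
    # Compare as sorted lists, since parallel workers may finish out of order
    if len(actual) != len(expected):
        return False
    return all(math.isclose(a, e) for a, e in zip(sorted(actual), sorted(expected)))
```

With the example's variables, the check would be `results_match(res, expected_results(range(total)))`.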



Download files


Source Distribution

streamtask-0.0.9.tar.gz (4.8 kB)

Built Distribution

streamtask-0.0.9-py3-none-any.whl (5.2 kB)

File details

Details for the file streamtask-0.0.9.tar.gz.

File metadata

  • Download URL: streamtask-0.0.9.tar.gz
  • Size: 4.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.8.13

File hashes

Hashes for streamtask-0.0.9.tar.gz
Algorithm    Hash digest
SHA256       490c1226e76f6c088dc4a2f07b88c9bf3838ea6f358cd3a44593b40b3fbd4cfd
MD5          20a7c63e523c83365eb4232285e9b990
BLAKE2b-256  f5022d7baa4bbbd6bceab38e3fd00e552a8df2e332470ad25e11b11e9fc4e79b


File details

Details for the file streamtask-0.0.9-py3-none-any.whl.

File metadata

  • Download URL: streamtask-0.0.9-py3-none-any.whl
  • Size: 5.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.8.13

File hashes

Hashes for streamtask-0.0.9-py3-none-any.whl
Algorithm    Hash digest
SHA256       1eec316ecf80f7be4498fa2f28c5f49d53c73277adea12c7b23276d593e5849c
MD5          9b91393598e96a5bb984b3dd04f6f9d5
BLAKE2b-256  150ef07689023dd51ef1bd0f8f16dd55137f8ced7a1bd76ef43e47d9cbbfce38
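A downloaded file can be checked against the SHA256 digests listed above with the standard library's `hashlib`. This is a minimal sketch; `sha256_of`, `verify` and `EXPECTED_SHA256` are hypothetical names, and the digests are copied from the hash tables on this page.

```python
import hashlib
from pathlib import Path

# SHA256 digests listed on this page for streamtask 0.0.9
EXPECTED_SHA256 = {
    "streamtask-0.0.9.tar.gz":
        "490c1226e76f6c088dc4a2f07b88c9bf3838ea6f358cd3a44593b40b3fbd4cfd",
    "streamtask-0.0.9-py3-none-any.whl":
        "1eec316ecf80f7be4498fa2f28c5f49d53c73277adea12c7b23276d593e5849c",
}


def sha256_of(path, chunk_size: int = 1 << 16) -> str:
    # Hash the file in chunks so large files need not fit in memory
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def verify(path) -> bool:
    # Look up the expected digest by file name (KeyError for unknown files)
    return sha256_of(path) == EXPECTED_SHA256[Path(path).name]
```

pip can also enforce these digests at install time through its hash-checking mode, for example with a requirements line such as `streamtask==0.0.9 --hash=sha256:<digest>` and `pip install --require-hashes -r requirements.txt`.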

