Decorators for running functions in Thread/ThreadPool/IOLoop

threaded

threaded is a set of decorators that wrap functions in:

  • concurrent.futures.ThreadPoolExecutor
  • threading.Thread
  • asyncio.Task in Python 3.

Why? Because copy-pasting loop.create_task, threading.Thread and thread_pool.submit is boring, especially if the target function is only ever used this way.
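
To illustrate the boilerplate in question, here is a minimal standard-library sketch of the three manual patterns. The helper names (work, async_work, run_all_three) are hypothetical and are not part of threaded.

```python
import asyncio
import concurrent.futures
import threading


def work(results, key):
    """Toy target function: record that it ran."""
    results[key] = True


async def async_work(results, key):
    """Toy coroutine used for the asyncio.Task variant."""
    results[key] = True


def run_all_three():
    results = {}

    # 1. threading.Thread: create, start and join by hand.
    thread = threading.Thread(target=work, args=(results, "thread"))
    thread.start()
    thread.join()

    # 2. Thread pool: submit by hand and wait for the future.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(work, results, "pool")
        future.result()

    # 3. asyncio.Task: create the task on a loop by hand.
    loop = asyncio.new_event_loop()
    try:
        task = loop.create_task(async_work(results, "task"))
        loop.run_until_complete(task)
    finally:
        loop.close()

    return results
```

Each pattern repeats the same create/submit/wait steps around a one-line target, which is the repetition a decorator can absorb.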

Pros:

  • Supports multiple Python versions:

      Python 3.4
      Python 3.5
      Python 3.6
      Python 3.7
      PyPy3 3.5+

Note

For Python 2.7/PyPy, use versions 1.x.x.

Decorators:

  • ThreadPooled - submit to a native concurrent.futures.ThreadPoolExecutor.
  • threadpooled is an alias for ThreadPooled.
  • Threaded - wrap in threading.Thread.
  • threaded is an alias for Threaded.
  • AsyncIOTask - wrap in asyncio.Task. Uses the same API as ThreadPooled.
  • asynciotask is an alias for AsyncIOTask.

Usage

ThreadPooled

This is the most commonly needed decorator: it submits the function to a ThreadPoolExecutor on call.

Note

The API differs considerably between Python 3 and Python 2.7. See API section below.

threaded.ThreadPooled.configure(max_workers=3)

Note

If the executor has not been configured explicitly, it is configured on first use with default parameters: max_workers=CPU_COUNT * 5
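
As a rough illustration of that default, the value can be computed as follows. This is a sketch that assumes CPU_COUNT corresponds to os.cpu_count(); the function name default_max_workers is hypothetical.

```python
import os


def default_max_workers():
    """Return CPU count * 5, treating an unknown CPU count as 1."""
    # os.cpu_count() may return None when the count cannot be determined.
    return (os.cpu_count() or 1) * 5
```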

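import concurrent.futures
import threaded

@threaded.ThreadPooled
def func():
    pass

# Calling func() submits it to the pool and returns a concurrent.futures.Future.
concurrent.futures.wait([func()])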
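
To make the mechanism concrete, here is a standard-library sketch of a decorator that submits each call to a shared pool. It is not the implementation used by threaded; the names pooled, _executor and square are hypothetical.

```python
import concurrent.futures
import functools

# Shared pool, standing in for the one the library manages internally.
_executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)


def pooled(func):
    """Hypothetical stand-in: each call submits func to the shared pool."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Return the Future immediately; the caller decides when to wait.
        return _executor.submit(func, *args, **kwargs)

    return wrapper


@pooled
def square(x):
    return x * x


futures = [square(n) for n in range(4)]
concurrent.futures.wait(futures)
values = [f.result() for f in futures]
```

The caller gets futures back and can wait on them in bulk, as in the example above.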

Python 3.5+ usage with asyncio:

Note

If loop_getter is not callable, loop_getter_need_context is ignored.

loop = asyncio.get_event_loop()
@threaded.ThreadPooled(loop_getter=loop, loop_getter_need_context=False)
def func():
    pass

loop.run_until_complete(asyncio.wait_for(func(), timeout))
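
For comparison, a similar effect can be sketched with the standard library alone by handing the call to loop.run_in_executor, which returns an awaitable bound to the loop. This is an assumption-laden illustration rather than the library's code; pooled_on_loop, run_pool_demo and add are hypothetical names.

```python
import asyncio
import concurrent.futures
import functools


def pooled_on_loop(loop, executor):
    """Hypothetical decorator factory: run func in executor, awaitable on loop."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # run_in_executor accepts positional arguments only, so bind first.
            call = functools.partial(func, *args, **kwargs)
            return loop.run_in_executor(executor, call)

        return wrapper

    return decorator


def run_pool_demo(timeout=5.0):
    loop = asyncio.new_event_loop()
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)

    @pooled_on_loop(loop, executor)
    def add(a, b):
        return a + b

    try:
        return loop.run_until_complete(asyncio.wait_for(add(2, 3), timeout))
    finally:
        executor.shutdown()
        loop.close()
```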

Python 3.5+ usage with asyncio and loop extraction from call arguments:

loop_getter = lambda tgt_loop: tgt_loop
@threaded.ThreadPooled(loop_getter=loop_getter, loop_getter_need_context=True)  # loop_getter_need_context is required
def func(*args, **kwargs):
    pass

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait_for(func(loop), timeout))
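
The rules for loop_getter and loop_getter_need_context described in this section can be summarised in a small helper. This is a sketch of the documented behaviour only; resolve_loop is a hypothetical name, and the real library may resolve the loop differently.

```python
import asyncio


def resolve_loop(loop_getter, need_context, *args, **kwargs):
    """Hypothetical helper mirroring the rules described in the text."""
    if not callable(loop_getter):
        # A ready loop object was given; need_context is ignored.
        return loop_getter
    if need_context:
        # The getter receives the same arguments as the decorated call.
        return loop_getter(*args, **kwargs)
    return loop_getter()
```

With need_context set, the getter sees the call arguments, which is why func(loop) in the example above can pass its own loop.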

During application shutdown, the pool can be stopped. It will be recreated automatically if some component requests it again.

threaded.ThreadPooled.shutdown()
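
The "stop now, recreate on demand" behaviour can be sketched with a lazily created executor. This is a simplified, non-thread-safe illustration under the assumption that recreation happens on next use; LazyPool is a hypothetical class, not part of threaded.

```python
import concurrent.futures


class LazyPool:
    """Hypothetical holder that creates its executor on first use."""

    def __init__(self, max_workers=None):
        self._max_workers = max_workers
        self._executor = None

    @property
    def executor(self):
        # Create the pool only when somebody actually asks for it.
        if self._executor is None:
            self._executor = concurrent.futures.ThreadPoolExecutor(
                max_workers=self._max_workers
            )
        return self._executor

    def shutdown(self):
        # Stop the current pool; the next access builds a fresh one.
        if self._executor is not None:
            self._executor.shutdown()
            self._executor = None
```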

Threaded

Classic threading.Thread. Useful for threads that run until the application closes and for self-terminating threads that return nothing.

Usage example:

@threaded.Threaded
def func(*args, **kwargs):
    pass

thread = func()
thread.start()
thread.join()
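
A standard-library sketch of the same calling convention: the decorated function returns an unstarted threading.Thread on each call. The names in_thread, remember and seen are hypothetical, and this is not the library's implementation.

```python
import functools
import threading


def in_thread(func):
    """Hypothetical stand-in: each call builds an unstarted Thread."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return threading.Thread(target=func, args=args, kwargs=kwargs)

    return wrapper


seen = []


@in_thread
def remember(value):
    seen.append(value)


thread = remember("hello")
was_alive_before_start = thread.is_alive()  # nothing runs until start()
thread.start()
thread.join()
```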

Without arguments, the thread name follows the pattern: 'Threaded: ' + func.__name__

Note

If func.__name__ is not accessible, str(hash(func)) will be used instead.
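
The naming rule can be expressed as a short function. This sketch follows the pattern and fallback described above; thread_name, job and nameless are hypothetical names, and functools.partial is used only as an example of a callable without __name__.

```python
import functools


def thread_name(func):
    """Build a name following the pattern described above."""
    suffix = getattr(func, "__name__", None)
    if suffix is None:
        # Fallback for callables that do not expose __name__.
        suffix = str(hash(func))
    return "Threaded: " + suffix


def job():
    pass


# functools.partial objects do not define __name__.
nameless = functools.partial(job)
```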

The name can be overridden via the corresponding argument:

@threaded.Threaded(name='Function in thread')
def func(*args, **kwargs):
    pass

Thread can be daemonized automatically:

@threaded.Threaded(daemon=True)
def func(*args, **kwargs):
    pass

Also, if no additional manipulation is expected before the thread starts, it can be started automatically before return:

@threaded.Threaded(started=True)
def func(*args, **kwargs):
    pass
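
The three options above (name, daemon, started) map directly onto threading.Thread. Here is a standard-library sketch of a decorator factory that accepts them; in_thread, signal and done are hypothetical names, and the real decorator may handle these arguments differently.

```python
import functools
import threading


def in_thread(name=None, daemon=False, started=False):
    """Hypothetical decorator factory with the three options shown above."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            thread = threading.Thread(
                target=func, name=name, args=args, kwargs=kwargs, daemon=daemon
            )
            if started:
                # Start before returning, so the caller only needs join().
                thread.start()
            return thread

        return wrapper

    return decorator


done = threading.Event()


@in_thread(name="Function in thread", daemon=True, started=True)
def signal():
    done.set()


thread = signal()
thread.join()
```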

AsyncIOTask

Wrap in asyncio.Task.

Usage with asyncio:

@threaded.AsyncIOTask
async def func():  # asyncio.Task wraps a coroutine, so the target is a coroutine function
    pass

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait_for(func(), timeout))
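
A minimal standard-library sketch of the idea: the wrapper calls the coroutine function and schedules the resulting coroutine as a task on a given loop. The names as_task, run_task_demo and double are hypothetical, and this is not the library's implementation.

```python
import asyncio
import functools


def as_task(loop):
    """Hypothetical decorator factory: schedule the coroutine as a Task on loop."""

    def decorator(coro_func):
        @functools.wraps(coro_func)
        def wrapper(*args, **kwargs):
            # create_task expects a coroutine object, hence the call here.
            return loop.create_task(coro_func(*args, **kwargs))

        return wrapper

    return decorator


def run_task_demo(timeout=5.0):
    loop = asyncio.new_event_loop()

    @as_task(loop)
    async def double(x):
        await asyncio.sleep(0)
        return x * 2

    try:
        task = double(21)
        is_task = isinstance(task, asyncio.Task)
        result = loop.run_until_complete(asyncio.wait_for(task, timeout))
        return is_task, result
    finally:
        loop.close()
```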

Provide event loop directly:

Note

If loop_getter is not callable, loop_getter_need_context is ignored.

loop = asyncio.get_event_loop()
@threaded.AsyncIOTask(loop_getter=loop)
async def func():  # asyncio.Task wraps a coroutine, so the target is a coroutine function
    pass

loop.run_until_complete(asyncio.wait_for(func(), timeout))

Usage with loop extraction from call arguments:

loop_getter = lambda tgt_loop: tgt_loop
@threaded.AsyncIOTask(loop_getter=loop_getter, loop_getter_need_context=True)
async def func(*args, **kwargs):  # asyncio.Task wraps a coroutine, so the target is a coroutine function
    pass

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait_for(func(loop), timeout))

Testing

The main test mechanism for the threaded package is tox. Available environments can be listed via tox -l

CI systems

For code checking, several CI systems are used in parallel:

  1. Travis CI: checks PEP8, pylint, bandit, installation possibility and unit tests. It also publishes coverage to coveralls.
  2. GitHub Actions: checks PEP8, pylint, bandit, installation possibility and unit tests.
  3. coveralls: displays coverage.

CD system

Travis CI is used for package delivery to PyPI.
