Skip to main content

Decorators for running functions in Thread/ThreadPool/IOLoop

Project description

threaded

https://travis-ci.org/python-useful-helpers/threaded.svg?branch=master https://coveralls.io/repos/github/python-useful-helpers/threaded/badge.svg?branch=master Documentation Status https://img.shields.io/pypi/v/threaded.svg https://img.shields.io/pypi/pyversions/threaded.svg https://img.shields.io/pypi/status/threaded.svg https://img.shields.io/github/license/python-useful-helpers/threaded.svg

threaded is a set of decorators, which wrap functions in:

  • concurrent.futures.ThreadPool

  • threading.Thread

  • asyncio.Task in Python 3.

  • gevent.threadpool.ThreadPool if gevent is installed.

Why? Because copy-paste of loop.create_task, threading.Thread and thread_pool.submit is boring, especially if target functions is used by this way only.

Pros:

Python 2.7
Python 3.4
Python 3.5
Python 3.6
Python 3.7
PyPy
PyPy3 3.5+
Jyton 2.7

Decorators:

  • ThreadPooled - native concurrent.futures.ThreadPool usage on Python 3 and it’s backport on Python 2.7.

  • threadpooled is alias for ThreadPooled.

  • Threaded - wrap in threading.Thread.

  • threaded is alias for Threaded.

  • AsyncIOTask - wrap in asyncio.Task. Uses the same API, as Python 3 ThreadPooled.

  • asynciotask is alias for AsyncIOTask.

  • GThreadPooled - wrap function in gevent.threadpool.ThreadPool.

  • gthreadpooled is alias for GThreadPooled.

Usage

ThreadPooled

Mostly it is required decorator: submit function to ThreadPoolExecutor on call.

threaded.ThreadPooled.configure(max_workers=3)

Python 2.7 usage:

@threaded.ThreadPooled
def func():
    pass

concurrent.futures.wait([func()])

Python 3.3+ usage:

@threaded.ThreadPooled
def func():
    pass

concurrent.futures.wait([func()])

Python 3.3+ usage with asyncio:

loop = asyncio.get_event_loop()
@threaded.ThreadPooled(loop_getter=loop, loop_getter_need_context=False)
def func():
    pass

loop.run_until_complete(asyncio.wait_for(func(), timeout))

Python 3.3+ usage with asyncio and loop extraction from call arguments:

loop_getter = lambda tgt_loop: tgt_loop
@threaded.ThreadPooled(loop_getter=loop_getter, loop_getter_need_context=True)  # loop_getter_need_context is required
def func(*args, **kwargs):
    pass

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait_for(func(loop), timeout))

During application shutdown, pool can be stopped (while it will be recreated automatically, if some component will request).

threaded.ThreadPooled.shutdown()

Threaded

Classic threading.Thread. Useful for running until close and self-closing threads without return.

Usage example:

@threaded.Threaded
def func(*args, **kwargs):
    pass

thread = func()
thread.start()
thread.join()

Without arguments, thread name will use pattern: 'Threaded: ' + func.__name__

Override name can be don via corresponding argument:

@threaded.Threaded(name='Function in thread')
def func(*args, **kwargs):
    pass

Thread can be daemonized automatically:

@threaded.Threaded(daemon=True)
def func(*args, **kwargs):
    pass

Also, if no any addition manipulations expected before thread start, it can be started automatically before return:

@threaded.Threaded(started=True)
def func(*args, **kwargs):
    pass

AsyncIOTask

Wrap in asyncio.Task.

usage with asyncio:

@threaded.AsyncIOTask
def func():
    pass

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait_for(func(), timeout))

Provide event loop directly:

loop = asyncio.get_event_loop()
@threaded.AsyncIOTask(loop_getter=loop)
def func():
    pass

loop.run_until_complete(asyncio.wait_for(func(), timeout))

Usage with loop extraction from call arguments:

loop_getter = lambda tgt_loop: tgt_loop
@threaded.AsyncIOTask(loop_getter=loop_getter, loop_getter_need_context=True)
def func(*args, **kwargs):
    pass

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait_for(func(loop), timeout))

GThreadPooled

Post function to gevent.threadpool.ThreadPool.

threaded.GThreadPooled.configure(max_workers=3)

Basic usage example:

@threaded.GThreadPooled
def func():
    pass

func().wait()

Testing

The main test mechanism for the package threaded is using tox. Available environments can be collected via tox -l

CI systems

For code checking several CI systems is used in parallel:

  1. Travis CI: is used for checking: PEP8, pylint, bandit, installation possibility and unit tests. Also it’s publishes coverage on coveralls.

  2. coveralls: is used for coverage display.

CD system

Travis CI: is used for package delivery on PyPI.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

threaded-1.0.5.tar.gz (18.9 kB view hashes)

Uploaded Source

Built Distributions

threaded-1.0.5-py2-none-any.whl (20.9 kB view hashes)

Uploaded Python 2

threaded-1.0.5-cp37-cp37m-manylinux1_x86_64.whl (737.9 kB view hashes)

Uploaded CPython 3.7m

threaded-1.0.5-cp37-cp37m-manylinux1_i686.whl (701.4 kB view hashes)

Uploaded CPython 3.7m

threaded-1.0.5-cp36-cp36m-manylinux1_x86_64.whl (737.3 kB view hashes)

Uploaded CPython 3.6m

threaded-1.0.5-cp36-cp36m-manylinux1_i686.whl (700.1 kB view hashes)

Uploaded CPython 3.6m

threaded-1.0.5-cp35-cp35m-manylinux1_x86_64.whl (719.0 kB view hashes)

Uploaded CPython 3.5m

threaded-1.0.5-cp35-cp35m-manylinux1_i686.whl (683.4 kB view hashes)

Uploaded CPython 3.5m

threaded-1.0.5-cp34-cp34m-manylinux1_x86_64.whl (734.6 kB view hashes)

Uploaded CPython 3.4m

threaded-1.0.5-cp34-cp34m-manylinux1_i686.whl (696.8 kB view hashes)

Uploaded CPython 3.4m

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page