Asynchronous task queue
- Platforms: Linux, OSX, Windows. Python 3.5 and above
- Keywords: server, asynchronous, concurrency, actor, process, queue, tasks, redis
Asynchronous server for consuming asynchronous IO tasks, green IO tasks, blocking IO tasks and long-running CPU-bound tasks.
- Fully configurable
- Consumers poll tasks from distributed message brokers (a Redis broker is implemented)
- Publish/subscribe for real-time events and logging (Redis pub/sub backend)
- Can schedule tasks when run as a scheduler (--schedule-periodic flag)
Four steps tutorial
1 - Create a script
A simple Python file which runs your application:

vim manage.py

from pq.api import TaskApp

task_paths = ['sampletasks.*', 'pq.jobs']


def app():
    return TaskApp(config=__file__)


if __name__ == '__main__':
    app().start()
2 - Implement Jobs
Create the modules where Jobs are implemented. This can be a directory containing several submodules.

mkdir sampletasks
cd sampletasks
vim mytasks.py

import asyncio
import time

from pq import api


@api.job()
def addition(self, a=0, b=0):
    return a + b


@api.job(concurrency=api.ASYNC_IO)
async def asynchronous(self, lag=1):
    start = time.time()
    await asyncio.sleep(lag)
    return time.time() - start
3 - Run the server
Run the server with two task consumers (pulsar actors).
NOTE: Make sure you have Redis server up and running before you start the queue.
python manage.py -w 2
4 - Queue tasks
Launch a Python shell and play with the API:
>>> from manage import app
>>> api = app().backend
>>> task = api.queue_task('addition', a=4, b=6)
>>> task
<TaskFuture pending ID=i26ad5c14c5bb422e87b0f7ccbce5ba06>
>>> task = task.wait()
>>> task
task.addition<i24ab99ddf2744902a375e039790dcbc4><SUCCESS>
>>> task.result
10
>>> task.status_string
'SUCCESS'
API
Tasks backend
The tasks backend is obtained from the Task application backend attribute:
from pq.api import TaskApp
tasks = TaskApp(...).backend
tasks.queue_task(jobname, *args, **kwargs)

Queue a task and return a TaskFuture which is resolved once the task has finished. It is possible to obtain a task future resolved when the task has been queued, rather than finished, by passing the callback=False parameter:

task = await tasks.queue_task(..., callback=False)
task.status_string  # QUEUED
tasks.queue_task_local(jobname, *args, **kwargs)

Queue a job in the local task queue. The local task queue is processed by the same server instance. It is equivalent to executing:

task = await tasks.queue_task(..., queue=tasks.node_name)
task.queue  # tasks.node_name
tasks.execute_task(jobname, *args, **kwargs)

Execute a task immediately without putting it in the task queue. This method is useful for debugging and testing. It is equivalent to executing:

task = await tasks.queue_task(..., queue=False)
task.queue  # None
task.status_string  # SUCCESS
tasks.queues()

Return the list of queue names the backend is subscribed to. This list is not empty when the backend is a task consumer.
tasks.job_list(jobnames=None)

Return a list of (job_name, job_description) tuples. The job_name is a string which must be used as the jobname parameter when executing or queueing tasks. The job_description is a dictionary containing metadata and documentation for the job. Example:

jobs = dict(tasks.job_list())
jobs['execute.python']
# {
#     'type': 'regular',
#     'concurrency': 'asyncio',
#     'doc_syntax': 'markdown',
#     'doc': 'Execute arbitrary python code on a subprocess ... '
# }
The Job class
The Job class is how task factories are implemented and added to the tasks backend registry. When writing a new Job one can either subclass Job directly:

import asyncio

from pq import api


class AsyncSleep(api.Job):
    concurrency = api.ASYNC_IO

    async def __call__(self, lag=1):
        await asyncio.sleep(lag)

or use the less verbose job decorator:

@api.job(concurrency=api.ASYNC_IO)
async def asyncsleep(self, lag=1):
    await asyncio.sleep(lag)

In either case, the self parameter is an instance of the Job class.
job.backend

The tasks backend that is processing this Job run.

job.task

The Task instance associated with this job run.

job.http

Best possible HTTP session handler for the job concurrency mode.

job.queue_task(jobname, *args, **kwargs)

Queue a new job. It is equivalent to:

meta_params = {'from_task': self.task.id}
self.backend.queue_task(..., meta_params=meta_params)
The Task
A task contains the metadata of a job run and is exchanged between task producers and task consumers via a distributed task queue.
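This exchange can be pictured with a minimal, purely illustrative sketch. The field names and the in-memory deque below are stand-ins, not pulsar-queue's actual wire format or broker:

```python
# Illustrative only: how task metadata might travel through a queue.
# A real deployment would use a distributed broker such as Redis.
import json
from collections import deque

queue = deque()  # stand-in for the distributed task queue

# producer side: serialize the metadata of a job run and enqueue it
task = {'id': 'i26ad5c14', 'name': 'addition',
        'kwargs': {'a': 4, 'b': 6}, 'status': 'QUEUED'}
queue.append(json.dumps(task))

# consumer side: pop a message, execute the job, record the outcome
received = json.loads(queue.popleft())
received['result'] = received['kwargs']['a'] + received['kwargs']['b']
received['status'] = 'SUCCESS'
print(received['result'])  # 10
```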
Task States
A Task can have one of the following task.status:
- QUEUED = 6: the task is queued but not yet executed.
- STARTED = 5: task execution has started.
- RETRY = 4: the task is being retried.
- REVOKED = 3: task execution has been revoked (or timed out).
- FAILURE = 2: task execution has finished with a failure.
- SUCCESS = 1: task execution has finished successfully.
FULL_RUN_STATES
The set of states for which a Task has run: FAILURE and SUCCESS
READY_STATES
The set of states for which a Task has finished: REVOKED, FAILURE and SUCCESS
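The state sets above can be mirrored in a short sketch. The constants here are local stand-ins with the values listed above, not imports from the library:

```python
# Illustrative stand-ins for the task states documented above.
QUEUED, STARTED, RETRY, REVOKED, FAILURE, SUCCESS = 6, 5, 4, 3, 2, 1

# states for which a task has run, and states for which it has finished
FULL_RUN_STATES = frozenset({FAILURE, SUCCESS})
READY_STATES = frozenset({REVOKED, FAILURE, SUCCESS})


def is_done(status):
    """A task is finished once its status is in READY_STATES."""
    return status in READY_STATES


print(is_done(STARTED))  # False
print(is_done(REVOKED))  # True
```

Note that FULL_RUN_STATES is a strict subset of READY_STATES: a revoked task is finished but never ran to completion.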
Configuration
There are several parameters you can use to tweak the way the task queue works.
concurrent_tasks (--concurrent-tasks 5)
The maximum number of concurrent tasks for a given worker in a task consumer server.
schedule_periodic (--schedule-periodic)
When True, the task application can schedule periodic Jobs. Usually, only one running server is responsible for scheduling tasks.
Tasks Concurrency
A task can run in one of four concurrency modes. If not specified by the Job, the concurrency mode is given by the default_task_concurrency parameter, which can be set in the config file or on the command line.
ASYNC_IO
The asynchronous IO mode is associated with tasks which return an asyncio Future or a coroutine. These tasks run concurrently in the worker event loop. An example is a Job which scrapes web pages and creates new tasks to process the HTML:
@api.job(concurrency=api.ASYNC_IO)
async def scrape(self, url=None):
    assert url, "url is required"
    request = await self.http.get(url)
    html = request.text()
    task = await self.queue_task('process.html', html=html, callback=False)
    return task.id
GREEN_IO
The green IO mode is associated with tasks that run on a child greenlet. This can be useful with applications which use the greenlet library for implicit asynchronous behaviour.
THREAD_IO
This mode assumes the task performs blocking IO operations, which makes it suitable to run in the event loop executor. You can use this model for most blocking operations, unless:

- the operation is long-running and CPU bound
- the operation does not release the GIL
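The underlying mechanism can be sketched with the standard library: a blocking call is handed to the event loop's thread executor so the loop stays free. This is a conceptual illustration, not pulsar-queue code; the queue performs the equivalent dispatch for jobs declaring THREAD_IO concurrency:

```python
# Sketch of the THREAD_IO idea: run a blocking call in the event loop's
# default ThreadPoolExecutor instead of blocking the loop itself.
import asyncio
import time


def blocking_read():
    """Stand-in for a blocking IO call (file read, DB driver, ...)."""
    time.sleep(0.1)
    return 'payload'


async def main():
    loop = asyncio.get_running_loop()
    # the loop keeps serving other tasks while the thread blocks
    result = await loop.run_in_executor(None, blocking_read)
    return result


print(asyncio.run(main()))  # payload
```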
CPUBOUND
This mode assumes the task performs blocking CPU-bound operations. These tasks are run on sub-processes.
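The sub-process dispatch can likewise be sketched with the standard library. This is only an illustration of the CPUBOUND idea; pulsar-queue manages its own worker sub-processes:

```python
# Sketch of the CPUBOUND mode: ship a CPU-bound function to a pool of
# sub-processes, sidestepping the GIL entirely.
from concurrent.futures import ProcessPoolExecutor


def fibonacci(n):
    """A deliberately CPU-bound task: naive recursive Fibonacci."""
    return n if n < 2 else fibonacci(n - 1) + fibonacci(n - 2)


if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=2) as pool:
        # each input is computed in a separate worker process
        print(list(pool.map(fibonacci, [10, 20, 25])))  # [55, 6765, 75025]
```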
Configure
It is possible to enhance the task queue by passing an application callable during initialisation (usually a class or an instance factory). This callable must be picklable and should return an object implementing one or more methods which override the backend implementation.
For example:
class Application:

    def __init__(self, backend):
        self.backend = backend

    async def store_task(self, task):
        """Store task into a backend database"""
        ...


tq = TaskApp(Application, ...)
The application callable is invoked when the backend handler is initialised (on each consumer and in the scheduler).
License
This software is licensed under the BSD 3-clause License. See the LICENSE file in the top distribution directory for the full license text. Logo designed by Ralf Holzemer, Creative Commons license.