SQS Task Master for unwieldy python data processing by Upserve
Project description
SQS Task Master is one of many packages for distributed asynchronous task execution in python. This package is different in that it is designed to work with uncooperative tasks. It is designed for CPU intensive tasks which will not work in a python cooperative threading model. It allows for tasks that may take upto 12 hours (the limit for SQS). It also provides for applying a hard timeout for a task, less than 12 hours. The implementation relies on using os Singals to achieve these behaviors. Retries and failures are handled using SQS infrastructure. It is designed to work with standard SQS queues which guarantee at least once delivery.
Upserve developed this library for use with long running data science tools like xgBoost and OrTools. While python signal will attempt to interrupt the process, you should verify the desired behavior. This behavior and shortcomings with c libraries is documented in TestMessageHandler.test_cbusy_timeout.
- Other choices:
Installation
The sqstaskmaster project is public and can be installed from pypi with pip for python3 on mac and linux.
pip install sqstaskmaster
Usage
Publish Tasks
tm = TaskManager(sqs_url) tm.submit('MyTask', some='kwarg', someother='kwarg') tm.submit('MyOtherTask', **kwargs)
Create a Handler
class MyHandler(MessageHandler): def __init__(self, message, sqs_timeout, alarm_timeout, hard_timeout=4 * 60 * 60, **kwargs): super().__init__(message, sqs_timeout, alarm_timeout, hard_timeout) self.kwargs = kwargs # prefer parsing arguments (for instance dates) in the consume task and passing explicit arguments def running(self): # If you can inspect the task to see if it is hung, do so here return True def run(self): # Do some work here! def notify(self, exception, context=None): # Implement only for error notification service. Errors are already logged. pass
Consume Tasks
for task, kwargs, message in TaskManager(sqs_url).task_generator(sqs_timeout=30): if task == 'MyTask': # prefer parsing kwargs here and handle KeyError and ValueError appropriately with MyHandler(message, sqs_timeout=30, alarm_timeout=25, hard_timeout=300, **kwargs) as handler: handler.run() elif task == 'MyOtherTask': with MyOtherHandler(message, sqs_timeout=30, alarm_timeout=25, hard_timeout=300, **kwargs) as handler: handler.run() else: # Do something about unexpected task request
Local Integration Testing
Testing your production system should include a combination of: local stubbing using Mock; tools like localstack or a staging environment to examine behavior in a distributed system; and purely local validation of consumer / producer api. LocalQueue is a pure in memory solution that implements a minimal subset of the SQS API for this last purpose only. The intent here is not to test or validate behavior of consumers and producers interacting with SQS, but only the message content. LocalQueue could be extended to implement threading and behave more like sqs within a single process, but that is better done with another tool like local stack.
... tm = TaskManager(url, queue_constructor=LocalQueue) ...
By passing the queue_constructor argument to the TaskManager, you can bypass AWS SQS and use a local, in memory queue object to send and receive messages allowing local synchronous integration testing. This creates one global queue for each url. It is not for testing system behavior and does not implement retries, message timeouts or other SQS features. It is only for integration testing of the interface between your producer and consumer methods with the TaskManager serialization and MessageHandler execution.
Development
Git clone the respository:
git clone git@github.com:upserve/sqstaskmaster.git
Pip install the development dependencies in a virtual environment:
pip install -e .[dev]
Run unit tests:
python -m unittest -v
Run flake8:
flake8 .
Run black:
black .
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
File details
Details for the file sqstaskmaster-0.0.11.tar.gz
.
File metadata
- Download URL: sqstaskmaster-0.0.11.tar.gz
- Upload date:
- Size: 17.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.22.0 setuptools/42.0.2 requests-toolbelt/0.9.1 tqdm/4.40.2 CPython/3.7.1
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 666376e7584c5ba9b952322f85af1272c11f1e7bc40c500b2e403dc6eb85a7ea |
|
MD5 | b8231349c5c9fb9e46055a81787e0170 |
|
BLAKE2b-256 | 441718d02a02e734b4b775fe45e35ecf5335556bf80a410f29f9659631581e37 |