A simple task-queue for SQS.
WARNING: This library is still in beta. It has a stable API and has been deployed in production, but we have not received feedback from a large number of use cases, and it is possible there are unknown bugs.
PyQS is a simple task manager for SQS. Its goal is to provide a simple and reliable Celery-compatible interface for working with SQS. It uses boto3 under the hood to authenticate and talk to SQS.
Installation
PyQS is available from PyPI and can be installed in all the usual ways. To install via CLI:
$ pip install pyqs
Or just add it to your requirements.txt.
Usage
PyQS uses some very simple semantics to create and read tasks. Most of this comes from SQS having a very simple API.
Creating Tasks
Adding a task to a queue is pretty simple:
from pyqs import task

@task(queue='email')
def send_email(subject, message):
    pass

send_email.delay(subject='Hi there', message='Welcome to PyQS!')
NOTE: This assumes that you have your AWS keys in the appropriate environment variables, or are using IAM roles. PyQS doesn't do anything too special to talk to AWS; it only creates the appropriate boto3 connection.
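If you are using environment variables, boto3 reads the standard ones; for example (the values shown are placeholders):

$ export AWS_ACCESS_KEY_ID=AKIA...
$ export AWS_SECRET_ACCESS_KEY=...
$ export AWS_DEFAULT_REGION=us-east-1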
If you don't pass a queue, PyQS will use the function path as the queue name. For example, the following function lives in email/tasks.py:
@task()
def send_email(subject):
    pass
This would show up in the email.tasks.send_email queue.
You can also specify the function path if you want to reference a function in a different project:
# This references the function send_email in foo/bar.py instead of email/tasks.py
@task(custom_function_path="foo.bar.send_email")
def send_email(subject):
    pass
Reading Tasks
To read tasks we need to run PyQS. If the task can be imported from your PYTHONPATH, we can just run:
$ pyqs email.tasks.send_email
If we want to run all tasks matching a certain prefix, we can use a wildcard; matching is based on Python's fnmatch:
$ pyqs email.*
We can also read from multiple different queues with one call by delimiting with commas:
$ pyqs send_email,read_email,write_email
If you want to run more workers to process tasks, you can up the concurrency. This will spawn additional processes to work through messages.
$ pyqs send_email --concurrency 10
Simple Process Worker
To use a simpler version of PyQS that deals with some of the edge cases in the original implementation, pass the --simple-worker flag:
$ pyqs send_email --simple-worker
The Simple Process Worker differs from the original implementation in the following ways:
- Does not use an internal queue, and removes support for the prefetch-multiplier flag. This helps simplify the mental model, since messages are not held on both the SQS queue and an internal queue.
- When the --simple-worker flag is passed, the default batch size is 1 instead of 10. This is configurable.
- Does not check the visibility timeout when reading or processing a message from SQS.
Allowing the worker to process a message even past its visibility timeout solves the problem of a message never being processed when max_receives=1 and the visibility timeout was incorrectly set too short and is exceeded. Previously, such a message would have ended up in the DLQ (if one was configured) and never actually been processed.
This increases the probability that we process a message more than once, especially if the batch size is greater than 1, but the developer can handle that by checking whether a message has already been processed, as sketched below.
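As a rough illustration of such a check, the sketch below guards a task with an idempotency lookup. The in-memory set and the idea of passing a unique message_id with each task are assumptions made for the example; a real deployment would use a durable store shared by all workers.

# Minimal, hypothetical idempotency guard; the storage (a plain set) and the
# message_id argument are illustrative assumptions, not part of PyQS.
from pyqs import task

processed_ids = set()  # in production, use a durable store shared by workers

@task(queue='email')
def send_email(message_id, subject, message):
    # Skip the work if this logical message was already handled.
    if message_id in processed_ids:
        return
    processed_ids.add(message_id)
    # ... actually send the email here ...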
Hooks
PyQS has an event registry which can be used to run a function before or after every task runs.
from pyqs import task, events

def print_pre_process(context):
    print({"pre_process": context})

def print_post_process(context):
    print({"post_process": context})

events.register_event("pre_process", print_pre_process)
events.register_event("post_process", print_post_process)

@task(queue="my_queue")
def send_email(subject):
    pass
Operational Notes
Dead Letter Queues
It is recommended to use a Dead Letter Queue for any queues that are managed by PyQS. This is because the current strategy for fetching messages does not delete them upon initial receipt. A message is ONLY deleted from SQS upon successful completion. This is probably unexpected behavior if you are coming from Celery with SQS. Celery attempted to manage this behavior internally, with varying success.
If an error arises during message processing, the message is not deleted and will re-appear after the visibility timeout expires. This can lead to messages that never leave the queue and continuously throw errors. A Dead Letter Queue helps resolve this by collecting messages that have been retried a specified number of times.
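As an illustration, a Dead Letter Queue can be attached to an existing queue with boto3 via a RedrivePolicy; the queue name, DLQ ARN, and maxReceiveCount below are example values, not PyQS defaults.

import json
import boto3

# Example values; substitute your own queue name, DLQ ARN, and retry limit.
sqs = boto3.client("sqs")
queue_url = sqs.get_queue_url(QueueName="email")["QueueUrl"]
dlq_arn = "arn:aws:sqs:us-east-1:123456789012:email-dlq"

# After 5 failed receives, SQS moves the message to the DLQ instead of
# redelivering it forever.
sqs.set_queue_attributes(
    QueueUrl=queue_url,
    Attributes={
        "RedrivePolicy": json.dumps(
            {"deadLetterTargetArn": dlq_arn, "maxReceiveCount": "5"}
        )
    },
)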
Worker Seppuku
Each process worker will shut itself down after 100 tasks have been processed (or have failed to process). This is to prevent issues with stale connections lingering and blocking tasks forever. In addition, it helps guard against memory leaks, though in a rather brutish fashion. After a process worker shuts itself down, the managing process should notice and restart it promptly. The value of 100 is currently hard-coded, but could be made configurable.
Queue Blocking
While there are multiple workers for reading from different queues, they all append to the same internal queue. This means that if you have one queue with lots of fast tasks and another with a few slow tasks, they can block each other and the fast tasks can build up behind the slow tasks. The simplest solution is to run two separate PyQS commands, one for each queue, with appropriate concurrency settings, as shown below.
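For example (the queue names and concurrency values here are illustrative):

$ pyqs email.tasks.send_email --concurrency 10
$ pyqs reports.tasks.generate_report --concurrency 2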
Visibility Timeout
Care is taken to not process messages that have exceeded the visibility timeout of their queue. The goal is to prevent double processing of tasks. However, it is still quite possible for this to happen since we do not use transactional semantics around tasks. Therefore, it is important to properly set the visibility timeout on your queues based on the expected length of your tasks. If the timeout is too short, tasks will be processed twice, or very slowly. If it is too long, ephemeral failures will delay messages and reduce the queue throughput drastically. This is related to the queue blocking described above as well. SQS queues are free, so it is good practice to keep the messages stored in each as homogeneous as possible.
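For reference, the visibility timeout can be set on a queue with boto3; the 120-second value below is only an example and should be chosen based on your longest expected task.

import boto3

sqs = boto3.client("sqs")
queue_url = sqs.get_queue_url(QueueName="email")["QueueUrl"]

# Give tasks up to two minutes before SQS makes the message visible again.
sqs.set_queue_attributes(
    QueueUrl=queue_url,
    Attributes={"VisibilityTimeout": "120"},
)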
Compatibility
Celery:
PyQS was created to replace Celery inside of our infrastructure. To achieve this goal we wanted to make sure we were compatible with the basic Celery APIs. To this end, you can easily start trying out PyQS in your Celery-based system. PyQS can read messages that Celery has written to SQS. It will read both pickle- and JSON-serialized SQS messages (although we recommend JSON).
Operating Systems:
UNIX only, due to the use of the os.getppid system call. This could probably be worked around if anyone actually wants Windows support.
Boto3:
Currently PyQS only supports a few basic connection parameters being explicitly passed to the connection. Any work boto3 does to transparently find connection credentials, such as IAM roles, will still work properly.
When running PyQS from the command-line you can pass --region, --access-key-id, and --secret-access-key to override the default values.
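For example (the values shown are placeholders):

$ pyqs email.tasks.send_email --region us-west-2 --access-key-id AKIA... --secret-access-key ...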
Caveats
Durability:
When we read a batch of messages from SQS, we attempt to add them to our internal queue until we exceed the visibility timeout of the queue. Once this is exceeded, we discard the messages and grab a new batch. Additionally, when a process worker gets a message from the internal queue, the time the message was fetched from SQS is checked against the queue's visibility timeout and the message is discarded if it exceeds the timeout. The goal is to reduce double processing. However, this system does not provide transactions, and there are cases where it is possible to process a message whose visibility timeout has been exceeded. It is up to you to make sure that you can handle this edge case.
Task Importing:
Currently there is no advanced logic in place to find the location of modules to import tasks for processing. PyQS will try using importlib to get the module, and then find the task inside the module. Currently we wrap our usage of PyQS inside a Django admin command, which simplifies task importing. We call the _main() method directly, skipping main(), since main() only performs argument parsing.
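A wrapper along these lines could be used; note that the import path and the arguments passed to _main() here (a list of queue prefixes and a concurrency count) are assumptions for illustration and should be checked against the installed PyQS source.

# Hypothetical Django management command wrapping PyQS.
# The import path and _main() arguments are assumptions; check pyqs/main.py
# in your installed version for the actual signature.
from django.core.management.base import BaseCommand

from pyqs.main import _main

class Command(BaseCommand):
    help = "Run PyQS workers for the email queues"

    def handle(self, *args, **options):
        # Django has already configured the import path, so the task modules
        # can be found without extra work.
        _main(["email.tasks.*"], concurrency=5)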
Running inside of containers
PyQS assumes that its process id is not 1. If you are running PyQS inside of a container, you should wrap it in supervisor or something like dumb-init.
Why not just use Celery?
We like Celery. We (Yipit.com) even sponsored the original SQS implementation. However, SQS is pretty different from the rest of the backends that Celery supports. Additionally, the Celery team does not have the resources to create a robust SQS implementation in addition to the rest of their duties. This means the SQS support is carrying around a lot of extra features and a complex codebase that makes it hard to debug.
We have personally experienced some very vexing resource leaks with Celery that have been hard to track down. For our use case, it has been simpler to switch to a small library that we fully understand. As this library evolves that may change and the costs of switching may not be worth it. However, we want to provide the option to others who use Python and SQS to use a simpler setup.
Changelog
1.0.1
Add MessageId for tracking task executions
1.0.0
Drop Py2 support
Add new SimpleProcessWorker (https://github.com/spulec/PyQS/pull/76)
0.1.6
Fix broken pickle of botocore clients.
0.1.5
Add events hooks for pre and post processors.
0.1.4
Improve behavior when a queue is created after a worker has started. The worker will now refresh the queues every 30 seconds to check for new queues.
0.1.3
Change PPID checking to check for actual parent ID, instead of PID 1. This fixes issues running on docker containers where PPID of 1 is expected.
0.1.2
419ce2e Merge pull request #56 from orangain/honor-aws-region
7c793d0 Merge pull request #55 from orangain/fix-indentation-error
0643fbb Honor aws region configured by .aws/config or env var
f5c1db9 Fix indentation error
cdae257 Merge pull request #52 from cedarai/master
a2ac378 Merge pull request #53 from p1c2u/fix/nosetest-remove-stop-parameter
dbaa391 Merge pull request #51 from p1c2u/fix/pep8-styles-fixes
1577382 Nosetest remove stop parameter
b7420e3 Add current directory to PYTHONPATH
8d04b62 Graceful shutdown logging msg fix
796acbc PEP8 styles fixes
72dcb62 Merge pull request #50 from hobbsh/add_example
d00d31f Update readme
dfbf459 Use .delay() to submit messages
612158f Merge pull request #49 from hobbsh/no_log_0_msg
09a649f Use logger.debug for success SQS log line
dfd56c3 Fix typos in readme
a774155 Add example flask app
17e7b7c Don’t log message retrieve success when there are 0 messages
14eb827 Add shutdown signal logging.
0.1.1
Fix KeyError on accounts without queues
0.1.0
Upgrade to boto3
0.0.22
Fix Python 3 support
Allow overwriting the delay_seconds attribute at call time
0.0.21
Add ability to tune PREFETCH_MULTIPLIER with --prefetch-multiplier.
0.0.20
Respect --batch-size when sizing internal queue on ManagerWorker
0.0.19
Add ability to run with tunable BATCHSIZE and INTERVAL. Thanks to @masayang
Add ability to create tasks with a visibility delay. Thanks to @joshbuddy
Add ability to create tasks with a custom function location, allowing cross project tasks
0.0.18
Convert Changelog to .rst
Add Changelog to long description on Pypi. Thanks to @adamchainz
0.0.17
Fix typos in README
Add notes on Dead Letter Queues to README
0.0.16
Switch README to reStructuredText (.rst) format so it renders on PyPI
0.0.15
Process workers will kill themselves after attempting to process 100 requests, instead of checking the internal queue 100 times.
If we find no messages on the internal queue, sleep for a moment before rechecking.
0.0.14
Process workers will kill themselves after processing 100 requests
Process workers will check a message’s fetch time and visibility timeout before processing, discarding it if it has exceeded the timeout.
Log the process_time() used to process a task to the INFO level.
0.0.13
Only pass SQS Queue ID to internal queue. This is attempting to fix a bug when processing messages from multiple queues.
0.0.12
Remove extraneous debugging code
0.0.11
Add additional debugging to investigate message deletion errors
0.0.10
Give each process worker its own boto connection to avoid multiprocess race conditions during message deletion
0.0.9
Change long polling interval to a valid value, 0<=LPI<=20
0.0.8
Switched to long polling when pulling down messages from SQS.
Moved message deletion from SQS until after message has been processed.
0.0.7
Added capability to read JSON encoded celery messages.
0.0.6
Switched shutdown logging to INFO
Added brief sleep to message retrieval loop so that we don’t look like we are using a ton of CPU spinning.
0.0.5
Switching task failure logging to ERROR (actually this time)
Moved task success logging to INFO
Added INFO level logging for number of messages retrieved from an SQS queue.
Moved Reader and Worker process counts to DEBUG
0.0.4
Added ability to pass region, access_key_id and secret_access_key through to Boto when creating connections
Switched logging of task failure to the ERROR logger, from INFO.