SQS Workers

How can I use it?

Unless you are part of the Doist development team, you most likely don't need it. It's opinionated, built for our own internal needs, and probably provides little value to outside developers.

Queue processors are in abundance (see http://queues.io/ for examples), and there is no shortage of SQS queue processors on PyPI, so please don't put high hopes on this particular implementation.

Got it, but how can I start using it anyway?

Install the package with

pip install sqs-workers

Configure your boto3 library to provide access credentials for your installation, with something like this:

aws configure

Don't forget to set your preferred AWS region.

Then you will manage two systems (most likely from the same codebase): one adds messages to the queue, and the other executes them.

from sqs_workers import SQSEnv

# This environment will use AWS credentials from the ~/.aws/ directory
sqs = SQSEnv()

# Create a new queue.
# Note that you can use the AWS web interface for the same action as well;
# it provides more options. You only need to do this once.
sqs.create_standard_queue('emails')

# Register a queue processor
@sqs.processor('emails', 'send_email')
def send_email(to, subject, body):
    print(f"Sending email {subject} to {to}")

Then there are two ways of adding tasks to the queue. Classic (aka "explicit"):

sqs.add_job(
    'emails', 'send_email', to='user@example.com', subject='Hello world', body='hello world')

And the "Celery way" (we mimic the Celery API to some extent)

send_email.delay(to='user@example.com', subject='Hello world', body='hello world')

To process the queue you have to run workers manually. Create a new file which defines the sqs object and registers all processors (most likely by importing the necessary modules from your project), and then start processing the queue:

from sqs_workers import SQSEnv
sqs = SQSEnv()
...
sqs.process_queue('emails')

In production we usually don't handle multiple queues in the same process, but in a development environment it's easier to process all queues at once with

sqs.process_queues()

Serialization

There are two serializers: json and pickle.
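
As an illustration of the difference (a sketch only; how the serializer is selected is library configuration and is not shown here): with the json serializer, job keyword arguments must be JSON-serializable, while pickle also accepts arbitrary picklable Python objects.

import datetime

# JSON-serializable kwargs work with either serializer.
send_email.delay(to='user@example.com', subject='Hello world', body='hello world')

# A job argument like a datetime object is not JSON-serializable and would
# require the pickle serializer (hypothetical example, left commented out).
# send_email.delay(to='user@example.com', subject='Reminder',
#                  body='hello world', sent_at=datetime.datetime.utcnow())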

Exception processing

If task processing ends with an exception, the error is logged and the task is returned to the queue after a while. The exact behavior is defined by the queue settings.
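
For example (a sketch): if a processor raises, the error is logged and the message will reappear in the queue later, to be retried according to the queue settings and backoff policy.

@sqs.processor('emails', 'send_email')
def send_email(to, subject, body):
    # Any unhandled exception here is logged by the worker, and the message
    # is returned to the queue to be retried later.
    raise RuntimeError('SMTP server unreachable')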

Batch processing

Instead of the sqs.processor decorator you can use sqs.batch_processor. In this case the function must accept a "messages" parameter containing a list of dicts.
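
A minimal sketch (assuming batch_processor takes the same queue and job name arguments as processor; the function name is arbitrary):

@sqs.batch_processor('emails', 'send_email')
def send_emails(messages):
    # Each element of `messages` is a dict with the kwargs that were passed
    # to add_job() / delay(), e.g. {'to': ..., 'subject': ..., 'body': ...}.
    for message in messages:
        print(f"Sending email {message['subject']} to {message['to']}")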

Dead-letter queues and redrive

When creating a queue you can set a fallback dead-letter queue and redrive policy, which can look like this:

from sqs_workers import SQSEnv
sqs = SQSEnv()
sqs.create_standard_queue('emails_deadletters')
sqs.create_standard_queue('emails', 
    redrive_policy=sqs.redrive_policy('emails_deadletters', 3)
)

This means "move the message to the email_deadletters queue after four (3 + 1) failed attempts to send it to the recipient"

Backoff policies

You can define the backoff policy for the entire environment or for a specific processor.

from sqs_workers.backoff_policies import DEFAULT_BACKOFF  # import path is assumed

@sqs.processor('emails', 'send_email', backoff_policy=DEFAULT_BACKOFF)
def send_email(to, subject, body):
    print(f"Sending email {subject} to {to}")

The default policy is exponential backoff. It's recommended to always set both a backoff policy and a dead-letter queue to limit the maximum number of execution attempts.

Alternatively, you can set the backoff policy to IMMEDIATE_RETURN to re-execute a failed task immediately.

from sqs_workers.backoff_policies import IMMEDIATE_RETURN  # import path is assumed

@sqs.processor('emails', 'send_email', backoff_policy=IMMEDIATE_RETURN)
def send_email(to, subject, body):
    print(f"Sending email {subject} to {to}")

Shutdown policies

When launching the queue processor with process_queue(), you can optionally set when it should stop.

The following shutdown policies are supported:

  • IdleShutdown(idle_seconds): return from the function when no new tasks have been seen for a specific period of time

  • MaxTasksShutdown(max_tasks): return from the function after processing at least max_tasks jobs. Can be helpful to prevent memory leaks

The default policy is NeverShutdown. It's also possible to combine the two previous policies with OrShutdown or AndShutdown, or to create custom classes for specific behavior.

Example of stopping processing the queue after 5 minutes of inactivity:

from sqs_workers import SQSEnv
from sqs_workers.shutdown_policies import IdleShutdown

sqs = SQSEnv()
sqs.process_queue('foo', shutdown_policy=IdleShutdown(idle_seconds=300))
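
Combining policies might look like this (a sketch, assuming MaxTasksShutdown and OrShutdown live in the same sqs_workers.shutdown_policies module and that OrShutdown takes the policies to combine as positional arguments):

from sqs_workers import SQSEnv
from sqs_workers.shutdown_policies import IdleShutdown, MaxTasksShutdown, OrShutdown

sqs = SQSEnv()

# Stop when either condition is met: 5 minutes without new tasks,
# or at least 1000 processed tasks (for example, to limit memory growth).
shutdown_policy = OrShutdown(IdleShutdown(idle_seconds=300),
                             MaxTasksShutdown(max_tasks=1000))
sqs.process_queue('foo', shutdown_policy=shutdown_policy)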

Processing dead-letter queue by pushing back failed messages

The most common way to process a dead-letter queue is to fix the main bug causing messages to appear there in the first place, and then to re-process these messages.

With sqs-workers this can be done by putting all the messages from a dead-letter queue back to the main one. This is performed with a special fallback processor called DeadLetterProcessor.

The dead-letter processor is opinionated about how queues are organized and uses some hard-coded options.

It is meant to process a queue named "something_dead", which is expected to be configured as the dead-letter queue for "something". While processing the queue, the processor takes every message and pushes it back to the "something" queue with a hard-coded delay of 1 second.

If the queue name doesn't end with "_dead", the DeadLetterProcessor behaves like the generic FallbackProcessor: it shows the error message and keeps the message in the queue. This prevents infinite loops in which a message from the dead-letter queue is pushed back to the same queue, immediately processed by the same processor again, and so on.

Usage example:

from sqs_workers import SQSEnv
from sqs_workers.processors import DeadLetterProcessor
from sqs_workers.shutdown_policies import IdleShutdown

sqs = SQSEnv(fallback_processor_maker=DeadLetterProcessor)
sqs.process_queue("foo_dead", shutdown_policy=IdleShutdown(10))

This code takes all the messages in the foo_dead queue and pushes them back to the foo queue. Then it waits 10 seconds to ensure no new messages appear, and quits.

Processing dead-letter with processors from the main queue

Instead of pushing tasks back to the main queue, you can copy processors from the main queue to the dead-letter queue and process all tasks in place.

Usage example:

from sqs_workers import SQSEnv
from sqs_workers.shutdown_policies import IdleShutdown

sqs = SQSEnv()
...
sqs.copy_processors('foo', 'foo_dead')
sqs.process_queue('foo_dead', shutdown_policy=IdleShutdown(10))

Using in unit tests with MemoryEnv

There is a special MemoryEnv which can be used as a quick'n'dirty replacement for real queues in unit tests. If you have a function create_task which adds some tasks to the queue and you want to test how it works, you can technically write your tests like this:

from sqs_workers import SQSEnv
env = SQSEnv()

def test_task_creation_side_effects():
    create_task()
    env.process_batch('foo')
    ...

The problem is that your test now depends on AWS (or localstack) infrastructure, which you don't always need. Instead, you can replace SQSEnv with MemoryEnv and rewrite your tests like this:

from sqs_workers.memory_env import MemoryEnv
env = MemoryEnv()
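
Put together, such a test might look like this (a sketch; the 'foo' queue, the job name, and create_task itself are illustrative, and the point is only that no AWS access is needed):

from sqs_workers.memory_env import MemoryEnv

env = MemoryEnv()

def create_task():
    # Hypothetical producer under test: enqueues a job on the 'foo' queue.
    env.add_job('foo', 'send_email', to='user@example.com',
                subject='Hello world', body='hello world')

def test_task_creation_side_effects():
    create_task()
    env.process_batch('foo')  # processed in memory, no AWS required
    ...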

Please note that MemoryEnv has some serious limitations and may not fit your use case well. Namely, when you work with MemoryEnv:

  • Redrive policy doesn't work
  • There is no difference between standard and FIFO queues
  • FIFO queues don't support content-based deduplication
  • Delayed tasks are executed inefficiently: the task is taken from the queue, and if its time hasn't come yet, it is put back.
  • The API can return slightly different results

Testing with AWS

Make sure you have all dependencies installed and the boto3 client configured (see above), and then run

pytest -k aws

Alternatively, to test all supported versions, run

tox -- -k aws

Testing with localstack

Localstack tests should run faster than testing against AWS, and besides, they work offline.

Run localstack and make sure that the SQS endpoint is available at its default address, http://localhost:4576.

Then run

pytest -k localstack

or

tox -- -k localstack

Why does it depend on werkzeug? 😱

The only reason is werkzeug.utils.validate_arguments, which we love and are too lazy to move into this codebase.
