Distributed event handling on top of Celery


Celery-AMQP-Events is a library that implements distributed event handling on top of Celery.

  • AMQP-based robustness of event handling
  • Celery tasks interface
  • Anti-flood tactics

Installation

pip install celery-amqp-events

Configuration

  1. Pass a unique "service name" to the Celery() instance for each service that has event handlers (see amqp_events.celery:events_app).
  2. Tell Celery where to find event handlers via the imports setting.
  3. Configure the broker connection and other Celery settings.
  4. Leave the result backend empty: each event may have multiple consumers, so an event result is meaningless in this case.

You absolutely need a separate name for each service that consumes events: without it, your services will share the same queue for each event, and every fired event will be handled by only one randomly chosen service.

from amqp_events.celery import events_app

app = events_app(
    "service_name",  # important in multi-service environment
    imports=['demo.tasks'],  # modules where to find event handlers
    broker_url='amqp://guest:guest@rabbitmq:5672/',
)

Adding events and handlers

from demo.celery import app

@app.event('service_name.object_name.event_name')
def even_number_generated(number: int):
    # You may add validation logic to the event function body.
    if number % 2 != 0:
        raise ValueError("number is not even")

@app.handler('service_name.object_name.event_name')
def on_even_number_generated(number: int):
    # Handle event somehow
    print(f"even number {number} generated")

Running

  • Start an ordinary Celery worker for your consumer service

Note that mingle, gossip, and heartbeat should be disabled if not used. These algorithms use broadcast events, which means that for N workers you'll get N^2 messages in RabbitMQ serving no purpose.

celery worker -A your_service.celery \
  --without-mingle --without-heartbeat --without-gossip

Sending events

import random
from demo.tasks import even_number_generated

try:
    even_number_generated(random.randint(0, 100))
except ValueError:
    print("oops, number was odd")

Robustness

  • If an event fails with an unhandled error, it is retried via a separate queue with exponential backoff.
  • Backoff prevents resource exhaustion (e.g. running out of free HTTP connections).
  • If no retry attempts are left, the unhandled event is moved to an "archive" queue.
  • The archive catches messages that always produce an error in the consumer; these messages can be retried manually once a fix is released.
  • The archive is limited both by message TTL and by a message count limit, so alerts on it should exist.
  • Retries go through a separate queue for multiple reasons:
    • Using countdown forces the consumer to keep "unacknowledged" events in memory, which is bad for load balancing and resource usage.
    • Retrying to the same queue slows down event processing if the retry probability is high enough.
    • Two faulty consumers retrying the same event with the same routing key would cause exponential growth of the message count in RabbitMQ, because a message is split into multiple messages when published from the same exchange to multiple queues.
  • By default, some fault-tolerance Celery settings are enabled (see the sketch after this list):
    • task_acks_late delays task acknowledgement until the end of processing.
    • task_reject_on_worker_lost prevents acknowledgement if the worker was killed.
    • Task.autoretry_for is set to retry on any Exception automatically.
    • Disabling task_acks_on_failure_or_timeout forces Celery to reject failed messages if autoretry_for failed to handle them.
    • confirm_publish in broker_transport_options blocks the producer until the broker confirms that it has received the message.
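
These defaults correspond roughly to the following Celery configuration; this is a minimal sketch of equivalent settings, not the library's actual source:

from celery import Celery

app = Celery("service_name")
app.conf.update(
    # acknowledge messages only after the task body has finished
    task_acks_late=True,
    # don't acknowledge messages whose worker process died mid-task
    task_reject_on_worker_lost=True,
    # reject (dead-letter) failed messages instead of acknowledging them
    task_acks_on_failure_or_timeout=False,
    # block the publisher until the broker confirms message receipt
    broker_transport_options={"confirm_publish": True},
)

# Task.autoretry_for analogue: retry any handler on any Exception
@app.task(autoretry_for=(Exception,), max_retries=10)
def on_some_event(payload):
    ...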

Delay on broker side

Celery's default retry mechanics are:

  • construct a copy of the message currently being handled
  • change arguments or options of the new message according to the retry parameters
  • publish the copy to the same exchange and routing_key found in the message's delivery_info
  • acknowledge the current message
  • after receiving the new message, Celery puts it into an in-memory queue to wait until the message ETA, and then passes it to the first free worker.

All this is because the AMQP protocol doesn't define any delay-related properties. Currently, RabbitMQ has a plugin for delayed messages, plus some tricky schemes that allow delaying message delivery on the broker side. We use one of these schemes.
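
For contrast, here is what the stock client-side retry looks like from a task's perspective (standard Celery API; the task and URL are hypothetical):

import urllib.request

from celery import shared_task

@shared_task(bind=True, max_retries=5)
def fetch_page(self, url):
    try:
        # some I/O that may fail transiently
        urllib.request.urlopen(url).read()
    except OSError as exc:
        # stock Celery republishes a copy of the message with a countdown;
        # the worker keeps that copy in memory until its ETA passes
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)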

Retry causes

A Celery task can be retried in several cases:

  • a manual retry from the task implementation
  • a retry on an exception listed in Task.autoretry_for
  • a task may also be rejected if retry() failed, if the task itself failed, or if it timed out

Retry routing

To implement broker-side delays, EventsCelery declares a set of retry exchanges and queues (a kombu sketch follows the list):

  • each exchange and queue is prefixed with the "service name"
  • each queue is declared with an x-message-ttl equal to 2^n seconds, where n ranges up to the AMQP_EVENTS_MAX_RETRIES env variable, which also sets Task.max_retries for every task handler
  • each queue also defines an x-dead-letter-exchange argument, which points to a "recover" exchange
  • this "recover" exchange has the topic type and is bound to the same queues as the default "events" exchange
  • each "retry" exchange has the fanout type, so any message published to it, regardless of its routing key, is routed to the corresponding retry queue
  • with the message TTL set up, a retried message expires in 2^n seconds, which is essentially an exponential backoff implementation
  • because of the dead-letter exchange, the expired message is moved to the "recover" exchange, from which it is routed back to the initial event queue via the initial message routing key.

Reject handling

It's important to retry rejected messages when something goes wrong.

  • Each Celery task queue is declared with an x-dead-letter-exchange argument pointing to the first "retry" exchange (sketched below).
  • As described in the "Retry routing" section, a rejected message is moved by RabbitMQ to a retry queue; after one second it expires and is moved to the "recover" exchange, and from there back to the initial event queue.
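
A minimal kombu sketch of such an event queue, again with illustrative names:

from kombu import Exchange, Queue

events = Exchange("myservice.events", type="topic")

event_queue = Queue(
    "myservice.service_name.object_name.event_name",
    exchange=events,
    routing_key="service_name.object_name.event_name",
    queue_arguments={
        # rejected messages are dead-lettered to the first retry
        # exchange, whose queue has a TTL of 2^0 = 1 second
        "x-dead-letter-exchange": "myservice.retry.0",
    },
)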

Archiving

By default, Celery simply drops a task when MaxRetriesExceededError happens, but we want to archive such events for some time:

  • EventsCelery declares a fanout "archive" exchange and a corresponding queue
  • the "archive" exchange and queue are also prefixed with the "service name"
  • the "archive" queue is declared with x-message-ttl and x-max-length arguments, so archive storage is limited both by time and by message count
  • when EventsCelery finds that the task retry count is exceeded, it retries the task to the "archive" exchange (a sketch follows).
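
The archive queue could be declared roughly like this (names, TTL and length values are illustrative):

from kombu import Exchange, Queue

archive = Exchange("myservice.archived", type="fanout")

archive_queue = Queue(
    "myservice.archived",
    exchange=archive,
    queue_arguments={
        # keep archived messages for a week at most...
        "x-message-ttl": 7 * 24 * 3600 * 1000,
        # ...and never store more than 10000 of them
        "x-max-length": 10000,
    },
)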

Caveats

  1. Message reordering. What if the broker-side delay shuffles normal message ordering? This may break message ETA handling or change the order of events.
    • By default, Celery receives a retried, delayed message and keeps it in memory until the message ETA. Before that ETA it handles other incoming messages, and if these newer messages have later ETAs, they are scheduled at the corresponding times.
    • This is how things work when there are fewer than prefetch_multiplier messages in the incoming queue. If there are lots of incoming messages, retried messages arrive after their ETA, and of course this changes the event order.
    • With the broker-side delay implementation, the message TTL is guaranteed to be less than or equal to the task ETA, so to a Celery worker the situation looks like a very long AMQP basic.publish call. If there are no queued messages, the Celery-side ETA logic applies and the task starts on time.
  2. Queue count. RabbitMQ is sensitive to the total queue count. What if we declare too many queues?
    • The total queue count is linear in the max retries count and in the number of events.
    • Each retry queue is available to any Celery task and implements a delay equal to a power of 2. 20 queues provide a max delay of more than 23 days, but if somebody needs millisecond delays, they will need about 10 more queues.
    • By default, each event is routed to its own queue, but with a topic exchange you can manually set up a set of queues that handle groups of event types. You can even set up a single queue for all handled events.
    • The worst case needs N + M + 1 queues, where N is the max_retries limit and M is the number of handled events.
    • The best case needs 2 queues, if max retries is set to 1 and all events are routed to a single queue. The "archive" queue may also be disabled.
  3. Celery internals. What if Celery changes and all this breaks?
    • To trigger the declaration of auxiliary queues we use Celery signals, which we consider a public interface.
    • To declare queues we use the Celery.broker_connection() default channel, and this could be changed to use a separate AMQP connection (see the sketch after this list).
    • To bind event queues we extend Celery.conf.task_queues with kombu entities, which provide a public interface for queue arguments and additional bindings.
    • To perform retries, we change Task.retry arguments; this is also a public interface.
    • Other settings applied to make event handling more robust are described in the "Configuration and defaults" section of the Celery documentation.
    • Finally, we plan to run integration tests with real Celery workers, not only unit tests against mocked Celery internals.
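
A rough sketch of declaring an auxiliary queue over the app's default broker connection and registering it in task_queues (queue names are illustrative):

from kombu import Exchange, Queue

from demo.celery import app  # the EventsCelery instance from the examples above

retry_queue = Queue(
    "myservice.retry.0",
    exchange=Exchange("myservice.retry.0", type="fanout"),
    queue_arguments={
        "x-message-ttl": 1000,  # 2^0 seconds, in milliseconds
        "x-dead-letter-exchange": "myservice.recover",
    },
)

# declare the exchange, queue and binding over the default channel
with app.broker_connection() as connection:
    retry_queue(connection.default_channel).declare()

# kombu entities may also be appended to task_queues for consuming
app.conf.task_queues = list(app.conf.task_queues or []) + [retry_queue]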

Broker-side Pros

  • Observability. There is no way to distinguish a delayed task from a task stuck in a deadlock, except via the Celery events mechanics. The RabbitMQ API provides a clear way to monitor the number of retried and archived messages.
  • Lower memory usage. The default Celery implementation may keep thousands of delayed messages in worker memory. It also affects the AMQP channel qos parameter, which cannot exceed 1000, and this messes things up. Broker-side delayed messages are stored on disk in the RabbitMQ database.
  • Autoretry for failed tasks. By default, each rejected message is lost forever. With our setup it is retried, which is good for delivery guarantees but bad if message handling breaks constantly (i.e. in case of a persistent consumer failure). This should be handled carefully.
  • Robustness. Because of the asynchronous nature of the event handling pattern, we make every effort to ensure that each fired event is not lost somewhere between network failures and unhandled exceptions in business logic. It's worth mentioning that RabbitMQ is considered a durable system, and we don't touch upon broker failures.

Cons

  • Celery internals. Despite using the public Celery interface, we can't guarantee that changes in default Celery behavior won't affect the broker-side retry model, as the Celery authors may decide to move fast and break things in 5.x and later versions.
  • Complex logic. Debugging such complex algorithms requires deep knowledge of RabbitMQ architecture and of the celery-amqp-events code. It's not trivial to find someone who has it.
  • Broker memory usage. The broker-side retry system may require a lot of memory and disk space on the RabbitMQ side, and it's well known that RabbitMQ performs much better without any disk usage. Retries should not be part of the "normal" event handling flow.

Related projects

Celery-message-consumer

The robustness tactics are inspired by the celery-message-consumer project, which aims to handle events published to an AMQP broker from non-Celery projects (possibly written in other languages). The main difference is that Celery-AMQP-Events uses Celery tasks instead of running an additional consumer alongside the Celery worker.
