Skip to main content

Celery and RabbitMQ producer and consumer made easy with celery-rmq

Project description

celery-rmq

celery-rmq is a python library to implement Celery and RabbitMQ message broker. it's a standalone library required celery and kombu (python library for rabbitmq). Celery and RabbitMQ producer and consumer made easy with celery-rmq

Quick Start

First of all, install rabbitmq and start.

Create a python app and create an virtual environment too. Activate the virtual environment.

1:: Install it:

pip install celery-rmq

2:: Basic Usage:

** app.py **

from celery_rmq.app import CeleryAppProvider
from celery_rmq.exchange import register_exchange
from celery_rmq.queue import register_queue

from .consumers.testconsumer import BasicTestConsumer

apps = ['tests.testapp']

app_provider = CeleryAppProvider(app_name='test_celery_broker', installed_apps=apps)

app = app_provider.get_app()


# adding exchanges

def register_exchanges():
    register_exchange(app_provider, "test_exchange")


def register_queues():
    register_queue(app_provider, "test_queue", "test_routing", "test_exchange")


def add_consumers():
    app_provider.add_consumer(BasicTestConsumer)


# execution of following two functions
# "register_exchanges()" and "register_queues()"
# needs to be synced
# i.e one after another.
# because queues are depended on exchanges

register_exchanges()
register_queues()

add_consumers()

app.start()

register as many as exchanges, queues and consumers

3:: consumer.py

import kombu
from celery import bootsteps

from celery_rmq.registry import get_queue


class BasicTestConsumer(bootsteps.ConsumerStep):

    def handle_message(self, body, message):
        print(body)
        message.ack()

    def get_consumers(self, channel):
        queue = get_queue("test_queue", "test_routing")
        return [kombu.Consumer(
            channel,
            queues=[queue],
            callbacks=[self.handle_message],
            accept=['json']
        )]

4:: If you want to add this with any framework, create an app (example: Django apps) and create a tasks.py file under the app:

from __future__ import absolute_import, unicode_literals

from celery import shared_task

from tests.app import app_provider


@shared_task
def simple_json_message(message, exchange_name, route_key):
    producer = app_provider.get_producer()
    producer.publish(message, content_type='application/json', exchange=exchange_name, routing_key=route_key)

   and call the simple_json_message function from the view as per your need.

5:: Run the celery worker:

celery worker -l info -A app

here app is app.py file

Done.

for testing purpose, we would like to send messages directly from RabbitMQ web panel. Go the the queue section and publish message.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

celery-rmq-0.0.2.tar.gz (6.7 kB view hashes)

Uploaded Source

Built Distribution

celery_rmq-0.0.2-py3-none-any.whl (9.0 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page