Celery and RabbitMQ producer and consumer made easy with celery-rmq

Project description

celery-rmq

celery-rmq is a Python library that simplifies using Celery with the RabbitMQ message broker. It is a standalone library that requires only celery and kombu (the Python messaging library used to talk to RabbitMQ).

Quick Start

First, install RabbitMQ and start the server.

Create a Python project with a virtual environment, then activate the virtual environment.

1:: Install it:

pip install celery-rmq

2:: Basic Usage:

** app.py **

from celery_rmq.app import CeleryAppProvider
from celery_rmq.exchange import register_exchange
from celery_rmq.queue import register_queue

from .consumers.testconsumer import BasicTestConsumer

apps = ['tests.testapp']

app_provider = CeleryAppProvider(app_name='test_celery_broker', installed_apps=apps)

app = app_provider.get_app()


# adding exchanges

def register_exchanges():
    register_exchange(app_provider, "test_exchange")


def register_queues():
    register_queue(app_provider, "test_queue", "test_routing", "test_exchange")


def add_consumers():
    app_provider.add_consumer(BasicTestConsumer)


# register_exchanges() must run before register_queues(),
# one after the other, because each queue depends on an
# exchange that has already been registered.

register_exchanges()
register_queues()

add_consumers()

app.start()

Register as many exchanges, queues, and consumers as you need.
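When there are many exchanges and queues, the ordering rule from app.py (exchanges first, then queues) can be driven from a single list. The following is a minimal sketch and not part of celery-rmq: register_topology and the bindings list are hypothetical names, and the two register functions are passed in as arguments, assuming they accept the same positional arguments used in app.py.

```python
def register_topology(provider, bindings, register_exchange, register_queue):
    """Register every exchange once, then every queue bound to it.

    `bindings` is a list of (queue_name, routing_key, exchange_name) tuples.
    Hypothetical helper: the register functions are injected, so the
    ordering logic does not depend on how celery-rmq implements them.
    """
    # Collect exchange names in first-seen order, without duplicates.
    exchanges = list(dict.fromkeys(exchange for _, _, exchange in bindings))

    # Exchanges go first, because queues depend on them.
    for exchange in exchanges:
        register_exchange(provider, exchange)

    for queue_name, routing_key, exchange in bindings:
        register_queue(provider, queue_name, routing_key, exchange)

    return exchanges
```

In app.py this could be called as register_topology(app_provider, bindings, register_exchange, register_queue), in place of the separate register_exchanges() and register_queues() functions.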

3:: consumer.py (saved as consumers/testconsumer.py to match the import in app.py)

import kombu
from celery import bootsteps

from celery_rmq.registry import get_queue


class BasicTestConsumer(bootsteps.ConsumerStep):

    def handle_message(self, body, message):
        print(body)
        message.ack()

    def get_consumers(self, channel):
        queue = get_queue("test_queue", "test_routing")
        return [kombu.Consumer(
            channel,
            queues=[queue],
            callbacks=[self.handle_message],
            accept=['json']
        )]
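The handler above acknowledges every message after printing it. If processing can fail, one option is to acknowledge only on success. The following is a minimal sketch under stated assumptions: safe_handle and process are hypothetical names, and the message object is assumed to expose ack() and reject(requeue=...) the way kombu messages do.

```python
import logging

logger = logging.getLogger(__name__)


def safe_handle(body, message, process):
    """Run `process(body)`; ack on success, reject without requeue on error.

    Returns True if the message was acknowledged, False if it was rejected.
    """
    try:
        process(body)
    except Exception:
        # Log the failure and drop the message rather than requeueing it,
        # so a permanently bad payload cannot loop forever.
        logger.exception("Failed to process message: %r", body)
        message.reject(requeue=False)
        return False
    message.ack()
    return True
```

Inside BasicTestConsumer, handle_message could then delegate with safe_handle(body, message, print). Whether a failed message should be dropped or requeued depends on the application, so treat the requeue=False choice as one possibility.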

4:: To use this with a framework, create an app (for example, a Django app) and add a tasks.py file to it:

from __future__ import absolute_import, unicode_literals

from celery import shared_task

from tests.app import app_provider


@shared_task
def simple_json_message(message, exchange_name, route_key):
    producer = app_provider.get_producer()
    producer.publish(message, content_type='application/json', exchange=exchange_name, routing_key=route_key)

Then call the simple_json_message function from a view as needed.
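The call from a view can be sketched as follows. This is an illustration only: send_event is a hypothetical helper, the default exchange and routing key are the names registered in app.py, and the task is assumed to offer Celery's usual delay() method. The payload is serialized before it is handed to the task, on the assumption that kombu sends the body as given when content_type is set explicitly.

```python
import json


def send_event(task, payload, exchange_name="test_exchange", route_key="test_routing"):
    """Serialize `payload` and enqueue it through a Celery task.

    `task` is expected to be something like `simple_json_message`;
    only its `.delay()` method is used.
    """
    # Serialize up front: the task publishes with an explicit content_type,
    # so this sketch assumes the body must already be a JSON string.
    body = json.dumps(payload)
    return task.delay(body, exchange_name, route_key)
```

In a Django view this might be called as send_event(simple_json_message, {"id": 7}).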

5:: Run the celery worker:

celery worker -l info -A app

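Here, app refers to the app.py file. On Celery 5 and later, the -A option must come before the subcommand: celery -A app worker -l info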

Done.

For testing purposes, you can send messages directly from the RabbitMQ web panel. Go to the queue section and publish a message.

Download files

Source Distribution

celery-rmq-0.0.2.tar.gz (6.7 kB)

Uploaded Source

Built Distribution

celery_rmq-0.0.2-py3-none-any.whl (9.0 kB)

Uploaded Python 3

File details

Details for the file celery-rmq-0.0.2.tar.gz.

File metadata

  • Download URL: celery-rmq-0.0.2.tar.gz
  • Upload date:
  • Size: 6.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.23.0 setuptools/46.1.3 requests-toolbelt/0.9.1 tqdm/4.45.0 CPython/3.8.0

File hashes

Hashes for celery-rmq-0.0.2.tar.gz
Algorithm Hash digest
SHA256 21e5cdf760268caec8f0cb1424172cf205f6d04e202dfd9533b44d922b4676fa
MD5 996496a55497d8a827ca2f2b35aba282
BLAKE2b-256 15e245a6bd568f74dfc9b47b6481416f0b96469c73e8044ee472d83a47020a27

File details

Details for the file celery_rmq-0.0.2-py3-none-any.whl.

File metadata

  • Download URL: celery_rmq-0.0.2-py3-none-any.whl
  • Upload date:
  • Size: 9.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.23.0 setuptools/46.1.3 requests-toolbelt/0.9.1 tqdm/4.45.0 CPython/3.8.0

File hashes

Hashes for celery_rmq-0.0.2-py3-none-any.whl
Algorithm Hash digest
SHA256 4b39379dca6ae446a472ecb03dfdbc2679ed54c4aff28c728d753b02b8eccb9c
MD5 3476d4d295e491befb57b3e70f4af6bc
BLAKE2b-256 8c58e92d29b880c12aa2e735589ff458ad71b88547470e88b1cbc570f4792bb8
