Skip to main content

AMQP resource for Tamarco microservice framework.

Project description

tamarco-amqp

Build Status Coverage Quality Gate Status

AMQP resource for Tamarco microservice framework.

Settings

This resource depends on the following configuration schema:

    system:
        resources:
            amqp:
                host: 127.0.0.1
                port: 5672
                vhost: /
                user: microservice
                password: 1234
                connection_timeout: 10
                queues_prefix: "prefix"

Inputs and outputs

The inputs and outputs need to be declared in the resource.

Three different communication patterns can be used.

A specific input is required for each message pattern, but the same output is valid for all message patterns. Each message pattern has a different method in the output.

With the JsonCodec the input and the output are directly Python dictionaries.

Publish-subscribe pattern

In the pub-sub pattern a message reaches all the services subscribed to the queue. It can be used to notify events to anyone who needs it.

Input

The AMQPSubscriptionInput can be used as a decorator.

class AmqpMicroservice():
    amqp = AMQPResource()

    @AMQPSubscriptionInput(resource=amqp, queue='cows', codec=JsonCodec)
    async def consume_messages(self, message):
        self.logger.info(f'Consumed message from cows subscription queue: {message}')

Or as a async iterator:

class AmqpMicroservice():
    cows_input = AMQPSubscriptionInput(queue='cows', codec=JsonCodec)
    amqp = AMQPResource(inputs=[cows_input])

    @task
    async def consume_cows(self):
        async for message in self.cows_input:
            self.logger.info(f'Consumed message from cows subscription queue: {message}')

Output

Use the publish method of the output.

class AmqpMicroservice():
    cows_output = AMQPOutput(queue='cows', codec=JsonCodec)
    amqp = AMQPResource(outputs=[cows_output])

    @task_timer(interval=1000, autostart=True)
    async def metric_producer(self):
        await cows_output.publish({'my_json_message': 'to_cow_queue'})

Push-pull pattern

The push-pull pattern is a work queue where each message is only pulled by one of the services subscribed to the queue. Commonly used to distribute the work in a pull of instances.

Input

AMQPPullInput is used to declare a input pull queue, and example used as decorator:

class AmqpMicroservice():
    amqp = AMQPResource()

    @AMQPPullInput(resource=amqp, queue='cows', codec=JsonCodec)
    async def consume_messages(self, message):
        self.logger.info(f'Consumed message from cows pull queue: {message}')

It can be used as an async iterator in the same way that the pub/sub pattern.

Output

Use the push method of the AMQPOutput. In the following

class AmqpMicroservice():
    cows_output = AMQPOutput(queue='cows', codec=JsonCodec)
    amqp = AMQPResource(outputs=[cows_output])

    @task_timer(interval=1000, autostart=True)
    async def metric_producer(self):
        await cows_output.push({'my_json_message': 'to_cow_queue'})

Request-response pattern

In the request-response pattern each request is handled by one of the instances subscribed to the queue and unlike the other patterns, each request has an answer.

Input

AMQPRequestInput is used to declare a input request response queue. An example used as decorator:

class AmqpMicroservice():
    amqp = AMQPResource()

    @AMQPRequestInput(resource=amqp, queue='cows', codec=JsonCodec)
    async def consume_messages(self, message, response_handler):
        self.logger.info(f'Consumed message from cows queue: {message}')
        await response_handler.send_response({'cows': 'response'})

In this case the handler takes two input objects, and one of them send the response.

Output

The output is like the rest of them but it returns a response.

class AmqpMicroservice():
    cows_output = AMQPOutput(queue='cows', codec=JsonCodec)
    amqp = AMQPResource()

    @task_timer(interval=1000, autostart=True)
    async def request_response(self):
        message = {'cow': 'MOOOO'}
        response = await cows_output.request(message)
        self.logger.info(f'Response from cows queue: {response}')

How to run the examples

To run them you just need to launch the docker-compose, install the requirements and run it.

pip install -r examples/requirements.txt
docker-compose up -d
python examples/microservice_stream_input.py

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

tamarco-amqp-0.1.0.tar.gz (14.6 kB view details)

Uploaded Source

File details

Details for the file tamarco-amqp-0.1.0.tar.gz.

File metadata

  • Download URL: tamarco-amqp-0.1.0.tar.gz
  • Upload date:
  • Size: 14.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/2.0.0 pkginfo/1.5.0.1 requests/2.22.0 setuptools/41.2.0 requests-toolbelt/0.9.1 tqdm/4.36.1 CPython/3.7.1

File hashes

Hashes for tamarco-amqp-0.1.0.tar.gz
Algorithm Hash digest
SHA256 c81ae3ec0f453bd3394a1c28f79b224ea26795abe1cbf92d79370ea10448c49e
MD5 1c06847c2f4f7b4f036fd57d79fc452e
BLAKE2b-256 10c4488edb812d1934bfdae44761cb71b9635ad245c42a9f7778392c86320eb7

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page