Skip to main content

AMQP resource for Tamarco microservice framework.

Project description

tamarco-amqp

Build Status Coverage Quality Gate Status

AMQP resource for Tamarco microservice framework.

Settings

This resource depends on the following configuration schema:

    system:
        resources:
            amqp:
                host: 127.0.0.1
                port: 5672
                vhost: /
                user: microservice
                password: 1234
                connection_timeout: 10
                queues_prefix: "prefix"

Inputs and outputs

The inputs and outputs need to be declared in the resource.

Three different communication patterns can be used.

A specific input is required for each message pattern, but the same output is valid for all message patterns. Each message pattern has a different method in the output.

With the JsonCodec the input and the output are directly Python dictionaries.

Publish-subscribe pattern

In the pub-sub pattern a message reaches all the services subscribed to the queue. It can be used to notify events to anyone who needs it.

Input

The AMQPSubscriptionInput can be used as a decorator.

class AmqpMicroservice():
    amqp = AMQPResource()

    @AMQPSubscriptionInput(resource=amqp, queue='cows', codec=JsonCodec)
    async def consume_messages(self, message):
        self.logger.info(f'Consumed message from cows subscription queue: {message}')

Or as a async iterator:

class AmqpMicroservice():
    cows_input = AMQPSubscriptionInput(queue='cows', codec=JsonCodec)
    amqp = AMQPResource(inputs=[cows_input])

    @task
    async def consume_cows(self):
        async for message in self.cows_input:
            self.logger.info(f'Consumed message from cows subscription queue: {message}')

Output

Use the publish method of the output.

class AmqpMicroservice():
    cows_output = AMQPOutput(queue='cows', codec=JsonCodec)
    amqp = AMQPResource(outputs=[cows_output])

    @task_timer(interval=1000, autostart=True)
    async def metric_producer(self):
        await cows_output.publish({'my_json_message': 'to_cow_queue'})

Push-pull pattern

The push-pull pattern is a work queue where each message is only pulled by one of the services subscribed to the queue. Commonly used to distribute the work in a pull of instances.

Input

AMQPPullInput is used to declare a input pull queue, and example used as decorator:

class AmqpMicroservice():
    amqp = AMQPResource()

    @AMQPPullInput(resource=amqp, queue='cows', codec=JsonCodec)
    async def consume_messages(self, message):
        self.logger.info(f'Consumed message from cows pull queue: {message}')

It can be used as an async iterator in the same way that the pub/sub pattern.

Output

Use the push method of the AMQPOutput. In the following

class AmqpMicroservice():
    cows_output = AMQPOutput(queue='cows', codec=JsonCodec)
    amqp = AMQPResource(outputs=[cows_output])

    @task_timer(interval=1000, autostart=True)
    async def metric_producer(self):
        await cows_output.push({'my_json_message': 'to_cow_queue'})

Request-response pattern

In the request-response pattern each request is handled by one of the instances subscribed to the queue and unlike the other patterns, each request has an answer.

Input

AMQPRequestInput is used to declare a input request response queue. An example used as decorator:

class AmqpMicroservice():
    amqp = AMQPResource()

    @AMQPRequestInput(resource=amqp, queue='cows', codec=JsonCodec)
    async def consume_messages(self, message, response_handler):
        self.logger.info(f'Consumed message from cows queue: {message}')
        await response_handler.send_response({'cows': 'response'})

In this case the handler takes two input objects, and one of them send the response.

Output

The output is like the rest of them but it returns a response.

class AmqpMicroservice():
    cows_output = AMQPOutput(queue='cows', codec=JsonCodec)
    amqp = AMQPResource()

    @task_timer(interval=1000, autostart=True)
    async def request_response(self):
        message = {'cow': 'MOOOO'}
        response = await cows_output.request(message)
        self.logger.info(f'Response from cows queue: {response}')

How to run the examples

To run them you just need to launch the docker-compose, install the requirements and run it.

pip install -r examples/requirements.txt
docker-compose up -d
python examples/microservice_stream_input.py

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

tamarco-amqp-0.1.0.tar.gz (14.6 kB view hashes)

Uploaded Source

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page