Skip to main content

Baguette messaging framework

Project description

https://travis-ci.org/baguette-io/baguette-messaging.svg?branch=master

Tiny Framework to build micro services. Currently only support amqp, rpc over amqp and http stream based micro services.

How it works

AMQP

We declare a publisher:
the method decorated takes one parameter publish (farine.amqp.publisher.Publisher)
import farine.amqp

class Publish(object):

    @farine.amqp.publish(exchange='consume', routing_key='routing_key')
    def publish_dummy(self, publish):
        """
        :param publish: Send the data through AMQP.
        :type publish: farine.amqp.publisher.Publisher
        """
        publish({'result':0})
And then a consumer :
the method decorated takes two parameters body (dict) and message (kombu.message.Message).
import farine.amqp

class Consume(object):

    @farine.amqp.consume(exchange='publish', routing_key='routing_key')
    def consume_dummy(self, body, message):
        """
        :param body: The message's data.
        :type body: dict
        :param message: Message class.
        :type message: kombu.message.Message
        """
        message.ack()

RPC over AMQP

We need declare two services :
the server : Wait for a call(consumer), and answer(publisher).
The method decorated just takes args and kwargs
import farine.rpc

class Server(object):

    @farine.rpc.method()
    def dummy(self, *args, **kwargs):
        return True
And the client : Send a call(publisher), and wait for an answer(consumer).
the method decorated takes one argument rpc (farine.rpc.client.Client).
The result will be a dictionnary.
import farine.rpc

class Client(object):

    @farine.rpc.client('myotherservice')
    def dummy(self, rpc):
        """
        :param rpc: The RPC client.
        :type rpc: farine.rpc.client.Client
        """
        result = rpc.dummy()

RPC Stream

We can also do streaming RPC call.
All you need to do is to add __stream__ = True* to your RPC call.
Also, a generator is returned.

Example:

import farine.rpc

class Server(object):

    @farine.rpc.method()
    def dummy(self, *args, **kwargs):
        yield 'a'
        yield 'b'
import farine.rpc

class Client(object):

    @farine.rpc.client('myotherservice')
        def dummy(self, rpc):
        """
        :param rpc: The RPC client.
        :type rpc: farine.rpc.client.Client
        """
        for result in rpc.dummy(__stream__=True):
            print result

HTTP Stream

We can declare a service that will listen to an HTTP SSE event :
the method decorated takes one argument data (dict).
In the configuration file, we just need to setup the endpoint to listen.
import farine.stream

class Client(object):

    @farine.stream.http()
    def listen_event(self, data):
        """
        :param data: The event sent.
        :type data: dict
        """
        return True

Execute method

If all we need is to execute a method, we can use farine.execute.method.
It will restart it, if it ends (customizable).
import farine.execute

class Client(object):

    @farine.execute.method()
    def my_own_stuff(self):
    """
    Your own code.
    """

Database

baguette-messaging is using peewee to manage databases connections (only postgresql is supported for the moment)
In order for it to detect that you are using a database, you need to create a models.py module.
Then, each time we enter into a method (amqp,rpc,stream) a database connection will be created and closed.
You can use the database connection using self.db, to manage transactions for example.

Example

models.py:

from farine.connectors.sql import *

class User(Model):
    name = CharField()

service.py:

import farine.amqp
from models import User

class Client(object):

    @farine.amqp.consume(exchange='exchange', routing_key='routing_key')
    def select(self, body, message):
        return User.select().where(User.id==1)

Overview

You can mix in a service, everything:
it can be a consumer of an HTTP stream, and send back the result in RPC, etc.

Example

import farine.rpc
import farine.stream

class Client(object):

    @farine.stream.http()
    def get(self, data):
        return self.send(data)

    @farine.rpc.client('myotherservice')
    def send(self, rpc, data):
        return rpc.process(data)

Configuration

By default the configuration file is located in /etc/farine.ini. You can override this path using the environment variable FARINE_INI.

It must contains one section by service (using the lowercase class name).
a DEFAULT section can also be present.

Example

[DEFAULT]
amqp_uri = amqp://baguette:baguette@127.0.0.1:5672/baguette

[consume]
enabled = true

Database

If you use a database connection, you have to add in the [DEFAULT] section the db parameters:

[DEFAULT]
db_connector = postgres (required)
db_name = name (required)
db_user = user (required)
db_host = host (required)
db_password = password (required)
db_port = port (optional)
db_max_conn = max_connections (optional)
db_stale_timeout = stale timeout (optional)
db_timeout = timeout (optional)

Launch

To launch a service, just run:

farine --start=mymodule

It will try to import mymodule.service and launch it.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

baguette-messaging-0.18.tar.gz (18.9 kB view hashes)

Uploaded Source

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page