Skip to main content

AMQP Fabric is an AMQP based microservice orchestration and communication framework

Project description

PyPI-Server Coveralls Wheel PyVersions

amqp-fabric

AMQP Fabric is an AMQP based "one-stop-shop" microservice orchestration and communication framework.

Description

AMQP Fabric is a very simple microservice communication and orchestration mechanism based on AMQ protocol. Instead of relying on multiple technologies and orchestration frameworks - it's a "one-stop shop" library for implementing a light-weight microservices topology.

Each service in the ecosystem can publish its own API and use API of another service. A service can send an asynchronous stream of data that other services can subscribe to. Services can optionally send periodic "keep-alives" to allow tracking its uptime.

Features

  • Microservice communication via synchronous API (RPC)
  • Asynchronous data transmission
  • Decentralized registry
  • Remote logging based on standard Python logging mechanism
  • High-availability
  • Secure and firewall friendly access from remote locations

Installation

pip install amqp-fabric

Getting Started

Each service participating in the ecosystem is assigned with:

  • "Domain" - (i.e. project1) any string identifying a group services communicating with each other. Different domains can co-exist under the same AMQP broker.
  • "Service Type" - (i.e. media_encoder) - services holding the same service type, should have the same API.
  • "Service Id" - (i.e. encoder1) Multiple services of the same type can be distinguished by a different Id.
  • "Service Version" - evolution of the services and their API should be tracked by a version

High Availability

Multiple services with the same Domain, Type and Id - will create a high-availability "clique" - API calls will be redirected to the next available service.

Server Side Example

import asyncio
from amqp_fabric.amq_broker_connector import AmqBrokerConnector
from amqp_fabric.abstract_service_api import AbstractServiceApi

# API Definition
class MyServiceApi(AbstractServiceApi):

    def multiply(self, x, y):
        return x * y


class MyService:

    amq = None

    async def init(self):

        self.amq = AmqBrokerConnector(
            amqp_uri="amqp://guest:guest@127.0.0.1/",
            service_domain="my_project",
            service_id="my_app",
            service_type="server_app",
            keep_alive_seconds=5)
        await self.amq.open(timeout=10)

        api = MyServiceApi()
        await self.amq.rpc_register(api)

    async def close(self):
        await self.amq.close()


def run_event_loop():
    agent = MyService()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(agent.init())

    try:
        loop.run_forever()
    except (KeyboardInterrupt, SystemExit):
        pass
    finally:
        loop.run_until_complete(agent.close())
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()

if __name__ == "__main__":

    run_event_loop()

Client Side Example

import asyncio
from amqp_fabric.amq_broker_connector import AmqBrokerConnector


async def exec_multiply():

    amq = AmqBrokerConnector(
        amqp_uri="amqp://guest:guest@127.0.0.1/",
        service_domain="my_project",
        service_id="my_client",
        service_type="client_app",
        keep_alive_seconds=5)
    await amq.open(timeout=10)


    srv_proxy = await amq.rpc_proxy("my_project", "my_app", "server_app")

    result = await srv_proxy.multiply(x=5, y=7)
    print(f'result = {result}')

    await amq.close()


if __name__ == '__main__':

    task = exec_multiply()

    loop = asyncio.get_event_loop()
    loop.run_until_complete(task)

Uptime Tracking

amq.list_services will return a list of currently available services. The list can optionally be filtered, by service domain and/or service type.

Publishing and Subscribing to asynchronous data streams

Data can be published along with one or more headers which can later be used to filter the received messages:

items={'msg': 'Checkout committed', 'order_total': 55, 'products':['book']}

encoded_msg = json.dumps(items, sort_keys=True, default=str).encode()

amq.publish_data(encoded_msg,
                 headers={'msg_type': 'CUSTOMER_TRANSACTIONS', 'currency': 'USD'})
amq.subscribe_data(subscriber_name='accountant',
                   headers={'msg_type': 'CUSTOMER_TRANSACTIONS'},
                   callback=process_data)

Using logs

TBD

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

amqp_fabric-0.2.3.tar.gz (32.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

amqp_fabric-0.2.3-py2.py3-none-any.whl (11.3 kB view details)

Uploaded Python 2Python 3

File details

Details for the file amqp_fabric-0.2.3.tar.gz.

File metadata

  • Download URL: amqp_fabric-0.2.3.tar.gz
  • Upload date:
  • Size: 32.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.12.3

File hashes

Hashes for amqp_fabric-0.2.3.tar.gz
Algorithm Hash digest
SHA256 b2ba2d1190ff34b6b557a6aa9098802a090aa5880afbcc9b05d5fb473eed196b
MD5 222ac950623a7bedcf13c4d7e04dcd69
BLAKE2b-256 7efc4960d7b0caa414f786c3b376ae18e6b8388e7328c9cb28b32061a58a2e2b

See more details on using hashes here.

File details

Details for the file amqp_fabric-0.2.3-py2.py3-none-any.whl.

File metadata

  • Download URL: amqp_fabric-0.2.3-py2.py3-none-any.whl
  • Upload date:
  • Size: 11.3 kB
  • Tags: Python 2, Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.12.3

File hashes

Hashes for amqp_fabric-0.2.3-py2.py3-none-any.whl
Algorithm Hash digest
SHA256 7f556067c45bc58d7ec7b5f4acacbf881031ac47d03595b2e3401ede830b89a4
MD5 e62d09c11b7e189659f620731aee5fa4
BLAKE2b-256 fbd179ae857ed9ac095aeb70bc3977676fdcadac2b9ebdb6f79b259b6fb68d00

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page