Skip to main content

AMQP Fabric is an AMQP based microservice orchestration and communication framework

Project description

PyPI-Server

amqp-fabric

AMQP Fabric is an AMQP based microservice orchestration and communication framework.

Description

AMQP Fabric is a very simple microservice communication and orchestration mechanism based on AMQ protocol. Instead of relying on multiple technologies and orchestration frameworks - it's a "one-stop shop" library for implementing a light-weight microservices topology.

Each service in the ecosystem can publish it's own API and use API of another service. A service can send an asynchronous stream of data that other services can subscribe to. Services can optionally send periodic "keep-alives" to allow tracking its uptime.

Features

  • Microservice communication via synchronous API (RPC)
  • Asynchronous data transmission
  • Decentralized registry
  • Remote logging based on standard Python logging mechanism
  • High-availability
  • Secure and firewall friendly access from remote locations

Installation

pip install amqp-fabric

Getting Started

Each service participating in the ecosystem is assigned with:

  • "Domain" - (i.e. project1) any string identifying a group services communicating with each other. Different domains can co-exist under the same AMQP broker.
  • "Service Type" - (i.e. media_encoder) - services holding the same service type, should have the same API.
  • "Service Id" - (i.e. encoder1) Multiple services of the same type can be distinguished by a different Id.
  • "Service Version" - evolution of the services and their API should be tracked by a version

High Availability

Multiple services with the same Domain, Type and Id - will create a high-availability "clique" - API calls will be redirected to the next available service.

Server Side Example

import asyncio
from amqp_fabric.amq_broker_connector import AmqBrokerConnector
from amqp_fabric.abstract_service_api import AbstractServiceApi

# API Definition
class MyServiceApi(AbstractServiceApi):

    def multiply(self, x, y):
        return x * y


class MyService:

    amq = None

    async def init(self):

        self.amq = AmqBrokerConnector(
            amqp_uri="amqp://guest:guest@127.0.0.1/",
            service_domain="my_project",
            service_id="my_app",
            service_type="server_app",
            keep_alive_seconds=5)
        await self.amq.open(timeout=10)

        api = MyServiceApi()
        await self.amq.rpc_register(api)

    async def close(self):
        await self.amq.close()


def run_event_loop():
    agent = MyService()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(agent.init())

    try:
        loop.run_forever()
    except (KeyboardInterrupt, SystemExit):
        pass
    finally:
        loop.run_until_complete(agent.close())
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()

if __name__ == "__main__":

    run_event_loop()

Client Side Example

import asyncio
from amqp_fabric.amq_broker_connector import AmqBrokerConnector


async def exec_multiply():

    amq = AmqBrokerConnector(
        amqp_uri="amqp://guest:guest@127.0.0.1/",
        service_domain="my_project",
        service_id="my_client",
        service_type="client_app",
        keep_alive_seconds=5)
    await amq.open(timeout=10)


    srv_proxy = await amq.rpc_proxy("my_project", "my_app", "server_app")

    result = await srv_proxy.multiply(x=5, y=7)
    print(f'result = {result}')

    await amq.close()


if __name__ == '__main__':

    task = exec_multiply()

    loop = asyncio.get_event_loop()
    loop.run_until_complete(task)

Uptime Tracking

amq.list_services will return a list of currently available services. The list can optionally be filtered, by service domain and/or service type.

Publishing and Subscribing to asynchronous data streams

Data can be published along with one or more headers which can later be used to filter the received messages:

amq.publish_data(items={'msg': 'Checkout committed', 'order_total': 55, 'products':['book']},
                 headers={'msg_type': 'CUSTOMER_TRANSACTIONS', 'currency': 'USD'})
amq.subscribe_data(subscriber_name='accountant',
                   headers={'msg_type': 'CUSTOMER_TRANSACTIONS'},
                   callback=process_data)

Using logs

TBD

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

amqp-fabric-0.1.2.tar.gz (32.0 kB view details)

Uploaded Source

Built Distribution

amqp_fabric-0.1.2-py2.py3-none-any.whl (11.1 kB view details)

Uploaded Python 2 Python 3

File details

Details for the file amqp-fabric-0.1.2.tar.gz.

File metadata

  • Download URL: amqp-fabric-0.1.2.tar.gz
  • Upload date:
  • Size: 32.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.10.12

File hashes

Hashes for amqp-fabric-0.1.2.tar.gz
Algorithm Hash digest
SHA256 b57e595e27a06cdcf680f7302052fec7893b4aa4199319d037c0f9c5bd9a4e4d
MD5 7650d40cb141b197cf11ab66e6644cb7
BLAKE2b-256 f7b356feeef68f222284ff53171abc43db9a483ef14c10e730ac4161a7cff949

See more details on using hashes here.

File details

Details for the file amqp_fabric-0.1.2-py2.py3-none-any.whl.

File metadata

File hashes

Hashes for amqp_fabric-0.1.2-py2.py3-none-any.whl
Algorithm Hash digest
SHA256 6f9339e029776df3caa3184dc8ecafec31b18030e46e1fe7614793311c8a2e2d
MD5 844731193aa2d55d2a4a4ed9cfccbc71
BLAKE2b-256 15d895e25feec096222cd4b21b813df06ddc077a42460ab4a038298a86123107

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page