AMQP Fabric is an AMQP-based microservice orchestration and communication framework.

amqp-fabric

Description

AMQP Fabric is a very simple microservice communication and orchestration mechanism based on the AMQP protocol. Instead of relying on multiple technologies and orchestration frameworks, it is a "one-stop shop" library for implementing a lightweight microservice topology.

Each service in the ecosystem can publish its own API and use the API of another service. A service can send an asynchronous stream of data that other services can subscribe to. Services can optionally send periodic "keep-alives" so that their uptime can be tracked.

Features

  • Microservice communication via synchronous API (RPC)
  • Asynchronous data transmission
  • Decentralized registry
  • Remote logging based on standard Python logging mechanism
  • High-availability
  • Secure and firewall friendly access from remote locations

Installation

pip install amqp-fabric

Getting Started

Each service participating in the ecosystem is assigned:

  • "Domain" (e.g. project1) - any string identifying a group of services that communicate with each other. Different domains can coexist on the same AMQP broker.
  • "Service Type" (e.g. media_encoder) - services of the same type should expose the same API.
  • "Service Id" (e.g. encoder1) - distinguishes multiple services of the same type from one another.
  • "Service Version" - tracks the evolution of a service and its API.
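
As a rough illustration of how these identifiers relate, the sketch below models them as a plain dataclass. The `ServiceIdentity` class and its two helper methods are hypothetical and not part of amqp-fabric; they only restate the rules from the list above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ServiceIdentity:
    """Hypothetical model of a service's identity; not an amqp-fabric class."""
    domain: str        # e.g. "project1"
    service_type: str  # e.g. "media_encoder"
    service_id: str    # e.g. "encoder1"
    version: str       # e.g. "1.2.0"

    def same_domain(self, other: "ServiceIdentity") -> bool:
        # Only services in the same domain communicate with each other.
        return self.domain == other.domain

    def shares_api_with(self, other: "ServiceIdentity") -> bool:
        # Services of the same type are expected to expose the same API.
        return self.service_type == other.service_type


encoder1 = ServiceIdentity("project1", "media_encoder", "encoder1", "1.2.0")
encoder2 = ServiceIdentity("project1", "media_encoder", "encoder2", "1.2.0")
other = ServiceIdentity("project2", "media_encoder", "encoder1", "1.2.0")

print(encoder1.shares_api_with(encoder2))  # same type, different Id
print(encoder1.same_domain(other))         # different domains
```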

High Availability

Multiple services with the same Domain, Type and Id form a high-availability "clique": API calls are redirected to the next available service.
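
From a caller's point of view, the behavior resembles the failover loop sketched below. This is a standalone illustration, not amqp-fabric code: the README does not describe how the redirection is implemented, and the `Unavailable` exception, `call_with_failover` helper and the two replica functions are hypothetical names used only for this example.

```python
class Unavailable(Exception):
    """Raised by a replica that cannot serve the call (hypothetical)."""


def call_with_failover(replicas, *args):
    """Try each replica in order and return the first successful result."""
    last_error = None
    for replica in replicas:
        try:
            return replica(*args)
        except Unavailable as exc:
            last_error = exc  # this replica is down; try the next one
    raise Unavailable("no replica available") from last_error


def broken_multiply(x, y):
    # Stands in for a clique member that is currently offline.
    raise Unavailable("replica offline")


def healthy_multiply(x, y):
    # Stands in for a clique member that is up and serving the same API.
    return x * y


result = call_with_failover([broken_multiply, healthy_multiply], 5, 7)
print(f"result = {result}")
```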

Server Side Example

import asyncio
from amqp_fabric.amq_broker_connector import AmqBrokerConnector
from amqp_fabric.abstract_service_api import AbstractServiceApi

# API Definition
class MyServiceApi(AbstractServiceApi):

    def multiply(self, x, y):
        return x * y


class MyService:

    amq = None

    async def init(self):

        self.amq = AmqBrokerConnector(
            amqp_uri="amqp://guest:guest@127.0.0.1/",
            service_domain="my_project",
            service_id="my_app",
            service_type="server_app",
            keep_alive_seconds=5)
        await self.amq.open(timeout=10)

        api = MyServiceApi()
        await self.amq.rpc_register(api)

    async def close(self):
        await self.amq.close()


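def run_event_loop():
    agent = MyService()
    # Create the loop explicitly: calling asyncio.get_event_loop() without a
    # running loop is deprecated in recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(agent.init())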

    try:
        loop.run_forever()
    except (KeyboardInterrupt, SystemExit):
        pass
    finally:
        loop.run_until_complete(agent.close())
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()

if __name__ == "__main__":

    run_event_loop()

Client Side Example

import asyncio
from amqp_fabric.amq_broker_connector import AmqBrokerConnector


async def exec_multiply():

    amq = AmqBrokerConnector(
        amqp_uri="amqp://guest:guest@127.0.0.1/",
        service_domain="my_project",
        service_id="my_client",
        service_type="client_app",
        keep_alive_seconds=5)
    await amq.open(timeout=10)


    srv_proxy = await amq.rpc_proxy("my_project", "my_app", "server_app")

    result = await srv_proxy.multiply(x=5, y=7)
    print(f'result = {result}')

    await amq.close()


if __name__ == '__main__':

    # asyncio.run() creates the event loop, runs the coroutine and closes the loop.
    asyncio.run(exec_multiply())

Uptime Tracking

amq.list_services returns a list of the currently available services. The list can optionally be filtered by service domain and/or service type.
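
The optional filtering can be pictured with the standalone sketch below. It does not call amqp-fabric: the `filter_services` helper and the shape of the service records (dicts with `domain`, `service_type` and `service_id` keys) are assumptions made for illustration, since the return format of `list_services` is not documented here.

```python
def filter_services(services, domain=None, service_type=None):
    """Keep services matching the given domain and/or type; None means 'any'."""
    return [
        s for s in services
        if (domain is None or s["domain"] == domain)
        and (service_type is None or s["service_type"] == service_type)
    ]


# Hypothetical records describing the services that are currently alive.
services = [
    {"domain": "my_project", "service_type": "server_app", "service_id": "my_app"},
    {"domain": "my_project", "service_type": "client_app", "service_id": "my_client"},
    {"domain": "other_project", "service_type": "server_app", "service_id": "app2"},
]

servers_in_project = filter_services(
    services, domain="my_project", service_type="server_app")
print([s["service_id"] for s in servers_in_project])
```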

Publishing and Subscribing to asynchronous data streams

Data can be published with one or more headers, which subscribers can later use to filter the messages they receive (process_data below is a callback supplied by the subscriber):

amq.publish_data(items={'msg': 'Checkout committed', 'order_total': 55, 'products':['book']},
                 headers={'msg_type': 'CUSTOMER_TRANSACTIONS', 'currency': 'USD'})
amq.subscribe_data(subscriber_name='accountant',
                   headers={'msg_type': 'CUSTOMER_TRANSACTIONS'},
                   callback=process_data)
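
The header-based filtering can be pictured with the standalone sketch below. It assumes "match all" semantics, meaning a message is delivered when every header named in the subscription is present on the message with the same value; the README does not state the exact matching rule, and `headers_match` is a hypothetical helper, not an amqp-fabric function.

```python
def headers_match(subscription_headers, message_headers):
    """Return True if every subscribed header is present with the same value."""
    return all(
        key in message_headers and message_headers[key] == value
        for key, value in subscription_headers.items()
    )


# Headers of the published message from the example above.
message_headers = {'msg_type': 'CUSTOMER_TRANSACTIONS', 'currency': 'USD'}

# The 'accountant' subscription from the example above.
accountant_sees_it = headers_match(
    {'msg_type': 'CUSTOMER_TRANSACTIONS'}, message_headers)

# A hypothetical subscription to a different message type.
auditor_sees_it = headers_match({'msg_type': 'AUDIT_EVENTS'}, message_headers)

print(accountant_sees_it, auditor_sees_it)
```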

Using logs

TBD
