AMQP Fabric is an AMQP-based microservice orchestration and communication framework.
amqp-fabric
Description
AMQP Fabric is a very simple microservice communication and orchestration mechanism based on the AMQP protocol. Instead of relying on multiple technologies and orchestration frameworks, it is a "one-stop shop" library for implementing a lightweight microservice topology.
Each service in the ecosystem can publish its own API and use the API of another service. A service can send an asynchronous stream of data that other services can subscribe to. Services can optionally send periodic "keep-alives" so that their uptime can be tracked.
Features
- Microservice communication via synchronous API (RPC)
- Asynchronous data transmission
- Decentralized registry
- Remote logging based on standard Python logging mechanism
- High-availability
- Secure and firewall friendly access from remote locations
Installation
pip install amqp-fabric
Getting Started
Each service participating in the ecosystem is assigned:
- "Domain" (e.g. project1): any string identifying a group of services that communicate with each other. Different domains can coexist under the same AMQP broker.
- "Service Type" (e.g. media_encoder): services with the same service type should have the same API.
- "Service Id" (e.g. encoder1): multiple services of the same type are distinguished by different Ids.
- "Service Version": the evolution of a service and its API should be tracked by a version.
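The four identifiers can be pictured as a small value object. The sketch below is illustrative only: ServiceIdentity and shares_api_with are hypothetical names, not part of amqp-fabric, and the default version string is an assumed placeholder.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ServiceIdentity:
    """Hypothetical value object; not an amqp-fabric class."""
    domain: str
    service_type: str
    service_id: str
    version: str = "0.0.0"  # assumed placeholder, not a library default

    def shares_api_with(self, other: "ServiceIdentity") -> bool:
        # Services of the same type inside one domain are expected
        # to expose the same API; the Id only tells instances apart.
        return (self.domain, self.service_type) == (other.domain, other.service_type)


encoder1 = ServiceIdentity("project1", "media_encoder", "encoder1", "1.0.0")
encoder2 = ServiceIdentity("project1", "media_encoder", "encoder2", "1.0.0")
foreign = ServiceIdentity("project2", "media_encoder", "encoder1", "1.0.0")

print(encoder1.shares_api_with(encoder2))
print(encoder1.shares_api_with(foreign))
```

Here encoder1 and encoder2 are distinct services that share an API, while foreign lives in another domain and is unrelated even though its type and Id look the same.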
High Availability
Multiple services with the same Domain, Type and Id form a high-availability "clique": API calls are redirected to the next available service.
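The redirect behaviour can be simulated without a broker. This is a minimal in-process sketch, assuming a simple rotation over replicas; how amqp-fabric and the AMQP broker actually select the next service is not described here, and Clique, replica_a and replica_b are hypothetical names.

```python
from itertools import cycle


class Clique:
    """Hypothetical stand-in for a group of identical service instances."""

    def __init__(self, instances):
        # instances: mapping of replica label -> callable that handles a request
        self._instances = dict(instances)
        self._order = cycle(list(self._instances))
        self.available = set(self._instances)

    def call(self, *args, **kwargs):
        # Try each replica at most once, starting from the next one in rotation.
        for _ in range(len(self._instances)):
            label = next(self._order)
            if label in self.available:
                return label, self._instances[label](*args, **kwargs)
        raise RuntimeError("no instance of the clique is available")


def multiply(x, y):
    return x * y


clique = Clique({"replica_a": multiply, "replica_b": multiply})
first = clique.call(5, 7)
clique.available.discard("replica_b")  # simulate replica_b going offline
second = clique.call(5, 7)             # replica_b is skipped
print(first, second)
```

The caller never names a replica; it only sees a result as long as at least one member of the clique is up.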
Server Side Example
import asyncio
from amqp_fabric.amq_broker_connector import AmqBrokerConnector
from amqp_fabric.abstract_service_api import AbstractServiceApi


# API Definition
class MyServiceApi(AbstractServiceApi):
    def multiply(self, x, y):
        return x * y


class MyService:
    amq = None

    async def init(self):
        self.amq = AmqBrokerConnector(
            amqp_uri="amqp://guest:guest@127.0.0.1/",
            service_domain="my_project",
            service_id="my_app",
            service_type="server_app",
            keep_alive_seconds=5)
        await self.amq.open(timeout=10)

        api = MyServiceApi()
        await self.amq.rpc_register(api)

    async def close(self):
        await self.amq.close()


def run_event_loop():
    agent = MyService()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(agent.init())

    try:
        loop.run_forever()
    except (KeyboardInterrupt, SystemExit):
        pass
    finally:
        loop.run_until_complete(agent.close())
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()


if __name__ == "__main__":
    run_event_loop()
Client Side Example
import asyncio
from amqp_fabric.amq_broker_connector import AmqBrokerConnector


async def exec_multiply():
    amq = AmqBrokerConnector(
        amqp_uri="amqp://guest:guest@127.0.0.1/",
        service_domain="my_project",
        service_id="my_client",
        service_type="client_app",
        keep_alive_seconds=5)
    await amq.open(timeout=10)

    try:
        # Arguments: domain, service id and service type of the server above
        srv_proxy = await amq.rpc_proxy("my_project", "my_app", "server_app")
        result = await srv_proxy.multiply(x=5, y=7)
        print(f'result = {result}')
    finally:
        # Close the connection even if the remote call fails
        await amq.close()


if __name__ == '__main__':
    asyncio.run(exec_multiply())
Uptime Tracking
amq.list_services returns a list of currently available services. The list can optionally be filtered by service domain and/or service type.
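The filtering semantics can be illustrated on plain data. This is a sketch under stated assumptions: the record layout (dicts with domain, service_type and service_id keys) and the helper filter_services are hypothetical, since the actual return shape and parameter names of list_services are not given here.

```python
def filter_services(services, domain=None, service_type=None):
    """Return the records that match every filter that is not None."""
    return [
        s for s in services
        if (domain is None or s["domain"] == domain)
        and (service_type is None or s["service_type"] == service_type)
    ]


# Hypothetical records; not the library's real return value.
services = [
    {"domain": "my_project", "service_type": "server_app", "service_id": "my_app"},
    {"domain": "my_project", "service_type": "client_app", "service_id": "my_client"},
    {"domain": "project1", "service_type": "media_encoder", "service_id": "encoder1"},
]

in_project = filter_services(services, domain="my_project")
servers = filter_services(services, domain="my_project", service_type="server_app")
print([s["service_id"] for s in in_project])
print([s["service_id"] for s in servers])
```

Omitting both filters returns every known service; supplying both narrows the result to one service type inside one domain.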
Publishing and Subscribing to asynchronous data streams
Data can be published along with one or more headers which can later be used to filter the received messages:
amq.publish_data(items={'msg': 'Checkout committed', 'order_total': 55, 'products': ['book']},
                 headers={'msg_type': 'CUSTOMER_TRANSACTIONS', 'currency': 'USD'})

amq.subscribe_data(subscriber_name='accountant',
                   headers={'msg_type': 'CUSTOMER_TRANSACTIONS'},
                   callback=process_data)
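To see why the 'accountant' subscriber receives the message above, header filtering can be modelled as a subset match. The sketch assumes "all requested headers must match" semantics, similar to an AMQP headers exchange with x-match set to all; whether amqp-fabric uses exactly this rule is an assumption, and headers_match is a hypothetical helper.

```python
def headers_match(wanted, message_headers):
    """True when every wanted header is present with an equal value."""
    return all(
        key in message_headers and message_headers[key] == value
        for key, value in wanted.items()
    )


message_headers = {"msg_type": "CUSTOMER_TRANSACTIONS", "currency": "USD"}

# The subscription names one header; extra headers on the message are ignored.
accountant = headers_match({"msg_type": "CUSTOMER_TRANSACTIONS"}, message_headers)

# A subscription asking for a different currency would not receive the message.
euro_only = headers_match(
    {"msg_type": "CUSTOMER_TRANSACTIONS", "currency": "EUR"}, message_headers
)
print(accountant, euro_only)
```

Under this model a subscriber with an empty header filter would match every published message.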
Using logs
TBD
File details
Details for the file amqp-fabric-0.1.1.tar.gz.
File metadata
- Download URL: amqp-fabric-0.1.1.tar.gz
- Upload date:
- Size: 31.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.10.12
File hashes
Algorithm | Hash digest
---|---
SHA256 | e1d0f594e561918a791b4bff433cb6c78831d93ed30236fdb1f72bfadd26246e
MD5 | 5b96ae8f483ee0ce480e8e5a4758f182
BLAKE2b-256 | ea057ea7e418cd2b78127b14c8fb47fe26f969923a2e49545b707dc524f6897f
File details
Details for the file amqp_fabric-0.1.1-py2.py3-none-any.whl.
File metadata
- Download URL: amqp_fabric-0.1.1-py2.py3-none-any.whl
- Upload date:
- Size: 11.1 kB
- Tags: Python 2, Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.10.12
File hashes
Algorithm | Hash digest
---|---
SHA256 | d8fc5308f29f97b2600d3d91743ac782f5ae1e18032f2c923cf1baded853bc79
MD5 | eaaa4e6aa7c1a3321b480fd71eebee44
BLAKE2b-256 | 2237ccc1e497816bd6c60854e23f96317c4d503f21f184d439230b190f9a3cc3