A library that lets you build and connect external processors and connectors to the Seventech Integra product
Project description
Integra bridge
pip install integra-bridge
Allows you to move the business logic of complex services outside of Integra. Under the hood, interaction with Integra is carried out over HTTP through a REST API built on FastAPI.
Bridge
- the base class responsible for assembling services.
Params:
- title
string - input title of service;
- address
string | None - address of service location;
- description
string | None - description of the service;
- manual_path
string | PathLike - path to the manual file;
Minimal example of data handler implementation:
from pathlib import Path

import uvicorn

from integra_bridge import Bridge
from integra_bridge.adapters import ProcessorAdapter
from integra_bridge.dto import SkeletonProcessor, ProcessorView


class SimpleProcessor(ProcessorAdapter):
    """Minimal data handler: marks every body it receives and returns it."""

    async def execute(self, body: dict, params: dict) -> dict:
        """Set ``body['hello']`` to ``'world'`` and return the same dict.

        ``params`` is accepted but not used by this example.
        """
        body['hello'] = 'world'
        return body

    async def get_view(self) -> ProcessorView:
        """Return the view of this handler: a title, an empty description
        and a skeleton constructed with no arguments."""
        skeleton = SkeletonProcessor()
        return ProcessorView(
            title="Simple processor",
            description="",
            skeleton=skeleton
        )


bridge = Bridge(
    title='My service',
    address='Moscow',
    description='For demo',
    # The manual is looked up next to this script.
    manual_path=Path(__file__).parent / 'manual.pdf'
)
bridge.register_handlers([SimpleProcessor])
# Application object handed to uvicorn below.
application = bridge.build()

if __name__ == '__main__':
    uvicorn.run(app="__main__:application", host='localhost', port=8000)
You can construct forms for your handler inside Integra using parameter classes and skeletons:
# A single form field; every option is passed by keyword.
server_name = Parameter(
    name='server_name',
    label='Server name',
    description='',
    default_value='google.com',
    is_required=True,
    is_add_value=True,
    bottom_text='bottom_text',
)

# The skeleton carries the list of form parameters for the handler.
skeleton = SkeletonProcessor(parameters=[server_name])
Validation of form fields at Integra level:
class SimpleProcessor(ProcessorAdapter):
    """Handler whose ``field_to_check`` form field may hold at most 10 characters."""

    async def execute(self, body: dict, params: dict) -> dict:
        """Set ``body['hello']`` to ``'world'`` and return the same dict."""
        body['hello'] = 'world'
        return body

    async def validate(self, processor: Processor) -> ValidationResponse:
        """Check the form values stored in ``processor.params``.

        Returns a ``ValidationResponse`` whose ``result`` is ``False`` when
        ``field_to_check`` is longer than 10 characters; in that case
        ``params_description`` maps the field name to a list of messages.
        """
        result = True
        params_description = {}
        # A missing (or None) field counts as empty instead of crashing len().
        field_value = processor.params.get('field_to_check') or ''
        if len(field_value) > 10:
            result = False
            # Message (Russian): "Maximum length of 10 characters exceeded".
            params_description['field_to_check'] = ['Превышена максимальная длинна в 10 символов']
        return ValidationResponse(
            result=result,
            params_description=params_description,
        )
Example of an incoming connector implementation using the RabbitMQ broker:
# Class for handling input broker messages
class ConsumerManager:
    """Keep one RabbitMQ consumer per queue name and forward its messages to Integra."""

    def __init__(self):
        # queue name -> {"connection": ..., "channel": ..., "consumer_tag": ...}
        self.consumers = {}

    async def create_consumer(self, queue_name: str, connection_params: dict):
        """Open a connection, bind ``queue_name`` to the ``integra`` fanout
        exchange and start consuming it.

        ``connection_params`` is only forwarded to ``on_message``; the broker
        URL is hard-coded in this example.

        Returns the consumer tag produced by ``queue.consume``.
        """
        # Replace, rather than orphan, a consumer already registered under this name.
        await self.delete_consumer(queue_name, connection_params)

        connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
        try:
            channel = await connection.channel()
            exchange = await channel.declare_exchange(name='integra', type="fanout")
            queue = await channel.declare_queue(queue_name, auto_delete=False, durable=True)
            await queue.bind(exchange=exchange, routing_key=queue_name)
            callback = partial(self.on_message, queue_name=queue_name, connection_params=connection_params)
            consumer_tag = await queue.consume(callback, no_ack=False)
        except BaseException:
            # Setup failed part-way: do not leave the connection open.
            await connection.close()
            raise
        self.consumers[queue_name] = {"connection": connection, "channel": channel, "consumer_tag": consumer_tag}
        return consumer_tag

    async def delete_consumer(self, queue_name: str, connection_params: dict):
        """Close the channel and connection registered for ``queue_name``.

        Does nothing when no consumer is registered under that name.
        ``connection_params`` is accepted but not used.
        """
        consumer = self.consumers.pop(queue_name, None)
        if consumer is None:
            return
        try:
            await consumer["channel"].close()
        finally:
            # Close the connection even if closing the channel raised.
            await consumer["connection"].close()

    async def on_message(self, message: AbstractIncomingMessage, queue_name, connection_params: dict) -> None:
        """Decode the message body as UTF-8 JSON and push it to Integra.

        ``queue_name`` is passed on as ``connect_to_block_id``. The message is
        acknowledged only when the push returns status 200 or 201; any other
        status leaves it unacknowledged.
        """
        data = json.loads(message.body.decode('utf-8'))
        send_status = await ConnectorAdapter.push_to_integra(
            input_body=data,
            connect_to_block_id=queue_name
        )
        if send_status in (200, 201):
            await message.ack()
# Connector form parameters
server_name = Parameter(
    name='server_name',
    label='Server name',
    is_required=True,
    is_add_value=True,
)

url = Parameter(
    name='url',
    label='url',
    is_required=True,
    is_add_value=True,
)
# Shared manager instance used by the connector hooks below.
consumer_manager = ConsumerManager()


class SimpleInputConnector(ConnectorAdapter):
    """Input connector that starts and stops a broker consumer per connection."""

    async def get_view(self) -> ConnectorView:
        """Return the connector view with an input skeleton holding the
        ``server_name`` and ``url`` parameters."""
        skeleton_connector = SkeletonConnector(type_connect='input', parameters=[server_name, url])
        return ConnectorView(
            title="Simple input connector",
            input_description="description",
            skeleton_input_connect=skeleton_connector
        )

    async def on_after_deploy(self, connection_id: str, connector_params: dict):
        """Create a consumer; ``connection_id`` is used as the queue name."""
        await consumer_manager.create_consumer(connection_id, connector_params)

    async def on_after_destroy(self, connection_id: str, connector_params: dict):
        """Delete the consumer that was registered under ``connection_id``."""
        await consumer_manager.delete_consumer(connection_id, connector_params)
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file integra_bridge-0.0.8.tar.gz
.
File metadata
- Download URL: integra_bridge-0.0.8.tar.gz
- Upload date:
- Size: 38.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.8.3 CPython/3.12.4 Windows/11
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | b8934c58fc76b215a60b1fb300f1d2cb8d2b305d522801e4c4c1efbceee2b6f2 |
|
MD5 | 723b19a15b57bd8808ee8ed546bd8221 |
|
BLAKE2b-256 | cb77bc7c5d6f06b63c43142ef76456d407dae7625f291b9e34486416a6a4c2d3 |
File details
Details for the file integra_bridge-0.0.8-py3-none-any.whl
.
File metadata
- Download URL: integra_bridge-0.0.8-py3-none-any.whl
- Upload date:
- Size: 24.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.8.3 CPython/3.12.4 Windows/11
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | e57c347e644a98a3d4c63cc2d6d75df2f8f10cb0d374d9b6ed2b72cc6bd6139b |
|
MD5 | f5bb2d86f969ac9feba2be0bce949c47 |
|
BLAKE2b-256 | e7b3150a9c6331cfe59640dfb971017b8ce0ef7fd3bcb41ed7de1d06d91a3ecd |