EchoStream library for implementing remote nodes
echostream-node
EchoStream library for implementing remote nodes that can be used in the EchoStream system.
This package supports creating External Nodes and Managed Node Types, and supports the following EchoStream use cases:
- An External Node in an External App or Cross Account App that is a stand-alone application or part of another application, using either threading or asyncio.
- An External Node in a Cross Account App that is an AWS Lambda function. This use case only supports threading.
- A Managed Node Type, using either threading or asyncio.
Installation
pip install echostream-node
Usage
Threading Application Node
from signal import SIGHUP, SIGINT, SIGTERM, signal, strsignal

from echostream_node import Message
from echostream_node.threading import AppNode


class MyExternalNode(AppNode):
    def handle_received_message(self, *, message: Message, source: str) -> None:
        print(f"Got a message:\n{message.body}")
        self.audit_message(message, source=source)

    def signal_handler(self, signum: int, _: object) -> None:
        print(f"{strsignal(signum)} received, shutting down")
        self.stop()

    def start(self) -> None:
        super().start()
        signal(SIGHUP, self.signal_handler)
        signal(SIGINT, self.signal_handler)
        signal(SIGTERM, self.signal_handler)


try:
    my_external_node = MyExternalNode()
    my_external_node.start()
    for i in range(100):
        message = my_external_node.create_message(str(i))
        my_external_node.send_message(message)
        my_external_node.audit_message(message)
    my_external_node.join()
except Exception:
    print("Error running node")
Asyncio Application Node
import asyncio

import aiorun

from echostream_node import Message
from echostream_node.asyncio import Node


class MyExternalNode(Node):
    async def handle_received_message(self, *, message: Message, source: str) -> None:
        print(f"Got a message:\n{message.body}")
        self.audit_message(message, source=source)


async def main(node: Node) -> None:
    try:
        await node.start()
        # Use the node passed in to main(); no module-level node exists here.
        for i in range(100):
            message = node.create_message(str(i))
            node.send_message(message)
            node.audit_message(message)
        await node.join()
    except asyncio.CancelledError:
        pass
    except Exception:
        print("Error running node")


if __name__ == "__main__":
    aiorun.run(main(MyExternalNode()), stop_on_unhandled_errors=True, use_uvloop=True)
Cross Account Lambda Node
from echostream_node import Message
from echostream_node.threading import LambdaNode


class MyExternalNode(LambdaNode):
    def handle_received_message(self, *, message: Message, source: str) -> None:
        print(f"Got a message:\n{message.body}")
        self.audit_message(message, source=source)


MY_EXTERNAL_NODE = MyExternalNode()


def lambda_handler(event, context):
    MY_EXTERNAL_NODE.handle_event(event)
Concurrent vs Sequential Message Processing
By default, all Nodes created using the package will process messages sequentially.
This is normally the behavior that you want, as many messaging protocols require
guaranteed ordering and therefore sequential processing within your Nodes. If this is
the behavior that you require, nothing special is needed to gain it from echostream-node.
However, there are use cases where message ordering is not important but processing speed is. In these cases, you may configure your Node upon creation to concurrently process the messages that it receives.
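The trade-off described above can be illustrated without the library. The following is a minimal sketch using only the standard library; `handle`, `process_sequentially` and `process_concurrently` are hypothetical names for illustration and are not part of echostream-node. It shows that sequential processing preserves arrival order, while concurrent processing handles every message but makes no promise about completion order.

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List


def handle(body: str) -> str:
    # Stand-in for a message handler; the sleep simulates IO of varying length.
    time.sleep(random.uniform(0.001, 0.01))
    return body


def process_sequentially(bodies: List[str]) -> List[str]:
    # One message at a time: results come back in arrival order.
    return [handle(body) for body in bodies]


def process_concurrently(bodies: List[str], max_workers: int = 10) -> List[str]:
    # Many messages in flight: results are collected in completion order,
    # which may differ from arrival order.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(handle, body) for body in bodies]
        return [future.result() for future in as_completed(futures)]


if __name__ == "__main__":
    bodies = [str(i) for i in range(20)]
    print("sequential:", process_sequentially(bodies))
    print("concurrent:", process_concurrently(bodies))
```

If downstream consumers depend on ordering, the sequential default is the safer choice.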
Making a Threading Application Node Concurrent
If your Node inherits from the echostream_node.threading.AppNode class, you can achieve concurrency
using either threading or multi-processing. The former is appropriate if your processing is IO bound
or your execution platform does not support shared memory (required for multi-processing). The latter
is appropriate if your platform supports shared memory and your processing is CPU bound.
Creating A Concurrent Application Node Using Threading
This will create an AppNode that uses the provided ThreadPoolExecutor to concurrently
process received Messages. Note that while you can set the maximum number of workers to
less than 10, there is no gain to setting it to more than 10 since Nodes will only process
up to 10 messages at a time.
from concurrent.futures import ThreadPoolExecutor

from echostream_node import Message
from echostream_node.threading import AppNode


class MyExternalNode(AppNode):
    def __init__(self) -> None:
        super().__init__(executor=ThreadPoolExecutor(max_workers=10))

    def handle_received_message(self, *, message: Message, source: str) -> None:
        print(f"Got a message:\n{message.body}")
        self.audit_message(message, source=source)
Creating A Concurrent Application Node Using Multi-Processing
This will create an AppNode that uses the provided ProcessPoolExecutor to concurrently
process received Messages. Note that while you can set the maximum number of workers to
less than 10, there is no gain to setting it to more than 10 since Nodes will only process
up to 10 messages at a time.
from concurrent.futures import ProcessPoolExecutor

from echostream_node import Message
from echostream_node.threading import AppNode


class MyExternalNode(AppNode):
    def __init__(self) -> None:
        super().__init__(executor=ProcessPoolExecutor(max_workers=10))

    def handle_received_message(self, *, message: Message, source: str) -> None:
        print(f"Got a message:\n{message.body}")
        self.audit_message(message, source=source)
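The guidance above (threads for IO-bound work or platforms without shared memory, processes for CPU-bound work, and no benefit beyond 10 workers) can be captured in a small helper. This is only a sketch under those stated rules; `choose_executor_type`, `clamp_workers` and `MAX_USEFUL_WORKERS` are hypothetical names and are not provided by echostream-node.

```python
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor
from typing import Type

# From the text above: a Node processes at most 10 messages at a time,
# so more than 10 workers brings no gain.
MAX_USEFUL_WORKERS = 10


def choose_executor_type(*, cpu_bound: bool, shared_memory: bool) -> Type[Executor]:
    """Hypothetical helper: pick the executor class for a workload."""
    # Multi-processing only makes sense for CPU-bound work on a platform
    # that supports shared memory; everything else falls back to threads.
    if cpu_bound and shared_memory:
        return ProcessPoolExecutor
    return ThreadPoolExecutor


def clamp_workers(requested: int) -> int:
    """Hypothetical helper: keep the worker count between 1 and 10."""
    return max(1, min(requested, MAX_USEFUL_WORKERS))


if __name__ == "__main__":
    executor_type = choose_executor_type(cpu_bound=False, shared_memory=True)
    executor = executor_type(max_workers=clamp_workers(32))
    print(type(executor).__name__)
    executor.shutdown()
```

The resulting executor could then be passed as the executor argument shown in the AppNode examples above.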
Making an Asyncio Application Node Concurrent
If your Node inherits from the echostream_node.asyncio.Node class, you can set the Node to
process incoming Messages concurrently. There is no setting for the maximum number of tasks;
a task is created per received Message.
import asyncio

from echostream_node import Message
from echostream_node.asyncio import Node


class MyExternalNode(Node):
    def __init__(self) -> None:
        super().__init__(concurrent_processing=True)

    async def handle_received_message(self, *, message: Message, source: str) -> None:
        print(f"Got a message:\n{message.body}")
        self.audit_message(message, source=source)
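To make the "one task per received Message" idea concrete, here is a minimal standard-library sketch. It does not reproduce echostream-node's internals; `handle`, `process_sequentially` and `process_concurrently` are hypothetical names. It contrasts awaiting each handler in turn with creating one asyncio task per message.

```python
import asyncio
from typing import List


async def handle(body: str, done: List[str]) -> None:
    # Stand-in for an async message handler; the sleep simulates IO.
    await asyncio.sleep(0.01)
    done.append(body)


async def process_sequentially(bodies: List[str]) -> List[str]:
    done: List[str] = []
    for body in bodies:
        # Each handler finishes before the next one starts.
        await handle(body, done)
    return done


async def process_concurrently(bodies: List[str]) -> List[str]:
    done: List[str] = []
    # One task per message, with no upper bound on tasks in flight.
    tasks = [asyncio.create_task(handle(body, done)) for body in bodies]
    await asyncio.gather(*tasks)
    return done


if __name__ == "__main__":
    bodies = [str(i) for i in range(10)]
    print(asyncio.run(process_sequentially(bodies)))
    print(asyncio.run(process_concurrently(bodies)))
```

Because the number of tasks is unbounded, a handler that calls a rate-limited service may want its own limit, for example an asyncio.Semaphore acquired inside the handler.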
Making a Lambda Node Concurrent
The AWS Lambda platform does not support shared memory, and therefore will only support concurrency
via threading. This will create a LambdaNode that uses a ThreadPoolExecutor, optimized for your Lambda function's resources,
to concurrently process received Messages.
from echostream_node import Message
from echostream_node.threading import LambdaNode


class MyExternalNode(LambdaNode):
    def __init__(self) -> None:
        super().__init__(concurrent_processing=True)

    def handle_received_message(self, *, message: Message, source: str) -> None:
        print(f"Got a message:\n{message.body}")
        self.audit_message(message, source=source)
Lambda Nodes and Partial Success Reporting
When you connect an Edge's SQS Queue to the AWS Lambda function implementing your Lambda Node, you can choose to Report Batch Item Failures. This allows your Lambda Node to report partial success back to the SQS Queue, but it does require that your Lambda Node operate differently.
If you wish to take advantage of this, set report_batch_item_failures=True when you create your
Lambda Node. This can be set even if your Node is not processing messages concurrently.
from echostream_node import Message
from echostream_node.threading import LambdaNode


class MyExternalNode(LambdaNode):
    def __init__(self) -> None:
        super().__init__(report_batch_item_failures=True)

    def handle_received_message(self, *, message: Message, source: str) -> None:
        print(f"Got a message:\n{message.body}")
        self.audit_message(message, source=source)
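For background on what partial success means at the AWS level, the sketch below shows the response shape that Lambda expects from an SQS-triggered function when Report Batch Item Failures is enabled: a batchItemFailures list naming the messageId of each failed record. This is a generic illustration of that AWS contract, not echostream-node's implementation; `process_batch` and `handle` are hypothetical names.

```python
from typing import Any, Callable, Dict, List


def process_batch(
    event: Dict[str, Any], handle: Callable[[str], None]
) -> Dict[str, List[Dict[str, str]]]:
    """Hypothetical helper: handle each SQS record and collect the failures."""
    failures: List[Dict[str, str]] = []
    for record in event.get("Records", []):
        try:
            handle(record["body"])
        except Exception:
            # Only the failed message is reported; SQS will redeliver it,
            # while the successfully handled messages are deleted.
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}


if __name__ == "__main__":
    def handle(body: str) -> None:
        if body == "bad":
            raise ValueError("cannot process this message")

    event = {
        "Records": [
            {"messageId": "1", "body": "ok"},
            {"messageId": "2", "body": "bad"},
            {"messageId": "3", "body": "ok"},
        ]
    }
    print(process_batch(event, handle))
    # prints {'batchItemFailures': [{'itemIdentifier': '2'}]}
```

Without partial success reporting, a single failing message causes the whole batch to be retried, so handlers must tolerate seeing the same message more than once.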
File details
Details for the file echostream-node-0.1.3.tar.gz.
File metadata
- Download URL: echostream-node-0.1.3.tar.gz
- Upload date:
- Size: 15.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.8.0 pkginfo/1.8.2 readme-renderer/34.0 requests/2.27.1 requests-toolbelt/0.9.1 urllib3/1.26.9 tqdm/4.63.0 importlib-metadata/4.11.3 keyring/23.5.0 rfc3986/2.0.0 colorama/0.4.4 CPython/3.9.10
File hashes
Algorithm | Hash digest
---|---
SHA256 | 067a7a234a8c8ff6a24876c381215c20cef4f900035b6947df8ceee3c855e0f8
MD5 | 0de9073a0b8ea74a286c2b2a3748e6b6
BLAKE2b-256 | c1f04db9e13bc9dcfb259ef05c59fa3fff3725250c49f9bc620a5ac177362bbd
File details
Details for the file echostream_node-0.1.3-py3-none-any.whl.
File metadata
- Download URL: echostream_node-0.1.3-py3-none-any.whl
- Upload date:
- Size: 17.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.8.0 pkginfo/1.8.2 readme-renderer/34.0 requests/2.27.1 requests-toolbelt/0.9.1 urllib3/1.26.9 tqdm/4.63.0 importlib-metadata/4.11.3 keyring/23.5.0 rfc3986/2.0.0 colorama/0.4.4 CPython/3.9.10
File hashes
Algorithm | Hash digest
---|---
SHA256 | 5f1e4ff0a3bde5a2d29c618ffeb4cbcd8c80b4e60c5b1a0de2c35623af5c60ef
MD5 | ef6e2640033167249fb5503693d171a6
BLAKE2b-256 | 198fd436b95fefda486c527e37b7f3842a03bbf87a9c4285f60032c19cc6496b