EchoStream library for implementing remote nodes

echostream-node

EchoStream library for implementing remote nodes that can be used in the EchoStream system.

This package supports creating External Nodes and Managed Node Types, and supports the following EchoStream use cases:

  • An External Node in an External App or Cross Account App that is a stand-alone application or part of another application, using either threading or asyncio.
  • An External Node in a Cross Account App that is an AWS Lambda function. This use case only supports threading.
  • A Managed Node Type, using either threading or asyncio.

Installation

pip install echostream-node

Usage

Threading Application Node

from signal import SIGHUP, SIGINT, SIGTERM, signal, strsignal

from echostream_node import Message
from echostream_node.threading import AppNode


class MyExternalNode(AppNode):

    def handle_received_message(self, *, message: Message, source: str) -> None:
        print(f"Got a message:\n{message.body}")
        self.audit_message(message, source=source)
        
    def signal_handler(self, signum: int, _: object) -> None:
        print(f"{strsignal(signum)} received, shutting down")
        self.stop()

    def start(self) -> None:
        super().start()
        signal(SIGHUP, self.signal_handler)
        signal(SIGINT, self.signal_handler)
        signal(SIGTERM, self.signal_handler)

try:
    my_external_node = MyExternalNode()
    my_external_node.start()
    for i in range(100):
        message = my_external_node.create_message(str(i))
        my_external_node.send_message(message)
        my_external_node.audit_message(message)
    my_external_node.join()
except Exception:
    print("Error running node")

Asyncio Application Node

import asyncio

import aiorun
from echostream_node import Message
from echostream_node.asyncio import Node

class MyExternalNode(Node):

    async def handle_received_message(self, *, message: Message, source: str) -> None:
        print(f"Got a message:\n{message.body}")
        self.audit_message(message, source=source)


async def main(node: Node) -> None:
    try:
        await node.start()
        for i in range(100):
            # Use the node passed to main(); my_external_node is not defined in this script
            message = node.create_message(str(i))
            node.send_message(message)
            node.audit_message(message)
        await node.join()
    except asyncio.CancelledError:
        pass
    except Exception:
        print("Error running node")


if __name__ == "__main__":
    aiorun.run(main(MyExternalNode()), stop_on_unhandled_errors=True, use_uvloop=True)

Cross Account Lambda Node

from echostream_node import Message
from echostream_node.threading import LambdaNode

class MyExternalNode(LambdaNode):
    def handle_received_message(self, *, message: Message, source: str) -> None:
        print(f"Got a message:\n{message.body}")
        self.audit_message(message, source=source)
        
MY_EXTERNAL_NODE = MyExternalNode()

def lambda_handler(event, context):
    MY_EXTERNAL_NODE.handle_event(event)

Concurrent vs Sequential Message Processing

By default, all Nodes created using the package process messages sequentially. This is normally the behavior that you want, as many messaging protocols require guaranteed ordering and therefore sequential processing within your Nodes. If this is the behavior that you require, nothing special is needed to get it from echostream-node.

However, there are use cases where message ordering is not important but processing speed is. In these cases, you may configure your Node upon creation to concurrently process the messages that it receives.
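
The trade-off between ordering and speed can be illustrated without echostream-node at all. The following is a minimal sketch using only the standard library; `handle`, `process_sequentially` and `process_concurrently` are hypothetical names invented for this illustration and are not part of the package.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def handle(body: str, log: list) -> None:
    # Hypothetical handler: lower-numbered messages take longer, so under
    # concurrent processing they tend to finish after higher-numbered ones.
    time.sleep(0.01 * (5 - int(body)))
    log.append(body)


def process_sequentially(bodies: list) -> list:
    # One message at a time: completion order always matches arrival order.
    log: list = []
    for body in bodies:
        handle(body, log)
    return log


def process_concurrently(bodies: list) -> list:
    # All messages in flight at once: faster overall, but completion order
    # depends on how long each handler call takes.
    log: list = []
    with ThreadPoolExecutor(max_workers=len(bodies)) as pool:
        for body in bodies:
            pool.submit(handle, body, log)
    return log


bodies = [str(i) for i in range(5)]
sequential_log = process_sequentially(bodies)
concurrent_log = process_concurrently(bodies)
print("sequential:", sequential_log)
print("concurrent:", concurrent_log)
```

Every message is handled in both cases; only the sequential version guarantees that the log matches the arrival order.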

Making a Threading Application Node Concurrent

If your Node inherits from the echostream_node.threading.AppNode class you can achieve concurrency using either threading or multi-processing. The former is appropriate if your processing is IO bound or your execution platform does not support shared memory (required for multi-processing). The latter is appropriate if your platform supports shared memory and your processing is CPU bound.
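
The choice described above can be sketched as a small helper. This is an illustration under stated assumptions, not part of echostream-node: `choose_executor` is a hypothetical function, and the fallback relies on CPython's `ProcessPoolExecutor` raising at construction time when the platform lacks the synchronization primitives it needs.

```python
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor


def choose_executor(*, cpu_bound: bool, max_workers: int = 10) -> Executor:
    """Hypothetical helper: pick an executor for the kind of work a Node does."""
    if cpu_bound:
        try:
            # Processes sidestep the GIL, which helps CPU-bound handlers.
            return ProcessPoolExecutor(max_workers=max_workers)
        except (ImportError, NotImplementedError, OSError):
            # Assumption: the platform cannot provide the shared-memory
            # primitives multi-processing needs, so fall back to threads.
            pass
    # Threads are sufficient when handlers mostly wait on IO.
    return ThreadPoolExecutor(max_workers=max_workers)


io_executor = choose_executor(cpu_bound=False)
cpu_executor = choose_executor(cpu_bound=True)
print(type(io_executor).__name__, type(cpu_executor).__name__)

# Release the worker resources once the executors are no longer needed.
io_executor.shutdown()
cpu_executor.shutdown()
```

The resulting executor would then be passed to the Node as shown in the sections that follow.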

Creating A Concurrent Application Node Using Threading

This will create an AppNode that uses the provided ThreadPoolExecutor to concurrently process received Messages. You can set the maximum number of workers to fewer than 10, but there is no gain from setting it higher, since Nodes process at most 10 messages at a time.

from concurrent.futures import ThreadPoolExecutor

from echostream_node import Message
from echostream_node.threading import AppNode

class MyExternalNode(AppNode):

    def __init__(self) -> None:
        super().__init__(executor=ThreadPoolExecutor(max_workers=10))

    def handle_received_message(self, *, message: Message, source: str) -> None:
        print(f"Got a message:\n{message.body}")
        self.audit_message(message, source=source)
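
To see why extra workers bring no benefit when at most 10 messages are in flight, consider this standard-library sketch. `BATCH_LIMIT` and `peak_concurrency` are hypothetical names for this illustration; the batch of 10 stands in for the limit described above and nothing here calls echostream-node.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

BATCH_LIMIT = 10  # assumption: mirrors the 10-message limit stated above


def peak_concurrency(max_workers: int, batch_size: int = BATCH_LIMIT) -> int:
    """Run one batch through a pool and report how many handlers overlapped."""
    lock = threading.Lock()
    active = 0
    peak = 0

    def handler(_: int) -> None:
        nonlocal active, peak
        with lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)  # stand-in for real message handling
        with lock:
            active -= 1

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Consume the iterator so that every handler call has finished.
        list(pool.map(handler, range(batch_size)))
    return peak


print("peak with 4 workers: ", peak_concurrency(4))
print("peak with 32 workers:", peak_concurrency(32))
```

The peak can never exceed the batch size, so workers beyond 10 would sit idle.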

Creating A Concurrent Application Node Using Multi-Processing

This will create an AppNode that uses the provided ProcessPoolExecutor to concurrently process received Messages. You can set the maximum number of workers to fewer than 10, but there is no gain from setting it higher, since Nodes process at most 10 messages at a time.

from concurrent.futures import ProcessPoolExecutor

from echostream_node import Message
from echostream_node.threading import AppNode

class MyExternalNode(AppNode):

    def __init__(self) -> None:
        super().__init__(executor=ProcessPoolExecutor(max_workers=10))

    def handle_received_message(self, *, message: Message, source: str) -> None:
        print(f"Got a message:\n{message.body}")
        self.audit_message(message, source=source)

Making an Asyncio Application Node Concurrent

If your Node inherits from the echostream_node.asyncio.Node you can set the Node to process incoming Messages concurrently. There is no setting for the maximum number of tasks; a task is created per received Message.

import asyncio

from echostream_node import Message
from echostream_node.asyncio import Node

class MyExternalNode(Node):

    def __init__(self) -> None:
        super().__init__(concurrent_processing=True)

    async def handle_received_message(self, *, message: Message, source: str) -> None:
        print(f"Got a message:\n{message.body}")
        self.audit_message(message, source=source)
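
The task-per-message model can be pictured with plain asyncio. This is a sketch of the general technique, not of the library's internals, which the text above does not show; `handle`, `process_concurrently` and `process_bounded` are hypothetical names. Because there is no setting for the maximum number of tasks, the second function shows one way a handler could bound its own concurrency with an `asyncio.Semaphore`.

```python
import asyncio


async def handle(body: str, log: list) -> None:
    # Stand-in for IO-bound work such as a network call.
    await asyncio.sleep(0.01)
    log.append(body)


async def process_concurrently(bodies: list) -> list:
    # One task per message, with no upper bound on the number of tasks.
    log: list = []
    tasks = [asyncio.create_task(handle(body, log)) for body in bodies]
    await asyncio.gather(*tasks)
    return log


async def process_bounded(bodies: list, limit: int) -> list:
    # Still one task per message, but at most `limit` handlers run at once.
    log: list = []
    semaphore = asyncio.Semaphore(limit)

    async def guarded(body: str) -> None:
        async with semaphore:
            await handle(body, log)

    await asyncio.gather(*(guarded(body) for body in bodies))
    return log


bodies = [str(i) for i in range(20)]
unbounded_log = asyncio.run(process_concurrently(bodies))
bounded_log = asyncio.run(process_bounded(bodies, limit=5))
print(len(unbounded_log), len(bounded_log))
```

A semaphore inside the handler is worth considering when the handler calls a downstream service that cannot absorb a burst of simultaneous requests.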

Making a Lambda Node Concurrent

The AWS Lambda platform does not support shared memory, so it supports concurrency only via threading. The following creates a LambdaNode that uses a ThreadPoolExecutor, optimized for your Lambda function's resources, to concurrently process received Messages.

from echostream_node import Message
from echostream_node.threading import LambdaNode

class MyExternalNode(LambdaNode):

    def __init__(self) -> None:
        super().__init__(concurrent_processing=True)

    def handle_received_message(self, *, message: Message, source: str) -> None:
        print(f"Got a message:\n{message.body}")
        self.audit_message(message, source=source)

Lambda Nodes and Partial Success Reporting

When you connect an Edge's SQS Queue to the AWS Lambda function implementing your Lambda Node, you can choose to Report Batch Item Failures. This allows your Lambda Node to report partial success back to the SQS Queue, but it does require that your Lambda Node operate differently.

If you wish to take advantage of this, set report_batch_item_failures when you create your Lambda Node. You can set it even if your Node does not process messages concurrently.

from echostream_node import Message
from echostream_node.threading import LambdaNode

class MyExternalNode(LambdaNode):

    def __init__(self) -> None:
        super().__init__(report_batch_item_failures=True)

    def handle_received_message(self, *, message: Message, source: str) -> None:
        print(f"Got a message:\n{message.body}")
        self.audit_message(message, source=source)
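
For context, partial success reporting relies on the response shape that AWS Lambda expects from an SQS-triggered function: a `batchItemFailures` list with the `messageId` of each failed record as its `itemIdentifier`. The sketch below builds that response by hand, without echostream-node, to show what the setting involves; `process_record` and the sample event are hypothetical, and how the library assembles this response internally is not covered here.

```python
def process_record(record: dict) -> None:
    # Hypothetical processing step: reject any record whose body is "bad".
    if record["body"] == "bad":
        raise ValueError("cannot process record")


def lambda_handler(event: dict, context: object = None) -> dict:
    failures = []
    for record in event.get("Records", []):
        try:
            process_record(record)
        except Exception:
            # Only failed records are reported; SQS makes them visible again
            # for retry, while successfully processed records are deleted.
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}


# Hypothetical SQS event, reduced to the two fields used above.
sample_event = {
    "Records": [
        {"messageId": "id-1", "body": "ok"},
        {"messageId": "id-2", "body": "bad"},
    ]
}
response = lambda_handler(sample_event)
print(response)
```

Without Report Batch Item Failures enabled on the trigger, Lambda ignores this return value, and a single raised exception causes the whole batch to be retried.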
