Skip to main content

SNS + SQS Communications made easy

Project description

Aiorazemax

Build Status

✉️ Async communications using AWS SNS + SQS for Python services ✨

Documentation

In-Memory event manager

Show me the code

from aiorazemax.event_manager import EventManager


class NorthKoreaThreatCreatedEvent:
    def __init__(self, id, target):
        self.id = id
        self.target = target


async def trump_subscriber(event: NorthKoreaThreatCreatedEvent):
    print(f"North korea will attack us or {event.target}!")


EventManager.subscribe(trump_subscriber, NorthKoreaThreatCreatedEvent)
await EventManager.trigger(NorthKoreaThreatCreatedEvent(0, "Mexico"))

Result:

North korea will attack us or Mexico!

Trigger subscribers from SQS

Preconditions

SQS queue has to be subscribed to SNS topic before running the consumer

Code

import asyncio

from aiorazemax.consumers import MessageConsumer
from aiorazemax.drivers import SQSDriver
from aiorazemax.event_manager import EventManager
from aiorazemax.publisher import SNSMessagePublisher


aws_settings = {
    'region_name': "",
    'aws_access_key_id': "",
    'aws_secret_access_key': "",
    'endpoint_url': ""
}


class NorthKoreaThreatCreatedEvent:
    def __init__(self, id, target):
        self.id = id
        self.target = target


def kp_message_to_event(event_message):
    message = event_message.body
    # Highly recommended to use Marshmallow to validate
    return NorthKoreaThreatCreatedEvent(message['body']['id'], message['body']['target_name'])


mapper = {
    'KPThreatCreated': kp_message_to_event
}


async def trump_subscriber(event: NorthKoreaThreatCreatedEvent):
    print(f"North korea will attack us or {event.target}!")


async def main():
    EventManager.subscribe(trump_subscriber, NorthKoreaThreatCreatedEvent)

    queue_driver = await SQSDriver.build('korea-threats-queue', aws_settings)
    consumer = MessageConsumer(mapper, EventManager, queue_driver)

    publisher = await SNSMessagePublisher.build('korea-topic', aws_settings)
    await publisher.publish('KPThreatCreated', {'id': 21, 'target_name': 'Portugal'})

    await consumer.process_message()

    await queue_driver.close()
    await publisher.close()


if __name__ == '__main__':
    asyncio.run(main())

Result:

North korea will attack us or Portugal!

Installing

pip install aiorazemax

Running the tests

To run end to end tests do:

make unit-tests
make integration-tests

Authors

License

This project is licensed under the MIT License - see the LICENSE.md file for details

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

aiorazemax-0.0.1.tar.gz (4.8 kB view details)

Uploaded Source

File details

Details for the file aiorazemax-0.0.1.tar.gz.

File metadata

  • Download URL: aiorazemax-0.0.1.tar.gz
  • Upload date:
  • Size: 4.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/1.13.0 pkginfo/1.5.0.1 requests/2.22.0 setuptools/41.0.1 requests-toolbelt/0.9.1 tqdm/4.33.0 CPython/3.7.1

File hashes

Hashes for aiorazemax-0.0.1.tar.gz
Algorithm Hash digest
SHA256 51ea96fb4fcfaa1d174f617184480e173ddbe3ae67a08aa38a0126b88e4a8880
MD5 c6cf8c40dcdf58f932d81c2004630070
BLAKE2b-256 977392dc8726cf2ef95307a1dff73647fd0cb7b2f387aace19ae5b43650a102c

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page