SNS + SQS Communications made easy
Aiorazemax
✉️ Async communications using AWS SNS + SQS for Python services ✨
Documentation
In-Memory event manager
Show me the code
import asyncio

from aiorazemax.event_manager import EventManager


class NorthKoreaThreatCreatedEvent:
    def __init__(self, id, target):
        self.id = id
        self.target = target


async def trump_subscriber(event: NorthKoreaThreatCreatedEvent):
    print(f"North korea will attack us or {event.target}!")


async def main():
    # Register the subscriber for the event class, then fire one event
    EventManager.subscribe(trump_subscriber, NorthKoreaThreatCreatedEvent)
    await EventManager.trigger(NorthKoreaThreatCreatedEvent(0, "Mexico"))


asyncio.run(main())
Result:
North korea will attack us or Mexico!
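To make the subscribe/trigger pattern above concrete, here is a minimal, dependency-free sketch of an in-memory event manager. It is an illustration only and not aiorazemax's actual implementation: `SketchEventManager`, `ThreatCreated`, `record_target` and `demo` are hypothetical names, and matching subscribers by the event's exact class is an assumption.

```python
import asyncio
from collections import defaultdict


class SketchEventManager:
    """Hypothetical stand-in for illustration, not aiorazemax's EventManager."""

    # Maps an event class to the async callables subscribed to it
    _subscribers = defaultdict(list)

    @classmethod
    def subscribe(cls, subscriber, event_type):
        # Register an async callable for one event class
        cls._subscribers[event_type].append(subscriber)

    @classmethod
    async def trigger(cls, event):
        # Assumption: subscribers are matched on the event's exact class
        handlers = cls._subscribers.get(type(event), [])
        await asyncio.gather(*(handler(event) for handler in handlers))


class ThreatCreated:
    def __init__(self, id, target):
        self.id = id
        self.target = target


received = []


async def record_target(event):
    # Collect targets instead of printing so the effect can be inspected
    received.append(event.target)


async def demo():
    SketchEventManager.subscribe(record_target, ThreatCreated)
    await SketchEventManager.trigger(ThreatCreated(0, "Mexico"))
    return received
```

Running `asyncio.run(demo())` awaits every subscriber registered for `ThreatCreated` and returns the recorded targets.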
Trigger subscribers from SQS
Preconditions
The SQS queue must be subscribed to the SNS topic before running the consumer.
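One possible way to satisfy this precondition is with boto3, sketched below. This is not part of aiorazemax: `build_queue_policy` and `subscribe_queue_to_topic` are hypothetical helpers, the `sns` and `sqs` arguments are assumed to be boto3 clients (for example `boto3.client("sns")` and `boto3.client("sqs")`), and the queue is assumed to exist already. Whether the library expects any particular subscription attributes is not covered here.

```python
import json


def build_queue_policy(queue_arn, topic_arn):
    """Access policy that lets one SNS topic send messages to one SQS queue."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": "sns.amazonaws.com"},
                "Action": "sqs:SendMessage",
                "Resource": queue_arn,
                "Condition": {"ArnEquals": {"aws:SourceArn": topic_arn}},
            }
        ],
    }


def subscribe_queue_to_topic(sns, sqs, topic_name, queue_name):
    """Subscribe an existing queue to a topic and return the subscription ARN."""
    # create_topic returns the ARN of the topic, creating it if needed
    topic_arn = sns.create_topic(Name=topic_name)["TopicArn"]
    queue_url = sqs.get_queue_url(QueueName=queue_name)["QueueUrl"]
    queue_arn = sqs.get_queue_attributes(
        QueueUrl=queue_url, AttributeNames=["QueueArn"]
    )["Attributes"]["QueueArn"]

    # Without a policy like this, SNS is not allowed to deliver to the queue.
    # Note: this overwrites any policy already set on the queue.
    policy = build_queue_policy(queue_arn, topic_arn)
    sqs.set_queue_attributes(
        QueueUrl=queue_url, Attributes={"Policy": json.dumps(policy)}
    )

    response = sns.subscribe(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn)
    return response["SubscriptionArn"]
```

With the names used in this README, a call would look like `subscribe_queue_to_topic(sns, sqs, "korea-topic", "korea-threats-queue")`.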
Code
import asyncio

from aiorazemax.consumers import MessageConsumer
from aiorazemax.drivers import SQSDriver
from aiorazemax.event_manager import EventManager
from aiorazemax.publisher import SNSMessagePublisher

# Placeholders: fill in your own region, credentials and endpoint
aws_settings = {
    'region_name': "",
    'aws_access_key_id': "",
    'aws_secret_access_key': "",
    'endpoint_url': ""
}


class NorthKoreaThreatCreatedEvent:
    def __init__(self, id, target):
        self.id = id
        self.target = target


def kp_message_to_event(event_message):
    message = event_message.body
    # Highly recommended to use Marshmallow to validate
    return NorthKoreaThreatCreatedEvent(message['body']['id'], message['body']['target_name'])


# Maps a message type name to the function that builds its event
mapper = {
    'KPThreatCreated': kp_message_to_event
}


async def trump_subscriber(event: NorthKoreaThreatCreatedEvent):
    print(f"North korea will attack us or {event.target}!")


async def main():
    EventManager.subscribe(trump_subscriber, NorthKoreaThreatCreatedEvent)

    queue_driver = await SQSDriver.build('korea-threats-queue', aws_settings)
    consumer = MessageConsumer(mapper, EventManager, queue_driver)

    publisher = await SNSMessagePublisher.build('korea-topic', aws_settings)
    await publisher.publish('KPThreatCreated', {'id': 21, 'target_name': 'Portugal'})

    await consumer.process_message()

    await queue_driver.close()
    await publisher.close()


if __name__ == '__main__':
    asyncio.run(main())
Result:
North korea will attack us or Portugal!
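The example above recommends validating the message before building the event. The sketch below shows the general idea of validating a body and then dispatching through a mapper. It deliberately swaps Marshmallow for plain Python checks so that it stays dependency-free, and it works on plain dicts rather than the library's message objects. `ThreatCreated`, `validate_threat_body`, `threat_message_to_event` and `message_to_event` are hypothetical names, and the `"type"` key used to pick the factory is an assumption about the message shape.

```python
class ThreatCreated:
    def __init__(self, id, target):
        self.id = id
        self.target = target


def validate_threat_body(body):
    """Return a cleaned copy of the body, or raise ValueError listing problems."""
    if not isinstance(body, dict):
        raise ValueError("body must be a dict")
    errors = []
    # type(...) is int rejects booleans, which isinstance(..., int) would accept
    if type(body.get("id")) is not int:
        errors.append("id must be an integer")
    target_name = body.get("target_name")
    if not isinstance(target_name, str) or not target_name:
        errors.append("target_name must be a non-empty string")
    if errors:
        raise ValueError("; ".join(errors))
    return {"id": body["id"], "target_name": target_name}


def threat_message_to_event(message):
    # Validate first, then build the event from the cleaned values only
    body = validate_threat_body(message.get("body"))
    return ThreatCreated(body["id"], body["target_name"])


mapper = {"KPThreatCreated": threat_message_to_event}


def message_to_event(message):
    """Pick the factory by message type (the "type" key is an assumption)."""
    message_type = message.get("type")
    if message_type not in mapper:
        raise ValueError(f"unmapped or missing message type: {message_type!r}")
    return mapper[message_type](message)
```

A malformed body then fails with a single `ValueError` that names every invalid field, instead of a `KeyError` raised from inside the mapper function.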
Installing
pip install aiorazemax
Running the tests
To run the unit and integration tests:
make unit-tests
make integration-tests
Authors
- Jairo Vadillo (@jairovadillo)
License
This project is licensed under the MIT License; see the LICENSE.md file for details.