
AWS SNS/SQS backend for natricine


natricine-aws

AWS SQS and SNS pub/sub implementation.

Installation

pip install natricine-aws

SQS (Simple Queue)

Direct queue messaging with competing consumers.

import asyncio
import aioboto3
from natricine.pubsub import Message
from natricine_aws import SQSPublisher, SQSSubscriber, SQSConfig

async def main():
    session = aioboto3.Session()

    async with SQSPublisher(session) as publisher:
        async with SQSSubscriber(session, SQSConfig()) as subscriber:
            # Publish
            await publisher.publish("my-queue", Message(payload=b'{"order": 1}'))

            # Subscribe
            async for msg in subscriber.subscribe("my-queue"):
                print(f"Received: {msg.payload}")
                await msg.ack()  # Deletes from queue
                break

asyncio.run(main())
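To make "competing consumers" concrete, here is a minimal in-memory sketch that uses `asyncio.Queue` in place of SQS. It does not touch AWS or natricine; the `worker` and `run_demo` names are illustrative only. The point is that each message is handed to exactly one of the workers reading from the same queue.

```python
import asyncio


async def worker(name, queue, seen):
    # Every worker reads from the same queue, so each message
    # is taken by exactly one of them.
    while True:
        msg = await queue.get()
        seen.append((name, msg))
        queue.task_done()


async def run_demo(messages, n_workers=2):
    queue = asyncio.Queue()
    seen = []
    workers = [
        asyncio.create_task(worker(f"worker-{i}", queue, seen))
        for i in range(n_workers)
    ]
    for m in messages:
        queue.put_nowait(m)
    await queue.join()  # wait until every message has been processed
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return seen


deliveries = asyncio.run(run_demo([b"a", b"b", b"c"]))
for name, msg in deliveries:
    print(name, msg)
```

Which worker handles which message is not guaranteed; only the total number of deliveries is.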

SQS IAM Permissions

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "sqs:SendMessage",
        "sqs:ReceiveMessage",
        "sqs:DeleteMessage",
        "sqs:ChangeMessageVisibility",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:CreateQueue"
      ],
      "Resource": "arn:aws:sqs:*:*:my-queue"
    }
  ]
}

If you set create_queue_if_missing=False in SQSConfig, the queue must already exist and sqs:CreateQueue can be removed from the policy.
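If you generate IAM policies from code, the same rule can be expressed as a small helper. This is a sketch only: `sqs_policy` is a hypothetical function, not part of natricine-aws, and the action list is copied from the policy above.

```python
import json

# Actions needed regardless of whether the queue is auto-created.
BASE_ACTIONS = [
    "sqs:SendMessage",
    "sqs:ReceiveMessage",
    "sqs:DeleteMessage",
    "sqs:ChangeMessageVisibility",
    "sqs:GetQueueUrl",
    "sqs:GetQueueAttributes",
]


def sqs_policy(queue_arn, create_queue_if_missing=True):
    """Build the policy document, adding sqs:CreateQueue only when needed."""
    actions = list(BASE_ACTIONS)
    if create_queue_if_missing:
        actions.append("sqs:CreateQueue")
    return {
        "Version": "2012-10-17",
        "Statement": [
            {"Effect": "Allow", "Action": actions, "Resource": queue_arn}
        ],
    }


policy = sqs_policy("arn:aws:sqs:*:*:my-queue", create_queue_if_missing=False)
print(json.dumps(policy, indent=2))
```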

SNS+SQS (Fan-out)

Pub/sub with fan-out to multiple consumer groups.

import asyncio
import aioboto3
from natricine.pubsub import Message
from natricine_aws import SNSPublisher, SNSSubscriber, SNSConfig

async def main():
    session = aioboto3.Session()

    async with SNSPublisher(session) as publisher:
        # Two consumer groups - each gets its own copy of every message
        config_a = SNSConfig(consumer_group="email-service")
        config_b = SNSConfig(consumer_group="analytics")

        async with SNSSubscriber(session, config_a) as email_sub:
            async with SNSSubscriber(session, config_b) as analytics_sub:
                # Both subscribers receive this message
                await publisher.publish("user-events", Message(payload=b"new signup"))

asyncio.run(main())
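The fan-out pattern itself can be illustrated without AWS. The sketch below is an in-memory stand-in, not natricine-aws code: `InMemoryTopic` is a hypothetical class, and the only detail taken from this README is the `{topic}-{consumer_group}` queue naming. Each consumer group gets its own queue, and publishing copies the message into every one of them.

```python
from collections import deque


class InMemoryTopic:
    """Toy topic that copies each message to one queue per consumer group."""

    def __init__(self, name):
        self.name = name
        self.queues = {}

    def subscribe(self, consumer_group):
        # One queue per consumer group, named {topic}-{consumer_group}.
        queue_name = f"{self.name}-{consumer_group}"
        return self.queues.setdefault(queue_name, deque())

    def publish(self, payload):
        # Fan-out: every group's queue receives its own copy.
        for queue in self.queues.values():
            queue.append(payload)


topic = InMemoryTopic("user-events")
email_queue = topic.subscribe("email-service")
analytics_queue = topic.subscribe("analytics")
topic.publish(b"new signup")

print(sorted(topic.queues))
print(list(email_queue), list(analytics_queue))
```

Within a single group, several subscribers sharing the same queue would still compete for messages, as in the plain SQS case.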

SNS+SQS IAM Permissions

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "sns:Publish",
        "sns:CreateTopic",
        "sns:Subscribe"
      ],
      "Resource": "arn:aws:sns:*:*:user-events"
    },
    {
      "Effect": "Allow",
      "Action": "sns:ListTopics",
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "sqs:SendMessage",
        "sqs:ReceiveMessage",
        "sqs:DeleteMessage",
        "sqs:ChangeMessageVisibility",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:CreateQueue",
        "sqs:SetQueueAttributes"
      ],
      "Resource": "arn:aws:sqs:*:*:user-events-*"
    }
  ]
}

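If you set create_resources=False in SNSConfig, the topic, queue, and subscription must already exist, and sns:CreateTopic, sns:Subscribe, sqs:CreateQueue, and sqs:SetQueueAttributes can be removed from the policy.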

Configuration

SQSConfig(
    wait_time_s=20,            # Long polling (max 20)
    visibility_timeout_s=30,   # Redelivery timeout
    max_messages=10,           # Messages per poll (max 10)
    create_queue_if_missing=True,
)

SNSConfig(
    consumer_group="my-service",  # Queue name: {topic}-{consumer_group}
    create_resources=True,        # Auto-create topic, queue, subscription
    sqs_config=SQSConfig(),       # SQS settings for the subscriber
)
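The limits noted in the comments above (long polling up to 20 seconds, at most 10 messages per poll) come from SQS itself, which also caps the visibility timeout at 12 hours. The following sketch checks values against those limits before they reach AWS. `sqs_setting_problems` is a hypothetical helper; whether `SQSConfig` performs similar validation is not stated here.

```python
def sqs_setting_problems(wait_time_s=20, visibility_timeout_s=30, max_messages=10):
    """Return a list of human-readable problems; empty if all values are in range."""
    problems = []
    if not 0 <= wait_time_s <= 20:
        problems.append("wait_time_s must be between 0 and 20 seconds")
    if not 0 <= visibility_timeout_s <= 43200:
        problems.append("visibility_timeout_s must be between 0 and 43200 seconds")
    if not 1 <= max_messages <= 10:
        problems.append("max_messages must be between 1 and 10")
    return problems


print(sqs_setting_problems())
print(sqs_setting_problems(wait_time_s=30, max_messages=0))
```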

LocalStack Testing

async with SQSPublisher(
    session,
    endpoint_url="http://localhost:4566",
) as publisher:
    ...
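To avoid hard-coding the LocalStack URL, one option is to read it from the environment and pass it only when set. This is a sketch under assumptions: `endpoint_kwargs` is a hypothetical helper, and the choice of the `AWS_ENDPOINT_URL` variable name is a convention borrowed from the AWS SDKs, not something natricine-aws is known to read by itself.

```python
import os


def endpoint_kwargs(environ=os.environ):
    """Return {'endpoint_url': ...} when AWS_ENDPOINT_URL is set, else {}."""
    url = environ.get("AWS_ENDPOINT_URL")
    return {"endpoint_url": url} if url else {}


# With the variable unset, nothing is overridden and real AWS is used.
print(endpoint_kwargs({}))
# With the variable set, the LocalStack endpoint is passed through.
print(endpoint_kwargs({"AWS_ENDPOINT_URL": "http://localhost:4566"}))
```

The result could then be splatted into the constructor shown above, for example `SQSPublisher(session, **endpoint_kwargs())`.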

With CQRS

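import aioboto3
from natricine.cqrs import CommandBus, PydanticMarshaler
from natricine_aws import SQSPublisher, SQSSubscriber, SQSConfig

session = aioboto3.Session()

# Commands are published to and consumed from SQS
command_bus = CommandBus(
    publisher=SQSPublisher(session),
    subscriber=SQSSubscriber(session, SQSConfig()),
    marshaler=PydanticMarshaler(),
)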
