Skip to main content

AWS SQS bindings for Azure Functions

Project description

Azure Functions SQS Extension for Python

A Python package that enables Azure Functions to integrate with Amazon SQS (Simple Queue Service) for both trigger-based and output binding scenarios.

📁 Repository Structure

python/
├── azure_functions_sqs/   # Source code for the package
├── tests/                 # Unit tests
├── samples/               # Sample function app
├── pyproject.toml         # Package configuration
└── README.md              # This documentation

Installation

pip install azure-functions-sqs

Features

  • SQS Trigger: Process messages from an SQS queue with automatic deletion on success
  • SQS Output Binding: Send messages to SQS as a function return value
  • SQS Collector: Batch send multiple messages efficiently (like IAsyncCollector in .NET)
  • FIFO Queue Support: Full support for FIFO queues with message groups and deduplication
  • Environment Variable Resolution: Use %VAR_NAME% or ${VAR_NAME} syntax for credentials

Quick Start

SQS Trigger

Process messages from an SQS queue:

from azure_functions_sqs import SqsTrigger, SqsMessage, SqsTriggerOptions
from datetime import timedelta

trigger = SqsTrigger(
    queue_url="%SQS_QUEUE_URL%",  # From environment variable
    aws_key_id="%AWS_ACCESS_KEY_ID%",
    aws_access_key="%AWS_SECRET_ACCESS_KEY%",
    options=SqsTriggerOptions(
        max_number_of_messages=10,
        visibility_timeout=timedelta(seconds=30),
        polling_interval=timedelta(seconds=5),
    ),
)

@trigger
def process_message(message: SqsMessage) -> None:
    print(f"Received: {message.body}")
    # Message is automatically deleted after successful processing

SQS Output Binding

Send messages to SQS:

from azure_functions_sqs import SqsOutput
import json

output = SqsOutput(
    queue_url="%OUTPUT_QUEUE_URL%",
    aws_key_id="%AWS_ACCESS_KEY_ID%",
    aws_access_key="%AWS_SECRET_ACCESS_KEY%",
)

@output
def send_to_sqs() -> str:
    return json.dumps({"key": "value"})

Batch Sending with SqsCollector

Send multiple messages efficiently:

from azure_functions_sqs import SqsCollector

collector = SqsCollector(
    queue_url="%OUTPUT_QUEUE_URL%",
    aws_key_id="%AWS_ACCESS_KEY_ID%",
    aws_access_key="%AWS_SECRET_ACCESS_KEY%",
)

# Add messages
for i in range(25):
    collector.add({"item": i})

# Flush sends in batches of 10 (SQS limit)
sent_count = collector.flush()
print(f"Sent {sent_count} messages")

Configuration

Trigger Options

Option Default Description
max_number_of_messages 10 Number of messages to receive per poll (1-10)
visibility_timeout 30 seconds How long a message is hidden from other consumers
polling_interval 5 seconds Delay between polls when queue is empty

Credentials

The extension supports multiple credential sources:

  1. Environment variables (recommended):

    • Use %VAR_NAME% or ${VAR_NAME} syntax
    • Values are resolved at runtime
  2. Direct values (for testing only):

    • Pass credentials directly (not recommended for production)
  3. IAM Role/Instance Profile:

    • Leave credentials empty to use AWS default credential chain

Example Environment Variables

# AWS credentials
export AWS_REGION=us-east-1
export AWS_ACCESS_KEY_ID=AKIA...
export AWS_SECRET_ACCESS_KEY=...

# Queue URLs
export SQS_QUEUE_URL=https://sqs.us-east-1.amazonaws.com/123456789012/my-queue

Message Structure

The SqsMessage class provides access to all SQS message properties:

@trigger
def process(message: SqsMessage) -> None:
    # Core properties
    print(message.message_id)
    print(message.body)
    print(message.receipt_handle)
    
    # Timestamps and counts
    print(message.sent_timestamp)
    print(message.approximate_receive_count)
    
    # FIFO queue properties
    print(message.message_group_id)
    print(message.message_deduplication_id)
    print(message.sequence_number)
    
    # Custom attributes
    for name, attr in message.message_attributes.items():
        print(f"{name}: {attr.string_value}")
    
    # Convert to dict (matches .NET JSON contract)
    data = message.to_dict()

FIFO Queue Support

For FIFO queues (queue URL ends with .fifo):

from azure_functions_sqs import SqsOutput, SqsOutputOptions

output = SqsOutput(
    queue_url="%FIFO_QUEUE_URL%",
    aws_key_id="%AWS_ACCESS_KEY_ID%",
    aws_access_key="%AWS_SECRET_ACCESS_KEY%",
    options=SqsOutputOptions(message_group_id="my-group"),  # Required for FIFO
)

LocalStack Testing

For local development with LocalStack:

from azure_functions_sqs import SqsTrigger

trigger = SqsTrigger(
    queue_url="http://localhost:4566/000000000000/test-queue",
    region="us-east-1",  # Must specify for LocalStack
    aws_key_id="test",
    aws_access_key="test",
)

See the LocalStack testing guide for setting up LocalStack with Docker.

Sample Application

A complete sample function app is available in the samples/ directory:

Building from Source

# Clone the repository
git clone https://github.com/laveeshb/azure-functions-sqs-extension.git
cd azure-functions-sqs-extension/python

# Create virtual environment
python -m venv .venv
source .venv/bin/activate  # Linux/macOS
# .venv\Scripts\activate   # Windows

# Install in development mode with test dependencies
pip install -e ".[dev]"

# Run tests
pytest tests/ -v

Comparison with .NET Extension

This Python package is designed to match the .NET extension's contracts:

.NET Python
[SqsQueueTrigger] @SqsTrigger()
[SqsQueueOut] @SqsOutput()
IAsyncCollector<SqsQueueMessage> SqsCollector
Amazon.SQS.Model.Message SqsMessage
SqsQueueOptions SqsTriggerOptions

Requirements

  • Python 3.9+
  • boto3 >= 1.26.0
  • azure-functions >= 1.17.0

Support

For issues, questions, or feature requests, please open an issue.

License

MIT License - see LICENSE file for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

azure_functions_sqs-1.0.0.tar.gz (17.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

azure_functions_sqs-1.0.0-py3-none-any.whl (13.9 kB view details)

Uploaded Python 3

File details

Details for the file azure_functions_sqs-1.0.0.tar.gz.

File metadata

  • Download URL: azure_functions_sqs-1.0.0.tar.gz
  • Upload date:
  • Size: 17.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.12

File hashes

Hashes for azure_functions_sqs-1.0.0.tar.gz
Algorithm Hash digest
SHA256 0fd1303ec221b13454e1d4f3cef84c872de27adc61df8ebcb9aec13a4d9968a7
MD5 e61d89b4453c759febfc8b91071d0861
BLAKE2b-256 2a3d9b241a8e541ec575903c5896cad12a4ad286fe4bf94d6b10648b96109238

See more details on using hashes here.

File details

Details for the file azure_functions_sqs-1.0.0-py3-none-any.whl.

File metadata

File hashes

Hashes for azure_functions_sqs-1.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 8b760bfca9210b332630e55ca2b70a54fd20c64cf7e0090b4abb5fc5ffe7c582
MD5 23ac65cadd52711014fa86fa41035bed
BLAKE2b-256 e37ebf3017f8c3d4315d70631ef4e1550f877d31e481e2bcd803c8c0e073206f

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page