A lightweight decorator-based router for AWS Lambda SQS consumers
Project description
sqs-router
Decorator-based message routing for Lambda functions that consume SQS.
from sqs_router import SQSRouter
router = SQSRouter(message_type_field="event_type")
@router.on("user_signup")
def handle_signup(message, metadata):
provision_account(message["user_id"])
def handler(event, context):
return router.dispatch(event)
SQS messages are just bytes. There's no built-in concept of message type — SQS doesn't know or care what's in the body. The message_type_field is a field your team puts in the JSON body. Different teams use "type", "event_type", "action", "detail-type" — whatever your producers send, sqs-router routes on it.
The pattern it replaces is this, which every SQS Lambda eventually grows into:
def handler(event, context):
for record in event["Records"]:
body = json.loads(record["body"])
t = body.get("event_type")
if t == "user_signup":
handle_signup(body)
elif t == "user_deleted":
handle_deletion(body)
elif t == "password_reset":
handle_reset(body)
else:
logger.warning("unknown: %s", t)
It works until you have fifteen event types and someone forgets to handle errors properly or implement partial batch failure. sqs-router is the standard pattern for this.
Install
pip install sqs-router
No dependencies. Python 3.9+.
Message format
Your producer decides the format. Pick a field name and stick to it.
A common internal microservice convention:
{
"event_type": "user_signup",
"user_id": "u-8821",
"email": "ali@example.com",
"ts": "2025-03-14T10:00:00Z"
}
An EventBridge event forwarded to SQS:
{
"source": "com.mycompany.auth",
"detail-type": "UserSignedUp",
"detail": {
"user_id": "u-8821",
"email": "ali@example.com"
}
}
A job/task queue convention:
{
"action": "send_welcome_email",
"payload": {
"user_id": "u-8821",
"template": "welcome_v2"
}
}
Configure message_type_field to match whatever your producers send.
Quickstart
from sqs_router import SQSRouter
router = SQSRouter(message_type_field="event_type")
@router.on("user_signup")
def handle_signup(message, metadata):
create_account(message["user_id"])
@router.on("user_deleted")
def handle_deletion(message, metadata):
deactivate_account(message["user_id"])
def handler(event, context):
return router.dispatch(event)
Registering handlers
One type:
@router.on("send_email")
def handle(message, metadata):
send(message["to"], message["template"])
Multiple types on one handler:
@router.on("charge.succeeded", "charge.refunded")
def handle_charge(message, metadata):
# differentiate inside the handler if needed
if metadata.message_type == "charge.refunded":
issue_refund(message)
on_many is an alias for on — use whichever reads better when passing many types.
Catch-all for anything without a registered handler:
@router.default
def fallback(message, metadata):
logger.warning("no handler for %s", metadata.message_type)
Without a default, unhandled message types raise UnknownMessageTypeError. Pass raise_on_unhandled=False to silently skip them instead.
EventBridge → SQS
EventBridge is one of the most common ways to fan out events to SQS. The detail-type field is EventBridge's routing field:
router = SQSRouter(message_type_field="detail-type")
@router.on("UserSignedUp")
def handle(message, metadata):
detail = message.get("detail", {})
create_account(detail["user_id"])
@router.on("PasswordResetRequested")
def handle_reset(message, metadata):
send_reset_email(message["detail"]["email"])
SNS → SQS
When SNS delivers to SQS it wraps the payload in a Notification envelope. The router unwraps it automatically — route on whatever field is in the inner message body.
metadata
Every handler gets a MessageMetadata object as the second argument:
@router.on("send_email")
def handle(message, metadata):
metadata.message_id # SQS message ID
metadata.queue_name # parsed from the event source ARN
metadata.message_type # the routing field value
metadata.receive_count # how many times SQS has delivered this message
metadata.receipt_handle # if you need to manually ack/delete
metadata.attributes # raw SQS system attributes dict
receive_count is how you detect messages that keep failing:
@router.on("send_email")
def handle(message, metadata):
if metadata.receive_count > 3:
# something is persistently wrong — log and let it go to the DLQ
logger.error(
"giving up on message %s after %d attempts",
metadata.message_id,
metadata.receive_count,
)
return
send(message["to"], message["template"])
Partial batch failure
By default, if one message in a batch of ten fails, Lambda retries all ten — including the nine that already succeeded. The correct behaviour is partial batch failure: tell Lambda exactly which messages failed so only those get retried.
router = SQSRouter(message_type_field="event_type", partial_failure=True)
The router catches exceptions per-record, continues processing the rest of the batch, and returns the right response to Lambda:
{"batchItemFailures": [{"itemIdentifier": "failed-message-id"}]}
You also need to enable ReportBatchItemFailures on the Lambda event source mapping. See the AWS docs.
Error hook
@router.on_error
def on_error(exc, message, metadata):
sentry_sdk.capture_exception(exc)
Don't re-raise in the hook — the router handles propagation.
Custom envelope formats
If your messages have a non-standard structure, pass a callable that receives the raw SQS record dict and returns a parsed message dict:
def unwrap(record):
# e.g. your producer base64-encodes the body, or nests it differently
outer = json.loads(record["body"])
return json.loads(outer["payload"])
router = SQSRouter(message_type_field="event_type", message_extractor=unwrap)
Configuration
SQSRouter(
message_type_field="event_type", # required — the field you route on
raise_on_unhandled=True, # raise if no handler matches and no default set
partial_failure=False, # enable partial batch failure response
message_extractor=None, # custom body parser
)
There's no default for message_type_field that makes sense universally — use the field name your producers actually send.
Exceptions
from sqs_router import (
SQSRouterError, # base class
InvalidMessageBodyError, # body isn't valid JSON
MissingTypeFieldError, # routing field not found in message
UnknownMessageTypeError, # no handler for this type
HandlerError, # a handler raised an exception
)
Tests
pip install -e .
pip install pytest
pytest
MIT · Aditya Ganti
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file sqs_router-0.1.0.tar.gz.
File metadata
- Download URL: sqs_router-0.1.0.tar.gz
- Upload date:
- Size: 11.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
cf75556203ed5819d3c1ff4e0cd1cdcf4a64aca00c0322e6b21d119c699542af
|
|
| MD5 |
30b0deefcb7aeea10cc992d28da73d30
|
|
| BLAKE2b-256 |
3ebc8e9e5d7ea81985a9fdc2984e2319e248d4f0cb2bd12c3475b941a5ffc907
|
File details
Details for the file sqs_router-0.1.0-py3-none-any.whl.
File metadata
- Download URL: sqs_router-0.1.0-py3-none-any.whl
- Upload date:
- Size: 9.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
c0d56c42e9850a19f71398f701a645a0127f6ec4115e8a0a3b60f67593bdda0d
|
|
| MD5 |
e1a5112f83e6f32031eb1a6624612bfe
|
|
| BLAKE2b-256 |
8fc1db036393ef28b081b90ef7e2abcb9d126663f5ba04af145137f7273a8757
|