A FastAPI-style router for AWS SQS on Lambda: pydantic routing, dependency injection, middleware, and native partial batch failure.
FastSQS
FastSQS turns an SQS-triggered Lambda into a typed, declarative app. You write
handlers for pydantic event models; FastSQS parses each record, routes it,
validates it, runs your middleware, and returns the batchItemFailures SQS
expects, so failed messages are redelivered and dead-lettered by the queue's
own redrive policy, not by bespoke in-app code.
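
For orientation, the partial batch response that Lambda expects can be sketched by hand. The snippet below is a minimal stdlib-only illustration of the work FastSQS takes over, not its implementation; `process` and `sample_event` are hypothetical stand-ins.

```python
import json


def process(body: dict) -> None:
    """Hypothetical business logic: rejects payloads without an order_id."""
    if "order_id" not in body:
        raise ValueError("missing order_id")


def handler(event: dict, context: object = None) -> dict:
    """Process each record and report only the failed message IDs."""
    failures = []
    for record in event["Records"]:
        try:
            process(json.loads(record["body"]))
        except Exception:
            # Reporting the messageId asks SQS to redeliver only this record.
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}


sample_event = {
    "Records": [
        {"messageId": "ok-1", "body": json.dumps({"order_id": "1"})},
        {"messageId": "bad-1", "body": "{not json"},
    ]
}
result = handler(sample_event)
```

Only the malformed record ends up in `result["batchItemFailures"]`; the valid one is treated as done.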
Features
- FastAPI-style routing: @app.route(OrderCreated) dispatches by a payload discriminator (default key "type").
- Pydantic validation: handlers receive a validated SQSEvent model; bad messages become clean batch failures.
- Dependency injection: declare Depends(...) params (powered by fast-depends); no @inject needed.
- Typed Context: ctx.message_id, ctx.queue_type, … as typed attributes; arbitrary scratch in ctx.state.
- Middleware: before/after hooks with balanced unwind (resources acquired in before are always released).
- Partial batch failure: native ReportBatchItemFailures for standard and FIFO queues.
- FIFO-aware: queue type is inferred from the event-source ARN; per-group ordering with a configurable failure mode.
- EventBridge Pipes ready: app.handler accepts both the Lambda {"Records": [...]} envelope and a bare list of records (the Pipes target shape).
- Shape detection: is_sqs_event(event) lets one Lambda multiplex SQS and non-SQS (e.g. API Gateway) events.
- In-process test client: drive your app with synthetic events, no AWS required.
- Typed: ships py.typed; full editor/mypy support.
Install

    pip install fastsqs

Requires Python 3.10+. Depends on pydantic>=2 and fast-depends>=3,<4.
Quick start

    from fastsqs import FastSQS, SQSEvent

    app = FastSQS()  # queue type auto-detected from the event-source ARN

    class OrderCreated(SQSEvent):
        order_id: str
        amount: int

    @app.route(OrderCreated)
    async def handle_order(msg: OrderCreated):
        print("processing", msg.order_id, msg.amount)
        # raising marks this record as failed -> SQS redelivers it

    # Lambda entry point (set as the function handler):
    def handler(event, context):
        return app.handler(event, context)

A message is routed by its discriminator value ("type" by default), matched to
the event model's name in snake_case, so {"type": "order_created", "order_id": "...", "amount": 1}
routes to OrderCreated. Field names accept both snake_case and their camelCase
aliases (order_id or orderId) via Pydantic alias generation.
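
The name-based dispatch described above can be pictured with a small registry. This is a sketch under assumptions, not the library's code: `ROUTES`, `to_snake_case`, `route`, and `resolve` are hypothetical names, and the conversion rule is a simple one that does not handle acronyms.

```python
import re

ROUTES: dict[str, type] = {}


def to_snake_case(name: str) -> str:
    """Convert a CamelCase class name to snake_case (no acronym handling)."""
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()


def route(model: type) -> type:
    """Register a model under the snake_case form of its class name."""
    ROUTES[to_snake_case(model.__name__)] = model
    return model


@route
class OrderCreated:
    """Plain class standing in for a pydantic event model."""


def resolve(payload: dict, key: str = "type") -> type:
    """Look up the model registered for the payload's discriminator value."""
    return ROUTES[payload[key]]


matched = resolve({"type": "order_created", "order_id": "1", "amount": 1})
```

Here `matched` is the `OrderCreated` class, because the payload's "type" equals the snake_case form of the class name.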
app.handler also accepts a bare list of records (the shape an EventBridge
Pipes SQS-source target delivers), so the same function works behind both an
event source mapping and a Pipe (see below).
EventBridge Pipes & multiplexed handlers
app.handler accepts both Lambda event shapes for an SQS source: the event
source mapping envelope {"Records": [...]} and a bare list of records (the
shape an EventBridge Pipes SQS-source target delivers). The same handler routes
both unchanged.
To run a single Lambda for both SQS and non-SQS (e.g. API Gateway) traffic,
dispatch by shape with is_sqs_event:

    from fastsqs import is_sqs_event

    def handler(event, context):
        if is_sqs_event(event):  # a bare list OR {"Records": [...]}
            return app.handler(event, context)
        return http_handler(event, context)  # e.g. an API Gateway proxy event

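
To make the idea of shape detection concrete, here is one way such a check could be written with the standard library. It is an illustrative sketch, not the body of is_sqs_event: the function name `looks_like_sqs_event` is hypothetical, and treating an empty list as "not SQS" is an assumption of this example.

```python
def looks_like_sqs_event(event: object) -> bool:
    """Return True for an SQS envelope or a bare list of SQS records."""
    records = event.get("Records") if isinstance(event, dict) else event
    if not isinstance(records, list) or not records:
        # Assumption of this sketch: an empty batch is not classified as SQS.
        return False
    # Lambda marks SQS records with eventSource == "aws:sqs".
    return all(
        isinstance(r, dict) and r.get("eventSource") == "aws:sqs" for r in records
    )


sqs_record = {"messageId": "m1", "body": "{}", "eventSource": "aws:sqs"}
envelope = {"Records": [sqs_record]}
bare_list = [sqs_record]
api_gateway_event = {"httpMethod": "GET", "path": "/health"}
```

Both `envelope` and `bare_list` pass the check, while `api_gateway_event` does not, which is the distinction a multiplexed handler needs.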
Routers, key-value routing & default handler
Split routes across modules with SQSRouter, then attach them with
app.include_router(...). A router supports pydantic routing and key-value
routing (@router.route("value")), an optional model= for validation on
key-value routes, and nesting via subrouter(...):

    from fastsqs import FastSQS, SQSRouter, SQSEvent

    orders = SQSRouter()

    @orders.route(OrderCreated)  # pydantic routing
    async def on_created(msg: OrderCreated):
        ...

    @orders.route("order_cancelled", model=OrderCancelled)  # key-value + validation
    async def on_cancelled(msg: OrderCancelled):
        ...

    @orders.route("ping")  # key-value, no model -> raw SQSEvent
    async def on_ping(msg: SQSEvent):
        ...

    app = FastSQS()
    app.include_router(orders)  # tried after the app's own routes

Nest with orders.subrouter("v2", child_router). Register a catch-all for
unmatched messages with @app.default() (or @router.default()). Without one,
an unmatched message raises RouteNotFoundError and becomes a batch failure:

    @app.default()
    async def fallback(msg, ctx):
        ...

flexible_matching=True (on FastSQS or SQSRouter, default False) also
matches the ClassName plus camelCase / kebab-case variants of the discriminator
value. A single discriminator value may use only one routing style. Registering
it as both a pydantic and a key-value route raises ValueError at import.
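
As a rough picture of what the extra variants look like, the helper below derives them from a snake_case discriminator value. It is a sketch for illustration only; `discriminator_variants` is a hypothetical name, and the exact set FastSQS accepts may differ.

```python
def discriminator_variants(snake: str) -> set[str]:
    """Derive ClassName, camelCase, and kebab-case forms of a snake_case value."""
    parts = snake.split("_")
    pascal = "".join(p.capitalize() for p in parts)
    camel = parts[0] + "".join(p.capitalize() for p in parts[1:])
    kebab = "-".join(parts)
    return {snake, pascal, camel, kebab}


variants = discriminator_variants("order_created")
```

With flexible matching, any of the four spellings in `variants` would be expected to reach the same handler.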
Typed context
Annotate a handler (or middleware) param ctx: Context for typed access to the
framework-owned fields. Put your own scratch data in ctx.state (a State
namespace):

    from fastsqs import FastSQS, SQSEvent, Context

    app = FastSQS()

    @app.route(OrderCreated)
    async def handle(msg: OrderCreated, ctx: Context):
        ctx.message_id             # str
        ctx.queue_type             # QueueType enum (.value for the string)
        ctx.fifo_info              # FifoInfo | None (.message_group_id, ...)
        ctx.state.tenant = "acme"  # attribute access, never collides with a framework field
        ctx.state["tenant"]        # item access works too
        ctx.state.get("missing")   # use .get() for optional reads (bare .missing raises AttributeError)

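
The access rules for ctx.state can be mimicked with a small namespace class. The `State` below is a stand-in written for this example, assuming the behaviour described above; it is not the class FastSQS ships.

```python
class State:
    """Tiny namespace with attribute access, item access, and .get()."""

    def __init__(self) -> None:
        # Bypass __setattr__ so the backing dict lives on the instance itself.
        object.__setattr__(self, "_data", {})

    def __getattr__(self, name: str):
        # Only called when normal lookup fails, i.e. for user-defined keys.
        try:
            return self._data[name]
        except KeyError:
            raise AttributeError(name) from None

    def __setattr__(self, name: str, value) -> None:
        self._data[name] = value

    def __getitem__(self, key: str):
        return self._data[key]

    def __setitem__(self, key: str, value) -> None:
        self._data[key] = value

    def get(self, key: str, default=None):
        return self._data.get(key, default)


state = State()
state.tenant = "acme"
```

Reading `state.tenant` and `state["tenant"]` both give "acme", `state.get("missing")` gives None, and a bare `state.missing` raises AttributeError.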
Dependency injection
Declare Depends(...) params and FastSQS wires them per invocation (no decorator):

    from fastsqs import FastSQS, SQSEvent, Depends

    def get_db():
        return Database(...)

    app = FastSQS()

    @app.route(OrderCreated)
    async def handle(msg: OrderCreated, db=Depends(get_db)):
        await db.save(msg.order_id)

Sub-dependencies (a Depends that itself takes Depends) resolve automatically.
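
To show what "resolve automatically" means in principle, here is a toy resolver built on inspect. It is a simplified, synchronous sketch and not how fast-depends works internally; this local `Depends` class and `call_with_dependencies` are hypothetical stand-ins defined only for the example.

```python
import inspect


class Depends:
    """Marker recording which callable provides a parameter's value."""

    def __init__(self, dependency):
        self.dependency = dependency


def call_with_dependencies(func, **provided):
    """Call func, filling Depends(...) defaults by resolving them recursively."""
    kwargs = {}
    for name, param in inspect.signature(func).parameters.items():
        if name in provided:
            kwargs[name] = provided[name]
        elif isinstance(param.default, Depends):
            # A dependency's own Depends(...) params go through the same routine.
            kwargs[name] = call_with_dependencies(param.default.dependency)
    return func(**kwargs)


def get_settings():
    return {"dsn": "memory://"}


def get_db(settings=Depends(get_settings)):
    return {"connected_to": settings["dsn"]}


def handle(msg, db=Depends(get_db)):
    return (msg, db["connected_to"])


outcome = call_with_dependencies(handle, msg="order-1")
```

The handler asks only for `db`; the resolver notices that `get_db` needs `settings` and builds that first.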
Middleware
Subclass Middleware and override before / after. after always runs for
every middleware whose before completed (balanced unwind), and receives the
error (or None):

    from fastsqs import FastSQS, Middleware, TimingMiddleware, LoggingMiddleware

    class Audit(Middleware):
        async def before(self, payload, record, context, ctx):
            ctx.state.t0 = ...

        async def after(self, payload, record, context, ctx, error):
            if error is not None:
                ...  # observe the failure

    app = FastSQS()
    app.add_middleware(LoggingMiddleware())
    app.add_middleware(TimingMiddleware())
    app.add_middleware(Audit())

LoggingMiddleware takes a custom logger= plus include_payload /
include_record / include_context / verbose toggles; TimingMiddleware
writes duration_ms into ctx.state (key configurable via store_key_ms).
Observability, idempotency and PII masking are application concerns: compose them as your own middleware.
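
The balanced-unwind guarantee can be sketched with plain asyncio. The runner below is an illustration under assumptions, not FastSQS's pipeline: `Recorder` and `run_with_middleware` are hypothetical, the hook signatures are reduced to the minimum, and unwinding in reverse order is an assumption of this sketch.

```python
import asyncio


class Recorder:
    """Hypothetical middleware that records its hook calls in a shared list."""

    def __init__(self, name, log, fail_in_before=False):
        self.name = name
        self.log = log
        self.fail_in_before = fail_in_before

    async def before(self):
        if self.fail_in_before:
            raise RuntimeError(f"{self.name} could not start")
        self.log.append(f"before:{self.name}")

    async def after(self, error):
        self.log.append(f"after:{self.name}")


async def run_with_middleware(middlewares, handler):
    """Run before hooks, the handler, then after hooks for entered middleware."""
    entered = []
    error = None
    try:
        for mw in middlewares:
            await mw.before()
            entered.append(mw)  # only reached when before() completed
        await handler()
    except Exception as exc:
        error = exc
    # Assumption: unwind in reverse order, like nested context managers.
    for mw in reversed(entered):
        await mw.after(error)
    return error


async def noop_handler():
    pass


log = []
chain = [Recorder("a", log), Recorder("b", log, fail_in_before=True), Recorder("c", log)]
error = asyncio.run(run_with_middleware(chain, noop_handler))
```

Because "b" fails in before, only "a" is unwound: "c" never started, so its after hook is never called.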
FIFO & partial batch failure
- Queue type: QueueType.AUTO (default; infers FIFO from a .fifo event-source ARN), or force QueueType.STANDARD / QueueType.FIFO.
- fifo_failure_mode (FIFO only): "isolate_groups" (default) blocks only the failed MessageGroupId's tail; "halt_batch" halts the whole batch at the first failure.
- partial_batch_failure (default True) reports per-record failures. Set it False to fail the entire batch (raising BatchFailedError) so SQS redelivers every message.
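
The difference between the two FIFO failure modes is easiest to see on a small batch. The function below is a sketch of the semantics as described in the list above, not the library's code; `fifo_failures` and its `succeeds` callback are hypothetical, and it assumes that blocked records are reported as failures without being attempted.

```python
def fifo_failures(records, succeeds, mode="isolate_groups"):
    """Return the partial batch response for a FIFO batch under the given mode.

    records: list of (message_id, group_id) tuples in delivery order.
    succeeds: callable(message_id) -> bool, a stand-in for running the handler.
    """
    failed_ids = []
    blocked_groups = set()
    batch_halted = False
    for message_id, group_id in records:
        if batch_halted or group_id in blocked_groups:
            # Not attempted: reported so SQS redelivers it in order later.
            failed_ids.append(message_id)
            continue
        if not succeeds(message_id):
            failed_ids.append(message_id)
            if mode == "halt_batch":
                batch_halted = True
            else:
                blocked_groups.add(group_id)
    return {"batchItemFailures": [{"itemIdentifier": i} for i in failed_ids]}


batch = [("m1", "g1"), ("m2", "g2"), ("m3", "g1"), ("m4", "g2")]
isolated_result = fifo_failures(batch, lambda mid: mid != "m1")
halted_result = fifo_failures(batch, lambda mid: mid != "m1", mode="halt_batch")
```

With "isolate_groups", the failure of m1 holds back only m3 (same group g1), while g2 proceeds. With "halt_batch", everything from m1 onward is reported.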
FastSQS only reports failures: redelivery and dead-lettering are the queue's
job (visibility timeout + maxReceiveCount + redrive policy). The event source
mapping must enable FunctionResponseTypes: ["ReportBatchItemFailures"];
otherwise Lambda ignores the partial response and, because the invocation
itself returned successfully, treats the whole batch as processed, so failed
messages are deleted instead of redelivered.
FIFO footgun: SQS exposes system attributes (MessageGroupId, MessageDeduplicationId) in PascalCase under record["attributes"], unlike the camelCase record-level keys. Keep raw test events faithful, or FIFO grouping silently collapses into one group (SQSTestClient already emits PascalCase).
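
The collapse is easy to reproduce with hand-built records. The grouping helper below is a hypothetical stand-in for any code that reads the group ID from record["attributes"]; it is not FastSQS's own grouping logic.

```python
from collections import defaultdict


def group_by_message_group(records, key="MessageGroupId"):
    """Group message IDs by the given system-attribute key."""
    groups = defaultdict(list)
    for record in records:
        group_id = record.get("attributes", {}).get(key)
        groups[group_id].append(record["messageId"])
    return dict(groups)


# Faithful test event: system attributes in PascalCase, as SQS delivers them.
faithful = [
    {"messageId": "m1", "attributes": {"MessageGroupId": "g1"}},
    {"messageId": "m2", "attributes": {"MessageGroupId": "g2"}},
]
# Hand-written test event that wrongly uses camelCase for the system attribute.
unfaithful = [
    {"messageId": "m1", "attributes": {"messageGroupId": "g1"}},
    {"messageId": "m2", "attributes": {"messageGroupId": "g2"}},
]

faithful_groups = group_by_message_group(faithful)
collapsed_groups = group_by_message_group(unfaithful)
```

The faithful event yields two groups; the camelCase one yields a single group keyed by None, with no error to flag the mistake.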
max_concurrent_messages (default 10) bounds concurrency on standard queues;
FIFO records are processed in order per group. debug (default False) enables
verbose per-record logging through a registered LoggingMiddleware.
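
Bounding concurrency of this kind is commonly done with a semaphore. The sketch below shows the general technique with asyncio.Semaphore; it is not taken from FastSQS, and `process_all`, `fake_handle`, and `stats` are hypothetical names used only to demonstrate the limit.

```python
import asyncio


async def process_all(bodies, handle, limit=10):
    """Run handle(body) for every body with at most `limit` running at once."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(body):
        async with semaphore:
            return await handle(body)

    # gather() returns results in input order, regardless of completion order.
    return await asyncio.gather(*(guarded(b) for b in bodies))


stats = {"running": 0, "peak": 0}


async def fake_handle(body):
    """Stand-in handler that tracks how many calls overlap."""
    stats["running"] += 1
    stats["peak"] = max(stats["peak"], stats["running"])
    await asyncio.sleep(0)  # yield so other tasks get a chance to start
    stats["running"] -= 1
    return body * 2


results = asyncio.run(process_all(list(range(25)), fake_handle, limit=10))
```

All 25 bodies are processed, but `stats["peak"]` never exceeds the limit of 10.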
Why fastsqs
FastSQS gives you correct ReportBatchItemFailures and the FastAPI model on
top of it: the message body routes to a handler by type, with pydantic
validation, dependency injection, and a typed Context. Reach for it when a
queue carries many message types and you'd otherwise branch by hand in one
big handler. For a single trivial handler with no validation, a plain boto3
loop is still fine; FastSQS earns its place the moment routing, validation, or
DI enter the picture.
| You have… | by hand | FastSQS |
|---|---|---|
| Many message types on one queue, routed by payload | branch in one handler | declarative @app.route(Model) |
| Pydantic validation per type | bring your own | built in |
| Dependency injection / typed Context | wire it by hand | built in |
| FIFO per-group isolation by default | hand-rolled | isolate_groups |
| Partial batch failure | hand-rolled response shape | native |
Testing

    from fastsqs.testing import SQSTestClient, RecordSpec

    client = SQSTestClient(app)

    # one message
    result = client.send({"type": "order_created", "order_id": "1", "amount": 5})
    assert result == {"batchItemFailures": []}

    # a FIFO batch with two message groups (a .fifo ARN is set so AUTO infers FIFO)
    client.send_batch([
        RecordSpec({"type": "order_created", "order_id": "1", "amount": 1}, group_id="g1"),
        RecordSpec({"type": "order_created", "order_id": "2", "amount": 2}, group_id="g2"),
    ])

    # a raw (malformed) body becomes a reported failure, not an exception
    result = client.send("{not json", message_id="bad")
    assert result == {"batchItemFailures": [{"itemIdentifier": "bad"}]}

For hand-built events, fastsqs.testing also exports make_record(...) and
make_event(records).
Examples
Runnable end-to-end samples (handler + Dockerfile + tests) live in
examples/:
- simple_standard_example: minimal standard-queue app
- simple_fifo_example: FIFO with per-group ordering
- nested_example: routers & subrouters
- custom_middleware_example: writing middleware
- comprehensive_example: routing + DI + middleware together
See the roadmap for what's next.
Exceptions
All errors derive from FastSQSError:
- RouteNotFoundError: a message matched no route and no default handler is registered.
- InvalidMessageError: a non-JSON body, a non-object body, or a pydantic validation failure.
- BatchFailedError: raised when partial_batch_failure=False and any record fails; .failures holds the failed item ids.
Contributing
Issues and PRs are welcome. Open an issue at github.com/imgabrieldev/fastsqs/issues to discuss anything non-trivial first. Dev setup:

    pip install -e . -r requirements-dev.txt
    make test             # unit suite
    make start-local      # build the Lambda image (Docker RIE) for local invokes
    make invoke-standard  # POST a sample SQS batch at the running container

License
MIT. See LICENSE.