FastAPI-like, lightweight SQS message processing for Python (routing + middleware + dependency injection)
FastSQS
A FastAPI-style router for AWS SQS on Lambda — pydantic routing, dependency injection, a middleware system, and native partial batch failure.
FastSQS turns an SQS-triggered Lambda into a typed, declarative app. You write
handlers for pydantic event models; FastSQS parses each record, routes it,
validates it, runs your middleware, and returns the batchItemFailures SQS
expects — so failed messages are redelivered and dead-lettered by the queue's
own redrive policy, not by bespoke in-app code.
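For orientation, the partial batch response that Lambda expects from an SQS-triggered function is a dict with a batchItemFailures list, where each entry carries the failed record's messageId as itemIdentifier. The sketch below builds that shape by hand with the standard library only. The names build_batch_response and reject_negative are hypothetical and exist only for illustration; this is not how FastSQS is implemented internally.

```python
import json
from typing import Callable


def build_batch_response(event: dict, process: Callable[[dict], None]) -> dict:
    """Process each SQS record and report only the failed ones.

    Hypothetical helper: illustrates the response contract, not FastSQS internals.
    """
    failures = []
    for record in event.get("Records", []):
        try:
            process(json.loads(record["body"]))
        except Exception:
            # Reporting the messageId asks SQS to redeliver just this record.
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}


def reject_negative(payload: dict) -> None:
    """Toy handler that fails on a negative amount."""
    if payload["amount"] < 0:
        raise ValueError("amount must be non-negative")


event = {
    "Records": [
        {"messageId": "m1", "body": json.dumps({"amount": 5})},
        {"messageId": "m2", "body": json.dumps({"amount": -1})},
    ]
}
response = build_batch_response(event, reject_negative)
```

An empty batchItemFailures list tells SQS the whole batch succeeded, which is why the happy path returns {"batchItemFailures": []}.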
Features
- 🚀 FastAPI-style routing: @app.route(OrderCreated) dispatches by a payload discriminator (default key "type").
- 🔒 Pydantic validation: handlers receive a validated SQSEvent model; bad messages become clean batch failures.
- 💉 Dependency injection: declare Depends(...) params (powered by fast-depends); no @inject needed.
- 🧩 Typed Context: ctx.message_id, ctx.queue_type, … as typed attributes; arbitrary scratch in ctx.state.
- 🪝 Middleware: before/after hooks with balanced unwind (resources acquired in before are always released).
- 🦾 Partial batch failure: native ReportBatchItemFailures for standard and FIFO queues.
- 🔀 FIFO-aware: queue type is inferred from the event-source ARN; per-group ordering with configurable failure mode.
- 🧪 In-process test client: drive your app with synthetic events, no AWS required.
- 🐍 Typed: ships py.typed; full editor/mypy support.
Install
pip install fastsqs
Requires Python 3.10+. Depends on pydantic>=2 and fast-depends>=3.
Quick start
from fastsqs import FastSQS, SQSEvent

app = FastSQS()  # queue type auto-detected from the event-source ARN

class OrderCreated(SQSEvent):
    order_id: str
    amount: int

@app.route(OrderCreated)
async def handle_order(msg: OrderCreated):
    print("processing", msg.order_id, msg.amount)
    # raising marks this record as failed -> SQS redelivers it

# Lambda entry point (set as the function handler):
def handler(event, context):
    return app.handler(event, context)
A message is routed by its discriminator value (the "type" key by default), which is
matched against the event model's name converted to snake_case. For example,
{"type": "order_created", "order_id": "...", "amount": 1} routes to OrderCreated.
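The name matching described above can be sketched with a small regex. This is a minimal illustration that assumes a plain CamelCase-to-snake_case rule; FastSQS's own conversion may treat acronyms or digits differently. to_snake_case and the routes dict are hypothetical names, and the OrderCreated class here is a bare stand-in rather than a real SQSEvent subclass.

```python
import re


def to_snake_case(name: str) -> str:
    """Insert an underscore before each interior capital, then lowercase."""
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()


class OrderCreated:  # stand-in for an SQSEvent subclass
    pass


# Build a discriminator -> model table, then look up an incoming payload.
routes = {to_snake_case(cls.__name__): cls for cls in (OrderCreated,)}
payload = {"type": "order_created", "order_id": "42", "amount": 1}
matched = routes[payload["type"]]
```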
Typed context
Annotate a handler (or middleware) param ctx: Context for typed access to the
framework-owned fields. Put your own scratch data in ctx.state:
from fastsqs import FastSQS, SQSEvent, Context

app = FastSQS()

@app.route(OrderCreated)
async def handle(msg: OrderCreated, ctx: Context):
    ctx.message_id             # str
    ctx.queue_type             # QueueType enum (.value for the string)
    ctx.fifo_info              # FifoInfo | None (.message_group_id, ...)
    ctx.state.tenant = "acme"  # arbitrary scratch; never collides with a framework field
Dependency injection
Declare Depends(...) params and FastSQS wires them per invocation (no decorator):
from fastsqs import FastSQS, SQSEvent, Depends

def get_db():
    return Database(...)

app = FastSQS()

@app.route(OrderCreated)
async def handle(msg: OrderCreated, db=Depends(get_db)):
    await db.save(msg.order_id)
Sub-dependencies (a Depends that itself takes Depends) resolve automatically.
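To make the idea of sub-dependency resolution concrete, here is a toy resolver built on inspect.signature. It is a sketch under stated assumptions: the Depends class and resolve function below are hypothetical, synchronous stand-ins written for this example, and they are not the fast-depends implementation that FastSQS actually uses.

```python
import inspect
from typing import Any, Callable


class Depends:
    """Hypothetical marker mirroring the Depends(...) default-value style."""

    def __init__(self, provider: Callable[..., Any]) -> None:
        self.provider = provider


def resolve(func: Callable[..., Any], **given: Any) -> Any:
    """Call func, filling any Depends(...) defaults by resolving them first."""
    kwargs = dict(given)
    for name, param in inspect.signature(func).parameters.items():
        if name not in kwargs and isinstance(param.default, Depends):
            # Recurse so a provider's own Depends params are resolved too.
            kwargs[name] = resolve(param.default.provider)
    return func(**kwargs)


def get_settings() -> dict:
    return {"dsn": "memory://orders"}


def get_db(settings: dict = Depends(get_settings)) -> str:
    # get_db itself depends on get_settings: a sub-dependency.
    return f"db({settings['dsn']})"


def handle(order_id: str, db: str = Depends(get_db)) -> str:
    return f"saved {order_id} via {db}"


result = resolve(handle, order_id="o-1")
```

The handler only names get_db; the resolver discovers get_settings on its own by walking each provider's signature.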
Middleware
Subclass Middleware and override before / after. after always runs for
every middleware whose before completed (balanced unwind), and receives the
error (or None):
from fastsqs import FastSQS, Middleware, TimingMiddleware, LoggingMiddleware

class Audit(Middleware):
    async def before(self, payload, record, context, ctx):
        ctx.state.t0 = ...

    async def after(self, payload, record, context, ctx, error):
        if error is not None:
            ...  # observe the failure

app = FastSQS()
app.add_middleware(LoggingMiddleware())
app.add_middleware(TimingMiddleware())
app.add_middleware(Audit())
Observability, idempotency and PII masking are application concerns — compose
them as your own middleware (or use aws-lambda-powertools alongside FastSQS).
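The balanced-unwind guarantee can be illustrated with a small asyncio sketch: after is called only for middleware whose before completed, and it receives the error (or None). Recorder and run_balanced are hypothetical names with simplified hook signatures, and unwinding in reverse order is an assumption made here by analogy with nested context managers; FastSQS's actual ordering is not confirmed by this README.

```python
import asyncio


class Recorder:
    """Hypothetical middleware that records the order of its hook calls."""

    def __init__(self, name: str, log: list, fail_in_before: bool = False) -> None:
        self.name, self.log, self.fail_in_before = name, log, fail_in_before

    async def before(self, ctx: dict) -> None:
        if self.fail_in_before:
            raise RuntimeError(f"{self.name} could not start")
        self.log.append(f"before:{self.name}")

    async def after(self, ctx: dict, error) -> None:
        kind = type(error).__name__ if error is not None else "None"
        self.log.append(f"after:{self.name}:{kind}")


async def run_balanced(middlewares, handler, ctx):
    """Run handler inside middlewares; unwind only those whose before completed."""
    entered, error = [], None
    try:
        for mw in middlewares:
            await mw.before(ctx)
            entered.append(mw)  # registered only after before() succeeded
        await handler(ctx)
    except Exception as exc:
        error = exc
    finally:
        # Assumption: unwind in reverse order, like nested context managers.
        for mw in reversed(entered):
            await mw.after(ctx, error)
    return error


async def ok_handler(ctx: dict) -> None:
    ctx["handled"] = True


log: list = []
chain = [Recorder("a", log), Recorder("b", log, fail_in_before=True), Recorder("c", log)]
error = asyncio.run(run_balanced(chain, ok_handler, {}))
```

Here b fails in before, so only a is unwound: b never acquired anything, and c was never reached.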
FIFO & partial batch failure
- Queue type is QueueType.AUTO by default: FastSQS infers FIFO from a .fifo event-source ARN. Force it with FastSQS(queue_type=QueueType.FIFO).
- fifo_failure_mode (FIFO only): "isolate_groups" (default) blocks only the failed messageGroupId's tail; "halt_batch" halts the whole batch at the first failure (AWS Powertools' default).
- partial_batch_failure (default True) reports per-record failures. Set it to False to fail the entire batch (raising BatchFailedError) so SQS redelivers every message.
FastSQS only reports failures — redelivery and dead-lettering are the queue's
job (visibility timeout + maxReceiveCount + redrive policy).
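The difference between the two FIFO failure modes is easiest to see on a small batch. The sketch below is a simplified model, not FastSQS code: is_fifo_arn and failed_ids are hypothetical helpers, and the model assumes that records skipped after a failure are reported as failed so that SQS redelivers them in order.

```python
def is_fifo_arn(arn: str) -> bool:
    """FIFO queue names, and therefore their ARNs, end in '.fifo'."""
    return arn.endswith(".fifo")


def failed_ids(records, fails, mode="isolate_groups"):
    """Return the message ids to report as failed, in delivery order.

    records: list of (message_id, group_id) tuples in delivery order.
    fails:   set of message ids whose handler would raise.
    """
    reported, blocked_groups, halted = [], set(), False
    for message_id, group_id in records:
        if halted or group_id in blocked_groups:
            # Skipped, but reported so SQS redelivers it in order later.
            reported.append(message_id)
        elif message_id in fails:
            reported.append(message_id)
            if mode == "halt_batch":
                halted = True  # everything after this point is skipped
            else:
                blocked_groups.add(group_id)  # only this group's tail is skipped
    return reported


batch = [("m1", "g1"), ("m2", "g2"), ("m3", "g1"), ("m4", "g2")]
isolated = failed_ids(batch, fails={"m1"})
halted = failed_ids(batch, fails={"m1"}, mode="halt_batch")
```

With "isolate_groups", group g2 keeps making progress while g1 waits for redelivery; with "halt_batch", every record from the first failure onward is returned to the queue.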
max_concurrent_messages (default 10) bounds concurrency on standard queues;
FIFO records are processed in order per group.
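A common way to bound concurrency like this in asyncio is a semaphore around each handler call. The following is a minimal sketch of that general technique; process_bounded and slow_handler are hypothetical names, and nothing here claims that FastSQS implements max_concurrent_messages this way.

```python
import asyncio


async def process_bounded(bodies, handler, max_concurrent=10):
    """Run handler over bodies with at most max_concurrent calls in flight."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def guarded(body):
        async with semaphore:
            return await handler(body)

    # return_exceptions=True keeps one failure from cancelling the rest.
    return await asyncio.gather(*(guarded(b) for b in bodies), return_exceptions=True)


in_flight = 0
peak = 0


async def slow_handler(body):
    """Toy handler that tracks how many calls overlap."""
    global in_flight, peak
    in_flight += 1
    peak = max(peak, in_flight)
    await asyncio.sleep(0.01)
    in_flight -= 1
    return body * 2


results = asyncio.run(process_bounded(range(8), slow_handler, max_concurrent=3))
```

Results come back in input order even though the calls overlap, which makes it straightforward to pair each outcome with its record when building the failure list.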
debug (default False) enables verbose per-record debug logging through the
registered LoggingMiddleware.
Testing
from fastsqs.testing import SQSTestClient, RecordSpec

client = SQSTestClient(app)

# one message
result = client.send({"type": "order_created", "order_id": "1", "amount": 5})
assert result == {"batchItemFailures": []}

# a FIFO batch with two message groups (a .fifo ARN is set so AUTO infers FIFO)
client.send_batch([
    RecordSpec({"type": "order_created", "order_id": "1", "amount": 1}, group_id="g1"),
    RecordSpec({"type": "order_created", "order_id": "2", "amount": 2}, group_id="g2"),
])

# a raw (malformed) body to exercise the InvalidMessageError path
client.send("{not json", message_id="bad")
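If you want to see roughly what a synthetic event looks like, the sketch below builds a minimal SQS-shaped Lambda event by hand. make_sqs_event is a hypothetical helper and the ARN is a made-up example value; the field names (Records, messageId, body, attributes, eventSourceARN) follow the standard SQS event format, but the events that SQSTestClient generates may contain more or different fields.

```python
import json
import uuid


def make_sqs_event(bodies, queue_arn="arn:aws:sqs:us-east-1:123456789012:orders.fifo",
                   group_id=None):
    """Build a minimal SQS-shaped Lambda event (hypothetical test helper)."""
    records = []
    for body in bodies:
        attributes = {"ApproximateReceiveCount": "1"}
        if group_id is not None:
            attributes["MessageGroupId"] = group_id  # present on FIFO records
        records.append({
            "messageId": str(uuid.uuid4()),
            # Strings are passed through untouched so malformed bodies can be tested.
            "body": body if isinstance(body, str) else json.dumps(body),
            "attributes": attributes,
            "eventSource": "aws:sqs",
            "eventSourceARN": queue_arn,
        })
    return {"Records": records}


event = make_sqs_event(
    [{"type": "order_created", "order_id": "1", "amount": 5}, "{not json"],
    group_id="g1",
)
```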
Exceptions
All errors derive from FastSQSError: RouteNotFoundError,
InvalidMessageError, and BatchFailedError (whose .failures holds the failed
item ids).
License
MIT — see LICENSE.