Skip to main content

FastAPI-like, lightweight SQS message processing for Python (routing + middleware + dependency injection)

Project description

FastSQS

A FastAPI-style router for AWS SQS on Lambda — pydantic routing, dependency injection, a middleware system, and native partial batch failure.

PyPI version License: MIT Python


FastSQS turns an SQS-triggered Lambda into a typed, declarative app. You write handlers for pydantic event models; FastSQS parses each record, routes it, validates it, runs your middleware, and returns the batchItemFailures SQS expects — so failed messages are redelivered and dead-lettered by the queue's own redrive policy, not by bespoke in-app code.

Features

  • 🚀 FastAPI-style routing@app.route(OrderCreated) dispatches by a payload discriminator (default key "type").
  • 🔒 Pydantic validation — handlers receive a validated SQSEvent model; bad messages become clean batch failures.
  • 💉 Dependency injection — declare Depends(...) params (powered by fast-depends); no @inject needed.
  • 🧩 Typed Contextctx.message_id, ctx.queue_type, … as typed attributes; arbitrary scratch in ctx.state.
  • 🪝 Middlewarebefore/after hooks with balanced unwind (resources acquired in before are always released).
  • 🦾 Partial batch failure — native ReportBatchItemFailures for standard and FIFO queues.
  • 🔀 FIFO-aware — queue type is inferred from the event-source ARN; per-group ordering with configurable failure mode.
  • 🧪 In-process test client — drive your app with synthetic events, no AWS required.
  • 🐍 Typed — ships py.typed; full editor/mypy support.

Install

pip install fastsqs

Requires Python 3.10+. Depends on pydantic>=2 and fast-depends>=3.

Quick start

from fastsqs import FastSQS, SQSEvent

app = FastSQS()  # queue type auto-detected from the event-source ARN


class OrderCreated(SQSEvent):
    order_id: str
    amount: int


@app.route(OrderCreated)
async def handle_order(msg: OrderCreated):
    print("processing", msg.order_id, msg.amount)
    # raising marks this record as failed -> SQS redelivers it


# Lambda entry point (set as the function handler):
def handler(event, context):
    return app.handler(event, context)

A message is routed by its discriminator value ("type" by default), matched to the event model's name in snake_case — {"type": "order_created", "order_id": "...", "amount": 1} routes to OrderCreated.

Typed context

Annotate a handler (or middleware) param ctx: Context for typed access to the framework-owned fields. Put your own scratch data in ctx.state:

from fastsqs import FastSQS, SQSEvent, Context

app = FastSQS()


@app.route(OrderCreated)
async def handle(msg: OrderCreated, ctx: Context):
    ctx.message_id        # str
    ctx.queue_type        # QueueType enum (.value for the string)
    ctx.fifo_info         # FifoInfo | None (.message_group_id, ...)
    ctx.state.tenant = "acme"   # arbitrary scratch — never collides with a framework field

Dependency injection

Declare Depends(...) params and FastSQS wires them per invocation (no decorator):

from fastsqs import FastSQS, SQSEvent, Depends

def get_db():
    return Database(...)

app = FastSQS()


@app.route(OrderCreated)
async def handle(msg: OrderCreated, db=Depends(get_db)):
    await db.save(msg.order_id)

Sub-dependencies (a Depends that itself takes Depends) resolve automatically.

Middleware

Subclass Middleware and override before / after. after always runs for every middleware whose before completed (balanced unwind), and receives the error (or None):

from fastsqs import FastSQS, Middleware, TimingMiddleware, LoggingMiddleware

class Audit(Middleware):
    async def before(self, payload, record, context, ctx):
        ctx.state.t0 = ...
    async def after(self, payload, record, context, ctx, error):
        if error is not None:
            ...  # observe the failure

app = FastSQS()
app.add_middleware(LoggingMiddleware())
app.add_middleware(TimingMiddleware())
app.add_middleware(Audit())

Observability, idempotency and PII masking are application concerns — compose them as your own middleware (or use aws-lambda-powertools alongside FastSQS).

FIFO & partial batch failure

  • Queue type is QueueType.AUTO by default: FastSQS infers FIFO from a .fifo event-source ARN. Force it with FastSQS(queue_type=QueueType.FIFO).
  • fifo_failure_mode (FIFO only): "isolate_groups" (default) blocks only the failed messageGroupId's tail; "halt_batch" halts the whole batch at the first failure (AWS Powertools' default).
  • partial_batch_failure (default True) reports per-record failures. Set it False to fail the entire batch (raising BatchFailedError) so SQS redelivers every message.

FastSQS only reports failures — redelivery and dead-lettering are the queue's job (visibility timeout + maxReceiveCount + redrive policy).

max_concurrent_messages (default 10) bounds concurrency on standard queues; FIFO records are processed in order per group.

debug (default False) enables verbose per-record debug logging through the registered LoggingMiddleware.

Testing

from fastsqs.testing import SQSTestClient, RecordSpec

client = SQSTestClient(app)

# one message
result = client.send({"type": "order_created", "order_id": "1", "amount": 5})
assert result == {"batchItemFailures": []}

# a FIFO batch with two message groups (a .fifo ARN is set so AUTO infers FIFO)
client.send_batch([
    RecordSpec({"type": "order_created", "order_id": "1", "amount": 1}, group_id="g1"),
    RecordSpec({"type": "order_created", "order_id": "2", "amount": 2}, group_id="g2"),
])

# a raw (malformed) body to exercise the InvalidMessageError path
client.send("{not json", message_id="bad")

Exceptions

All errors derive from FastSQSError: RouteNotFoundError, InvalidMessageError, and BatchFailedError (whose .failures holds the failed item ids).

License

MIT — see LICENSE.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

fastsqs-1.1.0.tar.gz (72.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

fastsqs-1.1.0-py3-none-any.whl (27.8 kB view details)

Uploaded Python 3

File details

Details for the file fastsqs-1.1.0.tar.gz.

File metadata

  • Download URL: fastsqs-1.1.0.tar.gz
  • Upload date:
  • Size: 72.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.7

File hashes

Hashes for fastsqs-1.1.0.tar.gz
Algorithm Hash digest
SHA256 6c009c018d80cd293aa2cd698c39416e74e8b980ea8b0bafa1bb6cac00f2a052
MD5 ee8185d338d47e9ffafb07e2c75dddd8
BLAKE2b-256 9ca6ed4aee6e8f7133f1dd56644cd685e18c47f25b942b1d3fc2c43c50bfd1b1

See more details on using hashes here.

File details

Details for the file fastsqs-1.1.0-py3-none-any.whl.

File metadata

  • Download URL: fastsqs-1.1.0-py3-none-any.whl
  • Upload date:
  • Size: 27.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.7

File hashes

Hashes for fastsqs-1.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 c8c020b108aa5ab0276a2db9b5715278890912f2d5b039e4ad32fc70936effcc
MD5 c74bdbc595112e0a69786400f21cab52
BLAKE2b-256 907f8451c8f4693b8fdc72d156d07ab9b14ece70ea192fde39233c2f7e448c96

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page