FastAPI-like, lightweight SQS message processing for Python (routing + middleware + dependency injection)

FastSQS

A FastAPI-style router for AWS SQS on Lambda — pydantic routing, dependency injection, a middleware system, and native partial batch failure.


FastSQS turns an SQS-triggered Lambda into a typed, declarative app. You write handlers for pydantic event models; FastSQS parses each record, routes it, validates it, runs your middleware, and returns the batchItemFailures SQS expects — so failed messages are redelivered and dead-lettered by the queue's own redrive policy, not by bespoke in-app code.

Features

  • 🚀 FastAPI-style routing: @app.route(OrderCreated) dispatches by a payload discriminator (default key "type").
  • 🔒 Pydantic validation: handlers receive a validated SQSEvent model; bad messages become clean batch failures.
  • 💉 Dependency injection: declare Depends(...) params (powered by fast-depends); no @inject needed.
  • 🧩 Typed Context: ctx.message_id, ctx.queue_type, … as typed attributes; arbitrary scratch in ctx.state.
  • 🪝 Middleware: before/after hooks with balanced unwind (resources acquired in before are always released).
  • 🦾 Partial batch failure: native ReportBatchItemFailures for standard and FIFO queues.
  • 🔀 FIFO-aware: queue type is inferred from the event-source ARN; per-group ordering with configurable failure mode.
  • 🧪 In-process test client: drive your app with synthetic events, no AWS required.
  • 🐍 Typed: ships py.typed; full editor/mypy support.

Install

pip install fastsqs

Requires Python 3.10+. Depends on pydantic>=2 and fast-depends>=3.

Quick start

from fastsqs import FastSQS, SQSEvent

app = FastSQS()  # queue type auto-detected from the event-source ARN


class OrderCreated(SQSEvent):
    order_id: str
    amount: int


@app.route(OrderCreated)
async def handle_order(msg: OrderCreated):
    print("processing", msg.order_id, msg.amount)
    # raising marks this record as failed -> SQS redelivers it


# Lambda entry point (set as the function handler):
def handler(event, context):
    return app.handler(event, context)

A message is routed by its discriminator value ("type" by default), which is matched against the event model's class name converted to snake_case. For example, {"type": "order_created", "order_id": "...", "amount": 1} routes to OrderCreated.
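
The name matching described above can be pictured with a small standalone sketch. It is not FastSQS's code: the snake_case helper, the ROUTES table and the resolve function are hypothetical names, and the way acronyms or consecutive capitals are converted is an assumption.

```python
import json
import re


def snake_case(name: str) -> str:
    """Convert a CamelCase class name to snake_case (OrderCreated -> order_created)."""
    # Insert "_" before every capital letter except the first, then lowercase.
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()


class OrderCreated:
    """Stand-in for an event model; a real app would subclass SQSEvent."""


class OrderCancelled:
    """Second stand-in model so the table has more than one entry."""


# Hypothetical routing table: discriminator value -> model class.
ROUTES = {snake_case(cls.__name__): cls for cls in (OrderCreated, OrderCancelled)}


def resolve(body: str, key: str = "type"):
    """Pick the model class for a raw JSON message body, or None if unrouted."""
    payload = json.loads(body)
    return ROUTES.get(payload.get(key))
```

Calling resolve('{"type": "order_created", "order_id": "1", "amount": 1}') returns the OrderCreated class, while an unknown discriminator returns None, which a router would turn into a routing error.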

Typed context

Annotate a handler (or middleware) param ctx: Context for typed access to the framework-owned fields. Put your own scratch data in ctx.state:

from fastsqs import FastSQS, SQSEvent, Context

app = FastSQS()  # OrderCreated below is the model defined in the quick start


@app.route(OrderCreated)
async def handle(msg: OrderCreated, ctx: Context):
    ctx.message_id        # str
    ctx.queue_type        # QueueType enum (.value for the string)
    ctx.fifo_info         # FifoInfo | None (.message_group_id, ...)
    ctx.state.tenant = "acme"   # arbitrary scratch — never collides with a framework field

Dependency injection

Declare Depends(...) params and FastSQS wires them per invocation (no decorator):

from fastsqs import FastSQS, SQSEvent, Depends

def get_db():
    # Database is a placeholder for your own client class
    return Database(...)

app = FastSQS()


@app.route(OrderCreated)
async def handle(msg: OrderCreated, db=Depends(get_db)):
    await db.save(msg.order_id)

Sub-dependencies (a Depends that itself takes Depends) resolve automatically.
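
To make the idea of sub-dependencies concrete, here is a toy resolver written with the standard library only. It is neither fast-depends nor FastSQS: the Depends class below is a local stand-in, call_with_deps is a hypothetical helper, and the sketch handles synchronous providers only.

```python
import inspect


class Depends:
    """Local marker holding the provider callable (illustrative, not fast-depends)."""

    def __init__(self, provider):
        self.provider = provider


def call_with_deps(func, **given):
    """Call func, filling every parameter whose default is a Depends marker."""
    kwargs = dict(given)
    for name, param in inspect.signature(func).parameters.items():
        if name not in kwargs and isinstance(param.default, Depends):
            # Recurse so a provider's own Depends parameters are resolved first.
            kwargs[name] = call_with_deps(param.default.provider)
    return func(**kwargs)


def get_settings():
    return {"dsn": "sqlite://"}


def get_db(settings=Depends(get_settings)):
    # get_db itself depends on get_settings: a sub-dependency.
    return f"db({settings['dsn']})"


def handle(order_id, db=Depends(get_db)):
    return f"saved {order_id} via {db}"
```

call_with_deps(handle, order_id="42") resolves get_settings, then get_db, then calls the handler with the resulting value.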

Middleware

Subclass Middleware and override before / after. The after hook runs for every middleware whose before completed (balanced unwind) and receives the error, or None when there was no error:

from fastsqs import FastSQS, Middleware, TimingMiddleware, LoggingMiddleware

class Audit(Middleware):
    async def before(self, payload, record, context, ctx):
        ctx.state.t0 = ...
    async def after(self, payload, record, context, ctx, error):
        if error is not None:
            ...  # observe the failure

app = FastSQS()
app.add_middleware(LoggingMiddleware())
app.add_middleware(TimingMiddleware())
app.add_middleware(Audit())

Observability, idempotency and PII masking are application concerns — compose them as your own middleware (or use aws-lambda-powertools alongside FastSQS).
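
The balanced unwind mentioned in this section can be sketched as follows. This is an illustration, not FastSQS's implementation: the hook signatures are simplified (no payload, record, context or ctx), Recorder and run_chain are hypothetical names, and running the after hooks in reverse order is an assumption.

```python
import asyncio


class Recorder:
    """Toy middleware that logs its hook calls; optionally fails in before()."""

    def __init__(self, name, log, fail_before=False):
        self.name, self.log, self.fail_before = name, log, fail_before

    async def before(self):
        if self.fail_before:
            raise RuntimeError(f"{self.name} failed")
        self.log.append(f"before:{self.name}")

    async def after(self, error):
        label = type(error).__name__ if error is not None else "None"
        self.log.append(f"after:{self.name}:{label}")


async def run_chain(middlewares, handler):
    """Run before hooks in order, then the handler, then after hooks in reverse."""
    entered, error = [], None
    try:
        for mw in middlewares:
            await mw.before()
            entered.append(mw)  # only now is this middleware owed an after()
        await handler()
    except Exception as exc:
        error = exc
    for mw in reversed(entered):
        await mw.after(error)
    return error


async def ok_handler():
    return None


def demo():
    """Middleware b fails in before(); only a has entered, so only a is unwound."""
    log = []
    chain = [Recorder("a", log), Recorder("b", log, fail_before=True), Recorder("c", log)]
    error = asyncio.run(run_chain(chain, ok_handler))
    return log, error
```

In demo(), the second middleware raises in before, so the handler and the third middleware never run, and only the first middleware receives an after call carrying the error.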

FIFO & partial batch failure

  • Queue type is QueueType.AUTO by default: FastSQS infers FIFO from a .fifo event-source ARN. Force it with FastSQS(queue_type=QueueType.FIFO).
  • fifo_failure_mode (FIFO only): "isolate_groups" (default) blocks only the failed messageGroupId's tail; "halt_batch" halts the whole batch at the first failure (AWS Powertools' default).
  • partial_batch_failure (default True) reports per-record failures. Set it to False to fail the entire batch (raising BatchFailedError) so that SQS redelivers every message.
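
The difference between the two FIFO failure modes can be shown with a small pure-Python model. It is a sketch of the semantics as described in the list above, not the library's code: failed_ids is a hypothetical function, and reporting skipped records as failures (so that SQS redelivers them in order) is an assumption.

```python
def failed_ids(records, succeeds, mode="isolate_groups"):
    """Return the message ids to report as failed for one FIFO batch.

    records:  list of (message_id, group_id) tuples in delivery order.
    succeeds: callable(message_id) -> bool, standing in for the handler.
    """
    failures, blocked_groups, halted = [], set(), False
    for message_id, group_id in records:
        if halted or group_id in blocked_groups:
            # Skipped without running the handler, reported so it is redelivered.
            failures.append(message_id)
            continue
        if succeeds(message_id):
            continue
        failures.append(message_id)
        if mode == "halt_batch":
            halted = True  # everything after the first failure is skipped
        else:
            blocked_groups.add(group_id)  # only this group's tail is skipped
    return failures
```

With records 1 (g1), 2 (g2), 3 (g1), 4 (g2) and a handler that fails only on message 1, "isolate_groups" reports 1 and 3, while "halt_batch" reports all four.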

FastSQS only reports failures — redelivery and dead-lettering are the queue's job (visibility timeout + maxReceiveCount + redrive policy).
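
For reference, the partial batch response that Lambda's SQS integration understands is a dictionary with a batchItemFailures list of itemIdentifier entries, and it is honoured only when ReportBatchItemFailures is enabled on the event source mapping. The hand-written loop below shows that shape without FastSQS; process_batch and the handle callback are hypothetical names.

```python
def process_batch(event, handle):
    """Run handle(record) per record and collect failures in the SQS response shape."""
    failures = []
    for record in event.get("Records", []):
        try:
            handle(record)
        except Exception:
            # Only the message id is reported; SQS decides when to redeliver.
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}
```

An empty list means every record succeeded and the whole batch is deleted from the queue.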

max_concurrent_messages (default 10) bounds concurrency on standard queues; FIFO records are processed in order per group.
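
One common way to bound concurrency in asyncio is a semaphore. The sketch below shows that general technique; whether FastSQS uses a semaphore internally is not stated in this document, and process_all, worker and demo are hypothetical names.

```python
import asyncio


async def process_all(items, worker, limit=10):
    """Run worker(item) for every item with at most `limit` running at once."""
    sem = asyncio.Semaphore(limit)

    async def guarded(item):
        async with sem:
            return await worker(item)

    # return_exceptions=True keeps one failure from cancelling the others.
    return await asyncio.gather(*(guarded(i) for i in items), return_exceptions=True)


def demo(limit=3, n=10):
    """Return the results and the highest number of workers seen running together."""
    running = peak = 0

    async def worker(item):
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0)  # yield so other tasks get a chance to start
        running -= 1
        return item * 2

    results = asyncio.run(process_all(range(n), worker, limit))
    return results, peak
```

demo() returns the doubled inputs in order, and the observed peak never exceeds the limit.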

debug (default False) enables verbose per-record debug logging through the registered LoggingMiddleware.

Testing

from fastsqs.testing import SQSTestClient, RecordSpec

client = SQSTestClient(app)

# one message
result = client.send({"type": "order_created", "order_id": "1", "amount": 5})
assert result == {"batchItemFailures": []}

# a FIFO batch with two message groups (a .fifo ARN is set so AUTO infers FIFO)
client.send_batch([
    RecordSpec({"type": "order_created", "order_id": "1", "amount": 1}, group_id="g1"),
    RecordSpec({"type": "order_created", "order_id": "2", "amount": 2}, group_id="g2"),
])

# a raw (malformed) body to exercise the InvalidMessageError path
client.send("{not json", message_id="bad")
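
For readers who want to see what a synthetic event looks like, the sketch below builds a trimmed SQS Lambda event by hand and checks the .fifo suffix on the event-source ARN. It does not show how SQSTestClient works internally: make_event and is_fifo are hypothetical helpers, the account id and queue names are made up, and real events carry more fields (receiptHandle, md5OfBody, awsRegion and others).

```python
import json
import uuid


def make_event(bodies, queue_arn, group_id=None):
    """Build a minimal SQS Lambda event; dict bodies are JSON-encoded, strings kept raw."""
    records = []
    for body in bodies:
        attributes = {"ApproximateReceiveCount": "1"}
        if group_id is not None:
            attributes["MessageGroupId"] = group_id  # present on FIFO records
        records.append({
            "messageId": str(uuid.uuid4()),
            "body": body if isinstance(body, str) else json.dumps(body),
            "attributes": attributes,
            "eventSource": "aws:sqs",
            "eventSourceARN": queue_arn,
        })
    return {"Records": records}


def is_fifo(event):
    """FIFO queue names end in .fifo, so the event-source ARN does too."""
    records = event.get("Records", [])
    return bool(records) and records[0]["eventSourceARN"].endswith(".fifo")
```

Passing a raw string such as "{not json" as a body leaves it untouched, which is how a malformed message would reach the parser.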

Exceptions

All errors derive from FastSQSError: RouteNotFoundError, InvalidMessageError, and BatchFailedError (whose .failures holds the failed item ids).

License

MIT — see LICENSE.

Download files

Source Distribution

fastsqs-1.0.0.tar.gz (69.8 kB)

Uploaded Source

Built Distribution

fastsqs-1.0.0-py3-none-any.whl (27.0 kB)

Uploaded Python 3

File details

Details for the file fastsqs-1.0.0.tar.gz.

File metadata

  • Download URL: fastsqs-1.0.0.tar.gz
  • Size: 69.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.7

File hashes

Hashes for fastsqs-1.0.0.tar.gz
Algorithm    Hash digest
SHA256       b6fd0d68bbb543dc69e202a3fd235f47dc3da2e854d697a363f7db340206921e
MD5          4710b97347ad667de0e79d589219dc87
BLAKE2b-256  189d8103549632df682f9f07e340ebb2995dd9bac890af5c2589364a9490d1d3

File details

Details for the file fastsqs-1.0.0-py3-none-any.whl.

File metadata

  • Download URL: fastsqs-1.0.0-py3-none-any.whl
  • Size: 27.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.7

File hashes

Hashes for fastsqs-1.0.0-py3-none-any.whl
Algorithm    Hash digest
SHA256       4b4a8dc843517d70c26f886d50b28fa230dad1d1818d31668bae8ff7cca5df9c
MD5          b2cfa73f74732453f2187758deff14f0
BLAKE2b-256  c553a95a78c9c8d862a68524c3f2e47577e762bb9acf14d5f2a969ae8f94118e
