Send and receive AWS SQS messages as pydantic objects

pydantic-sqs

Convert your pydantic models to and from AWS SQS messages.

Main Dependencies

Getting Started

from pydantic_sqs import SQSModel, SQSQueue
from pydantic import Field
import asyncio
from pprint import pprint
import os


class ThisModel(SQSModel):
    foo: str = Field(..., description="Foo")


class ThatModel(SQSModel):
    bar: str = Field(..., description="bar")


async def main():
    # Read the queue configuration from the environment.
    queue_kwargs = {
        "queue_url": os.environ.get("SQS_QUEUE_URL"),
        "use_ssl": os.environ.get("SQS_USE_SSL", "true").lower() == "true",
    }
    # Pass endpoint_url only when it is set, e.g. for a custom endpoint.
    endpoint_url = os.environ.get("SQS_ENDPOINT_URL")
    if endpoint_url is not None:
        queue_kwargs["endpoint_url"] = endpoint_url

    queue = SQSQueue(**queue_kwargs)

    queue.register_model(ThisModel)
    queue.register_model(ThatModel)

    this_thing = ThisModel(foo="1234")
    that_thing = ThatModel(bar="5678")
    await this_thing.to_sqs()
    await that_thing.to_sqs()

    # SQS long polling accepts a wait time of at most 20 seconds.
    new_things = await queue.from_sqs(max_messages=10, wait_time_seconds=20)
    pprint(new_things)
    for thing in new_things:
        await thing.delete_from_queue()

    print("deleted all the messages we got from the queue")
    pprint(new_things)


if __name__ == "__main__":
    asyncio.run(main())
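
The environment handling in the example above can be pulled into a small helper, which makes it easy to check without touching AWS. This is a minimal sketch: `build_queue_kwargs` is a hypothetical name that is not part of pydantic-sqs, and it only reproduces the three settings the example reads (`SQS_QUEUE_URL`, `SQS_ENDPOINT_URL`, `SQS_USE_SSL`).

```python
import os
from typing import Any, Dict, Mapping


def build_queue_kwargs(env: Mapping[str, str] = os.environ) -> Dict[str, Any]:
    """Build keyword arguments for SQSQueue from environment-style settings.

    Hypothetical helper for illustration; not part of pydantic-sqs.
    """
    kwargs: Dict[str, Any] = {
        # None when SQS_QUEUE_URL is unset, matching the example above.
        "queue_url": env.get("SQS_QUEUE_URL"),
        # Any value other than "true" (case-insensitive) disables SSL.
        "use_ssl": env.get("SQS_USE_SSL", "true").lower() == "true",
    }
    # Include endpoint_url only when it is explicitly configured.
    endpoint_url = env.get("SQS_ENDPOINT_URL")
    if endpoint_url is not None:
        kwargs["endpoint_url"] = endpoint_url
    return kwargs
```

With this helper, the queue in the example could be created as `SQSQueue(**build_queue_kwargs())`, and a plain dict can stand in for `os.environ` when testing.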

Examples

Examples are in the examples/ directory of this repo.
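
The Getting Started example registers two models on one queue and gets both back from a single `from_sqs` call. How pydantic-sqs does this internally is not described here. The standard-library sketch below only illustrates the general idea of tagging each message with its model name so that a receiver can rebuild the right class. `ModelRegistry`, the dataclasses, and the `model`/`data` envelope format are all assumptions made for illustration, not the library's API or wire format.

```python
import json
from dataclasses import asdict, dataclass
from typing import Dict, Type


@dataclass
class ThisMessage:
    foo: str


@dataclass
class ThatMessage:
    bar: str


class ModelRegistry:
    """Hypothetical registry that maps a class name to its class."""

    def __init__(self) -> None:
        self._models: Dict[str, Type] = {}

    def register(self, model: Type) -> None:
        self._models[model.__name__] = model

    def dumps(self, obj: object) -> str:
        # The envelope layout ("model" plus "data") is assumed for this sketch.
        return json.dumps({"model": type(obj).__name__, "data": asdict(obj)})

    def loads(self, body: str) -> object:
        envelope = json.loads(body)
        model = self._models.get(envelope["model"])
        if model is None:
            raise KeyError(f"unregistered model: {envelope['model']}")
        # Rebuild an instance of the registered class from the payload.
        return model(**envelope["data"])


registry = ModelRegistry()
registry.register(ThisMessage)
registry.register(ThatMessage)

body = registry.dumps(ThatMessage(bar="5678"))
restored = registry.loads(body)
```

A message whose model name was never registered raises `KeyError` in this sketch, which is one reason a receiver would need every expected model registered before it starts polling.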

Installation

Install the package from PyPI with pip:

pip install pydantic-sqs

Contributing

Contributions are very welcome. To learn more, see the Contributor Guide.

License

Licensed under the MIT License.
