Send and receive AWS SQS messages directly as pydantic objects
pydantic-sqs
Convert your pydantic models to and from AWS SQS messages.
Main Dependencies
Getting Started
from pydantic_sqs import SQSModel, SQSQueue
from pydantic import Field
import asyncio
from pprint import pprint
import os


class ThisModel(SQSModel):
    foo: str = Field(..., description="Foo")


class ThatModel(SQSModel):
    bar: str = Field(..., description="bar")


async def main():
    # Queue settings are read from the environment.
    queue_kwargs = {
        "queue_url": os.environ.get("SQS_QUEUE_URL"),
        "endpoint_url": os.environ.get("SQS_ENDPOINT_URL", None),
        "use_ssl": os.environ.get("SQS_USE_SSL", "true").lower() == "true",
    }
    # Only pass endpoint_url when one is configured (e.g. a local SQS emulator).
    if queue_kwargs["endpoint_url"] is None:
        del queue_kwargs["endpoint_url"]

    queue = SQSQueue(**queue_kwargs)

    # Register each model class with the queue it is sent to and read from.
    queue.register_model(ThisModel)
    queue.register_model(ThatModel)

    this_thing = ThisModel(foo="1234")
    that_thing = ThatModel(bar="5678")

    # Send both models to the queue.
    await this_thing.to_sqs()
    await that_thing.to_sqs()

    # Receive up to 10 messages as model instances.
    # SQS long polling accepts a wait time of at most 20 seconds.
    new_things = await queue.from_sqs(max_messages=10, wait_time_seconds=20)
    pprint(new_things)

    # Delete each received message so it is not delivered again.
    for thing in new_things:
        await thing.delete_from_queue()

    print("deleted all the messages we got from the queue")
    pprint(new_things)


if __name__ == "__main__":
    asyncio.run(main())
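The environment handling in main() above can be pulled into a small helper so it is easy to test without touching AWS. The following is only a sketch: build_queue_kwargs is a hypothetical name, not part of pydantic-sqs, and raising when SQS_QUEUE_URL is missing is an assumption added here for illustration. It also treats an empty SQS_ENDPOINT_URL as unset.

```python
import os
from typing import Any, Dict, Mapping, Optional


def build_queue_kwargs(env: Optional[Mapping[str, str]] = None) -> Dict[str, Any]:
    """Collect keyword arguments for SQSQueue from environment-style settings.

    Hypothetical helper: it mirrors the variable names used in the example
    above and is not provided by pydantic-sqs itself.
    """
    env = os.environ if env is None else env

    queue_url = env.get("SQS_QUEUE_URL")
    if queue_url is None:
        # Assumption for this sketch: fail early instead of passing None on.
        raise ValueError("SQS_QUEUE_URL must be set")

    kwargs: Dict[str, Any] = {
        "queue_url": queue_url,
        # Any value other than "true" (case-insensitive) disables SSL.
        "use_ssl": env.get("SQS_USE_SSL", "true").lower() == "true",
    }

    # Only include endpoint_url when one is configured,
    # for example when pointing at a local SQS emulator.
    endpoint_url = env.get("SQS_ENDPOINT_URL")
    if endpoint_url:
        kwargs["endpoint_url"] = endpoint_url

    return kwargs
```

With a helper like this, the queue in the example could be created as SQSQueue(**build_queue_kwargs()).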
Examples
Examples are in the examples/ directory of this repo.
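As a further illustration, a common consumer pattern is to delete a message only after it has been handled successfully, so that failed messages stay on the queue for redelivery. The sketch below assumes only the two calls shown in Getting Started, queue.from_sqs(...) and delete_from_queue(); process_batch and handler are hypothetical names introduced here, not part of pydantic-sqs.

```python
from typing import Any, Awaitable, Callable


async def process_batch(
    queue: Any,
    handler: Callable[[Any], Awaitable[None]],
    max_messages: int = 10,
    wait_time_seconds: int = 20,
) -> int:
    """Receive one batch, handle each model, and delete the ones that succeed.

    `queue` is expected to behave like the SQSQueue in Getting Started.
    Returns the number of messages that were handled and deleted.
    """
    models = await queue.from_sqs(
        max_messages=max_messages, wait_time_seconds=wait_time_seconds
    )

    handled = 0
    for model in models:
        try:
            await handler(model)
        except Exception:
            # Leave the message on the queue; SQS makes it visible again
            # once its visibility timeout expires.
            continue
        await model.delete_from_queue()
        handled += 1
    return handled
```

Inside main() this could be called as await process_batch(queue, handler), where handler is any coroutine function that accepts a received model.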
Installation
Install the package from PyPI:
pip install pydantic-sqs
Contributing
Contributions are very welcome. To learn more, see the Contributor Guide.
License
Licensed under the MIT License.