Send and receive AWS SQS messages directly as pydantic objects
pydantic-sqs
Convert your pydantic models to and from AWS SQS messages.
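As a rough illustration of what that conversion involves, the sketch below round-trips two record types through JSON message bodies using only the standard library. It is not how pydantic-sqs is implemented: the dataclasses stand in for pydantic models, and the `REGISTRY` dict, the `model` tag field, and the `register`/`encode`/`decode` helpers are hypothetical names chosen for this example.

```python
import json
from dataclasses import asdict, dataclass

# Hypothetical registry mapping a type tag to a class (illustration only).
REGISTRY = {}


def register(cls):
    """Record a class under its name so decode() can find it later."""
    REGISTRY[cls.__name__] = cls
    return cls


@register
@dataclass
class ThisModel:
    foo: str


@register
@dataclass
class ThatModel:
    bar: str


def encode(obj) -> str:
    """Serialize an object to a JSON message body tagged with its type."""
    return json.dumps({"model": type(obj).__name__, "data": asdict(obj)})


def decode(body: str):
    """Rebuild the registered object that a message body describes."""
    payload = json.loads(body)
    cls = REGISTRY[payload["model"]]
    return cls(**payload["data"])


# Two different record types share one stream of message bodies.
bodies = [encode(ThisModel(foo="1234")), encode(ThatModel(bar="5678"))]
decoded = [decode(body) for body in bodies]
print(decoded)
```

The type tag is what lets a single queue carry several kinds of message: the receiver looks up the class by tag before rebuilding the object.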
Main Dependencies
Getting Started
import asyncio
import os
from pprint import pprint

from pydantic import Field
from pydantic_sqs import SQSModel, SQSQueue


class ThisModel(SQSModel):
    foo: str = Field(..., description="Foo")


class ThatModel(SQSModel):
    bar: str = Field(..., description="bar")


async def main():
    # Read the queue settings from the environment.
    queue_kwargs = {
        "queue_url": os.environ.get("SQS_QUEUE_URL"),
        "endpoint_url": os.environ.get("SQS_ENDPOINT_URL", None),
        "use_ssl": os.environ.get("SQS_USE_SSL", "true").lower() == "true",
    }
    # Only pass endpoint_url when one was actually provided.
    if queue_kwargs["endpoint_url"] is None:
        del queue_kwargs["endpoint_url"]

    queue = SQSQueue(**queue_kwargs)

    # Register each model with the queue before sending or receiving.
    queue.register_model(ThisModel)
    queue.register_model(ThatModel)

    this_thing = ThisModel(foo="1234")
    that_thing = ThatModel(bar="5678")

    # Send both objects to the queue.
    await this_thing.to_sqs()
    await that_thing.to_sqs()

    # Receive messages back as model instances.
    new_things = await queue.from_sqs(max_messages=10, wait_time_seconds=90)
    pprint(new_things)

    # Delete each received message from the queue.
    for thing in new_things:
        await thing.delete_from_queue()

    print("deleted all the messages we got from the queue")
    pprint(new_things)


if __name__ == "__main__":
    asyncio.run(main())
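The environment handling in the example above can be pulled into a small helper, which makes it easier to test and reuse. This is only a sketch and not part of pydantic-sqs: `queue_kwargs_from_env` is a hypothetical name, and it assumes nothing beyond the three environment variables and keyword arguments already used in the example. Unlike the example, it fails early when `SQS_QUEUE_URL` is unset, which is a design choice of this sketch. The URLs in the usage lines are made-up placeholders.

```python
import os
from typing import Any, Dict, Mapping, Optional


def queue_kwargs_from_env(env: Optional[Mapping[str, str]] = None) -> Dict[str, Any]:
    """Build SQSQueue keyword arguments from environment-style settings."""
    env = os.environ if env is None else env

    queue_url = env.get("SQS_QUEUE_URL")
    if not queue_url:
        # Assumption of this sketch: a missing queue URL is a hard error.
        raise ValueError("SQS_QUEUE_URL must be set")

    kwargs: Dict[str, Any] = {
        "queue_url": queue_url,
        "use_ssl": env.get("SQS_USE_SSL", "true").lower() == "true",
    }

    # Only include endpoint_url when one was actually provided.
    endpoint_url = env.get("SQS_ENDPOINT_URL")
    if endpoint_url:
        kwargs["endpoint_url"] = endpoint_url
    return kwargs


# Example with explicit settings instead of the real environment.
local = queue_kwargs_from_env(
    {
        "SQS_QUEUE_URL": "http://localhost:4566/000000000000/example-queue",
        "SQS_ENDPOINT_URL": "http://localhost:4566",
        "SQS_USE_SSL": "false",
    }
)
print(local)
```

The result could then be passed on as `SQSQueue(**queue_kwargs_from_env())`, mirroring the `SQSQueue(**queue_kwargs)` call in the example.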
Examples
Examples are in the examples/ directory of this repo.
Installation
Install the package
pip install pydantic-sqs
Contributing
Contributions are very welcome. To learn more, see the Contributor Guide.
License
Licensed under the MIT License.