Project description
Coleridge
Use Pydantic models to run background tasks, either in-process or through RabbitMQ.
Example
from time import sleep
from typing import List

from pydantic import BaseModel
from coleridge import Coleridge, Empty, Connection


class Poem(BaseModel):
    lines: List[str]


# Background task
coleridge = Coleridge()


@coleridge
def long_task(_: Empty) -> Poem:
    poem = Poem(lines=["In Xanadu did Kubla Khan",
                       "A stately pleasure-dome decree:"])
    sleep(5)
    return poem


def print_poem(poem: Poem) -> None:
    for line in poem.lines:
        print(f"~~ {line} ~~")


def print_error(error: Exception) -> None:
    print(error)


# Assign a function to run when the execution finishes
long_task.on_finish = print_poem
# Or catch errors
long_task.on_error = print_error
# Run the task
long_task.run(Empty())  # It always requires a Pydantic model, empty in this case

# Or use RabbitMQ instead of background functions
rabbit = Coleridge(Connection(host="localhost", port=5672))


@rabbit
def long_task_with_rabbit(poem: Poem) -> Empty:
    for line in poem.lines:
        print(f"~~ {line} ~~")
    sleep(5)
    return Empty()


long_task_with_rabbit.run(Poem(lines=["In Xanadu did Kubla Khan",
                                      "A stately pleasure-dome decree:"]))
You can also read the outcome directly from the object that run() returns:
from time import sleep
from typing import List

from pydantic import BaseModel
from coleridge import Coleridge, Empty


class Poem(BaseModel):
    lines: List[str]


# Background task
coleridge = Coleridge()


@coleridge
def long_task(_: Empty) -> Poem:
    poem = Poem(lines=["In Xanadu did Kubla Khan",
                       "A stately pleasure-dome decree:"])
    sleep(5)
    return poem


result = long_task.run(Empty())
# Poll until the task has finished, then print its result or its error
while True:
    if result.finished:
        if result.success:
            print(result.result)
        elif result.error:
            print(result.error)
        break
    sleep(0.1)  # Avoid a busy loop while waiting
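The polling loop above can be wrapped in a small helper that adds a timeout. This is only a sketch: `wait_for_result` is a hypothetical name and is not part of Coleridge. It relies solely on the `finished`, `success`, `result` and `error` attributes used in the example, and it assumes that `error` holds an exception when a task fails. A stand-in object is used here so the snippet runs without the library.

```python
import time
from types import SimpleNamespace
from typing import Any


def wait_for_result(result: Any, timeout: float = 10.0, interval: float = 0.05) -> Any:
    """Poll `result` until it finishes, then return its value or raise its error.

    Hypothetical helper, not part of Coleridge. `result` is any object that
    exposes `finished`, `success`, `result` and `error`.
    """
    deadline = time.monotonic() + timeout
    while not result.finished:
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task did not finish within {timeout} seconds")
        time.sleep(interval)  # Sleep between checks instead of spinning
    if result.success:
        return result.result
    # Assumption: `error` is an exception instance; otherwise wrap it
    if isinstance(result.error, BaseException):
        raise result.error
    raise RuntimeError(str(result.error))


# Stand-in for the object returned by `long_task.run(...)`
finished = SimpleNamespace(
    finished=True,
    success=True,
    result=["In Xanadu did Kubla Khan"],
    error=None,
)
for line in wait_for_result(finished):
    print(line)  # prints: In Xanadu did Kubla Khan
```

With the real library, the call would presumably look like `wait_for_result(long_task.run(Empty()))`, provided the returned object behaves as in the example above.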
Download files
Source Distribution
coleridge-0.1.1.tar.gz (10.4 kB)
Built Distribution
coleridge-0.1.1-py3-none-any.whl (14.4 kB)
File details
Details for the file coleridge-0.1.1.tar.gz.
File metadata
- Download URL: coleridge-0.1.1.tar.gz
- Upload date:
- Size: 10.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.0 CPython/3.12.5
File hashes
Algorithm | Hash digest
---|---
SHA256 | ed488824ce52735925c96af30543bbd924162f824c19898433ddb5f232b0c21e
MD5 | d621016b7913fdb11e01d0d5a2255a7d
BLAKE2b-256 | e8f2c14cade3121d887b1f89d5baf188cdce3b3f490355d65d7408cb14fc5290
File details
Details for the file coleridge-0.1.1-py3-none-any.whl.
File metadata
- Download URL: coleridge-0.1.1-py3-none-any.whl
- Upload date:
- Size: 14.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.0 CPython/3.12.5
File hashes
Algorithm | Hash digest
---|---
SHA256 | 5f041f6d65aef65d492f8d3b04d8228a4a266cd84d7c033326a6d0420c91acdb
MD5 | 23900db72dc15b1ae3b8e67c76e3df9b
BLAKE2b-256 | 2e86787e9fd4cef08407a50b9fc8225ac8d0175f3d470394cce943efabab16bb