FastStream - taskiq integration to schedule FastStream tasks
Taskiq - FastStream
This package is a wrapper that makes FastStream objects compatible with the Taskiq library.
Its main goal is to give FastStream access to Taskiq's task scheduling feature.
Installation
If you already have a FastStream project that interacts with your message broker, you can add scheduling to it by installing taskiq-faststream on its own:
pip install taskiq-faststream
If you are starting a new project, you can choose the broker for taskiq-faststream by installing one of the following distributions:
pip install taskiq-faststream[rabbit]
# or
pip install taskiq-faststream[kafka]
# or
pip install taskiq-faststream[nats]
Usage
The package gives you two classes: AppWrapper and BrokerWrapper.
These are just containers that make the related FastStream objects taskiq-compatible.
To create scheduled tasks for your broker, wrap it in BrokerWrapper and use it like a regular taskiq Broker.
# regular FastStream code
from faststream.nats import NatsBroker
broker = NatsBroker()
@broker.subscriber("test-subject")
async def handler(msg: str):
    print(msg)
# taskiq-faststream scheduling
from taskiq import TaskiqScheduler
from taskiq.schedule_sources import LabelScheduleSource
from taskiq_faststream import BrokerWrapper
# wrap FastStream object
taskiq_broker = BrokerWrapper(broker)
# create periodic task
taskiq_broker.task(
    message="Hi!",
    subject="test-subject",
    schedule=[{
        "cron": "* * * * *",
    }],
)
# create scheduler object
scheduler = TaskiqScheduler(
    broker=taskiq_broker,
    sources=[LabelScheduleSource(taskiq_broker)],
)
To run the scheduler, use the following command:
taskiq scheduler module:scheduler
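The "cron" value used in the schedule above, "* * * * *", has five fields (minute, hour, day of month, month, day of week). An asterisk matches every value, so that task fires every minute. As a rough illustration only, the sketch below checks a timestamp against such an expression. The cron_matches helper is hypothetical and is not part of taskiq or taskiq-faststream; it assumes fields are either an asterisk or a single integer and requires every field to match, which is simpler than real cron semantics.

```python
from datetime import datetime


def cron_matches(expr: str, moment: datetime) -> bool:
    """Hypothetical helper: match a simplified five-field cron expression.

    Supports only "*" and single integers in each field.
    """
    fields = expr.split()
    if len(fields) != 5:
        raise ValueError("expected five space-separated cron fields")

    # Cron counts Sunday as 0, while isoweekday() returns 7 for Sunday.
    actual = (
        moment.minute,
        moment.hour,
        moment.day,
        moment.month,
        moment.isoweekday() % 7,
    )
    return all(
        field == "*" or int(field) == value
        for field, value in zip(fields, actual)
    )


every_minute = cron_matches("* * * * *", datetime(2024, 1, 1, 12, 30))
nine_sharp = cron_matches("0 9 * * *", datetime(2024, 1, 1, 9, 0))
one_past_nine = cron_matches("0 9 * * *", datetime(2024, 1, 1, 9, 1))
```

Here every_minute and nine_sharp are true, while one_past_nine is false because the minute field no longer matches.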
You can also wrap your FastStream application the same way, which lets you use lifespan events and AsyncAPI documentation:
# regular FastStream code
from faststream import FastStream
from faststream.nats import NatsBroker
broker = NatsBroker()
app = FastStream(broker)
@broker.subscriber("test-subject")
async def handler(msg: str):
    print(msg)
# wrap FastStream object
from taskiq_faststream import AppWrapper
taskiq_broker = AppWrapper(app)
# Code below omitted 👇
A small extra feature: instead of passing a final message argument, you can pass a message callback that collects the information right before sending:
async def collect_information_to_send():
    return "Message to send"
taskiq_broker.task(
    message=collect_information_to_send,
    ...,
)
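To make the idea of a message callback concrete, here is a minimal standard-library sketch of how a value that is either a final payload or a callback could be resolved just before publishing. This is an assumption about the general pattern, not the package's actual implementation; resolve_message is a hypothetical name introduced only for illustration.

```python
import asyncio
import inspect
from typing import Any


async def resolve_message(message: Any) -> Any:
    """Hypothetical helper: turn a message argument into the payload to send."""
    if callable(message):
        result = message()
        # An async callback returns an awaitable that still has to be awaited.
        if inspect.isawaitable(result):
            result = await result
        return result
    # A plain value is used as the payload unchanged.
    return message


async def collect_information_to_send() -> str:
    return "Message to send"


static_payload = asyncio.run(resolve_message("Hi!"))
dynamic_payload = asyncio.run(resolve_message(collect_information_to_send))
```

With a callback, the payload is computed at send time rather than when the task is declared, which is useful when the message depends on the current state.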