FastStream - taskiq integration to schedule FastStream tasks
Taskiq - FastStream
This package is a thin wrapper that makes FastStream objects compatible with the Taskiq library.
Its main goal is to give FastStream access to Taskiq's task scheduling.
Installation
If you already have a FastStream project that interacts with your message broker, you can add scheduling to it by installing just taskiq-faststream:
pip install taskiq-faststream
If you are starting a new project, you can select the FastStream broker by installing one of the following extras:
pip install taskiq-faststream[rabbit]
# or
pip install taskiq-faststream[kafka]
# or
pip install taskiq-faststream[confluent]
# or
pip install taskiq-faststream[nats]
# or
pip install taskiq-faststream[redis]
For OpenTelemetry distributed tracing support:
pip install taskiq-faststream[otel]
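After installing, it can help to confirm which distributions are actually present in the environment. The following is a minimal sketch that uses only the standard library; the helper name `installed_version` is hypothetical and is not part of taskiq-faststream.

```python
from importlib import metadata
from typing import Optional


def installed_version(distribution: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return metadata.version(distribution)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    # Report the packages this README relies on.
    for name in ("taskiq-faststream", "taskiq", "faststream"):
        print(f"{name}: {installed_version(name) or 'not installed'}")
```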
Usage
The package provides two classes: AppWrapper and BrokerWrapper.
They are containers for the corresponding FastStream objects that make them taskiq-compatible.
To schedule tasks for your broker, wrap it in BrokerWrapper and use it like a regular taskiq broker.
# regular FastStream code
from faststream.nats import NatsBroker
broker = NatsBroker()
@broker.subscriber("test-subject")
async def handler(msg: str):
    print(msg)
# taskiq-faststream scheduling
from taskiq.schedule_sources import LabelScheduleSource
from taskiq_faststream import BrokerWrapper, StreamScheduler
# wrap FastStream object
taskiq_broker = BrokerWrapper(broker)
# create periodic task
taskiq_broker.task(
    message="Hi!",
    # If you are using RabbitBroker, then you need to replace subject with queue.
    # If you are using KafkaBroker, then you need to replace subject with topic.
    subject="test-subject",
    schedule=[{
        "cron": "* * * * *",
    }],
)
# create scheduler object
scheduler = StreamScheduler(
    broker=taskiq_broker,
    sources=[LabelScheduleSource(taskiq_broker)],
)
To run the scheduler, use the following command:
taskiq scheduler module:scheduler
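The comments in the example above note that the destination keyword depends on the broker: subject for NATS, queue for RabbitMQ, topic for Kafka. The sketch below shows one way to assemble the task arguments with that in mind. `build_task_kwargs` and `DESTINATION_KEYWORD` are hypothetical names used for illustration, not part of taskiq-faststream, and only the three brokers mentioned in those comments are covered.

```python
from typing import Any, Dict

# Destination keyword per broker, as stated in the comments of the example above.
# Brokers not mentioned there are intentionally left out.
DESTINATION_KEYWORD = {
    "nats": "subject",
    "rabbit": "queue",
    "kafka": "topic",
}


def build_task_kwargs(
    broker_kind: str, destination: str, message: Any, cron: str
) -> Dict[str, Any]:
    """Assemble keyword arguments for a scheduled task (hypothetical helper)."""
    try:
        keyword = DESTINATION_KEYWORD[broker_kind]
    except KeyError:
        raise ValueError(
            f"no destination keyword known for {broker_kind!r}"
        ) from None
    return {
        "message": message,
        keyword: destination,
        "schedule": [{"cron": cron}],
    }


if __name__ == "__main__":
    print(build_task_kwargs("rabbit", "test-queue", "Hi!", "* * * * *"))
```

With a wrapped broker, the resulting dictionary could be passed as `taskiq_broker.task(**kwargs)`. The value `* * * * *` is a standard five-field cron expression that matches every minute.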
You can also wrap your FastStream application in the same way, which lets you use lifespan events and AsyncAPI documentation:
# regular FastStream code
from faststream import FastStream
from faststream.nats import NatsBroker
broker = NatsBroker()
app = FastStream(broker)
@broker.subscriber("test-subject")
async def handler(msg: str):
    print(msg)
# wrap FastStream object
from taskiq_faststream import AppWrapper
taskiq_broker = AppWrapper(app)
# Code below omitted 👇
A small extra feature: instead of passing a final message value, you can pass a message callback that collects the information right before sending:
async def collect_information_to_send():
    return "Message to send"
taskiq_broker.task(
    message=collect_information_to_send,
    ...,
)
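A callback is most useful when the payload depends on state that is only known at send time. The sketch below builds a payload with a fresh timestamp on every call. The payload shape (a dictionary with a `sent_at` key) is an assumption made for illustration, and it presumes a subscriber that accepts such a dictionary rather than the `str` handler shown earlier.

```python
import asyncio
from datetime import datetime, timezone
from typing import Dict


async def collect_information_to_send() -> Dict[str, str]:
    """Build the payload at call time, so each scheduled run gets a fresh timestamp."""
    return {"sent_at": datetime.now(timezone.utc).isoformat()}


if __name__ == "__main__":
    # Calling the callback directly shows what a single run would produce.
    print(asyncio.run(collect_information_to_send()))
```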
You can also send multiple messages from a single task call by using a generator message callback with yield:
async def collect_information_to_send():
    """Sends 10 messages per task call."""
    for i in range(10):
        yield i
taskiq_broker.task(
    message=collect_information_to_send,
    ...,
)
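To make the difference between a final value, a callback, and a generator callback concrete, here is a standalone sketch of how such a `message` argument could be expanded into the messages that get sent. It is an illustration only and does not claim to match the library's internal implementation; `resolve_messages` and `collect` are hypothetical names.

```python
import asyncio
import inspect
from typing import Any, AsyncIterator, List


async def resolve_messages(message: Any) -> AsyncIterator[Any]:
    """Yield every message produced by a value, a callback, or a generator callback."""
    if not callable(message):
        yield message  # a final value is sent as-is
        return
    result = message()
    if inspect.isasyncgen(result):
        # Async generator callback: one message per yielded item.
        async for item in result:
            yield item
    elif inspect.isawaitable(result):
        # Async callback: a single message, computed right before sending.
        yield await result
    else:
        # Plain callback: its return value is the single message.
        yield result


async def ten_messages():
    for i in range(10):
        yield i


async def one_message():
    return "Message to send"


async def collect(message: Any) -> List[Any]:
    """Gather all resolved messages into a list for inspection."""
    return [item async for item in resolve_messages(message)]


if __name__ == "__main__":
    print(asyncio.run(collect("Hi!")))
    print(asyncio.run(collect(one_message)))
    print(asyncio.run(collect(ten_messages)))
```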
OpenTelemetry Support
taskiq-faststream supports taskiq's OpenTelemetry middleware. To enable it, pass OpenTelemetryMiddleware when creating the broker wrapper:
from faststream.nats import NatsBroker
from taskiq_faststream import BrokerWrapper
from taskiq.middlewares.otel_middleware import OpenTelemetryMiddleware
broker = NatsBroker()
# Enable OpenTelemetry middleware
taskiq_broker = BrokerWrapper(broker, middlewares=[OpenTelemetryMiddleware()])
This will automatically add OpenTelemetry middleware to track task execution, providing insights into:
- Task execution spans
- Task dependencies and call chains
- Performance metrics
- Error tracking
Make sure to configure your OpenTelemetry exporter (e.g., Jaeger, Zipkin) according to your monitoring setup.
The same applies to AppWrapper:
from faststream import FastStream
from faststream.nats import NatsBroker
from taskiq_faststream import AppWrapper
from taskiq.middlewares.otel_middleware import OpenTelemetryMiddleware
broker = NatsBroker()
app = FastStream(broker)
# Enable OpenTelemetry middleware
taskiq_broker = AppWrapper(app, middlewares=[OpenTelemetryMiddleware()])