
FastStream - taskiq integration to schedule FastStream tasks


Taskiq - FastStream



This package is a thin wrapper that makes FastStream objects compatible with the Taskiq library.

Its main goal is to give FastStream access to Taskiq's task scheduling feature.

Installation

If you already have a FastStream project that interacts with your message broker, you can add scheduling to it by installing taskiq-faststream alone:

pip install taskiq-faststream

If you are starting a new project, you can select the broker by installing one of the following extras:

pip install taskiq-faststream[rabbit]
# or
pip install taskiq-faststream[kafka]
# or
pip install taskiq-faststream[nats]

Usage

The package provides two classes: AppWrapper and BrokerWrapper.

These are containers that make the corresponding FastStream objects Taskiq-compatible.

To schedule tasks for your broker, wrap it in BrokerWrapper and use it like a regular Taskiq broker.

# regular FastStream code
from faststream.nats import NatsBroker

broker = NatsBroker()

@broker.subscriber("test-subject")
async def handler(msg: str):
    print(msg)

# taskiq-faststream scheduling
from taskiq.schedule_sources import LabelScheduleSource
from taskiq_faststream import BrokerWrapper, StreamScheduler

# wrap FastStream object
taskiq_broker = BrokerWrapper(broker)

# create periodic task
taskiq_broker.task(
    message="Hi!",
    # If you are using RabbitBroker, then you need to replace subject with queue.
    # If you are using KafkaBroker, then you need to replace subject with topic.
    subject="test-subject",
    schedule=[{
        "cron": "* * * * *",
    }],
)

# create scheduler object
scheduler = StreamScheduler(
    broker=taskiq_broker,
    sources=[LabelScheduleSource(taskiq_broker)],
)

To run the scheduler, use the following command, where module is the import path of the Python module that defines the scheduler object:

taskiq scheduler module:scheduler
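
The "cron" value in the schedule above is a cron expression. As a rough illustration, assuming the standard five-field cron syntax, the sketch below splits an expression into named fields so it is easier to see that "* * * * *" means "every minute". The helper split_cron and the field names are hypothetical and are not part of taskiq or taskiq-faststream.

```python
from typing import Dict

# Field order of a standard five-field cron expression (an assumption
# about the syntax accepted by the "cron" schedule label).
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def split_cron(expression: str) -> Dict[str, str]:
    """Map each whitespace-separated cron field to its name (hypothetical helper)."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(
            f"expected {len(CRON_FIELDS)} fields, got {len(parts)}"
        )
    return dict(zip(CRON_FIELDS, parts))


# "*" in every field: the task fires every minute.
every_minute = split_cron("* * * * *")

# Minute 0 of hour 9, Monday to Friday.
weekday_mornings = split_cron("0 9 * * 1-5")
```

Replacing the expression in the schedule list with something like "0 9 * * 1-5" would therefore be expected to publish the message once per weekday morning instead of every minute.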

You can also wrap your FastStream application the same way, which lets you use lifespan events and AsyncAPI documentation:

# regular FastStream code
from faststream import FastStream
from faststream.nats import NatsBroker

broker = NatsBroker()
app = FastStream(broker)

@broker.subscriber("test-subject")
async def handler(msg: str):
    print(msg)

# wrap FastStream object
from taskiq_faststream import AppWrapper
taskiq_broker = AppWrapper(app)

# Code below omitted 👇

A small extra: instead of passing a fixed message argument, you can pass a message callback that collects the information right before sending:

async def collect_information_to_send():
    return "Message to send"

taskiq_broker.task(
    message=collect_information_to_send,
    ...,
)

You can also send multiple messages from a single task call by using a generator message callback with yield:

async def collect_information_to_send():
    """Sends 10 messages per task call."""
    for i in range(10):
        yield i

taskiq_broker.task(
    message=collect_information_to_send,
    ...,
)
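
To make the three forms of the message argument concrete (a fixed value, a callback, and a generator callback), here is a minimal standard-library sketch of how such an argument could be turned into the list of payloads to publish. It is only an illustration of the behavior described above, not the library's actual implementation, and resolve_messages is a hypothetical name.

```python
import asyncio
import inspect
from typing import Any, List


async def resolve_messages(message: Any) -> List[Any]:
    """Turn a `message` argument into the payloads to send (hypothetical helper)."""
    # Fixed value: it is the single message.
    if not callable(message):
        return [message]

    result = message()

    # Async generator callback: every yielded item is one message.
    if inspect.isasyncgen(result):
        return [item async for item in result]

    # Sync generator callback: same idea, without awaiting.
    if inspect.isgenerator(result):
        return list(result)

    # Coroutine callback: await it to obtain the single message.
    if inspect.isawaitable(result):
        return [await result]

    # Ordinary function: its return value is the single message.
    return [result]


async def single() -> str:
    return "Message to send"


async def many():
    """Yields 10 messages per call."""
    for i in range(10):
        yield i


async def demo():
    return (
        await resolve_messages("Hi!"),
        await resolve_messages(single),
        await resolve_messages(many),
    )


plain, from_callback, from_generator = asyncio.run(demo())
```

Because the callback is invoked on each call rather than at definition time, the payload reflects the state at send time, which is the point of using a callback over a fixed message.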
