Taskturbine Python

Python SDK for taskturbine. Built on top of the taskturbine-core package in this repository. This library contains both synchronous and asyncio APIs.

Synchronous API Usage

import json
from datetime import timedelta

from taskturbine import Config, TaskturbineApp, TaskContext

# Config mirrors the options available in the Rust package.
config = Config(
    database_url="postgres://app:password@localhost:5432/my_app",
    usecase="docket-app"
)

# Build an app that can have tasks attached.
app = TaskturbineApp(config=config)

# Define additional channels that tasks can be spawned on
app.add_channel("reports")

# Register a task that can be spawned
@app.register_task("process-signup")
def process_signup(ctx: TaskContext) -> str:

    # Tasks are composed of steps that run at least once.
    @ctx.step(name="create-user")
    def create_user(ctx: TaskContext) -> str:
        # insert user
        user_data = {...}
        return json.dumps(user_data)

    @ctx.step(name="send-registration-code")
    def send_registration_code(ctx: TaskContext) -> str:
        # Send the registration code and return the name of the event
        # that signals the code was confirmed.
        return json.dumps({"event_name": "event-123"})

    @ctx.step(name="complete_registration")
    def complete_registration(ctx: TaskContext) -> str:
        # Provision the rest of the account
        return ""

    # Run the steps. This assumes a step call returns the JSON string
    # produced by the step function, so decode it before indexing.
    user_data = create_user(ctx)
    registration_data = json.loads(send_registration_code(ctx))

    # Wait for an external event.
    payload = ctx.await_event(registration_data["event_name"], timeout=timedelta(minutes=10))

    result = complete_registration(ctx)

    return result


# Spawn a task with a dict of parameters and options.
# Parameters are JSON-encoded automatically.
parameters = {"email": "user@example.com"}  # illustrative values
app.spawn_task("process-signup", parameters, retry_seconds=30)

# Spawn a task on a defined channel.
app.spawn_task("process-signup", parameters, channel="reports")

# Emit an external event. The payload is expected to be bytes
# containing the event.
payload = json.dumps({"confirmed": True}).encode("utf-8")  # illustrative payload
app.emit_event("event-123", payload)

# Run a worker consuming from two channels
worker = app.worker("worker-812", ["reports", "default"])
worker.run()

# Run a dedicated upkeep worker
worker = app.worker("worker-upkeep")
worker.run_upkeep()

The synchronous Worker uses multiprocessing to increase throughput. You can control how many child processes are spawned with Config.worker_concurrency.
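Returning to the example above: emit_event expects the payload as bytes, but the example does not show how those bytes are produced. A minimal sketch, assuming the event body is a JSON object encoded as UTF-8. The helper names encode_event_payload and decode_event_payload are hypothetical and are not part of taskturbine.

```python
import json
from typing import Any


def encode_event_payload(event: dict[str, Any]) -> bytes:
    """Serialize an event dict to UTF-8 JSON bytes (hypothetical helper)."""
    # sort_keys makes the encoding deterministic, which helps when
    # comparing payloads in tests or logs.
    return json.dumps(event, sort_keys=True).encode("utf-8")


def decode_event_payload(payload: bytes) -> dict[str, Any]:
    """Inverse of encode_event_payload: bytes back to a dict."""
    return json.loads(payload.decode("utf-8"))


payload = encode_event_payload({"user_id": 42, "confirmed": True})
print(payload)  # b'{"confirmed": true, "user_id": 42}'
```

The resulting bytes could then be passed as the second argument to emit_event, and a task receiving the event could decode them the same way, if the event is delivered to the task as raw bytes.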

Asyncio API Usage

import json
from datetime import timedelta

from taskturbine import Config
from taskturbine.asynclib import AsyncTaskturbineApp, AsyncTaskContext

# Config mirrors the options available in the Rust package.
config = Config(
    database_url="postgres://app:password@localhost:5432/my_app",
    usecase="docket-app"
)

# Build an app that can have tasks attached.
app = AsyncTaskturbineApp(config=config)

# Define additional channels that tasks can be spawned on
app.add_channel("reports")

# Register a task that can be spawned
@app.register_task("process-signup")
async def process_signup(ctx: AsyncTaskContext) -> str:

    # Tasks are composed of steps that run at least once.
    @ctx.step(name="create-user")
    async def create_user(ctx: AsyncTaskContext) -> str:
        # insert user
        user_data = {...}
        return json.dumps(user_data)

    @ctx.step(name="send-registration-code")
    async def send_registration_code(ctx: AsyncTaskContext) -> str:
        # Send the registration code and return the name of the event
        # that signals the code was confirmed.
        return json.dumps({"event_name": "event-123"})

    @ctx.step(name="complete_registration")
    async def complete_registration(ctx: AsyncTaskContext) -> str:
        # Provision the rest of the account
        return ""

    # Run the steps. This assumes a step call returns the JSON string
    # produced by the step function, so decode it before indexing.
    user_data = await create_user(ctx)
    registration_data = json.loads(await send_registration_code(ctx))

    # Wait for an external event.
    payload = await ctx.await_event(registration_data["event_name"], timeout=timedelta(minutes=10))

    result = await complete_registration(ctx)

    return result


# Spawn a task with a dict of parameters and options.
# Parameters are JSON-encoded automatically.
parameters = {"email": "user@example.com"}  # illustrative values
await app.spawn_task("process-signup", parameters, retry_seconds=30)

# Spawn a task on a defined channel.
await app.spawn_task("process-signup", parameters, channel="reports")

# Emit an external event. The payload is expected to be bytes
# containing the event.
payload = json.dumps({"confirmed": True}).encode("utf-8")  # illustrative payload
await app.emit_event("event-123", payload)

# Run a worker consuming from two channels
worker = app.worker("worker-812", ["reports", "default"])
await worker.run()

# Run an upkeep worker
worker = app.worker("worker-upkeep")
await worker.run_upkeep()

Async workers are single-threaded and run up to Config.worker_concurrency tasks concurrently.
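To illustrate what a single-threaded concurrency limit means in practice, here is a minimal sketch that caps the number of in-flight coroutines with asyncio.Semaphore. It is not taskturbine's implementation. The function run_with_limit and the fake_task workload are hypothetical, and the worker_concurrency argument only borrows the name of the Config option.

```python
import asyncio


async def run_with_limit(jobs, worker_concurrency: int) -> list:
    """Run coroutine factories on one thread, at most `worker_concurrency` at a time."""
    semaphore = asyncio.Semaphore(worker_concurrency)

    async def guarded(job):
        # Only `worker_concurrency` jobs can hold the semaphore at once;
        # the rest wait here without blocking the event loop.
        async with semaphore:
            return await job()

    # gather returns results in the same order as the input jobs.
    return await asyncio.gather(*(guarded(job) for job in jobs))


async def demo() -> tuple[list[int], int]:
    in_flight = 0
    peak = 0

    async def fake_task(n: int) -> int:
        # Stand-in for real task work; tracks how many tasks overlap.
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)
        in_flight -= 1
        return n * 2

    jobs = [lambda n=n: fake_task(n) for n in range(10)]
    results = await run_with_limit(jobs, worker_concurrency=3)
    return results, peak


results, peak = asyncio.run(demo())
print(results, peak)
```

Because everything runs on one event loop, a task that blocks (for example with time.sleep or a synchronous database call) would stall every other task in this sketch, so blocking work is usually better suited to a process-based worker.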

Development setup

You'll need uv installed. You can use uv init && uv sync to set up a development environment.

Building and running tests

This library uses maturin to build the native extension:

  1. Run uv run maturin develop (or uv run maturin develop --release for an optimized build) to compile the extension and install it into the current environment.
  2. Run uv run maturin build to produce a wheel.

To run the tests, you'll need TASKTURBINE_DATABASE_URL set to a Postgres database URL:

export TASKTURBINE_DATABASE_URL="postgres://postgres:@127.0.0.1:5432/my_app"

Then tests can be run with uv run pytest.
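If you want a test helper to fail fast when the variable is missing or malformed, something like the following could work. This is a sketch under assumptions: the helper name database_url_from_env is hypothetical, and the project's own test suite may read the variable differently.

```python
import os
from collections.abc import Mapping
from urllib.parse import urlsplit


def database_url_from_env(env: Mapping[str, str] = os.environ) -> str:
    """Return TASKTURBINE_DATABASE_URL, or raise with a clear message (hypothetical helper)."""
    url = env.get("TASKTURBINE_DATABASE_URL", "")
    if not url:
        raise RuntimeError("TASKTURBINE_DATABASE_URL is not set")

    parts = urlsplit(url)
    # Both spellings of the scheme are commonly used for Postgres URLs.
    if parts.scheme not in ("postgres", "postgresql"):
        raise RuntimeError(f"expected a postgres:// URL, got scheme {parts.scheme!r}")
    if not parts.path.lstrip("/"):
        raise RuntimeError("database URL is missing a database name")
    return url


# Example with an explicit mapping instead of the real environment.
example_env = {"TASKTURBINE_DATABASE_URL": "postgres://postgres:@127.0.0.1:5432/my_app"}
print(database_url_from_env(example_env))
```

Passing the environment as an argument keeps the helper easy to exercise without mutating os.environ.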
