distributed background task system for python functions

Project description

shadows is a distributed background task processing framework for python applications. it schedules both immediate and future work through a single interface.

installing shadows

shadows is available on PyPI under the package name shadow-task. it requires python 3.12 or later.

with uv:

uv pip install shadow-task

or

uv add shadow-task

with pip:

pip install shadow-task

shadows requires a redis server with streams support (introduced in redis 5.0.0). it is tested against redis 6 and 7.

at a glance

from datetime import datetime, timedelta, timezone

from shadows import Shadow


async def greet(name: str, greeting="Hello") -> None:
    print(f"{greeting}, {name} at {datetime.now()}!")


async with Shadow() as shadows:
    await shadows.add(greet)("rohit")

    now = datetime.now(timezone.utc)
    soon = now + timedelta(seconds=3)
    await shadows.add(greet, when=soon)("rahul", greeting="what'sgood")

from shadows import Shadow, Worker

async with Shadow() as shadows:
    async with Worker(shadows) as worker:
        await worker.run_until_finished()

Hello, rohit at 2025-03-05 13:58:21.552644!
what'sgood, rahul at 2025-03-05 13:58:24.550773!

how shadows works

shadows operates on a redis-based architecture that provides reliable distributed task processing with at-least-once delivery semantics. the system uses two primary redis data structures:

  • redis streams: handle immediate task execution, with consumer groups delivering each task to one worker at a time
  • redis sorted sets: manage scheduled tasks with execution timestamps, automatically moved to streams when ready

workers continuously poll the redis stream for available tasks and move scheduled tasks from the sorted set to the stream when their execution time arrives. this movement is performed atomically using lua scripts to ensure consistency.
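
the flow described above can be modelled in a few lines of plain python. this is only an in-memory sketch, not the shadows implementation: `InMemoryQueue`, `add`, and `move_due` are hypothetical names, a heap stands in for the redis sorted set, and a deque stands in for the redis stream. the real system performs the move inside redis with a lua script so that it is atomic across workers; a single-process model has no such concern.

```python
import heapq
from collections import deque


class InMemoryQueue:
    """Toy stand-in for the two redis structures (hypothetical, for illustration)."""

    def __init__(self) -> None:
        # Plays the role of the sorted set: entries ordered by execution time.
        self.scheduled: list[tuple[float, int, str]] = []
        # Plays the role of the stream: tasks that are ready to run now.
        self.stream: deque[str] = deque()
        # Tie-breaker so tasks with equal timestamps keep insertion order.
        self._counter = 0

    def add(self, task: str, when: float, now: float) -> None:
        """Queue a task immediately if it is due, otherwise schedule it."""
        if when <= now:
            self.stream.append(task)
        else:
            heapq.heappush(self.scheduled, (when, self._counter, task))
            self._counter += 1

    def move_due(self, now: float) -> int:
        """Move every task whose time has arrived to the stream; return the count."""
        moved = 0
        while self.scheduled and self.scheduled[0][0] <= now:
            _, _, task = heapq.heappop(self.scheduled)
            self.stream.append(task)
            moved += 1
        return moved


queue = InMemoryQueue()
queue.add("greet:rohit", when=0.0, now=0.0)  # due now, goes straight to the stream
queue.add("greet:rahul", when=3.0, now=0.0)  # due later, waits in the schedule
queue.move_due(now=1.0)  # nothing is due yet
queue.move_due(now=3.0)  # greet:rahul becomes ready
```

after the last call both tasks sit in `queue.stream` in the order they became ready, which is the order a worker would pick them up in this model.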

advanced examples

retry functionality with exponential backoff

from datetime import timedelta

from shadows import ExponentialRetry

# http_client is assumed to be an async HTTP client defined elsewhere
# (for example, an httpx.AsyncClient instance)

async def flaky_api_call(
    url: str,
    retry: ExponentialRetry = ExponentialRetry(
        attempts=5,
        minimum_delay=timedelta(seconds=1),
        maximum_delay=timedelta(minutes=5)
    )
) -> None:
    # retries with delays: 1s, 2s, 4s, 8s, 16s (capped at 5 minutes)
    response = await http_client.get(url)
    response.raise_for_status()
    print(f"api call succeeded on attempt {retry.attempt}")
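
the delay sequence in the comment above (1s, 2s, 4s, 8s, 16s, capped at 5 minutes) matches a doubling schedule. the arithmetic can be sketched as below, assuming the delay doubles after each failed attempt and is clamped to `maximum_delay`; `backoff_delay` is a hypothetical helper written for this example, not part of the shadows api, and the library's exact formula may differ.

```python
from datetime import timedelta


def backoff_delay(
    attempt: int,
    minimum_delay: timedelta,
    maximum_delay: timedelta,
) -> timedelta:
    """Delay to wait after failed attempt number `attempt` (1-based).

    Assumed formula: minimum_delay * 2 ** (attempt - 1), capped at maximum_delay.
    """
    if attempt < 1:
        raise ValueError("attempt must be >= 1")
    delay = minimum_delay * (2 ** (attempt - 1))
    return min(delay, maximum_delay)


delays = [
    backoff_delay(n, timedelta(seconds=1), timedelta(minutes=5)).total_seconds()
    for n in range(1, 6)
]
print(delays)  # [1.0, 2.0, 4.0, 8.0, 16.0]
```

with these settings the cap only starts to matter from the tenth attempt, where the uncapped delay would be 512 seconds.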

perpetual tasks with self-cancellation

from datetime import timedelta

from shadows import Perpetual

async def monitor_deployment(
    deployment_id: str,
    perpetual: Perpetual = Perpetual(every=timedelta(seconds=30))
) -> None:
    status = await check_deployment_status(deployment_id)

    if status in ["completed", "failed"]:
        await notify_deployment_finished(deployment_id, status)
        perpetual.cancel()  # stop monitoring this deployment
        return

    print(f"deployment {deployment_id} status: {status}")

task chaining with dependency injection

import asyncio

from shadows import CurrentShadow, Shadow

async def process_user_data(
    user_id: int,
    shadows: Shadow = CurrentShadow()
) -> None:
    print(f"processing user {user_id}")
    
    # simulate some work
    await asyncio.sleep(0.1)
    
    # schedule follow-up tasks
    await shadows.add(send_notification)(user_id, "processing completed")
    await shadows.add(update_analytics)(user_id)
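
the `CurrentShadow()` default above acts as a marker that is resolved when the task runs. one way such default-argument injection can work is sketched below. everything here is hypothetical and written for illustration (`Dependency`, `CurrentQueue`, `call_with_dependencies`); it shows the general technique used by frameworks in this style, not how shadows implements it.

```python
import asyncio
import inspect


class Dependency:
    """Marker placed in a parameter default and resolved at call time."""

    def resolve(self, context: dict) -> object:
        raise NotImplementedError


class CurrentQueue(Dependency):
    """Resolves to the queue stored in the call context (hypothetical)."""

    def resolve(self, context: dict) -> object:
        return context["queue"]


async def call_with_dependencies(function, context: dict, *args, **kwargs):
    """Fill in every parameter whose default is a Dependency, then call."""
    signature = inspect.signature(function)
    for name, parameter in signature.parameters.items():
        if name not in kwargs and isinstance(parameter.default, Dependency):
            kwargs[name] = parameter.default.resolve(context)
    return await function(*args, **kwargs)


async def process(user_id: int, queue: list = CurrentQueue()) -> str:
    # The caller never passes `queue`; it is injected from the context.
    queue.append(("notify", user_id))
    return f"processed {user_id}"


context = {"queue": []}
result = asyncio.run(call_with_dependencies(process, context, 7))
```

the task signature stays an ordinary python function, so it can still be called directly in tests by passing the dependency explicitly.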

idempotent task scheduling with custom keys

from shadows import Shadow

async def process_order(order_id: int) -> None:
    print(f"processing order {order_id}")

order_id = 123  # example value

async with Shadow() as shadows:
    # only one task per order_id can be scheduled
    key = f"process-order-{order_id}"
    await shadows.add(process_order, key=key)(order_id)
    
    # duplicate scheduling is ignored
    await shadows.add(process_order, key=key)(order_id)  # ignored
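
the "first add wins" behaviour shown above can be pictured with a small in-memory model. `KeyedQueue` is a hypothetical class written for this example; it only illustrates the idea that a second `add` with a key that is already pending is dropped, and says nothing about how shadows stores keys in redis or when a key becomes available again.

```python
class KeyedQueue:
    """Toy queue in which a key can be pending at most once (illustration only)."""

    def __init__(self) -> None:
        self.pending: dict[str, tuple] = {}

    def add(self, key: str, *args: object) -> bool:
        """Schedule args under key; return False if the key is already pending."""
        if key in self.pending:
            return False
        self.pending[key] = args
        return True


queue = KeyedQueue()
order_id = 123
key = f"process-order-{order_id}"

first = queue.add(key, order_id)   # accepted
second = queue.add(key, order_id)  # same key, ignored
print(first, second, len(queue.pending))  # True False 1
```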

command line interface

shadows provides a comprehensive cli for managing tasks and workers:

basic cli operations

# start a worker with custom tasks
shadows worker --tasks myapp.tasks:all_tasks --concurrency 5

# view current shadows state
shadows snapshot --shadows my-shadows

# clear all pending tasks
shadows clear --shadows my-shadows

# add built-in trace tasks for debugging
shadows tasks trace "system startup completed"

# list active workers
shadows workers ls --shadows my-shadows

production worker configuration

shadows worker \
  --shadows orders \
  --url redis://redis.prod.com:6379/0 \
  --name orders-worker-1 \
  --concurrency 50 \
  --redelivery-timeout 10m \
  --healthcheck-port 8080 \
  --metrics-port 9090 \
  --logging-format json \
  --tasks myapp.tasks:production_tasks

testing and development

# run tests with fast polling for quick feedback
shadows worker \
  --concurrency 10 \
  --minimum-check-interval 50ms \
  --scheduling-resolution 100ms \
  --tasks tests.tasks:test_tasks

testing with shadows

shadows includes powerful testing utilities that make it easy to test complex distributed workflows:

pytest integration

from uuid import uuid4

import pytest
from shadows import Shadow, Worker

@pytest.fixture
async def test_shadows():
    async with Shadow(name=f"test-{uuid4()}") as shadows:
        yield shadows
        await shadows.clear()

async def test_order_processing(test_shadows: Shadow):
    test_shadows.register(process_order)
    test_shadows.register(send_confirmation)
    
    await test_shadows.add(process_order)(order_id=123)
    
    async with Worker(test_shadows) as worker:
        await worker.run_until_finished()
    
    assert order_is_processed(123)
    assert confirmation_was_sent(123)

controlling perpetual tasks

async def test_perpetual_monitoring(test_shadows: Shadow):
    test_shadows.register(health_check_service)
    
    await test_shadows.add(health_check_service)("https://api.example.com")
    
    async with Worker(test_shadows) as worker:
        # let health check run 3 times, then stop
        await worker.run_at_most({"health_check_service": 3})
    
    assert health_check_call_count == 3

why shadows?

lightning-fast one-way background task processing devoid of unnecessary complexity

seamless scheduling of immediate or future workloads through a unified interface

bypass problematic tasks or parameters without requiring code redeployment

purpose-built architecture optimized for redis streams

comprehensive type safety and type awareness for all background task functions

sophisticated dependency injection system similar to fastapi, typer, and fastmcp for reusable resources

hacking on shadows

we use uv for project management, so getting set up should be as simple as cloning the repo and running:

uv sync

then to run the test suite:

pytest

we aim to maintain 100% test coverage, which is required for all prs to shadows. we believe that shadows should stay small, simple, understandable, and reliable, and that begins with testing all the dusty branches and corners. this will give us the confidence to upgrade dependencies quickly and to adapt to new versions of redis over time. and thanks to docket.


Download files

Source Distribution

shadow_task-0.1.3.tar.gz (141.5 kB view details)

Uploaded Source

File details

Details for the file shadow_task-0.1.3.tar.gz.

File metadata

  • Download URL: shadow_task-0.1.3.tar.gz
  • Upload date:
  • Size: 141.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for shadow_task-0.1.3.tar.gz
Algorithm Hash digest
SHA256 c12ddaeaad479658687f6f8ae36d49b45db15d9bc574f5f6fe3dac61d1317c8e
MD5 fa840f796471e794f1d35317ee531b8c
BLAKE2b-256 1eac5ea1da0c260fb213209826f7813adf70e99523669777ac5996253788b817
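
a downloaded archive can be checked against the SHA256 digest listed above with python's standard `hashlib` module. this is a minimal sketch: `sha256_of` and `verify` are hypothetical helper names, and the archive path in the usage comment assumes the file was saved to the current directory.

```python
import hashlib
from pathlib import Path

# SHA256 digest published for shadow_task-0.1.3.tar.gz
EXPECTED_SHA256 = "c12ddaeaad479658687f6f8ae36d49b45db15d9bc574f5f6fe3dac61d1317c8e"


def sha256_of(path: Path, chunk_size: int = 65536) -> str:
    """Return the hex SHA256 digest of a file, reading it in chunks."""
    digest = hashlib.sha256()
    with path.open("rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def verify(path: Path, expected: str) -> bool:
    """Compare a file's digest with the expected hex string, ignoring case."""
    return sha256_of(path) == expected.lower()


# usage, assuming the archive is in the current directory:
# verify(Path("shadow_task-0.1.3.tar.gz"), EXPECTED_SHA256)
```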
