
📦 pico-celery


Pico-Celery

Pico-Celery integrates Pico-IoC with Celery 5, giving you true inversion of control for background task execution.

It lets you define Celery tasks as async methods inside IoC-managed components, with automatic discovery, dependency injection, and container-scoped execution.

๐Ÿ Requires Python 3.11+ โšก Async-native: tasks run as real async def, with no thread pools ๐Ÿ”ง Works with Celery 5.x ๐Ÿงฉ Full constructor-based DI ๐Ÿš€ Perfect for FastAPI apps, worker daemons, and distributed pipelines

With pico-celery, you get predictable scoping, a clean separation of concerns, and a unified dependency model across HTTP, CLI, and background execution.


🎯 Why pico-celery?

Celery is powerful, but typical usage introduces:

  • Module-level tasks
  • Global Celery apps
  • No dependency injection
  • Shared mutable state
  • Difficult testing setups

pico-celery fixes all of that:

  • Tasks become async methods inside components
  • Dependency injection is constructor-based
  • Task handlers are resolved through Pico-IoC
  • Each execution receives a fresh instance (prototype scope)
  • Workers bootstrap the IoC container exactly once
  • No global state, no magic imports, no tight coupling
Feature              | Default Celery         | pico-celery
---------------------|------------------------|----------------------------
Task definition      | Global functions       | Component methods
Dependency injection | None                   | Constructor injection
State isolation      | Manual                 | Automatic (prototype scope)
Testability          | Hard                   | Container-managed
Async tasks          | Requires custom pools  | First-class async
Task clients         | Manual (app.send_task) | Declarative (@send_task)

🧱 Core Features

  • @task decorator for async component methods
  • @celery and @send_task decorators for declarative, injectable clients
  • Auto-discovery via entry points when using pico-boot
  • Automatic task discovery inside Pico-IoC
  • Dependency injection for all task handlers
  • Container-scoped execution (prototype by default)
  • Async-safe task execution wrappers
  • Unified config via CelerySettings
  • Method interception for client-side task sending

📦 Installation

pip install pico-celery

You will also need:

pip install pico-ioc celery

If using Redis (recommended):

pip install "celery[redis]"

🚀 Quick Example

This example shows both a worker and a client that sends the task.

1. Define a Task Component (Worker)

This component defines the task logic and its dependencies.

# my_app/tasks.py
from pico_ioc import component
from pico_celery import task
from my_app.services import UserService  # Your business logic

@component(scope="prototype")
class UserTasks:
    def __init__(self, user_service: UserService):
        self.user_service = user_service

    @task(name="tasks.create_user")
    async def create_user(self, username: str, email: str) -> dict:
        # Real async logic with injected dependencies
        user = await self.user_service.create(username, email)
        return user.to_dict()

2. Define a Task Client (Sender)

This is a declarative client that your web API (e.g., FastAPI) can inject and use.

# my_app/clients.py
from pico_celery import celery, send_task, CeleryClient

@celery  # Marks it as a pico-celery client component
class UserTaskClient(CeleryClient):

    @send_task(name="tasks.create_user")
    def create_user(self, username: str, email: str):
        # This body is never executed.
        # pico-celery intercepts the call and sends it to Celery.
        pass

3. Create the Worker Entrypoint

This file (worker.py) is what Celery will use to boot up.

# my_app/worker.py
from pico_ioc import init, configuration, DictSource
from celery import Celery

# Your application's configuration (broker, backend, etc.)
cfg = configuration(DictSource({
    "celery": {
        "broker_url": "redis://localhost:6379/0",
        "backend_url": "redis://localhost:6379/1"
    }
}))

# Modules to scan for @component, @task, @celery
modules = [
    "pico_celery",
    "my_app.services",
    "my_app.tasks",
    "my_app.clients"
]

# Initialize the container
container = init(modules=modules, config=cfg)

# Get the IoC-managed Celery app
# The PicoTaskRegistrar has already found and registered
# the 'tasks.create_user' task.
celery_app = container.get(Celery)

4. Run the Worker

You will need a greenlet-based execution pool such as eventlet or gevent.

# Install the pool: pip install eventlet
celery -A my_app.worker:celery_app worker -P eventlet -l info

5. Use the Client in your API

Your web API (e.g., FastAPI) can now inject the UserTaskClient and use it.

# my_app/main.py
from fastapi import FastAPI
from pico_ioc import init
from my_app.clients import UserTaskClient
from my_app.worker import container  # Reuse the worker's container

app = FastAPI()

@app.post("/users/")
async def create_user_endpoint(username: str, email: str):
    # Resolve the client from the container
    client = await container.aget(UserTaskClient)
    
    # Call the client method
    # This sends the task to Celery and returns an AsyncResult
    result = client.create_user(username, email)
    
    return {"message": "Task submitted", "task_id": result.id}

🔌 Even Simpler with pico-boot

If you use pico-boot, you don't need to register "pico_celery" in your modules list. It is automatically discovered via entry points:

from pico_boot import init

# pico_celery is automatically loaded โ€” no need to include it in modules!
container = init(modules=["my_app.services", "my_app.tasks", "my_app.clients"], config=cfg)

🔄 Task Execution Semantics (Worker)

When Celery receives a task:

Celery Worker
     ↓
Async Wrapper (generated by PicoTaskRegistrar)
     ↓
await container.aget(UserTasks)   (resolves the component + its dependencies)
     ↓
component_instance.create_user(...)   (executes your async method)
     ↓
await self.user_service.create(...)
     ↓
'prototype' scope is destroyed

Key benefits:

  • True async execution.
  • No global state.
  • Fully injected services.
  • Guaranteed isolation via prototype scope.
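The resolve-then-await pattern above can be sketched in plain Python. Everything below is a toy stand-in — the Container class and make_wrapper are illustrative, not pico-celery's real internals — but it shows why each execution gets a fresh, fully injected instance:

```python
import asyncio

class Container:
    """Toy stand-in for the Pico-IoC container: prototype scope
    means every aget() call builds a fresh instance."""
    def __init__(self, providers):
        self._providers = providers  # type -> zero-arg factory

    async def aget(self, cls):
        return self._providers[cls]()  # fresh instance per call

class UserService:
    async def create(self, username, email):
        return {"username": username, "email": email}

class UserTasks:
    def __init__(self, user_service):
        self.user_service = user_service

    async def create_user(self, username, email):
        return await self.user_service.create(username, email)

def make_wrapper(container, cls, method_name):
    """What a registrar-generated wrapper conceptually does:
    resolve a fresh component, then await the bound task method."""
    def wrapper(*args, **kwargs):
        async def run():
            instance = await container.aget(cls)  # fresh prototype
            return await getattr(instance, method_name)(*args, **kwargs)
        return asyncio.run(run())  # Celery's pool would drive the loop
    return wrapper

container = Container({UserTasks: lambda: UserTasks(UserService())})
create_user = make_wrapper(container, UserTasks, "create_user")
print(create_user("ada", "ada@example.com"))
# → {'username': 'ada', 'email': 'ada@example.com'}
```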

🧪 Testing with Pico-IoC

You can test your task logic just like any other component, with no Celery worker needed.

import pytest
from pico_ioc import init, configuration, DictSource
from my_app.tasks import UserTasks
from unittest.mock import AsyncMock, MagicMock

# Mock the dependencies
@pytest.fixture
def mock_user_service():
    service = AsyncMock()
    service.create.return_value = MagicMock(to_dict=lambda: {"id": 1})
    return service

@pytest.mark.asyncio
async def test_user_task_logic(mock_user_service):
    cfg = configuration(DictSource({}))
    
    # Initialize the container with only the task
    container = init(modules=[UserTasks], config=cfg)
    
    # Register the mocked dependency
    container.register_instance(mock_user_service)

    # Resolve the task component
    task_component = await container.aget(UserTasks)
    
    # Call the async method directly
    result = await task_component.create_user("test", "test@example.com")

    # Assert the logic
    assert result == {"id": 1}
    mock_user_service.create.assert_called_with("test", "test@example.com")
    
    await container.cleanup_all_async()

โš™๏ธ How It Works

  • @task (in decorators.py) flags async methods inside components.
  • PicoTaskRegistrar (in registrar.py) is a component that scans IoC metadata upon configuration.
  • For each @task method found, it generates an async wrapper.
  • This wrapper is what gets registered with Celery (celery_app.task(...)).
  • When Celery executes the task, it invokes the wrapper. The wrapper calls await container.aget(Component) to resolve a fresh instance (thanks to the prototype scope), then calls your original method with its dependencies injected.
  • @send_task (in client.py) flags methods on client classes.
  • @celery (in client.py) applies an interceptor (CeleryClientInterceptor) to all methods flagged with @send_task.
  • When you call a client method (e.g., client.create_user(...)), the interceptor activates, extracts the @send_task metadata (like the task name) and the call arguments, and executes self._celery.send_task(...) on your behalf.
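The client-side mechanism can be sketched with a toy decorator pair. The send_task/celery_client functions below are simplified stand-ins (the real pico-celery decorators and CeleryClientInterceptor work through Pico-IoC, and the real stub would call self._celery.send_task and return an AsyncResult):

```python
import functools

SENT = []  # captured (task_name, args, kwargs) — stands in for the broker

def send_task(name):
    """Toy @send_task: attach the task name as metadata on the method."""
    def mark(fn):
        fn._task_name = name
        return fn
    return mark

def celery_client(cls):
    """Toy @celery: replace every @send_task-marked method with a stub
    that 'sends' the task instead of running the (empty) body."""
    for attr, fn in list(vars(cls).items()):
        task_name = getattr(fn, "_task_name", None)
        if task_name is None:
            continue

        @functools.wraps(fn)
        def stub(self, *args, _name=task_name, **kwargs):
            # Real code would do: return self._celery.send_task(_name, ...)
            SENT.append((_name, args, kwargs))
            return {"task": _name}

        setattr(cls, attr, stub)
    return cls

@celery_client
class UserTaskClient:
    @send_task(name="tasks.create_user")
    def create_user(self, username, email):
        pass  # never executed

client = UserTaskClient()
result = client.create_user("ada", "ada@example.com")
print(result, SENT)
```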

💡 Architecture Overview

pico-celery manages both sides: the Worker (execution) and the Client (sending).

Worker Flow (Task Execution)

       ┌─────────────────────────────┐
       │        Celery Worker        │
       └──────────────┬──────────────┘
                      │
         Async Wrapper (from pico-celery)
                      │
       ┌──────────────▼──────────────┐
       │         pico-celery         │
       │  (@task, Registrar, Scopes) │
       └──────────────┬──────────────┘
                      │
          IoC Resolution (await aget)
                      │
       ┌──────────────▼──────────────┐
       │          Pico-IoC           │
       │   (Container, Scopes, DI)   │
       └──────────────┬──────────────┘
                      │
      Your Business Logic (Services, Repos)

Client Flow (Task Sending)

       ┌─────────────────────────────┐
       │  Your App (e.g., FastAPI)   │
       └──────────────┬──────────────┘
                      │
        Call to: client.create_user(...)
                      │
       ┌──────────────▼──────────────┐
       │         pico-celery         │
       │   (@celery, @send_task,     │
       │        Interceptor)         │
       └──────────────┬──────────────┘
                      │
      Intercepts the call and turns it into:
      celery_app.send_task("tasks.create_user", ...)
                      │
       ┌──────────────▼──────────────┐
       │     Broker (e.g., Redis)    │
       └─────────────────────────────┘

🤖 Claude Code Skills

This project includes pre-designed skills for Claude Code, enabling AI-assisted development with pico-celery patterns.

Skill               | Command           | Description
--------------------|-------------------|----------------------------------------------
Pico Celery Task    | /pico-celery-task | Creates Celery tasks integrated with pico-ioc
Pico Test Generator | /pico-tests       | Generates tests for pico-framework components

See Skills documentation for full details and installation instructions.


๐Ÿ“ License

MIT
