Skip to main content

Async-first Celery integration for Pico-IoC. Enables defining Celery tasks as async methods on IoC-managed components, with automatic discovery, dependency injection, and container-scoped execution.

Project description

๐Ÿ“ฆ pico-celery

PyPI Ask DeepWiki License: MIT CI (tox matrix) codecov Quality Gate Status Duplicated Lines (%) Maintainability Rating

Pico-Celery

Pico-Celery integrates Pico-IoC with Celery 5, giving you true inversion of control for background task execution.

It lets you define Celery tasks as async methods inside IoC-managed components, with automatic discovery, dependency injection, and container-scoped execution.

๐Ÿ Requires Python 3.10+ โšก Async-native: tasks run as real async def, with no thread pools ๐Ÿ”ง Works with Celery 5.x ๐Ÿงฉ Full constructor-based DI ๐Ÿš€ Perfect for FastAPI apps, worker daemons, and distributed pipelines

With pico-celery, you get predictable scoping, a clean separation of concerns, and a unified dependency model across HTTP, CLI, and background execution.


๐ŸŽฏ Why pico-celery?

Celery is powerful, but typical usage introduces:

  • Module-level tasks
  • Global Celery apps
  • No dependency injection
  • Shared mutable state
  • Difficult testing setups

pico-celery fixes all of that:

  • Tasks become async methods inside components
  • Dependency injection is constructor-based
  • Task handlers are resolved through Pico-IoC
  • Each execution receives a fresh instance (prototype scope)
  • Workers bootstrap the IoC container exactly once
  • No global state, no magic imports, no tight coupling
Feature Default Celery pico-celery
Task Definition Global functions Component methods
Dependency Injection None Constructor injection
State Isolation Manual Automatic (prototype scope)
Testability Hard Container-managed
Async Tasks Requires custom pools First-class async
Task Clients Manual (app.send_task) Declarative (@send_task)

๐Ÿงฑ Core Features

  • @task decorator for async component methods
  • @celery and @send_task decorators for declarative, injectable clients
  • Auto-Discovery: Automatically loaded when using pico-stack (v0.2.0+).
  • Automatic task discovery inside Pico-IoC
  • Dependency injection for all task handlers
  • Container-scoped execution (prototype by default)
  • Async-safe task execution wrappers
  • Unified config via CelerySettings
  • Method interception for client-side task sending

๐Ÿ“ฆ Installation

pip install pico-celery

You will also need:

pip install pico-ioc celery

If using Redis (recommended):

pip install celery[redis]

๐Ÿ”Œ Zero-Config with Pico-Stack

If you are using pico-stack, you do not need to manually register "pico_celery" in your modules list. It is automatically discovered via entry points.

from pico_stack import init

# pico_celery is automatically loaded!
container = init(modules=[__name__, "my_app"])

If you are using standard pico-ioc, follow the manual registration in the example below.


๐Ÿš€ Quick Example

This example shows both a worker and a client that sends the task.

1. Define a Task Component (Worker)

This component defines the task logic and its dependencies.

# my_app/tasks.py
from pico_ioc import component
from pico_celery import task
from my_app.services import UserService  # Your business logic

@component(scope="prototype")
class UserTasks:
    def __init__(self, user_service: UserService):
        self.user_service = user_service

    @task(name="tasks.create_user")
    async def create_user(self, username: str, email: str) -> dict:
        # Real async logic with injected dependencies
        user = await self.user_service.create(username, email)
        return user.to_dict()

2. Define a Task Client (Sender)

This is a declarative client that your web API (e.g., FastAPI) can inject and use.

# my_app/clients.py
from pico_celery import celery, send_task, CeleryClient

@celery  # Marks it as a pico-celery client component
class UserTaskClient(CeleryClient):

    @send_task(name="tasks.create_user")
    def create_user(self, username: str, email: str):
        # This body is never executed.
        # pico-celery intercepts the call and sends it to Celery.
        pass

3. Create the Worker Entrypoint

This file (worker.py) is what Celery will use to boot up.

# my_app/worker.py
from pico_ioc import init, configuration, DictSource
from celery import Celery

# Your application's configuration (broker, backend, etc.)
cfg = configuration(DictSource({
    "celery": {
        "broker_url": "redis://localhost:6379/0",
        "backend_url": "redis://localhost:6379/1"
    }
}))

# Modules to scan for @component, @task, @celery
# NOTE: If using pico-stack, 'pico_celery' can be omitted here.
modules = [
    "pico_celery",
    "my_app.services",
    "my_app.tasks",
    "my_app.clients"
]

# Initialize the container
container = init(modules=modules, config=cfg)

# Get the IoC-managed Celery app
# The PicoTaskRegistrar has already found and registered
# the 'tasks.create_user' task.
celery_app = container.get(Celery)

4. Run the Worker

You will need an async pool like eventlet or gevent.

# Install the pool: pip install eventlet
celery -A my_app.worker:celery_app worker -P eventlet -l info

5. Use the Client in your API

Your web API (e.g., FastAPI) can now inject the UserTaskClient and use it.

# my_app/main.py
from fastapi import FastAPI
from pico_ioc import init
from my_app.clients import UserTaskClient
from my_app.worker import container  # Reuse the worker's container

app = FastAPI()

@app.post("/users/")
async def create_user_endpoint(username: str, email: str):
    # Resolve the client from the container
    client = await container.aget(UserTaskClient)
    
    # Call the client method
    # This sends the task to Celery and returns an AsyncResult
    result = client.create_user(username, email)
    
    return {"message": "Task submitted", "task_id": result.id}

๐Ÿ”„ Task Execution Semantics (Worker)

When Celery receives a task:

Celery Worker
     โ†“
Async Wrapper (generated by PicoTaskRegistrar)
     โ†“
await container.aget(UserTasks)  (Resolves component + dependencies)
     โ†“
component_instance.create_user(...) (Executes your async method)
     โ†“
await self.user_service.create(...)
     โ†“
'prototype' scope is destroyed

Key benefits:

  • True async execution.
  • No global state.
  • Fully injected services.
  • Guaranteed isolation via prototype scope.

๐Ÿงช Testing with Pico-IoC

You can test your task logic just like any other component, with no Celery worker needed.

import pytest
from pico_ioc import init, configuration, DictSource
from my_app.tasks import UserTasks
from unittest.mock import AsyncMock, MagicMock

# Mock the dependencies
@pytest.fixture
def mock_user_service():
    service = AsyncMock()
    service.create.return_value = MagicMock(to_dict=lambda: {"id": 1})
    return service

@pytest.mark.asyncio
async def test_user_task_logic(mock_user_service):
    cfg = configuration(DictSource({}))
    
    # Initialize the container with only the task
    container = init(modules=[UserTasks], config=cfg)
    
    # Register the mocked dependency
    container.register_instance(mock_user_service)

    # Resolve the task component
    task_component = await container.aget(UserTasks)
    
    # Call the async method directly
    result = await task_component.create_user("test", "test@example.com")

    # Assert the logic
    assert result == {"id": 1}
    mock_user_service.create.assert_called_with("test", "test@example.com")
    
    await container.cleanup_all_async()

โš™๏ธ How It Works

  • @task (in decorators.py) flags async methods inside components.
  • PicoTaskRegistrar (in registrar.py) is a component that scans IoC metadata upon configuration.
  • For each @task method found, it generates an async wrapper.
  • This wrapper is what gets registered with Celery (celery_app.task(...)).
  • When Celery executes the task, it invokes the wrapper, which in turn uses await container.aget(Component) to get a fresh instance (thanks to prototype) and then calls your original method, ensuring DI.
  • @send_task (in client.py) flags methods on client classes.
  • @celery (in client.py) applies an interceptor (CeleryClientInterceptor) to all methods flagged with @send_task.
  • When you call a client method (e.g., client.create_user(...)), the interceptor activates, extracts the @send_task metadata (like the task name) and the call arguments, and executes self._celery.send_task(...) on your behalf.

๐Ÿ’ก Architecture Overview

pico-celery manages both sides: the Worker (execution) and the Client (sending).

Worker Flow (Task Execution)

       โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
       โ”‚        Celery Worker        โ”‚
       โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
                     โ”‚
         Async Wrapper (from pico-celery)
                     โ”‚
       โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ–ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
       โ”‚         pico-celery       โ”‚
       โ”‚  (@task, Registrar, Scopes) โ”‚
       โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
                     โ”‚
          IoC Resolution (await aget)
                     โ”‚
       โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ–ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
       โ”‚           Pico-IoC        โ”‚
       โ”‚ (Container, Scopes, DI)   โ”‚
       โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
                     โ”‚
      Your Business Logic (Services, Repos)

Client Flow (Task Sending)

       โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
       โ”‚      Your App (e.g., FastAPI) โ”‚
       โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
                     โ”‚
     Call to: client.create_user(...)
                     โ”‚
       โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ–ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
       โ”‚         pico-celery       โ”‚
       โ”‚ (@celery, @send_task, Interceptor)
       โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
                     โ”‚
      Intercepts call and transforms it into:
      celery_app.send_task("tasks.create_user", ...)
                     โ”‚
       โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ–ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
       โ”‚       Broker (e.g., Redis)  โ”‚
       โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

๐Ÿ“ License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pico_celery-0.2.0.tar.gz (25.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pico_celery-0.2.0-py3-none-any.whl (11.1 kB view details)

Uploaded Python 3

File details

Details for the file pico_celery-0.2.0.tar.gz.

File metadata

  • Download URL: pico_celery-0.2.0.tar.gz
  • Upload date:
  • Size: 25.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for pico_celery-0.2.0.tar.gz
Algorithm Hash digest
SHA256 d9267c0313bac689df62f208aa1fa92bfb8ca6833a4d7b06b18ea291dae238ba
MD5 11449f948a9ca20c453fdddefd8ca8d5
BLAKE2b-256 f58cdabfb6f5bb9cb45fbf3b47cc5a3686b769fe2fb6fba01708a0bcb799eb0a

See more details on using hashes here.

File details

Details for the file pico_celery-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: pico_celery-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 11.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for pico_celery-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 0d30a880d5c22d4ce43c201368d685da3131a2458e0d4e70bdb84d6cf3807f9e
MD5 224cc9eec0d632bcdf2f7673ed4bcad3
BLAKE2b-256 66bb3115cd74bf3161df4491ba80b4b65c5cc97a762a98dd4912277116ea5114

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page