celery-typed
Type-safe Pydantic serialization for Celery tasks
A low-friction library that enables seamless Pydantic model serialization in Celery tasks using Kombu's preserializer functionality. Pass Pydantic models directly to your Celery tasks without any special decorators or manual serialization.
Features
- ✅ Zero-friction integration - No special task decorators or parameters needed
- ✅ Type-safe serialization - Full Pydantic model support with proper type hints
- ✅ Automatic registration - One-line setup in your Celery configuration
- ✅ Production ready - Based on Kombu's proven serialization system
- ✅ Extensible - Clean protocol-based design for custom preserializers
Installation
pip install celery-typed
Or with uv:
uv add celery-typed
Quick Start
1. Register the preserializer in your Celery app
# myapp/celery.py
from celery import Celery
from celery_typed import register_pydantic_serializer
app = Celery("myapp")
register_pydantic_serializer() # One line setup!
# ... rest of your Celery configuration
2. Use Pydantic models directly in tasks
from pydantic import BaseModel
from myapp.celery import app

class UserModel(BaseModel):
    id: int
    name: str
    email: str

class ProcessingResult(BaseModel):
    status: str
    message: str
    user_id: int

@app.task
def process_user(user: UserModel) -> ProcessingResult:
    # Your task logic here
    return ProcessingResult(
        status="success",
        message=f"Processed user {user.name}",
        user_id=user.id,
    )
3. Call tasks with Pydantic model instances
# Pass model instances directly
user = UserModel(id=1, name="Alice", email="alice@example.com")
result = process_user.delay(user)
# Get back a proper Pydantic model instance
processing_result: ProcessingResult = result.get()
print(processing_result.status) # "success"
Comparison with Official Pydantic Support
Before (Official Celery + Pydantic)
@app.task(pydantic=True)  # Must remember this decorator
def process_user(user_data: dict) -> dict:  # Work with dicts
    user = UserModel(**user_data)  # Manual conversion
    # ... process user
    return result.model_dump()  # Manual serialization

# Usage
result = process_user.delay({'id': 1, 'name': 'Alice', 'email': 'alice@example.com'})  # Pass dict
result_dict = result.get()  # Get back dict
result_obj = ProcessingResult(**result_dict)  # Manual conversion
After (celery-typed)
@app.task  # No special decorator needed
def process_user(user: UserModel) -> ProcessingResult:  # Real type hints
    # ... process user
    return result  # Return model directly

# Usage
user = UserModel(id=1, name='Alice', email='alice@example.com')
result = process_user.delay(user)  # Pass model instance
result_obj = result.get()  # Get back model instance
How It Works
celery-typed uses Kombu's register_type functionality to automatically serialize and deserialize Pydantic models. When you pass a Pydantic model to a task:
- Serialization: The model is packed into a JSON-serializable format containing:
  - Module path and class name for reconstruction
  - The model's data via model_dump()
- Transmission: Standard Celery message passing (no changes needed)
- Deserialization: The worker reconstructs the original model instance using the stored class information and data
This approach is based on the excellent implementation described in Dosu's blog post.
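The three steps above can be sketched with the standard library alone. In this minimal sketch a `types.SimpleNamespace` stands in for a Pydantic model and `vars()` stands in for `model_dump()`. The key names `module`, `qualname`, and `dump` are hypothetical and are not taken from celery-typed's actual wire format.

```python
import importlib
import json
from types import SimpleNamespace
from typing import Any


def pack(obj: Any) -> dict:
    """Pack an object into a JSON-serializable dict (key names are hypothetical)."""
    cls = type(obj)
    return {
        "module": cls.__module__,      # where to import the class from
        "qualname": cls.__qualname__,  # which class to rebuild
        "dump": dict(vars(obj)),       # stand-in for model.model_dump()
    }


def unpack(data: dict) -> Any:
    """Rebuild the original object from the packed dict."""
    module = importlib.import_module(data["module"])
    cls = getattr(module, data["qualname"])  # top-level classes only
    return cls(**data["dump"])  # a Pydantic model would re-validate here


user = SimpleNamespace(id=1, name="Alice", email="alice@example.com")
wire = json.dumps(pack(user))        # what would travel through the broker
restored = unpack(json.loads(wire))  # what the worker would receive
```

Looking the class up with `getattr` on the module only works for top-level classes. A nested class would need its qualified name split on dots and resolved one attribute at a time.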
API Reference
register_pydantic_serializer()
Convenience function to register Pydantic model serialization for all BaseModel subclasses.
from celery_typed import register_pydantic_serializer
# Call once in your Celery app configuration
register_pydantic_serializer()
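As a rough illustration of what a type registration does behind a call like this, the sketch below builds a tiny registry on top of the standard `json` module. It is not Kombu's or celery-typed's code: `register_type` here is a local stand-in function, the `__type__`/`__value__` envelope is an assumed format, and `datetime.date` stands in for a Pydantic model.

```python
import json
from datetime import date
from typing import Any, Callable

# Illustrative registries: type -> (marker, encoder) and marker -> decoder
_encoders: dict[type, tuple[str, Callable[[Any], Any]]] = {}
_decoders: dict[str, Callable[[Any], Any]] = {}


def register_type(t: type, marker: str, encoder: Callable, decoder: Callable) -> None:
    """Local stand-in: remember how to encode and decode instances of t."""
    _encoders[t] = (marker, encoder)
    _decoders[marker] = decoder


def _default(obj: Any) -> Any:
    # json.dumps calls this for objects it cannot serialize natively
    for t, (marker, encoder) in _encoders.items():
        if isinstance(obj, t):
            return {"__type__": marker, "__value__": encoder(obj)}
    raise TypeError(f"Cannot serialize {type(obj).__name__}")


def _object_hook(d: dict) -> Any:
    # json.loads calls this for every decoded JSON object
    if d.keys() == {"__type__", "__value__"}:
        return _decoders[d["__type__"]](d["__value__"])
    return d


def dumps(obj: Any) -> str:
    return json.dumps(obj, default=_default)


def loads(text: str) -> Any:
    return json.loads(text, object_hook=_object_hook)


register_type(date, "date", date.isoformat, date.fromisoformat)
payload = dumps({"due": date(2024, 1, 31)})
restored = loads(payload)
```

Because the encoder lookup uses `isinstance`, registering a base class in a registry like this one also covers its subclasses, which is presumably how a single registration can handle all BaseModel subclasses.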
register_preserializer(preserializer)
Decorator factory for registering custom preserializers.
from celery_typed import register_preserializer, Preserializer

class MyCustomPreserializer:
    @classmethod
    def compatible_with(cls, type_: type) -> bool:
        # type_ is a class, so test it with issubclass rather than isinstance
        return issubclass(type_, MyCustomType)

    @classmethod
    def pack(cls, obj):
        return {"data": obj.serialize()}

    @classmethod
    def unpack(cls, data):
        return MyCustomType.deserialize(data["data"])

# Register for your custom type
register_preserializer(MyCustomPreserializer)(MyCustomType)
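For a concrete feel of the three classmethods (`compatible_with`, `pack`, `unpack`), here is a self-contained sketch that runs without celery-typed installed. The `Money` type and `MoneyPreserializer` are hypothetical, the protocol is redeclared locally, and raising on an incompatible type is an assumption suggested by the `Literal[True]` return annotation rather than documented behavior.

```python
import json
from dataclasses import dataclass
from decimal import Decimal
from typing import Any, Literal, Protocol


class Preserializer(Protocol):
    # Local copy of the interface shape so the sketch is self-contained
    @classmethod
    def compatible_with(cls, type_: type) -> Literal[True]: ...
    @classmethod
    def pack(cls, obj: Any) -> Any: ...
    @classmethod
    def unpack(cls, data: Any) -> object: ...


@dataclass(frozen=True)
class Money:  # hypothetical custom type
    amount: Decimal
    currency: str


class MoneyPreserializer:
    @classmethod
    def compatible_with(cls, type_: type) -> Literal[True]:
        # Assumption: signal incompatibility by raising, never by returning False
        if not issubclass(type_, Money):
            raise TypeError(f"{type_.__name__} is not a Money subclass")
        return True

    @classmethod
    def pack(cls, obj: Money) -> dict:
        # Decimal is not JSON-native, so it is sent as a string
        return {"amount": str(obj.amount), "currency": obj.currency}

    @classmethod
    def unpack(cls, data: dict) -> Money:
        return Money(Decimal(data["amount"]), data["currency"])


price = Money(Decimal("19.99"), "EUR")
packed = MoneyPreserializer.pack(price)
# Round-trip through JSON to mimic what a broker would carry
restored = MoneyPreserializer.unpack(json.loads(json.dumps(packed)))
```

Sending the amount as a string avoids the float rounding that would occur if the `Decimal` were converted to a JSON number.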
Preserializer Protocol
Protocol defining the interface for custom preserializers:
from typing import Any, Literal, Protocol

class Preserializer(Protocol):
    @classmethod
    def compatible_with(cls, type_: type) -> Literal[True]:
        """Check if type is compatible with this preserializer"""

    @classmethod
    def pack(cls, obj: Any) -> Any:
        """Pack object into JSON-serializable format"""

    @classmethod
    def unpack(cls, data: Any) -> object:
        """Unpack data back into original object"""
Requirements
- Python 3.10+
- Celery 5.2+
- Kombu 5.2+
- Pydantic 2.0+
Development
This project uses uv for dependency management. To set up a development environment:
# Clone the repository
git clone https://github.com/nwcell/celery-typed.git
cd celery-typed
# Install dependencies
uv sync --dev
# Run tests
uv run pytest
# Run linting
uv run ruff check
uv run ruff format
# Type checking
uv run mypy src/
Contributing
Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.
Make sure to:
- Add tests for any new functionality
- Update documentation as needed
- Follow the existing code style (enforced by Ruff)
- Ensure all tests and type checks pass
License
This project is licensed under the MIT License - see the LICENSE file for details.
Credits
- Inspired by Dosu's blog post on Celery preserializers
- Built on top of Celery and Pydantic
- Uses Kombu's type registration system