Replicate Batch Process
Chinese README | English | PyPI Package
Intelligent batch processing tool for Replicate models with automatic fallback mechanisms and concurrent processing.
✨ Key Features
- 🔄 Intelligent Fallback System - Automatic model switching on incompatibility
- ⚡ Smart Concurrency Control - Adaptive rate limiting and batch processing
- 🎯 Three Usage Modes - Single, batch same-model, and mixed-model processing
- 📝 Custom File Naming - Ordered output with correspondence control
- 🛡️ Error Resilience - Comprehensive retry and recovery mechanisms
- ✅ Model Validation - Automatic detection of unsupported models with clear error messages
📋 Requirements
System Requirements
- Python: 3.8, 3.9, 3.10, 3.11, or 3.12
- Operating System: Windows, macOS, Linux
- Memory: Minimum 4GB RAM recommended
Dependencies
```
replicate>=0.15.0        # Replicate API client
requests>=2.25.0         # HTTP library
asyncio-throttle>=1.0.2  # Rate limiting for async operations
python-dotenv>=0.19.0    # Environment variable management
```
API Requirements
- Replicate API Token: Required (Get one here)
- Network: Stable internet connection for API calls
- Rate Limits: 600 requests/minute (shared across all models)
📦 Installation
```bash
pip install replicate-batch-process
```
🚀 Quick Start
1. Set up API Token
```bash
# Option 1: Interactive setup
replicate-init

# Option 2: Manual setup
export REPLICATE_API_TOKEN="your-token-here"

# Option 3: .env file
echo "REPLICATE_API_TOKEN=your-token-here" > .env
```
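If you use the `.env` approach, it can help to verify the token is actually visible to Python before calling the library. The check below is not part of the package; `check_token` is a hypothetical helper that leans on the `python-dotenv` dependency when available:

```python
import os

try:
    from dotenv import load_dotenv  # provided by the python-dotenv dependency
    load_dotenv()                    # picks up a .env file in the cwd, if present
except ImportError:
    pass

def check_token() -> str:
    """Fail fast if the Replicate token is not visible to this process."""
    token = os.environ.get("REPLICATE_API_TOKEN", "")
    if not token:
        raise RuntimeError("REPLICATE_API_TOKEN is not set; run replicate-init or export it")
    return token
```

Calling `check_token()` once at startup turns a missing token into an immediate, readable error instead of a failed API call later.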
2. Single Image Generation
```python
from replicate_batch_process import replicate_model_calling

file_paths = replicate_model_calling(
    prompt="A beautiful sunset over mountains",
    model_name="qwen/qwen-image",  # Use a supported model
    output_filepath="output/sunset.jpg"
)
```
3. Batch Processing (Async Required)
Basic Batch Processing
```python
import asyncio
from replicate_batch_process import intelligent_batch_process

async def main():
    # Process multiple prompts with the same model
    files = await intelligent_batch_process(
        prompts=["sunset", "city", "forest"],
        model_name="qwen/qwen-image",
        max_concurrent=8,
        output_filepath=["output/sunset.png", "output/city.png", "output/forest.png"]
    )
    print(f"Generated {len(files)} images:")
    for file in files:
        print(f" - {file}")
    return files

# Run the async function
if __name__ == "__main__":
    results = asyncio.run(main())
```
Advanced Mixed-Model Batch Processing
```python
import asyncio
from replicate_batch_process import IntelligentBatchProcessor, BatchRequest

async def advanced_batch():
    processor = IntelligentBatchProcessor()

    # Create requests with different models and parameters
    requests = [
        BatchRequest(
            prompt="A futuristic city",
            model_name="qwen/qwen-image",
            output_filepath="output/city.png",
            kwargs={"aspect_ratio": "16:9"}
        ),
        BatchRequest(
            prompt="A magical forest",
            model_name="google/imagen-4-ultra",
            output_filepath="output/forest.png",
            kwargs={"output_quality": 90}
        ),
        BatchRequest(
            prompt="Character with red hair",
            model_name="black-forest-labs/flux-kontext-max",
            output_filepath="output/character.png",
            kwargs={"input_image": "reference.jpg"}
        )
    ]

    # Process all requests concurrently
    results = await processor.process_intelligent_batch(requests, max_concurrent=5)

    # Handle results
    successful = [r for r in results if r.success]
    failed = [r for r in results if not r.success]
    print(f"✅ Success: {len(successful)}/{len(results)}")
    for result in failed:
        print(f"❌ Failed: {result.error}")
    return results

# Run with proper async context
if __name__ == "__main__":
    asyncio.run(advanced_batch())
```
📋 Supported Models
Image Generation Models
| Model | Price | Specialization | Reference Image Support |
|---|---|---|---|
| black-forest-labs/flux-dev | $0.025 | Fast generation, minimal censorship | ❌ |
| black-forest-labs/flux-kontext-max | $0.08 | Image editing, character consistency | ✅ |
| qwen/qwen-image | $0.025 | Text rendering, cover images | ❌ |
| google/imagen-4-ultra | $0.06 | High-quality detailed images | ❌ |
Video Generation Models
| Model | Price | Specialization | Reference Image Support |
|---|---|---|---|
| google/veo-3-fast | $3.32/call | Fast video with audio | ✅ |
| kwaivgi/kling-v2.1-master | $0.28/sec | 1080p video, 5-10 second duration | ✅ |
⚠️ Note: Using unsupported models will return a clear error message: "Model '{model_name}' is not supported. Please use one of the supported models listed above."
🔄 Intelligent Fallback System
Automatic model switching when issues arise:
Reference Image Auto-Detection
```python
# User provides a reference image to a non-supporting model
replicate_model_calling(
    prompt="Generate based on this image",
    model_name="black-forest-labs/flux-dev",  # Doesn't support reference images
    input_image="path/to/image.jpg"           # → Auto-switches to flux-kontext-max
)
```
Parameter Compatibility Handling
```python
# Unsupported parameters are cleaned automatically and the model switched
replicate_model_calling(
    prompt="Generate image",
    model_name="black-forest-labs/flux-kontext-max",
    guidance=3.5,   # Unsupported parameter
    num_outputs=2   # → Auto-switches to a compatible model
)
```
API Error Recovery
Automatic fallback chain: Flux Dev → Qwen Image → Imagen 4 Ultra
📋 Usage Scenarios
| Mode | Use Case | Command |
|---|---|---|
| Single | One-off generation, testing | replicate_model_calling() |
| Batch Same | Multiple prompts, same model | intelligent_batch_process() |
| Mixed Models | Different models/parameters | IntelligentBatchProcessor() |
🧠 Smart Processing Strategies
The system automatically selects optimal processing strategy:
- Immediate Processing: Tasks ≤ available quota → Full concurrency
- Window Processing: Tasks ≤ 600 but > current quota → Wait then batch
- Dynamic Queue: Tasks > 600 → Continuous processing with queue management
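The selection rule above can be sketched as a small decision function. This is illustrative only; `choose_strategy` and its quota argument are not part of the package API:

```python
RATE_LIMIT = 600  # Replicate: requests per minute, shared across all models

def choose_strategy(num_tasks: int, available_quota: int) -> str:
    """Pick a processing strategy from the task count and remaining quota."""
    if num_tasks <= available_quota:
        return "immediate"      # run everything at full concurrency now
    if num_tasks <= RATE_LIMIT:
        return "window"         # wait for the quota window to refill, then batch
    return "dynamic_queue"      # stream tasks through a managed queue
```

For example, 300 tasks against a remaining quota of 100 would fall into the window strategy, while anything over 600 tasks always goes through the queue.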
⚙️ Configuration
API Keys
Get your Replicate API token: replicate.com/account/api-tokens
Custom Fallback Rules
Modify config.py:
```python
FALLBACK_MODELS = {
    'your-model': {
        'fail': {
            'fallback_model': 'backup-model',
            'condition': 'api_error'
        }
    }
}
```
⚠️ Common Pitfalls
- Async/Await: Batch functions must be called within async context
- Model Names: Use exact model names from supported list above
- File Paths: Ensure output directories exist before processing
- Rate Limits: Keep max_concurrent ≤ 15 to avoid 429 errors
📊 Performance Benchmarks
Real-World Test Results (v1.0.7)
Tested on: Python 3.9.16, macOS, Replicate API
| Task | Model | Time | Success Rate | Notes |
|---|---|---|---|---|
| Single Image | qwen/qwen-image | 11.7s | 100% | Fastest for single generation |
| Batch (3 images) | qwen/qwen-image | 23.2s | 100% | ~7.7s per image with concurrency |
| Batch (10 images) | qwen/qwen-image | 45s | 100% | Optimal with max_concurrent=8 |
| Mixed Models (3) | Various | 28s | 100% | Parallel processing advantage |
| With Fallback | flux-kontext → qwen | 15s | 100% | Includes fallback overhead |
| Large Batch (50) | qwen/qwen-image | 3.5min | 98% | 2% retry on rate limits |
Concurrency Performance
| Concurrent Tasks | Time per Image | Efficiency | Recommended For |
|---|---|---|---|
| 1 (Sequential) | 12s | Baseline | Testing/Debug |
| 5 | 4.8s | 2.5× baseline | Conservative usage |
| 8 | 3.2s | 3.75× baseline | Optimal balance |
| 12 | 2.8s | 4.3× baseline | Aggressive, risk of 429 |
| 15+ | Variable | Diminishing returns | Not recommended |
📊 Rate Limiting
- Replicate API: 600 requests/minute (shared across all models)
- Recommended Concurrency: 5-8 (conservative) to 12 (aggressive)
- Auto-Retry: Built-in 429 error handling with exponential backoff
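The built-in auto-retry behaves roughly like the exponential-backoff loop below. This is a simplified sketch, not the library's actual implementation, and `RuntimeError` stands in for a real 429 rate-limit error:

```python
import asyncio
import random

async def call_with_backoff(fn, max_retries: int = 5):
    """Retry fn() on rate-limit errors, roughly doubling the wait each attempt."""
    for attempt in range(max_retries):
        try:
            return await fn()
        except RuntimeError:  # stand-in for a 429 rate-limit response
            if attempt == max_retries - 1:
                raise  # out of retries: surface the error to the caller
            delay = (2 ** attempt) + random.uniform(0, 1)  # 1s, 2s, 4s... plus jitter
            await asyncio.sleep(delay)
```

The jitter term spreads retries out so many concurrent tasks do not all hammer the API again at the same instant.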
⚠️ Known Issues & Workarounds
Fixed in v1.0.7
✅ FileOutput Handling Bug (v1.0.2-1.0.6)
- Issue: Kontext Max model created 814 empty files when returning a single image
- Root Cause: Incorrect iteration over bytes instead of the file object
- Fix: Added intelligent output type detection with a `hasattr(output, 'read')` check
- Status: ✅ Fully resolved
✅ Parameter Routing Bug (v1.0.2-1.0.6)
- Issue: `output_filepath` incorrectly placed in kwargs for batch processing
- Fix: Corrected parameter assignment in BatchRequest
- Status: ✅ Fully resolved
Current Limitations (v1.0.7)
⚠️ Kontext Max Input Image
- Issue: Input image parameter sometimes fails with Kontext Max model
- Workaround: Automatic fallback to Qwen model (transparent to user)
- Impact: Minor - generation still succeeds via fallback
- Fix Timeline: Investigating for v1.0.8
ℹ️ Rate Limiting on Large Batches
- Issue: Batches >50 may hit rate limits even with throttling
- Workaround: Use chunking strategy (see Best Practices)
- Impact: Minor - automatic retry handles most cases
Reporting Issues
Found a bug? Please report at: https://github.com/preangelleo/replicate_batch_process/issues
📦 Migration Guide
Upgrading from v1.0.6 → v1.0.7
```bash
pip install --upgrade replicate-batch-process==1.0.7
```
Changes:
- ✅ Fixed FileOutput handling bug (814 empty files issue)
- ✅ Enhanced README documentation
- ✅ No API changes - drop-in replacement
Action Required: None - fully backward compatible
Upgrading from v1.0.2-1.0.5 → v1.0.7
```bash
pip install --upgrade replicate-batch-process==1.0.7
```
Critical Fixes:
- intelligent_batch_process parameter bug - Now correctly handles output_filepath
- FileOutput compatibility - No more empty file creation
- Model validation - Clear error messages for unsupported models
Code Changes Needed: None, though reviewing your error handling will let you surface the new, clearer messages
Upgrading from v0.x → v1.0.7
Breaking Changes:
- Package renamed from `replicate_batch` to `replicate-batch-process`
- New import structure:

```python
# Old (v0.x)
from replicate_batch import process_batch

# New (v1.0.7)
from replicate_batch_process import intelligent_batch_process
```
Migration Steps:
- Uninstall the old package: `pip uninstall replicate_batch`
- Install the new package: `pip install replicate-batch-process`
- Update imports in your code
- Test with small batches first
Version History
| Version | Release Date | Key Changes |
|---|---|---|
| v1.0.7 | 2025-01-05 | FileOutput fix, README improvements |
| v1.0.6 | 2025-01-05 | Bug fixes, model validation |
| v1.0.5 | 2025-01-04 | Parameter handling improvements |
| v1.0.4 | 2025-01-04 | Model support documentation |
| v1.0.3 | 2025-01-03 | Initial stable release |
💡 Best Practices
```python
# For large batches, process in chunks to stay under rate limits.
# Note: the original snippet used `await` in a plain function; chunked
# processing must itself run inside an async function.
def chunks(items, size):
    for i in range(0, len(items), size):
        yield items[i:i + size]

async def process_large_batch(prompts, model_name, chunk_size=50):
    all_files = []
    for chunk in chunks(prompts, chunk_size):
        files = await intelligent_batch_process(chunk, model_name=model_name)
        all_files.extend(files)
    return all_files
```

```python
# Error handling with a complete example
import asyncio
from replicate_batch_process import IntelligentBatchProcessor, BatchRequest

async def batch_with_error_handling():
    processor = IntelligentBatchProcessor()
    requests = [
        BatchRequest(prompt="sunset", model_name="qwen/qwen-image", output_filepath="output/sunset.png"),
        BatchRequest(prompt="city", model_name="qwen/qwen-image", output_filepath="output/city.png"),
    ]
    results = await processor.process_intelligent_batch(requests)
    for result in results:
        if result.success:
            print(f"✅ Generated: {result.file_paths}")
        else:
            print(f"❌ Failed: {result.error}")

asyncio.run(batch_with_error_handling())
```
🏗️ Project Structure
```
replicate-batch-process/
├── main.py                          # Single image generation
├── intelligent_batch_processor.py   # Batch processing engine
├── config.py                        # Model configurations & fallbacks
├── init_environment.py              # Environment setup
└── example_usage.py                 # Complete examples
```
🔧 Development
```bash
# Clone the repository
git clone https://github.com/preangelleo/replicate_batch_process.git

# Install in development mode
pip install -e .

# Run examples
python example_usage.py
```
📄 License
MIT License - see LICENSE file for details.
🤝 Contributing
- Fork the repository
- Create your feature branch (`git checkout -b feature/amazing-feature`)
- Commit your changes (`git commit -m 'Add amazing feature'`)
- Push to the branch (`git push origin feature/amazing-feature`)
- Open a Pull Request
🔗 Links
- PyPI: https://pypi.org/project/replicate-batch-process/
- GitHub: https://github.com/preangelleo/replicate_batch_process
- Issues: https://github.com/preangelleo/replicate_batch_process/issues
Made with ❤️ for the AI community