Intelligent batch processing tool for Replicate models with automatic fallback mechanisms

Replicate Batch Process

Intelligent batch processing tool for Replicate models with automatic fallback mechanisms and concurrent processing.

✨ Key Features

  • 🔄 Intelligent Fallback System - Automatic model switching on incompatibility
  • Smart Concurrency Control - Adaptive rate limiting and batch processing
  • 🎯 Three Usage Modes - Single, batch same-model, and mixed-model processing
  • 📝 Custom File Naming - Ordered output with correspondence control
  • 🛡️ Error Resilience - Comprehensive retry and recovery mechanisms
  • Model Validation - Automatic detection of unsupported models with clear error messages

📋 Requirements

  • Python 3.8 or higher
  • Replicate API token (get one at replicate.com/account/api-tokens)
  • asyncio support for batch processing

📦 Installation

pip install replicate-batch-process

🚀 Quick Start

1. Set up API Token

# Option 1: Interactive setup
replicate-init

# Option 2: Manual setup
export REPLICATE_API_TOKEN="your-token-here"

# Option 3: .env file
echo "REPLICATE_API_TOKEN=your-token-here" > .env
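
Before running anything, it can help to confirm that the token is actually visible to Python. The following is a minimal sketch using only the standard library; `load_replicate_token` is a hypothetical helper (not part of this package), and its .env parsing is an assumption that only handles plain KEY=value lines.

```python
import os
from pathlib import Path
from typing import Optional


def load_replicate_token(env_file: str = ".env") -> Optional[str]:
    """Return the token from the environment, else from a simple .env file."""
    token = os.environ.get("REPLICATE_API_TOKEN")
    if token:
        return token
    path = Path(env_file)
    if not path.is_file():
        return None
    for line in path.read_text(encoding="utf-8").splitlines():
        line = line.strip()
        # Skip blanks and comments; only plain KEY=value lines are handled.
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        if key.strip() == "REPLICATE_API_TOKEN":
            # Drop optional surrounding quotes; treat an empty value as missing.
            return value.strip().strip('"').strip("'") or None
    return None


if __name__ == "__main__":
    print("token found" if load_replicate_token() else "token missing")
```

The environment variable is checked first, so an exported token takes precedence over the file in this sketch.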

2. Single Image Generation

from replicate_batch_process import replicate_model_calling

file_paths = replicate_model_calling(
    prompt="A beautiful sunset over mountains",
    model_name="qwen/qwen-image",  # Use supported model
    output_filepath="output/sunset.jpg"
)

3. Batch Processing (Async Required)

import asyncio
from replicate_batch_process import intelligent_batch_process

async def main():
    files = await intelligent_batch_process(
        prompts=["sunset", "city", "forest"],
        model_name="qwen/qwen-image",
        max_concurrent=8,
        output_filepath=["output/sunset.png", "output/city.png", "output/forest.png"]
    )
    return files

# Run the async function
asyncio.run(main())
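
To illustrate what a `max_concurrent` cap means in asyncio terms, here is a minimal sketch that bounds the number of in-flight tasks with a semaphore. It is not the package's implementation: `fake_generate` is a hypothetical stand-in for a model call, and `run_bounded` is an illustrative name.

```python
import asyncio
from typing import List


async def fake_generate(prompt: str) -> str:
    """Hypothetical stand-in for a model call; returns a fake output path."""
    await asyncio.sleep(0.01)
    return f"output/{prompt}.png"


async def run_bounded(prompts: List[str], max_concurrent: int = 8) -> List[str]:
    """Run all prompts, keeping at most max_concurrent calls in flight."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def worker(prompt: str) -> str:
        async with semaphore:
            return await fake_generate(prompt)

    # gather preserves input order, so results line up with prompts.
    results = await asyncio.gather(*(worker(p) for p in prompts))
    return list(results)


if __name__ == "__main__":
    print(asyncio.run(run_bounded(["sunset", "city", "forest"], max_concurrent=2)))
```

Because `asyncio.gather` returns results in input order, the output list corresponds to the prompt list position by position.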

📋 Supported Models

Image Generation Models

| Model | Price | Specialization | Reference Image Support |
| --- | --- | --- | --- |
| black-forest-labs/flux-dev | $0.025 | Fast generation, minimal censorship | No |
| black-forest-labs/flux-kontext-max | $0.08 | Image editing, character consistency | Yes |
| qwen/qwen-image | $0.025 | Text rendering, cover images | |
| google/imagen-4-ultra | $0.06 | High-quality detailed images | |

Video Generation Models

| Model | Price | Specialization | Reference Image Support |
| --- | --- | --- | --- |
| google/veo-3-fast | $3.32/call | Fast video with audio | |
| kwaivgi/kling-v2.1-master | $0.28/sec | 1080p video, 5-10 second duration | |

⚠️ Note: Using unsupported models will return a clear error message: "Model '{model_name}' is not supported. Please use one of the supported models listed above."
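
A pre-flight check against the supported list can catch a typo in a model name before any API call is made. This is a minimal sketch, not the package's own validation: `SUPPORTED_MODELS` and `check_model` are hypothetical names, the set is copied from the tables above, and raising `ValueError` is an assumption (the note above only says an error message is returned).

```python
# Model names copied from the supported-model tables above.
SUPPORTED_MODELS = {
    "black-forest-labs/flux-dev",
    "black-forest-labs/flux-kontext-max",
    "qwen/qwen-image",
    "google/imagen-4-ultra",
    "google/veo-3-fast",
    "kwaivgi/kling-v2.1-master",
}


def check_model(model_name: str) -> None:
    """Raise ValueError with the documented message for unknown models."""
    if model_name not in SUPPORTED_MODELS:
        raise ValueError(
            f"Model '{model_name}' is not supported. "
            "Please use one of the supported models listed above."
        )


if __name__ == "__main__":
    check_model("qwen/qwen-image")  # passes silently
    try:
        check_model("qwen/qwen-imgae")  # typo in the model name
    except ValueError as exc:
        print(exc)
```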

🔄 Intelligent Fallback System

Automatic model switching when issues arise:

Reference Image Auto-Detection

# User provides a reference image to a model that does not support one
replicate_model_calling(
    prompt="Generate based on this image",
    model_name="black-forest-labs/flux-dev",  # Doesn't support reference images
    input_image="path/to/image.jpg"           # → Auto-switches to flux-kontext-max
)

Parameter Compatibility Handling

# Unsupported parameters are removed automatically and the model is switched
replicate_model_calling(
    prompt="Generate image",
    model_name="black-forest-labs/flux-kontext-max",
    guidance=3.5,        # Unsupported parameter
    num_outputs=2        # → Auto-switches to compatible model
)

API Error Recovery

Automatic fallback chain: Flux Dev → Qwen Image → Imagen 4 Ultra
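
The idea of a fallback chain can be sketched as a loop that tries each model in order and returns the first success. This is an illustration only, under the assumption that failures surface as exceptions; `generate_with_fallback`, `FALLBACK_CHAIN`, and the `call_model` callback are hypothetical names, not the package's API.

```python
from typing import Callable, Optional, Sequence

# Chain order taken from the text above; the identifiers are from the supported list.
FALLBACK_CHAIN = (
    "black-forest-labs/flux-dev",
    "qwen/qwen-image",
    "google/imagen-4-ultra",
)


def generate_with_fallback(
    prompt: str,
    call_model: Callable[[str, str], str],
    chain: Sequence[str] = FALLBACK_CHAIN,
) -> str:
    """Try each model in order; return the first successful result."""
    last_error: Optional[Exception] = None
    for model_name in chain:
        try:
            return call_model(model_name, prompt)
        except Exception as exc:  # a real implementation would catch narrower errors
            last_error = exc
    raise RuntimeError("All models in the chain failed") from last_error


if __name__ == "__main__":
    def demo_call(model_name: str, prompt: str) -> str:
        # Simulate an API error on the first model only.
        if model_name == "black-forest-labs/flux-dev":
            raise ConnectionError("simulated API error")
        return f"{model_name} handled '{prompt}'"

    print(generate_with_fallback("sunset", demo_call))
```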

📋 Usage Scenarios

| Mode | Use Case | Command |
| --- | --- | --- |
| Single | One-off generation, testing | replicate_model_calling() |
| Batch Same | Multiple prompts, same model | intelligent_batch_process() |
| Mixed Models | Different models/parameters | IntelligentBatchProcessor() |

🧠 Smart Processing Strategies

The system automatically selects the optimal processing strategy:

  • Immediate Processing: Tasks ≤ available quota → Full concurrency
  • Window Processing: Tasks ≤ 600 but > current quota → Wait then batch
  • Dynamic Queue: Tasks > 600 → Continuous processing with queue management
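
The three rules above amount to a small decision function. The sketch below mirrors those thresholds only; `choose_strategy` and its return labels are hypothetical, and how the package actually tracks its available quota is not shown here.

```python
def choose_strategy(num_tasks: int, available_quota: int, window_limit: int = 600) -> str:
    """Pick a processing strategy from the task count and the remaining quota."""
    if num_tasks <= available_quota:
        return "immediate"      # everything fits in the current quota
    if num_tasks <= window_limit:
        return "window"         # wait for the quota to refill, then batch
    return "dynamic_queue"      # more tasks than one rate-limit window allows


if __name__ == "__main__":
    print(choose_strategy(50, available_quota=400))   # immediate
    print(choose_strategy(500, available_quota=400))  # window
    print(choose_strategy(900, available_quota=400))  # dynamic_queue
```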

⚙️ Configuration

API Keys

Get your Replicate API token: replicate.com/account/api-tokens

Custom Fallback Rules

Modify config.py:

FALLBACK_MODELS = {
    'your-model': {
        'fail': {
            'fallback_model': 'backup-model',
            'condition': 'api_error'
        }
    }
}
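
One way to read a rule table of this shape is to scan a model's rules for a matching condition. This is a sketch under the assumption that each rule carries a `condition` and a `fallback_model` key as in the example above; `find_fallback` is a hypothetical helper, and the package's own lookup logic may differ.

```python
from typing import Optional

# Same shape as the example rule above.
FALLBACK_MODELS = {
    "your-model": {
        "fail": {
            "fallback_model": "backup-model",
            "condition": "api_error",
        }
    }
}


def find_fallback(model_name: str, condition: str, rules: dict = FALLBACK_MODELS) -> Optional[str]:
    """Return the fallback model for a given model and condition, if any."""
    for rule in rules.get(model_name, {}).values():
        if rule.get("condition") == condition:
            return rule.get("fallback_model")
    return None


if __name__ == "__main__":
    print(find_fallback("your-model", "api_error"))  # backup-model
    print(find_fallback("your-model", "timeout"))    # None
```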

⚠️ Common Pitfalls

  1. Async/Await: Batch functions must be called within an async context (for example, via asyncio.run())
  2. Model Names: Use exact model names from supported list above
  3. File Paths: Ensure output directories exist before processing
  4. Rate Limits: Keep max_concurrent ≤ 15 to avoid 429 errors
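
For the file-path pitfall, the parent directories of all output files can be created up front. A minimal sketch with the standard library; `ensure_output_dirs` is a hypothetical helper name.

```python
from pathlib import Path
from typing import Iterable


def ensure_output_dirs(output_paths: Iterable[str]) -> None:
    """Create the parent directory of every output path if it is missing."""
    for path in output_paths:
        # exist_ok=True makes this safe to call repeatedly.
        Path(path).parent.mkdir(parents=True, exist_ok=True)


if __name__ == "__main__":
    ensure_output_dirs(["output/sunset.png", "output/city.png", "output/forest.png"])
```

Calling it with the same list that is later passed as `output_filepath` keeps the two in sync.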

📊 Performance Benchmarks

| Task | Time | Throughput |
| --- | --- | --- |
| Single image | ~12s | 5 images/min |
| Batch (10 images) | ~25s | 24 images/min |
| Mixed models (5) | ~30s | 10 images/min |
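
The throughput column follows from the other two as images × 60 / seconds. A short check of that arithmetic, with `images_per_minute` as an illustrative helper name:

```python
def images_per_minute(num_images: int, seconds: float) -> float:
    """Convert a batch size and its wall-clock time into images per minute."""
    return num_images * 60.0 / seconds


if __name__ == "__main__":
    print(images_per_minute(1, 12))   # 5.0
    print(images_per_minute(10, 25))  # 24.0
    print(images_per_minute(5, 30))   # 10.0
```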

📊 Rate Limiting

  • Replicate API: 600 requests/minute (shared across all models)
  • Recommended Concurrency: 5-8 (conservative) to 12 (aggressive)
  • Auto-Retry: Built-in 429 error handling with exponential backoff
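
Exponential backoff on 429 responses can be sketched as follows. This is a generic illustration, not the package's retry code: `RateLimitError` is a hypothetical stand-in for a 429 response, and the delays, jitter, and retry count are assumed values.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class RateLimitError(Exception):
    """Hypothetical stand-in for an HTTP 429 response."""


async def retry_with_backoff(
    call: Callable[[], Awaitable[T]],
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
) -> T:
    """Await call(), retrying on RateLimitError with exponentially growing delays."""
    for attempt in range(max_retries + 1):
        try:
            return await call()
        except RateLimitError:
            if attempt == max_retries:
                raise
            # The delay doubles on each attempt, capped at max_delay, plus a
            # small random jitter so that parallel retries do not synchronize.
            delay = min(max_delay, base_delay * 2 ** attempt)
            await asyncio.sleep(delay + random.uniform(0, delay * 0.1))
    raise ValueError("max_retries must be >= 0")


if __name__ == "__main__":
    attempts = {"count": 0}

    async def flaky_call() -> str:
        # Fail twice with a simulated 429, then succeed.
        attempts["count"] += 1
        if attempts["count"] < 3:
            raise RateLimitError()
        return "ok"

    print(asyncio.run(retry_with_backoff(flaky_call, base_delay=0.01)))
```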

⚠️ Known Issues (v1.0.7)

  • Fixed in v1.0.7: FileOutput handling improved for Kontext Max model
  • Fixed in v1.0.7: Empty file creation issue resolved
  • Input Image: Kontext Max input_image handling may fall back to other models (automatic workaround)
  • All issues have automatic fallback mechanisms ensuring successful generation

📦 Migration from v1.0.x

v1.0.6 → v1.0.7

  • Fixed: FileOutput handling for flux-kontext-max model
  • Fixed: Empty file creation bug when processing certain outputs
  • No API changes required - drop-in replacement

💡 Best Practices

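import asyncio
from replicate_batch_process import intelligent_batch_process

# For large batches, process the prompts in chunks.
# Defined as an async generator so that `await` is valid inside it;
# consume it with `async for files in process_large_batch(...)`.
async def process_large_batch(prompts, model_name, chunk_size=50):
    for start in range(0, len(prompts), chunk_size):
        chunk = prompts[start:start + chunk_size]
        files = await intelligent_batch_process(prompts=chunk, model_name=model_name)
        yield files

# Complete example with error handling
from replicate_batch_process import IntelligentBatchProcessor, BatchRequest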

async def batch_with_error_handling():
    processor = IntelligentBatchProcessor()
    requests = [
        BatchRequest(prompt="sunset", model_name="qwen/qwen-image", output_filepath="output/sunset.png"),
        BatchRequest(prompt="city", model_name="qwen/qwen-image", output_filepath="output/city.png"),
    ]
    results = await processor.process_intelligent_batch(requests)
    
    for result in results:
        if result.success:
            print(f"✅ Generated: {result.file_paths}")
        else:
            print(f"❌ Failed: {result.error}")

asyncio.run(batch_with_error_handling())

🏗️ Project Structure

replicate-batch-process/
├── main.py                      # Single image generation
├── intelligent_batch_processor.py  # Batch processing engine
├── config.py                    # Model configurations & fallbacks
├── init_environment.py          # Environment setup
└── example_usage.py            # Complete examples

🔧 Development

# Clone repository
git clone https://github.com/preangelleo/replicate_batch_process.git

# Install in development mode
pip install -e .

# Run examples
python example_usage.py

📄 License

MIT License - see LICENSE file for details.

🤝 Contributing

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

Made with ❤️ for the AI community
