Skip to main content

Redis plugging for pynenc, a distributed task queue for Python.

Project description

Pynenc Redis

Pynenc Redis Plugin

Redis backend implementation for Pynenc distributed task management

Package version Supported Python versions GitHub last commit GitHub license


Main Documentation: https://docs.pynenc.org

Source Code: https://github.com/pynenc/pynenc-redis

Pynenc Core: https://github.com/pynenc/pynenc


The pynenc-redis plugin provides Redis-based implementations for Pynenc's core components, enabling distributed task orchestration with Redis as the backend. This plugin offers production-ready implementations for orchestration, message brokering, state management, argument caching, and trigger systems.

Overview

This plugin extends Pynenc with Redis-backed implementations for:

  • RedisOrchestrator: Distributed task orchestration and coordination
  • RedisBroker: Message queue and task distribution
  • RedisStateBackend: Persistent state and result storage
  • RedisArgCache: Efficient argument caching for large payloads
  • RedisTrigger: Event-driven and scheduled task execution

Installation

Install the plugin using pip:

pip install pynenc-redis

The plugin automatically registers itself with Pynenc when installed.

Quick Start

Using PynencBuilder (Recommended)

The easiest way to configure Redis components is using the PynencBuilder:

from pynenc import PynencBuilder

# Basic Redis configuration
app = (
    PynencBuilder()
    .redis(url="redis://localhost:6379")
    .process_runner()
    .build()
)

@app.task
def add(x: int, y: int) -> int:
    return x + y

Configuration Options

Full Redis Stack

Configure all Redis components with a single call:

app = (
    PynencBuilder()
    .redis(url="redis://localhost:6379/0")  # With database number
    .multi_thread_runner()
    .build()
)

Or using individual parameters:

app = (
    PynencBuilder()
    .redis(db=1)  # Uses default host/port with specific database
    .process_runner()
    .build()
)

Redis Argument Cache

Configure Redis-based argument caching for large payloads:

app = (
    PynencBuilder()
    .redis(url="redis://localhost:6379")
    .redis_arg_cache(
        min_size_to_cache=1024,    # Cache arguments > 1KB
        local_cache_size=1000       # Keep 1000 entries in local cache
    )
    .build()
)

Redis Trigger System

Enable Redis-based triggers for scheduled and event-driven tasks:

app = (
    PynencBuilder()
    .redis(url="redis://localhost:6379")
    .redis_trigger(
        scheduler_interval_seconds=60,  # Check every minute
        enable_scheduler=True
    )
    .build()
)

Using pyproject.toml

Alternatively, configure Redis components in your pyproject.toml:

[tool.pynenc]
app_id = "my_app"
orchestrator_cls = "RedisOrchestrator"
broker_cls = "RedisBroker"
state_backend_cls = "RedisStateBackend"
arg_cache_cls = "RedisArgCache"
trigger_cls = "RedisTrigger"
runner_cls = "ProcessRunner"

[tool.pynenc.redis]
redis_url = "redis://localhost:6379/0"
# Or use individual parameters:
# redis_host = "localhost"
# redis_port = 6379
# redis_db = 0

Plugin Components

RedisOrchestrator

Manages task dependencies and execution flow using Redis:

  • Atomic operations for task state transitions
  • Efficient dependency tracking
  • Batch processing for high-throughput scenarios
  • Connection pooling and automatic reconnection

RedisBroker

Distributes tasks across workers:

  • FIFO task queue with blocking pop operations
  • Efficient batch routing
  • Configurable timeouts and retry mechanisms
  • Support for task priorities

RedisStateBackend

Persists task state and results:

  • Invocation history and status tracking
  • Result storage with TTL support
  • Workflow data persistence
  • App discovery and registration

RedisArgCache

Optimizes handling of large arguments:

  • Automatic caching based on size thresholds
  • LRU eviction for memory management
  • Shared cache across processes
  • Fingerprint-based deduplication

RedisTrigger

Enables event-driven task execution:

  • Cron-based scheduling
  • Task status change triggers
  • Result-based triggers
  • Custom event handling

Connection Configuration

The plugin supports multiple ways to configure Redis connections:

URL-based (Recommended)

.redis(url="redis://localhost:6379/0")
.redis(url="redis://:password@localhost:6379/0")
.redis(url="rediss://localhost:6380/0")  # SSL/TLS

Parameter-based

.redis(db=1)  # Uses REDIS_HOST and REDIS_PORT from config/env

Environment Variables

PYNENC__REDIS__REDIS_URL="redis://localhost:6379/0"
# Or individual parameters:
PYNENC__REDIS__REDIS_HOST="localhost"
PYNENC__REDIS__REDIS_PORT=6379
PYNENC__REDIS__REDIS_DB=0

Advanced Features

Connection Resilience

The plugin includes robust connection handling:

  • Automatic reconnection with exponential backoff
  • Configurable retry mechanisms
  • Socket timeouts and health checks
  • Connection pooling

Performance Optimization

  • Batch processing for parallel task creation
  • Pipeline operations for reduced network overhead
  • Local caching to minimize Redis queries
  • Efficient serialization strategies

Monitoring Integration

Works seamlessly with Pynenc's monitoring interface:

pynenc --app your_app monitor

Development and Testing

The plugin supports both development and production modes:

Development Mode

# Synchronous execution for testing
app = (
    PynencBuilder()
    .redis(url="redis://localhost:6379")
    .dev_mode(force_sync_tasks=True)
    .build()
)

Integration Testing

Integration tests use testcontainers to automatically manage Redis instances. Docker must be installed and running for integration tests to execute.

Run tests with:

poetry run pytest tests/integration/

Requirements

  • Python >= 3.11.6
  • Pynenc >= 0.0.24
  • Redis >= 4.6.0
  • A running Redis server

Documentation

For comprehensive documentation on Pynenc and its features, visit:

Contributing

Contributions are welcome! Please see the Contributing Guide for details.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Support

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pynenc_redis-0.1.3.tar.gz (1.4 MB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pynenc_redis-0.1.3-py3-none-any.whl (31.9 kB view details)

Uploaded Python 3

File details

Details for the file pynenc_redis-0.1.3.tar.gz.

File metadata

  • Download URL: pynenc_redis-0.1.3.tar.gz
  • Upload date:
  • Size: 1.4 MB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.2

File hashes

Hashes for pynenc_redis-0.1.3.tar.gz
Algorithm Hash digest
SHA256 14fcae18c669a5a7123ae13583e3d1098450650cc5e6ca79fb344ea1b16a66f3
MD5 8407b1cc2cf621f98a980251b3bb4466
BLAKE2b-256 151ac514be978190574e322c286ed822ec448af93c60bd8ef3faeddcf69d3fd6

See more details on using hashes here.

File details

Details for the file pynenc_redis-0.1.3-py3-none-any.whl.

File metadata

File hashes

Hashes for pynenc_redis-0.1.3-py3-none-any.whl
Algorithm Hash digest
SHA256 cf90ae4ee13a7bf3877920713f4cdb8ed5eeaabe28a060b1fe8fe10ce6f433ea
MD5 a04e55b719a31a025f7ade2b65474c84
BLAKE2b-256 4a3f638d00995551afe34e356c36261dfc8948e0a90b486f46ff9568c89ad640

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page