Skip to main content

Redis plugging for pynenc, a distributed task queue for Python.

Project description

Pynenc Redis

Pynenc Redis Plugin

Redis backend implementation for Pynenc distributed task management

Package version Supported Python versions GitHub last commit GitHub license


Main Documentation: https://docs.pynenc.org

Source Code: https://github.com/pynenc/pynenc-redis

Pynenc Core: https://github.com/pynenc/pynenc


The pynenc-redis plugin provides Redis-based implementations for Pynenc's core components, enabling distributed task orchestration with Redis as the backend. This plugin offers production-ready implementations for orchestration, message brokering, state management, argument caching, and trigger systems.

Overview

This plugin extends Pynenc with Redis-backed implementations for:

  • RedisOrchestrator: Distributed task orchestration and coordination
  • RedisBroker: Message queue and task distribution
  • RedisStateBackend: Persistent state and result storage
  • RedisArgCache: Efficient argument caching for large payloads
  • RedisTrigger: Event-driven and scheduled task execution

Installation

Install the plugin using pip:

pip install pynenc-redis

The plugin automatically registers itself with Pynenc when installed.

Quick Start

Using PynencBuilder (Recommended)

The easiest way to configure Redis components is using the PynencBuilder:

from pynenc import PynencBuilder

# Basic Redis configuration
app = (
    PynencBuilder()
    .redis(url="redis://localhost:6379")
    .process_runner()
    .build()
)

@app.task
def add(x: int, y: int) -> int:
    return x + y

Configuration Options

Full Redis Stack

Configure all Redis components with a single call:

app = (
    PynencBuilder()
    .redis(url="redis://localhost:6379/0")  # With database number
    .multi_thread_runner()
    .build()
)

Or using individual parameters:

app = (
    PynencBuilder()
    .redis(db=1)  # Uses default host/port with specific database
    .process_runner()
    .build()
)

Redis Argument Cache

Configure Redis-based argument caching for large payloads:

app = (
    PynencBuilder()
    .redis(url="redis://localhost:6379")
    .redis_arg_cache(
        min_size_to_cache=1024,    # Cache arguments > 1KB
        local_cache_size=1000       # Keep 1000 entries in local cache
    )
    .build()
)

Redis Trigger System

Enable Redis-based triggers for scheduled and event-driven tasks:

app = (
    PynencBuilder()
    .redis(url="redis://localhost:6379")
    .redis_trigger(
        scheduler_interval_seconds=60,  # Check every minute
        enable_scheduler=True
    )
    .build()
)

Using pyproject.toml

Alternatively, configure Redis components in your pyproject.toml:

[tool.pynenc]
app_id = "my_app"
orchestrator_cls = "RedisOrchestrator"
broker_cls = "RedisBroker"
state_backend_cls = "RedisStateBackend"
arg_cache_cls = "RedisArgCache"
trigger_cls = "RedisTrigger"
runner_cls = "ProcessRunner"

[tool.pynenc.redis]
redis_url = "redis://localhost:6379/0"
# Or use individual parameters:
# redis_host = "localhost"
# redis_port = 6379
# redis_db = 0

Plugin Components

RedisOrchestrator

Manages task dependencies and execution flow using Redis:

  • Atomic operations for task state transitions
  • Efficient dependency tracking
  • Batch processing for high-throughput scenarios
  • Connection pooling and automatic reconnection

RedisBroker

Distributes tasks across workers:

  • FIFO task queue with blocking pop operations
  • Efficient batch routing
  • Configurable timeouts and retry mechanisms
  • Support for task priorities

RedisStateBackend

Persists task state and results:

  • Invocation history and status tracking
  • Result storage with TTL support
  • Workflow data persistence
  • App discovery and registration

RedisArgCache

Optimizes handling of large arguments:

  • Automatic caching based on size thresholds
  • LRU eviction for memory management
  • Shared cache across processes
  • Fingerprint-based deduplication

RedisTrigger

Enables event-driven task execution:

  • Cron-based scheduling
  • Task status change triggers
  • Result-based triggers
  • Custom event handling

Connection Configuration

The plugin supports multiple ways to configure Redis connections:

URL-based (Recommended)

.redis(url="redis://localhost:6379/0")
.redis(url="redis://:password@localhost:6379/0")
.redis(url="rediss://localhost:6380/0")  # SSL/TLS

Parameter-based

.redis(db=1)  # Uses REDIS_HOST and REDIS_PORT from config/env

Environment Variables

PYNENC__REDIS__REDIS_URL="redis://localhost:6379/0"
# Or individual parameters:
PYNENC__REDIS__REDIS_HOST="localhost"
PYNENC__REDIS__REDIS_PORT=6379
PYNENC__REDIS__REDIS_DB=0

Advanced Features

Connection Resilience

The plugin includes robust connection handling:

  • Automatic reconnection with exponential backoff
  • Configurable retry mechanisms
  • Socket timeouts and health checks
  • Connection pooling

Performance Optimization

  • Batch processing for parallel task creation
  • Pipeline operations for reduced network overhead
  • Local caching to minimize Redis queries
  • Efficient serialization strategies

Monitoring Integration

Works seamlessly with Pynenc's monitoring interface:

pynenc --app your_app monitor

Development and Testing

The plugin supports both development and production modes:

Development Mode

# Synchronous execution for testing
app = (
    PynencBuilder()
    .redis(url="redis://localhost:6379")
    .dev_mode(force_sync_tasks=True)
    .build()
)

Integration Testing

Integration tests use testcontainers to automatically manage Redis instances. Docker must be installed and running for integration tests to execute.

Run tests with:

poetry run pytest tests/integration/

Requirements

  • Python >= 3.11.6
  • Pynenc >= 0.0.24
  • Redis >= 4.6.0
  • A running Redis server

Documentation

For comprehensive documentation on Pynenc and its features, visit:

Contributing

Contributions are welcome! Please see the Contributing Guide for details.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Support

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pynenc_redis-0.1.4.tar.gz (1.4 MB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pynenc_redis-0.1.4-py3-none-any.whl (31.9 kB view details)

Uploaded Python 3

File details

Details for the file pynenc_redis-0.1.4.tar.gz.

File metadata

  • Download URL: pynenc_redis-0.1.4.tar.gz
  • Upload date:
  • Size: 1.4 MB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.2

File hashes

Hashes for pynenc_redis-0.1.4.tar.gz
Algorithm Hash digest
SHA256 4259ae60a781beaa9928b8a236551b947c1e21f4988c8840068e4953279345b0
MD5 6496ec01060a9b0fa4e745d8d4798bb6
BLAKE2b-256 e7c0731ad832c111b16baa1053da579cd7abb914e00c43774a56efc9c1313af6

See more details on using hashes here.

File details

Details for the file pynenc_redis-0.1.4-py3-none-any.whl.

File metadata

File hashes

Hashes for pynenc_redis-0.1.4-py3-none-any.whl
Algorithm Hash digest
SHA256 29f82f87c94ab0806e60596f20aea1c4f3816ef3a135414b8d537852139e7e2c
MD5 ea0c415470907062741405bcb55d872a
BLAKE2b-256 cef482c4ce954a2914c6234bc414e9f092b3ad28239ccebb82d4559f98949359

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page