Redis plugin for pynenc, a distributed task queue for Python.

Pynenc Redis Plugin

Redis backend implementation for Pynenc distributed task management

Main Documentation: https://docs.pynenc.org

Source Code: https://github.com/pynenc/pynenc-redis

Pynenc Core: https://github.com/pynenc/pynenc


The pynenc-redis plugin provides Redis-based implementations for Pynenc's core components, enabling distributed task orchestration with Redis as the backend. This plugin offers production-ready implementations for orchestration, message brokering, state management, argument caching, and trigger systems.

Overview

This plugin extends Pynenc with Redis-backed implementations for:

  • RedisOrchestrator: Distributed task orchestration and coordination
  • RedisBroker: Message queue and task distribution
  • RedisStateBackend: Persistent state and result storage
  • RedisArgCache: Efficient argument caching for large payloads
  • RedisTrigger: Event-driven and scheduled task execution

Installation

Install the plugin using pip:

pip install pynenc-redis

The plugin automatically registers itself with Pynenc when installed.

Quick Start

Using PynencBuilder (Recommended)

The easiest way to configure Redis components is using the PynencBuilder:

from pynenc import PynencBuilder

# Basic Redis configuration
app = (
    PynencBuilder()
    .redis(url="redis://localhost:6379")
    .process_runner()
    .build()
)

@app.task
def add(x: int, y: int) -> int:
    return x + y

Configuration Options

Full Redis Stack

Configure all Redis components with a single call:

app = (
    PynencBuilder()
    .redis(url="redis://localhost:6379/0")  # With database number
    .multi_thread_runner()
    .build()
)

Or using individual parameters:

app = (
    PynencBuilder()
    .redis(db=1)  # Uses default host/port with specific database
    .process_runner()
    .build()
)

Redis Argument Cache

Configure Redis-based argument caching for large payloads:

app = (
    PynencBuilder()
    .redis(url="redis://localhost:6379")
    .redis_arg_cache(
        min_size_to_cache=1024,    # Cache arguments > 1KB
        local_cache_size=1000       # Keep 1000 entries in local cache
    )
    .build()
)

Redis Trigger System

Enable Redis-based triggers for scheduled and event-driven tasks:

app = (
    PynencBuilder()
    .redis(url="redis://localhost:6379")
    .redis_trigger(
        scheduler_interval_seconds=60,  # Check every minute
        enable_scheduler=True
    )
    .build()
)

Using pyproject.toml

Alternatively, configure Redis components in your pyproject.toml:

[tool.pynenc]
app_id = "my_app"
orchestrator_cls = "RedisOrchestrator"
broker_cls = "RedisBroker"
state_backend_cls = "RedisStateBackend"
arg_cache_cls = "RedisArgCache"
trigger_cls = "RedisTrigger"
runner_cls = "ProcessRunner"

[tool.pynenc.redis]
redis_url = "redis://localhost:6379/0"
# Or use individual parameters:
# redis_host = "localhost"
# redis_port = 6379
# redis_db = 0

Plugin Components

RedisOrchestrator

Manages task dependencies and execution flow using Redis:

  • Atomic operations for task state transitions
  • Efficient dependency tracking
  • Batch processing for high-throughput scenarios
  • Connection pooling and automatic reconnection
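The "atomic state transition" idea can be illustrated without Redis. The sketch below is a hypothetical in-memory stand-in (the class `InMemoryStatusStore`, the `Status` enum, and the method names are not part of pynenc): a status changes only if it currently holds the expected value, so two workers cannot both claim the same invocation. With Redis, the same check-and-set would typically run server-side, for example in a Lua script or a WATCH/MULTI transaction; which mechanism RedisOrchestrator uses is not stated here.

```python
import threading
from enum import Enum


class Status(Enum):
    PENDING = "pending"
    RUNNING = "running"


class InMemoryStatusStore:
    """Hypothetical stand-in for a Redis-backed status store."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._status = {}

    def register(self, invocation_id: str) -> None:
        with self._lock:
            self._status[invocation_id] = Status.PENDING

    def transition(self, invocation_id: str, expected: Status, new: Status) -> bool:
        """Set the status to `new` only if it is currently `expected`."""
        with self._lock:
            if self._status.get(invocation_id) is not expected:
                return False
            self._status[invocation_id] = new
            return True


store = InMemoryStatusStore()
store.register("inv-1")

# Two workers try to claim the same invocation; only the first succeeds.
first_claim = store.transition("inv-1", Status.PENDING, Status.RUNNING)
second_claim = store.transition("inv-1", Status.PENDING, Status.RUNNING)
print(first_claim, second_claim)
```

The lock plays the role that single-threaded command execution plays on a Redis server: the check and the write happen as one step.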

RedisBroker

Distributes tasks across workers:

  • FIFO task queue with blocking pop operations
  • Efficient batch routing
  • Configurable timeouts and retry mechanisms
  • Support for task priorities
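To make "FIFO task queue with blocking pop" concrete, here is a minimal sketch that uses the standard library's `queue.Queue` as a stand-in for a Redis list. The helper `next_invocation` and the invocation IDs are made up for illustration. In Redis, pushing on one end of a list and using a blocking pop (BLPOP or BRPOP) on the other gives the same first-in, first-out behaviour with a timeout; whether RedisBroker uses exactly those commands is an assumption.

```python
import queue
from typing import Optional

# Stand-in for the Redis list that holds queued invocation IDs.
task_queue = queue.Queue()

for invocation_id in ("inv-1", "inv-2", "inv-3"):
    task_queue.put(invocation_id)


def next_invocation(timeout_seconds: float) -> Optional[str]:
    """Block until an item is available or the timeout expires."""
    try:
        return task_queue.get(timeout=timeout_seconds)
    except queue.Empty:
        return None


# Items come back in insertion order; the fourth call times out.
received = [next_invocation(0.1) for _ in range(4)]
print(received)
```

A worker loop built on this pattern sleeps inside the blocking call instead of polling, which is why a timeout is needed to let it check for shutdown periodically.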

RedisStateBackend

Persists task state and results:

  • Invocation history and status tracking
  • Result storage with TTL support
  • Workflow data persistence
  • App discovery and registration

RedisArgCache

Optimizes handling of large arguments:

  • Automatic caching based on size thresholds
  • LRU eviction for memory management
  • Shared cache across processes
  • Fingerprint-based deduplication
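The following sketch shows how the three ideas above (size threshold, fingerprint deduplication, LRU eviction) can fit together. It is an in-memory illustration only: the class `ArgCacheSketch` and its methods are hypothetical, SHA-256 is an assumed fingerprint function, and only the parameter names `min_size_to_cache` and `local_cache_size` are taken from the builder example earlier in this README.

```python
import hashlib
from collections import OrderedDict
from typing import Optional


class ArgCacheSketch:
    """Hypothetical local cache; not the RedisArgCache implementation."""

    def __init__(self, min_size_to_cache: int, local_cache_size: int) -> None:
        self.min_size_to_cache = min_size_to_cache
        self.local_cache_size = local_cache_size
        self._entries = OrderedDict()  # fingerprint -> serialized bytes

    def store(self, serialized: bytes) -> Optional[str]:
        """Return a cache key for large values, or None to send inline."""
        if len(serialized) <= self.min_size_to_cache:
            return None
        # Identical payloads share one fingerprint, so they are stored once.
        key = hashlib.sha256(serialized).hexdigest()
        self._entries[key] = serialized
        self._entries.move_to_end(key)
        # Evict least recently used entries beyond the size limit.
        while len(self._entries) > self.local_cache_size:
            self._entries.popitem(last=False)
        return key

    def load(self, key: str) -> Optional[bytes]:
        value = self._entries.get(key)
        if value is not None:
            self._entries.move_to_end(key)
        return value

    def __len__(self) -> int:
        return len(self._entries)


cache = ArgCacheSketch(min_size_to_cache=1024, local_cache_size=2)
small_key = cache.store(b"x" * 10)       # below the threshold, not cached
key_a = cache.store(b"a" * 2048)
key_a_again = cache.store(b"a" * 2048)   # same content, same key
key_b = cache.store(b"b" * 2048)
key_c = cache.store(b"c" * 2048)         # pushes out the oldest entry
print(small_key, key_a == key_a_again, len(cache))
```

In a distributed setup the serialized value would also be written to Redis under the same key, so other processes can resolve it; the local dictionary here only models the per-process layer.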

RedisTrigger

Enables event-driven task execution:

  • Cron-based scheduling
  • Task status change triggers
  • Result-based triggers
  • Custom event handling

Connection Configuration

The plugin supports multiple ways to configure Redis connections:

URL-based (Recommended)

.redis(url="redis://localhost:6379/0")
.redis(url="redis://:password@localhost:6379/0")
.redis(url="rediss://localhost:6380/0")  # SSL/TLS
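For readers unfamiliar with the URL form, this sketch breaks each of the URLs above into its parts using only the standard library. The helper `describe_redis_url` is hypothetical and is not how the plugin parses URLs; treating a missing path as database 0 is an assumption made for the example.

```python
from urllib.parse import urlsplit


def describe_redis_url(url: str) -> dict:
    """Split a redis:// or rediss:// URL into connection settings."""
    parts = urlsplit(url)
    db_text = parts.path.lstrip("/")
    return {
        "tls": parts.scheme == "rediss",      # rediss:// means SSL/TLS
        "host": parts.hostname,
        "port": parts.port,
        "password": parts.password,           # None when not given
        "db": int(db_text) if db_text else 0,  # assumed default database
    }


for url in (
    "redis://localhost:6379/0",
    "redis://:password@localhost:6379/0",
    "rediss://localhost:6380/0",
):
    print(describe_redis_url(url))
```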

Parameter-based

.redis(db=1)  # Uses REDIS_HOST and REDIS_PORT from config/env

Environment Variables

PYNENC__REDIS__REDIS_URL="redis://localhost:6379/0"
# Or individual parameters:
PYNENC__REDIS__REDIS_HOST="localhost"
PYNENC__REDIS__REDIS_PORT=6379
PYNENC__REDIS__REDIS_DB=0
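The variable names above appear to follow a `PYNENC__<SECTION>__<KEY>` pattern that mirrors the `[tool.pynenc.redis]` table in pyproject.toml. The sketch below only illustrates that naming pattern; `parse_pynenc_env` is a hypothetical helper, not Pynenc's configuration loader, and it leaves every value as a string.

```python
def parse_pynenc_env(environ: dict) -> dict:
    """Group PYNENC__<SECTION>__<KEY> variables by lower-cased section."""
    prefix = "PYNENC__"
    config = {}
    for name, value in environ.items():
        if not name.startswith(prefix):
            continue
        section, _, key = name[len(prefix):].partition("__")
        if not key:
            continue  # this sketch ignores names without a section part
        config.setdefault(section.lower(), {})[key.lower()] = value
    return config


example_env = {
    "PYNENC__REDIS__REDIS_HOST": "localhost",
    "PYNENC__REDIS__REDIS_PORT": "6379",
    "PYNENC__REDIS__REDIS_DB": "0",
    "HOME": "/home/user",  # unrelated variables are skipped
}
parsed = parse_pynenc_env(example_env)
print(parsed)
```

In practice you would pass `os.environ` instead of `example_env`; a plain dictionary keeps the example independent of the machine it runs on.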

Advanced Features

Connection Resilience

The plugin includes robust connection handling:

  • Automatic reconnection with exponential backoff
  • Configurable retry mechanisms
  • Socket timeouts and health checks
  • Connection pooling
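As an illustration of reconnection with exponential backoff, the sketch below retries an operation and doubles the wait after each failure, up to a cap. Everything in it is hypothetical: `call_with_backoff`, its parameters, and the `flaky_ping` stand-in are not part of the plugin, and the built-in `ConnectionError` is used in place of the Redis client's own exception type.

```python
import time


def call_with_backoff(operation, max_attempts=5, base_delay=0.5,
                      max_delay=8.0, sleep=time.sleep):
    """Retry `operation` on ConnectionError, doubling the delay each time."""
    for attempt in range(max_attempts):
        try:
            return operation()
        except ConnectionError:
            if attempt == max_attempts - 1:
                raise  # out of attempts, surface the last error
            sleep(min(max_delay, base_delay * 2 ** attempt))


# Stand-in for a Redis call that fails twice before succeeding.
failures_left = 2


def flaky_ping() -> str:
    global failures_left
    if failures_left > 0:
        failures_left -= 1
        raise ConnectionError("connection refused")
    return "PONG"


delays = []  # record the waits instead of actually sleeping
result = call_with_backoff(flaky_ping, sleep=delays.append)
print(result, delays)
```

Passing the sleep function as a parameter keeps the example fast and makes the delay schedule easy to inspect; production code often adds random jitter so that many workers do not retry at the same instant.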

Performance Optimization

  • Batch processing for parallel task creation
  • Pipeline operations for reduced network overhead
  • Local caching to minimize Redis queries
  • Efficient serialization strategies

Monitoring Integration

Works seamlessly with Pynenc's monitoring interface:

pynenc --app your_app monitor

Development and Testing

The plugin supports both development and production modes:

Development Mode

# Synchronous execution for testing
app = (
    PynencBuilder()
    .redis(url="redis://localhost:6379")
    .dev_mode(force_sync_tasks=True)
    .build()
)

Integration Testing

Integration tests use testcontainers to automatically manage Redis instances. Docker must be installed and running for integration tests to execute.

Run tests with:

poetry run pytest tests/integration/

Requirements

  • Python >= 3.11.6
  • Pynenc >= 0.0.24
  • redis (the Python client library) >= 4.6.0
  • A running Redis server

Documentation

For comprehensive documentation on Pynenc and its features, visit https://docs.pynenc.org.

Contributing

Contributions are welcome! Please see the Contributing Guide for details.

License

This project is licensed under the MIT License - see the LICENSE file for details.
