Skip to main content

Sequential task orchestrator with dependency management and progress tracking

Reason this release was yanked:

Critical issues during adapter installation

Project description

Task Sequencer

Python Version License

Универсальный Python-компонент для управления последовательным выполнением задач с зависимостями, отслеживанием прогресса и возможностью восстановления с места остановки.

Описание

task-sequencer — это переиспользуемый компонент, он предоставляет гибкий механизм для управления последовательным выполнением задач с поддержкой зависимостей, отслеживания прогресса и восстановления после прерываний.

Компонент абстрагирован от доменной логики и работает через интерфейсы (ABC), что делает его универсальным для различных сценариев использования.

Основные возможности

  • Управление порядком выполнения — задачи выполняются в указанном порядке с проверкой зависимостей
  • Отслеживание прогресса — детальное отслеживание состояния выполнения каждой задачи
  • Восстановление с места остановки — возможность продолжить выполнение после прерывания
  • Валидация зависимостей — автоматическая проверка корректности зависимостей между задачами
  • Итеративные задачи — поддержка обработки коллекций элементов с возможностью прерывания
  • Гибкое хранилище прогресса — поддержка различных адаптеров (Memory, MySQL, MongoDB, PostgreSQL)

Установка

Базовая установка

pip install task-sequencer

С опциональными зависимостями

Для использования адаптеров БД установите соответствующие опциональные зависимости:

# MySQL адаптер
pip install task-sequencer[mysql]

# MongoDB адаптер
pip install task-sequencer[mongodb]

# PostgreSQL адаптер
pip install task-sequencer[postgresql]

# Все адаптеры
pip install task-sequencer[mysql,mongodb,postgresql]

Быстрый старт

Простой пример

from task_sequencer import (
    DependencyValidator,
    ExecutionContext,
    Task,
    TaskOrchestrator,
    TaskRegistry,
    TaskResult,
)
from task_sequencer.adapters.memory import MemoryProgressTracker


class MyTask(Task):
    """Простая задача."""

    @property
    def name(self) -> str:
        return "my_task"

    @property
    def depends_on(self) -> list[str]:
        return []

    def execute(self, context: ExecutionContext) -> TaskResult:
        print("Executing task...")
        return TaskResult.success_result(data={"message": "Task completed"})


# Создаем компоненты
registry = TaskRegistry([MyTask()])
tracker = MemoryProgressTracker()
validator = DependencyValidator()
orchestrator = TaskOrchestrator(registry, tracker, validator)

# Выполняем задачу
result = orchestrator.execute(["my_task"])

print(f"Status: {result.status.value}")
print(f"Completed: {result.completed_tasks}")

Пример с зависимостями

class TaskA(Task):
    @property
    def name(self) -> str:
        return "task_a"

    @property
    def depends_on(self) -> list[str]:
        return []

    def execute(self, context: ExecutionContext) -> TaskResult:
        return TaskResult.success_result()


class TaskB(Task):
    @property
    def name(self) -> str:
        return "task_b"

    @property
    def depends_on(self) -> list[str]:
        return ["task_a"]  # Зависит от task_a

    def execute(self, context: ExecutionContext) -> TaskResult:
        return TaskResult.success_result()


registry = TaskRegistry([TaskA(), TaskB()])
tracker = MemoryProgressTracker()
validator = DependencyValidator()
orchestrator = TaskOrchestrator(registry, tracker, validator)

# task_a выполнится первой, затем task_b
result = orchestrator.execute(["task_a", "task_b"])

Пример с ParameterizedIterableTask

Для задач, которые обрабатывают элементы, зависящие от результатов предыдущих задач:

from task_sequencer import ParameterizedIterableTask
from task_sequencer.interfaces import ExecutionContext, TaskResult

class ProcessStatsForMatchTask(ParameterizedIterableTask[str]):
    @property
    def name(self) -> str:
        return "process_stats_for_match"
    
    @property
    def depends_on(self) -> list[str]:
        return ["extract_matches"]
    
    def get_parameters(self, context: ExecutionContext) -> list[str]:
        """Получает параметры из результата предыдущей задачи."""
        extract_result = context.results.get("extract_matches")
        if extract_result and extract_result.data:
            return extract_result.data.get("match_ids", [])
        return []
    
    def execute_for_parameter(self, match_id: str, context: ExecutionContext) -> None:
        """Обрабатывает один параметр."""
        print(f"Processing match: {match_id}")
    
    def execute(self, context: ExecutionContext) -> TaskResult:
        for param in self.get_parameters(context):
            self.execute_for_parameter(param, context)
        return TaskResult.success_result()

См. полный пример в examples/parameterized_task_example.py.

Пример с итеративной задачей и восстановлением

from task_sequencer import IterableTask
from task_sequencer.iterators import ResumeIterator


class ProcessItemsTask(IterableTask):
    @property
    def name(self) -> str:
        return "process_items"

    @property
    def depends_on(self) -> list[str]:
        return []

    def execute(self, context: ExecutionContext) -> TaskResult:
        items = list(self.get_items(context))
        
        # Используем ResumeIterator для поддержки восстановления
        if context.metadata.get("resume", False):
            id_extractor = lambda x: x["id"]
            items_iterator = ResumeIterator(
                items=items,
                progress_tracker=context.progress_tracker,
                task_name=self.name,
                id_extractor=id_extractor,
            )
        else:
            items_iterator = iter(items)
        
        for item in items_iterator:
            self.execute_for_item(item, context)
        
        return TaskResult.success_result()

    def get_items(self, context: ExecutionContext) -> list[dict]:
        return [{"id": str(i), "data": f"item_{i}"} for i in range(1, 101)]

    def execute_for_item(self, item: dict, context: ExecutionContext) -> None:
        # Обработка элемента
        print(f"Processing {item['id']}")


# Первый запуск
orchestrator.execute(["process_items"])

# Второй запуск с восстановлением
orchestrator.execute(["process_items"], resume=True)

Логирование

task-sequencer использует стандартный модуль logging для отслеживания прогресса выполнения задач.

Настройка логирования

from task_sequencer.logging import setup_logging
import logging

# Настройка логирования с уровнем INFO
setup_logging(logging.INFO)

# Или используйте стандартную настройку Python logging
import logging
logging.basicConfig(
    level=logging.INFO,
    format='[%(levelname)s] %(name)s: %(message)s'
)

Примеры логов

При выполнении задач вы увидите логи вида:

[task-sequencer] core: Starting execution of 2 tasks
[task-sequencer] task1: Task started
[task-sequencer] task1: Task completed successfully
[task-sequencer] task2: Task started
[task-sequencer] task2: Processing 100 items
[task-sequencer] task2: Task completed: 100/100 items processed
[task-sequencer] core: Execution completed: 2 completed, 0 failed

Примеры использования

В папке examples/ доступны полные примеры:

  • basic_usage.py — простой пример с одной задачей
  • etl_example.py — пример ETL процесса с зависимостями
  • sync_example.py — пример синхронизации данных с восстановлением
  • parameterized_task_example.py — пример использования ParameterizedIterableTask

Запуск примеров:

python examples/basic_usage.py
python examples/etl_example.py
python examples/sync_example.py

API Документация

Основные классы

TaskOrchestrator

Основной класс для управления выполнением задач.

orchestrator = TaskOrchestrator(
    task_registry=TaskRegistry([...]),
    progress_tracker=MemoryProgressTracker(),
    dependency_validator=DependencyValidator(),
)

result = orchestrator.execute(
    task_order=["task1", "task2"],
    mode="run",  # 'run', 'dry-run', 'resume'
    resume=False,
)

Task и IterableTask

Абстрактные классы для определения задач.

class MyTask(Task):
    @property
    def name(self) -> str:
        return "my_task"

    @property
    def depends_on(self) -> list[str]:
        return ["other_task"]

    def execute(self, context: ExecutionContext) -> TaskResult:
        # Логика выполнения
        return TaskResult.success_result()

ProgressTracker

Абстрактный класс для отслеживания прогресса. Реализации:

  • MemoryProgressTracker — хранение в памяти (встроенный)
  • MySQLProgressTracker — MySQL (опционально)
  • MongoDBProgressTracker — MongoDB (опционально)
  • PostgreSQLProgressTracker — PostgreSQL (опционально)

Требования

  • Python 3.8+
  • Для опциональных адаптеров БД:
    • MySQL: pymysql>=1.0.0, sqlalchemy>=1.4.0
    • MongoDB: pymongo>=4.0.0
    • PostgreSQL: psycopg2-binary>=2.9.0, sqlalchemy>=1.4.0

Разработка

Установка для разработки

git clone git@github.com:seligoroff/task_sequencer.git
cd task_sequencer
python -m venv .venv
source .venv/bin/activate  # Linux/Mac
# или
.venv\Scripts\activate  # Windows

pip install -e ".[dev]"

Запуск тестов

pytest tests/ -v --cov=task_sequencer --cov-report=html

Форматирование кода

black --line-length=100 task_sequencer/ tests/ examples/

Проверка линтером

flake8 task_sequencer/ tests/ examples/

Проверка типов

mypy task_sequencer/

Документация

Полная документация доступна в папке docs/ или на Read the Docs (после публикации).

Лицензия

MIT License - см. файл LICENSE для деталей.

Авторы

[Ваше имя/команда]

Поддержка

Для вопросов и предложений создавайте issues в GitHub.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

task_sequencer-0.1.0.tar.gz (40.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

task_sequencer-0.1.0-py3-none-any.whl (23.1 kB view details)

Uploaded Python 3

File details

Details for the file task_sequencer-0.1.0.tar.gz.

File metadata

  • Download URL: task_sequencer-0.1.0.tar.gz
  • Upload date:
  • Size: 40.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.2

File hashes

Hashes for task_sequencer-0.1.0.tar.gz
Algorithm Hash digest
SHA256 e9a73f622510f6720e323e342ce6bb54ff7139ef823ca130d0655995df3e03a8
MD5 144fec45211cca9219c16976224ae9c8
BLAKE2b-256 468a90877791e71cb7c52248512b115a1daa5c0e0dca5f2123a0191dd5d1069e

See more details on using hashes here.

File details

Details for the file task_sequencer-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: task_sequencer-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 23.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.2

File hashes

Hashes for task_sequencer-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 e15c1edd8fbea8ccd08a7f1a22ff2c8952c28dbc488d0d6df0c9212b3ab2667e
MD5 57c0c27c54c3ddd969dd4b5a152ddf65
BLAKE2b-256 cacbe4c67458bb6aaf705e265ca528dc9283db422de83ae5697567aab6e71a18

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page