Sequential task orchestrator with dependency management and progress tracking
Reason this release was yanked:
Critical issues during adapter installation
Project description
Task Sequencer
Универсальный Python-компонент для управления последовательным выполнением задач с зависимостями, отслеживанием прогресса и возможностью восстановления с места остановки.
Описание
task-sequencer — это переиспользуемый компонент, он предоставляет гибкий механизм для управления последовательным выполнением задач с поддержкой зависимостей, отслеживания прогресса и восстановления после прерываний.
Компонент абстрагирован от доменной логики и работает через интерфейсы (ABC), что делает его универсальным для различных сценариев использования.
Основные возможности
- ✅ Управление порядком выполнения — задачи выполняются в указанном порядке с проверкой зависимостей
- ✅ Отслеживание прогресса — детальное отслеживание состояния выполнения каждой задачи
- ✅ Восстановление с места остановки — возможность продолжить выполнение после прерывания
- ✅ Валидация зависимостей — автоматическая проверка корректности зависимостей между задачами
- ✅ Итеративные задачи — поддержка обработки коллекций элементов с возможностью прерывания
- ✅ Гибкое хранилище прогресса — поддержка различных адаптеров (Memory, MySQL, MongoDB, PostgreSQL)
Установка
Базовая установка
pip install task-sequencer
С опциональными зависимостями
Для использования адаптеров БД установите соответствующие опциональные зависимости:
# MySQL адаптер
pip install task-sequencer[mysql]
# MongoDB адаптер
pip install task-sequencer[mongodb]
# PostgreSQL адаптер
pip install task-sequencer[postgresql]
# Все адаптеры
pip install task-sequencer[mysql,mongodb,postgresql]
Быстрый старт
Простой пример
from task_sequencer import (
DependencyValidator,
ExecutionContext,
Task,
TaskOrchestrator,
TaskRegistry,
TaskResult,
)
from task_sequencer.adapters.memory import MemoryProgressTracker
class MyTask(Task):
"""Простая задача."""
@property
def name(self) -> str:
return "my_task"
@property
def depends_on(self) -> list[str]:
return []
def execute(self, context: ExecutionContext) -> TaskResult:
print("Executing task...")
return TaskResult.success_result(data={"message": "Task completed"})
# Создаем компоненты
registry = TaskRegistry([MyTask()])
tracker = MemoryProgressTracker()
validator = DependencyValidator()
orchestrator = TaskOrchestrator(registry, tracker, validator)
# Выполняем задачу
result = orchestrator.execute(["my_task"])
print(f"Status: {result.status.value}")
print(f"Completed: {result.completed_tasks}")
Пример с зависимостями
class TaskA(Task):
@property
def name(self) -> str:
return "task_a"
@property
def depends_on(self) -> list[str]:
return []
def execute(self, context: ExecutionContext) -> TaskResult:
return TaskResult.success_result()
class TaskB(Task):
@property
def name(self) -> str:
return "task_b"
@property
def depends_on(self) -> list[str]:
return ["task_a"] # Зависит от task_a
def execute(self, context: ExecutionContext) -> TaskResult:
return TaskResult.success_result()
registry = TaskRegistry([TaskA(), TaskB()])
tracker = MemoryProgressTracker()
validator = DependencyValidator()
orchestrator = TaskOrchestrator(registry, tracker, validator)
# task_a выполнится первой, затем task_b
result = orchestrator.execute(["task_a", "task_b"])
Пример с ParameterizedIterableTask
Для задач, которые обрабатывают элементы, зависящие от результатов предыдущих задач:
from task_sequencer import ParameterizedIterableTask
from task_sequencer.interfaces import ExecutionContext, TaskResult
class ProcessStatsForMatchTask(ParameterizedIterableTask[str]):
@property
def name(self) -> str:
return "process_stats_for_match"
@property
def depends_on(self) -> list[str]:
return ["extract_matches"]
def get_parameters(self, context: ExecutionContext) -> list[str]:
"""Получает параметры из результата предыдущей задачи."""
extract_result = context.results.get("extract_matches")
if extract_result and extract_result.data:
return extract_result.data.get("match_ids", [])
return []
def execute_for_parameter(self, match_id: str, context: ExecutionContext) -> None:
"""Обрабатывает один параметр."""
print(f"Processing match: {match_id}")
def execute(self, context: ExecutionContext) -> TaskResult:
for param in self.get_parameters(context):
self.execute_for_parameter(param, context)
return TaskResult.success_result()
См. полный пример в examples/parameterized_task_example.py.
Пример с итеративной задачей и восстановлением
from task_sequencer import IterableTask
from task_sequencer.iterators import ResumeIterator
class ProcessItemsTask(IterableTask):
@property
def name(self) -> str:
return "process_items"
@property
def depends_on(self) -> list[str]:
return []
def execute(self, context: ExecutionContext) -> TaskResult:
items = list(self.get_items(context))
# Используем ResumeIterator для поддержки восстановления
if context.metadata.get("resume", False):
id_extractor = lambda x: x["id"]
items_iterator = ResumeIterator(
items=items,
progress_tracker=context.progress_tracker,
task_name=self.name,
id_extractor=id_extractor,
)
else:
items_iterator = iter(items)
for item in items_iterator:
self.execute_for_item(item, context)
return TaskResult.success_result()
def get_items(self, context: ExecutionContext) -> list[dict]:
return [{"id": str(i), "data": f"item_{i}"} for i in range(1, 101)]
def execute_for_item(self, item: dict, context: ExecutionContext) -> None:
# Обработка элемента
print(f"Processing {item['id']}")
# Первый запуск
orchestrator.execute(["process_items"])
# Второй запуск с восстановлением
orchestrator.execute(["process_items"], resume=True)
Логирование
task-sequencer использует стандартный модуль logging для отслеживания прогресса выполнения задач.
Настройка логирования
from task_sequencer.logging import setup_logging
import logging
# Настройка логирования с уровнем INFO
setup_logging(logging.INFO)
# Или используйте стандартную настройку Python logging
import logging
logging.basicConfig(
level=logging.INFO,
format='[%(levelname)s] %(name)s: %(message)s'
)
Примеры логов
При выполнении задач вы увидите логи вида:
[task-sequencer] core: Starting execution of 2 tasks
[task-sequencer] task1: Task started
[task-sequencer] task1: Task completed successfully
[task-sequencer] task2: Task started
[task-sequencer] task2: Processing 100 items
[task-sequencer] task2: Task completed: 100/100 items processed
[task-sequencer] core: Execution completed: 2 completed, 0 failed
Примеры использования
В папке examples/ доступны полные примеры:
basic_usage.py— простой пример с одной задачейetl_example.py— пример ETL процесса с зависимостямиsync_example.py— пример синхронизации данных с восстановлениемparameterized_task_example.py— пример использования ParameterizedIterableTask
Запуск примеров:
python examples/basic_usage.py
python examples/etl_example.py
python examples/sync_example.py
API Документация
Основные классы
TaskOrchestrator
Основной класс для управления выполнением задач.
orchestrator = TaskOrchestrator(
task_registry=TaskRegistry([...]),
progress_tracker=MemoryProgressTracker(),
dependency_validator=DependencyValidator(),
)
result = orchestrator.execute(
task_order=["task1", "task2"],
mode="run", # 'run', 'dry-run', 'resume'
resume=False,
)
Task и IterableTask
Абстрактные классы для определения задач.
class MyTask(Task):
@property
def name(self) -> str:
return "my_task"
@property
def depends_on(self) -> list[str]:
return ["other_task"]
def execute(self, context: ExecutionContext) -> TaskResult:
# Логика выполнения
return TaskResult.success_result()
ProgressTracker
Абстрактный класс для отслеживания прогресса. Реализации:
MemoryProgressTracker— хранение в памяти (встроенный)MySQLProgressTracker— MySQL (опционально)MongoDBProgressTracker— MongoDB (опционально)PostgreSQLProgressTracker— PostgreSQL (опционально)
Требования
- Python 3.8+
- Для опциональных адаптеров БД:
- MySQL:
pymysql>=1.0.0,sqlalchemy>=1.4.0 - MongoDB:
pymongo>=4.0.0 - PostgreSQL:
psycopg2-binary>=2.9.0,sqlalchemy>=1.4.0
- MySQL:
Разработка
Установка для разработки
git clone git@github.com:seligoroff/task_sequencer.git
cd task_sequencer
python -m venv .venv
source .venv/bin/activate # Linux/Mac
# или
.venv\Scripts\activate # Windows
pip install -e ".[dev]"
Запуск тестов
pytest tests/ -v --cov=task_sequencer --cov-report=html
Форматирование кода
black --line-length=100 task_sequencer/ tests/ examples/
Проверка линтером
flake8 task_sequencer/ tests/ examples/
Проверка типов
mypy task_sequencer/
Документация
Полная документация доступна в папке docs/ или на Read the Docs (после публикации).
Лицензия
MIT License - см. файл LICENSE для деталей.
Авторы
[Ваше имя/команда]
Поддержка
Для вопросов и предложений создавайте issues в GitHub.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file task_sequencer-0.1.0.tar.gz.
File metadata
- Download URL: task_sequencer-0.1.0.tar.gz
- Upload date:
- Size: 40.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.2
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
e9a73f622510f6720e323e342ce6bb54ff7139ef823ca130d0655995df3e03a8
|
|
| MD5 |
144fec45211cca9219c16976224ae9c8
|
|
| BLAKE2b-256 |
468a90877791e71cb7c52248512b115a1daa5c0e0dca5f2123a0191dd5d1069e
|
File details
Details for the file task_sequencer-0.1.0-py3-none-any.whl.
File metadata
- Download URL: task_sequencer-0.1.0-py3-none-any.whl
- Upload date:
- Size: 23.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.2
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
e15c1edd8fbea8ccd08a7f1a22ff2c8952c28dbc488d0d6df0c9212b3ab2667e
|
|
| MD5 |
57c0c27c54c3ddd969dd4b5a152ddf65
|
|
| BLAKE2b-256 |
cacbe4c67458bb6aaf705e265ca528dc9283db422de83ae5697567aab6e71a18
|