
A Python SDK for building high-performance, asynchronous batch processing operators


SandAI Operator SDK

Python framework for developing data operators under the Dataflow architecture. Part of the SandAI Data Project's three-layer separation design.

Overview

The Operator SDK provides the foundation for building data processing operators that form the Dataflow layer in the SandAI architecture. These operators are atomic, reusable components that can be composed into complex pipelines and workflows.

Features

  • Asynchronous Batch Processing: Concurrent processing with configurable batch size and concurrency
  • Smart File Monitoring: Real-time file change detection with vim editor compatibility
  • Task Working Directories: Isolated working directories for each task
  • Error Recovery: Automatic handling of file operations and network interruptions
  • Standardized Interface: Consistent operator lifecycle and API design
  • Celery Integration: Built-in support for distributed task execution

Installation

conda create -n sandai-operator python=3.8 -c conda-forge
conda activate sandai-operator
cd operator-sdk
pip install -e .

Runtime requirements: the SDK supports Python 3.8+. If you run a newer Python with free-threading (no-GIL) enabled, you can get better CPU parallelism, but this is not required.
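To check which kind of interpreter a worker is running on, a small stdlib-only helper is enough. This is an illustrative sketch, not part of the SDK: it assumes the `Py_GIL_DISABLED` build variable and `sys._is_gil_enabled()` (both introduced with free-threaded CPython 3.13) and falls back to "GIL enabled" on older versions.

```python
import sys
import sysconfig


def describe_runtime() -> str:
    """Report the interpreter version and whether the GIL is currently active."""
    if sys.version_info < (3, 8):
        raise RuntimeError("the SDK requires Python 3.8+")
    # Py_GIL_DISABLED is 1 only on free-threaded builds; None or 0 otherwise.
    free_threaded_build = bool(sysconfig.get_config_var("Py_GIL_DISABLED"))
    # sys._is_gil_enabled() exists on 3.13+; on older versions the GIL is always on.
    is_gil_enabled = getattr(sys, "_is_gil_enabled", lambda: True)
    gil_on = is_gil_enabled()
    version = "{}.{}".format(*sys.version_info[:2])
    build = "free-threaded build" if free_threaded_build else "standard build"
    return "Python {} ({}), GIL {}".format(version, build, "enabled" if gil_on else "disabled")


print(describe_runtime())
```

On a free-threaded build the GIL can still be re-enabled at runtime (for example by an extension module that does not declare free-threading support), which is why the sketch asks `sys._is_gil_enabled()` rather than relying on the build flag alone.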

Quick Start

from sandai.operator import BatchProcessor, TaskInput, TaskOutput
from pydantic import BaseModel
from typing import List, Generator

class Options(BaseModel):
    param: str = "default"

class Results(BaseModel):
    output: str

processor = BatchProcessor(name="my-processor", version="1.0.0")

@processor.on_batch(
    max_concurrency=4,
    max_batch_size=8,
    prepare_concurrency=4,
    output_concurrency=4,
)
def process_batch(
    batch_inputs: List[TaskInput[Options]], 
    operator_config: dict,
    context
) -> Generator[TaskOutput[Results], None, None]:
    
    for task_input in batch_inputs:
        # Get task working directory
        workdir = context.get_task_workdir(task_input.task_id)
        
        # Your processing logic here
        result = Results(output=f"processed-{task_input.options.param}")
        
        yield TaskOutput[Results](
            task_id=task_input.task_id,
            results=result,
            status="success"
        )

if __name__ == "__main__":
    processor.run()

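prepare_concurrency and output_concurrency default to max_concurrency, so leaving them unset keeps the behavior of earlier versions. In the current implementation, prepare (downloads and input conversion), output (uploads and cleanup), and channel pull/push each run on their own executor, so you can raise prepare_concurrency or output_concurrency independently instead of having all of them compete for a single IO pool.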
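The per-stage executor idea can be illustrated with `concurrent.futures`. This is a hypothetical sketch of the pattern, not the SDK's internals: the `StagePools` class is invented for illustration, and sizing the channel pool with `max_concurrency` is an assumption because the README does not say how that pool is sized.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Optional


class StagePools:
    """Hypothetical illustration: one thread pool per IO stage.

    Not the SDK's implementation; it only mirrors the documented defaults.
    """

    def __init__(self, max_concurrency: int,
                 prepare_concurrency: Optional[int] = None,
                 output_concurrency: Optional[int] = None) -> None:
        # Unset stage limits inherit max_concurrency, as documented above.
        self.prepare_workers = prepare_concurrency if prepare_concurrency is not None else max_concurrency
        self.output_workers = output_concurrency if output_concurrency is not None else max_concurrency
        # Downloads / input conversion.
        self.prepare = ThreadPoolExecutor(self.prepare_workers, thread_name_prefix="prepare")
        # Uploads / cleanup.
        self.output = ThreadPoolExecutor(self.output_workers, thread_name_prefix="output")
        # Channel pull/push; sizing it with max_concurrency is an assumption.
        self.channel = ThreadPoolExecutor(max_concurrency, thread_name_prefix="channel")

    def shutdown(self) -> None:
        for pool in (self.prepare, self.output, self.channel):
            pool.shutdown(wait=True)


pools = StagePools(max_concurrency=4, output_concurrency=16)
print(pools.prepare_workers, pools.output_workers)  # 4 16
pools.shutdown()
```

Because each stage has its own pool, a burst of slow uploads fills only the output pool and cannot delay downloads queued on the prepare pool.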

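The Celery protocol layer now accepts both the json and msgpack content types by default. The server still encodes with json by default, but accept_content accepts both json and msgpack. Newer versions of operator-client send tasks and fetch results using msgpack by default, so you can roll out upgrades gradually without switching client and server at the same time.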
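A minimal sketch of what "encode with json, accept json and msgpack" means on the receiving side. `task_serializer`, `result_serializer` and `accept_content` are real Celery setting names (they could be applied with `app.conf.update(**CELERY_SETTINGS)`), and `application/x-msgpack` is the content type kombu registers for msgpack; the `decode_body` helper itself is hypothetical and only illustrates the dispatch, not how the SDK wires it up. The `msgpack` package is imported lazily so the json path works without it.

```python
import json
from typing import Any

# Real Celery setting names; values mirror the behavior described above.
CELERY_SETTINGS = {
    "task_serializer": "json",
    "result_serializer": "json",
    "accept_content": ["json", "msgpack"],
}

# Content types kombu uses for these two serializers.
CONTENT_TYPES = {
    "application/json": "json",
    "application/x-msgpack": "msgpack",
}


def decode_body(body: bytes, content_type: str) -> Any:
    """Hypothetical helper: decode a message body, rejecting non-accepted content types."""
    name = CONTENT_TYPES.get(content_type)
    if name is None or name not in CELERY_SETTINGS["accept_content"]:
        raise ValueError("refusing content type: " + content_type)
    if name == "json":
        return json.loads(body.decode("utf-8"))
    import msgpack  # third-party; only needed when a msgpack client sends to us
    return msgpack.unpackb(body, raw=False)
```

Keeping the encoder on json while widening the accept list is what lets old and new clients coexist during a gradual rollout.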

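If a Celery task is picked up again by another worker after visibility_timeout expires, the server can now cap the number of deliveries with max_delivery_attempts. Like other runtime parameters, it can be set through BatchProcessor(...), @processor.on_batch(...), or the environment variable SANDAI_OPERATOR_CELERY_MAX_DELIVERY_ATTEMPTS. Once the limit is exceeded, the server marks the task with TaskDeliveryLimitExceededError and stops redelivering it. The default is 0, which disables this protection.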
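The limit check itself is simple. The sketch below is illustrative only: `TaskDeliveryLimitExceededError` is a local stand-in that borrows the SDK's error name, the precedence "explicit argument, then environment variable, then 0" is an assumption, and so is the convention that the limit is exceeded when the delivery count goes above `max_attempts`.

```python
import os
from typing import Optional

ENV_VAR = "SANDAI_OPERATOR_CELERY_MAX_DELIVERY_ATTEMPTS"


class TaskDeliveryLimitExceededError(Exception):
    """Local stand-in named after the SDK's error; not imported from the SDK."""


def resolve_max_attempts(explicit: Optional[int] = None) -> int:
    # Assumed precedence: explicit argument, then environment variable, then default 0.
    if explicit is not None:
        return explicit
    return int(os.environ.get(ENV_VAR, "0"))


def check_delivery(task_id: str, delivery_count: int, max_attempts: int) -> None:
    """Raise once a task has been delivered more times than allowed; 0 disables the check."""
    if max_attempts > 0 and delivery_count > max_attempts:
        raise TaskDeliveryLimitExceededError(
            "task {} delivered {} times (limit {})".format(task_id, delivery_count, max_attempts))
```

With `max_attempts=3`, the first three deliveries pass and the fourth raises, which stops a poison task from bouncing between workers forever after each visibility_timeout.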

Core Components

  • BatchProcessor: Asynchronous batch processor with configurable concurrency
  • FileChannel: File monitoring with real-time change detection
  • ProcessingContext: Task-level working directory management
  • CeleryChannel: Distributed task execution via Celery

Architecture Integration

This SDK enables the Dataflow layer of the SandAI architecture:

  • Operators built with this SDK are deployed in the operators/ directory
  • Pipelines in the pipelines/ directory compose these operators
  • Workflows in the workflows/ directory orchestrate complete business processes

Example Operators

See the operators/ directory for complete implementations:

  • video-clipper/: Video processing operator
  • data-transformer/: Data format conversion operator

Testing

make test          # Run all tests
make test-sdk      # Run SDK core tests

Supervisor CLI

operator-sdk provides sdrun for launching multiple identical worker processes, aggregating logs, forwarding signals, and supervising worker lifecycle policies.

sdrun -w 4 --restart always -- python main.py -j --mode file
  • -w / --worker: number of worker processes to launch, default 1
  • --restart never: default; do not restart workers after a non-zero exit
  • --restart always: always restart a worker after a non-zero exit
  • --restart N: restart a worker at most N times after non-zero exits
  • --success-exit ignore: default; when a worker exits with code 0, do not affect other workers
  • --success-exit shutdown: when a worker exits with code 0, stop the remaining workers
  • --failure-exit ignore: default; when a worker exits non-zero and will not be restarted, do not affect other workers
  • --failure-exit shutdown: when a worker exits non-zero and will not be restarted, stop the remaining workers and return that worker's exit code
  • --startup-stagger SECONDS: sequential startup delay, default 0; for example 0.5 starts worker-1 after 0.5s and worker-2 after 1.0s

Policy model:

  • --restart only controls whether the exited worker itself should be restarted after a non-zero exit.
  • --success-exit controls whether a clean exit from one worker should stop the rest.
  • --failure-exit controls whether a non-zero exit from one worker, once no more restarts apply, should stop the rest.
  • If all workers eventually exit without supervisor-forced shutdown, sdrun exits with the sum of all final worker exit codes.
  • If --failure-exit shutdown is used, sdrun exits with the first non-restarted failing worker's exit code.
  • SIGTERM, SIGINT, SIGHUP, and SIGQUIT received by sdrun are forwarded to all workers.
  • Logs are prefixed with worker identity, for example [worker-2#1][stdout] ....
  • On POSIX, sdrun starts each worker in its own process group. On Linux it also installs a parent-death signal before exec so workers are terminated if the supervisor disappears unexpectedly.
  • Child processes receive SDRUN_MODE=true, SDRUN_WORLD_SIZE, SDRUN_RANK, and SDRUN_LOCAL_RANK.
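To make the interaction between the three policy flags concrete, here is a small pure-Python model of the rules listed above. It is a hypothetical sketch, not sdrun's source: the function names are invented, `--restart N` is assumed to count restarts per worker, and the exit status after `--success-exit shutdown` is not modelled because the README does not specify it.

```python
from typing import List, Optional, Union


def restart_allowed(policy: Union[str, int], restarts_so_far: int, exit_code: int) -> bool:
    """Model of --restart: only the exiting worker is considered, and only on non-zero exit."""
    if exit_code == 0 or policy == "never":
        return False
    if policy == "always":
        return True
    # --restart N: at most N restarts for this worker.
    return restarts_so_far < int(policy)


def should_stop_others(exit_code: int, will_restart: bool,
                       success_exit: str, failure_exit: str) -> bool:
    """Model of --success-exit / --failure-exit."""
    if exit_code == 0:
        return success_exit == "shutdown"
    # A failure only counts once no more restarts apply.
    return not will_restart and failure_exit == "shutdown"


def supervisor_exit_code(final_codes: List[int],
                         first_failure_code: Optional[int] = None) -> int:
    """sdrun's exit status for the two documented cases.

    first_failure_code: exit code of the first non-restarted failing worker when
    --failure-exit shutdown fired; None when no forced shutdown happened.
    """
    if first_failure_code is not None:
        return first_failure_code
    return sum(final_codes)
```

For example, under "Elastic recovery on failures" (`--restart always --failure-exit shutdown`), `restart_allowed` is always true for a crash, so `should_stop_others` never fires on failures and the remaining workers keep running.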

Common combinations:

  • Independent workers: --restart never --success-exit ignore --failure-exit ignore
  • Fail-fast workers: --restart never --success-exit ignore --failure-exit shutdown
  • Elastic recovery on failures: --restart always --success-exit ignore --failure-exit shutdown
  • First clean completion wins: --restart never --success-exit shutdown --failure-exit shutdown

If running several workers under sdrun multiplies GPU memory usage because each worker process holds its own copy of large tensors or model weights, consider using shared-tensor to share those tensors across processes: https://github.com/world-sim-dev/shared-tensor. This is especially useful for single-GPU, multi-process inference when the model runtime is not thread-safe and threads therefore cannot be used.

FileChannel With SDRUN

When workers are launched by sdrun and the operator runs in file mode:

  • FileChannel shards input lines by line index using line_index % SDRUN_WORLD_SIZE == SDRUN_RANK.
  • Each worker processes only the JSONL rows assigned to its rank.
  • Output files are renamed by inserting the rank before the extension, for example output.jsonl becomes output.0.jsonl and output.1.jsonl.
  • If the output file has no extension, the rank suffix is appended directly to the filename.
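The sharding and renaming rules can be expressed in a few lines. This is a sketch of the documented behavior, not FileChannel's code: the helper names are hypothetical, and two details are assumptions: an extension-less path gets `.<rank>` appended (the README only says the suffix is appended directly), and for multi-dot names such as `a.tar.gz` only the last extension is treated as the extension.

```python
import os
from typing import Iterable, Iterator, Tuple


def sdrun_rank_and_world() -> Tuple[int, int]:
    """Read the rank and world size sdrun exports; default to a single worker otherwise."""
    if os.environ.get("SDRUN_MODE") != "true":
        return 0, 1
    return int(os.environ["SDRUN_RANK"]), int(os.environ["SDRUN_WORLD_SIZE"])


def shard_lines(lines: Iterable[str], rank: int, world_size: int) -> Iterator[str]:
    """Yield only the JSONL rows where line_index % world_size == rank."""
    for index, line in enumerate(lines):
        if index % world_size == rank:
            yield line


def ranked_output_path(path: str, rank: int) -> str:
    """output.jsonl -> output.0.jsonl; an extension-less path gets '.<rank>' appended (assumed)."""
    root, ext = os.path.splitext(path)
    return "{}.{}{}".format(root, rank, ext)
```

With `-w 2`, rank 1 receives input rows 1, 3, 5, ... and writes them to `output.1.jsonl`.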

This means sdrun -w 4 -- python main.py --mode file ... produces 4 parallel output files that must be merged by the caller if a single combined result is needed.
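Because rows are assigned round-robin, the per-rank files can be merged back into input order by reading one line from each rank in turn. The sketch below assumes each worker writes exactly one output row per assigned input row, in input order. That holds for simple 1:1 operators but not for operators that drop, split, or reorder rows. The `merge_ranked_outputs` helper is hypothetical.

```python
import os
from itertools import zip_longest


def merge_ranked_outputs(base_path: str, world_size: int, merged_path: str) -> int:
    """Interleave output.<rank>.jsonl files back into original input order.

    Input row i lives at position i // world_size in the file of rank i % world_size,
    so reading one line per rank, round-robin, restores the original order.
    """
    root, ext = os.path.splitext(base_path)
    handles = [open("{}.{}{}".format(root, rank, ext), encoding="utf-8")
               for rank in range(world_size)]
    written = 0
    try:
        with open(merged_path, "w", encoding="utf-8") as out:
            # Higher ranks may have one row fewer at the tail; zip_longest pads with None.
            for row_group in zip_longest(*handles):
                for line in row_group:
                    if line is not None:
                        out.write(line if line.endswith("\n") else line + "\n")
                        written += 1
    finally:
        for handle in handles:
            handle.close()
    return written
```

If row order does not matter to the caller, simply concatenating the rank files is enough.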

Development

Setup Local Minio

brew install minio/stable/minio
brew install minio/stable/mc
minio server var/minio

Setup Local Redis

brew install redis
brew services start redis

Setup Local Postgres

brew install postgresql
brew services start postgresql

List Services

brew services list
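Besides `brew services list`, a quick TCP probe confirms the services are actually accepting connections. This sketch assumes stock default ports (MinIO API on 9000, Redis on 6379, Postgres on 5432) on localhost. Adjust `DEFAULT_SERVICES` if your setup differs.

```python
import socket
from typing import Dict, Tuple

# Assumed stock defaults for the local services installed above.
DEFAULT_SERVICES: Dict[str, Tuple[str, int]] = {
    "minio": ("127.0.0.1", 9000),
    "redis": ("127.0.0.1", 6379),
    "postgres": ("127.0.0.1", 5432),
}


def is_listening(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def check_services(services: Dict[str, Tuple[str, int]] = DEFAULT_SERVICES) -> Dict[str, bool]:
    return {name: is_listening(host, port) for name, (host, port) in services.items()}


if __name__ == "__main__":
    for name, ok in check_services().items():
        print("{:<9} {}".format(name, "up" if ok else "down"))
```

A successful connection only shows that something is listening on the port. It does not verify credentials or buckets and databases.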

Creating New Operators

  1. Create operator directory in ../operators/my-operator/
  2. Implement using this SDK
  3. Deploy as Celery service
  4. Use in pipelines and workflows

Best Practices

  • Keep operators focused on single responsibilities
  • Use proper error handling and logging
  • Implement comprehensive tests
  • Document operator interfaces clearly

License

MIT License

Build and Upload

make build
ossutil cp dist/sandai_operator_sdk-0.2.7-py3-none-any.whl oss://python-artifacts/ -e oss-cn-shanghai.aliyuncs.com --acl public-read

Local Development Install

pip install -e /path/to/operator-sdk

Download files


Source Distribution

sandai_operator_sdk-0.4.4.tar.gz (72.8 kB)


Built Distribution


sandai_operator_sdk-0.4.4-py3-none-any.whl (45.1 kB)


File details

Details for the file sandai_operator_sdk-0.4.4.tar.gz.

File metadata

  • Download URL: sandai_operator_sdk-0.4.4.tar.gz
  • Size: 72.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.14.2

File hashes

Hashes for sandai_operator_sdk-0.4.4.tar.gz
Algorithm Hash digest
SHA256 50679112570f08ba1cdeeef825f3af308fe8b26e7015e23354f123cd99565c12
MD5 f8ce9b4be3a3191c485d33ff1dfbf4ac
BLAKE2b-256 e4b65de6fd8a871978cb75f80335e83102c3fd27dc7c04090104781cd9fa00b8


File details

Details for the file sandai_operator_sdk-0.4.4-py3-none-any.whl.

File hashes

Hashes for sandai_operator_sdk-0.4.4-py3-none-any.whl
Algorithm Hash digest
SHA256 580498a91250a2d09ab427cf0ac4966c77baefdf97baa4c4113561c0ad624dd5
MD5 5b3e3501ad6f351f4a901629728cf6c8
BLAKE2b-256 93627049eb684bffeab70f9d775a87a3ff7b1e271e2c4ef3fa1208c7ca916fb1
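To check a downloaded artifact against the SHA256 digests listed above, hashing it in chunks with `hashlib` is enough. The helper names are hypothetical. For installs, pip's hash-checking mode (`--require-hashes` with `--hash=sha256:...` entries in a requirements file) does the same check automatically.

```python
import hashlib
import os

# SHA256 digests copied from the file details above.
EXPECTED_SHA256 = {
    "sandai_operator_sdk-0.4.4.tar.gz": "50679112570f08ba1cdeeef825f3af308fe8b26e7015e23354f123cd99565c12",
    "sandai_operator_sdk-0.4.4-py3-none-any.whl": "580498a91250a2d09ab427cf0ac4966c77baefdf97baa4c4113561c0ad624dd5",
}


def sha256_of(path: str, chunk_size: int = 1 << 20) -> str:
    """Hash a file in 1 MiB chunks so large artifacts don't need to fit in memory."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def verify(path: str) -> bool:
    """Compare a downloaded file with the published digest for its file name."""
    expected = EXPECTED_SHA256.get(os.path.basename(path))
    return expected is not None and sha256_of(path) == expected
```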

