A Python SDK for building high-performance, asynchronous batch processing operators
SandAI Operator SDK
Python framework for developing data operators under the Dataflow architecture. Part of the SandAI Data Project's three-layer separation design.
Overview
The Operator SDK provides the foundation for building data processing operators that form the Dataflow layer in the SandAI architecture. These operators are atomic, reusable components that can be composed into complex pipelines and workflows.
Features
- Asynchronous Batch Processing: Concurrent processing with configurable batch size and concurrency
- Smart File Monitoring: Real-time file change detection with vim editor compatibility
- Task Working Directories: Isolated working directories for each task
- Error Recovery: Automatic handling of file operations and network interruptions
- Standardized Interface: Consistent operator lifecycle and API design
- Celery Integration: Built-in support for distributed task execution
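To make the first feature concrete, here is a minimal sketch of batching with bounded concurrency, written with the standard library only. It does not show the SDK's internals: `chunked`, `run_batches`, and `upper_batch` are hypothetical names introduced for illustration, and only the `max_batch_size` and `max_concurrency` parameter names are borrowed from the Quick Start.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List


def chunked(items: Iterable, size: int) -> List[list]:
    """Split items into consecutive batches of at most `size` elements."""
    if size < 1:
        raise ValueError("size must be at least 1")
    items = list(items)
    return [items[i:i + size] for i in range(0, len(items), size)]


async def run_batches(
    items: Iterable,
    handler: Callable[[list], Awaitable[list]],
    max_batch_size: int = 8,
    max_concurrency: int = 4,
) -> list:
    """Run `handler` on each batch, with at most `max_concurrency` in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(batch: list) -> list:
        # The semaphore caps how many batches are processed at once.
        async with semaphore:
            return await handler(batch)

    # gather() returns results in submission order, so output order is stable.
    per_batch = await asyncio.gather(
        *(guarded(batch) for batch in chunked(items, max_batch_size))
    )
    return [result for batch_results in per_batch for result in batch_results]


async def upper_batch(batch: list) -> list:
    """Hypothetical handler: upper-case every string in the batch."""
    await asyncio.sleep(0)  # stand-in for real asynchronous work
    return [value.upper() for value in batch]
```

Calling `asyncio.run(run_batches(["a", "b", "c"], upper_batch, max_batch_size=2))` processes two batches and returns the results in input order.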
Installation
cd operator-sdk
pip install -e .
conda create -n sandai-operator python=3.14=h0369b99_1_cp314t -c conda-forge
Quick Start
from sandai.operator import BatchProcessor, TaskInput, TaskOutput
from pydantic import BaseModel
from typing import List, Generator


class Options(BaseModel):
    param: str = "default"


class Results(BaseModel):
    output: str


processor = BatchProcessor(name="my-processor", version="1.0.0")


@processor.on_batch(
    max_concurrency=4,
    max_batch_size=8,
    prepare_concurrency=4,
    output_concurrency=4,
)
def process_batch(
    batch_inputs: List[TaskInput[Options]],
    operator_config: dict,
    context
) -> Generator[TaskOutput[Results], None, None]:
    for task_input in batch_inputs:
        # Get task working directory
        workdir = context.get_task_workdir(task_input.task_id)
        # Your processing logic here
        result = Results(output=f"processed-{task_input.options.param}")
        yield TaskOutput[Results](
            task_id=task_input.task_id,
            results=result,
            status="success"
        )


if __name__ == "__main__":
    processor.run()
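prepare_concurrency and output_concurrency inherit max_concurrency by default, so leaving them unset preserves the behavior of earlier versions. In the current implementation, prepare-stage downloads and input conversion, output-stage uploads and cleanup, and channel pull/push already run on separate executors, so you can raise prepare_concurrency or output_concurrency independently instead of having everything compete in a single I/O pool.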
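The inheritance rule and the separate pools can be sketched as follows. This is an illustration under stated assumptions, not the SDK's actual code: `ConcurrencySettings` and `build_executors` are hypothetical names, and the use of `ThreadPoolExecutor` is an assumption about what "executor" means here.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class ConcurrencySettings:
    """Hypothetical container for the three concurrency options."""
    max_concurrency: int = 4
    prepare_concurrency: Optional[int] = None
    output_concurrency: Optional[int] = None

    def resolved(self) -> Tuple[int, int]:
        """Return (prepare, output), falling back to max_concurrency when unset."""
        prepare = (
            self.prepare_concurrency
            if self.prepare_concurrency is not None
            else self.max_concurrency
        )
        output = (
            self.output_concurrency
            if self.output_concurrency is not None
            else self.max_concurrency
        )
        return prepare, output


def build_executors(
    settings: ConcurrencySettings,
) -> Tuple[ThreadPoolExecutor, ThreadPoolExecutor]:
    """Create one pool per stage so slow uploads cannot starve downloads."""
    prepare_workers, output_workers = settings.resolved()
    prepare_pool = ThreadPoolExecutor(
        max_workers=prepare_workers, thread_name_prefix="prepare"
    )
    output_pool = ThreadPoolExecutor(
        max_workers=output_workers, thread_name_prefix="output"
    )
    return prepare_pool, output_pool
```

With this layout, `ConcurrencySettings(max_concurrency=4, prepare_concurrency=16)` resolves to 16 prepare workers and 4 output workers, while a configuration that sets only `max_concurrency` resolves both stages to the same value.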
Core Components
- BatchProcessor: Asynchronous batch processor with configurable concurrency
- FileChannel: File monitoring with real-time change detection
- ProcessingContext: Task-level working directory management
- CeleryChannel: Distributed task execution via Celery
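As an illustration of task-level working directory management, the sketch below gives each task its own directory under a shared base. It is not the SDK's ProcessingContext: the `TaskWorkdirs` class is hypothetical, and only the `get_task_workdir` method name is taken from the Quick Start.

```python
import tempfile
from pathlib import Path


class TaskWorkdirs:
    """Hypothetical helper that maps each task ID to an isolated directory."""

    def __init__(self, base_dir: str) -> None:
        self.base_dir = Path(base_dir)

    def get_task_workdir(self, task_id: str) -> Path:
        # Reject empty IDs and IDs containing path separators or "..",
        # so a task can never be placed outside the base directory.
        if not task_id or task_id in (".", "..") or Path(task_id).name != task_id:
            raise ValueError(f"invalid task id: {task_id!r}")
        workdir = self.base_dir / task_id
        # exist_ok=True makes repeated calls for the same task idempotent.
        workdir.mkdir(parents=True, exist_ok=True)
        return workdir


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as base:
        workdirs = TaskWorkdirs(base)
        print(workdirs.get_task_workdir("task-001"))
```

Keeping one directory per task means intermediate files from concurrent tasks cannot collide, and cleanup can be done per task.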
Architecture Integration
This SDK enables the Dataflow layer of the SandAI architecture:
- Operators built with this SDK are deployed in the operators/ directory
- Pipelines in the pipelines/ directory compose these operators
- Workflows in the workflows/ directory orchestrate complete business processes
Example Operators
See the operators/ directory for complete implementations:
- video-clipper/: Video processing operator
- data-transformer/: Data format conversion operator
Testing
make test # Run all tests
make test-sdk # Run SDK core tests
Development
Setup Local Minio
brew install minio/stable/minio
brew install minio/stable/mc
minio server var/minio
Setup Local Redis
brew install redis
brew services start redis
Setup Local Postgres
brew install postgresql
brew services start postgresql
List Services
brew services list
Creating New Operators
- Create operator directory in ../operators/my-operator/
- Implement using this SDK
- Deploy as Celery service
- Use in pipelines and workflows
Best Practices
- Keep operators focused on single responsibilities
- Use proper error handling and logging
- Implement comprehensive tests
- Document operator interfaces clearly
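One way to apply the error handling and logging advice is to isolate failures per task, so that one bad input does not abort the rest of the batch. The sketch below uses plain standard-library stand-ins rather than the SDK's TaskInput and TaskOutput types; `Outcome`, `process_each`, and the "failed" status value are assumptions made for illustration.

```python
import logging
from dataclasses import dataclass
from typing import Callable, Iterable, Iterator, Optional, Tuple

logger = logging.getLogger("my-operator")


@dataclass
class Outcome:
    """Hypothetical per-task result record."""
    task_id: str
    status: str
    output: Optional[str] = None
    error: Optional[str] = None


def process_each(
    tasks: Iterable[Tuple[str, object]],
    handler: Callable[[object], str],
) -> Iterator[Outcome]:
    """Yield one Outcome per task, converting exceptions into failed outcomes."""
    for task_id, payload in tasks:
        try:
            yield Outcome(task_id, "success", output=handler(payload))
        except Exception as exc:
            # Log the full traceback, then report the failure for this task only.
            logger.exception("task %s failed", task_id)
            yield Outcome(task_id, "failed", error=str(exc))
```

Because each task is wrapped individually, the caller receives a result for every task ID and can decide whether to retry the failed ones.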
License
MIT License
Packaging and Uploading
make build
ossutil cp dist/sandai_operator_sdk-0.2.7-py3-none-any.whl oss://python-artifacts/ -e oss-cn-shanghai.aliyuncs.com --acl public-read
Local Development Install
pip install -e /path/to/operator-sdk