SandAI Operator SDK
A Python SDK for building high-performance, asynchronous batch-processing data operators under the Dataflow architecture. Part of the SandAI Data Project's three-layer separation design.
Overview
The Operator SDK provides the foundation for building data processing operators that form the Dataflow layer in the SandAI architecture. These operators are atomic, reusable components that can be composed into complex pipelines and workflows.
For the full documentation set, topic guides, and architecture notes, start with docs/README.md.
Features
- Asynchronous Batch Processing: Concurrent processing with configurable batch size and concurrency
- Smart File Monitoring: Real-time file change detection with vim editor compatibility
- Task Working Directories: Isolated working directories for each task
- Error Recovery: Automatic handling of file operations and network interruptions
- Standardized Interface: Consistent operator lifecycle and API design
- Celery Integration: Built-in support for distributed task execution
Installation
```bash
pip install sandai-operator-sdk
```
Or create a dedicated conda environment first:
```bash
conda create -n sandai-operator python=3.8 -c conda-forge
```
Runtime requirement: the SDK supports Python 3.8+. If you enable free-threading/no-GIL on a newer Python version, you may get better CPU parallelism, but it is not required to run the SDK.
Quick Start
```python
from sandai.operator import BatchProcessor, TaskInput, TaskOutput
from pydantic import BaseModel
from typing import List, Generator


class Options(BaseModel):
    param: str = "default"


class Results(BaseModel):
    output: str


processor = BatchProcessor(name="my-processor", version="1.0.0")


@processor.on_batch(
    max_concurrency=4,
    max_batch_size=8,
    prepare_concurrency=4,
    output_concurrency=4,
)
def process_batch(
    batch_inputs: List[TaskInput[Options]],
    operator_config: dict,
    context,
) -> Generator[TaskOutput[Results], None, None]:
    for task_input in batch_inputs:
        # Get the task's isolated working directory
        workdir = context.get_task_workdir(task_input.task_id)
        # Your processing logic here
        result = Results(output=f"processed-{task_input.options.param}")
        yield TaskOutput[Results](
            task_id=task_input.task_id,
            results=result,
            status="success",
        )


if __name__ == "__main__":
    processor.run()
```
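Assuming the snippet above is saved as main.py, a worker can be launched directly in file mode (the same flag used by the sdrun examples below):
```bash
python main.py --mode file
```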
prepare_concurrency and output_concurrency inherit from max_concurrency by default, so behavior remains backward-compatible when they are not configured. In the current implementation, prepare download/input conversion, output upload/cleanup, and channel pull/push already run on separate executors. That means you can increase prepare_concurrency or output_concurrency independently instead of forcing all IO work to compete in the same pool.
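For example, an IO-heavy operator can give the prepare and output executors larger pools than the compute pool. A minimal tuning sketch, using only the parameters shown above:
```python
@processor.on_batch(
    max_concurrency=4,       # compute: at most 4 tasks processed at once
    max_batch_size=8,
    prepare_concurrency=16,  # download / input conversion executor
    output_concurrency=16,   # upload / cleanup executor
)
def process_batch(batch_inputs, operator_config, context):
    ...
```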
At the Celery protocol layer, the server still uses json encoding by default, but it now accepts both json and msgpack content types out of the box. The newer operator-client defaults to msgpack, so the two components interoperate without extra configuration.
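For reference, the equivalent settings in plain Celery look like the sketch below. The SDK is described as applying these defaults itself, so this is illustrative only, not required setup:
```python
from celery import Celery

app = Celery("operator")
# Server side: accept both content types.
app.conf.accept_content = ["json", "msgpack"]
# Client side: a newer operator-client would serialize tasks as msgpack.
app.conf.task_serializer = "msgpack"
```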
If a Celery task is redelivered to another worker after visibility_timeout, the server now also supports limiting delivery attempts through max_delivery_attempts. Like other runtime parameters, you can configure it through BatchProcessor(...), @processor.on_batch(...), or the SANDAI_OPERATOR_CELERY_MAX_DELIVERY_ATTEMPTS environment variable. Once the limit is exceeded, the server fails the task with TaskDeliveryLimitExceededError and stops further redelivery. The default value is 0, which disables this protection.
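A sketch of the three configuration routes named above; the exact keyword placement is an assumption, so check your SDK version:
```python
# 1. Via the processor constructor (assumed kwarg placement)
processor = BatchProcessor(
    name="my-processor",
    version="1.0.0",
    max_delivery_attempts=3,  # 0 (default) disables the limit
)

# 2. Via the batch decorator (assumed kwarg placement)
@processor.on_batch(max_concurrency=4, max_delivery_attempts=3)
def process_batch(batch_inputs, operator_config, context):
    ...
```
```bash
# 3. Via the environment variable
export SANDAI_OPERATOR_CELERY_MAX_DELIVERY_ATTEMPTS=3
```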
Core Components
- BatchProcessor: Asynchronous batch processor with configurable concurrency
- FileChannel: File monitoring with real-time change detection
- ProcessingContext: Task-level working directory management
- CeleryChannel: Distributed task execution via Celery
Architecture Integration
This SDK enables the Dataflow layer of the SandAI architecture:
- Operators built with this SDK are deployed in the operators/ directory
- Pipelines in the pipelines/ directory compose these operators
- Workflows in the workflows/ directory orchestrate complete business processes
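An illustrative layout of the three layers (directory contents beyond operators/, pipelines/, and workflows/ are placeholders):
```
operators/
  video-clipper/      # atomic operators built with this SDK
  data-transformer/
pipelines/            # pipelines composing the operators
workflows/            # workflows orchestrating business processes
```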
Example Operators
See the operators/ directory for complete implementations:
- video-clipper/: Video processing operator
- data-transformer/: Data format conversion operator
Testing
```bash
make test      # Run all tests
make test-sdk  # Run SDK core tests
```
Supervisor CLI
operator-sdk provides sdrun for launching multiple identical worker processes, aggregating logs, forwarding signals, and supervising worker lifecycle policies.
```bash
sdrun -w 4 --restart always -- python main.py -j --mode file
```
- -w/--worker: number of worker processes to launch, default 1
- --restart never: default; do not restart workers after a non-zero exit
- --restart always: always restart a worker after a non-zero exit
- --restart N: restart a worker at most N times after non-zero exits
- --success-exit ignore: default; when a worker exits with code 0, do not affect other workers
- --success-exit shutdown: when a worker exits with code 0, stop the remaining workers
- --failure-exit ignore: default; when a worker exits non-zero and will not be restarted, do not affect other workers
- --failure-exit shutdown: when a worker exits non-zero and will not be restarted, stop the remaining workers and return that worker's exit code
- --startup-stagger SECONDS: sequential startup delay, default 0; for example 0.5 starts worker-1 after 0.5s and worker-2 after 1.0s
Policy model:
- --restart only controls whether the exited worker itself should be restarted after a non-zero exit.
- --success-exit controls whether a clean exit from one worker should stop the rest.
- --failure-exit controls whether a non-zero exit from one worker, once no more restarts apply, should stop the rest.
- If all workers eventually exit without a supervisor-forced shutdown, sdrun exits with the sum of all final worker exit codes.
- If --failure-exit shutdown is used, sdrun exits with the first non-restarted failing worker's exit code.
- SIGTERM, SIGINT, SIGHUP, and SIGQUIT received by sdrun are forwarded to all workers.
- Logs are prefixed with worker identity, for example [worker-2#1][stdout] ....
- On POSIX, sdrun starts each worker in its own process group. On Linux it also installs a parent-death signal before exec so workers are terminated if the supervisor disappears unexpectedly.
- Child processes receive SDRUN_MODE=true, SDRUN_WORLD_SIZE, SDRUN_RANK, and SDRUN_LOCAL_RANK.
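A minimal sketch of how a worker can read these variables (values are strings as set by sdrun):
```python
import os

# Detect whether this process was launched under sdrun.
if os.environ.get("SDRUN_MODE") == "true":
    world_size = int(os.environ["SDRUN_WORLD_SIZE"])  # total worker count
    rank = int(os.environ["SDRUN_RANK"])              # this worker's rank
    local_rank = int(os.environ["SDRUN_LOCAL_RANK"])  # rank on the local host
    print(f"running as worker {rank}/{world_size} (local rank {local_rank})")
```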
Common combinations:
- Independent workers: --restart never --success-exit ignore --failure-exit ignore
- Fail-fast workers: --restart never --success-exit ignore --failure-exit shutdown
- Elastic recovery on failures: --restart always --success-exit ignore --failure-exit shutdown
- First clean completion wins: --restart never --success-exit shutdown --failure-exit shutdown
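For example, fail-fast workers over the Quick Start entry point, using only the flags documented above:
```bash
sdrun -w 4 --restart never --success-exit ignore --failure-exit shutdown -- python main.py --mode file
```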
If sdrun causes GPU memory usage to explode because multiple worker processes each hold their own copy of large tensors or model weights, consider using shared-tensor to share those tensors across processes: https://github.com/world-sim-dev/shared-tensor. This is especially useful for single-GPU, multi-process inference when the model runtime is not thread-safe and threads cannot be used safely.
FileChannel With SDRUN
When workers are launched by sdrun and the operator runs in file mode:
- FileChannel shards input lines by line index using line_index % SDRUN_WORLD_SIZE == SDRUN_RANK.
- Each worker processes only the JSONL rows assigned to its rank.
- Output files are renamed by inserting the rank before the extension, for example output.jsonl becomes output.0.jsonl and output.1.jsonl.
- If the output file has no extension, the rank suffix is appended directly to the filename.
This means sdrun -w 4 -- python main.py --mode file ... produces 4 parallel output files that must be merged by the caller if a single combined result is needed.
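A sketch of the sharding rule and of a caller-side merge, assuming per-rank JSONL outputs named as described above:
```python
import os

# The sharding rule applied by FileChannel, re-implemented for illustration.
def is_assigned_to_this_rank(line_index: int) -> bool:
    world_size = int(os.environ.get("SDRUN_WORLD_SIZE", "1"))
    rank = int(os.environ.get("SDRUN_RANK", "0"))
    return line_index % world_size == rank

# Caller-side merge of output.0.jsonl ... output.{N-1}.jsonl into one file.
def merge_outputs(basename: str, world_size: int, merged_path: str) -> None:
    stem, ext = os.path.splitext(basename)  # "output.jsonl" -> ("output", ".jsonl")
    with open(merged_path, "w") as merged:
        for rank in range(world_size):
            with open(f"{stem}.{rank}{ext}") as part:
                merged.writelines(part)
```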
Development
Set Up Local MinIO
```bash
brew install minio/stable/minio
brew install minio/stable/mc
minio server var/minio
```
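With the server running (by default on http://localhost:9000 with minioadmin/minioadmin credentials unless overridden), you can point mc at it; the alias and bucket names here are placeholders:
```bash
mc alias set local http://localhost:9000 minioadmin minioadmin
mc mb local/my-bucket
```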
Set Up Local Redis
```bash
brew install redis
brew services start redis
```
Set Up Local Postgres
```bash
brew install postgresql
brew services start postgresql
```
List Services
```bash
brew services list
```
Creating New Operators
- Create an operator directory in ../operators/my-operator/
- Implement it using this SDK
- Deploy it as a Celery service
- Use it in pipelines and workflows
Best Practices
- Keep operators focused on single responsibilities
- Use proper error handling and logging
- Implement comprehensive tests
- Document operator interfaces clearly
License
MIT License
Build and Upload
```bash
make build
ossutil cp dist/sandai_operator_sdk-0.2.7-py3-none-any.whl oss://python-artifacts/ -e oss-cn-shanghai.aliyuncs.com --acl public-read
```
Local Development Install
```bash
pip install -e /path/to/operator-sdk
```