A Python SDK for building high-performance, asynchronous batch processing operators

SandAI Operator SDK

A Python framework for developing data operators under the Dataflow architecture. It is part of the SandAI Data Project's three-layer separation design.

Overview

The Operator SDK provides the foundation for building data processing operators that form the Dataflow layer in the SandAI architecture. These operators are atomic, reusable components that can be composed into complex pipelines and workflows.

Features

  • Asynchronous Batch Processing: Concurrent processing with configurable batch size and concurrency
  • Smart File Monitoring: Real-time file change detection, compatible with how the Vim editor saves files
  • Task Working Directories: Isolated working directories for each task
  • Error Recovery: Automatic recovery from failed file operations and network interruptions
  • Standardized Interface: Consistent operator lifecycle and API design
  • Celery Integration: Built-in support for distributed task execution
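
To make the first feature concrete, here is a minimal sketch of batch processing with a bounded number of batches in flight, written with the standard library only. It is not the SDK's implementation: the names chunked, run_batches, and upper_case are hypothetical, and only the parameter names max_batch_size and max_concurrency are borrowed from the Quick Start.

```python
import asyncio
from typing import Awaitable, Callable, List, Sequence, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def chunked(items: Sequence[T], size: int) -> List[Sequence[T]]:
    """Split items into consecutive batches of at most `size` elements."""
    if size < 1:
        raise ValueError("size must be at least 1")
    return [items[i:i + size] for i in range(0, len(items), size)]


async def run_batches(
    items: Sequence[T],
    handler: Callable[[Sequence[T]], Awaitable[List[R]]],
    max_batch_size: int = 8,
    max_concurrency: int = 4,
) -> List[R]:
    """Run `handler` on every batch with at most `max_concurrency` in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(batch: Sequence[T]) -> List[R]:
        # The semaphore caps how many handlers run at the same time.
        async with semaphore:
            return await handler(batch)

    # gather() returns results in submission order, so outputs line up with inputs.
    per_batch = await asyncio.gather(
        *(guarded(batch) for batch in chunked(items, max_batch_size))
    )
    return [result for batch in per_batch for result in batch]


async def upper_case(batch: Sequence[str]) -> List[str]:
    """Hypothetical handler; the sleep stands in for real I/O."""
    await asyncio.sleep(0)
    return [text.upper() for text in batch]
```

Calling asyncio.run(run_batches(["a", "b", "c"], upper_case, max_batch_size=2)) would process two batches and return the results in input order.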

Installation

conda create -n sandai-operator python=3.14=h0369b99_1_cp314t -c conda-forge
conda activate sandai-operator
cd operator-sdk
pip install -e .

Quick Start

from sandai.operator import BatchProcessor, TaskInput, TaskOutput
from pydantic import BaseModel
from typing import List, Generator

class Options(BaseModel):
    param: str = "default"

class Results(BaseModel):
    output: str

processor = BatchProcessor(name="my-processor", version="1.0.0")

@processor.on_batch(max_concurrency=4, max_batch_size=8)
def process_batch(
    batch_inputs: List[TaskInput[Options]], 
    operator_config: dict,
    context
) -> Generator[TaskOutput[Results], None, None]:
    
    for task_input in batch_inputs:
        # Per-task working directory for intermediate files (unused in this minimal example)
        workdir = context.get_task_workdir(task_input.task_id)
        
        # Your processing logic here
        result = Results(output=f"processed-{task_input.options.param}")
        
        yield TaskOutput[Results](
            task_id=task_input.task_id,
            results=result,
            status="success"
        )

if __name__ == "__main__":
    processor.run()
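
The Quick Start calls context.get_task_workdir(task_id) without showing what isolation means in practice. The sketch below is one way such a helper could behave, assuming one subdirectory per task under a shared root. WorkdirManager is a hypothetical stand-in and not the SDK's ProcessingContext; only the method name is borrowed from the example above.

```python
import tempfile
from pathlib import Path


class WorkdirManager:
    """Hypothetical stand-in that hands out one directory per task id."""

    def __init__(self, root: Path) -> None:
        self.root = Path(root)

    def get_task_workdir(self, task_id: str) -> Path:
        # Reject ids that contain path separators or could escape the root.
        if not task_id or task_id == ".." or Path(task_id).name != task_id:
            raise ValueError(f"unsafe task id: {task_id!r}")
        workdir = self.root / task_id
        # Idempotent: asking twice for the same task returns the same directory.
        workdir.mkdir(parents=True, exist_ok=True)
        return workdir


# Usage: each task gets its own directory under a temporary root.
manager = WorkdirManager(Path(tempfile.mkdtemp(prefix="operator-")))
first = manager.get_task_workdir("task-1")
second = manager.get_task_workdir("task-2")
```

Keeping every task inside its own directory means intermediate files from concurrent tasks cannot overwrite each other, and cleanup can be done per task.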

Core Components

  • BatchProcessor: Asynchronous batch processor with configurable concurrency
  • FileChannel: File monitoring with real-time change detection
  • ProcessingContext: Task-level working directory management
  • CeleryChannel: Distributed task execution via Celery
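
As an illustration of the file monitoring idea, the sketch below detects changes by polling a path rather than holding a file handle. Some editors, Vim included in certain configurations, save by writing a new file and renaming it over the original, so a watcher tied to the original inode can miss the update. This is a standard-library sketch under that assumption; PollingWatcher and file_signature are hypothetical names, and the SDK's FileChannel may work differently.

```python
import os
from typing import Optional, Tuple

# (inode, size in bytes, modification time in nanoseconds)
Signature = Tuple[int, int, int]


def file_signature(path: str) -> Optional[Signature]:
    """Return a cheap fingerprint of the file at `path`, or None if it is missing."""
    try:
        st = os.stat(path)
    except FileNotFoundError:
        return None
    return (st.st_ino, st.st_size, st.st_mtime_ns)


class PollingWatcher:
    """Report whether the file at a path changed since the previous poll."""

    def __init__(self, path: str) -> None:
        self.path = path
        self._last = file_signature(path)

    def poll(self) -> bool:
        current = file_signature(self.path)
        if current is None:
            # The file can vanish briefly during a rename-based save.
            # Keep the old signature and report no change for now.
            return False
        changed = current != self._last
        self._last = current
        return changed
```

A caller would invoke poll() on a timer and reload the file whenever it returns True. Because the signature is looked up by path on every poll, a save that replaces the file is seen as a change just like an in-place write.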

Architecture Integration

This SDK enables the Dataflow layer of the SandAI architecture:

  • Operators built with this SDK are deployed in the operators/ directory
  • Pipelines in the pipelines/ directory compose these operators
  • Workflows in the workflows/ directory orchestrate complete business processes

Example Operators

See the operators/ directory for complete implementations:

  • video-clipper/: Video processing operator
  • data-transformer/: Data format conversion operator

Testing

make test          # Run all tests
make test-sdk      # Run SDK core tests

Development

Set Up Local MinIO

brew install minio/stable/minio
brew install minio/stable/mc
minio server var/minio

Set Up Local Redis

brew install redis
brew services start redis

Set Up Local Postgres

brew install postgresql
brew services start postgresql

List Services

brew services list

Creating New Operators

  1. Create an operator directory in ../operators/my-operator/
  2. Implement the operator using this SDK
  3. Deploy it as a Celery service
  4. Use it in pipelines and workflows

Best Practices

  • Keep operators focused on single responsibilities
  • Use proper error handling and logging
  • Implement comprehensive tests
  • Document operator interfaces clearly
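
The error handling and logging advice above can be sketched as follows: catch failures per task so that one bad input does not abort the rest of the batch, and log the traceback with the task id. The code uses plain dataclasses instead of the SDK's TaskOutput so it runs on its own. Outcome, process_each, and shout are hypothetical names, and the "failed" status value is an assumption; the Quick Start only shows "success".

```python
import logging
from dataclasses import dataclass
from typing import Callable, Iterable, Iterator, Optional, Tuple

logger = logging.getLogger("my-operator")


@dataclass
class Outcome:
    """Hypothetical per-task result record."""
    task_id: str
    status: str
    result: Optional[str] = None
    error: Optional[str] = None


def process_each(
    tasks: Iterable[Tuple[str, str]],
    work: Callable[[str], str],
) -> Iterator[Outcome]:
    """Apply `work` to each (task_id, payload) pair, isolating failures per task."""
    for task_id, payload in tasks:
        try:
            result = work(payload)
        except Exception as exc:
            # Log the full traceback, then keep going with the next task.
            logger.exception("task %s failed", task_id)
            yield Outcome(task_id, "failed", error=str(exc))
            continue
        yield Outcome(task_id, "success", result=result)


def shout(payload: str) -> str:
    """Hypothetical unit of work that rejects one specific input."""
    if payload == "bad":
        raise ValueError("boom")
    return payload.upper()


outcomes = list(process_each([("t1", "ok"), ("t2", "bad"), ("t3", "fine")], shout))
```

Because each task yields its own record, a small test can assert on the status of every task in a mixed batch, which also covers the testing recommendation.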

License

MIT License
