Sandai Operator Client

Client library for the Sandai Operator SDK.

Lightweight Python client for submitting tasks to a Sandai operator over Celery.

Installation

pip install sandai-operator-client

For local development:

pip install -e ".[dev]"

Quick Start

from sandai.operator_client import create_client

client = create_client("video-clipper", "v1")

artifacts, results = client.sync(
    input_artifacts=[{"uri": "file:///tmp/input.mp4", "slot": "video"}],
    options={"clip_duration": 10},
)

Async usage:

import asyncio

from sandai.operator_client import create_async_client


async def main() -> None:
    async with create_async_client("video-clipper", "v1") as client:
        artifacts, results = await client.sync(
            input_artifacts=[{"uri": "file:///tmp/input.mp4", "slot": "video"}],
            options={"clip_duration": 10},
        )
        print(artifacts, results)


asyncio.run(main())

Core API

  • Sync client: create_client(operator_name, operator_version, **kwargs) returns OperatorClient.
  • Async client: create_async_client(operator_name, operator_version, **kwargs) returns AsyncOperatorClient.
  • Sync submit: client.async_call(...) returns a Celery AsyncResult.
  • Async submit: await async_client.async_call(...) returns an AsyncTaskHandle.
  • Sync request/response: client.sync(...) returns (artifacts, results).
  • Async request/response: await async_client.sync(...) returns (artifacts, results).
  • Sync description: client.desc is a cached property.
  • Async description: await async_client.desc() is an async cached method.
  • Sync legacy batch APIs: batch_async(...) and batch_sync(...) keep the existing best-effort contract.
  • Async legacy-shaped batch APIs: await async_client.batch_async(...) and await async_client.batch_sync(...) keep the same result shapes in async call sites.
  • Structured batch APIs: batch_submit(...), batch_collect(...), and batch_sync_detailed(...) are available on both clients.
  • Health APIs: probe() / ping() exist on both clients, with await required on the async client.
  • get_queue_info() returns the queue names used for each priority.

Task Controls

The client supports the same task-control fields used by the operator runtime:

  • timeout: converted into an absolute deadline before sending the task.
  • cancel_key: propagated to the operator so tasks can be skipped before compute starts.
  • persistent_output: stored under meta["persistent-output"] for runtimes that preserve uploaded outputs.

Batch APIs

The package now exposes two batch styles:

  • Legacy compatibility APIs: batch_async(...) and batch_sync(...) keep the previous best-effort semantics.
  • Structured APIs: batch_submit(...), batch_collect(...), and batch_sync_detailed(...) expose explicit per-task submission/execution status instead of collapsing failures into empty results.

Async batch methods follow the same contract, but are awaited:

  • await async_client.batch_submit(...) returns AsyncSubmittedTask objects.
  • await async_client.batch_async(...) returns AsyncTaskHandle | None per task.
  • await async_client.batch_collect(...) returns BatchTaskResult objects.
  • await async_client.batch_sync_detailed(...) returns BatchTaskResult objects.
  • await async_client.batch_sync(...) returns legacy (artifacts, results) tuples.

Prefer the structured APIs for new integrations.

Async API

AsyncOperatorClient is an additive API layer. It does not remove or replace OperatorClient.

AsyncTaskHandle exposes:

  • task_id / id
  • await handle.get(timeout=...)
  • await handle.ready()
  • await handle.state()
  • await handle.forget()
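The handle surface above can be mimicked with a small asyncio sketch. The toy dict-backed store is purely illustrative; it is not how the client talks to Redis.

```python
import asyncio


class ToyTaskHandle:
    """Minimal stand-in mirroring the AsyncTaskHandle surface listed above."""

    def __init__(self, task_id: str, backend: dict):
        self.task_id = task_id
        self._backend = backend  # task_id -> result, a fake result store

    @property
    def id(self) -> str:
        # task_id / id alias
        return self.task_id

    async def ready(self) -> bool:
        return self.task_id in self._backend

    async def state(self) -> str:
        return "SUCCESS" if await self.ready() else "PENDING"

    async def get(self, timeout: float = 5.0):
        async def _wait():
            # Poll until the result lands, as the real client polls Redis.
            while not await self.ready():
                await asyncio.sleep(0.01)
            return self._backend[self.task_id]
        return await asyncio.wait_for(_wait(), timeout)

    async def forget(self) -> None:
        self._backend.pop(self.task_id, None)
```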

Implementation notes:

  • Task submission remains compatible with the current Celery-based operator side.
  • When result_backend is Redis, the async client uses redis.asyncio plus a shared polling hub for async result waiting.
  • The async client is intended for async application integration and lower blocking overhead on result collection.
  • The sync client remains the compatibility/default path for existing callers.

Health Checks

ping() and probe() are intentionally different:

  • ping() returns True when task submission succeeds.
  • probe() returns a structured HealthCheckResult with submit_ok, retrieval_ok, response_ok, and optional error details.

Use probe() when you need a meaningful health signal.

Priority Model

Supported priorities are:

  • high
  • normal
  • low
  • very_low

Queue names follow this pattern:

<priority>.<operator_name>.<operator_version>
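The pattern above can be expressed directly as a small function; this is a sketch mirroring the names that get_queue_info() reports, not the client's internal code.

```python
PRIORITIES = ("high", "normal", "low", "very_low")


def queue_name(priority: str, operator_name: str, operator_version: str) -> str:
    """Build the <priority>.<operator_name>.<operator_version> queue name."""
    if priority not in PRIORITIES:
        # The client raises InvalidPriorityError; ValueError stands in here.
        raise ValueError(f"unsupported priority: {priority}")
    return f"{priority}.{operator_name}.{operator_version}"
```

For example, queue_name("high", "video-clipper", "v1") yields "high.video-clipper.v1".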

Environment Variables

The client reads these settings when explicit connection arguments are not provided:

  • SANDAI_OPERATOR_CELERY_BROKER_URL
  • SANDAI_OPERATOR_CELERY_RESULT_BACKEND
  • SANDAI_OPERATOR_CELERY_TASK_SERIALIZER
  • SANDAI_OPERATOR_CELERY_RESULT_SERIALIZER
  • SANDAI_OPERATOR_AUTO_FORGET_RESULT
  • SANDAI_OPERATOR_BROKER_POOL_LIMIT
  • SANDAI_OPERATOR_BACKEND_POOL_LIMIT
  • SANDAI_OPERATOR_BACKEND_POLL_CONCURRENCY
  • SANDAI_OPERATOR_POOL_ACQUIRE_TIMEOUT
  • SANDAI_OPERATOR_SHARED_TRANSPORT_SCOPE

If broker/backend are not configured, the client falls back to local Redis defaults:

redis://localhost:6379/0

By default, the client submits Celery task payloads with msgpack, keeps Celery results as json, and advertises both json and msgpack in accept_content so that it can interoperate with operators during rollout.
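Expressed as Celery-style settings, the rollout defaults amount to the following. This is a config sketch of the posture described above, not the client's literal configuration code.

```python
# Sketch of the default serializer posture described above.
DEFAULT_SERIALIZER_SETTINGS = {
    "task_serializer": "msgpack",           # task payloads go out as msgpack
    "result_serializer": "json",            # results stay on json
    "accept_content": ["json", "msgpack"],  # accept both during rollout
}
```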

Connection Sharing

The client now uses process-local shared transport resources for broker access and result-backend access.

This sharing is intentionally conservative:

  • Default scope is operator.
  • Broker and result backend are shared independently.
  • A resource is shared only when the full sharing key matches.

Default Behavior

With the default shared_transport_scope="operator":

  • Two clients for the same operator_name + operator_version may share broker/backend resources.
  • Two clients for different operators do not share by default, even if they point to the same Redis address.
  • If you set shared_transport_scope="process", compatible clients across different operators can share the same resources.

Sharing Rules

The following table describes when two clients share the same process-local resource.

| Parameter / property | Broker sharing affected | Backend sharing affected | Notes |
| --- | --- | --- | --- |
| shared_transport_scope | Yes | Yes | Default is operator. process allows cross-operator sharing when other fields also match. |
| operator_name | Yes, when scope is operator | Yes, when scope is operator | Different operators do not share by default. |
| operator_version | Yes, when scope is operator | Yes, when scope is operator | Version is part of the default scope identity. |
| broker_url | Yes | No | Different broker addresses always use different broker resources. |
| result_backend | No | Yes | Different result-backend addresses always use different backend resources. |
| task_serializer | Yes | No | Broker-side Celery app compatibility depends on the task serializer. |
| result_serializer | No | Yes | Backend decode behavior depends on the result serializer. |
| accept_content | Yes | Yes | Different accepted serializer sets do not share. |
| broker_pool_limit | Yes | No | Different broker limits create different broker resource groups. |
| backend_pool_limit | No | Yes | Different backend pool limits create different backend resource groups. |
| backend_poll_concurrency | No | Yes | Backend operation concurrency participates in backend sharing. |

Practical examples:

| Scenario | Shared? | Why |
| --- | --- | --- |
| Same operator, same Redis addresses, same pool settings | Yes | Full sharing key matches. |
| Same operator, same Redis addresses, different backend_poll_concurrency | No for backend | Backend concurrency is part of the backend key. |
| Same operator, same backend, different broker_pool_limit | No for broker | Broker pool limit is part of the broker key. |
| Different operators, same Redis addresses, default scope | No | Default scope is operator. |
| Different operators, same Redis addresses, shared_transport_scope="process", same settings | Yes | Cross-operator sharing is enabled by scope. |
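The broker-sharing rule can be sketched as a key function: two clients share a broker resource only when the full key matches. The tuple layout is an illustration of the rules above, not the client's internal key format.

```python
def broker_sharing_key(scope: str,
                       operator_name: str,
                       operator_version: str,
                       broker_url: str,
                       task_serializer: str,
                       accept_content: tuple,
                       broker_pool_limit: int) -> tuple:
    """Illustrative sharing key: clients share a broker resource only when
    this full key matches.

    The operator identity participates only when the scope is "operator";
    with scope="process" it drops out of the key, enabling cross-operator
    sharing when the remaining fields match.
    """
    identity = (operator_name, operator_version) if scope == "operator" else ()
    return (scope, identity, broker_url, task_serializer,
            tuple(sorted(accept_content)), broker_pool_limit)
```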

Connection Limits And Runtime Behavior

The client has two layers of control:

  • Pool size: how many Redis connections the underlying client is allowed to keep.
  • Operation concurrency: how many broker/backend operations can actively hit Redis at the same time.

Key Limits

| Setting | Default | Applies to | Effect |
| --- | --- | --- | --- |
| broker_pool_limit | 8 | Broker | Broker-side connection budget for shared broker resources. |
| backend_pool_limit | 8 | Result backend | Redis connection-pool budget for shared backend resources. |
| backend_poll_concurrency | 4 | Result backend | Maximum concurrent backend polling/forget operations. Effective backend concurrency is min(backend_pool_limit, backend_poll_concurrency). |
| pool_acquire_timeout | 30.0 | Broker and backend | Maximum time (seconds) to wait for a bounded broker/backend slot before raising ClientPoolTimeoutError. |
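The bounded-slot behavior can be modeled with a semaphore. ClientPoolTimeoutError is the exception named above; everything else in this sketch is illustrative, not the client's implementation.

```python
import threading


class ClientPoolTimeoutError(TimeoutError):
    """Raised when no slot frees up within pool_acquire_timeout (stand-in class)."""


class BoundedSlots:
    """Toy model of the bounded backend concurrency described above."""

    def __init__(self,
                 backend_pool_limit: int = 8,
                 backend_poll_concurrency: int = 4,
                 pool_acquire_timeout: float = 30.0):
        # Effective backend concurrency is the smaller of the two limits.
        self.effective = min(backend_pool_limit, backend_poll_concurrency)
        self._sem = threading.BoundedSemaphore(self.effective)
        self._timeout = pool_acquire_timeout

    def acquire(self) -> None:
        # Wait up to pool_acquire_timeout for a slot, then fail loudly.
        if not self._sem.acquire(timeout=self._timeout):
            raise ClientPoolTimeoutError("no backend slot available")

    def release(self) -> None:
        # Slots are released promptly after each operation.
        self._sem.release()
```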

You can pass these directly in code:

from sandai.operator_client import OperatorClient

client = OperatorClient(
    "video-clipper",
    "v1",
    broker_pool_limit=4,
    backend_pool_limit=6,
    backend_poll_concurrency=3,
    pool_acquire_timeout=10.0,
    shared_transport_scope="operator",
)

The same settings are available on AsyncOperatorClient.

Current Connection Behavior

Current implementation behavior:

  • Broker submission uses short-lived write connections instead of a long-lived producer connection held by each client.
  • Result polling uses bounded backend concurrency and releases slots promptly after each poll.
  • sync and async clients both use the same bounded transport model.
  • Connection resources are keyed separately for broker and backend, so different Redis addresses are isolated naturally.
  • close() / aclose() release the client's references to shared resources.

Verified Behavior Under Load

Using the real Redis benchmark in operator-client/tests/redis_connection_benchmark.py, the current implementation shows:

  • Broker-side operator-client Redis connections return to 0 after scenario completion.
  • Backend-side operator-client Redis connections also return to 0 after scenario completion.
  • Peak backend operator-client connections stay bounded by the configured backend concurrency budget rather than request count.
  • Increasing request count from 40 to 120 did not cause unbounded operator-client connection growth.

Representative benchmark outcome after the current refactor:

| Scenario | Requests | Broker operator clients at settle | Backend operator clients at settle | Notes |
| --- | --- | --- | --- | --- |
| sync_shared_client_threads | 120 | 0 | 0 | Shared sync client across threads. |
| sync_thread_local_clients | 120 | 0 | 0 | One sync client per worker thread. |
| async_single_loop | 120 | 0 | 0 | Async client on one event loop. |
| async_multi_thread_loops | 120 | 0 | 0 | Async clients across multiple threads and loops. |

This means the current implementation is designed to keep connection behavior bounded by configuration, not by request volume.

Pressure Test And Performance Impact

An additional real Redis pressure test was run with a higher load profile:

  • requests=240
  • sync_threads=24
  • async_concurrency=96
  • async_threads=4
  • async_thread_concurrency=24
  • operator_concurrency=32
  • mock_delay_ms=150

Two configurations were compared:

  • Default bounded settings: broker_pool_limit=8, backend_pool_limit=8, backend_poll_concurrency=4
  • Relaxed settings: broker_pool_limit=32, backend_pool_limit=32, backend_poll_concurrency=32

Observed wall-clock times:

| Scenario | Default bounded settings | Relaxed settings | Observation |
| --- | --- | --- | --- |
| sync_shared_client_threads | 12.39s | 12.29s | Nearly identical. |
| sync_thread_local_clients | 13.99s | 12.35s | Faster with relaxed settings, but this mode also raised backend connection peaks significantly. |
| async_single_loop | 5.07s | 4.92s | Small improvement, about 3 percent. |
| async_multi_thread_loops | 4.94s | 5.03s | Effectively the same within normal run-to-run variance. |

Observed backend operator-client connection peaks:

| Scenario | Default bounded settings | Relaxed settings |
| --- | --- | --- |
| sync_shared_client_threads | 4 | 11 |
| sync_thread_local_clients | 4 | 14 |
| async_single_loop | 4 | 13 |
| async_multi_thread_loops | 4 | 25 |

Interpretation:

  • The bounded default configuration keeps backend operator-client connections stable at a low ceiling.
  • Increasing request volume from 120 to 240 did not cause backend operator-client connection growth under the default settings.
  • Relaxing the limits produced only small or inconsistent latency improvements in most scenarios.
  • Relaxing the limits did increase backend connection peaks substantially.

Current recommendation:

  • Keep the default limits unless you have a measured throughput bottleneck.
  • If you need to tune for higher throughput, adjust backend_poll_concurrency first and re-measure before increasing all limits together.
  • Treat larger pool limits as a deliberate tradeoff: slightly more concurrency headroom in exchange for meaningfully higher Redis connection usage.

Error Model

sync() and desc normalize failures into typed exceptions where possible:

  • TaskTimeoutError
  • TaskExecutionError
  • TaskDeadlineExceededError
  • TaskCancelledError
  • InvalidTaskStatusError
  • InvalidResultFormatError
  • InvalidPriorityError

Structured batch APIs surface task errors directly on each result object instead of converting them into empty legacy tuples.
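The normalization idea can be sketched as a mapping from raw task states to typed exceptions. The mapping, states, and exception bodies here are illustrative stand-ins, not the package's implementation.

```python
class TaskTimeoutError(Exception): ...
class TaskExecutionError(Exception): ...
class TaskCancelledError(Exception): ...
class InvalidTaskStatusError(Exception): ...


# Illustrative mapping from a raw task outcome to a typed exception.
_STATE_TO_ERROR = {
    "TIMEOUT": TaskTimeoutError,
    "FAILURE": TaskExecutionError,
    "REVOKED": TaskCancelledError,
}


def raise_for_state(state: str) -> None:
    """Raise the typed exception for a failed state; return silently on SUCCESS."""
    if state == "SUCCESS":
        return
    exc = _STATE_TO_ERROR.get(state)
    if exc is None:
        # Anything unrecognized surfaces as an invalid-status error.
        raise InvalidTaskStatusError(state)
    raise exc(state)
```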

Testing

Run the client regression suite from the repository root:

python operator-client/tests/run_all_tests.py

The suite is mock-based and does not require a live Celery or Redis deployment.


