Client library for the Sandai Operator SDK

Project description

Sandai Operator Client

Lightweight Python client for submitting tasks to a Sandai operator over Celery.

Installation

pip install sandai-operator-client

For local development:

pip install -e ".[dev]"

Quick Start

from sandai.operator_client import create_client

client = create_client("video-clipper", "v1")

artifacts, results = client.sync(
    input_artifacts=[{"uri": "file:///tmp/input.mp4", "slot": "video"}],
    options={"clip_duration": 10},
)

Async usage:

import asyncio

from sandai.operator_client import create_async_client


async def main() -> None:
    async with create_async_client("video-clipper", "v1") as client:
        artifacts, results = await client.sync(
            input_artifacts=[{"uri": "file:///tmp/input.mp4", "slot": "video"}],
            options={"clip_duration": 10},
        )
        print(artifacts, results)


asyncio.run(main())

Core API

  • Sync client: create_client(operator_name, operator_version, **kwargs) returns OperatorClient.
  • Async client: create_async_client(operator_name, operator_version, **kwargs) returns AsyncOperatorClient.
  • Sync submit: client.async_call(...) returns Celery AsyncResult.
  • Async submit: await async_client.async_call(...) returns AsyncTaskHandle.
  • Sync request/response: client.sync(...) returns (artifacts, results).
  • Async request/response: await async_client.sync(...) returns (artifacts, results).
  • Sync description: client.desc is a cached property.
  • Async description: await async_client.desc() is an async cached method.
  • Sync legacy batch APIs: batch_async(...) and batch_sync(...) keep the existing best-effort contract.
  • Async legacy-shaped batch APIs: await async_client.batch_async(...) and await async_client.batch_sync(...) keep the same result shapes in async call sites.
  • Structured batch APIs: batch_submit(...), batch_collect(...), and batch_sync_detailed(...) are available on both clients.
  • Structured batch collection uses a batch-level timeout budget: completed tasks return normal results, unfinished submitted tasks return TaskTimeoutError, and result order matches input order.
  • Health APIs: probe() / ping() exist on both clients, with await required on the async client.
  • get_queue_info() returns the queue names used for each priority.
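
For example, the cached operator description can be read without submitting a task; the shape of the returned description is not assumed here:

from sandai.operator_client import create_client

client = create_client("video-clipper", "v1")
print(client.desc)  # cached property: fetched from the operator once, then reused

# On the async client the equivalent is awaited: await async_client.desc()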

Task Controls

The client supports the same task-control fields used by the operator runtime (a usage sketch follows the list):

  • timeout: converted into an absolute deadline before sending the task.
  • Blocking request/response APIs default to 3600 seconds when timeout is omitted; probe() keeps its short 10-second default.
  • cancel_key: propagated to the operator so tasks can be skipped before compute starts.
  • persistent_output: stored under meta["persistent-output"] for runtimes that preserve uploaded outputs.
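
A minimal sketch combining the three controls on a blocking call; treating them as keyword arguments of sync() is an assumption, and the values shown are illustrative:

from sandai.operator_client import create_client

client = create_client("video-clipper", "v1")

artifacts, results = client.sync(
    input_artifacts=[{"uri": "file:///tmp/input.mp4", "slot": "video"}],
    options={"clip_duration": 10},
    timeout=600,             # converted into an absolute deadline before sending
    cancel_key="job-1234",   # illustrative key; lets the operator skip the task before compute starts
    persistent_output=True,  # stored under meta["persistent-output"]
)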

Batch APIs

The package now exposes two batch styles:

  • Legacy compatibility APIs: batch_async(...) and batch_sync(...) keep the previous best-effort semantics.
  • Structured APIs: batch_submit(...), batch_collect(...), and batch_sync_detailed(...) expose explicit per-task submission/execution status instead of collapsing failures into empty results.
  • batch_collect(...) and batch_sync_detailed(...) treat timeout as a total batch budget instead of a per-item wait.
  • When the batch deadline expires, unfinished submitted tasks are surfaced as TaskTimeoutError entries instead of keeping later tasks blocked behind earlier waits.

Async batch methods follow the same contract, but are awaited:

  • await async_client.batch_submit(...) returns AsyncSubmittedTask objects.
  • await async_client.batch_async(...) returns AsyncTaskHandle | None per task.
  • await async_client.batch_collect(...) returns BatchTaskResult objects.
  • await async_client.batch_sync_detailed(...) returns BatchTaskResult objects.
  • await async_client.batch_sync(...) returns legacy (artifacts, results) tuples.

Prefer the structured APIs for new integrations.
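
A sketch of the structured style; the per-task payload shape and the BatchTaskResult attribute name used below are assumptions, so check them against your installed version:

from sandai.operator_client import create_client

client = create_client("video-clipper", "v1")

# Assumed task shape: one dict per task, mirroring the single-task sync() arguments.
tasks = [
    {
        "input_artifacts": [{"uri": "file:///tmp/a.mp4", "slot": "video"}],
        "options": {"clip_duration": 10},
    },
    {
        "input_artifacts": [{"uri": "file:///tmp/b.mp4", "slot": "video"}],
        "options": {"clip_duration": 5},
    },
]

# timeout is the total batch budget; result order matches input order.
batch_results = client.batch_sync_detailed(tasks, timeout=600)
for task, result in zip(tasks, batch_results):
    # "error" is an assumed attribute name; unfinished submitted tasks carry a
    # TaskTimeoutError once the batch deadline expires.
    if getattr(result, "error", None) is not None:
        print("failed:", task["input_artifacts"][0]["uri"], result.error)
    else:
        print("ok:", task["input_artifacts"][0]["uri"])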

Async API

AsyncOperatorClient is an additive API layer. It does not remove or replace OperatorClient.

AsyncTaskHandle exposes:

  • task_id / id
  • await handle.get(timeout=...)
  • await handle.ready()
  • await handle.state()
  • await handle.forget()
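
A short sketch of the handle flow; the async_call(...) arguments are assumed to mirror sync(...):

import asyncio

from sandai.operator_client import create_async_client


async def main() -> None:
    async with create_async_client("video-clipper", "v1") as client:
        # async_call submits the task and returns a handle without waiting.
        handle = await client.async_call(
            input_artifacts=[{"uri": "file:///tmp/input.mp4", "slot": "video"}],
            options={"clip_duration": 10},
        )
        print("submitted:", handle.task_id, "state:", await handle.state())
        result = await handle.get(timeout=600)  # wait up to 600 seconds for the result
        print(result)
        await handle.forget()  # drop the stored result once it has been consumed


asyncio.run(main())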

Implementation notes:

  • Task submission remains compatible with the current Celery-based operator side.
  • When result_backend is Redis, the async client uses redis.asyncio plus a shared polling hub for async result waiting.
  • The async client is intended for async application integration and lower blocking overhead on result collection.
  • The sync client remains the compatibility/default path for existing callers.

Health Checks

ping() and probe() are intentionally different:

  • ping() returns True when task submission succeeds.
  • probe() returns a structured HealthCheckResult with submit_ok, retrieval_ok, response_ok, and optional error details.

Use probe() when you need a meaningful health signal.
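
For example, a deploy-time check might look like this; submit_ok, retrieval_ok, and response_ok come from the description above, and no further error-detail fields are assumed:

from sandai.operator_client import create_client

client = create_client("video-clipper", "v1")

health = client.probe()
print("submit:", health.submit_ok)
print("retrieval:", health.retrieval_ok)
print("response:", health.response_ok)

if not (health.submit_ok and health.retrieval_ok and health.response_ok):
    raise RuntimeError("operator is not healthy")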

Priority Model

Supported priorities are:

  • high
  • normal
  • low
  • very_low

Queue names follow this pattern:

<priority>.<operator_name>.<operator_version>
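
get_queue_info() returns these names per priority; the mapping shape shown below is an assumption about its return value:

from sandai.operator_client import create_client

client = create_client("video-clipper", "v1")

# Assumed shape: {"high": "high.video-clipper.v1", "normal": "normal.video-clipper.v1", ...}
for priority, queue_name in client.get_queue_info().items():
    print(priority, "->", queue_name)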

Environment Variables

The client reads these settings when explicit connection arguments are not provided:

  • SANDAI_OPERATOR_CELERY_BROKER_URL
  • SANDAI_OPERATOR_CELERY_RESULT_BACKEND
  • SANDAI_OPERATOR_CELERY_TASK_SERIALIZER
  • SANDAI_OPERATOR_CELERY_RESULT_SERIALIZER
  • SANDAI_OPERATOR_AUTO_FORGET_RESULT
  • SANDAI_OPERATOR_BROKER_POOL_LIMIT
  • SANDAI_OPERATOR_BACKEND_POOL_LIMIT
  • SANDAI_OPERATOR_BACKEND_POLL_CONCURRENCY
  • SANDAI_OPERATOR_POOL_ACQUIRE_TIMEOUT
  • SANDAI_OPERATOR_SHARED_TRANSPORT_SCOPE

If broker/backend are not configured, the client falls back to local Redis defaults:

redis://localhost:6379/0

By default the client now submits Celery task payloads with msgpack, keeps the Celery result serializer on json, and advertises both json and msgpack in accept_content so it can interoperate with operators during rollout.
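
Explicit connection arguments take precedence over these environment variables. The keyword names below match the parameters listed under Sharing Rules, but treating them all as create_client(...) keywords is an assumption:

from sandai.operator_client import create_client

client = create_client(
    "video-clipper",
    "v1",
    broker_url="redis://redis.internal:6379/0",      # hypothetical broker address
    result_backend="redis://redis.internal:6379/1",  # hypothetical backend address
    task_serializer="msgpack",
    result_serializer="json",
)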

Connection Sharing

The client now uses process-local shared transport resources for broker access and result-backend access.

This sharing is intentionally conservative:

  • Default scope is operator.
  • Broker and result backend are shared independently.
  • A resource is shared only when the full sharing key matches.

Default Behavior

With the default shared_transport_scope="operator":

  • Two clients for the same operator_name + operator_version may share broker/backend resources.
  • Two clients for different operators do not share by default, even if they point to the same Redis address.
  • If you set shared_transport_scope="process", compatible clients across different operators can share the same resources.
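
A sketch of the scope difference; "video-upscaler" is a hypothetical second operator, and whether two clients actually share is decided by the full sharing key described under Sharing Rules:

from sandai.operator_client import create_client

# Default scope ("operator"): different operators do not share broker/backend
# resources, even when they resolve to the same Redis address.
clipper = create_client("video-clipper", "v1")
upscaler = create_client("video-upscaler", "v1")

# Process scope: compatible clients may share transport resources across operators.
clipper_shared = create_client("video-clipper", "v1", shared_transport_scope="process")
upscaler_shared = create_client("video-upscaler", "v1", shared_transport_scope="process")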

Sharing Rules

The following rules describe when two clients share the same process-local resource.

  • shared_transport_scope: affects broker and backend sharing. Default is operator; process allows cross-operator sharing when the other fields also match.
  • operator_name: affects broker and backend sharing when scope is operator. Different operators do not share by default.
  • operator_version: affects broker and backend sharing when scope is operator. Version is part of the default scope identity.
  • broker_url: affects broker sharing only. Different broker addresses always use different broker resources.
  • result_backend: affects backend sharing only. Different result-backend addresses always use different backend resources.
  • task_serializer: affects broker sharing only. Broker-side Celery app compatibility depends on the task serializer.
  • result_serializer: affects backend sharing only. Backend decode behavior depends on the result serializer.
  • accept_content: affects broker and backend sharing. Different accepted serializer sets do not share.
  • broker_pool_limit: affects broker sharing only. Different broker limits create different broker resource groups.
  • backend_pool_limit: affects backend sharing only. Different backend pool limits create different backend resource groups.
  • backend_poll_concurrency: affects backend sharing only. Backend operation concurrency participates in backend sharing.

Practical examples:

  • Same operator, same Redis addresses, same pool settings: shared. The full sharing key matches.
  • Same operator, same Redis addresses, different backend_poll_concurrency: backend not shared. Backend concurrency is part of the backend key.
  • Same operator, same backend, different broker_pool_limit: broker not shared. The broker pool limit is part of the broker key.
  • Different operators, same Redis addresses, default scope: not shared. The default scope is operator.
  • Different operators, same Redis addresses, shared_transport_scope="process", same settings: shared. Cross-operator sharing is enabled by the scope.

Connection Limits And Runtime Behavior

The client has two layers of control:

  • Pool size: how many Redis connections the underlying client is allowed to keep.
  • Operation concurrency: how many broker/backend operations can actively hit Redis at the same time.

Key Limits

  • broker_pool_limit (default 24, broker): broker-side connection budget for shared broker resources.
  • backend_pool_limit (default 24, result backend): Redis connection-pool budget for shared backend resources.
  • backend_poll_concurrency (default 12, result backend): maximum concurrent backend polling/forget operations. Effective backend concurrency is min(backend_pool_limit, backend_poll_concurrency).
  • pool_acquire_timeout (default 10.0 seconds, broker and backend): maximum time to wait for a bounded broker/backend slot before raising ClientPoolTimeoutError.

You can pass these directly in code:

from sandai.operator_client import OperatorClient

client = OperatorClient(
    "video-clipper",
    "v1",
    broker_pool_limit=4,
    backend_pool_limit=6,
    backend_poll_concurrency=3,
    pool_acquire_timeout=10.0,
    shared_transport_scope="operator",
)

The same settings are available on AsyncOperatorClient.

Current Connection Behavior

Current implementation behavior:

  • Broker submission uses short-lived write connections instead of a long-lived producer connection held by each client.
  • Result polling uses bounded backend concurrency and releases slots promptly after each poll.
  • sync and async clients both use the same bounded transport model.
  • Connection resources are keyed separately for broker and backend, so different Redis addresses are isolated naturally.
  • close() / aclose() release the client's references to shared resources.
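
If a client is not managed as a context manager, release its references explicitly when you are done; close() on the sync client and aclose() on the async client follow the usual naming convention. A minimal sketch for the sync client:

from sandai.operator_client import create_client

client = create_client("video-clipper", "v1")
try:
    artifacts, results = client.sync(
        input_artifacts=[{"uri": "file:///tmp/input.mp4", "slot": "video"}],
        options={"clip_duration": 10},
    )
finally:
    client.close()  # releases this client's references to the shared broker/backend resources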

Verified Behavior Under Load

Using the real Redis benchmark in operator-client/tests/redis_connection_benchmark.py, the current implementation shows:

  • Broker-side operator-client Redis connections return to 0 after scenario completion.
  • Backend-side operator-client Redis connections also return to 0 after scenario completion.
  • Peak backend operator-client connections stay bounded by the configured backend concurrency budget rather than request count.
  • Increasing request count from 40 to 120 did not cause unbounded operator-client connection growth.

Representative benchmark outcome after the current refactor:

  • sync_shared_client_threads: 120 requests, 0 broker and 0 backend operator clients at settle. Shared sync client across threads.
  • sync_thread_local_clients: 120 requests, 0 broker and 0 backend operator clients at settle. One sync client per worker thread.
  • async_single_loop: 120 requests, 0 broker and 0 backend operator clients at settle. Async client on one event loop.
  • async_multi_thread_loops: 120 requests, 0 broker and 0 backend operator clients at settle. Async clients across multiple threads and loops.

This means the current implementation is designed to keep connection behavior bounded by configuration, not by request volume.

Pressure Test And Performance Impact

An additional real Redis pressure test was run with a larger load profile:

  • requests=480
  • sync_threads=64
  • async_concurrency=192
  • async_threads=8
  • async_thread_concurrency=48
  • operator_concurrency=64
  • mock_delay_ms=300
  • request_timeout=3600

Two configurations were compared:

  • Current default settings: broker_pool_limit=24, backend_pool_limit=24, backend_poll_concurrency=12
  • Relaxed settings: broker_pool_limit=32, backend_pool_limit=32, backend_poll_concurrency=32

Observed wall-clock times:

  • sync_shared_client_threads: 13.22s with current default settings (relaxed settings: N/A). 480/480 succeeded with bounded backend usage.
  • sync_thread_local_clients: 14.11s with current default settings (relaxed settings: N/A). 480/480 succeeded; per-thread clients still reused the same operator-scoped resources.
  • async_single_loop: 6.77s with current default settings (relaxed settings: N/A). 480/480 succeeded with the same backend ceiling.
  • async_multi_thread_loops: 5.07s with current default settings (relaxed settings: N/A). 480/480 succeeded across 8 event-loop threads.

Observed backend operator-client connection peaks:

  • sync_shared_client_threads: 4 with current default settings (relaxed settings: N/A).
  • sync_thread_local_clients: 4 with current default settings (relaxed settings: N/A).
  • async_single_loop: 4 with current default settings (relaxed settings: N/A).
  • async_multi_thread_loops: 4 with current default settings (relaxed settings: N/A).

Interpretation:

  • The current default configuration kept backend operator-client connections capped at 4 even at 480 requests and much higher client-side concurrency.
  • Broker-side operator-client connections stayed near baseline and settled back after each scenario.
  • Increasing request volume from 240 to 480 did not cause unbounded operator-client connection growth.
  • Throughput remained acceptable without raising the configured pool limits, which is the intended tradeoff for worker-side thread-heavy deployments.

Current recommendation:

  • Keep the current defaults unless you have a measured throughput bottleneck.
  • If you need to tune for higher throughput, adjust backend_poll_concurrency first and re-measure before increasing all limits together.
  • Treat larger pool limits as a deliberate tradeoff: more concurrency headroom in exchange for higher Redis connection usage.

Error Model

sync() and desc normalize failures into typed exceptions where possible:

  • TaskTimeoutError
  • TaskExecutionError
  • TaskDeadlineExceededError
  • TaskCancelledError
  • InvalidTaskStatusError
  • InvalidResultFormatError
  • InvalidPriorityError

Structured batch APIs surface task errors directly on each result object instead of converting them into empty legacy tuples.
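
A sketch of catching these from a blocking call; the exception classes are assumed to be importable from sandai.operator_client, so adjust the import path to your installed version:

from sandai.operator_client import (
    TaskExecutionError,
    TaskTimeoutError,
    create_client,
)

client = create_client("video-clipper", "v1")

try:
    artifacts, results = client.sync(
        input_artifacts=[{"uri": "file:///tmp/input.mp4", "slot": "video"}],
        options={"clip_duration": 10},
    )
except TaskTimeoutError:
    print("task did not finish before the deadline")
except TaskExecutionError as exc:
    print("operator reported a failure:", exc)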

Testing

Run the client regression suite from the repository root:

python operator-client/tests/run_all_tests.py

The suite is mock-based and does not require a live Celery or Redis deployment.

Download files

Download the file for your platform.

Source Distribution

sandai_operator_client-0.4.8.tar.gz (34.1 kB)

Built Distribution

sandai_operator_client-0.4.8-py3-none-any.whl (23.8 kB)

File details

Details for the file sandai_operator_client-0.4.8.tar.gz.

File metadata

  • Download URL: sandai_operator_client-0.4.8.tar.gz
  • Size: 34.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.14.0

File hashes

Hashes for sandai_operator_client-0.4.8.tar.gz:

  • SHA256: 0df8ec9bb336bed8afb650d03f9ae55809881991e77539da9d79e3b78dd6dcf7
  • MD5: 28065e74428ed07701344f525e098dc5
  • BLAKE2b-256: ce6926aa46d04c3da7223447cbb4acfcc326ab5103a3e1f55131aaa23b1a2adc

File details

Details for the file sandai_operator_client-0.4.8-py3-none-any.whl.

File metadata

File hashes

Hashes for sandai_operator_client-0.4.8-py3-none-any.whl:

  • SHA256: 722be141c2d2018b23e93abd77edb2b2d999324379340be65ae655d0981a5d87
  • MD5: 66b537488c1948150f7c54d02a43e1c8
  • BLAKE2b-256: ed34b818dd2fa99271f5e206932a7cf72aff19c60561737b1cba52b5fdd2ea09
