# Sandai Operator Client

Client library for the Sandai Operator SDK: a lightweight Python client for submitting tasks to a Sandai operator over Celery.
## Installation

```bash
pip install sandai-operator-client
```

For local development:

```bash
pip install -e ".[dev]"
```
## Quick Start

```python
from sandai.operator_client import create_client

client = create_client("video-clipper", "v1")

artifacts, results = client.sync(
    input_artifacts=[{"uri": "file:///tmp/input.mp4", "slot": "video"}],
    options={"clip_duration": 10},
)
```
Async usage:

```python
import asyncio

from sandai.operator_client import create_async_client


async def main() -> None:
    async with create_async_client("video-clipper", "v1") as client:
        artifacts, results = await client.sync(
            input_artifacts=[{"uri": "file:///tmp/input.mp4", "slot": "video"}],
            options={"clip_duration": 10},
        )
        print(artifacts, results)


asyncio.run(main())
```
## Core API

- Sync client: `create_client(operator_name, operator_version, **kwargs)` returns `OperatorClient`.
- Async client: `create_async_client(operator_name, operator_version, **kwargs)` returns `AsyncOperatorClient`.
- Sync submit: `client.async_call(...)` returns a Celery `AsyncResult`.
- Async submit: `await async_client.async_call(...)` returns `AsyncTaskHandle`.
- Sync request/response: `client.sync(...)` returns `(artifacts, results)`.
- Async request/response: `await async_client.sync(...)` returns `(artifacts, results)`.
- Sync description: `client.desc` is a cached property.
- Async description: `await async_client.desc()` is an async cached method.
- Sync legacy batch APIs: `batch_async(...)` and `batch_sync(...)` keep the existing best-effort contract.
- Async legacy-shaped batch APIs: `await async_client.batch_async(...)` and `await async_client.batch_sync(...)` keep the same result shapes in async call sites.
- Structured batch APIs: `batch_submit(...)`, `batch_collect(...)`, and `batch_sync_detailed(...)` are available on both clients.
- Structured batch collection uses a batch-level timeout budget: completed tasks return normal results, unfinished submitted tasks return `TaskTimeoutError`, and result order matches input order.
- Health APIs: `probe()` / `ping()` exist on both clients, with `await` required on the async client. `get_queue_info()` returns the queue names used for each priority.
## Task Controls

The client supports the same task-control fields used by the operator runtime:

- `timeout`: converted into an absolute `deadline` before sending the task. Blocking request/response APIs default to `3600` seconds when `timeout` is omitted; `probe()` keeps its short `10` second default.
- `cancel_key`: propagated to the operator so tasks can be skipped before compute starts.
- `persistent_output`: stored under `meta["persistent-output"]` for runtimes that preserve uploaded outputs.
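A minimal sketch of passing these fields on a blocking call, assuming `sync()` accepts them as keyword arguments named after the fields above (the exact parameter names may differ in your version):

```python
from sandai.operator_client import create_client

client = create_client("video-clipper", "v1")

# Assumed keyword arguments mirroring the task-control fields above.
artifacts, results = client.sync(
    input_artifacts=[{"uri": "file:///tmp/input.mp4", "slot": "video"}],
    options={"clip_duration": 10},
    timeout=600,             # converted into an absolute deadline before submission
    cancel_key="job-1234",   # lets the operator skip the task before compute starts
    persistent_output=True,  # stored under meta["persistent-output"]
)
```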
## Batch APIs

The package now exposes two batch styles:

- Legacy compatibility APIs: `batch_async(...)` and `batch_sync(...)` keep the previous best-effort semantics.
- Structured APIs: `batch_submit(...)`, `batch_collect(...)`, and `batch_sync_detailed(...)` expose explicit per-task submission/execution status instead of collapsing failures into empty results. `batch_collect(...)` and `batch_sync_detailed(...)` treat `timeout` as a total batch budget instead of a per-item wait. When the batch deadline expires, unfinished submitted tasks are surfaced as `TaskTimeoutError` entries instead of keeping later tasks blocked behind earlier waits.

Async batch methods follow the same contract, but are awaited:

- `await async_client.batch_submit(...)` returns `AsyncSubmittedTask` objects.
- `await async_client.batch_async(...)` returns `AsyncTaskHandle | None` per task.
- `await async_client.batch_collect(...)` returns `BatchTaskResult` objects.
- `await async_client.batch_sync_detailed(...)` returns `BatchTaskResult` objects.
- `await async_client.batch_sync(...)` returns legacy `(artifacts, results)` tuples.

Prefer the structured APIs for new integrations.
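A minimal sketch of the structured style, assuming `batch_sync_detailed(...)` takes a list of per-task request dicts plus a batch-level `timeout`, and that each `BatchTaskResult` exposes `error` and `artifacts` fields; the request shape and attribute names are assumptions:

```python
from sandai.operator_client import create_client

client = create_client("video-clipper", "v1")

# Hypothetical per-task request shape mirroring the sync() arguments.
requests = [
    {"input_artifacts": [{"uri": "file:///tmp/a.mp4", "slot": "video"}], "options": {"clip_duration": 10}},
    {"input_artifacts": [{"uri": "file:///tmp/b.mp4", "slot": "video"}], "options": {"clip_duration": 10}},
]

# timeout is a total batch budget; unfinished tasks come back as TaskTimeoutError entries.
batch_results = client.batch_sync_detailed(requests, timeout=1200)

for request, result in zip(requests, batch_results):
    if result.error is not None:        # hypothetical field carrying the per-task error
        print("failed:", request, result.error)
    else:
        print("ok:", result.artifacts)  # hypothetical field with the task's output artifacts
```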
## Async API

`AsyncOperatorClient` is an additive API layer. It does not remove or replace `OperatorClient`.

`AsyncTaskHandle` exposes:

- `task_id` / `id`
- `await handle.get(timeout=...)`
- `await handle.ready()`
- `await handle.state()`
- `await handle.forget()`

Implementation notes:

- Task submission remains compatible with the current Celery-based operator side.
- When `result_backend` is Redis, the async client uses `redis.asyncio` plus a shared polling hub for async result waiting.
- The async client is intended for async application integration and lower blocking overhead on result collection.
- The sync client remains the compatibility/default path for existing callers.
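A minimal submit-then-poll sketch using `AsyncTaskHandle`, assuming `async_call()` accepts the same request fields as `sync()`:

```python
import asyncio

from sandai.operator_client import create_async_client


async def main() -> None:
    async with create_async_client("video-clipper", "v1") as client:
        # Submit without blocking on the result; an AsyncTaskHandle comes back.
        handle = await client.async_call(
            input_artifacts=[{"uri": "file:///tmp/input.mp4", "slot": "video"}],
            options={"clip_duration": 10},
        )
        print("submitted:", handle.task_id)

        if await handle.ready():
            print(await handle.get(timeout=60))
        else:
            # Stop tracking this result and let the backend drop it.
            await handle.forget()


asyncio.run(main())
```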
## Health Checks

`ping()` and `probe()` are intentionally different:

- `ping()` returns `True` when task submission succeeds.
- `probe()` returns a structured `HealthCheckResult` with `submit_ok`, `retrieval_ok`, `response_ok`, and optional error details.

Use `probe()` when you need a meaningful health signal.
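A minimal sketch that inspects the documented `HealthCheckResult` flags; how the optional error details are exposed is an assumption:

```python
from sandai.operator_client import create_client

client = create_client("video-clipper", "v1")

result = client.probe()
if result.submit_ok and result.retrieval_ok and result.response_ok:
    print("operator healthy")
else:
    # The result also carries optional error details describing what failed.
    print("operator degraded:", result)
```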
## Priority Model

Supported priorities are:

- `high`
- `normal`
- `low`
- `very_low`

Queue names follow this pattern:

```
<priority>.<operator_name>.<operator_version>
```
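A minimal sketch using `get_queue_info()` to see the queue names per priority; the exact return shape (shown here as a priority-to-queue mapping) is an assumption:

```python
from sandai.operator_client import create_client

client = create_client("video-clipper", "v1")

# Assumed to map each priority to its queue name, following the pattern above,
# e.g. "high" -> "high.video-clipper.v1".
print(client.get_queue_info())
```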
## Environment Variables

The client reads these settings when explicit connection arguments are not provided:

- `SANDAI_OPERATOR_CELERY_BROKER_URL`
- `SANDAI_OPERATOR_CELERY_RESULT_BACKEND`
- `SANDAI_OPERATOR_CELERY_TASK_SERIALIZER`
- `SANDAI_OPERATOR_CELERY_RESULT_SERIALIZER`
- `SANDAI_OPERATOR_AUTO_FORGET_RESULT`
- `SANDAI_OPERATOR_BROKER_POOL_LIMIT`
- `SANDAI_OPERATOR_BACKEND_POOL_LIMIT`
- `SANDAI_OPERATOR_BACKEND_POLL_CONCURRENCY`
- `SANDAI_OPERATOR_POOL_ACQUIRE_TIMEOUT`
- `SANDAI_OPERATOR_SHARED_TRANSPORT_SCOPE`

If broker/backend are not configured, the client falls back to local Redis defaults:

```
redis://localhost:6379/0
```

By default the client now submits Celery task payloads with `msgpack`, keeps Celery results on `json`, and advertises both `json` and `msgpack` in `accept_content` so it can interoperate with operators during rollout.
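A minimal sketch pointing the client at a non-default Redis through these variables, assuming they are read at client construction when no explicit connection arguments are passed; the Redis addresses are placeholders:

```python
import os

from sandai.operator_client import create_client

# Hypothetical Redis addresses; set before the client is constructed.
os.environ["SANDAI_OPERATOR_CELERY_BROKER_URL"] = "redis://redis.internal:6379/0"
os.environ["SANDAI_OPERATOR_CELERY_RESULT_BACKEND"] = "redis://redis.internal:6379/1"

# No explicit broker/backend arguments, so the environment values apply.
client = create_client("video-clipper", "v1")
```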
## Connection Sharing

The client now uses process-local shared transport resources for broker access and result-backend access.

This sharing is intentionally conservative:

- Default scope is `operator`.
- Broker and result backend are shared independently.
- A resource is shared only when the full sharing key matches.

### Default Behavior

With the default `shared_transport_scope="operator"`:

- Two clients for the same `operator_name` + `operator_version` may share broker/backend resources.
- Two clients for different operators do not share by default, even if they point to the same Redis address.
- If you set `shared_transport_scope="process"`, compatible clients across different operators can share the same resources.
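A minimal sketch of the two scopes, assuming `create_client` forwards `shared_transport_scope` to the client constructor via `**kwargs`; which resources actually end up shared still depends on the full sharing key described below:

```python
from sandai.operator_client import create_client

# Same operator and version: may share broker/backend resources under the default scope.
a = create_client("video-clipper", "v1")
b = create_client("video-clipper", "v1")

# Different operator: isolated by default, even against the same Redis address.
c = create_client("audio-mixer", "v1")

# Opt in to cross-operator sharing when the other connection settings also match.
d = create_client("audio-mixer", "v1", shared_transport_scope="process")
```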
### Sharing Rules

The following table describes when two clients share the same process-local resource.

| Parameter / property | Broker sharing affected | Backend sharing affected | Notes |
|---|---|---|---|
| `shared_transport_scope` | Yes | Yes | Default is `operator`. `process` allows cross-operator sharing when other fields also match. |
| `operator_name` | Yes, when scope is `operator` | Yes, when scope is `operator` | Different operators do not share by default. |
| `operator_version` | Yes, when scope is `operator` | Yes, when scope is `operator` | Version is part of the default scope identity. |
| `broker_url` | Yes | No | Different broker addresses always use different broker resources. |
| `result_backend` | No | Yes | Different result-backend addresses always use different backend resources. |
| `task_serializer` | Yes | No | Broker-side Celery app compatibility depends on the task serializer. |
| `result_serializer` | No | Yes | Backend decode behavior depends on the result serializer. |
| `accept_content` | Yes | Yes | Different accepted serializer sets do not share. |
| `broker_pool_limit` | Yes | No | Different broker limits create different broker resource groups. |
| `backend_pool_limit` | No | Yes | Different backend pool limits create different backend resource groups. |
| `backend_poll_concurrency` | No | Yes | Backend operation concurrency participates in backend sharing. |
Practical examples:

| Scenario | Shared? | Why |
|---|---|---|
| Same operator, same Redis addresses, same pool settings | Yes | Full sharing key matches. |
| Same operator, same Redis addresses, different `backend_poll_concurrency` | No for backend | Backend concurrency is part of the backend key. |
| Same operator, same backend, different `broker_pool_limit` | No for broker | Broker pool limit is part of the broker key. |
| Different operators, same Redis addresses, default scope | No | Default scope is `operator`. |
| Different operators, same Redis addresses, `shared_transport_scope="process"`, same settings | Yes | Cross-operator sharing is enabled by scope. |
## Connection Limits And Runtime Behavior

The client has two layers of control:

- Pool size: how many Redis connections the underlying client is allowed to keep.
- Operation concurrency: how many broker/backend operations can actively hit Redis at the same time.

### Key Limits

| Setting | Default | Applies to | Effect |
|---|---|---|---|
| `broker_pool_limit` | 24 | Broker | Broker-side connection budget for shared broker resources. |
| `backend_pool_limit` | 24 | Result backend | Redis connection-pool budget for shared backend resources. |
| `backend_poll_concurrency` | 12 | Result backend | Maximum concurrent backend polling/forget operations. Effective backend concurrency is `min(backend_pool_limit, backend_poll_concurrency)`. |
| `pool_acquire_timeout` | 10.0 | Broker and backend | Maximum time to wait for a bounded broker/backend slot before raising `ClientPoolTimeoutError`. |
You can pass these directly in code:

```python
from sandai.operator_client import OperatorClient

client = OperatorClient(
    "video-clipper",
    "v1",
    broker_pool_limit=4,
    backend_pool_limit=6,
    backend_poll_concurrency=3,
    pool_acquire_timeout=10.0,
    shared_transport_scope="operator",
)
```

The same settings are available on `AsyncOperatorClient`.
### Current Connection Behavior

Current implementation behavior:

- Broker submission uses short-lived write connections instead of a long-lived producer connection held by each client.
- Result polling uses bounded backend concurrency and releases slots promptly after each poll.
- Sync and async clients both use the same bounded transport model.
- Connection resources are keyed separately for broker and backend, so different Redis addresses are isolated naturally.
- `close()` / `aclose()` release the client's references to shared resources.
### Verified Behavior Under Load

Using the real Redis benchmark in `operator-client/tests/redis_connection_benchmark.py`, the current implementation shows:

- Broker-side operator-client Redis connections return to `0` after scenario completion.
- Backend-side operator-client Redis connections also return to `0` after scenario completion.
- Peak backend operator-client connections stay bounded by the configured backend concurrency budget rather than by request count.
- Increasing the request count from `40` to `120` did not cause unbounded operator-client connection growth.

Representative benchmark outcome after the current refactor:

| Scenario | Requests | Broker operator clients at settle | Backend operator clients at settle | Notes |
|---|---|---|---|---|
| `sync_shared_client_threads` | 120 | 0 | 0 | Shared sync client across threads. |
| `sync_thread_local_clients` | 120 | 0 | 0 | One sync client per worker thread. |
| `async_single_loop` | 120 | 0 | 0 | Async client on one event loop. |
| `async_multi_thread_loops` | 120 | 0 | 0 | Async clients across multiple threads and loops. |

This means the current implementation is designed to keep connection behavior bounded by configuration, not by request volume.
### Pressure Test And Performance Impact

An additional real Redis pressure test was run with a larger load profile:

- `requests=480`
- `sync_threads=64`
- `async_concurrency=192`
- `async_threads=8`
- `async_thread_concurrency=48`
- `operator_concurrency=64`
- `mock_delay_ms=300`
- `request_timeout=3600`

Two configurations were compared:

- Current default settings: `broker_pool_limit=24`, `backend_pool_limit=24`, `backend_poll_concurrency=12`
- Relaxed settings: `broker_pool_limit=32`, `backend_pool_limit=32`, `backend_poll_concurrency=32`
Observed wall-clock times:

| Scenario | Current default settings | Relaxed settings | Observation |
|---|---|---|---|
| `sync_shared_client_threads` | 13.22s | N/A | 480/480 succeeded with bounded backend usage. |
| `sync_thread_local_clients` | 14.11s | N/A | 480/480 succeeded; per-thread clients still reused the same operator-scoped resources. |
| `async_single_loop` | 6.77s | N/A | 480/480 succeeded with the same backend ceiling. |
| `async_multi_thread_loops` | 5.07s | N/A | 480/480 succeeded across 8 event-loop threads. |
Observed backend operator-client connection peaks:

| Scenario | Current default settings | Relaxed settings |
|---|---|---|
| `sync_shared_client_threads` | 4 | N/A |
| `sync_thread_local_clients` | 4 | N/A |
| `async_single_loop` | 4 | N/A |
| `async_multi_thread_loops` | 4 | N/A |
Interpretation:

- The current default configuration kept backend operator-client connections capped at `4` even at `480` requests and much higher client-side concurrency.
- Broker-side operator-client connections stayed near baseline and settled back after each scenario.
- Increasing request volume from `240` to `480` did not cause unbounded operator-client connection growth.
- Throughput remained acceptable without raising the configured pool limits, which is the intended tradeoff for worker-side, thread-heavy deployments.

Current recommendation:

- Keep the current defaults unless you have a measured throughput bottleneck.
- If you need to tune for higher throughput, adjust `backend_poll_concurrency` first and re-measure before increasing all limits together.
- Treat larger pool limits as a deliberate tradeoff: more concurrency headroom in exchange for higher Redis connection usage.
## Error Model

`sync()` and `desc` normalize failures into typed exceptions where possible:

- `TaskTimeoutError`
- `TaskExecutionError`
- `TaskDeadlineExceededError`
- `TaskCancelledError`
- `InvalidTaskStatusError`
- `InvalidResultFormatError`
- `InvalidPriorityError`

Structured batch APIs surface task errors directly on each result object instead of converting them into empty legacy tuples.
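A minimal sketch of handling two of these exceptions around a blocking call; the import path for the exception types is an assumption:

```python
# Assumed import path for the typed exceptions listed above.
from sandai.operator_client import TaskExecutionError, TaskTimeoutError, create_client

client = create_client("video-clipper", "v1")

try:
    artifacts, results = client.sync(
        input_artifacts=[{"uri": "file:///tmp/input.mp4", "slot": "video"}],
        options={"clip_duration": 10},
        timeout=120,
    )
except TaskTimeoutError:
    print("task did not finish within the deadline")
except TaskExecutionError as exc:
    print("operator reported a failure:", exc)
```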
## Testing

Run the client regression suite from the repository root:

```bash
python operator-client/tests/run_all_tests.py
```

The suite is mock-based and does not require a live Celery or Redis deployment.