The official Python library for the Scale GP API
Scale GP Python API library
The Scale GP Python library provides convenient access to the Scale GP REST API from any Python 3.8+ application. The library includes type definitions for all request params and response fields, and offers both synchronous and asynchronous clients powered by httpx.
It is generated with Stainless.
Documentation
The REST API documentation can be found on docs.gp.scale.com. The full API of this library can be found in api.md.
Installation
# install from PyPI
pip install --pre scale-gp-beta
Usage
The full API of this library can be found in api.md.
import os
from scale_gp_beta import SGPClient
client = SGPClient(
    account_id="My Account ID",
    api_key=os.environ.get("SGP_API_KEY"),  # This is the default and can be omitted
    # defaults to "production".
    environment="development",
)
completion = client.chat.completions.create(
    messages=[{"foo": "bar"}],
    model="model",
    top_k=2,
)
While you can provide an api_key keyword argument,
we recommend using python-dotenv
to add SGP_API_KEY="My API Key" to your .env file
so that your API Key is not stored in source control.
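As a minimal sketch of the pattern above, the helper below reads the key from the environment and fails early with a clear message when it is missing. `load_api_key` is a hypothetical helper for illustration and is not part of the library. If python-dotenv is installed, calling its `load_dotenv()` beforehand would populate `os.environ` from the .env file.

```python
import os


def load_api_key(var_name: str = "SGP_API_KEY") -> str:
    """Return the API key from the environment, or raise if it is unset.

    Hypothetical helper for illustration; not part of scale_gp_beta.
    """
    api_key = os.environ.get(var_name)
    if not api_key:
        # Failing here gives a clearer error than a rejected request later on.
        raise RuntimeError(f"{var_name} is not set; add it to your environment or .env file")
    return api_key
```

The returned value could then be passed as the api_key argument, although the client already reads SGP_API_KEY by default.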
Tracing & Spans
The SGP Tracing library provides a convenient way to instrument your Python applications with tracing capabilities, allowing you to generate, manage, and send spans to the Scale GP platform. This enables detailed monitoring and debugging of your workflows.
Quick Start Examples
For runnable examples, see the examples/tracing directory in the repository.
Using the SDK
Initialization
Before you can create any traces or spans, you should initialize the tracing SDK with your SGPClient. It's best practice to do this once at your application's entry point. You can omit this step if you have set the SGP_API_KEY and SGP_ACCOUNT_ID environment variables, as the SDK will attempt to create a default client.
import scale_gp_beta.lib.tracing as tracing
from scale_gp_beta import SGPClient
client = SGPClient(api_key="YOUR_API_KEY", account_id="YOUR_ACCOUNT_ID")
tracing.init(client=client)
Tracing uses the SGPClient for all requests. You can edit the base_url or any other parameters via the client you pass to init().
Disabling Tracing
You can disable tracing by setting the environment variable DISABLE_SCALE_TRACING or programmatically via the disabled parameter in init().
Core Concepts
The SDK revolves around two primary concepts: Traces and Spans.
- Trace: A trace represents a complete workflow or transaction, such as a web request or an AI agent's operation. It's a collection of related spans. Every trace has a single root span.
- Span: A span represents a single unit of work within a trace, like a function call, a database query, or an external API request. Spans can be nested to show hierarchical relationships.
When starting a trace, we will also create a root span. Server-side, we do not record the trace resource, only spans, but rely on the root span for trace data.
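The relationship between traces and spans can be pictured with a small data model. This is only a sketch: `SpanRecord`, `find_root`, and `children_of` are hypothetical names that do not exist in the SDK, and the real Span objects carry more fields.

```python
import uuid
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class SpanRecord:
    """Illustrative stand-in for a span; not the SDK's Span class."""

    name: str
    trace_id: str
    parent_id: Optional[str] = None  # None marks the root span of the trace
    span_id: str = field(default_factory=lambda: str(uuid.uuid4()))


def find_root(spans: List[SpanRecord]) -> SpanRecord:
    """Return the single span without a parent; every trace has exactly one."""
    roots = [s for s in spans if s.parent_id is None]
    if len(roots) != 1:
        raise ValueError(f"expected exactly one root span, found {len(roots)}")
    return roots[0]


def children_of(parent: SpanRecord, spans: List[SpanRecord]) -> List[SpanRecord]:
    """Return the spans nested directly under the given parent."""
    return [s for s in spans if s.parent_id == parent.span_id]
```

Because only spans are recorded server-side, trace-level data in this picture would live on the span returned by `find_root`.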
Creating Traces and Spans
The SDK offers flexible ways to create traces and spans: using context managers for automatic start/end handling, or explicit control for manual lifecycle management.
1. Using Context Managers (Recommended)
The most straightforward way to create traces and spans is by using them as context managers (with statements). This ensures that spans are automatically started and ended, and errors are captured.
Creating a Trace with a Root Span:
Use tracing.create_trace() as a context manager to define a new trace. This automatically creates a root span for your trace.
import scale_gp_beta.lib.tracing as tracing
def my_workflow():
    with tracing.create_trace(name="my_application_workflow", metadata={"env": "production"}):
        # All spans created within this block will belong to "my_application_workflow" trace
        print("Starting my application workflow...")
        # ... your workflow logic
        print("Application workflow completed.")
Creating Spans within a Trace:
Inside a create_trace block, use tracing.create_span() as a context manager. These spans will automatically be associated with the current trace and parent span (if one exists).
import time
import scale_gp_beta.lib.tracing as tracing
def fibonacci(curr: int) -> int:
    with tracing.create_span("fibonacci_calculation", input={"curr": curr}) as span:
        time.sleep(0.1)  # Simulate some work
        if curr < 2:
            span.output = {"res": curr}
            return curr
        res = fibonacci(curr - 1) + fibonacci(curr - 2)
        span.output = {"res": res}
        return res
def main_traced_example():
    with tracing.create_trace("my_fibonacci_trace"):  # Creates a root span
        # This span will be a child of the "my_fibonacci_trace" root span
        with tracing.create_span("main_execution", metadata={"version": "1.0"}) as main_span:
            fib_result = fibonacci(5)
            main_span.output = {"final_fib_result": fib_result}
            print(f"Fibonacci(5) = {fib_result}")
2. Explicit Control
For scenarios where context managers aren't suitable, you can manually start and end spans. This approach requires more diligence: you must ensure that every span is properly ended and that the trace data stays consistent.
Manually Managing Spans (without an explicit Trace context):
You can create spans and explicitly provide their trace_id and parent_id for fine-grained control. This is useful when integrating with existing systems that manage trace IDs.
import uuid
import time
import random
from typing import Any, Dict
import scale_gp_beta.lib.tracing as tracing
class MockDatabase:
    def __init__(self) -> None:
        self._data = {
            "SELECT * FROM users WHERE id = 1;": {"id": 1, "name": "Alice"},
            "SELECT * FROM users WHERE id = 2;": {"id": 2, "name": "Bob"},
        }
    def execute_query(self, query: str, trace_id: str) -> Dict[str, Any]:
        db_span = tracing.create_span("db_query", input={"query": query}, trace_id=trace_id)
        db_span.start()
        try:
            time.sleep(random.uniform(0.1, 0.3))  # Simulate delay
            result = self._data.get(query, {})
            db_span.output = {"result": result}
            return result
        finally:
            db_span.end()
def get_user_from_db_explicit(db: MockDatabase, user_id: int, trace_id: str) -> Dict[str, Any]:
    with tracing.create_span("get_user_from_db", input={"user_id": user_id}, trace_id=trace_id):
        query = f"SELECT * FROM users WHERE id = {user_id};"
        return db.execute_query(query, trace_id)
def main_explicit_control_example():
    db = MockDatabase()
    my_trace_id = str(uuid.uuid4())
    # Manually create a root span
    main_span = tracing.create_span("main_explicit_call", metadata={"env": "local"}, trace_id=my_trace_id)
    main_span.start()
    try:
        user = get_user_from_db_explicit(db, 1, my_trace_id)
        print(f"Retrieved user: {user.get('name')}")
    finally:
        main_span.end()
Exporting Existing Tracing Data (Manual Timestamps)
You can even pre-define start_time, end_time, span_id, parent_id, and trace_id if you need to report historical data or reconstruct traces.
import uuid
from datetime import datetime, timezone, timedelta
import scale_gp_beta.lib.tracing as tracing
parent_span_id = str(uuid.uuid4())
trace_id = str(uuid.uuid4())
child_span_id = str(uuid.uuid4())
now = datetime.now(timezone.utc)
# Parent Span
parent_span = tracing.create_span(
    "my_parent_span_name",
    input={"test": "input"},
    output={"test": "output"},
    metadata={"test": "metadata"},
    span_id=parent_span_id,
    trace_id=trace_id,
)
parent_span.start_time = (now - timedelta(minutes=10)).isoformat()
parent_span.end_time = now.isoformat()
parent_span.flush(blocking=True)
# Child Span
child_span = tracing.create_span(
    "my_child_span_name",
    input={"test": "another input"},
    output={"test": "another output"},
    metadata={"test": "another metadata"},
    span_id=child_span_id,
    trace_id=trace_id,
    parent_id=parent_span_id,
)
child_span.start_time = (now - timedelta(minutes=6)).isoformat()
child_span.end_time = (now - timedelta(minutes=2)).isoformat()
child_span.flush()
Note that span.flush() will by default block the main thread until the request has finished. Use blocking=False to enqueue the request which will be picked up by the background worker.
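When reconstructing historical spans as in the example above, it can help to check that each child's time window falls inside its parent's before exporting. The sketch below uses only the standard library; `span_window` and `is_nested` are hypothetical helpers, and the SDK itself may or may not enforce this.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple


def span_window(now: datetime, start_minutes_ago: float, end_minutes_ago: float) -> Tuple[str, str]:
    """Build ISO 8601 (start_time, end_time) strings relative to `now`."""
    start = now - timedelta(minutes=start_minutes_ago)
    end = now - timedelta(minutes=end_minutes_ago)
    if end < start:
        raise ValueError("a span cannot end before it starts")
    return start.isoformat(), end.isoformat()


def is_nested(child: Tuple[str, str], parent: Tuple[str, str]) -> bool:
    """Return True if the child window lies entirely within the parent window."""
    c_start, c_end = (datetime.fromisoformat(t) for t in child)
    p_start, p_end = (datetime.fromisoformat(t) for t in parent)
    return p_start <= c_start and c_end <= p_end


now = datetime.now(timezone.utc)
parent_window = span_window(now, 10, 0)  # same offsets as the parent span above
child_window = span_window(now, 6, 2)    # same offsets as the child span above
print(is_nested(child_window, parent_window))
```

The two strings in each window are in the same format the example assigns to start_time and end_time.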
Helper Methods
You can retrieve the currently active span or trace in the execution context using current_span() and current_trace():
import scale_gp_beta.lib.tracing as tracing
def nested_function():
    with tracing.create_span("nested_operation"):
        current = tracing.current_span()
        if current:
            print(f"Currently active span: {current.name} (ID: {current.span_id})")
        current_t = tracing.current_trace()
        if current_t:
            print(f"Currently active trace: (ID: {current_t.trace_id})")
Flushing Tracing Data
Spans are generally batched and sent asynchronously by a background worker for efficiency. However, you might need to ensure all buffered spans are sent before an application exits or at critical points in your workflow (e.g., in a distributed worker setting).
You can force a synchronous flush of all queued spans with flush_queue(). To flush an individual span, call span.flush(); to flush a trace, flush its root span with trace.root_span.flush().
import scale_gp_beta.lib.tracing as tracing
# ... (create some spans) ...
# Ensure all spans are sent before continuing
tracing.flush_queue()
print("All pending spans have been flushed.")
You do not need to manually flush all spans on program exit; when shutting down the background worker, we will attempt to flush all tracing data before exiting.
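To make the queue-and-flush behaviour concrete, here is a toy model of a background exporter built on the standard library. It is not the SDK's worker: `ToyExporter` and its methods are hypothetical, and appending to a list stands in for the HTTP request.

```python
import queue
import threading
from typing import Any, Dict, List


class ToyExporter:
    """Toy model of a background span exporter; not the SDK's implementation."""

    def __init__(self) -> None:
        self._queue: "queue.Queue[Dict[str, Any]]" = queue.Queue()
        self.sent: List[Dict[str, Any]] = []
        # A daemon thread does not keep the process alive on exit.
        worker = threading.Thread(target=self._run, daemon=True)
        worker.start()

    def _run(self) -> None:
        while True:
            span = self._queue.get()
            self.sent.append(span)  # stands in for sending the span over HTTP
            self._queue.task_done()

    def enqueue(self, span: Dict[str, Any]) -> None:
        """Non-blocking: hand the span to the background worker and return."""
        self._queue.put(span)

    def flush_queue(self) -> None:
        """Blocking: wait until every enqueued span has been processed."""
        self._queue.join()


exporter = ToyExporter()
for i in range(3):
    exporter.enqueue({"name": f"span-{i}"})
exporter.flush_queue()
print(len(exporter.sent))
```

In this model, enqueueing returns immediately while `flush_queue()` waits for the worker, which mirrors the difference between non-blocking and blocking flushes described above.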
Configuration Options
| ENV Variable | Description |
|---|---|
| DISABLE_SCALE_TRACING | If set, no tracing data will be exported. You can still observe tracing data programmatically via the No-Op variant of Trace and Span objects. |
| SGP_API_KEY | SGP API Key. Used by SGPClient. |
| SGP_ACCOUNT_ID | SGP Account ID. Used by SGPClient. |
Multi-Process / Multi-Worker Tracing
WARNING: Developers should be careful when attempting tracing over multiple workers / Python processes. The SGP backend will expect well-formed trace data, and there is a strong chance of race conditions if a child span is reported before a parent span.
The easiest approach to working over multiple workers and Python processes is to only create one trace per worker. You can group traces with a group_id.
If you want to track an entire workflow over multiple workers, ensure you call tracing.flush_queue() before you enqueue a job which creates child spans of the current trace.
You will need to use the explicit controls to forward trace and parent span IDs to your workers. The automatic context detection works within the context of the original Python process only.
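One way to forward the IDs is to include them in the job payload. The sketch below assumes a JSON payload with hypothetical field names (`trace_id`, `parent_span_id`, `args`); the queueing system and the payload shape are not part of the SDK.

```python
import json
from typing import Any, Dict


def build_job_payload(trace_id: str, parent_span_id: str, args: Dict[str, Any]) -> str:
    """Serialize a job together with the tracing context the worker will need.

    Call tracing.flush_queue() before enqueueing this payload so the parent
    span is reported before any child span the worker creates.
    """
    return json.dumps({"trace_id": trace_id, "parent_span_id": parent_span_id, "args": args})


def read_trace_context(raw_payload: str) -> Dict[str, str]:
    """Extract the keyword arguments a worker would pass to create_span()."""
    payload = json.loads(raw_payload)
    return {"trace_id": payload["trace_id"], "parent_id": payload["parent_span_id"]}


raw = build_job_payload("trace-123", "span-456", {"user_id": 1})
print(read_trace_context(raw))
```

On the worker side, the returned dictionary could be unpacked into tracing.create_span() as the trace_id and parent_id arguments shown in the explicit-control examples.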
Async usage
Simply import AsyncSGPClient instead of SGPClient and use await with each API call:
import os
import asyncio
from scale_gp_beta import AsyncSGPClient
client = AsyncSGPClient(
    account_id="My Account ID",
    api_key=os.environ.get("SGP_API_KEY"),  # This is the default and can be omitted
    # defaults to "production".
    environment="development",
)
async def main() -> None:
    completion = await client.chat.completions.create(
        messages=[{"foo": "bar"}],
        model="model",
        top_k=2,
    )
asyncio.run(main())
Functionality between the synchronous and asynchronous clients is otherwise identical.
With aiohttp
By default, the async client uses httpx for HTTP requests. However, for improved concurrency performance you may also use aiohttp as the HTTP backend.
You can enable this by installing aiohttp:
# install from PyPI
pip install --pre scale-gp-beta[aiohttp]
Then you can enable it by instantiating the client with http_client=DefaultAioHttpClient():
import os
import asyncio
from scale_gp_beta import DefaultAioHttpClient
from scale_gp_beta import AsyncSGPClient
async def main() -> None:
    async with AsyncSGPClient(
        account_id="My Account ID",
        api_key=os.environ.get("SGP_API_KEY"),  # This is the default and can be omitted
        http_client=DefaultAioHttpClient(),
    ) as client:
        completion = await client.chat.completions.create(
            messages=[{"foo": "bar"}],
            model="model",
            top_k=2,
        )
asyncio.run(main())
Streaming responses
We provide support for streaming responses using Server-Sent Events (SSE).
from scale_gp_beta import SGPClient
client = SGPClient(
    account_id="My Account ID",
)
stream = client.chat.completions.create(
    messages=[{"foo": "bar"}],
    model="model",
    stream=True,
)
for completion in stream:
    print(completion)
The async client uses the exact same interface.
import asyncio
from scale_gp_beta import AsyncSGPClient
client = AsyncSGPClient(
    account_id="My Account ID",
)
async def main() -> None:
    stream = await client.chat.completions.create(
        messages=[{"foo": "bar"}],
        model="model",
        stream=True,
    )
    async for completion in stream:
        print(completion)
asyncio.run(main())
Using types
Nested request parameters are TypedDicts. Responses are Pydantic models which also provide helper methods for things like:
- Serializing back into JSON, model.to_json()
- Converting to a dictionary, model.to_dict()
Typed requests and responses provide autocomplete and documentation within your editor. If you would like to see type errors in VS Code to help catch bugs earlier, set python.analysis.typeCheckingMode to basic.
Pagination
List methods in the Scale GP API are paginated.
This library provides auto-paginating iterators with each list response, so you do not have to request successive pages manually:
from scale_gp_beta import SGPClient
client = SGPClient(
    account_id="My Account ID",
)
all_models = []
# Automatically fetches more pages as needed.
for model in client.models.list(
    limit=10,
):
    # Do something with model here
    all_models.append(model)
print(all_models)
Or, asynchronously:
import asyncio
from scale_gp_beta import AsyncSGPClient
client = AsyncSGPClient(
    account_id="My Account ID",
)
async def main() -> None:
    all_models = []
    # Iterate through items across all pages, issuing requests as needed.
    async for model in client.models.list(
        limit=10,
    ):
        all_models.append(model)
    print(all_models)
asyncio.run(main())
Alternatively, you can use the .has_next_page(), .next_page_info(), or .get_next_page() methods for more granular control working with pages:
first_page = await client.models.list(
    limit=10,
)
if first_page.has_next_page():
    print(f"will fetch next page using these details: {first_page.next_page_info()}")
    next_page = await first_page.get_next_page()
    print(f"number of items we just fetched: {len(next_page.items)}")
# Remove `await` for non-async usage.
Or just work directly with the returned data:
first_page = await client.models.list(
    limit=10,
)
print(f"next page cursor: {first_page.starting_after}")  # => "next page cursor: ..."
for model in first_page.items:
    print(model.id)
# Remove `await` for non-async usage.
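Conceptually, an auto-paginating iterator keeps requesting pages with a cursor until the server reports there are no more. The sketch below shows that loop against an in-memory fake backend. The page shape (`items`, `has_more`) and the helper names are assumptions made for illustration, not the library's internals or the API's wire format.

```python
from typing import Any, Callable, Dict, Iterator, List, Optional

FetchPage = Callable[[Optional[str], int], Dict[str, Any]]


def iter_all_items(fetch_page: FetchPage, limit: int) -> Iterator[Dict[str, Any]]:
    """Yield items across all pages, requesting the next page only when needed."""
    cursor: Optional[str] = None
    while True:
        page = fetch_page(cursor, limit)
        yield from page["items"]
        if not page["has_more"] or not page["items"]:
            return
        # Use the last item's id as the cursor for the next request.
        cursor = page["items"][-1]["id"]


def make_fake_backend(ids: List[str]) -> FetchPage:
    """Build an in-memory stand-in for a paginated list endpoint."""

    def fetch_page(starting_after: Optional[str], limit: int) -> Dict[str, Any]:
        start = 0 if starting_after is None else ids.index(starting_after) + 1
        chunk = ids[start : start + limit]
        return {"items": [{"id": i} for i in chunk], "has_more": start + limit < len(ids)}

    return fetch_page


backend = make_fake_backend(["m1", "m2", "m3", "m4", "m5"])
print([item["id"] for item in iter_all_items(backend, limit=2)])
```

Because `iter_all_items` is a generator, stopping iteration early also stops further page requests.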
Nested params
Nested parameters are dictionaries, typed using TypedDict, for example:
from scale_gp_beta import SGPClient
client = SGPClient(
    account_id="My Account ID",
)
inference = client.inference.create(
    model="model",
    inference_configuration={},
)
print(inference.inference_configuration)
File uploads
Request parameters that correspond to file uploads can be passed as bytes, a PathLike instance, or a tuple of (filename, contents, media type).
from pathlib import Path
from scale_gp_beta import SGPClient
client = SGPClient(
    account_id="My Account ID",
)
client.files.create(
    file=Path("/path/to/file"),
)
The async client uses the exact same interface. If you pass a PathLike instance, the file contents will be read asynchronously automatically.
Handling errors
When the library is unable to connect to the API (for example, due to network connection problems or a timeout), a subclass of scale_gp_beta.APIConnectionError is raised.
When the API returns a non-success status code (that is, a 4xx or 5xx response), a subclass of scale_gp_beta.APIStatusError is raised, containing status_code and response properties.
All errors inherit from scale_gp_beta.APIError.
import scale_gp_beta
from scale_gp_beta import SGPClient
client = SGPClient(
    account_id="My Account ID",
)
try:
    client.chat.completions.create(
        messages=[{"foo": "bar"}],
        model="model",
    )
except scale_gp_beta.APIConnectionError as e:
    print("The server could not be reached")
    print(e.__cause__)  # an underlying Exception, likely raised within httpx.
except scale_gp_beta.RateLimitError as e:
    print("A 429 status code was received; we should back off a bit.")
except scale_gp_beta.APIStatusError as e:
    print("Another non-200-range status code was received")
    print(e.status_code)
    print(e.response)
Error codes are as follows:
| Status Code | Error Type |
|---|---|
| 400 | BadRequestError |
| 401 | AuthenticationError |
| 403 | PermissionDeniedError |
| 404 | NotFoundError |
| 422 | UnprocessableEntityError |
| 429 | RateLimitError |
| >=500 | InternalServerError |
| N/A | APIConnectionError |
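The table can be read as a lookup from status code to error class name. The function below is a sketch of that mapping using plain strings; `error_name_for_status` is a hypothetical helper, and the fallback to APIStatusError for status codes the table does not list is an assumption.

```python
def error_name_for_status(status_code: int) -> str:
    """Return the error class name the table above associates with a status code."""
    exact = {
        400: "BadRequestError",
        401: "AuthenticationError",
        403: "PermissionDeniedError",
        404: "NotFoundError",
        422: "UnprocessableEntityError",
        429: "RateLimitError",
    }
    if status_code in exact:
        return exact[status_code]
    if status_code >= 500:
        return "InternalServerError"
    # Assumption: codes missing from the table surface as the base status error.
    return "APIStatusError"


print(error_name_for_status(404))
```

Since every class in the table except APIConnectionError corresponds to an HTTP status, catching scale_gp_beta.APIStatusError last, as in the example above, handles any status code not caught by a more specific clause.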
Retries
Certain errors are automatically retried 2 times by default, with a short exponential backoff. Connection errors (for example, due to a network connectivity problem), 408 Request Timeout, 409 Conflict, 429 Rate Limit, and >=500 Internal errors are all retried by default.
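The retry rule above can be sketched as a predicate plus a delay schedule. The set of retried status codes comes from the paragraph above; the initial delay, growth factor, and cap are illustrative assumptions, since the library's actual backoff values are not stated here.

```python
from typing import List


def is_retryable_status(status_code: int) -> bool:
    """Return True for the status codes described above as retried by default."""
    return status_code in (408, 409, 429) or status_code >= 500


def backoff_delays(
    max_retries: int = 2,
    initial: float = 0.5,  # assumed initial delay in seconds
    factor: float = 2.0,   # assumed growth factor
    cap: float = 8.0,      # assumed maximum delay in seconds
) -> List[float]:
    """Return the wait before each retry under a capped exponential backoff."""
    return [min(initial * factor**attempt, cap) for attempt in range(max_retries)]


print(is_retryable_status(429), backoff_delays())
```

With max_retries=0 the schedule is empty, which corresponds to disabling retries.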
You can use the max_retries option to configure or disable retry settings:
from scale_gp_beta import SGPClient
# Configure the default for all requests:
client = SGPClient(
    account_id="My Account ID",
    # default is 2
    max_retries=0,
)
# Or, configure per-request:
client.with_options(max_retries=5).chat.completions.create(
    messages=[{"foo": "bar"}],
    model="model",
)
Timeouts
By default requests time out after 1 minute. You can configure this with a timeout option,
which accepts a float or an httpx.Timeout object:
import httpx
from scale_gp_beta import SGPClient
# Configure the default for all requests:
client = SGPClient(
    account_id="My Account ID",
    # 20 seconds (default is 1 minute)
    timeout=20.0,
)
# More granular control:
client = SGPClient(
    account_id="My Account ID",
    timeout=httpx.Timeout(60.0, read=5.0, write=10.0, connect=2.0),
)
# Override per-request:
client.with_options(timeout=5.0).chat.completions.create(
    messages=[{"foo": "bar"}],
    model="model",
)
On timeout, an APITimeoutError is raised.
Note that requests that time out are retried twice by default.
Advanced
Logging
We use the standard library logging module.
You can enable logging by setting the environment variable SGP_CLIENT_LOG to info.
$ export SGP_CLIENT_LOG=info
Or to debug for more verbose logging.
How to tell whether None means null or missing
In an API response, a field may be explicitly null, or missing entirely; in either case, its value is None in this library. You can differentiate the two cases with .model_fields_set:
if response.my_field is None:
    if 'my_field' not in response.model_fields_set:
        print('Got json like {}, without a "my_field" key present at all.')
    else:
        print('Got json like {"my_field": null}.')
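At the JSON level, the distinction that .model_fields_set preserves looks like the following. This sketch uses only the standard library rather than Pydantic, and `describe_field` is a hypothetical helper.

```python
import json


def describe_field(raw_json: str, key: str) -> str:
    """Classify a key in a JSON object as 'missing', 'null', or 'present'."""
    data = json.loads(raw_json)
    if key not in data:
        return "missing"  # the key does not appear in the payload at all
    if data[key] is None:
        return "null"     # the key appears with an explicit null value
    return "present"


print(describe_field("{}", "my_field"))
print(describe_field('{"my_field": null}', "my_field"))
```

Both of the first two cases become None on the parsed response model, which is why the check against .model_fields_set is needed to tell them apart.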
Accessing raw response data (e.g. headers)
The "raw" Response object can be accessed by prefixing .with_raw_response. to any HTTP method call, e.g.,
from scale_gp_beta import SGPClient
client = SGPClient(
    account_id="My Account ID",
)
response = client.chat.completions.with_raw_response.create(
    messages=[{
        "foo": "bar"
    }],
    model="model",
)
print(response.headers.get('X-My-Header'))
completion = response.parse()  # get the object that `chat.completions.create()` would have returned
print(completion)
These methods return an APIResponse object.
The async client returns an AsyncAPIResponse with the same structure, the only difference being awaitable methods for reading the response content.
.with_streaming_response
The above interface eagerly reads the full response body when you make the request, which may not always be what you want.
To stream the response body, use .with_streaming_response instead, which requires a context manager and only reads the response body once you call .read(), .text(), .json(), .iter_bytes(), .iter_text(), .iter_lines() or .parse(). In the async client, these are async methods.
with client.chat.completions.with_streaming_response.create(
    messages=[{"foo": "bar"}],
    model="model",
) as response:
    print(response.headers.get("X-My-Header"))
    for line in response.iter_lines():
        print(line)
The context manager is required so that the response will reliably be closed.
Making custom/undocumented requests
This library is typed for convenient access to the documented API.
If you need to access undocumented endpoints, params, or response properties, the library can still be used.
Undocumented endpoints
To make requests to undocumented endpoints, use client.get, client.post, and the other HTTP verbs. Client options such as retries are respected when making these requests.
import httpx
response = client.post(
    "/foo",
    cast_to=httpx.Response,
    body={"my_param": True},
)
print(response.headers.get("x-foo"))
Undocumented request params
If you want to explicitly send an extra param, you can do so with the extra_query, extra_body, and extra_headers request
options.
Undocumented response properties
To access undocumented response properties, you can access the extra fields like response.unknown_prop. You
can also get all the extra fields on the Pydantic model as a dict with
response.model_extra.
Configuring the HTTP client
You can directly override the httpx client to customize it for your use case, including:
- Support for proxies
- Custom transports
- Additional advanced functionality
import httpx
from scale_gp_beta import SGPClient, DefaultHttpxClient
client = SGPClient(
    account_id="My Account ID",
    # Or use the `SGP_CLIENT_BASE_URL` env var
    base_url="http://my.test.server.example.com:8083",
    http_client=DefaultHttpxClient(
        proxy="http://my.test.proxy.example.com",
        transport=httpx.HTTPTransport(local_address="0.0.0.0"),
    ),
)
You can also customize the client on a per-request basis by using with_options():
client.with_options(http_client=DefaultHttpxClient(...))
Managing HTTP resources
By default the library closes underlying HTTP connections whenever the client is garbage collected. You can manually close the client using the .close() method if desired, or with a context manager that closes when exiting.
from scale_gp_beta import SGPClient
with SGPClient(
    account_id="My Account ID",
) as client:
    # make requests here
    ...
# HTTP client is now closed
Versioning
This package generally follows SemVer conventions, though certain backwards-incompatible changes may be released as minor versions:
- Changes that only affect static types, without breaking runtime behavior.
- Changes to library internals which are technically public but not intended or documented for external use. (Please open a GitHub issue to let us know if you are relying on such internals.)
- Changes that we do not expect to impact the vast majority of users in practice.
We take backwards-compatibility seriously and work hard to ensure you can rely on a smooth upgrade experience.
We are keen for your feedback; please open an issue with questions, bugs, or suggestions.
Determining the installed version
If you've upgraded to the latest version but aren't seeing the new features you expected, your Python environment is likely still using an older version.
You can determine the version that is being used at runtime with:
import scale_gp_beta
print(scale_gp_beta.__version__)
Requirements
Python 3.8 or higher.
Contributing