Databricks Zerobus Ingest SDK for Python
Reason this release was yanked:
This version contains breaking changes for users
Public Preview: This SDK is supported for production use cases and is available to all customers. Databricks is actively working on stabilizing the Zerobus Ingest SDK for Python. Minor version updates may include backwards-incompatible changes.
We are keen to hear feedback from you on this SDK. Please file issues, and we will address them.
The Databricks Zerobus Ingest SDK for Python provides a high-performance, Rust-backed client for ingesting data directly into Databricks Delta tables using the Zerobus streaming protocol. Built on top of the battle-tested Rust SDK using PyO3 bindings, it delivers native performance with a Python-friendly API.
See also the SDK for Java.
Table of Contents
- Disclaimer
- Features
- Requirements
- Quick Start User Guide
- Usage Examples
- Authentication
- Configuration
- Error Handling
- API Reference
- Best Practices
- Handling Stream Failures
- Performance Tips
- Debugging
Features
- Rust-backed performance: Native Rust implementation with Python bindings for maximum throughput and minimal latency
- High-throughput ingestion: Optimized for high-volume data ingestion with native async/await support
- Automatic recovery: Built-in retry and recovery mechanisms from the Rust SDK
- Flexible configuration: Customizable stream behavior and timeouts
- Multiple serialization formats: Support for JSON and Protocol Buffers
- OAuth 2.0 authentication: Secure authentication with client credentials
- Type safety: Rust's type system ensures reliability and correctness
- Sync and Async support: Both synchronous and asynchronous Python APIs
- Zero-copy operations: Efficient data handling with minimal overhead
Architecture
The Python SDK is a thin wrapper around the Databricks Zerobus Rust SDK, built using PyO3 bindings:
┌─────────────────────────────────────────┐
│         Python Application Code         │
│  (Your code using the Python SDK API)   │
└─────────────────────────────────────────┘
                     │
                     ▼
┌─────────────────────────────────────────┐
│        Python SDK (Thin Wrapper)        │
│  • API compatibility layer              │
│  • Python types & error handling        │
└─────────────────────────────────────────┘
                     │
                     ▼ (PyO3 bindings)
┌─────────────────────────────────────────┐
│        Rust Core Implementation         │
│  • gRPC communication                   │
│  • OAuth 2.0 authentication             │
│  • Stream management & recovery         │
│  • Protocol encoding/decoding           │
└─────────────────────────────────────────┘
This architecture provides:
- Native performance through Rust's zero-cost abstractions
- Memory safety without garbage collection overhead
- Single source of truth for all SDK implementations
- Python-friendly API with full type hints and IDE support
Requirements
Runtime Requirements
- Python: 3.9 or higher
- Databricks workspace with Zerobus access enabled
Dependencies
- protobuf >= 4.25.0, < 7.0 (for Protocol Buffer schema handling)
- requests >= 2.28.1, < 3 (only for the generate_proto utility tool)
Note: All core ingestion functionality (gRPC, OAuth authentication, stream management) is handled by the native Rust implementation. The requests dependency is only used by the optional generate_proto.py tool for fetching table schemas from Unity Catalog.
Quick Start User Guide
Prerequisites
Before using the SDK, you'll need the following:
1. Workspace URL and Workspace ID
After logging into your Databricks workspace, look at the browser URL:
https://<databricks-instance>.cloud.databricks.com/o=<workspace-id>
- Workspace URL: The part before /o= → https://<databricks-instance>.cloud.databricks.com
- Workspace ID: The part after /o= → <workspace-id>
Note: The examples above show AWS endpoints (.cloud.databricks.com). For Azure deployments, the workspace URL will be https://<databricks-instance>.azuredatabricks.net.
Example:
- Full URL: https://dbc-a1b2c3d4-e5f6.cloud.databricks.com/o=1234567890123456
- Workspace URL: https://dbc-a1b2c3d4-e5f6.cloud.databricks.com
- Workspace ID: 1234567890123456
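The split described above can be automated with the standard library. The following is a minimal sketch, not part of the SDK: the helper name split_workspace_url is hypothetical, and it assumes the workspace ID follows `o=` either as a path segment (the form shown above) or as a query parameter.

```python
from urllib.parse import parse_qs, urlsplit


def split_workspace_url(full_url: str) -> tuple:
    """Return (workspace_url, workspace_id) from a browser URL.

    Hypothetical helper for illustration; not provided by the SDK.
    """
    parts = urlsplit(full_url)
    workspace_url = f"{parts.scheme}://{parts.netloc}"

    # Form shown in this guide: https://<host>/o=<workspace-id>
    path = parts.path.lstrip("/")
    if path.startswith("o="):
        return workspace_url, path[len("o="):].split("/")[0]

    # Assumed alternative form: https://<host>/?o=<workspace-id>
    query = parse_qs(parts.query)
    if "o" in query:
        return workspace_url, query["o"][0]

    raise ValueError(f"No workspace ID found in URL: {full_url!r}")


url, workspace_id = split_workspace_url(
    "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com/o=1234567890123456"
)
print(url)
print(workspace_id)
```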
2. Create a Delta Table
Create a table using Databricks SQL:
CREATE TABLE <catalog_name>.default.air_quality (
    device_name STRING,
    temp INT,
    humidity BIGINT
)
USING DELTA;
Replace <catalog_name> with your catalog name (e.g., main).
3. Create a Service Principal
- Navigate to Settings > Identity and Access in your Databricks workspace
- Click Service principals and create a new service principal
- Generate a new secret for the service principal and save it securely
- Grant the following permissions:
  - USE_CATALOG on the catalog (e.g., main)
  - USE_SCHEMA on the schema (e.g., default)
  - MODIFY and SELECT on the table (e.g., air_quality)
Grant permissions using SQL:
-- Grant catalog permission
GRANT USE CATALOG ON CATALOG <catalog_name> TO `<service-principal-application-id>`;
-- Grant schema permission
GRANT USE SCHEMA ON SCHEMA <catalog_name>.default TO `<service-principal-application-id>`;
-- Grant table permissions
GRANT SELECT, MODIFY ON TABLE <catalog_name>.default.air_quality TO `<service-principal-application-id>`;
Installation
From PyPI (Recommended)
Install the latest stable version using pip:
pip install databricks-zerobus-ingest-sdk
Pre-built wheels are available for:
- Linux: x86_64, aarch64 (manylinux)
- macOS: x86_64, arm64 (universal2)
- Windows: x86_64
From Source
Building from source requires the Rust toolchain (install from rustup.rs).
# Install Rust (if not already installed)
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
# Clone and install
git clone https://github.com/databricks/zerobus-sdk-py.git
cd zerobus-sdk-py
pip install -e .
The SDK uses maturin to build Python bindings for the Rust implementation. Installation via pip install -e . automatically:
- Installs maturin if needed
- Compiles the Rust extension
- Installs the package in editable mode
For active development, see CONTRIBUTING.md for detailed build instructions and development workflows.
Choose Your Serialization Format
The SDK supports two serialization formats:
- JSON - Simple, no schema compilation needed. Good for getting started.
- Protocol Buffers (Default to maintain backwards compatibility) - Strongly-typed schemas. More efficient over the wire.
Option 1: Using JSON
Write Your Client Code (JSON)
Synchronous Example:
import json
import logging
from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties
# Configure logging (optional but recommended)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# Configuration
# For AWS:
server_endpoint = "https://1234567890123456.zerobus.us-west-2.cloud.databricks.com"
workspace_url = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"
# For Azure:
# server_endpoint = "https://1234567890123456.zerobus.us-west-2.azuredatabricks.net"
# workspace_url = "https://dbc-a1b2c3d4-e5f6.azuredatabricks.net"
table_name = "main.default.air_quality"
client_id = "your-service-principal-application-id"
client_secret = "your-service-principal-secret"
# Initialize SDK
sdk = ZerobusSdk(server_endpoint, workspace_url)
# Configure table properties
table_properties = TableProperties(table_name)
# Configure stream with JSON record type
options = StreamConfigurationOptions(record_type=RecordType.JSON)
# Create stream
stream = sdk.create_stream(client_id, client_secret, table_properties, options)
try:
    # Ingest records
    for i in range(100):
        # Option 1: Pass a dict (SDK serializes to JSON)
        record_dict = {
            "device_name": f"sensor-{i % 10}",
            "temp": 20 + (i % 15),
            "humidity": 50 + (i % 40)
        }
        ack = stream.ingest_record(record_dict)
        # Option 2: Pass a pre-serialized JSON string (client controls serialization)
        # json_string = json.dumps(record_dict)
        # ack = stream.ingest_record(json_string)
        # Optional: Wait for durability confirmation
        ack.wait_for_ack()
        print(f"Ingested record {i + 1}")
    print("Successfully ingested 100 records!")
finally:
    stream.close()
Asynchronous Example:
import asyncio
import json
import logging
from zerobus.sdk.aio import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties
# Configure logging (optional but recommended)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
async def main():
    # Configuration
    # For AWS:
    server_endpoint = "https://1234567890123456.zerobus.us-west-2.cloud.databricks.com"
    workspace_url = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"
    # For Azure:
    # server_endpoint = "https://1234567890123456.zerobus.us-west-2.azuredatabricks.net"
    # workspace_url = "https://dbc-a1b2c3d4-e5f6.azuredatabricks.net"
    table_name = "main.default.air_quality"
    client_id = "your-service-principal-application-id"
    client_secret = "your-service-principal-secret"
    # Initialize SDK
    sdk = ZerobusSdk(server_endpoint, workspace_url)
    # Configure table properties
    table_properties = TableProperties(table_name)
    # Configure stream with JSON record type
    options = StreamConfigurationOptions(record_type=RecordType.JSON)
    # Create stream
    stream = await sdk.create_stream(client_id, client_secret, table_properties, options)
    try:
        # Ingest records
        for i in range(100):
            # Option 1: Pass a dict (SDK serializes to JSON)
            record_dict = {
                "device_name": f"sensor-{i % 10}",
                "temp": 20 + (i % 15),
                "humidity": 50 + (i % 40)
            }
            future = await stream.ingest_record(record_dict)
            # Option 2: Pass a pre-serialized JSON string (client controls serialization)
            # json_string = json.dumps(record_dict)
            # future = await stream.ingest_record(json_string)
            # Optional: Wait for durability confirmation
            await future
            print(f"Ingested record {i + 1}")
        print("Successfully ingested 100 records!")
    finally:
        await stream.close()
asyncio.run(main())
Option 2: Using Protocol Buffers
You'll need to define and compile a protobuf schema.
Define Your Protocol Buffer Schema
Create a file named record.proto:
syntax = "proto2";
message AirQuality {
    optional string device_name = 1;
    optional int32 temp = 2;
    optional int64 humidity = 3;
}
Compile the protobuf:
pip install "grpcio-tools>=1.60.0,<2.0"
python -m grpc_tools.protoc --python_out=. --proto_path=. record.proto
This generates a record_pb2.py file compatible with protobuf 6.x.
Generate Protocol Buffer Schema from Unity Catalog (Alternative)
Instead of manually writing your protobuf schema, you can automatically generate it from an existing Unity Catalog table using the included generate_proto.py tool.
Basic Usage:
python -m zerobus.tools.generate_proto \
--uc-endpoint "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com" \
--client-id "your-service-principal-application-id" \
--client-secret "your-service-principal-secret" \
--table "main.default.air_quality" \
--output "record.proto" \
--proto-msg "AirQuality"
Parameters:
- --uc-endpoint: Your workspace URL (required)
- --client-id: Service principal application ID (required)
- --client-secret: Service principal secret (required)
- --table: Fully qualified table name in format catalog.schema.table (required)
- --output: Output path for the generated proto file (required)
- --proto-msg: Name of the protobuf message (optional, defaults to table name)
After generating, compile it as shown above.
Type Mappings:
| Delta Type | Proto2 Type |
|---|---|
| TINYINT, BYTE, INT, SMALLINT, SHORT | int32 |
| BIGINT, LONG | int64 |
| FLOAT | float |
| DOUBLE | double |
| STRING, VARCHAR | string |
| BOOLEAN | bool |
| BINARY | bytes |
| DATE | int32 |
| TIMESTAMP | int64 |
| TIMESTAMP_NTZ | int64 |
| ARRAY<type> | repeated type |
| MAP<key, value> | map<key, value> |
| STRUCT<fields> | nested message |
| VARIANT | string (unshredded, JSON string) |
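To make the mapping concrete, the scalar rows of the table above can be expressed as a lookup. This is only an illustrative sketch, not the implementation of generate_proto.py: the names DELTA_TO_PROTO2 and proto_field are hypothetical, nested types (ARRAY, MAP, STRUCT) are left out, and stripping a length suffix such as VARCHAR(255) is an assumption.

```python
# Scalar rows of the type-mapping table, keyed by Delta type name.
DELTA_TO_PROTO2 = {
    "TINYINT": "int32", "BYTE": "int32", "INT": "int32",
    "SMALLINT": "int32", "SHORT": "int32",
    "BIGINT": "int64", "LONG": "int64",
    "FLOAT": "float", "DOUBLE": "double",
    "STRING": "string", "VARCHAR": "string",
    "BOOLEAN": "bool", "BINARY": "bytes",
    "DATE": "int32", "TIMESTAMP": "int64", "TIMESTAMP_NTZ": "int64",
    "VARIANT": "string",
}


def proto_field(column: str, delta_type: str, number: int) -> str:
    """Render one proto2 field line for a scalar Delta column (hypothetical helper)."""
    # Assumption: drop any length/precision suffix, e.g. VARCHAR(255) -> VARCHAR.
    base = delta_type.strip().upper().split("(")[0]
    if base not in DELTA_TO_PROTO2:
        raise ValueError(f"Unsupported or non-scalar Delta type: {delta_type!r}")
    return f"optional {DELTA_TO_PROTO2[base]} {column} = {number};"


# Columns of the air_quality table created earlier in this guide.
columns = [("device_name", "STRING"), ("temp", "INT"), ("humidity", "BIGINT")]
lines = [proto_field(name, dtype, i) for i, (name, dtype) in enumerate(columns, start=1)]
print("\n".join(lines))
```

For the air_quality table this prints the same three field lines as the hand-written record.proto shown earlier.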
Write Your Client Code (Protocol Buffers)
Synchronous Example:
import logging
from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import TableProperties
import record_pb2
# Configure logging (optional but recommended)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# Configuration
# For AWS:
server_endpoint = "https://1234567890123456.zerobus.us-west-2.cloud.databricks.com"
workspace_url = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"
# For Azure:
# server_endpoint = "https://1234567890123456.zerobus.us-west-2.azuredatabricks.net"
# workspace_url = "https://dbc-a1b2c3d4-e5f6.azuredatabricks.net"
table_name = "main.default.air_quality"
client_id = "your-service-principal-application-id"
client_secret = "your-service-principal-secret"
# Initialize SDK
sdk = ZerobusSdk(server_endpoint, workspace_url)
# Configure table properties with protobuf descriptor
table_properties = TableProperties(table_name, record_pb2.AirQuality.DESCRIPTOR)
# Create stream
stream = sdk.create_stream(client_id, client_secret, table_properties)
try:
    # Ingest records
    for i in range(100):
        # Option 1: Pass a Message object (SDK serializes to bytes)
        record = record_pb2.AirQuality(
            device_name=f"sensor-{i % 10}",
            temp=20 + (i % 15),
            humidity=50 + (i % 40)
        )
        ack = stream.ingest_record(record)
        # Option 2: Pass pre-serialized bytes (client controls serialization)
        # serialized_bytes = record.SerializeToString()
        # ack = stream.ingest_record(serialized_bytes)
        # Optional: Wait for durability confirmation
        ack.wait_for_ack()
        print(f"Ingested record {i + 1}")
    print("Successfully ingested 100 records!")
finally:
    stream.close()
Asynchronous Example:
import asyncio
import logging
from zerobus.sdk.aio import ZerobusSdk
from zerobus.sdk.shared import TableProperties
import record_pb2
# Configure logging (optional but recommended)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
async def main():
    # Configuration
    # For AWS:
    server_endpoint = "https://1234567890123456.zerobus.us-west-2.cloud.databricks.com"
    workspace_url = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"
    # For Azure:
    # server_endpoint = "https://1234567890123456.zerobus.us-west-2.azuredatabricks.net"
    # workspace_url = "https://dbc-a1b2c3d4-e5f6.azuredatabricks.net"
    table_name = "main.default.air_quality"
    client_id = "your-service-principal-application-id"
    client_secret = "your-service-principal-secret"
    # Initialize SDK
    sdk = ZerobusSdk(server_endpoint, workspace_url)
    # Configure table properties with protobuf descriptor
    table_properties = TableProperties(table_name, record_pb2.AirQuality.DESCRIPTOR)
    # Create stream
    stream = await sdk.create_stream(client_id, client_secret, table_properties)
    try:
        # Ingest records
        for i in range(100):
            # Option 1: Pass a Message object (SDK serializes to bytes)
            record = record_pb2.AirQuality(
                device_name=f"sensor-{i % 10}",
                temp=20 + (i % 15),
                humidity=50 + (i % 40)
            )
            future = await stream.ingest_record(record)
            # Option 2: Pass pre-serialized bytes (client controls serialization)
            # serialized_bytes = record.SerializeToString()
            # future = await stream.ingest_record(serialized_bytes)
            # Optional: Wait for durability confirmation
            await future
            print(f"Ingested record {i + 1}")
        print("Successfully ingested 100 records!")
    finally:
        await stream.close()
asyncio.run(main())
Usage Examples
See the examples/ directory for complete, runnable examples in both JSON and protobuf formats (sync and async variants). See examples/README.md for detailed instructions.
JSON Examples
Blocking Ingestion (JSON)
import json
import logging
from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties
logging.basicConfig(level=logging.INFO)
sdk = ZerobusSdk(server_endpoint, workspace_url)
table_properties = TableProperties(table_name)
options = StreamConfigurationOptions(record_type=RecordType.JSON)
stream = sdk.create_stream(client_id, client_secret, table_properties, options)
try:
    for i in range(1000):
        # Pass a dict (SDK serializes) or a pre-serialized JSON string
        record_dict = {
            "device_name": f"sensor-{i}",
            "temp": 20 + i % 15,
            "humidity": 50 + i % 40
        }
        ack = stream.ingest_record(record_dict)
        # Optional: Wait for durability confirmation
        ack.wait_for_ack()
finally:
    stream.close()
Non-Blocking Ingestion (JSON)
import asyncio
import json
import logging
from zerobus.sdk.aio import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties, AckCallback
logging.basicConfig(level=logging.INFO)
async def main():
    # Create a custom callback class
    class MyCallback(AckCallback):
        def on_ack(self, offset: int):
            print(f"Acknowledged offset: {offset}")
    options = StreamConfigurationOptions(
        record_type=RecordType.JSON,
        max_inflight_records=50000,
        ack_callback=MyCallback()
    )
    sdk = ZerobusSdk(server_endpoint, workspace_url)
    table_properties = TableProperties(table_name)
    stream = await sdk.create_stream(client_id, client_secret, table_properties, options)
    futures = []
    try:
        for i in range(100000):
            # Pass a dict (SDK serializes) or a pre-serialized JSON string
            record_dict = {
                "device_name": f"sensor-{i % 10}",
                "temp": 20 + i % 15,
                "humidity": 50 + i % 40
            }
            future = await stream.ingest_record(record_dict)
            futures.append(future)
        await stream.flush()
        await asyncio.gather(*futures)
    finally:
        await stream.close()
asyncio.run(main())
Protocol Buffer Examples
Blocking Ingestion (Protobuf)
import logging
from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import TableProperties
import record_pb2
logging.basicConfig(level=logging.INFO)
sdk = ZerobusSdk(server_endpoint, workspace_url)
table_properties = TableProperties(table_name, record_pb2.AirQuality.DESCRIPTOR)
stream = sdk.create_stream(client_id, client_secret, table_properties)
try:
    for i in range(1000):
        # Pass a Message object (SDK serializes) or pre-serialized bytes
        record = record_pb2.AirQuality(
            device_name=f"sensor-{i}",
            temp=20 + i % 15,
            humidity=50 + i % 40
        )
        ack = stream.ingest_record(record)
        # Optional: Wait for durability confirmation
        ack.wait_for_ack()
finally:
    stream.close()
Non-Blocking Ingestion (Protobuf)
import asyncio
import logging
from zerobus.sdk.aio import ZerobusSdk
from zerobus.sdk.shared import TableProperties, StreamConfigurationOptions, AckCallback
import record_pb2
logging.basicConfig(level=logging.INFO)
async def main():
    # Create a custom callback class
    class MyCallback(AckCallback):
        def on_ack(self, offset: int):
            print(f"Acknowledged offset: {offset}")
    options = StreamConfigurationOptions(
        max_inflight_records=50000,
        ack_callback=MyCallback()
    )
    sdk = ZerobusSdk(server_endpoint, workspace_url)
    table_properties = TableProperties(table_name, record_pb2.AirQuality.DESCRIPTOR)
    stream = await sdk.create_stream(client_id, client_secret, table_properties, options)
    futures = []
    try:
        for i in range(100000):
            # Pass a Message object (SDK serializes) or pre-serialized bytes
            record = record_pb2.AirQuality(
                device_name=f"sensor-{i % 10}",
                temp=20 + i % 15,
                humidity=50 + i % 40
            )
            future = await stream.ingest_record(record)
            futures.append(future)
        await stream.flush()
        await asyncio.gather(*futures)
    finally:
        await stream.close()
asyncio.run(main())
Authentication
The SDK uses OAuth 2.0 Client Credentials for authentication:
from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import TableProperties
import record_pb2
sdk = ZerobusSdk(server_endpoint, workspace_url)
table_properties = TableProperties(table_name, record_pb2.AirQuality.DESCRIPTOR)
# Create stream with OAuth authentication
stream = sdk.create_stream(client_id, client_secret, table_properties)
The SDK automatically handles OAuth 2.0 authentication and uses secure TLS connections by default.
For advanced use cases requiring custom authentication headers, see the HeadersProvider section in the API Reference below.
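Because client_id and client_secret are secrets, it is usually preferable not to hardcode them as the Quick Start examples do for brevity. The sketch below reads them from environment variables. The helper load_credentials and the variable names DATABRICKS_CLIENT_ID and DATABRICKS_CLIENT_SECRET are assumptions chosen for illustration; nothing in this guide states that the SDK reads them itself.

```python
import os


def load_credentials(env=None):
    """Return (client_id, client_secret) from environment-style mapping.

    Hypothetical helper; the variable names are an assumption, not SDK behavior.
    """
    env = os.environ if env is None else env
    names = ("DATABRICKS_CLIENT_ID", "DATABRICKS_CLIENT_SECRET")
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return env[names[0]], env[names[1]]


# Demonstration with an explicit mapping instead of the real environment.
client_id, client_secret = load_credentials(
    {"DATABRICKS_CLIENT_ID": "app-id", "DATABRICKS_CLIENT_SECRET": "s3cr3t"}
)
```

The returned pair can then be passed to create_stream() in place of the literal strings.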
Configuration
Stream Configuration Options
Configure stream behavior by passing a StreamConfigurationOptions object to create_stream():
from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import AckCallback, StreamConfigurationOptions, RecordType, TableProperties
sdk = ZerobusSdk(server_endpoint, workspace_url)
table_properties = TableProperties(table_name)
# Optional: Create a custom callback class
class MyCallback(AckCallback):
    def on_ack(self, offset: int):
        print(f"Ack: {offset}")
# Create options with custom configuration
options = StreamConfigurationOptions(
    record_type=RecordType.JSON,
    max_inflight_records=10000,
    recovery=True,
    recovery_timeout_ms=20000,
    ack_callback=MyCallback()  # Optional - can be None
)
# Pass options when creating the stream
stream = sdk.create_stream(
    client_id,
    client_secret,
    table_properties,
    options  # <-- Configuration options passed here
)
All options are optional - if not specified, defaults will be used.
Available Options
| Option | Type | Default | Description |
|---|---|---|---|
| record_type | RecordType | RecordType.PROTO | Serialization format: RecordType.PROTO or RecordType.JSON |
| max_inflight_records | int | 50000 | Maximum number of unacknowledged records |
| recovery | bool | True | Enable automatic stream recovery |
| recovery_timeout_ms | int | 15000 | Timeout for recovery operations (ms) |
| recovery_backoff_ms | int | 2000 | Delay between recovery attempts (ms) |
| recovery_retries | int | 3 | Maximum number of recovery attempts |
| flush_timeout_ms | int | 300000 | Timeout for flush operations (ms) |
| server_lack_of_ack_timeout_ms | int | 60000 | Server acknowledgment timeout (ms) |
| stream_paused_max_wait_time_ms | Optional[int] | None | Max time (ms) to wait during graceful stream close. None = wait for full server duration, 0 = immediate, x = wait up to min(x, server_duration) |
| callback_max_wait_time_ms | Optional[int] | 5000 | Max time (ms) to wait for callbacks to finish after close(). None = wait forever, x = wait up to x ms |
| ack_callback | AckCallback | None | Callback invoked on record acknowledgment (must be a class extending AckCallback) |
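The None / 0 / x rule for stream_paused_max_wait_time_ms can be read as a small function. This is a sketch of the documented semantics only, not SDK code: the function name effective_pause_wait_ms and the parameter server_duration_ms (the duration the server specifies) are hypothetical.

```python
from typing import Optional


def effective_pause_wait_ms(configured: Optional[int], server_duration_ms: int) -> int:
    """Time to wait for in-flight acks during a graceful close (illustrative)."""
    if configured is None:
        # None: wait for the full server-specified duration.
        return server_duration_ms
    # 0 yields 0 (immediate); any other x is capped at the server duration.
    return min(configured, server_duration_ms)


for value in (None, 0, 5000, 60000):
    print(value, "->", effective_pause_wait_ms(value, server_duration_ms=30000))
```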
Acknowledgment Callbacks
The ack_callback parameter requires a custom class extending AckCallback:
from zerobus.sdk.shared import AckCallback, StreamConfigurationOptions
class MyCallback(AckCallback):
    def on_ack(self, offset: int):
        # Called when a record is successfully acknowledged
        print(f"Record at offset {offset} was acknowledged")
        # You can track metrics, update UI, etc.
    def on_error(self, offset: int, error_message: str):
        # Called when a record encounters an error
        print(f"Record at offset {offset} failed: {error_message}")
        # Handle errors, log, retry, etc.
# Create options with the callback
options = StreamConfigurationOptions(
    ack_callback=MyCallback()
)
# Use the options when creating a stream
stream = sdk.create_stream(
    client_id,
    client_secret,
    table_properties,
    options
)
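A callback can also keep state instead of printing. The sketch below tracks the highest acknowledged offset and collects errors. To stay runnable without the SDK installed, it defines a local stand-in for AckCallback; in real code that stand-in would be replaced by the import from zerobus.sdk.shared. The lock is a precaution based on the assumption that callbacks may be invoked from a thread other than the caller's, which this guide does not specify.

```python
import threading


class AckCallback:
    """Local stand-in for zerobus.sdk.shared.AckCallback (illustration only)."""

    def on_ack(self, offset: int):
        pass

    def on_error(self, offset: int, error_message: str):
        pass


class TrackingCallback(AckCallback):
    """Hypothetical callback that records progress instead of printing."""

    def __init__(self):
        self._lock = threading.Lock()
        self.last_acked = None   # highest offset acknowledged so far
        self.errors = {}         # offset -> error message

    def on_ack(self, offset: int):
        with self._lock:
            if self.last_acked is None or offset > self.last_acked:
                self.last_acked = offset

    def on_error(self, offset: int, error_message: str):
        with self._lock:
            self.errors[offset] = error_message


# Simulate the calls the stream would make.
callback = TrackingCallback()
for offset in (0, 1, 2):
    callback.on_ack(offset)
callback.on_error(3, "simulated failure")
print(callback.last_acked, callback.errors)
```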
Error Handling
The SDK raises two types of exceptions:
- ZerobusException: Retriable errors (e.g., network issues, temporary server errors)
- NonRetriableException: Non-retriable errors (e.g., invalid credentials, missing table)
from zerobus.sdk.shared import ZerobusException, NonRetriableException
try:
    stream.ingest_record(record)
except NonRetriableException as e:
    # Fatal error - do not retry
    print(f"Non-retriable error: {e}")
    raise
except ZerobusException as e:
    # Retriable error - can retry with backoff
    print(f"Retriable error: {e}")
    # Implement retry logic
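One possible shape for the retry logic is exponential backoff around the ingest call. The sketch below is an assumption-laden illustration, not SDK behavior: ingest_with_retry is a hypothetical helper, the delays are arbitrary, and the two exception classes are local stand-ins so the block runs without the SDK (the except order above suggests NonRetriableException subclasses ZerobusException, which the stand-ins mirror).

```python
import time


class ZerobusException(Exception):
    """Stand-in for zerobus.sdk.shared.ZerobusException."""


class NonRetriableException(ZerobusException):
    """Stand-in for zerobus.sdk.shared.NonRetriableException."""


def ingest_with_retry(ingest, record, max_attempts=3, base_delay_s=0.5, sleep=time.sleep):
    """Call ingest(record), retrying retriable errors with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return ingest(record)
        except NonRetriableException:
            raise  # fatal: retrying cannot help
        except ZerobusException:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            sleep(base_delay_s * 2 ** (attempt - 1))


# Demonstration: an ingest function that fails twice, then succeeds.
calls = {"n": 0}

def flaky_ingest(record):
    calls["n"] += 1
    if calls["n"] < 3:
        raise ZerobusException("temporary failure")
    return "ok"

delays = []  # record the delays instead of actually sleeping
result = ingest_with_retry(flaky_ingest, {"temp": 21}, sleep=delays.append)
print(result, delays)
```

With a real stream, the first argument would be a bound method such as stream.ingest_record. Note that the SDK also has its own stream recovery (see the recovery options), so application-level retries like this are a complement, not a replacement.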
API Reference
ZerobusSdk
Main entry point for the SDK.
Synchronous API:
from zerobus.sdk.sync import ZerobusSdk
sdk = ZerobusSdk(server_endpoint, unity_catalog_endpoint)
Constructor Parameters:
- server_endpoint (str) - The Zerobus gRPC endpoint (e.g., <workspace-id>.zerobus.<region>.cloud.databricks.com for AWS, or <workspace-id>.zerobus.<region>.azuredatabricks.net for Azure)
- unity_catalog_endpoint (str) - The Unity Catalog endpoint (your workspace URL)
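The endpoint pattern above can be assembled from the workspace ID and region. This is a minimal sketch with a hypothetical helper name, zerobus_endpoint. It prepends https:// because the Quick Start examples do; whether the scheme is required is not stated here.

```python
def zerobus_endpoint(workspace_id: str, region: str, cloud: str = "aws") -> str:
    """Build a server_endpoint string from the documented pattern (illustrative)."""
    suffixes = {
        "aws": "cloud.databricks.com",
        "azure": "azuredatabricks.net",
    }
    if cloud not in suffixes:
        raise ValueError(f"Unsupported cloud: {cloud!r}")
    return f"https://{workspace_id}.zerobus.{region}.{suffixes[cloud]}"


print(zerobus_endpoint("1234567890123456", "us-west-2"))
```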
Methods:
def create_stream(
    client_id: str,
    client_secret: str,
    table_properties: TableProperties,
    options: StreamConfigurationOptions = None,
    headers_provider: HeadersProvider = None
) -> ZerobusStream
Creates a new ingestion stream using OAuth 2.0 Client Credentials authentication.
Parameters:
- client_id (str) - OAuth client ID (ignored if headers_provider is provided)
- client_secret (str) - OAuth client secret (ignored if headers_provider is provided)
- table_properties (TableProperties) - Target table configuration
- options (StreamConfigurationOptions) - Stream behavior configuration (optional)
- headers_provider (HeadersProvider) - Custom headers provider (optional, defaults to OAuth)
Automatically includes these headers (when using default OAuth):
- "authorization": "Bearer <oauth_token>" (fetched via OAuth 2.0 Client Credentials flow)
- "x-databricks-zerobus-table-name": "<table_name>"
Returns a ZerobusStream instance.
Asynchronous API:
from zerobus.sdk.aio import ZerobusSdk
sdk = ZerobusSdk(server_endpoint, unity_catalog_endpoint)
Methods:
async def create_stream(
    client_id: str,
    client_secret: str,
    table_properties: TableProperties,
    options: StreamConfigurationOptions = None,
    headers_provider: HeadersProvider = None
) -> ZerobusStream
Creates a new ingestion stream using OAuth 2.0 Client Credentials authentication.
Parameters:
- client_id (str) - OAuth client ID (ignored if headers_provider is provided)
- client_secret (str) - OAuth client secret (ignored if headers_provider is provided)
- table_properties (TableProperties) - Target table configuration
- options (StreamConfigurationOptions) - Stream behavior configuration (optional)
- headers_provider (HeadersProvider) - Custom headers provider (optional, defaults to OAuth)
Automatically includes these headers (when using default OAuth):
- "authorization": "Bearer <oauth_token>" (fetched via OAuth 2.0 Client Credentials flow)
- "x-databricks-zerobus-table-name": "<table_name>"
Returns a ZerobusStream instance.
ZerobusStream
Represents an active ingestion stream.
Synchronous Methods:
Single Record Ingestion:
def ingest_record_offset(record: Union[Message, dict, bytes, str]) -> int
RECOMMENDED - Ingests a single record and returns the offset after queueing.
def ingest_record_nowait(record: Union[Message, dict, bytes, str]) -> None
RECOMMENDED - Fire-and-forget ingestion. Submits the record without waiting or returning an offset. Best for maximum throughput.
def ingest_record(record: Union[Message, dict, bytes, str]) -> RecordAcknowledgment
DEPRECATED since v0.3.0 - Use ingest_record_offset() or ingest_record_nowait() instead for better performance.
Batch Ingestion:
def ingest_records_offset(records: List[Union[Message, dict, bytes, str]]) -> int
Ingests a batch of records and returns the final offset immediately. More efficient than individual calls for bulk ingestion.
def ingest_records_nowait(records: List[Union[Message, dict, bytes, str]]) -> None
Fire-and-forget batch ingestion. Submits all records without waiting. Most efficient for bulk ingestion.
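The batch methods take a list, so a long or unbounded record source has to be cut into batches first. The generator below is one way to do that with the standard library. The name chunked and the batch size are illustrative choices, not SDK features or recommendations.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunked(records: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive lists of at most `size` items from `records`."""
    if size <= 0:
        raise ValueError("size must be positive")
    iterator = iter(records)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


batches = list(chunked(range(10), 4))
print(batches)
```

With a stream, each batch would be handed to a batch method, for example `for batch in chunked(records, 1000): stream.ingest_records_nowait(batch)`.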
def get_unacked_records() -> List[bytes]
Returns a list of unacknowledged records (as raw bytes). These are records that have been ingested but not yet acknowledged by the server.
Important: Records are returned in their serialized form:
- JSON mode: Decode with json.loads(record.decode('utf-8'))
- Protobuf mode: Deserialize with YourMessage.FromString(record) or use as-is if pre-serialized
Useful for recovery and monitoring.
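For JSON mode, the decoding step above can be applied to the whole list in one pass. The sketch uses simulated bytes in place of a real get_unacked_records() result, and decode_json_records is a hypothetical helper name.

```python
import json
from typing import List


def decode_json_records(raw_records: List[bytes]) -> List[dict]:
    """Decode serialized JSON records (UTF-8 bytes) back into dicts."""
    return [json.loads(raw.decode("utf-8")) for raw in raw_records]


# Simulated stand-in for the bytes that get_unacked_records() would return.
raw = [
    json.dumps({"device_name": "sensor-1", "temp": 21, "humidity": 55}).encode("utf-8"),
    json.dumps({"device_name": "sensor-2", "temp": 23, "humidity": 60}).encode("utf-8"),
]
decoded = decode_json_records(raw)
print(decoded[0]["device_name"], len(decoded))
```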
def get_unacked_batches() -> List[List[bytes]]
Returns a list of unacknowledged batches, where each batch is a list of records (as raw bytes). These are batches that have been sent but not yet acknowledged by the server.
Important: Records are returned in their serialized form (see get_unacked_records() for decoding).
Useful for batch retry logic.
Stream Management:
def flush() -> None
Flushes all pending records and waits for server acknowledgment. Does not close the stream.
def close() -> None
Flushes and closes the stream gracefully. Always call in a finally block.
Accepted Record Types (all methods):
- JSON mode: dict (SDK serializes) or str (pre-serialized JSON string)
- Protobuf mode: Message object (SDK serializes) or bytes (pre-serialized)
Asynchronous Methods:
Single Record Ingestion:
async def ingest_record_offset(record: Union[Message, dict, bytes, str]) -> int
RECOMMENDED - Ingests a single record and returns the offset after queueing.
def ingest_record_nowait(record: Union[Message, dict, bytes, str]) -> None
RECOMMENDED - Fire-and-forget ingestion. Submits the record without waiting. Not async (don't use await). Best for maximum throughput.
async def ingest_record(record: Union[Message, dict, bytes, str]) -> Awaitable
DEPRECATED since v0.3.0 - Use ingest_record_offset() or ingest_record_nowait() instead for better performance.
Batch Ingestion:
async def ingest_records_offset(records: List[Union[Message, dict, bytes, str]]) -> int
Ingests a batch of records and returns the final offset immediately. More efficient than individual calls for bulk ingestion.
def ingest_records_nowait(records: List[Union[Message, dict, bytes, str]]) -> None
Fire-and-forget batch ingestion. Submits all records without waiting. Not async (don't use await). Most efficient for bulk ingestion.
Offset Tracking:
async def wait_for_offset(offset: int) -> None
Waits for a specific offset to be acknowledged by the server. Useful when you have an offset from ingest_record_offset() and want to ensure it's durably written:
offset = await stream.ingest_record_offset(record)
# Do other work...
await stream.wait_for_offset(offset) # Ensure this offset is acknowledged
Stream Monitoring:
async def get_unacked_records() -> List[bytes]
Returns a list of unacknowledged records (as raw bytes). These are records that have been ingested but not yet acknowledged by the server.
Important: Records are returned in their serialized form:
- JSON mode: Decode with json.loads(record.decode('utf-8'))
- Protobuf mode: Deserialize with YourMessage.FromString(record) or use as-is if pre-serialized
Useful for recovery and monitoring.
async def get_unacked_batches() -> List[List[bytes]]
Returns a list of unacknowledged batches, where each batch is a list of records (as raw bytes). These are batches that have been sent but not yet acknowledged by the server.
Important: Records are returned in their serialized form (see get_unacked_records() for decoding).
Useful for batch retry logic.
Stream Management:
async def flush() -> None
Flushes all pending records and waits for server acknowledgment. Does not close the stream.
async def close() -> None
Flushes and closes the stream gracefully. Always call in a finally block.
Returns the unique stream ID assigned by the server.
Accepted Record Types (all methods):
- JSON mode: dict (SDK serializes) or str (pre-serialized JSON string)
- Protobuf mode: Message object (SDK serializes) or bytes (pre-serialized)
TableProperties
Configuration for the target table.
Constructor:
TableProperties(table_name: str, descriptor: Descriptor = None)
Parameters:
- table_name (str) - Fully qualified table name (e.g., catalog.schema.table)
- descriptor (Descriptor) - Protobuf message descriptor (e.g., MyMessage.DESCRIPTOR). Required for protobuf mode, not needed for JSON mode.
Examples:
# JSON mode
table_properties = TableProperties("catalog.schema.table")
# Protobuf mode (default)
table_properties = TableProperties("catalog.schema.table", record_pb2.MyMessage.DESCRIPTOR)
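Since table_name must be fully qualified, a quick check before constructing TableProperties can catch a missing catalog or schema early. This is a sketch with a hypothetical helper, split_table_name. It assumes plain identifiers and does not handle backtick-quoted names that contain dots.

```python
def split_table_name(table_name: str) -> tuple:
    """Split 'catalog.schema.table' into its three parts, or raise ValueError."""
    parts = table_name.split(".")
    if len(parts) != 3 or not all(part.strip() for part in parts):
        raise ValueError(
            f"Expected a fully qualified name like catalog.schema.table, got {table_name!r}"
        )
    return tuple(parts)


catalog, schema, table = split_table_name("main.default.air_quality")
print(catalog, schema, table)
```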
HeadersProvider
Abstract base class for providing custom authentication headers to gRPC streams.
Default: The SDK handles OAuth 2.0 Client Credentials authentication internally when you provide client_id and client_secret to create_stream(). You don't need to implement any headers provider for standard OAuth authentication.
Custom Implementation: For advanced use cases (e.g., custom token providers, non-OAuth authentication), you can implement a custom HeadersProvider by extending the base class and implementing the get_headers() method. Custom providers must include both the authorization and x-databricks-zerobus-table-name headers. See example files for implementation details.
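As a rough illustration of the two required headers, the sketch below builds them as a plain dict and checks that both are present. It deliberately does not subclass HeadersProvider: the return shape that get_headers() must produce is not specified in this section, so the dict form, the helper names build_headers and missing_headers, and the use of a Bearer token are all assumptions.

```python
REQUIRED_HEADERS = ("authorization", "x-databricks-zerobus-table-name")


def build_headers(token: str, table_name: str) -> dict:
    """Assemble the two headers a custom provider must supply (illustrative)."""
    return {
        "authorization": f"Bearer {token}",
        "x-databricks-zerobus-table-name": table_name,
    }


def missing_headers(headers: dict) -> list:
    """Return the required header names that are absent from `headers`."""
    return [name for name in REQUIRED_HEADERS if name not in headers]


headers = build_headers("my-token", "main.default.air_quality")
print(missing_headers(headers))
```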
StreamConfigurationOptions
Configuration options for stream behavior.
Constructor:
StreamConfigurationOptions(
    record_type: RecordType = RecordType.PROTO,
    max_inflight_records: int = 50000,
    recovery: bool = True,
    recovery_timeout_ms: int = 15000,
    recovery_backoff_ms: int = 2000,
    recovery_retries: int = 3,
    flush_timeout_ms: int = 300000,
    server_lack_of_ack_timeout_ms: int = 60000,
    stream_paused_max_wait_time_ms: Optional[int] = None,
    callback_max_wait_time_ms: Optional[int] = 5000,
    ack_callback: AckCallback = None
)
Parameters:
- record_type (RecordType) - Serialization format: RecordType.PROTO (default) or RecordType.JSON
- max_inflight_records (int) - Maximum number of unacknowledged records (default: 50000)
- recovery (bool) - Enable or disable automatic stream recovery (default: True)
- recovery_timeout_ms (int) - Recovery operation timeout in milliseconds (default: 15000)
- recovery_backoff_ms (int) - Delay between recovery attempts in milliseconds (default: 2000)
- recovery_retries (int) - Maximum number of recovery attempts (default: 3)
- flush_timeout_ms (int) - Flush operation timeout in milliseconds (default: 300000)
- server_lack_of_ack_timeout_ms (int) - Server acknowledgment timeout in milliseconds (default: 60000)
- stream_paused_max_wait_time_ms (Optional[int]) - Maximum time in milliseconds to wait during graceful stream close. When the server signals stream closure, the SDK can pause and wait for in-flight records to be acknowledged. None = wait for full server-specified duration (most graceful), 0 = immediate recovery, x = wait up to min(x, server_duration) milliseconds (default: None)
- callback_max_wait_time_ms (Optional[int]) - Maximum time in milliseconds to wait for callbacks to finish after calling close() on the stream. None = wait forever, x = wait up to x milliseconds (default: 5000)
- ack_callback (AckCallback) - Callback to be invoked when records are acknowledged or encounter errors. Must be a custom class extending AckCallback that implements on_ack() and optionally on_error() methods. (default: None)
Example:
from zerobus.sdk.shared import StreamConfigurationOptions, RecordType, AckCallback
# Create a custom callback class
class MyCallback(AckCallback):
    def on_ack(self, offset: int):
        print(f"Ack: {offset}")
    def on_error(self, offset: int, error_message: str):
        print(f"Error at {offset}: {error_message}")
# Use the callback in options
options = StreamConfigurationOptions(
    record_type=RecordType.JSON,
    max_inflight_records=10000,
    ack_callback=MyCallback()
)
# Pass to create_stream()
stream = sdk.create_stream(client_id, client_secret, table_properties, options)
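The three cases described for stream_paused_max_wait_time_ms (None, 0, and x) can be written out as a small function. This is only a sketch of the documented rule: effective_pause_wait_ms is a hypothetical helper, not an SDK function, and the server duration values are made up.

```python
from typing import Optional


def effective_pause_wait_ms(configured_ms: Optional[int], server_duration_ms: int) -> int:
    """Hypothetical helper mirroring the documented rule; not part of the SDK."""
    if configured_ms is None:
        # None: wait for the full server-specified duration
        return server_duration_ms
    if configured_ms < 0:
        raise ValueError("configured_ms must be None or >= 0")
    # 0 yields immediate recovery; any other x is capped by the server duration
    return min(configured_ms, server_duration_ms)


waits = {
    "None": effective_pause_wait_ms(None, 30000),
    "0": effective_pause_wait_ms(0, 30000),
    "5000": effective_pause_wait_ms(5000, 30000),
    "60000": effective_pause_wait_ms(60000, 30000),
}
```

Under this reading, setting a value larger than the server-specified duration has no additional effect.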
AckCallback
Abstract base class for custom acknowledgment callbacks.
Usage:
from zerobus.sdk.shared import AckCallback, StreamConfigurationOptions
class MyCallback(AckCallback):
    def on_ack(self, offset: int):
        # Called when a record is acknowledged by the server
        print(f"Record at offset {offset} acknowledged")
        # Add custom logic: metrics, logging, UI updates, etc.
    def on_error(self, offset: int, error_message: str):
        # Called when a record encounters an error
        print(f"Record at offset {offset} failed: {error_message}")
        # Add custom error handling
# Use in StreamConfigurationOptions
options = StreamConfigurationOptions(ack_callback=MyCallback())
stream = sdk.create_stream(client_id, client_secret, table_properties, options)
Methods:
def on_ack(self, offset: int) -> None
Called when a record is successfully acknowledged by the server.
Parameters:
- offset (int) - The offset of the acknowledged record
def on_error(self, offset: int, error_message: str) -> None
Called when a record encounters an error.
Parameters:
- offset (int) - The offset of the failed record
- error_message (str) - Description of the error
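For monitoring, a callback can keep running totals instead of printing. The sketch below uses a stand-in AckCallback base class so it runs without the SDK; CountingCallback and its attribute names are hypothetical, and the lock reflects an assumption that callbacks may be invoked from a thread other than the one that ingests records.

```python
import threading
from abc import ABC, abstractmethod
from typing import Dict, Optional


class AckCallback(ABC):
    """Stand-in for the SDK's AckCallback so this sketch is self-contained."""

    @abstractmethod
    def on_ack(self, offset: int) -> None:
        ...

    def on_error(self, offset: int, error_message: str) -> None:
        # Optional to override, matching the description above
        pass


class CountingCallback(AckCallback):
    """Hypothetical callback that records ack and error totals."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.ack_calls = 0
        self.last_acked_offset: Optional[int] = None
        self.errors: Dict[int, str] = {}

    def on_ack(self, offset: int) -> None:
        with self._lock:
            self.ack_calls += 1
            self.last_acked_offset = offset

    def on_error(self, offset: int, error_message: str) -> None:
        with self._lock:
            self.errors[offset] = error_message


# Simulate what the SDK would do: three acks, then one error
cb = CountingCallback()
for offset in range(3):
    cb.on_ack(offset)
cb.on_error(3, "simulated failure")
```

Keeping the callback body short (a counter update under a lock) also helps stay within callback_max_wait_time_ms when the stream is closed.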
RecordAcknowledgment (Sync API only)
Future-like object for waiting on acknowledgments.
Methods:
def wait_for_ack(timeout_sec: float = None) -> None
Blocks until the record is acknowledged or timeout is reached.
def add_done_callback(callback: Callable) -> None
Adds a callback to be invoked when the record is acknowledged.
def is_done() -> bool
Returns True if the record has been acknowledged.
ZerobusException
Base exception for retriable errors.
Constructor:
ZerobusException(message: str, cause: Exception = None)
NonRetriableException
Exception for non-retriable errors (extends ZerobusException).
Constructor:
NonRetriableException(message: str, cause: Exception = None)
Best Practices
- Reuse SDK instances: Create one ZerobusSdk instance per application
- Stream lifecycle: Always close streams in a finally block to ensure all records are flushed
- Batch size: Adjust max_inflight_records based on your throughput requirements
- Error handling: Implement proper retry logic for retriable errors
- Monitoring: Use ack_callback to track ingestion progress
- Choose the right API: Use sync API for low-volume, async API for high-volume ingestion
- Token refresh: Tokens are automatically refreshed on stream creation and recovery
Handling Stream Failures
Note: The SDK automatically handles retries and recovery for transient errors. These methods are only needed when a stream has permanently failed (e.g., non-retriable error, max retries exceeded, or stream closed).
When a stream permanently fails, you can retrieve unacknowledged records to save them or retry with a new stream.
When to Use These Methods
Use get_unacked_records() and get_unacked_batches() when:
- Stream closed due to non-retriable error
- Maximum retry attempts exceeded
- You need to abandon the stream and save pending data
- Implementing custom failure handling logic
Retrieving Unacknowledged Records After Failure
Synchronous:
from zerobus import NonRetriableException
try:
    for i in range(10000):
        stream.ingest_record_offset(record)
    stream.flush()
except NonRetriableException as e:
    # Stream failed permanently - retrieve unacked records
    print(f"Stream failed: {e}")
    unacked_records = stream.get_unacked_records()  # Returns List[bytes]
    unacked_batches = stream.get_unacked_batches()  # Returns List[List[bytes]]
    print(f"Lost {len(unacked_records)} unacknowledged records")
    # Save to file or database for later retry
    with open('failed_records.bin', 'wb') as f:
        for record in unacked_records:
            f.write(len(record).to_bytes(4, 'big'))  # Write length prefix
            f.write(record)
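Records saved in this length-prefixed layout (a 4-byte big-endian length followed by the payload) need a matching reader before they can be retried. The sketch below is one possible reader; read_length_prefixed is a hypothetical helper name, and the round trip uses an in-memory buffer in place of failed_records.bin.

```python
import io
from typing import BinaryIO, Iterator


def read_length_prefixed(f: BinaryIO) -> Iterator[bytes]:
    """Yield records stored as a 4-byte big-endian length followed by the payload."""
    while True:
        header = f.read(4)
        if not header:
            return  # clean end of file
        if len(header) < 4:
            raise ValueError("truncated length prefix")
        size = int.from_bytes(header, "big")
        payload = f.read(size)
        if len(payload) < size:
            raise ValueError("truncated record payload")
        yield payload


# Round trip: write records the same way as above, then read them back
buf = io.BytesIO()
for rec in [b"alpha", b"", b"gamma"]:
    buf.write(len(rec).to_bytes(4, "big"))
    buf.write(rec)
buf.seek(0)
restored = list(read_length_prefixed(buf))
```

With a real file, the same generator can feed a new stream: open failed_records.bin in 'rb' mode and pass each yielded record to ingest_record_offset().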
Asynchronous:
import pickle
from zerobus import NonRetriableException
try:
    for i in range(10000):
        await stream.ingest_record_offset(record)
    await stream.flush()
except NonRetriableException as e:
    # Stream failed permanently - retrieve unacked records
    print(f"Stream failed: {e}")
    unacked_records = await stream.get_unacked_records()  # Returns List[bytes]
    unacked_batches = await stream.get_unacked_batches()  # Returns List[List[bytes]]
    print(f"Lost {len(unacked_records)} unacknowledged records")
    # Save for later retry with a new stream
    with open('failed_records.pkl', 'wb') as f:
        pickle.dump(unacked_records, f)
Retrying with a New Stream
After retrieving unacked records, create a new stream to retry them:
JSON Mode:
import json
# After stream failure, get unacked records
unacked_records = stream.get_unacked_records()
print(f"Retrieved {len(unacked_records)} unacked records")
# Close the failed stream
stream.close()
# Create a new stream
new_stream = sdk.create_stream(client_id, client_secret, table_properties, options)
# Retry unacked records with the new stream
for record_bytes in unacked_records:
    # Option 1: Pass bytes directly (most efficient)
    new_stream.ingest_record_offset(record_bytes)
    # Option 2: Decode and inspect before retrying
    # record_dict = json.loads(record_bytes.decode('utf-8'))
    # print(f"Retrying: {record_dict}")
    # new_stream.ingest_record_offset(record_dict)
new_stream.flush()
new_stream.close()
Protobuf Mode:
import your_proto_pb2
# After stream failure, get unacked records
unacked_records = stream.get_unacked_records()
print(f"Retrieved {len(unacked_records)} unacked records")
# Close the failed stream
stream.close()
# Create a new stream
new_stream = sdk.create_stream(client_id, client_secret, table_properties, options)
# Retry unacked records with the new stream
for record_bytes in unacked_records:
    # Option 1: Pass bytes directly (most efficient)
    new_stream.ingest_record_offset(record_bytes)
    # Option 2: Deserialize and inspect before retrying
    # record = your_proto_pb2.YourMessage()
    # record.ParseFromString(record_bytes)
    # print(f"Retrying: {record}")
    # new_stream.ingest_record_offset(record)
new_stream.flush()
new_stream.close()
Batch Retry
# Get unacknowledged batches from failed stream
unacked_batches = stream.get_unacked_batches()
print(f"Retrieved {len(unacked_batches)} unacked batches")
# Close the failed stream
stream.close()
# Create a new stream
new_stream = sdk.create_stream(client_id, client_secret, table_properties, options)
# Retry entire batches at once
for batch in unacked_batches:
    new_stream.ingest_records_offset(batch)  # Batch retry
new_stream.flush()
new_stream.close()
Performance Tips
The SDK provides multiple ingestion methods optimized for different use cases:
Method Comparison
| Method | Throughput | Acknowledgment | Use Case |
|---|---|---|---|
| ingest_record() | Low | Yes, tracked | When you need individual record tracking |
| ingest_record_offset() | Medium | Returns offset | When you need offsets but not full tracking |
| ingest_record_nowait() | Highest | No | Maximum throughput, fire-and-forget |
Performance Comparison
Benchmarked with 100k records on a local connection:
| Record Size | ingest_record (sequential) | ingest_record_nowait |
|---|---|---|
| 20 bytes | 0.35 MB/s | 7.55 MB/s (20x faster) |
| 220 bytes | 2.00 MB/s | 77 MB/s (38x faster) |
| 750 bytes | 16 MB/s | 257 MB/s (16x faster) |
| 10 KB | 188 MB/s | 382 MB/s (2x faster) |
Key Insight: The performance gap is largest for small records due to context switching overhead in sequential awaits. Use batched submission or nowait methods for optimal throughput.
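To make the table easier to compare across record sizes, the throughput figures can be converted to records per second. The arithmetic below is a sketch under two assumptions that the benchmark description does not state: that MB means 1,000,000 bytes and that "10 KB" means 10,000 bytes.

```python
# (record size in bytes, sequential MB/s, nowait MB/s), copied from the table above
rows = [
    (20, 0.35, 7.55),
    (220, 2.00, 77.0),
    (750, 16.0, 257.0),
    (10_000, 188.0, 382.0),  # assumes "10 KB" means 10,000 bytes
]

MB = 1_000_000  # assumption: decimal megabytes


def records_per_second(mb_per_s: float, record_size: int) -> float:
    """Convert a byte throughput into a record rate for a fixed record size."""
    return mb_per_s * MB / record_size


# (size, sequential records/s, nowait records/s, speedup ratio)
summary = [
    (size, records_per_second(seq, size), records_per_second(nowait, size), nowait / seq)
    for size, seq, nowait in rows
]

for size, seq_rps, nowait_rps, speedup in summary:
    print(f"{size:>6} B: {seq_rps:>10,.0f} -> {nowait_rps:>10,.0f} records/s ({speedup:.1f}x)")
```

Viewed this way, the sequential path handles a roughly similar number of records per second for the smaller sizes, which is consistent with a fixed per-record cost dominating until records become large.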
Debugging
Enabling Debug Logs
The SDK uses Rust's tracing framework for logging. You can control log levels using the RUST_LOG environment variable:
# Set log level to debug for all components
export RUST_LOG=debug
# Set log level to trace for detailed debugging
export RUST_LOG=trace
# Set log level only for zerobus SDK components
export RUST_LOG=zerobus_sdk=debug
# Multiple targets with different levels
export RUST_LOG=zerobus_sdk=trace,tokio=info
Log Levels (from least to most verbose):
- error - Only errors
- warn - Warnings and errors
- info - Informational messages (default)
- debug - Detailed debugging information
- trace - Very detailed trace information
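When exporting a shell variable is inconvenient (for example in a notebook), RUST_LOG can also be set from Python. The sketch below assumes the native logger reads the variable once, when the SDK is first imported, so the assignment comes before the import; that timing is an assumption and is not stated in this document.

```python
import os

# Assumption: RUST_LOG is read when the native module initializes its logger,
# so set it before the SDK is imported. setdefault() keeps any value that was
# already exported in the shell.
os.environ.setdefault("RUST_LOG", "zerobus_sdk=debug")

# import zerobus  # import the SDK only after the variable is set

active_filter = os.environ["RUST_LOG"]
```

Using setdefault() rather than a plain assignment means an operator can still override the level from the environment without editing the code.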