Databricks Zerobus Ingest SDK for Python
Public Preview: This SDK is supported for production use cases and is available to all customers. Databricks is actively working on stabilizing the Zerobus Ingest SDK for Python. Minor version updates may include backwards-incompatible changes.
We are keen to hear feedback from you on this SDK. Please file issues, and we will address them.
The Databricks Zerobus Ingest SDK for Python provides a high-performance client for ingesting data directly into Databricks Delta tables using the Zerobus streaming protocol.
See also the SDK for Rust and the SDK for Java.
Table of Contents
- Disclaimer
- Features
- Requirements
- Quick Start User Guide
- Usage Examples
- Configuration
- Error Handling
- API Reference
- Best Practices
Features
- High-throughput ingestion: Optimized for high-volume data ingestion
- Automatic recovery: Built-in retry and recovery mechanisms
- Flexible configuration: Customizable stream behavior and timeouts
- Protocol Buffers: Strongly-typed schema using protobuf
- OAuth 2.0 authentication: Secure authentication with client credentials
- Sync and Async support: Both synchronous and asynchronous APIs
- Comprehensive logging: Detailed logging using Python's standard logging framework
Requirements
Runtime Requirements
- Python: 3.9 or higher
- Databricks workspace with Zerobus access enabled
Dependencies
- protobuf >= 6.31.0, < 7.0
- grpcio >= 1.60.0, < 2.0
- requests >= 2.28.1, < 3
Quick Start User Guide
Prerequisites
Before using the SDK, you'll need the following:
1. Workspace URL and Workspace ID
After logging into your Databricks workspace, look at the browser URL:
https://<databricks-instance>.cloud.databricks.com/o=<workspace-id>
- Workspace URL: The part before /o= → https://<databricks-instance>.cloud.databricks.com
- Workspace ID: The part after /o= → <workspace-id>
Example:
- Full URL: https://dbc-a1b2c3d4-e5f6.cloud.databricks.com/o=1234567890123456
- Workspace URL: https://dbc-a1b2c3d4-e5f6.cloud.databricks.com
- Workspace ID: 1234567890123456
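The split described above is mechanical, so it can be scripted. This is a minimal sketch, assuming the browser URL has exactly the `/o=<workspace-id>` form shown here; `split_workspace_url` is a hypothetical helper and not part of the SDK.

```python
def split_workspace_url(full_url: str) -> tuple[str, str]:
    """Split '<workspace-url>/o=<workspace-id>' into its two parts."""
    workspace_url, separator, workspace_id = full_url.partition("/o=")
    if not separator or not workspace_id:
        raise ValueError(f"no '/o=<workspace-id>' suffix in {full_url!r}")
    return workspace_url, workspace_id


url, workspace_id = split_workspace_url(
    "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com/o=1234567890123456"
)
print(url)           # https://dbc-a1b2c3d4-e5f6.cloud.databricks.com
print(workspace_id)  # 1234567890123456
```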
2. Create a Delta Table
Create a table using Databricks SQL:
CREATE TABLE <catalog_name>.default.air_quality (
  device_name STRING,
  temp INT,
  humidity BIGINT
)
USING DELTA;
Replace <catalog_name> with your catalog name (e.g., main).
3. Create a Service Principal
- Navigate to Settings > Identity and Access in your Databricks workspace
- Click Service principals and create a new service principal
- Generate a new secret for the service principal and save it securely
- Grant the following permissions:
  - USE_CATALOG on the catalog (e.g., main)
  - USE_SCHEMA on the schema (e.g., default)
  - MODIFY and SELECT on the table (e.g., air_quality)
Grant permissions using SQL:
-- Grant catalog permission
GRANT USE CATALOG ON CATALOG <catalog_name> TO `<service-principal-application-id>`;
-- Grant schema permission
GRANT USE SCHEMA ON SCHEMA <catalog_name>.default TO `<service-principal-application-id>`;
-- Grant table permissions
GRANT SELECT, MODIFY ON TABLE <catalog_name>.default.air_quality TO `<service-principal-application-id>`;
Installation
From PyPI
Install the latest stable version using pip:
pip install databricks-zerobus-ingest-sdk
From Source
Clone the repository and install from source:
git clone https://github.com/databricks/zerobus-sdk-py.git
cd zerobus-sdk-py
pip install -e .
Define Your Protocol Buffer Schema
Create a file named record.proto:
syntax = "proto2";
message AirQuality {
  optional string device_name = 1;
  optional int32 temp = 2;
  optional int64 humidity = 3;
}
Compile the protobuf:
pip install "grpcio-tools>=1.60.0,<2.0"
python -m grpc_tools.protoc --python_out=. --proto_path=. record.proto
This generates a record_pb2.py file compatible with protobuf 6.x.
Generate Protocol Buffer Schema from Unity Catalog (Alternative)
Instead of manually writing and compiling your protobuf schema, you can automatically generate it from an existing Unity Catalog table schema using the included generate_proto.py tool.
Using the Proto Generation Tool
The generate_proto.py tool fetches your table schema from Unity Catalog and generates a corresponding proto2 definition file with the correct type mappings.
Basic Usage:
python -m zerobus.tools.generate_proto \
--uc-endpoint "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com" \
--client-id "your-service-principal-application-id" \
--client-secret "your-service-principal-secret" \
--table "main.default.air_quality" \
--output "record.proto"
Parameters:
- --uc-endpoint: Your workspace URL (e.g., https://dbc-a1b2c3d4-e5f6.cloud.databricks.com)
- --client-id: Service principal application ID
- --client-secret: Service principal secret
- --table: Fully qualified table name (catalog.schema.table)
- --output: Output path for the generated proto file
- --proto-msg: (Optional) Name for the protobuf message (defaults to table name)
Example:
For a table defined as:
CREATE TABLE main.default.air_quality (
  device_name STRING,
  temp INT,
  humidity BIGINT
)
USING DELTA;
Running the generation tool will create record.proto:
syntax = "proto2";
message air_quality {
  optional string device_name = 1;
  optional int32 temp = 2;
  optional int64 humidity = 3;
}
After generating the proto file, compile it as shown above:
pip install "grpcio-tools>=1.60.0,<2.0"
python -m grpc_tools.protoc --python_out=. --proto_path=. record.proto
Type Mappings:
The tool automatically maps Unity Catalog types to proto2 types:
| Delta Type | Proto2 Type |
|---|---|
| INT, SMALLINT, SHORT | int32 |
| BIGINT, LONG | int64 |
| FLOAT | float |
| DOUBLE | double |
| STRING, VARCHAR | string |
| BOOLEAN | bool |
| BINARY | bytes |
| DATE | int32 |
| TIMESTAMP | int64 |
| ARRAY<type> | repeated type |
| MAP<key, value> | map<key, value> |
| STRUCT<fields> | nested message |
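The table maps DATE and TIMESTAMP to plain integers but does not state the units. The sketch below assumes the common Delta/Spark convention of days since 1970-01-01 for DATE and microseconds since the Unix epoch (UTC) for TIMESTAMP; that convention is an assumption here and should be checked against the Zerobus documentation before use. Both helper names are hypothetical.

```python
from datetime import date, datetime, timedelta, timezone

EPOCH_DATE = date(1970, 1, 1)
EPOCH_UTC = datetime(1970, 1, 1, tzinfo=timezone.utc)


def date_to_int32(value: date) -> int:
    """Encode a date as days since 1970-01-01 (assumed DATE encoding)."""
    return (value - EPOCH_DATE).days


def timestamp_to_int64(value: datetime) -> int:
    """Encode an aware datetime as microseconds since the Unix epoch
    (assumed TIMESTAMP encoding)."""
    if value.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    # Integer division of timedeltas avoids float rounding.
    return (value - EPOCH_UTC) // timedelta(microseconds=1)


print(date_to_int32(date(1970, 1, 2)))  # 1
print(timestamp_to_int64(datetime(1970, 1, 1, 0, 0, 1, tzinfo=timezone.utc)))  # 1000000
```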
Benefits:
- No manual schema creation required
- Ensures schema consistency between your table and protobuf definitions
- Automatically handles complex types (arrays, maps, structs)
- Reduces errors from manual type mapping
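To make the scalar rows of the mapping table concrete, here is a small illustration that renders a proto2 message from a list of column names and Delta types. It is not the implementation of generate_proto.py: `SCALAR_TYPES` and `render_proto` are hypothetical names, and complex types (ARRAY, MAP, STRUCT) are deliberately left out.

```python
# Scalar rows of the type-mapping table above.
SCALAR_TYPES = {
    "INT": "int32", "SMALLINT": "int32", "SHORT": "int32",
    "BIGINT": "int64", "LONG": "int64",
    "FLOAT": "float", "DOUBLE": "double",
    "STRING": "string", "VARCHAR": "string",
    "BOOLEAN": "bool", "BINARY": "bytes",
    "DATE": "int32", "TIMESTAMP": "int64",
}


def render_proto(message_name, columns):
    """Render a proto2 message from (column_name, delta_type) pairs."""
    lines = ['syntax = "proto2";', f"message {message_name} {{"]
    for number, (name, delta_type) in enumerate(columns, start=1):
        proto_type = SCALAR_TYPES.get(delta_type.upper())
        if proto_type is None:
            raise ValueError(f"unsupported type in this sketch: {delta_type}")
        lines.append(f"  optional {proto_type} {name} = {number};")
    lines.append("}")
    return "\n".join(lines)


print(render_proto(
    "air_quality",
    [("device_name", "STRING"), ("temp", "INT"), ("humidity", "BIGINT")],
))
```

For the air_quality table this prints the same three optional fields as the generated record.proto shown earlier.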
Write Your Client Code
Synchronous Example
import logging
from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import TableProperties
import record_pb2

# Configure logging (optional but recommended)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)

# Configuration
server_endpoint = "1234567890123456.zerobus.us-west-2.cloud.databricks.com"
workspace_url = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"
table_name = "main.default.air_quality"
client_id = "your-service-principal-application-id"
client_secret = "your-service-principal-secret"

# Initialize SDK
sdk = ZerobusSdk(server_endpoint, workspace_url)

# Configure table properties
table_properties = TableProperties(
    table_name,
    record_pb2.AirQuality.DESCRIPTOR
)

# Create stream
stream = sdk.create_stream(
    client_id,
    client_secret,
    table_properties
)

try:
    # Ingest records
    for i in range(100):
        record = record_pb2.AirQuality(
            device_name=f"sensor-{i % 10}",
            temp=20 + (i % 15),
            humidity=50 + (i % 40)
        )
        ack = stream.ingest_record(record)
        ack.wait_for_ack()  # Wait for durability
        print(f"Ingested record {i + 1}")
    print("Successfully ingested 100 records!")
finally:
    stream.close()
Asynchronous Example
import asyncio
import logging
from zerobus.sdk.aio import ZerobusSdk
from zerobus.sdk.shared import TableProperties
import record_pb2

# Configure logging (optional but recommended)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)

async def main():
    # Configuration
    server_endpoint = "1234567890123456.zerobus.us-west-2.cloud.databricks.com"
    workspace_url = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"
    table_name = "main.default.air_quality"
    client_id = "your-service-principal-application-id"
    client_secret = "your-service-principal-secret"

    # Initialize SDK
    sdk = ZerobusSdk(server_endpoint, workspace_url)

    # Configure table properties
    table_properties = TableProperties(
        table_name,
        record_pb2.AirQuality.DESCRIPTOR
    )

    # Create stream
    stream = await sdk.create_stream(
        client_id,
        client_secret,
        table_properties
    )

    try:
        # Ingest records
        for i in range(100):
            record = record_pb2.AirQuality(
                device_name=f"sensor-{i % 10}",
                temp=20 + (i % 15),
                humidity=50 + (i % 40)
            )
            future = await stream.ingest_record(record)
            await future  # Wait for durability
            print(f"Ingested record {i + 1}")
        print("Successfully ingested 100 records!")
    finally:
        await stream.close()

asyncio.run(main())
Usage Examples
See the examples/ directory for complete, runnable examples:
- sync_example.py - Synchronous ingestion with progress tracking and all SDK features
- async_example.py - Asynchronous ingestion using asyncio with acknowledgment callbacks
Both examples are fully functional and demonstrate:
- SDK initialization and configuration
- Stream creation and management
- Record ingestion (sync/async)
- Progress tracking and callbacks
- Error handling
- Performance metrics
- Proper resource cleanup
To run the examples, set your credentials as environment variables and execute the scripts. See examples/README.md for detailed instructions.
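One way to read such settings from the environment is sketched below. The variable names are hypothetical placeholders chosen for this illustration; the names the bundled examples actually expect are listed in examples/README.md.

```python
import os
from dataclasses import dataclass
from typing import Mapping

# Hypothetical variable names, in the same order as the Settings fields.
REQUIRED_VARIABLES = (
    "ZEROBUS_SERVER_ENDPOINT",
    "DATABRICKS_WORKSPACE_URL",
    "ZEROBUS_TABLE_NAME",
    "DATABRICKS_CLIENT_ID",
    "DATABRICKS_CLIENT_SECRET",
)


@dataclass(frozen=True)
class Settings:
    server_endpoint: str
    workspace_url: str
    table_name: str
    client_id: str
    client_secret: str


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Build Settings from the environment, failing fast on missing values."""
    missing = [name for name in REQUIRED_VARIABLES if not env.get(name)]
    if missing:
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    return Settings(*(env[name] for name in REQUIRED_VARIABLES))
```

Keeping the secret in the environment rather than in source code means it never lands in version control; the resulting fields can be passed to ZerobusSdk and create_stream as in the Quick Start.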
Blocking Ingestion
Ingest records synchronously, waiting for each record to be acknowledged:
import logging
from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import TableProperties
import record_pb2

# Configure logging (optional but recommended)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)

# server_endpoint, workspace_url, table_name, client_id and client_secret
# are defined as in the Quick Start example.
sdk = ZerobusSdk(server_endpoint, workspace_url)
table_properties = TableProperties(table_name, record_pb2.AirQuality.DESCRIPTOR)
stream = sdk.create_stream(client_id, client_secret, table_properties)

try:
    for i in range(1000):
        record = record_pb2.AirQuality(
            device_name=f"sensor-{i}",
            temp=20 + i % 15,
            humidity=50 + i % 40
        )
        ack = stream.ingest_record(record)
        ack.wait_for_ack()  # Wait for durability
finally:
    stream.close()
Non-Blocking Ingestion
Ingest records asynchronously for maximum throughput:
import asyncio
import logging
from zerobus.sdk.aio import ZerobusSdk
from zerobus.sdk.shared import TableProperties, StreamConfigurationOptions
import record_pb2

# Configure logging (optional but recommended)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)

async def main():
    options = StreamConfigurationOptions(
        max_inflight_records=50000,
        ack_callback=lambda response: print(
            f"Acknowledged offset: {response.durability_ack_up_to_offset}"
        )
    )

    # server_endpoint, workspace_url, table_name, client_id and client_secret
    # are defined as in the Quick Start example.
    sdk = ZerobusSdk(server_endpoint, workspace_url)
    table_properties = TableProperties(table_name, record_pb2.AirQuality.DESCRIPTOR)
    stream = await sdk.create_stream(
        client_id,
        client_secret,
        table_properties,
        options
    )

    futures = []
    try:
        for i in range(100000):
            record = record_pb2.AirQuality(
                device_name=f"sensor-{i % 10}",
                temp=20 + i % 15,
                humidity=50 + i % 40
            )
            future = await stream.ingest_record(record)
            futures.append(future)

        # Flush and wait for all records
        await stream.flush()
        await asyncio.gather(*futures)
    finally:
        await stream.close()

asyncio.run(main())
Configuration
Stream Configuration Options
| Option | Default | Description |
|---|---|---|
| max_inflight_records | 50000 | Maximum number of unacknowledged records |
| recovery | True | Enable automatic stream recovery |
| recovery_timeout_ms | 15000 | Timeout for recovery operations (ms) |
| recovery_backoff_ms | 2000 | Delay between recovery attempts (ms) |
| recovery_retries | 3 | Maximum number of recovery attempts |
| flush_timeout_ms | 300000 | Timeout for flush operations (ms) |
| server_lack_of_ack_timeout_ms | 60000 | Server acknowledgment timeout (ms) |
| ack_callback | None | Callback invoked on record acknowledgment |
Example Configuration
from zerobus.sdk.shared import StreamConfigurationOptions

options = StreamConfigurationOptions(
    max_inflight_records=10000,
    recovery=True,
    recovery_timeout_ms=20000,
    ack_callback=lambda response: print(
        f"Ack: {response.durability_ack_up_to_offset}"
    )
)

stream = sdk.create_stream(
    client_id,
    client_secret,
    table_properties,
    options
)
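The idea behind a cap on unacknowledged records, which is what max_inflight_records expresses, can be illustrated with the standard library alone. The sketch below is a conceptual model, not the SDK's internal mechanism: `send_with_inflight_limit` and the `send` callable are hypothetical, and a semaphore stands in for whatever backpressure the SDK applies.

```python
import asyncio


async def send_with_inflight_limit(records, send, max_inflight):
    """Send records while keeping at most `max_inflight` unacknowledged.

    Returns the highest number of sends observed in flight at once.
    """
    slots = asyncio.Semaphore(max_inflight)
    inflight = 0
    peak = 0

    async def send_one(record):
        nonlocal inflight, peak
        inflight += 1
        peak = max(peak, inflight)
        try:
            await send(record)  # completes when the record is "acknowledged"
        finally:
            inflight -= 1
            slots.release()  # free a slot for the producer

    tasks = []
    for record in records:
        await slots.acquire()  # the producer waits here once the cap is reached
        tasks.append(asyncio.create_task(send_one(record)))
    await asyncio.gather(*tasks)
    return peak


if __name__ == "__main__":
    async def fake_send(record):
        await asyncio.sleep(0.01)  # simulated acknowledgment latency

    observed = asyncio.run(send_with_inflight_limit(range(20), fake_send, 5))
    print(f"peak in-flight sends: {observed}")
```

A lower cap bounds memory held by pending records at the cost of throughput; a higher cap does the opposite.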
Error Handling
The SDK raises two types of exceptions:
- ZerobusException: Retriable errors (e.g., network issues, temporary server errors)
- NonRetriableException: Non-retriable errors (e.g., invalid credentials, missing table)
from zerobus.sdk.shared import ZerobusException, NonRetriableException

try:
    stream.ingest_record(record)
except NonRetriableException as e:
    # Fatal error - do not retry
    print(f"Non-retriable error: {e}")
    raise
except ZerobusException as e:
    # Retriable error - can retry with backoff
    print(f"Retriable error: {e}")
    # Implement retry logic
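The retry logic left open above could take the form of exponential backoff. The following is a sketch under stated assumptions: the two exception classes are local stand-ins that mirror the documented hierarchy so the snippet runs without the SDK installed, and `call_with_retry` with its parameters is a hypothetical helper, not an SDK function.

```python
import time


class ZerobusException(Exception):
    """Local stand-in for zerobus.sdk.shared.ZerobusException."""


class NonRetriableException(ZerobusException):
    """Local stand-in for zerobus.sdk.shared.NonRetriableException."""


def call_with_retry(operation, max_attempts=3, base_delay_sec=0.5, sleep=time.sleep):
    """Call `operation`, retrying retriable errors with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except NonRetriableException:
            raise  # fatal: retrying cannot help
        except ZerobusException:
            if attempt == max_attempts:
                raise  # retries exhausted
            # Delays grow as base, 2*base, 4*base, ...
            sleep(base_delay_sec * 2 ** (attempt - 1))
```

The subclass must be caught first, because NonRetriableException extends ZerobusException. With the real SDK, the two classes would be imported from zerobus.sdk.shared instead of being defined locally.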
API Reference
ZerobusSdk
Main entry point for the SDK.
Synchronous API:
from zerobus.sdk.sync import ZerobusSdk
sdk = ZerobusSdk(server_endpoint, unity_catalog_endpoint)
Constructor Parameters:
- server_endpoint (str) - The Zerobus gRPC endpoint (e.g., <workspace-id>.zerobus.<region>.cloud.databricks.com)
- unity_catalog_endpoint (str) - The Unity Catalog endpoint (your workspace URL)
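Given the endpoint pattern above, the server endpoint can be assembled from the workspace ID and region. This is a minimal sketch that assumes the host pattern holds for your deployment as written (the example in this guide uses us-west-2); `build_server_endpoint` is a hypothetical helper, not part of the SDK.

```python
def build_server_endpoint(workspace_id: str, region: str) -> str:
    """Assemble '<workspace-id>.zerobus.<region>.cloud.databricks.com'."""
    workspace_id = workspace_id.strip()
    region = region.strip()
    if not workspace_id or not region:
        raise ValueError("workspace_id and region must be non-empty")
    return f"{workspace_id}.zerobus.{region}.cloud.databricks.com"


print(build_server_endpoint("1234567890123456", "us-west-2"))
# 1234567890123456.zerobus.us-west-2.cloud.databricks.com
```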
Methods:
def create_stream(
    client_id: str,
    client_secret: str,
    table_properties: TableProperties,
    options: StreamConfigurationOptions = None
) -> ZerobusStream
Creates a new ingestion stream. Returns a ZerobusStream instance.
Asynchronous API:
from zerobus.sdk.aio import ZerobusSdk
sdk = ZerobusSdk(server_endpoint, unity_catalog_endpoint)
Methods:
async def create_stream(
    client_id: str,
    client_secret: str,
    table_properties: TableProperties,
    options: StreamConfigurationOptions = None
) -> ZerobusStream
Creates a new ingestion stream. Returns a ZerobusStream instance.
ZerobusStream
Represents an active ingestion stream.
Synchronous Methods:
def ingest_record(record: Message) -> RecordAcknowledgment
Ingests a single record into the stream. Returns a RecordAcknowledgment for tracking.
def flush() -> None
Flushes all pending records and waits for server acknowledgment. Does not close the stream.
def close() -> None
Flushes and closes the stream gracefully. Always call in a finally block.
def get_state() -> StreamState
Returns the current stream state.
@property
def stream_id() -> str
Returns the unique stream ID assigned by the server.
Asynchronous Methods:
async def ingest_record(record: Message) -> Awaitable
Ingests a single record into the stream. Returns an awaitable that completes when the record is durably written.
async def flush() -> None
Flushes all pending records and waits for server acknowledgment. Does not close the stream.
async def close() -> None
Flushes and closes the stream gracefully. Always call in a finally block.
def get_state() -> StreamState
Returns the current stream state.
@property
def stream_id() -> str
Returns the unique stream ID assigned by the server.
TableProperties
Configuration for the target table.
Constructor:
TableProperties(table_name: str, descriptor: Descriptor)
Parameters:
- table_name (str) - Fully qualified table name (e.g., catalog.schema.table)
- descriptor (Descriptor) - Protobuf message descriptor (e.g., MyMessage.DESCRIPTOR)
Properties:
@property
def table_name() -> str
Returns the table name.
@property
def descriptor() -> Descriptor
Returns the protobuf message descriptor.
StreamConfigurationOptions
Configuration options for stream behavior.
Constructor:
StreamConfigurationOptions(
    max_inflight_records: int = 50000,
    recovery: bool = True,
    recovery_timeout_ms: int = 15000,
    recovery_backoff_ms: int = 2000,
    recovery_retries: int = 3,
    flush_timeout_ms: int = 300000,
    server_lack_of_ack_timeout_ms: int = 60000,
    ack_callback: Callable = None
)
Parameters:
- max_inflight_records (int) - Maximum number of unacknowledged records (default: 50000)
- recovery (bool) - Enable or disable automatic stream recovery (default: True)
- recovery_timeout_ms (int) - Recovery operation timeout in milliseconds (default: 15000)
- recovery_backoff_ms (int) - Delay between recovery attempts in milliseconds (default: 2000)
- recovery_retries (int) - Maximum number of recovery attempts (default: 3)
- flush_timeout_ms (int) - Flush operation timeout in milliseconds (default: 300000)
- server_lack_of_ack_timeout_ms (int) - Server acknowledgment timeout in milliseconds (default: 60000)
- ack_callback (Callable) - Callback to be invoked when records are acknowledged by the server (default: None)
RecordAcknowledgment (Sync API only)
Future-like object for waiting on acknowledgments.
Methods:
def wait_for_ack(timeout_sec: float = None) -> None
Blocks until the record is acknowledged or timeout is reached.
def add_done_callback(callback: Callable) -> None
Adds a callback to be invoked when the record is acknowledged.
def is_done() -> bool
Returns True if the record has been acknowledged.
StreamState (Enum)
Represents the lifecycle state of a stream.
Values:
- UNINITIALIZED - Stream created but not yet initialized
- OPENED - Stream is open and accepting records
- FLUSHING - Stream is flushing pending records
- RECOVERING - Stream is recovering from a failure
- CLOSED - Stream has been gracefully closed
- FAILED - Stream has failed and cannot be recovered
ZerobusException
Base exception for retriable errors.
Constructor:
ZerobusException(message: str, cause: Exception = None)
NonRetriableException
Exception for non-retriable errors (extends ZerobusException).
Constructor:
NonRetriableException(message: str, cause: Exception = None)
Best Practices
- Reuse SDK instances: Create one ZerobusSdk instance per application
- Stream lifecycle: Always close streams in a finally block to ensure all records are flushed
- Batch size: Adjust max_inflight_records based on your throughput requirements
- Error handling: Implement proper retry logic for retriable errors
- Monitoring: Use ack_callback to track ingestion progress
- Choose the right API: Use sync API for low-volume, async API for high-volume ingestion
- Token refresh: Tokens are automatically refreshed on stream creation and recovery