DB-API v2 compliant driver for Confluent Cloud Flink SQL
Project description
confluent-sql
A DB-API v2 compliant Python driver for Confluent Cloud Flink SQL services.
Overview
The confluent-sql library provides a standard DB-API v2 interface for connecting to and
executing SQL queries against Confluent Cloud Flink SQL services. This allows you to use
familiar database programming patterns with Confluent's streaming SQL capabilities.
Status
This is pre-production code mainly developed as the lower level portion of a dbt adaptor for Confluent Cloud Flink, but is aimed to also be a reasonable standalone dbapi+ driver for python programs to interact with Confluent Flink SQL.
The behavior of snapshot-mode cursors, complying with dbapi semantics, are well stable. The streaming query extensions are more of a work in progress at this time. Feedback and suggestions are welcome!
⚠️ Early Access: Snapshot queries on Confluent Cloud Flink SQL are currently in Early Access and may be subject to change. You will need to request access to snapshot queries for your organization from Confluent. The driver defaults to snapshot mode for all queries unless streaming mode is explicitly requested.
Prerequisites
- Confluent Cloud account with Flink environment
- Active Flink compute pool (must be pre-created)
- Existing Flink Database (Confluent Cloud Kafka cluster)
- Flink Region API credentials for Flink SQL Region API access: a user or service account Flink region API key and secret.
How to Obtain a Flink Region API Key
This driver requires a Flink Region API key (also called a Flink SQL API key), which is specific to your Flink compute pool and provides access to the regional Flink SQL API endpoints. This is distinct from a Confluent Cloud control-plane API key.
To create or find a Flink Region API key:
- Go to https://confluent.cloud/settings/api-keys
- Filter by resource 'Flink Region'
- Find an existing key, or follow the '+ Add API Key' path to create a new one
- When creating a new key, select either:
- My account - for development/testing
- Service account - for production applications (recommended) and then:
- The Environment, Cloud Provider and Cloud Region matching the Flink database(s) / Kafka cluster(s) you intend to use this driver against.
- Save both the API key and API secret securely (the secret cannot be retrieved later)
- Use these credentials as the
flink_api_keyandflink_api_secretparameters in theconnect()function
API keys may also be generated using the Confluent CLI or by API access, outside the scope of this document.
Installation
# Using pip
pip install confluent-sql
# Using uv (recommended)
uv add confluent-sql
Quick Start
Setup the connection:
import confluent_sql
# Connect to Confluent Cloud Flink SQL
connection = confluent_sql.connect(
organization_id="your-org-uuid",
environment_id="env-123456",
cloud_provider="aws",
cloud_region="us-east-2",
flink_api_key="your-flink-api-key",
flink_api_secret="your-flink-api-secret",
database="your-database-name",
compute_pool_id="lfcp-789012"
)
Create a cursor and run a point-in-time SNAPSHOT query:
cursor = connection.cursor()
cursor.execute("SELECT customer_id, name FROM customers")
Fetch results using fetchone(), fetchmany() and fetchall():
print(cursor.fetchone())
print(cursor.fetchmany(2))
print(cursor.fetchall())
Fetch results using the cursor as an iterator:
for row in cursor:
print(row)
Clean up:
cursor.close()
connection.close()
Dictionary Result Rows:
...
cursor = connection.cursor(as_dict=True)
cursor.execute("SELECT customer_id, name, email FROM customers WHERE customer_id = %s", (123,))
row = cursor.fetchone()
print(row["customer_id"]) # Access by column name
Streaming Queries:
import time
...
# Execute a streaming statement, runs and produces results indefinitely until
# we stop consuming its results or the statement is stopped or deleted via API ...
cursor = connection.streaming_cursor()
cursor.execute("SELECT * FROM orders_stream WHERE total > %s", (1000,))
while cursor.may_have_results:
rows = cursor.fetchmany(10)
if rows:
for row in rows:
print(row)
else:
time.sleep(0.1)
Parameterized Statement and Flink to Python Value Support
This driver supports all Flink types, some with caveats. Please consult the type support documentation for more details.
DB-API Extensions
This driver extends the standard DB-API v2 interface with additional features:
- Dictionary result rows - Access columns by name instead of position
- Streaming cursors - Non-blocking result consumption from continuous queries
- Changelog compression - Automatic state management for aggregations and joins
- Statement lifecycle management - Named statements, labels, and resource management
- Type system - Full support for all Flink SQL types including streaming-specific types
- Performance monitoring - Built-in fetch metrics and introspection
Architecture & How It Works
The confluent-sql driver communicates with Confluent Cloud Flink SQL through HTTP-based APIs. Unlike in traditional databases, statements are first-class entities on the server with their own lifecycle,
allowing features like:
- Named statements - Identify and recover queries across connections
- Persistent execution - Statements survive connection close and can be resumed
- Batch management - Label related statements for group operations
For an in-depth explanation of the HTTP architecture and statement lifecycle, see ARCHITECTURE.md.
Complete Documentation
For comprehensive documentation of all DB-API extensions, see DBAPI_EXTENSIONS.md.
For detailed streaming query guidance, see STREAMING.md.
For type support and examples, see TYPES.md.
Private Networking Considerations
By default, this driver uses the public Confluent Cloud API networking endpoint for the provided cloud provider and region. However, if the Flink database / Kafka cluster you intend to query requires private networking connectivity, then provide the appropriate Flink private networking base URL as the endpoint parameter
to connect() or Connection.__init__(). Refer to the Confluent Cloud Flink private networking documentation for more information on composing your endpoint URL.
Symptoms of using the public endpoint when private networking is required include:
- HTTP 429-related exceptions raised when submitting statements querying tables whose backing Kafka topics / clusters are configured for private networking only.
- Empty or surprisingly missing results when querying
INFORMATION_SCHEMAorSHOW TABLES, due to silent filtering of private-networking-only tables/topics when querying the system catalog.
Development
Setup
# Clone repository
git clone <repository-url>
cd confluent-sql
# Install uv if needed
pip install uv
# Install dependencies
uv sync
# Install in development mode
uv pip install -e .
Running Tests
Set required environment variables for integration tests. If any of the variables is not set, integration tests will be skipped.
export CONFLUENT_ORG_ID="org-123456"
export CONFLUENT_ENV_ID="env-123456"
export CONFLUENT_CLOUD_PROVIDER="aws"
export CONFLUENT_CLOUD_REGION="us-east-2"
export CONFLUENT_FLINK_API_KEY="your-key" # Flink Region API key for the above cloud/region ...
export CONFLUENT_FLINK_API_SECRET="your-secret" # and associated secret.
export CONFLUENT_COMPUTE_POOL_ID="lfcp-789012" # A compute pool within the above cloud/region.
export CONFLUENT_TEST_DBNAME="test-db" # A database/kafka cluster name within the above cloud/region.
Run tests:
uv run pytest
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file confluent_sql-0.3.0.tar.gz.
File metadata
- Download URL: confluent_sql-0.3.0.tar.gz
- Upload date:
- Size: 188.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.10.11 {"installer":{"name":"uv","version":"0.10.11","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
9791ad7b66d43b2d4cbb8a13d4d45c4cb44e645818d051aa7a8d493a216974f7
|
|
| MD5 |
24d9ca719f7fbaf5176228350432bbd2
|
|
| BLAKE2b-256 |
2f80f2482406c87fb325546c38cabca9bc32ee26a03bc41b58b3ac8aa0cd6b73
|
File details
Details for the file confluent_sql-0.3.0-py3-none-any.whl.
File metadata
- Download URL: confluent_sql-0.3.0-py3-none-any.whl
- Upload date:
- Size: 73.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.10.11 {"installer":{"name":"uv","version":"0.10.11","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
e591a6f1433d2ac6650dc4835b429cf4a69048a31fcde12be6fd00e724eb662c
|
|
| MD5 |
0dd4271bb3b3c10d779042df5463271c
|
|
| BLAKE2b-256 |
6692df5f3daa8eb7ca73a2ceb70baccf90c97e3fd12e70aa24a6f7fc52ef42b3
|