Skip to main content

DB-API v2 compliant driver for Confluent Cloud Flink SQL

Project description

confluent-sql

A DB-API v2 compliant Python driver for Confluent Cloud Flink SQL services.

Overview

The confluent-sql library provides a standard DB-API v2 interface for connecting to and executing SQL queries against Confluent Cloud Flink SQL services. This allows you to use familiar database programming patterns with Confluent's streaming SQL capabilities.

Status

This is pre-production code mainly developed as the lower level portion of a dbt adaptor for Confluent Cloud Flink, but is aimed to also be a reasonable standalone dbapi+ driver for python programs to interact with Confluent Flink SQL.

The behavior of snapshot-mode cursors, complying with dbapi semantics, are well stable. The streaming query extensions are more of a work in progress at this time. Feedback and suggestions are welcome!

⚠️ Early Access: Snapshot queries on Confluent Cloud Flink SQL are currently in Early Access and may be subject to change. The driver defaults to snapshot mode for all queries unless streaming mode is explicitly requested.

Prerequisites

  • Confluent Cloud account with Flink environment
  • Active Flink compute pool (must be pre-created)
  • Flink Compute Pool API credentials for Flink SQL API access: a user or service account API key and secret for the compute pool (used as HTTP Basic Auth, for example via flink_api_key and flink_api_secret).

Installation

# Using pip
pip install confluent-sql

# Using uv (recommended)
uv add confluent-sql

Quick Start

Setup the connection:

import confluent_sql

# Connect to Confluent Cloud Flink SQL
connection = confluent_sql.connect(
    flink_api_key="your-flink-api-key",
    flink_api_secret="your-flink-api-secret",
    organization_id="your-org-uuid",
    environment="env-123456",
    compute_pool_id="lfcp-789012",
    cloud_provider="aws",
    cloud_region="us-east-2",
    dbname="your-database-name"
)

Create a cursor and run a point-in-time SNAPSHOT query:

cursor = connection.cursor()
cursor.execute("SELECT customer_id, name FROM customers")

Fetch results using fetchone(), fetchmany() and fetchall():

print(cursor.fetchone())
print(cursor.fetchmany(2))
print(cursor.fetchall())

Fetch results using the cursor as an iterator:

for row in cursor:
    print(row)

Clean up:

cursor.close()
connection.close()

Dictionary Result Rows:

...
cursor = connection.cursor(as_dict=True)
cursor.execute("SELECT customer_id, name, email FROM customers WHERE customer_id = %s", (123,))
row = cursor.fetchone()
print(row["customer_id"])  # Access by column name

Streaming Queries:

import time

...

# Execute a streaming statement, runs and produces results indefinitely until
# we stop consuming its results or the statement is stopped or deleted via API ...
cursor = connection.streaming_cursor()
cursor.execute("SELECT * FROM orders_stream WHERE total > %s", (1000,))

while cursor.may_have_results:
    rows = cursor.fetchmany(10)
    if rows:
        for row in rows:
            print(row)
    else:
        time.sleep(0.1)

Parameterized Statement and Flink to Python Value Support

This driver supports all Flink types, some with caveats. Please consult the type support documentation for more details.

DB-API Extensions

This driver extends the standard DB-API v2 interface with additional features:

  • Dictionary result rows - Access columns by name instead of position
  • Streaming cursors - Non-blocking result consumption from continuous queries
  • Changelog compression - Automatic state management for aggregations and joins
  • Statement lifecycle management - Named statements, labels, and resource management
  • Type system - Full support for all Flink SQL types including streaming-specific types
  • Performance monitoring - Built-in fetch metrics and introspection

Architecture & How It Works

The confluent-sql driver communicates with Confluent Cloud Flink SQL through HTTP-based APIs. Unlike in traditional databases, statements are first-class entities on the server with their own lifecycle, allowing features like:

  • Named statements - Identify and recover queries across connections
  • Persistent execution - Statements survive connection close and can be resumed
  • Batch management - Label related statements for group operations

For an in-depth explanation of the HTTP architecture and statement lifecycle, see ARCHITECTURE.md.

Complete Documentation

For comprehensive documentation of all DB-API extensions, see DBAPI_EXTENSIONS.md.

For detailed streaming query guidance, see STREAMING.md.

For type support and examples, see TYPES.md.

Development

Setup

# Clone repository
git clone <repository-url>
cd confluent-sql

# Install uv if needed
pip install uv

# Install dependencies
uv sync

# Install in development mode
uv pip install -e .

Running Tests

Set required environment variables for integration tests. If any of the variables is not set, integration tests will be skipped.

export CONFLUENT_FLINK_API_KEY="your-key"
export CONFLUENT_FLINK_API_SECRET="your-secret"
export CONFLUENT_ENV_ID="env-123456"
export CONFLUENT_ORG_ID="org-123456"
export CONFLUENT_COMPUTE_POOL_ID="lfcp-789012"
export CONFLUENT_CLOUD_PROVIDER="aws"
export CONFLUENT_CLOUD_REGION="us-east-2"
export CONFLUENT_TEST_DBNAME="test-db"

Run tests:

uv run pytest

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

confluent_sql-0.1.2.tar.gz (174.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

confluent_sql-0.1.2-py3-none-any.whl (69.0 kB view details)

Uploaded Python 3

File details

Details for the file confluent_sql-0.1.2.tar.gz.

File metadata

  • Download URL: confluent_sql-0.1.2.tar.gz
  • Upload date:
  • Size: 174.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.10.11 {"installer":{"name":"uv","version":"0.10.11","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for confluent_sql-0.1.2.tar.gz
Algorithm Hash digest
SHA256 a848f1cd141641a1cf790523c6227575bc6d0e6afe8fe0b051e6b765ad59ddef
MD5 830fd30bb431475676a74021ac1bd7e7
BLAKE2b-256 70b2509d6649708e41ae0a5711c7ac094c8799c1447880277291b2256a1871a9

See more details on using hashes here.

File details

Details for the file confluent_sql-0.1.2-py3-none-any.whl.

File metadata

  • Download URL: confluent_sql-0.1.2-py3-none-any.whl
  • Upload date:
  • Size: 69.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.10.11 {"installer":{"name":"uv","version":"0.10.11","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for confluent_sql-0.1.2-py3-none-any.whl
Algorithm Hash digest
SHA256 901565805055f5e83bcbe6a3a3b6b2a831fcfd19d529f69554093ad6b3c9419b
MD5 e3b60afebb204b0deaa1a2c20f08d773
BLAKE2b-256 defd423cf1fdc200b104154ea4244be6b4d5151e2e4db3ad323cd8d2385fa0c0

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page