
py-flink-sql-gateway


A lightweight Python driver for the Apache Flink SQL Gateway, implementing PEP 249 (DB-API 2.0).

Installation

pip install py-flink-sql-gateway

Quick start

from flink_gateway import connect

with connect("http://localhost:8083") as conn:
    # Create a streaming source
    with conn.cursor() as cur:
        cur.execute("""
            CREATE TABLE orders (
                id INT NOT NULL,
                item STRING NOT NULL,
                created_at TIMESTAMP(6) NOT NULL,
                customer ROW<
                    first_name STRING NOT NULL,
                    last_name STRING NOT NULL,
                    age INT NOT NULL
                > NOT NULL,
                notes STRING
            ) WITH (
                'connector' = 'datagen',
                'rows-per-second' = '5',
                'fields.id.kind' = 'sequence',
                'fields.id.start' = '1',
                'fields.id.end' = '100',
                'fields.item.length' = '12',
                'fields.customer.first_name.length' = '8',
                'fields.customer.last_name.length' = '10',
                'fields.customer.age.min' = '21',
                'fields.customer.age.max' = '65',
                'fields.notes.length' = '12'
            )
        """)

    # Query and iterate
    with conn.cursor() as cur:
        cur.execute("""
            SELECT id, item, created_at, customer, notes AS note
            FROM orders
        """)
        for i, row in enumerate(cur):
            id_, item, created_at, customer, note = row
            print(
                f"{id_}\t{item}\t{created_at.isoformat()}\t"
                f"{customer['first_name']} {customer['last_name']} ({customer['age']})\t"
                f"{note or ''}"
            )
            if i >= 4:
                break
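The loop above stops a streaming query after five rows with `enumerate` and an explicit `break`. Because the cursor is iterable, `itertools.islice` can express the same "take the first N rows" cap. The sketch below is self-contained: `fake_stream` is a hypothetical stand-in for an unbounded streaming cursor, not part of this library.

```python
from itertools import count, islice


# Hypothetical stand-in for an unbounded streaming cursor: yields (id, item) tuples forever.
def fake_stream():
    for n in count(1):
        yield (n, f"item-{n}")


# islice stops pulling from the source after 5 rows, so no explicit break is needed.
first_five = list(islice(fake_stream(), 5))
for id_, item in first_five:
    print(f"{id_}\t{item}")
```

With a real cursor the equivalent would be `for row in islice(cur, 5): ...`, relying only on the cursor iteration the Quick start already uses.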

Tip: ROW and MAP values arrive as Python dicts and ARRAY values as lists. Binary data is passed through as-is.
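Since nested values are plain dicts and lists, a decoded row can be handed to `json.dumps` once the non-JSON scalar types from the type mapping (datetimes, `Decimal`, binary) are covered by a `default=` hook. This is a minimal sketch; `json_default` and the sample row are illustrative, and it assumes binary values arrive as `bytes` (if they are passed through as strings, `json` already handles them).

```python
import base64
import datetime
import json
from decimal import Decimal
from typing import Any


def json_default(value: Any) -> Any:
    """Hypothetical json.dumps(default=...) hook for decoded Flink values."""
    # datetime.datetime is a subclass of datetime.date, so one check covers all three.
    if isinstance(value, (datetime.datetime, datetime.date, datetime.time)):
        return value.isoformat()
    if isinstance(value, Decimal):
        return str(value)  # keep exact precision instead of converting to float
    if isinstance(value, (bytes, bytearray)):
        return base64.b64encode(value).decode("ascii")  # assumption: binary arrives as bytes
    raise TypeError(f"not JSON serializable: {type(value).__name__}")


# Illustrative row shaped like the decoded Quick start output (values are made up).
row = {
    "id": 1,
    "created_at": datetime.datetime(2024, 5, 1, 12, 30),
    "customer": {"first_name": "Ada", "age": 36},  # ROW -> dict
    "tags": ["a", "b"],                            # ARRAY -> list
    "price": Decimal("9.99"),
    "payload": b"\x00\x01",
}
encoded = json.dumps(row, default=json_default)
print(encoded)
```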

Connection options

import ssl

import httpx
from flink_gateway import connect

# Pass Flink session properties
with connect("http://localhost:8083", properties={"pipeline.name": "my-job"}) as conn:
    ...

# Provide a custom HTTP client (e.g. for TLS, authentication, or proxies)
ssl_context = ssl.create_default_context(cafile="/path/to/ca.crt")  # trust a custom CA
client = httpx.Client(verify=ssl_context, timeout=60.0)
with connect("https://localhost:8083", http_client=client) as conn:
    ...

Timeouts

Both timeouts default to None, so queries are never interrupted implicitly.

from flink_gateway import connect, TimeoutError  # note: this shadows Python's built-in TimeoutError


def process(row):
    # Placeholder for your own row handling
    print(row)


# query_timeout: hard wall-clock limit on the entire query (including streaming iteration).
# Raises TimeoutError once the deadline is exceeded, regardless of whether rows are still arriving.
with connect("http://localhost:8083", query_timeout=60.0) as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT * FROM orders")
        for row in cur:
            process(row)

# idle_timeout: raise TimeoutError if no new rows arrive within the given window.
# Useful for streaming queries where silence means the source is done or stuck.
with connect("http://localhost:8083", idle_timeout=30.0) as conn:
    with conn.cursor() as cur:
        count = 0
        try:
            cur.execute("SELECT * FROM orders")
            for row in cur:
                process(row)
                count += 1
        except TimeoutError:
            print(f"no new rows for 30s, processed {count} rows so far")

# Both can be combined: stop as soon as either limit is hit.
with connect("http://localhost:8083", query_timeout=300.0, idle_timeout=30.0) as conn:
    ...
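To make the "either limit" rule concrete, the sketch below simulates which timeout fires first for a given sequence of row arrival times. It is a model of the documented semantics, not the driver's implementation; `first_timeout` is a hypothetical helper, and it assumes the idle window starts when the query starts and that the stream stays open after the last row (as an unbounded streaming query would).

```python
import math
from typing import Optional, Sequence, Tuple


def first_timeout(
    arrivals: Sequence[float],
    query_timeout: Optional[float] = None,
    idle_timeout: Optional[float] = None,
) -> Optional[Tuple[float, str, int]]:
    """Return (fire_time, which_timeout, rows_delivered), or None if no limit is set.

    `arrivals` are row arrival times in seconds since the query started, ascending.
    """
    last_row_at = 0.0  # assumption: the idle window is measured from query start
    # math.inf models "no further rows ever arrive, but the stream stays open".
    for delivered, t in enumerate([*arrivals, math.inf]):
        candidates = []
        if query_timeout is not None:
            candidates.append((query_timeout, "query_timeout"))  # fixed wall-clock deadline
        if idle_timeout is not None:
            candidates.append((last_row_at + idle_timeout, "idle_timeout"))  # resets per row
        # A timeout fires if its deadline passes before the next row shows up.
        if candidates and min(candidates)[0] < t:
            fire_at, which = min(candidates)
            return fire_at, which, delivered
        last_row_at = t
    return None  # neither limit configured: iteration would simply block


# One row per second for 20 s, then silence: the idle limit (30 s) wins over the 300 s cap.
steady = [i + 0.5 for i in range(20)]
print(first_timeout(steady, query_timeout=300.0, idle_timeout=30.0))
```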

Low-level REST access

For operations the DB-API interface does not expose, use the exported REST client directly:

from flink_gateway import FlinkSqlGatewayClient

with FlinkSqlGatewayClient("http://localhost:8083") as client:
    status = client.get_operation_status("session-handle", "operation-handle")
    print("current status:", status)
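A common use of the status call is polling until an operation reaches a terminal state. The sketch below takes the status lookup as a callable so it runs without a gateway; `wait_for_operation` is hypothetical, and the terminal set mirrors the terminal values of Flink's `OperationStatus` enum. It raises Python's built-in `TimeoutError`, not the one exported by `flink_gateway`.

```python
import time
from typing import Callable

# Terminal values of Flink's OperationStatus (INITIALIZED, PENDING, RUNNING are non-terminal).
TERMINAL_STATES = frozenset({"FINISHED", "CANCELED", "CLOSED", "ERROR", "TIMEOUT"})


def wait_for_operation(
    get_status: Callable[[], str],
    interval: float = 0.5,
    timeout: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Poll get_status() until it returns a terminal state or `timeout` seconds pass."""
    deadline = clock() + timeout
    while True:
        status = get_status()
        if status in TERMINAL_STATES:
            return status
        if clock() >= deadline:
            raise TimeoutError(f"operation still {status} after {timeout}s")
        sleep(interval)  # back off between status requests
```

Against a live gateway this might be called as `wait_for_operation(lambda: client.get_operation_status(session_handle, operation_handle))`, assuming `get_operation_status` returns the status name as a string; adapt the lambda if it returns a richer object. Injecting `sleep` and `clock` keeps the loop testable without real waiting.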

Type mapping

SQL NULL values are returned as Python None; no wrapper types are needed.

Flink type                  Python type
--------------------------  -----------------------
TINYINT / SMALLINT / INT    int
BIGINT / INTERVAL           int
FLOAT / DOUBLE              float
BOOLEAN                     bool
CHAR / VARCHAR / STRING     str
DECIMAL                     decimal.Decimal
DATE                        datetime.date
TIME                        datetime.time
TIMESTAMP / TIMESTAMP_LTZ   datetime.datetime
BINARY / VARBINARY          raw value (passthrough)
ROW                         dict
MAP                         dict
ARRAY                       list

Nested complex types (e.g. MAP<STRING, ROW<..., MAP<STRING, ROW<TIMESTAMP>>>>) are recursively decoded to the correct Python types.
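Recursive decoding of this kind can be sketched as a function that applies the scalar mapping at the leaves and recurses through ROW, MAP, and ARRAY. This is an illustration, not the driver's decoder: the type descriptors are hypothetical tuples, and the wire shapes are assumptions (ROW as a JSON array in field order, MAP as a JSON object, temporal and DECIMAL values as strings).

```python
import datetime
from decimal import Decimal
from typing import Any, Union

# Hypothetical descriptors: a string for scalar types, or a tuple for composites:
# ("ROW", [(field, type), ...]), ("MAP", key_type, value_type), ("ARRAY", element_type).
FlinkType = Union[str, tuple]

_SCALARS = {
    "INT": int,
    "BIGINT": int,
    "DOUBLE": float,
    "BOOLEAN": bool,  # assumes the value is already a JSON boolean
    "STRING": str,
    "DECIMAL": Decimal,  # assumes a string such as "12.50" to keep exact precision
    "DATE": datetime.date.fromisoformat,
    "TIME": datetime.time.fromisoformat,
    "TIMESTAMP": datetime.datetime.fromisoformat,
}


def decode(value: Any, ftype: FlinkType) -> Any:
    if value is None:  # SQL NULL stays None at every nesting level
        return None
    if isinstance(ftype, str):
        return _SCALARS[ftype](value)
    kind = ftype[0]
    if kind == "ROW":
        return {name: decode(v, t) for (name, t), v in zip(ftype[1], value)}
    if kind == "MAP":
        return {decode(k, ftype[1]): decode(v, ftype[2]) for k, v in value.items()}
    if kind == "ARRAY":
        return [decode(v, ftype[1]) for v in value]
    raise ValueError(f"unsupported type: {ftype!r}")


customer_t = ("ROW", [("first_name", "STRING"), ("age", "INT")])
print(decode({"vip": [["Ada", 36], None]}, ("MAP", "STRING", ("ARRAY", customer_t))))
```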


Development & tests

Requires Python 3.11+ and uv.

# Install dependencies
uv sync --group dev

# Set up pre-commit hooks (they run on every push)
pre-commit install --hook-type pre-push

# Run unit tests (no Docker needed)
uv run pytest tests/test_client.py tests/test_dbapi.py -v

# Run integration tests (requires Docker)
uv run pytest tests/test_integration.py -v -s -m integration

# Run all pre-commit checks manually
pre-commit run --all-files

Integration tests spin up a Flink cluster (JobManager + TaskManager + SQL Gateway) via testcontainers-python.


License

MIT



Download files

Source distribution: py_flink_sql_gateway-0.1.4.tar.gz (45.1 kB)

Built distribution: py_flink_sql_gateway-0.1.4-py3-none-any.whl (17.2 kB, Python 3)
