
A Python client for the Apache Flink SQL Gateway REST API


py-flink-sql-gateway

A lightweight Python driver for the Apache Flink SQL Gateway, implementing PEP 249 (DB-API 2.0).

Installation

pip install py-flink-sql-gateway

Quick start

from flink_gateway import connect

with connect("http://localhost:8083") as conn:
    # Define a source table backed by Flink's built-in datagen connector
    with conn.cursor() as cur:
        cur.execute("""
            CREATE TABLE orders (
                id INT NOT NULL,
                item STRING NOT NULL,
                created_at TIMESTAMP(6) NOT NULL,
                customer ROW<
                    first_name STRING NOT NULL,
                    last_name STRING NOT NULL,
                    age INT NOT NULL
                > NOT NULL,
                notes STRING
            ) WITH (
                'connector' = 'datagen',
                'rows-per-second' = '5',
                'fields.id.kind' = 'sequence',
                'fields.id.start' = '1',
                'fields.id.end' = '100',
                'fields.item.length' = '12',
                'fields.customer.first_name.length' = '8',
                'fields.customer.last_name.length' = '10',
                'fields.customer.age.min' = '21',
                'fields.customer.age.max' = '65',
                'fields.notes.length' = '12'
            )
        """)

    # Query and iterate
    with conn.cursor() as cur:
        cur.execute("""
            SELECT id, item, created_at, customer, notes AS note
            FROM orders
        """)
        for i, row in enumerate(cur):
            id_, item, created_at, customer, note = row
            print(
                f"{id_}\t{item}\t{created_at.isoformat()}\t"
                f"{customer['first_name']} {customer['last_name']} ({customer['age']})\t"
                f"{note or ''}"
            )
            if i >= 4:
                break

Tip: ROW and MAP values arrive as Python dicts and ARRAY values as lists, so nested fields such as customer['first_name'] can be indexed directly. Binary data is passed through as-is.

Low-level REST access

For features that DB-API does not cover, use the exported REST client directly:

from flink_gateway import FlinkSqlGatewayClient

with FlinkSqlGatewayClient("http://localhost:8083") as client:
    status = client.get_operation_status("session-handle", "operation-handle")
    print("current status:", status)
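A common next step is to wait until an operation finishes. The sketch below polls a status callable until it reports one of the Flink SQL Gateway's terminal states (FINISHED, ERROR, CANCELED, CLOSED, TIMEOUT). It also builds the gateway's versioned REST path for the status endpoint. This README does not say what `get_operation_status` returns. The sketch assumes it is the status string. If it is instead the gateway's JSON body, `{"status": ...}`, pass `lambda: client.get_operation_status(s, o)["status"]`. `wait_for_operation` and `status_path` are hypothetical helpers, not part of the package.

```python
import time
from collections.abc import Callable

# Operation states the Flink SQL Gateway treats as terminal;
# INITIALIZED, PENDING and RUNNING are non-terminal.
TERMINAL_STATES = frozenset({"FINISHED", "ERROR", "CANCELED", "CLOSED", "TIMEOUT"})


def status_path(session_handle: str, operation_handle: str, api_version: str = "v1") -> str:
    """Hypothetical helper: REST path of an operation's status on the gateway."""
    return f"/{api_version}/sessions/{session_handle}/operations/{operation_handle}/status"


def wait_for_operation(
    fetch_status: Callable[[], str],
    poll_interval: float = 0.5,
    timeout: float = 60.0,
) -> str:
    """Hypothetical helper: poll fetch_status() until a terminal state, then return it."""
    deadline = time.monotonic() + timeout
    while True:
        status = fetch_status().upper()
        if status in TERMINAL_STATES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"operation still {status} after {timeout}s")
        time.sleep(poll_interval)
```

Usage, under the same assumption about the return type: `wait_for_operation(lambda: client.get_operation_status(session, operation))`. Taking a plain callable keeps the polling logic independent of the HTTP client, so it can be tested without a running gateway.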

Type mapping

SQL NULL values map to Python None, so no wrapper types are needed.

| Flink type | Python type |
|---|---|
| TINYINT / SMALLINT / INT | int |
| BIGINT / INTERVAL | int |
| FLOAT / DOUBLE | float |
| BOOLEAN | bool |
| CHAR / VARCHAR / STRING | str |
| DECIMAL | decimal.Decimal |
| DATE | datetime.date |
| TIME | datetime.time |
| TIMESTAMP / TIMESTAMP_LTZ | datetime.datetime |
| BINARY / VARBINARY | raw value (passed through unchanged) |
| ROW | dict |
| MAP | dict |
| ARRAY | list |
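The table describes the driver's output types, not how it produces them. As a rough illustration only, the sketch below applies the scalar rows of the table to one JSON-decoded value. It rests on several assumptions. DECIMAL and temporal values are assumed to arrive as strings, and the column type is assumed to be available as a Flink type string such as `DECIMAL(10, 2)`. This is not the package's actual decoder. `convert_scalar` is hypothetical, and ROW, MAP, ARRAY and INTERVAL are left out for brevity.

```python
import datetime as dt
from decimal import Decimal
from typing import Any


def convert_scalar(flink_type: str, value: Any) -> Any:
    """Hypothetical helper: map one JSON-decoded scalar to the Python type in the table.

    Assumes DECIMAL and temporal values arrive as strings (not confirmed by the README).
    """
    if value is None:
        return None  # SQL NULL -> None
    # Strip precision/length, e.g. "DECIMAL(10, 2)" -> "DECIMAL", "TIMESTAMP_LTZ(3)" -> "TIMESTAMP_LTZ"
    base = flink_type.split("(")[0].strip().upper()
    if base in {"TINYINT", "SMALLINT", "INT", "INTEGER", "BIGINT"}:
        return int(value)
    if base in {"FLOAT", "DOUBLE"}:
        return float(value)
    if base == "BOOLEAN":
        return value if isinstance(value, bool) else str(value).strip().lower() == "true"
    if base in {"CHAR", "VARCHAR", "STRING"}:
        return str(value)
    if base == "DECIMAL":
        # Go through str() so a JSON float never leaks binary rounding into the Decimal
        return Decimal(str(value))
    if base == "DATE":
        return dt.date.fromisoformat(value)
    if base == "TIME":
        return dt.time.fromisoformat(value)
    if base.startswith("TIMESTAMP"):
        # Python 3.11+ accepts a space separator and any number of fractional digits
        return dt.datetime.fromisoformat(value)
    return value  # BINARY / VARBINARY and anything unhandled: pass through unchanged
```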

Development & tests

Requires Python 3.13+ and uv.

# Install dependencies
uv sync

# Run unit tests (no Docker needed)
uv run pytest tests/test_client.py tests/test_dbapi.py -v

# Run all tests including integration (requires Docker)
uv run pytest tests/ -v

Integration tests spin up a Flink cluster (JobManager + TaskManager + SQL Gateway) via testcontainers-python.


License

MIT (see LICENSE)



Download files

Source distribution: py_flink_sql_gateway-0.1.0.dev0.tar.gz (42.3 kB)

Built distribution: py_flink_sql_gateway-0.1.0.dev0-py3-none-any.whl (16.0 kB), Python 3
