Flink SQL Gateway Python Client

A client library for accessing Flink SQL Gateway REST API (mostly generated by OpenAPI Generator)

Usage

First, create a client:

from flink_gateway_api import Client

client = Client(base_url="http://localhost:8083")

If the endpoints you're going to hit require authentication, use AuthenticatedClient instead:

from flink_gateway_api import AuthenticatedClient

client = AuthenticatedClient(base_url="http://localhost:8083", token="SuperSecretToken")

Now call your endpoint and use your models:

import json
import time

from flink_gateway_api import Client
from flink_gateway_api.api.default import (
    close_session,
    execute_statement,
    fetch_results,
    open_session,
)
from flink_gateway_api.models import (
    ExecuteStatementRequestBody,
    OpenSessionRequestBody,
    RowFormat,
)

with Client(base_url="http://localhost:8083") as client:
    # Open a session; its handle identifies the session in every later call.
    session = open_session.sync(
        client=client,
        body=OpenSessionRequestBody.from_dict({
            "properties": {"idle-timeout": "10s"},
            "sessionName": "test_session",
        }),
    )
    print(f"Open session response: {session}")

    # Submit the statement; the gateway answers with an operation handle.
    select_result = execute_statement.sync(
        session.session_handle,
        client=client,
        body=ExecuteStatementRequestBody.from_dict({
            "statement": "SELECT 23 as age, 'Alice Liddel' as name;",
        }),
    )
    print(f"Select result: {select_result}")

    # Give the operation a moment to produce results, then fetch from token 0.
    time.sleep(1)
    fetch_return = fetch_results.sync(
        session.session_handle,
        select_result.operation_handle,
        0,
        client=client,
        row_format=RowFormat.JSON,
    )
    print(f"Fetch return: {json.dumps(fetch_return.to_dict())}")

    close_session.sync(session.session_handle, client=client)
    print("Session closed")

Or do the same thing with an async version:

import asyncio
import json

from flink_gateway_api import Client
from flink_gateway_api.api.default import (
    close_session,
    execute_statement,
    fetch_results,
    open_session,
)
from flink_gateway_api.models import (
    ExecuteStatementRequestBody,
    OpenSessionRequestBody,
    RowFormat,
)


async def main():
    # "async with" is only valid inside a coroutine, so the calls live in main().
    async with Client(base_url="http://localhost:8083") as client:
        session = await open_session.asyncio(
            client=client,
            body=OpenSessionRequestBody.from_dict({
                "properties": {"idle-timeout": "10s"},
                "sessionName": "test_session",
            }),
        )
        print(f"Open session response: {session}")

        select_result = await execute_statement.asyncio(
            session.session_handle,
            client=client,
            body=ExecuteStatementRequestBody.from_dict({
                "statement": "SELECT 23 as age, 'Alice Liddel' as name;",
            }),
        )
        print(f"Select result: {select_result}")

        # Non-blocking pause so the event loop stays free while results are prepared.
        await asyncio.sleep(1)
        fetch_return = await fetch_results.asyncio(
            session.session_handle,
            select_result.operation_handle,
            0,
            client=client,
            row_format=RowFormat.JSON,
        )
        print(f"Fetch return: {json.dumps(fetch_return.to_dict())}")

        await close_session.asyncio(session.session_handle, client=client)
        print("Session closed")


asyncio.run(main())

The fetch call returns a result like the following:

{
  "isQueryResult": true,
  "jobID": "a0ad286b7259d4755327ce4969a8ec97",
  "nextResultUri": "/v2/sessions/7625ad82-b23b-4118-9683-4a46b7c5022a/operations/353994b9-532d-4c0e-9258-688ec777f948/result/1?rowFormat=JSON",
  "resultKind": "SUCCESS_WITH_CONTENT",
  "resultType": "PAYLOAD",
  "results": {
    "columns": [
      {
        "name": "age",
        "logicalType": {
          "type": "INTEGER",
          "nullable": false
        },
        "comment": null
      },
      {
        "name": "name",
        "logicalType": {
          "type": "CHAR",
          "nullable": false,
          "length": 12
        },
        "comment": null
      }
    ],
    "columnInfos": [],
    "data": [
      {
        "kind": "INSERT",
        "fields": [
          23,
          "Alice Liddel"
        ]
      }
    ],
    "fieldGetters": [],
    "rowFormat": "JSON"
  }
}
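
The fixed one-second sleep in the examples above can be replaced by following nextResultUri until the gateway reports the end of the stream. The following is a minimal sketch, not part of the library: iter_rows and next_token are hypothetical helpers, fetch_page is any callable you supply that returns one page as a dict shaped like the JSON above, and the resultType values "NOT_READY" and "EOS" are assumptions about the gateway's responses (only "PAYLOAD" appears in the sample).

```python
import time
from typing import Any, Callable, Dict, Iterator
from urllib.parse import urlparse


def next_token(next_result_uri: str) -> int:
    """Extract the trailing token from a URI like '/v2/.../result/1?rowFormat=JSON'."""
    path = urlparse(next_result_uri).path
    return int(path.rstrip("/").rsplit("/", 1)[-1])


def iter_rows(
    fetch_page: Callable[[int], Dict[str, Any]],
    poll_interval: float = 0.5,
    max_requests: int = 100,
) -> Iterator[Dict[str, Any]]:
    """Yield each row as a {column name: value} dict, page by page."""
    token = 0
    for _ in range(max_requests):
        page = fetch_page(token)
        results = page.get("results") or {}
        names = [col["name"] for col in results.get("columns", [])]
        for row in results.get("data", []):
            yield dict(zip(names, row["fields"]))

        uri = page.get("nextResultUri")
        # Assumption: "EOS" (or a missing nextResultUri) marks the last page.
        if page.get("resultType") == "EOS" or not uri:
            return
        # Assumption: "NOT_READY" means the operation has produced nothing yet.
        if page.get("resultType") == "NOT_READY":
            time.sleep(poll_interval)
        token = next_token(uri)
    raise TimeoutError(f"gave up after {max_requests} fetch requests")
```

With the client from the examples above, fetch_page could wrap the documented call, for instance a function that returns fetch_results.sync(session_handle, operation_handle, token, client=client, row_format=RowFormat.JSON).to_dict(). The max_requests cap is there so that an unbounded streaming query cannot loop forever.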

By default, when you call an HTTPS API the client verifies the server's SSL certificate. Certificate verification is highly recommended most of the time, but sometimes you may need to authenticate to a server (especially an internal server) using a custom certificate bundle:

client = AuthenticatedClient(
    base_url="https://internal_api.example.com", 
    token="SuperSecretToken",
    verify_ssl="/path/to/certificate_bundle.pem",
)

You can also disable certificate validation altogether, but beware that this is a security risk.

client = AuthenticatedClient(
    base_url="https://internal_api.example.com", 
    token="SuperSecretToken", 
    verify_ssl=False
)
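
To keep the choice between these modes out of the code, the verify_ssl value can be derived from the environment. This is only a sketch: verify_ssl_from_env and the variable names FLINK_GATEWAY_CA_BUNDLE and FLINK_GATEWAY_INSECURE are hypothetical and not defined by the library.

```python
import os
from typing import Mapping, Union


def verify_ssl_from_env(environ: Mapping[str, str] = os.environ) -> Union[str, bool]:
    """Return a value suitable for the verify_ssl argument shown above.

    Both environment variable names are made up for this example.
    """
    # A custom certificate bundle wins over everything else.
    bundle = environ.get("FLINK_GATEWAY_CA_BUNDLE", "").strip()
    if bundle:
        return bundle
    # Disabling verification must be requested explicitly.
    insecure = environ.get("FLINK_GATEWAY_INSECURE", "").strip().lower()
    if insecure in {"1", "true", "yes"}:
        return False
    # Default: keep certificate verification on.
    return True
```

The result can then be passed as verify_ssl=verify_ssl_from_env() when constructing the client, so verification stays on unless the environment says otherwise.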

Things to know:

  1. Every path/method combo becomes a Python module with four functions:

    1. sync: Blocking request that returns parsed data (if successful) or None
    2. sync_detailed: Blocking request that always returns a Response, with parsed set if the request was successful
    3. asyncio: Like sync but async instead of blocking
    4. asyncio_detailed: Like sync_detailed but async instead of blocking

  2. All path/query params and bodies become function arguments.

  3. If your endpoint had any tags on it, the first tag is used as the module name for the function.

  4. Any endpoint which did not have a tag will be in flink_gateway_api.api.default
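
Because sync returns None on failure, the detailed variants are the way to find out what went wrong. The sketch below shows one way to turn a detailed response into either the parsed model or an exception. It assumes the response object exposes status_code, content (bytes) and parsed attributes; unwrap and GatewayError are hypothetical names, and the SimpleNamespace objects are stand-ins so the example runs without a gateway.

```python
from types import SimpleNamespace
from typing import Any


class GatewayError(RuntimeError):
    """Hypothetical error type for this sketch; not part of the library."""


def unwrap(response: Any) -> Any:
    """Return response.parsed, or raise with the status code and body."""
    status = int(response.status_code)
    if not 200 <= status < 300 or response.parsed is None:
        body = response.content.decode("utf-8", errors="replace")
        raise GatewayError(f"HTTP {status}: {body[:200]}")
    return response.parsed


# Stand-ins for what a *_detailed call is assumed to return.
ok = SimpleNamespace(status_code=200, content=b"{}", parsed={"sessionHandle": "abc"})
failed = SimpleNamespace(status_code=404, content=b"Session not found", parsed=None)

print(unwrap(ok))
try:
    unwrap(failed)
except GatewayError as exc:
    print(f"Request failed: {exc}")
```

In real use the argument would be the return value of a call such as open_session.sync_detailed(client=client, body=...).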

Advanced customizations

The generated Client class has more settings that control runtime behavior; see the docstring on that class for details. You can also customize the underlying httpx.Client or httpx.AsyncClient (depending on your use case):

from flink_gateway_api import Client

def log_request(request):
    print(f"Request event hook: {request.method} {request.url} - Waiting for response")

def log_response(response):
    request = response.request
    print(f"Response event hook: {request.method} {request.url} - Status {response.status_code}")

client = Client(
    base_url="http://localhost:8083",
    httpx_args={"event_hooks": {"request": [log_request], "response": [log_response]}},
)

# Or get the underlying httpx client to modify directly with client.get_httpx_client() or client.get_async_httpx_client()
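
Hooks do not have to print. As a variation on the logging hooks above, the sketch below collects one (method, URL, status) tuple per response, which can be handy in tests. make_recording_hooks is a hypothetical helper, and the SimpleNamespace object is a stand-in for an httpx response, assumed to expose request and status_code just as the log_response hook above relies on.

```python
from types import SimpleNamespace
from typing import Any, Callable, Dict, List, Tuple

RequestRecord = Tuple[str, str, int]


def make_recording_hooks(log: List[RequestRecord]) -> Dict[str, List[Callable[[Any], None]]]:
    """Build an event_hooks mapping that appends one record per response to log."""

    def record_response(response: Any) -> None:
        request = response.request
        log.append((request.method, str(request.url), int(response.status_code)))

    return {"response": [record_response]}


# Exercise the hook with a stand-in object instead of a live request.
records: List[RequestRecord] = []
hooks = make_recording_hooks(records)
fake_response = SimpleNamespace(
    request=SimpleNamespace(method="POST", url="http://localhost:8083/v2/sessions"),
    status_code=200,
)
for hook in hooks["response"]:
    hook(fake_response)
print(records)
```

The mapping would be passed as httpx_args={"event_hooks": make_recording_hooks(records)}. Note that these are plain functions, which suits the blocking client; as far as I know, httpx expects hooks registered on an AsyncClient to be async functions, so the async variants would need an async def version.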

You can even set the httpx client directly, but beware that this will override any existing settings (e.g., base_url):

import httpx
from flink_gateway_api import Client

client = Client(
    base_url="http://localhost:8083",
)
# Note that base_url needs to be re-set, as would any shared cookies, headers, etc.
client.set_httpx_client(httpx.Client(base_url="http://localhost:8083"))

Developer

  1. Quick start

# code gen
make py_119

# test
cd flink-sql-gateway-client
pytest tests

# tag & release
git tag release-1.19.alpha20241225
git push origin release-1.19.alpha20241225
