Confluent Apache Flink Table API Python

This package contains the client library for running Apache Flink's Table API on Confluent Cloud.

The Table API lets you develop, test, and submit Flink pipelines for processing data streams programmatically. Streams can be finite or infinite and can carry insert-only or changelog data. Changelog streams make it possible to process Change Data Capture (CDC) events.
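
To make the difference between insert-only and changelog data concrete, the following sketch folds a stream of change events into the table it describes. It is plain Python and does not use the Table API; the `materialize` helper and the sample rows are made up for illustration. The change kinds `+I`, `-U`, `+U`, and `-D` follow Flink's short notation for insert, update-before, update-after, and delete.

```python
from collections import Counter

# Flink's short names for change kinds:
# +I insert, -U update-before, +U update-after, -D delete.
ADDITIONS = {"+I", "+U"}
RETRACTIONS = {"-U", "-D"}


def materialize(changelog):
    """Fold (kind, row) change events into the rows currently in the table."""
    counts = Counter()  # row -> number of live copies of that row
    for kind, row in changelog:
        if kind in ADDITIONS:
            counts[row] += 1
        elif kind in RETRACTIONS:
            if counts[row] == 0:
                raise ValueError(f"cannot retract a row that is not present: {row!r}")
            counts[row] -= 1
        else:
            raise ValueError(f"unknown change kind: {kind!r}")
    # elements() skips rows whose count dropped to zero.
    return sorted(counts.elements())


# Hypothetical CDC events for an (id, name, balance) table.
cdc_events = [
    ("+I", (1, "alice", 10)),
    ("+I", (2, "bob", 5)),
    ("-U", (1, "alice", 10)),  # old version of the updated row
    ("+U", (1, "alice", 15)),  # new version of the updated row
    ("-D", (2, "bob", 5)),
]

current_rows = materialize(cdc_events)
print(current_rows)
```

An insert-only stream is the special case in which every event is `+I`, so the table only ever grows.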

Within the API, you conceptually work with tables that change over time, an idea inspired by relational databases. You write a table program as a declarative, structured graph of data transformations. The Table API is inspired by SQL and complements it with additional tools for working with real-time data. You can mix Flink SQL and the Table API at any point, because the two are designed to work together.

Table API on Confluent Cloud

Table API on Confluent Cloud is a client-side library that delegates Flink API calls to Confluent’s public REST API. It submits Statements and retrieves StatementResults.
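
The submit-then-retrieve pattern described above can be sketched without any network access. The code below is not Confluent's REST API or this package's implementation: `FakeStatementService`, `submit`, `fetch_results`, and `collect` are hypothetical names, and the service returns canned rows instead of executing SQL. It only illustrates how a client might submit a statement and then page through its results.

```python
class FakeStatementService:
    """Hypothetical in-memory stand-in for a remote statement service."""

    def __init__(self, canned_rows, page_size=2):
        self._canned_rows = list(canned_rows)  # rows every statement "produces"
        self._page_size = page_size
        self._statements = {}  # statement name -> submitted SQL text

    def submit(self, sql):
        """Register a statement and return its generated name."""
        name = f"statement-{len(self._statements) + 1}"
        self._statements[name] = sql
        return name

    def fetch_results(self, name, offset=0):
        """Return one page of rows and the next offset (None when exhausted)."""
        if name not in self._statements:
            raise KeyError(f"unknown statement: {name}")
        page = self._canned_rows[offset:offset + self._page_size]
        next_offset = offset + len(page)
        if next_offset >= len(self._canned_rows):
            next_offset = None
        return page, next_offset


def collect(service, sql):
    """Submit a statement, then fetch result pages until none remain."""
    name = service.submit(sql)
    rows, offset = [], 0
    while offset is not None:
        page, offset = service.fetch_results(name, offset)
        rows.extend(page)
    return rows


service = FakeStatementService([(1,), (2,), (3,), (4,), (5,)], page_size=2)
rows = collect(service, "SELECT * FROM hypothetical_table")
print(rows)
```

With the real library this loop is hidden behind calls such as `execute().print()`, so application code does not normally page through results itself.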

Table programs are implemented against Flink's open source Table API for Python. This package repackages Flink's Python API and bundles the Confluent-specific components that power the TableEnvironment without the need for a local Flink cluster. When you use the package, Flink-internal components such as CatalogStore, Catalog, Planner, Executor, and configuration are managed by the plugin and fully integrated with Confluent Cloud, including access to Apache Kafka®, Schema Registry, and Flink compute pools.

Note: The Table API plugin is in Open Preview stage.

Motivating Example

The following code shows how a Table API program is structured. Subsequent sections go into more detail on how you can use the examples in this repository to experiment with Flink on Confluent Cloud.

from confluent_pyflink.table.utils import ConfluentSettings, ConfluentTools
from confluent_pyflink.table import TableEnvironment, Row
from confluent_pyflink.table.expressions import col, row


def run():
    # Setup connection properties to Confluent Cloud
    settings = ConfluentSettings.from_global_variables()
    env = TableEnvironment.create(settings)

    # Run your first Flink statement in Table API
    env.from_elements([row("Hello world!")]).execute().print()

    # Or use SQL
    env.sql_query("SELECT 'Hello world!'").execute().print()

    # Structure your code with Table objects - the main ingredient of Table API.
    table = (
        env.from_path("examples.marketplace.clicks")
        .filter(col("user_agent").like("Mozilla%"))
        .select(col("click_id"), col("user_id"))
    )

    table.print_schema()
    print(table.explain())

    # Use the provided tools to test on a subset of the streaming data
    actual = ConfluentTools.collect_materialized_limit(table, 50)
    expected = [Row(42, 500)]
    if actual != expected:
        print("Results don't match!")


if __name__ == "__main__":
    run()
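
The example above reads its connection settings with `ConfluentSettings.from_global_variables()`, so those settings must be present in the environment before the program starts. The sketch below is a small pre-flight check in plain Python. The variable names in `ASSUMED_SETTING_NAMES` are an assumption based on the names commonly used for the Table API on Confluent Cloud and should be confirmed against the package documentation; `missing_settings` is a hypothetical helper, not part of the library.

```python
import os

# Assumed variable names; confirm them against the package documentation.
ASSUMED_SETTING_NAMES = (
    "CLOUD_PROVIDER",
    "CLOUD_REGION",
    "FLINK_API_KEY",
    "FLINK_API_SECRET",
    "ORG_ID",
    "ENV_ID",
    "COMPUTE_POOL_ID",
)


def missing_settings(environ=None, names=ASSUMED_SETTING_NAMES):
    """Return the names that are unset or blank in the given environment."""
    environ = os.environ if environ is None else environ
    return [name for name in names if not environ.get(name, "").strip()]


# Example with an explicit mapping instead of the real process environment.
example_environ = {"CLOUD_PROVIDER": "aws", "CLOUD_REGION": "us-east-1"}
print(missing_settings(example_environ))
```

Calling `missing_settings()` with no arguments checks the real process environment, which makes it possible to fail early with a readable message instead of an error from the first submitted statement.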

Further Examples

For further examples, please see Confluent's Apache Flink® Table API on Confluent Cloud Examples repository.

Download files

Source Distribution

confluent_pyflink-1.0.0.tar.gz (23.2 kB)

Uploaded Source

Built Distribution

confluent_pyflink-1.0.0-py3-none-any.whl (40.0 kB)

Uploaded Python 3
