Streaming Jupyter Integrations

The Streaming Jupyter Integrations project includes a set of magics for interactively running Flink SQL jobs in Jupyter Notebooks.

Installation

In order to use these magics, you must install our PIP package along with jupyterlab-lsp:

python3 -m pip install jupyterlab-lsp streaming-jupyter-integrations

Usage

Register the magics in Jupyter by running the following in the first cell of a running IPython notebook:

%load_ext streaming_jupyter_integrations.magics

Then decide which execution mode and execution target to use.

%flink_connect --execution-mode [mode] --execution-target [target]

By default, the streaming execution mode and local execution target are used.

%flink_connect

Execution mode

Currently, Flink supports two execution modes: batch and streaming. Please see the Flink documentation for more details.

In order to specify the execution mode, add the --execution-mode parameter, for instance:

%flink_connect --execution-mode batch

Execution target

Streaming Jupyter Integrations supports 3 execution targets:

  • Local
  • Remote
  • YARN Session

Local execution target

Running Flink in local mode will start a MiniCluster in a local JVM with parallelism 1.

In order to run Flink locally, use:

%flink_connect --execution-target local

Alternatively, since the execution target is local by default, use:

%flink_connect

One can specify the port of the local JobManager (8099 by default). This is especially useful if you run multiple notebooks in a single JupyterLab.

%flink_connect --execution-target local --local-port 8123

Remote execution target

Running Flink in remote mode will connect to an existing Flink session cluster. Besides setting --execution-target to remote, you also need to specify --remote-hostname and --remote-port, pointing to the Flink JobManager's REST API address.

%flink_connect \
    --execution-target remote \
    --remote-hostname example.com \
    --remote-port 8888

YARN session execution target

Running Flink in yarn-session mode will connect to an existing Flink session cluster running on YARN. You may specify the hostname and port of the YARN Resource Manager (--resource-manager-hostname and --resource-manager-port). If the Resource Manager address is not provided, it is assumed that the notebook runs on the same node as the Resource Manager. You can also specify the YARN applicationId (--yarn-application-id) to which the notebook will connect. If --yarn-application-id is not specified and there is exactly one YARN application running on the cluster, the notebook will try to connect to it. Otherwise, it will fail.

Connecting to a remote Flink session cluster running on a remote YARN cluster:

%flink_connect \
    --execution-target yarn-session \
    --resource-manager-hostname example.com \
    --resource-manager-port 8888 \
    --yarn-application-id application_1666172784500_0001

Connecting to a Flink session cluster running on a YARN cluster:

%flink_connect \
    --execution-target yarn-session \
    --yarn-application-id application_1666172784500_0001

Connecting to a Flink session cluster running on a dedicated YARN cluster:

%flink_connect --execution-target yarn-session

Variables

Magics allow for dynamic variable substitution in Flink SQL cells: a variable defined in a Python cell can be referenced in an SQL cell using curly braces.

my_variable = 1
SELECT * FROM some_table WHERE product_id = {my_variable}
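
For instance, the SQL above would normally live in its own cell. A minimal two-cell sketch, assuming the package exposes a %%flink_execute_sql cell magic for plain SQL cells (the table and column names are illustrative):

%%flink_execute_sql
-- {my_variable} is replaced with the value of the Python variable before the query is submitted
SELECT * FROM some_table WHERE product_id = {my_variable}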

Moreover, you can mark sensitive variables, such as passwords, so that they are read from environment variables or user input every time the cell is run:

CREATE TABLE MyUserTable (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:3306/mydatabase',
   'table-name' = 'users',
   'username' = '${my_username}',
   'password' = '${my_password}'
);

%%flink_execute command

The command allows you to use the Python DataStream API and Table API. One handle is exposed for each API: stream_env and table_env, respectively.

Table API example:

%%flink_execute
query = """
    SELECT   user_id, COUNT(*)
    FROM     orders
    GROUP BY user_id
"""
execution_output = table_env.execute_sql(query)

When the Table API is used, the final result has to be assigned to the execution_output variable.

DataStream API example:

%%flink_execute
from pyflink.common.typeinfo import Types

execution_output = stream_env.from_collection(
    collection=[(1, 'aaa'), (2, 'bb'), (3, 'cccc')],
    type_info=Types.ROW([Types.INT(), Types.STRING()])
)

When the DataStream API is used, the final result has to be assigned to the execution_output variable. Please note that the pipeline does not end with .execute(); the execution is triggered by the Jupyter magics under the hood.
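
A slightly longer illustrative sketch of the same idea: the pipeline may contain transformations, but it still ends with an assignment to execution_output rather than a call to .execute():

%%flink_execute
from pyflink.common.typeinfo import Types

# Build a small pipeline; the magic triggers execution once the cell finishes.
source = stream_env.from_collection(
    collection=[(1, 'aaa'), (2, 'bb'), (3, 'cccc')],
    type_info=Types.ROW([Types.INT(), Types.STRING()])
)
# Keep only the string field of each row; note that no .execute() call is made.
execution_output = source.map(lambda row: row[1], output_type=Types.STRING())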


Local development

There are currently two options for running streaming_jupyter_integrations for development: we can either use a Docker image or install it on our machine.

Docker image

You can build a Docker image of Jupyter Notebook, containing the functionality developed in this project, by running the command below.

docker build --tag streaming_jupyter_integrations_image .

After the image is built, we can run it using this command.

docker run --name streaming_jupyter_integrations -p 8888:8888 streaming_jupyter_integrations_image

After that, we should be able to reach the Jupyter instance running in Docker at http://127.0.0.1:8888/.

Local installation

Note: You will need NodeJS to build the extension package.

The jlpm command is JupyterLab's pinned version of yarn, which is installed with JupyterLab. You may use yarn or npm in lieu of jlpm below. In order to use jlpm, you need to have JupyterLab installed (e.g., via brew install jupyterlab if you use Homebrew as your package manager).

# Clone the repo to your local environment
# Change directory to the flink_sql_lsp_extension directory
# Install package in development mode
pip install -e .
# Link your development version of the extension with JupyterLab
jupyter labextension develop . --overwrite
# Rebuild extension Typescript source after making changes
jlpm build

pre-commit

The project uses pre-commit hooks to ensure code quality, mostly by linting. To use it, install pre-commit and then run:

pre-commit install --install-hooks

From that moment, it will lint the files you have modified on every commit attempt.
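
You can also lint the entire repository on demand (a standard pre-commit invocation, not specific to this project):

pre-commit run --all-files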
