
Flink SQL Job Management Website


Requirements

  1. Python 3.6+
  2. Flink 1.9.0+ installed (set FSQLFLY_FLINK_BIN_DIR in ~/.fsqlfly or in the environment)

Note: to run multiple fsqlfly instances on one machine, set FSQLFLY in the environment, e.g.:

export FSQLFLY=/path/where/you/like
fsqlfly <command>

You can generate an env template with fsqlfly echoenv [filename].

Install

pip install fsqlfly

Initialize the database

fsqlfly initdb 

Run the website

fsqlfly webserver [--jobdaemon]

Note: to daemonize all Flink SQL jobs (each job must be set publish and available), add --jobdaemon to the command.

Reset the database (warning: this deletes all data)

fsqlfly resetdb

Canal consumer support (load MySQL binlog data into Kafka)

Requires Canal v1.1.4+ installed.

pip install fsqlfly[canal]

fsqlfly runcanal [name or id]

Settings

You can change these settings in the env file (~/.fsqlfly) or via environment variables (e.g. export NAME=value).

| Name | Description | Default |
| --- | --- | --- |
| FSQLFLY_PASSWORD | admin password (a random one is generated if not set) | password |
| FSQLFLY_DB_URL | database connection URL (if set, other database settings are ignored) | None |
| FSQLFLY_STATIC_ROOT | static file directory (defaults to fsqlfly/static if not set) | None |
| FSQLFLY_FLINK_BIN_DIR | Flink bin directory | /opt/flink/bin |
| FSQLFLY_FLINK_MAX_TERMINAL | maximum number of live terminals | 1000 |
| FSQLFLY_DEBUG | enable web debug mode (True if set, otherwise False) | None |
| FSQLFLY_WEB_PORT | HTTP port | 8082 |
| FSQLFLY_FINK_HOST | Flink REST API host | http://localhost:8081 |
| FSQLFLY_JOB_DAEMON_FREQUENCY | job daemon check interval in seconds | 30 |
| FSQLFLY_JOB_DAEMON_MAX_TRY_ONE_DAY | maximum restart attempts per job per day | 3 |
| FSQLFLY_JOB_LOG_DIR | Flink job daemon log directory | /tmp/fsqlfly_job_log |
| FSQLFLY_UPLOAD_DIR | upload directory | ~/.fsqlfly_upload |
| FSQLFLY_SAVE_MODE_DISABLE | if set, delete operations are allowed | False |
| FSQLFLY_MAIL_ENABLE | whether to send email notifications | false |
| FSQLFLY_MAIL_HOST | SMTP host | None |
| FSQLFLY_MAIL_USER | SMTP user | None |
| FSQLFLY_MAIL_PASSWORD | SMTP password | None |
| FSQLFLY_MAIL_RECEIVERS | SMTP mail receivers, comma-separated | None |

FSQLFLY_DB_URL takes a standard database connection URL.
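Putting the table above together, a minimal ~/.fsqlfly env file might look like this (all names come from the settings table; the values are illustrative, not defaults you must use):

```shell
# Illustrative ~/.fsqlfly env file; adjust every value for your deployment.
export FSQLFLY_PASSWORD=change-me                 # admin password
export FSQLFLY_FLINK_BIN_DIR=/opt/flink/bin       # directory containing the flink binary
export FSQLFLY_WEB_PORT=8082                      # HTTP port of the web UI
export FSQLFLY_FINK_HOST=http://localhost:8081    # Flink REST API endpoint
export FSQLFLY_JOB_DAEMON_FREQUENCY=30            # job-daemon check interval (seconds)
```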

Note: the admin token is the MD5 hex digest of FSQLFLY_PASSWORD; you can generate it with:

    import hashlib

    token = hashlib.md5(b'password').hexdigest()
    print(token)  # 5f4dcc3b5aa765d61d8327deb882cf99

If you want to control Flink SQL jobs through the API, pass the token in the URL or in a header; no login is required.

API

Requires authentication via token (passed in the token request parameter).

  • jobs

      url: /api/job
      method: get

  • job control

      url: /api/transform/<mode(status|start|stop|restart)>/<id or job name>
      method: post

Beta: you can set pt in the request body (JSON format); a unique job name is then created for the job. If your SQL needs other templated values, Jinja2 syntax is supported, e.g. insert into tablea select * from table where pt_field = {{ pt }}; send the pt value in the request body. Controlling daily jobs through Airflow is recommended. To kill all pt jobs, add kill_all_pt to the request JSON body.

PS: pt may only contain the characters 0-9a-zA-Z_-.
PS: the status API returns FAILED if there is no last_run_job_id or if multiple jobs are running.
PS: a dt variable is generated in your SQL template; you can set execution_date in the request body (JSON format), e.g. 2020-05-11 23:22:07 or 2020-05-11.
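As a sketch, a job-start request with a pt value can be assembled like this (the host, job name, and password are placeholder assumptions; the URL layout follows the /api/transform route above):

```python
import hashlib
import json

# Placeholders: adjust host, job name, and password for your deployment.
host = "http://localhost:8082"
job_name = "daily_report"
token = hashlib.md5(b"password").hexdigest()  # MD5 hex of FSQLFLY_PASSWORD

# POST /api/transform/<mode>/<job name>?token=<token> with pt in the JSON body.
url = f"{host}/api/transform/start/{job_name}?token={token}"
body = json.dumps({"pt": "20200511"})
```

Sending this with any HTTP client (curl, requests, Airflow's HTTP hook) starts the job under a pt-specific name.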

Airflow Support

Use the DAG operator fsqlfly.airflow_plugins.FSQLFlyOperator.

example:

from datetime import timedelta

from airflow.models import DAG
from fsqlfly.airflow_plugins import FSQLFlyOperator

args = {'owner': 'airflow'}  # your default DAG arguments

dag = DAG(
    dag_id='flink_hive_process',
    default_args=args,
    schedule_interval="2 1 * * *",
    dagrun_timeout=timedelta(minutes=60),
    max_active_runs=8,
    concurrency=8
)

data = dict(pt="{{ ds_nodash }}")
http_conn_id = "fsqlplatform"
token = '{{ var.value.fsqlfly_token }}'
start_flink_job = FSQLFlyOperator(
    task_id='flink_job',
    job_name='flink_run_in_fsql_fly',
    token=token,
    http_conn_id=http_conn_id,
    data=data,
    dag=dag,
    method='start',  # supports restart | start | stop
    daemon=True,
    parallelism=0,   # a nonzero value caps the number of jobs running at one time
    poke_interval=5,
)

  • token: the fsqlfly token; pass the real value or store it in an Airflow Variable
  • http_conn_id: the Airflow connection id of the fsqlfly web server (see the Airflow connection docs for details)
  • data: arguments passed to the Flink job

To control connectors from Airflow, use fsqlfly.airflow_plugins.FSQLFlyConnectorOperator; usage is the same as above.

Quick Start

  1. Unzip Flink 1.10.0 to /opt/flink
  2. pip install fsqlfly
  3. fsqlfly echoenv ~/.fsqlfly
  4. Change FSQLFLY_FLINK_BIN_DIR in ~/.fsqlfly to your Flink bin directory, e.g. /opt/flink/bin
  5. fsqlfly initdb
  6. fsqlfly webserver
  7. Open http://localhost:8082 in your browser; the default password is password
