# Flink SQL Job Management Website
## Requirements

- Python 3.6+
- Flink 1.9.0+ installed (set `FSQLFLY_FLINK_BIN_DIR` in `~/.fsqlfly` or in the environment)
ps: if you want to run multiple fsqlfly instances on one machine, set `FSQLFLY` in the environment so each instance uses its own config file, e.g.:

```shell
export FSQLFLY=/path/where/you/like
```

You can generate an env template with the `fsqlfly echoenv [filename]` command.
## Install

```shell
pip install fsqlfly
```

### Init database

```shell
fsqlfly initdb
```

### Run website

```shell
fsqlfly webserver [--jobdaemon]
```

ps: if you want to daemonize all Flink SQL jobs (each job must be set to publish and available), add `--jobdaemon` to the command.

### Reset database (warning: it will delete all data)

```shell
fsqlfly resetdb
```
## Canal consumer support (load MySQL binlog data into Kafka)

Requires Canal v1.1.4+ installed.

```shell
pip install fsqlfly[canal]
fsqlfly runcanal [name or id]
```
## Settings

You can change settings in the env file (`~/.fsqlfly`) or via environment variables (e.g. `export name=value`).

| Name | Description | Default |
|---|---|---|
| FSQLFLY_PASSWORD | admin password (if not set, a random password is used) | password |
| FSQLFLY_DB_URL | database connection URL (if set, the other database settings are ignored) | None |
| FSQLFLY_STATIC_ROOT | directory of static files (defaults to `fsqlfly/static` if not set) | None |
| FSQLFLY_FLINK_BIN_DIR | Flink bin directory | /opt/flink/bin |
| FSQLFLY_FLINK_MAX_TERMINAL | maximum number of living terminals | 1000 |
| FSQLFLY_DEBUG | enable web debug mode (True if set, otherwise False) | None |
| FSQLFLY_WEB_PORT | HTTP port | 8082 |
| FSQLFLY_FINK_HOST | Flink REST API host | http://localhost:8081 |
| FSQLFLY_JOB_DAEMON_FREQUENCY | job daemon check interval, in seconds | 30 |
| FSQLFLY_JOB_DAEMON_MAX_TRY_ONE_DAY | maximum restart attempts per job per day | 3 |
| FSQLFLY_JOB_LOG_DIR | Flink job daemon log directory | /tmp/fsqlfly_job_log |
| FSQLFLY_UPLOAD_DIR | upload directory | ~/.fsqlfly_upload |
| FSQLFLY_SAVE_MODE_DISABLE | if set, deleting resources is allowed | False |
| FSQLFLY_MAIL_ENABLE | whether to send email | false |
| FSQLFLY_MAIL_HOST | SMTP email host | None |
| FSQLFLY_MAIL_USER | SMTP email user | None |
| FSQLFLY_MAIL_PASSWORD | SMTP email password | None |
| FSQLFLY_MAIL_RECEIVERS | SMTP email receivers, separated by `,` | None |
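As an illustration only (all values below are examples, not recommendations), a minimal `~/.fsqlfly` env file covering the most common settings might look like:

```shell
# ~/.fsqlfly -- example values, adjust for your setup
FSQLFLY_PASSWORD=password
FSQLFLY_FLINK_BIN_DIR=/opt/flink/bin
FSQLFLY_WEB_PORT=8082
FSQLFLY_FINK_HOST=http://localhost:8081
FSQLFLY_JOB_LOG_DIR=/tmp/fsqlfly_job_log
```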
For details on the connection URL format used by `FSQLFLY_DB_URL`, see your database driver's connection URL documentation.
ps: the admin token value is the MD5 hex digest of `FSQLFLY_PASSWORD`; you can generate it with:

```python
import hashlib

md5 = hashlib.md5()
md5.update(b'password')
token = md5.hexdigest()
```
If you want to control starting and stopping all Flink SQL jobs via the API without logging in, add the token to the URL or to the request header.
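Putting the two pieces together, a short sketch of deriving the token from the admin password and preparing it as a request parameter (the parameter name `token` comes from the API section below; `'password'` is just the default value):

```python
import hashlib

# The admin token is the MD5 hex digest of FSQLFLY_PASSWORD
# ('password' here, i.e. the documented default).
token = hashlib.md5(b'password').hexdigest()

# Send the token as a request parameter named `token`.
params = {'token': token}
print(params)
```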
## API

You need to authenticate with the token (pass it in the `token` request parameter).

### Jobs

url: `/api/job` method: `GET`

### Job control

url: `/api/transform/<mode(status|start|stop|restart)>/<id or job name>` method: `POST`

Beta: you can set `pt` in the request body (JSON format); a unique job name will then be created for the job. If your SQL needs other formatted values, jinja2 templates are supported, e.g.:

```sql
insert into tablea select * from table where pt_field = {{ pt }};
```

You can send the `pt` value in the request body. I recommend controlling daily jobs with Airflow.

If you want to kill all `pt` jobs, add `kill_all_pt` to the request JSON body.

PS: `pt` may only contain the characters `0-9a-zA-Z_-`.

PS: for the status API, if there is no `last_run_job_id` or there are multiple running jobs, the API returns `FAILED`.

PS: a `dt` variable is generated for your SQL template; you can set `execution_date` in the request body (JSON format), e.g. `2020-05-11 23:22:07` or `2020-05-11`.
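As a sketch of assembling such a request (the host assumes the default `FSQLFLY_WEB_PORT`, and `daily_report` is a hypothetical job name, not one the project defines):

```python
import json

base_url = 'http://localhost:8082'  # default FSQLFLY_WEB_PORT
mode = 'start'                      # one of status | start | stop | restart
job_name = 'daily_report'           # hypothetical job name

# Build the job-control URL; pt and execution_date go in the JSON body.
url = '{}/api/transform/{}/{}'.format(base_url, mode, job_name)
body = json.dumps({'pt': '20200511', 'execution_date': '2020-05-11'})

print(url)
print(body)
```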
## Airflow Support

Use the DAG operator `fsqlfly.airflow_plugins.FSQLFlyOperator`.

Example:
```python
from datetime import timedelta

from airflow.models import DAG
from airflow.utils.dates import days_ago
from fsqlfly.airflow_plugins import FSQLFlyOperator

args = {'owner': 'airflow', 'start_date': days_ago(1)}

dag = DAG(
    dag_id='flink_hive_process',
    default_args=args,
    schedule_interval="2 1 * * *",
    dagrun_timeout=timedelta(minutes=60),
    max_active_runs=8,
    concurrency=8,
)

data = dict(pt="{{ ds_nodash }}")
http_conn_id = "fsqlplatform"
token = '{{ var.value.fsqlfly_token }}'

start_flink_job = FSQLFlyOperator(
    task_id='fink_job',
    job_name='flik_run_in_fsql_fly',
    token=token,
    http_conn_id=http_conn_id,
    data=data,
    dag=dag,
    method='start',  # supports restart | start | stop
    daemon=True,
    parallelism=0,  # if nonzero, limits the max number of jobs running at one time
    poke_interval=5,
)
```
- `token`: the fsqlfly token; you can pass the real token, or store it in an Airflow Variable as shown above
- `http_conn_id`: the Airflow connection id of the fsqlfly host; see the Airflow connections documentation for details
- `data`: arguments passed to the Flink job

If you want to control connectors via Airflow, use `fsqlfly.airflow_plugins.FSQLFlyConnectorOperator` with the same usage as above.
## Quick Start

- unzip Flink 1.10.0 to `/opt/flink`
- `pip install fsqlfly`
- `fsqlfly echoenv ~/.fsqlfly`
- change the value of `FSQLFLY_FLINK_BIN_DIR` in `~/.fsqlfly` to your Flink bin directory, e.g. `/opt/flink/bin`
- `fsqlfly initdb`
- `fsqlfly webserver`
- open your browser at `http://localhost:8082`; the password is `password`
File details
Details for the file fsqlfly-5.0.1.tar.gz.
File metadata
- Download URL: fsqlfly-5.0.1.tar.gz
- Upload date:
- Size: 1.1 MB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.23.0 setuptools/46.1.3 requests-toolbelt/0.9.1 tqdm/4.43.0 CPython/3.6.10
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 4c19d9fd995640138ed3d1fbe35c4b175ed89b022ef19d3a381e942e1db44288 |
| MD5 | ed20ee65b8899fa8a09cfd791e9557a9 |
| BLAKE2b-256 | e3b034e1b4f93e6ef5d57bb20e8febf58fd6fdb79cf293b2d58f88ca32b82e50 |
File details
Details for the file fsqlfly-5.0.1-py2.py3-none-any.whl.
File metadata
- Download URL: fsqlfly-5.0.1-py2.py3-none-any.whl
- Upload date:
- Size: 1.2 MB
- Tags: Python 2, Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.23.0 setuptools/46.1.3 requests-toolbelt/0.9.1 tqdm/4.43.0 CPython/3.6.10
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 9a65a532a86ab5bdd37976bb7377e65f000461fbfa5948b9df783b2f67cbbfcb |
| MD5 | d7ef85ed7a65d7146fd6f8f1f816d56a |
| BLAKE2b-256 | 862dbc0fa957947cc97061e6231c4b3532e1f4d4d22e9f7d0adb2c392547d725 |