Flink SQL Job Management Website
requirements
- python 3.6+
- flink 1.9.0+ installed (you need to set FSQLFLY_FLINK_BIN_DIR in ~/.fsqlfly or in the environment)
ps: if you want to run multiple fsqlfly instances on one machine, you can set FSQLFLY in the environment, like
export FSQLFLY=/path/where/you/like
fsqlfly command
you can generate an env template with fsqlfly echoenv [filename]
install
pip install fsqlfly
init database
fsqlfly initdb
run website
fsqlfly webserver [--jobdaemon]
ps: if you want to daemon all flink sql jobs (they need to be set publish and available), add --jobdaemon to the command
reset database (warning: it'll delete all data)
fsqlfly resetdb
support canal consumer (load mysql binlog data into kafka)
requires canal v1.1.4+ installed
pip install fsqlfly[canal]
fsqlfly runcanal [name or id]
settings
you can change settings by writing them to the env file (~/.fsqlfly) or via environment variables (eg: export name=value); a sample file is sketched after the table below
Name | Description | Default |
---|---|---|
FSQLFLY_PASSWORD | admin password (if not set, a random password is used) | password |
FSQLFLY_DB_URL | database connection url (if set, the other database settings are ignored) | None |
FSQLFLY_STATIC_ROOT | the dir of static files (if not set, fsqlfly/static is used) | None |
FSQLFLY_FLINK_BIN_DIR | the flink bin dir | /opt/flink/bin |
FSQLFLY_FLINK_MAX_TERMINAL | the max number of living terminals | 1000 |
FSQLFLY_DEBUG | set web debug (True if set, otherwise False) | None |
FSQLFLY_WEB_PORT | http port | 8082 |
FSQLFLY_FINK_HOST | flink REST api host | http://localhost:8081 |
FSQLFLY_JOB_DAEMON_FREQUENCY | job daemon check interval in seconds | 30 |
FSQLFLY_JOB_DAEMON_MAX_TRY_ONE_DAY | maximum restart attempts per job in one day | 3 |
FSQLFLY_JOB_LOG_DIR | flink job daemon log dir | /tmp/fsqlfly_job_log |
FSQLFLY_UPLOAD_DIR | upload dir | ~/.fsqlfly_upload |
FSQLFLY_SAVE_MODE_DISABLE | if set, real delete is supported (otherwise save mode) | False |
FSQLFLY_MAIL_ENABLE | send email or not | false |
FSQLFLY_MAIL_HOST | smtp email host | None |
FSQLFLY_MAIL_USER | smtp email user | None |
FSQLFLY_MAIL_PASSWORD | smtp email password | None |
FSQLFLY_MAIL_RECEIVERS | smtp email receivers separated by , | None |
connection url details for FSQLFLY_DB_URL: see your database driver's documentation
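For reference, a minimal ~/.fsqlfly might look like the sketch below. The values are illustrative only (assuming plain NAME=value lines; you can generate the real template with fsqlfly echoenv), and the FSQLFLY_DB_URL shown assumes a SQLAlchemy-style mysql url, so adjust it to your own database:

FSQLFLY_PASSWORD=change_me
FSQLFLY_FLINK_BIN_DIR=/opt/flink/bin
FSQLFLY_WEB_PORT=8082
FSQLFLY_DB_URL=mysql+pymysql://user:pass@localhost:3306/fsqlfly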
ps: the admin token is the md5 hex digest of FSQLFLY_PASSWORD; you can generate it with

import hashlib

md5 = hashlib.md5()
md5.update(b'password')  # replace b'password' with your FSQLFLY_PASSWORD value
token = md5.hexdigest()

if you want to control flink sql job start and stop through the api, you can add the token to the url or a header without logging in
API
requests must be authenticated with the token (passed in the request params as token)
- jobs
url: /api/job method: get (see the request sketch below)
- job control
url: /api/transform/<mode(status|start|stop|restart)>/<id or job name> method: post
Beta: you can set pt in the request body (json format); a unique job name will then be created for the job. If your sql needs the value in another format, jinja2 templating is supported.
eg: insert into tablea select * from table where pt_field = {{ pt }};
you can send the pt value in the request body. I recommend controlling daily jobs with airflow.
If you want to kill all pt jobs, add kill_all_pt to the request json body.
PS: pt can only contain '0-9a-zA-Z_-'
PS: for the status api, if there is no last_run_job_id or there are multiple running jobs, the api will return FAILED
PS: a dt variable is also generated for your sql template; you can set execution_date in the request body (json format), eg: 2020-05-11 23:22:07 or 2020-05-11
A request sketch follows below.
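As an illustration only, here is a minimal sketch of these endpoints using the requests library; the host, token and job name (daily_report) are placeholders, and the exact response payloads are not specified here:

import requests

BASE = 'http://localhost:8082'            # placeholder host/port (FSQLFLY_WEB_PORT)
TOKEN = '<md5 hex of FSQLFLY_PASSWORD>'   # placeholder token, see above

# list jobs; the token is passed as a request param so no login is needed
print(requests.get(BASE + '/api/job', params={'token': TOKEN}).json())

# start a hypothetical job named daily_report with a pt value and execution_date
body = {'pt': '20200511', 'execution_date': '2020-05-11'}
resp = requests.post(BASE + '/api/transform/start/daily_report',
                     params={'token': TOKEN}, json=body)
print(resp.status_code, resp.text)

# check status (returns FAILED if there is no last_run_job_id or multiple running jobs)
status = requests.post(BASE + '/api/transform/status/daily_report',
                       params={'token': TOKEN}, json={'pt': '20200511'})
print(status.text)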
Airflow Support
use the dag operator fsqlfly.airflow_plugins.FSQLFlyOperator
example:
from datetime import timedelta

from airflow.models import DAG
from airflow.utils.dates import days_ago
from fsqlfly.airflow_plugins import FSQLFlyOperator

# default args for the example DAG (adjust to your environment)
args = {'owner': 'airflow', 'start_date': days_ago(1)}

dag = DAG(
    dag_id='flink_hive_process',
    default_args=args,
    schedule_interval="2 1 * * *",
    dagrun_timeout=timedelta(minutes=60),
    max_active_runs=8,
    concurrency=8
)
data = dict(pt="{{ ds_nodash }}")
http_conn_id = "fsqlplatform"
token = '{{ var.value.fsqlfly_token }}'
start_flink_job = FSQLFlyOperator(
    task_id='fink_job',
    job_name='flik_run_in_fsql_fly',
    token=token,
    http_conn_id=http_conn_id,
    data=data,
    dag=dag,
    method='start',  # supports restart | start | stop
    daemon=True,
    parallelism=0,  # if parallelism is non-zero it limits the max number of jobs running at one time
    poke_interval=5,
)
token: the fsqlfly token; you can pass the real token directly, or store it in an airflow variable
http_conn_id: the airflow connection id pointing at the fsqlfly host (see the airflow connection docs for detail)
data: args passed to the flink job
if you want to control a connector from airflow you can use fsqlfly.airflow_plugins.FSQLFlyConnectorOperator; the usage is the same as above.
Quick Start
- unzip Flink 1.10.0 to /opt/flink
pip install fsqlfly
fsqlfly echoenv ~/.fsqlfly
- change the value of FSQLFLY_FLINK_BIN_DIR in ~/.fsqlfly to your flink bin dir, like /opt/flink/bin
fsqlfly initdb
fsqlfly runwebserver
- open your browser at http://localhost:8082; the password is password