# Flink SQL Job Management Website
## Requirements
- python 3.6+
- flink 1.9.0+ installed (you need to set `FSQLFLY_FLINK_BIN_DIR` in `~/.fsqlfly` or in the environment)
Note: if you want to run multiple fsqlfly instances on one machine, set `FSQLFLY` in the environment, e.g.:

```bash
export FSQLFLY=/path/where/you/like
```
## fsqlfly commands

You can generate an env template with `fsqlfly echoenv [filename]`.
### install

```bash
pip install fsqlfly
```
### init database

```bash
fsqlfly initdb
```
### run website

```bash
fsqlfly webserver [--jobdaemon]
```

Note: add `--jobdaemon` if you want fsqlfly to daemonize all flink sql jobs (the jobs must be set publish and available).
### reset database (warning: it'll delete all data)

```bash
fsqlfly resetdb
```
### canal consumer (load mysql binlog data to kafka)

Requires canal v1.1.4+ installed.

```bash
pip install fsqlfly[canal]
fsqlfly canal --bootstrap_servers=localhost:9092 --canal_host=localhost --canal_port=11111 --canal_destination=demo \
    --canal_username=example --canal_password=pp --canal_client_id=1232
```

Note: run `fsqlfly canal -h` for more information. You can also set all of these variables in the `~/.fsqlfly` file, e.g.:
```
canal_username=root
canal_destination=example
canal_password=password
canal_client_id=123
bootstrap_servers=hadoop-1:9092,hadoop-2:9092
canal_table_filter="database\\..*"
canal_host=localhost
```
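To sanity-check that change events are actually arriving in Kafka, a minimal consumer sketch can help. This is an assumption-laden example, not part of fsqlfly: it assumes the messages are JSON-encoded and that the topic is named `demo` (the real topic name depends on your canal/fsqlfly configuration).

```python
import json

from kafka import KafkaConsumer  # pip install kafka-python

consumer = KafkaConsumer(
    'demo',  # hypothetical topic name; adjust to your setup
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
)
for message in consumer:
    print(message.value)  # one row-change event per message
```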
### use canal data in flink

fsqlfly supports loading a mysql database as a mysql (both) | kafka (update and create) | elasticsearch (save) resource:

```bash
pip install fsqlfly[canal]
fsqlfly loadmysql --host=localhost --database=fsqlfly --category=kafka --tables=* --password=password --username=root
```

You can set `--category` to one of (kafka, mysql, es); resources will be created automatically from the database schema.
## settings

You can change settings in the env file (`~/.fsqlfly`) or via environment variables (e.g. `export name=value`).
| name | description | default |
|---|---|---|
| FSQLFLY_PASSWORD | admin password (if not set, a random password is used) | password |
| FSQLFLY_DB_URL | database connection url (if set, the other db settings are ignored) | None |
| FSQLFLY_DB_TYPE | db type, one of [sqlite\|mysql\|postgresql] | sqlite |
| FSQLFLY_DB_FILE | sqlite db file name | db.sqlite3 |
| FSQLFLY_DATABASE | database name | test |
| FSQLFLY_DB_PASSWORD | database password | xxx |
| FSQLFLY_DB_USER | database user | root |
| FSQLFLY_DB_PORT | database port | 3306 |
| FSQLFLY_STATIC_ROOT | static file directory (defaults to fsqlfly/static if not set) | None |
| FSQLFLY_FLINK_BIN_DIR | flink bin directory | /opt/flink/bin |
| FSQLFLY_FLINK_MAX_TERMINAL | max number of living terminals | 1000 |
| FSQLFLY_DEBUG | web debug mode (True if set, else False) | None |
| FSQLFLY_WEB_PORT | http port | 8082 |
| FSQLFLY_FINK_HOST | flink RESTful api host | http://localhost:8081 |
| FSQLFLY_JOB_DAEMON_FREQUENCY | job daemon check interval in seconds | 30 |
| FSQLFLY_JOB_DAEMON_MAX_TRY_ONE_DAY | maximum tries per job in one day | 3 |
| FSQLFLY_JOB_LOG_DIR | flink job daemon log directory | /tmp/fsqlfly_job_log |
| FSQLFLY_UPLOAD_DIR | upload directory | ~/.fsqlfly_upload |
See your database driver's documentation for the connection url format used by `FSQLFLY_DB_URL`.
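For example, a minimal `~/.fsqlfly` could look like the sketch below. This assumes the env file uses the same `FSQLFLY_*` names as the table above; all values are placeholders.

```
FSQLFLY_PASSWORD=password
FSQLFLY_DB_TYPE=sqlite
FSQLFLY_DB_FILE=db.sqlite3
FSQLFLY_FLINK_BIN_DIR=/opt/flink/bin
FSQLFLY_WEB_PORT=8082
```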
Note: the admin token value is the md5 hex digest of `FSQLFLY_PASSWORD`; you can generate it like this:

```python
import hashlib

md5 = hashlib.md5()
md5.update(b'password')  # replace with your FSQLFLY_PASSWORD
token = md5.hexdigest()
print(token)
```

If you want to control all flink sql jobs (start and stop) through the API without logging in, add this token to the url or header.
## API

All endpoints require login by token (passed as the `token` request parameter).

- jobs

  url: `/api/job` method: GET

- job control

  url: `/api/transform/<mode(status|start|stop|restart)>/<id or job name>` method: POST
Beta: you can set `pt` in the request body (JSON format); a unique job name will then be created for the job. If your sql needs other formatted values, jinja2 templating is supported, e.g.:

```sql
insert into tablea select * from table where pt_field = {{ pt }};
```

You can send the `pt` value in the request body. I recommend controlling daily jobs with airflow.

If you want to kill all `pt` jobs, add `kill_all_pt` to the request JSON body.

PS: `pt` may only contain `0-9a-zA-Z_-`.

PS: for the `status` api, if there is no `last_run_job_id` or there are multiple running jobs, the api will return FAILED.
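As a hedged sketch (not from the project docs), starting a job over this API with the token passed as a request parameter might look like the following; the host, port, and job name `daily_etl` are placeholders:

```python
import hashlib

import requests

# md5 hex digest of FSQLFLY_PASSWORD, as described above
token = hashlib.md5(b'password').hexdigest()

# start a published job named "daily_etl" (hypothetical) with a pt value
resp = requests.post(
    'http://localhost:8082/api/transform/start/daily_etl',
    params={'token': token},
    json={'pt': '20200101'},
)
print(resp.json())
```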
## Airflow Support

Use the dag operator `fsqlfly.airflow_plugins.FSQLFlyOperator`, for example:
```python
from datetime import timedelta

from airflow.models import DAG
from airflow.utils.dates import days_ago
from fsqlfly.airflow_plugins import FSQLFlyOperator

# default task arguments (the original example assumes an existing `args` dict)
args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
}

dag = DAG(
    dag_id='flink_hive_process',
    default_args=args,
    schedule_interval="2 1 * * *",
    dagrun_timeout=timedelta(minutes=60),
    max_active_runs=8,
    concurrency=8,
)

data = dict(pt="{{ ds_nodash }}")
http_conn_id = "fsqlplatform"
token = '{{ var.value.fsqlfly_token }}'

start_flink_job = FSQLFlyOperator(
    task_id='flink_job',
    job_name='flink_run_in_fsql_fly',
    token=token,
    http_conn_id=http_conn_id,
    data=data,
    dag=dag,
    method='start',  # supports restart | start | stop
    daemon=True,
    poke_interval=5,
)
```
- `token`: the fsqlfly token; you can pass the real token directly, or store it in an airflow Variable as above
- `http_conn_id`: the airflow connection id pointing at the fsqlfly host; see the airflow connections documentation for detail
- `data`: args passed to the flink job
## Quick Start

- unzip Flink 1.10.0 to `/opt/flink`
- `pip install fsqlfly`
- `fsqlfly echoenv ~/.fsqlfly`
- change the value of `FSQLFLY_FLINK_BIN_DIR` in `~/.fsqlfly` to your flink bin dir, e.g. `/opt/flink/bin`
- `fsqlfly initdb`
- `fsqlfly webserver`
- open your browser at `http://localhost:8082`; the password is `password`