Volcano Engine Flink Stream Computing Python Client CLI
Project description
Volcano Engine Flink AI Skills + volc-flink-cli
本仓库将 volc-flink-cli 与 skills 技能体系整合在一起,目标是沉淀面向 AI 场景的“火山引擎 Flink 技能项目”:
volc_flink:可执行的 CLI(同时包含 Python client),用于真实调用火山引擎 Flink APIskills/:面向 AI 的技能定义与评测脚本,用于让 AI 以“技能路由 + 工具调用”的方式完成 Flink 运维与开发任务
目录
安装
环境要求
- Python 3.7+
安装方式
方式 A:从 PyPI 安装(推荐:无代码仓库也可用)
PyPI 地址:https://pypi.org/project/volc-flink-cli/
pip install volc-flink-cli
# 验证安装
volc_flink --version
volc_flink --help
volc_flink login status
需要 SQL/CDC 模板示例时:建议直接从你的业务仓库/本地文件提供 --sql-file / --cdc-file;如果确实需要本项目示例文件,可选择克隆仓库后参考 skills/examples/(示例仅用于演示结构,发布生产任务前请替换为真实参数与连接信息)。
如需使用 conn kafka 能力:本项目已在依赖中处理 confluent-kafka 的 Python 版本差异;若你使用 Python 3.12,建议使用 confluent-kafka>=2.3.0(通常可直接安装 wheel,避免本地编译 librdkafka)。
方式 B:从源码安装(本仓库)
pip install .
如需使用 conn kafka 能力,请确保已安装 confluent-kafka(本仓库 pyproject.toml 已包含依赖;若你在 Python 3.12 环境下手动安装,建议 confluent-kafka>=2.3.0 以获得 wheel,避免本地编译 librdkafka)。
开发模式(可编辑安装):
pip install -e .
安装完成后获得命令:volc_flink。
快速开始
1)登录
volc_flink login --region cn-beijing
也支持从环境变量提供登录信息(credentials 文件缺失/无效时兜底):
export VOLCENGINE_ACCESS_KEY="xxx"
export VOLCENGINE_SECRET_KEY="yyy"
export VOLCENGINE_REGION="cn-beijing" # optional
查看当前登录与默认配置状态:
volc_flink login status
volc_flink config show
2)选定项目(推荐)
先列举项目:
volc_flink projects list
设置默认项目(之后大部分命令可省略 -p/--project 或 --project-id):
volc_flink config set-default-project --project-id YOUR_PROJECT_ID
也可以按项目名设置:
volc_flink config set-default-project --name YOUR_PROJECT_NAME
3)查看帮助
volc_flink --help
volc_flink drafts --help
volc_flink jobs --help
常用场景
草稿(Drafts)
1)列目录 / 列草稿
列草稿目录(用于创建草稿时选择目录):
volc_flink drafts dirs
创建草稿目录:
volc_flink drafts mkdir --name "/a/b/c"
删除草稿目录:
volc_flink drafts rmr --name "/a/b/c"
volc_flink drafts rmr --id "2037507470509535234"
删除草稿:
volc_flink drafts rm --id "2037518350102020097"
volc_flink drafts rm --name "my_draft"
volc_flink drafts rm --name "/a/b/c/my_draft.sql"
列草稿(用于查询草稿 ID、类型、目录等):
volc_flink drafts apps --directory "/a/b/c"
兼容命令(等价于 drafts apps):
volc_flink drafts list --directory "/a/b/c"
2)校验 SQL 草稿(Validate)
说明:Validate 不只做语法解析,也可能会触发 connector/catalog 的校验,因此失败不一定是 SQL 语法错误。
# 通过草稿 ID 校验
volc_flink drafts validate --draft-id YOUR_DRAFT_ID
# 通过草稿名称或路径校验(自动解析草稿ID)
volc_flink drafts validate --draft "my_draft"
volc_flink drafts validate --draft "/a/b/c/my_draft.sql"
3)创建草稿(SQL)
volc_flink drafts create \
--name "test-streaming-sql" \
--directory "/a/b/c" \
--job-type FLINK_STREAMING_SQL \
--engine-version FLINK_VERSION_1_17 \
--sql "SELECT 1;"
4)创建草稿(JAR:本地上传到 TOS)
说明:如果你是通过 PyPI 安装且本机没有本仓库代码,请将 --jar-path 替换为你自己的 JAR 文件路径;或自行下载一个可运行的 Flink 示例 JAR 到本地后再上传。
volc_flink drafts create \
--name "test-batch-jar" \
--directory "/a/b/c" \
--job-type FLINK_BATCH_JAR \
--engine-version FLINK_VERSION_1_20 \
--jar-path ./WordCount-flink-1-20.jar \
--main-class org.apache.flink.streaming.examples.wordcount.WordCount
5)创建草稿(CDC:YAML Pipeline)
说明:
- CDC 草稿类型为
FLINK_CDC_JAR(等价参数:--type cdc) - 当前 CDC 仅支持
FLINK_VERSION_1_16,CDC 版本默认v3.4 --directory必须是已存在的草稿目录;可用drafts dirs查询,或用drafts mkdir创建
# 1) 选择/创建草稿目录
volc_flink drafts dirs
volc_flink drafts mkdir --name "/数据开发文件夹/CDC"
# 2) 创建 CDC 草稿(推荐:文件方式,最稳妥,不受 shell 转义影响)
volc_flink drafts create \
--type cdc \
--directory "/数据开发文件夹/CDC" \
-n "mysql-to-paimon-cdc" \
--engine-version FLINK_VERSION_1_16 \
--cdc-version v3.4 \
--cdc-file ./path/to/job.yml
# 3) 发布为 Job(仅提交发布,不自动启动)
volc_flink drafts publish --draft-id YOUR_DRAFT_ID --resource-pool YOUR_RESOURCE_POOL_NAME
可选:内联 YAML(适合复制粘贴;用 heredoc 避免 ${...} 被 shell 展开):
volc_flink drafts create \
--type cdc \
--directory "/数据开发文件夹/CDC" \
-n "mysql-to-paimon-cdc" \
--engine-version FLINK_VERSION_1_16 \
--cdc-version v3.4 \
--cdc "$(cat <<'YAML'
sources:
- source:
type: mysql
hostname: ${mysql_hostname}
port: ${mysql_port}
username: ${mysql_username}
password: ${mysql_password}
tables: fin_db.*\\.balance.*
server-id: ${mysql_server_id}
schema-change.enabled: true
sink:
type: paimon-las
commit.user: ${paimon_las_commit_user}
route:
- source-table: fin_db.*\\.balance.*
sink-table: fin_db.balance
pipeline:
name: ${pipeline_name}
parallelism: ${pipeline_parallelism}
YAML
)"
MySQL CDC 额外提醒:需要用户自行上传 MySQL Driver(建议 8.0.27)到平台依赖资源中。
6)依赖与动态参数
Dependency 字段为 JSON 字符串,例如:
{"jars":["tos://flink-cwz-paimon/flink-client/resrouce/jars/WordCount-flink-1-20.jar"]}
给草稿添加依赖(支持 tos 路径或本地路径,本地会自动上传并输出 tos://... 路径):
volc_flink drafts dependency add \
--draft "/a/b/c/jar_job" \
--jar tos://YOUR_BUCKET/path/to/jars/dep1.jar \
--jar ./path/to/dep2.jar
动态参数(DynamicOptions)为 String-String KV JSON,例如:
{"parallelism.default":"4","taskmanager.numberOfTaskSlots":"4"}
设置/删除动态参数:
volc_flink drafts params show --draft "/a/b/c/jar_job"
volc_flink drafts params set --draft "/a/b/c/jar_job" --kv parallelism.default=4
volc_flink drafts params unset --draft "/a/b/c/jar_job" --key parallelism.default
7)更新与发布
更新 SQL:
volc_flink drafts update --draft "/a/b/c/sql_job" --sql-file /path/to/job.sql
发布草稿:
volc_flink drafts publish --draft "/a/b/c/sql_job" --resource-pool YOUR_RESOURCE_POOL_NAME
资源池(Resource Pools)
发布草稿(drafts publish)需要指定资源池(--resource-pool 或 --resource-pool-id)。
列举资源池:
volc_flink resource-pools list
# 指定项目(可选;不传则使用默认项目)
volc_flink resource-pools list -p "YOUR_PROJECT_NAME"
volc_flink resource-pools list --project-id "YOUR_PROJECT_ID"
查看资源池详情:
volc_flink resource-pools detail --resource-pool-id "YOUR_RESOURCE_POOL_ID"
volc_flink resource-pools detail --name "YOUR_RESOURCE_POOL_NAME"
任务(Jobs)
列举任务与详情:
volc_flink jobs list
volc_flink jobs detail --job-id YOUR_JOB_ID
查询任务运行记录/事件:
volc_flink jobs instances --job-id YOUR_JOB_ID --limit 20
volc_flink jobs events --job-id YOUR_JOB_ID --limit 50
启动/停止/重启(支持 --inspect 轮询状态):
volc_flink jobs start --job-id YOUR_JOB_ID --inspect --timeout 300
volc_flink jobs stop --job-id YOUR_JOB_ID --inspect --timeout 300
volc_flink jobs restart --job-id YOUR_JOB_ID --inspect --timeout 300
Rescale(调整规格并自动发布重启):
volc_flink jobs rescale --job-id YOUR_JOB_ID --parallelism 32 --tm-spec 4C16GB --tm-slots 4 --jm-spec 2C8GB --inspect --timeout 300
监控(Monitor)
查询事件:
volc_flink monitor events --job-id YOUR_JOB_ID --limit 50
查询日志(默认选择最新实例,支持时间范围、cursor、tail):
volc_flink monitor logs --job-id YOUR_JOB_ID --level ERROR --since "2025-01-01 12:00:00" --until "2025-01-01 13:00:00"
volc_flink monitor logs --job-id YOUR_JOB_ID --follow --interval 2
volc_flink monitor logs --job-id YOUR_JOB_ID --cursor YOUR_CURSOR
查询 FlinkUI:
volc_flink monitor flinkui url --job-id YOUR_JOB_ID
volc_flink monitor flinkui overview --job-id YOUR_JOB_ID
volc_flink monitor flinkui exceptions --job-id YOUR_JOB_ID
Catalog(元数据)
列出 Catalog 列表:
volc_flink catalog tree
展开某个 Catalog(以列表形式列出该 Catalog 下的 Database):
volc_flink catalog tree --catalog-id 45
展开某个 Database(列出 Table):
volc_flink catalog tree --catalog-id 45 --database test_db
查看详细信息:
volc_flink catalog catalogs list
volc_flink catalog catalogs show --catalog-id 45
volc_flink catalog databases show --catalog-id 45 --database test_db
volc_flink catalog tables show --catalog-id 45 --database test_db --table prds_pk1
其中 catalog tables show 默认会输出表基础信息,以及字段列表的 name、type、comment、nullable、primary-key 五列;如需排查接口原始返回,可追加 --raw。
Sessions(Session 集群)
列举 Session 集群:
volc_flink sessions list
创建 Session 集群:
volc_flink sessions create \
--name "abcdef" \
--resource-pool-id "YOUR_RESOURCE_POOL_ID" \
--vcu 10 \
--min-replica 1 \
--max-replica 10 \
--engine-version "FLINK_VERSION_1_17"
启动/停止:
volc_flink sessions start --id YOUR_SESSION_ID
volc_flink sessions stop --id YOUR_SESSION_ID
获取 FlinkUI(工具会自动获取 token 并通过网关拼接 URL):
volc_flink sessions ui --id YOUR_SESSION_ID
Files(TOS 文件)
说明:Files 模块使用 config set-tos-jar-prefix 配置的 tos://... 目录作为根目录,对其下对象进行管理。
列出文件:
volc_flink files list
volc_flink files list --prefix "sub/dir" --limit 50
上传/更新:
volc_flink files upload --file ./a.jar
volc_flink files upload --file ./a.jar --name "custom/a.jar"
volc_flink files update --name "custom/a.jar" --file ./a.jar
查看详情与获取下载链接:
volc_flink files detail --name "custom/a.jar"
volc_flink files url --name "custom/a.jar" --expires 3600
连接(Conn / Kafka)
说明:conn 与 connection 是等价命令别名,目前优先支持 Kafka 连接与消息消费能力。
新增/更新 Kafka instance(保存默认 topic/group-id,可选):
volc_flink conn kafka instance add \
--name demo_kafka \
--topic events \
--group-id test_group
为 instance 增加接入点(endpoint)。支持两种方式:
- 交互式(不传 endpoint 连接参数时会提示输入)
volc_flink conn kafka endpoint add --instance demo_kafka
- 参数式(不进入交互;至少需要
--bootstrap-servers)
volc_flink conn kafka endpoint add \
--instance demo_kafka \
--name private \
--bootstrap-servers broker1:9092,broker2:9092 \
--security-protocol PLAINTEXT
删除 instance / endpoint:
volc_flink conn kafka instance rm --instance demo_kafka
volc_flink conn kafka endpoint rm --instance demo_kafka --endpoint private
列出 Kafka instances:
volc_flink conn kafka instance
列出 Kafka endpoints:
volc_flink conn kafka endpoint
volc_flink conn kafka endpoint --instance demo_kafka
查看 Kafka endpoint 详情(默认不打印明文密码):
volc_flink conn kafka endpoint detail --instance demo_kafka --endpoint public
volc_flink conn kafka endpoint detail --instance demo_kafka --endpoint public --show-password
按时间范围消费消息:
volc_flink conn kafka messages consume \
--instance demo_kafka \
--endpoint private \
--limit 10
如果不传 --partition,默认会从所有 partition 消费;如果不传 --start,默认从最近 3 小时开始消费;如果不传 --limit,默认返回 10 条消息。
只消费单个 partition:
volc_flink conn kafka messages consume \
--partition 0 \
--instance demo_kafka \
--endpoint private \
--limit 10
指定结束时间:
volc_flink conn kafka messages consume \
--partition 0 \
--start 2024-07-01T00:00:00Z \
--end 2024-07-01T01:00:00Z \
--instance demo_kafka \
--endpoint private \
--limit 10
覆盖默认 Topic / Group:
volc_flink conn kafka messages consume \
--partition 0 \
--start 2024-07-01T00:00:00Z \
--instance demo_kafka \
--endpoint private \
--topic other_topic \
--group-id other_group \
--limit 10
输出原始 JSON:
volc_flink conn kafka messages consume \
--partition 0 \
--start 2024-07-01T00:00:00Z \
--instance demo_kafka \
--endpoint private \
--limit 10 \
--raw
调整超时时间:
volc_flink conn kafka messages consume \
--partition 0 \
--start 2024-07-01T00:00:00Z \
--instance demo_kafka \
--endpoint private \
--poll-timeout 5 \
--max-idle-polls 12 \
--offsets-timeout 30 \
--limit 10
使用别名命令:
volc_flink connection kafka messages consume \
--partition 0 \
--start 2024-07-01T00:00:00Z \
--instance demo_kafka \
--endpoint private \
--limit 10
说明:
consume使用保存的 Kafka instance 与 endpoint 建立消费者- 如存在多个 instance,建议通过
--instance显式指定 - 如一个 instance 下存在多个 endpoint,建议通过
--endpoint显式指定 --partition不传时默认从所有 partition 消费--start支持 ISO8601 时间或时间戳;不传时默认最近 3 小时--end不传时默认消费到当前时间--limit不传时默认返回 10 条消息--poll-timeout、--max-idle-polls、--offsets-timeout可用于调大超时时间- 如未配置 Kafka instance/endpoint,命令会提示先执行
conn kafka instance add与conn kafka endpoint add
AI Skills(skills)
skills/ 目录提供面向 AI 的 Flink 技能体系(统一入口 + 子技能路由 + 评测脚本),与 volc_flink CLI 配合使用:技能负责“理解/规划/路由”,CLI 负责“真实执行/查询”。
入口文档:
skills/README.md:技能体系说明与目录结构skills/SKILL.md:统一入口技能定义(路由)
配置与安全
配置文件位置
- Windows:
C:\Users\YourUsername\.volc_flink\credentials.json(兼容旧目录:.volcano_flink) - Linux/macOS:
~/.volc_flink/credentials.json(兼容旧目录:~/.volcano_flink/credentials.json)
也可通过 VOLC_FLINK_CONFIG_DIR 指定配置目录位置(便于容器/CI)。
常用环境变量(credentials 文件缺失/无效时兜底):
VOLCENGINE_ACCESS_KEY/VOLCENGINE_SECRET_KEY/VOLCENGINE_REGIONVOLCENGINE_FLINK_PROJECT/VOLCENGINE_FLINK_PROJECT_ID/VOLCENGINE_FLINK_TOS_PREFIX
安全说明
- 密钥信息存储在用户本地目录,文件权限设置为仅当前用户可读写(0600)
- 建议定期更换访问密钥
- 生产环境建议使用更安全的密钥管理方案
Python API
from volcano_flink_client.auth.auth import AuthManager
from volcano_flink_client.api import ApiClient
from volcano_flink_client.projects.projects import ProjectManager
auth_manager = AuthManager()
credentials = auth_manager.login(access_key="YOUR_AK", secret_key="YOUR_SK", region="cn-beijing")
api_client = ApiClient(credentials)
project_manager = ProjectManager(api_client)
projects = project_manager.list_projects()
for p in projects:
print(p.name, p.id)
开发说明
代码结构
skills/ # AI 技能定义与评测(与 volc_flink CLI 整合)
volcano_flink_client/
├── api.py # HTTP 请求与签名
├── auth/ # 认证管理
├── projects/ # 项目操作
├── resource_pools/ # 资源池操作
├── drafts/ # 草稿操作
├── jobs/ # 任务操作
├── catalogs/ # Catalog 元数据
└── sessions/ # Session 集群
volcano_flink_cli/ # CLI 命令实现(argparse 子命令)
volcano_flink.py # CLI 入口(volc_flink)
本地验证
python integration_test.py
常见问题
Q:如何获取 AK/SK?
进入火山引擎控制台的访问控制页面创建与管理访问密钥:https://console.volcengine.com/iam/accesskey/
Q:命令忘了怎么用?
volc_flink --help
volc_flink <module> --help
volc_flink <module> <cmd> --help
Q:登录失败怎么办?
- 确认 AK/SK 正确
- 确认网络可访问火山引擎 OpenAPI
- 确认账号具备对应权限
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file volc_flink_cli-1.2.0.tar.gz.
File metadata
- Download URL: volc_flink_cli-1.2.0.tar.gz
- Upload date:
- Size: 74.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.20
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
451faa0fd08299d2fea5e9e1eebbac76cd6e6cb50b5c7102493e3b402ac8ee20
|
|
| MD5 |
eb9c98d2b547e2a5f888b19ce5dee059
|
|
| BLAKE2b-256 |
f3ead99beeeba34a79a1a5e247af1f8b5f973f7ed1af7e6a7091c9b8a4284c74
|
File details
Details for the file volc_flink_cli-1.2.0-py3-none-any.whl.
File metadata
- Download URL: volc_flink_cli-1.2.0-py3-none-any.whl
- Upload date:
- Size: 90.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.20
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
8dcf7622d347aac6c94339392d219f1ce6262fa148c8e0af6d18e7855a6ce5c8
|
|
| MD5 |
bd1362d73af5a8bbd02e8cef6b433255
|
|
| BLAKE2b-256 |
de5f771b6c58e0da9898fc3cb103f4fd3e16800cabb30e10c8878c2ef43055b1
|