Skip to main content

Volcano Engine Flink Stream Computing Python Client CLI

Project description

Volcano Engine Flink AI Skills + volc-flink-cli

本仓库将 volc-flink-cliskills 技能体系整合在一起,目标是沉淀面向 AI 场景的“火山引擎 Flink 技能项目”:

  • volc_flink:可执行的 CLI(同时包含 Python client),用于真实调用火山引擎 Flink API
  • skills/:面向 AI 的技能定义与评测脚本,用于让 AI 以“技能路由 + 工具调用”的方式完成 Flink 运维与开发任务

目录

安装

环境要求

  • Python 3.7+

安装方式

方式 A:从 PyPI 安装(推荐:无代码仓库也可用)

PyPI 地址:https://pypi.org/project/volc-flink-cli/

pip install volc-flink-cli

# 验证安装
volc_flink --version
volc_flink --help
volc_flink login status

需要 SQL/CDC 模板示例时:建议直接从你的业务仓库/本地文件提供 --sql-file / --cdc-file;如果确实需要本项目示例文件,可选择克隆仓库后参考 skills/examples/(示例仅用于演示结构,发布生产任务前请替换为真实参数与连接信息)。

如需使用 conn kafka 能力:本项目已在依赖中处理 confluent-kafka 的 Python 版本差异;若你使用 Python 3.12,建议使用 confluent-kafka>=2.3.0(通常可直接安装 wheel,避免本地编译 librdkafka)。

方式 B:从源码安装(本仓库)

pip install .

如需使用 conn kafka 能力,请确保已安装 confluent-kafka(本仓库 pyproject.toml 已包含依赖;若你在 Python 3.12 环境下手动安装,建议 confluent-kafka>=2.3.0 以获得 wheel,避免本地编译 librdkafka)。

开发模式(可编辑安装):

pip install -e .

安装完成后获得命令:volc_flink

快速开始

1)登录

volc_flink login --region cn-beijing

也支持从环境变量提供登录信息(credentials 文件缺失/无效时兜底):

export VOLCENGINE_ACCESS_KEY="xxx"
export VOLCENGINE_SECRET_KEY="yyy"
export VOLCENGINE_REGION="cn-beijing"  # optional

查看当前登录与默认配置状态:

volc_flink login status
volc_flink config show

2)选定项目(推荐)

先列举项目:

volc_flink projects list

设置默认项目(之后大部分命令可省略 -p/--project--project-id):

volc_flink config set-default-project --project-id YOUR_PROJECT_ID

也可以按项目名设置:

volc_flink config set-default-project --name YOUR_PROJECT_NAME

3)查看帮助

volc_flink --help
volc_flink drafts --help
volc_flink jobs --help

常用场景

草稿(Drafts)

1)列目录 / 列草稿

列草稿目录(用于创建草稿时选择目录):

volc_flink drafts dirs

创建草稿目录:

volc_flink drafts mkdir --name "/a/b/c"

删除草稿目录:

volc_flink drafts rmr --name "/a/b/c"
volc_flink drafts rmr --id "2037507470509535234"

删除草稿:

volc_flink drafts rm --id "2037518350102020097"
volc_flink drafts rm --name "my_draft"
volc_flink drafts rm --name "/a/b/c/my_draft.sql"

列草稿(用于查询草稿 ID、类型、目录等):

volc_flink drafts apps --directory "/a/b/c"

兼容命令(等价于 drafts apps):

volc_flink drafts list --directory "/a/b/c"

2)校验 SQL 草稿(Validate)

说明:Validate 不只做语法解析,也可能会触发 connector/catalog 的校验,因此失败不一定是 SQL 语法错误。

# 通过草稿 ID 校验
volc_flink drafts validate --draft-id YOUR_DRAFT_ID

# 通过草稿名称或路径校验(自动解析草稿ID)
volc_flink drafts validate --draft "my_draft"
volc_flink drafts validate --draft "/a/b/c/my_draft.sql"

3)创建草稿(SQL)

volc_flink drafts create \
  --name "test-streaming-sql" \
  --directory "/a/b/c" \
  --job-type FLINK_STREAMING_SQL \
  --engine-version FLINK_VERSION_1_17 \
  --sql "SELECT 1;"

4)创建草稿(JAR:本地上传到 TOS)

说明:如果你是通过 PyPI 安装且本机没有本仓库代码,请将 --jar-path 替换为你自己的 JAR 文件路径;或自行下载一个可运行的 Flink 示例 JAR 到本地后再上传。

volc_flink drafts create \
  --name "test-batch-jar" \
  --directory "/a/b/c" \
  --job-type FLINK_BATCH_JAR \
  --engine-version FLINK_VERSION_1_20 \
  --jar-path ./WordCount-flink-1-20.jar \
  --main-class org.apache.flink.streaming.examples.wordcount.WordCount

5)创建草稿(CDC:YAML Pipeline)

说明:

  • CDC 草稿类型为 FLINK_CDC_JAR(等价参数:--type cdc
  • 当前 CDC 仅支持 FLINK_VERSION_1_16,CDC 版本默认 v3.4
  • --directory 必须是已存在的草稿目录;可用 drafts dirs 查询,或用 drafts mkdir 创建
# 1) 选择/创建草稿目录
volc_flink drafts dirs
volc_flink drafts mkdir --name "/数据开发文件夹/CDC"

# 2) 创建 CDC 草稿(推荐:文件方式,最稳妥,不受 shell 转义影响)
volc_flink drafts create \
  --type cdc \
  --directory "/数据开发文件夹/CDC" \
  -n "mysql-to-paimon-cdc" \
  --engine-version FLINK_VERSION_1_16 \
  --cdc-version v3.4 \
  --cdc-file ./path/to/job.yml

# 3) 发布为 Job(仅提交发布,不自动启动)
volc_flink drafts publish --draft-id YOUR_DRAFT_ID --resource-pool YOUR_RESOURCE_POOL_NAME

可选:内联 YAML(适合复制粘贴;用 heredoc 避免 ${...} 被 shell 展开):

volc_flink drafts create \
  --type cdc \
  --directory "/数据开发文件夹/CDC" \
  -n "mysql-to-paimon-cdc" \
  --engine-version FLINK_VERSION_1_16 \
  --cdc-version v3.4 \
  --cdc "$(cat <<'YAML'
sources:
- source:
    type: mysql
    hostname: ${mysql_hostname}
    port: ${mysql_port}
    username: ${mysql_username}
    password: ${mysql_password}
    tables: fin_db.*\\.balance.*
    server-id: ${mysql_server_id}
    schema-change.enabled: true
sink:
  type: paimon-las
  commit.user: ${paimon_las_commit_user}
route:
- source-table: fin_db.*\\.balance.*
  sink-table: fin_db.balance
pipeline:
  name: ${pipeline_name}
  parallelism: ${pipeline_parallelism}
YAML
)"

MySQL CDC 额外提醒:需要用户自行上传 MySQL Driver(建议 8.0.27)到平台依赖资源中。

6)依赖与动态参数

Dependency 字段为 JSON 字符串,例如:

{"jars":["tos://flink-cwz-paimon/flink-client/resrouce/jars/WordCount-flink-1-20.jar"]}

给草稿添加依赖(支持 tos 路径或本地路径,本地会自动上传并输出 tos://... 路径):

volc_flink drafts dependency add \
  --draft "/a/b/c/jar_job" \
  --jar tos://YOUR_BUCKET/path/to/jars/dep1.jar \
  --jar ./path/to/dep2.jar

动态参数(DynamicOptions)为 String-String KV JSON,例如:

{"parallelism.default":"4","taskmanager.numberOfTaskSlots":"4"}

设置/删除动态参数:

volc_flink drafts params show --draft "/a/b/c/jar_job"
volc_flink drafts params set --draft "/a/b/c/jar_job" --kv parallelism.default=4
volc_flink drafts params unset --draft "/a/b/c/jar_job" --key parallelism.default

7)更新与发布

更新 SQL:

volc_flink drafts update --draft "/a/b/c/sql_job" --sql-file /path/to/job.sql

发布草稿:

volc_flink drafts publish --draft "/a/b/c/sql_job" --resource-pool YOUR_RESOURCE_POOL_NAME

资源池(Resource Pools)

发布草稿(drafts publish)需要指定资源池(--resource-pool--resource-pool-id)。

列举资源池:

volc_flink resource-pools list

# 指定项目(可选;不传则使用默认项目)
volc_flink resource-pools list -p "YOUR_PROJECT_NAME"
volc_flink resource-pools list --project-id "YOUR_PROJECT_ID"

查看资源池详情:

volc_flink resource-pools detail --resource-pool-id "YOUR_RESOURCE_POOL_ID"
volc_flink resource-pools detail --name "YOUR_RESOURCE_POOL_NAME"

任务(Jobs)

列举任务与详情:

volc_flink jobs list
volc_flink jobs detail --job-id YOUR_JOB_ID

查询任务运行记录/事件:

volc_flink jobs instances --job-id YOUR_JOB_ID --limit 20
volc_flink jobs events --job-id YOUR_JOB_ID --limit 50

启动/停止/重启(支持 --inspect 轮询状态):

volc_flink jobs start --job-id YOUR_JOB_ID --inspect --timeout 300
volc_flink jobs stop --job-id YOUR_JOB_ID --inspect --timeout 300
volc_flink jobs restart --job-id YOUR_JOB_ID --inspect --timeout 300

Rescale(调整规格并自动发布重启):

volc_flink jobs rescale --job-id YOUR_JOB_ID --parallelism 32 --tm-spec 4C16GB --tm-slots 4 --jm-spec 2C8GB --inspect --timeout 300

监控(Monitor)

查询事件:

volc_flink monitor events --job-id YOUR_JOB_ID --limit 50

查询日志(默认选择最新实例,支持时间范围、cursor、tail):

volc_flink monitor logs --job-id YOUR_JOB_ID --level ERROR --since "2025-01-01 12:00:00" --until "2025-01-01 13:00:00"
volc_flink monitor logs --job-id YOUR_JOB_ID --follow --interval 2
volc_flink monitor logs --job-id YOUR_JOB_ID --cursor YOUR_CURSOR

查询 FlinkUI:

volc_flink monitor flinkui url --job-id YOUR_JOB_ID
volc_flink monitor flinkui overview --job-id YOUR_JOB_ID
volc_flink monitor flinkui exceptions --job-id YOUR_JOB_ID

Catalog(元数据)

列出 Catalog 列表:

volc_flink catalog tree

展开某个 Catalog(以列表形式列出该 Catalog 下的 Database):

volc_flink catalog tree --catalog-id 45

展开某个 Database(列出 Table):

volc_flink catalog tree --catalog-id 45 --database test_db

查看详细信息:

volc_flink catalog catalogs list
volc_flink catalog catalogs show --catalog-id 45
volc_flink catalog databases show --catalog-id 45 --database test_db
volc_flink catalog tables show --catalog-id 45 --database test_db --table prds_pk1

其中 catalog tables show 默认会输出表基础信息,以及字段列表的 nametypecommentnullableprimary-key 五列;如需排查接口原始返回,可追加 --raw

Sessions(Session 集群)

列举 Session 集群:

volc_flink sessions list

创建 Session 集群:

volc_flink sessions create \
  --name "abcdef" \
  --resource-pool-id "YOUR_RESOURCE_POOL_ID" \
  --vcu 10 \
  --min-replica 1 \
  --max-replica 10 \
  --engine-version "FLINK_VERSION_1_17"

启动/停止:

volc_flink sessions start --id YOUR_SESSION_ID
volc_flink sessions stop --id YOUR_SESSION_ID

获取 FlinkUI(工具会自动获取 token 并通过网关拼接 URL):

volc_flink sessions ui --id YOUR_SESSION_ID

Files(TOS 文件)

说明:Files 模块使用 config set-tos-jar-prefix 配置的 tos://... 目录作为根目录,对其下对象进行管理。

列出文件:

volc_flink files list
volc_flink files list --prefix "sub/dir" --limit 50

上传/更新:

volc_flink files upload --file ./a.jar
volc_flink files upload --file ./a.jar --name "custom/a.jar"
volc_flink files update --name "custom/a.jar" --file ./a.jar

查看详情与获取下载链接:

volc_flink files detail --name "custom/a.jar"
volc_flink files url --name "custom/a.jar" --expires 3600

连接(Conn / Kafka)

说明:connconnection 是等价命令别名,目前优先支持 Kafka 连接与消息消费能力。

新增/更新 Kafka instance(保存默认 topic/group-id,可选):

volc_flink conn kafka instance add \
  --name demo_kafka \
  --topic events \
  --group-id test_group

为 instance 增加接入点(endpoint)。支持两种方式:

  1. 交互式(不传 endpoint 连接参数时会提示输入)
volc_flink conn kafka endpoint add --instance demo_kafka
  1. 参数式(不进入交互;至少需要 --bootstrap-servers
volc_flink conn kafka endpoint add \
  --instance demo_kafka \
  --name private \
  --bootstrap-servers broker1:9092,broker2:9092 \
  --security-protocol PLAINTEXT

删除 instance / endpoint:

volc_flink conn kafka instance rm --instance demo_kafka
volc_flink conn kafka endpoint rm --instance demo_kafka --endpoint private

列出 Kafka instances:

volc_flink conn kafka instance

列出 Kafka endpoints:

volc_flink conn kafka endpoint
volc_flink conn kafka endpoint --instance demo_kafka

查看 Kafka endpoint 详情(默认不打印明文密码):

volc_flink conn kafka endpoint detail --instance demo_kafka --endpoint public
volc_flink conn kafka endpoint detail --instance demo_kafka --endpoint public --show-password

按时间范围消费消息:

volc_flink conn kafka messages consume \
  --instance demo_kafka \
  --endpoint private \
  --limit 10

如果不传 --partition,默认会从所有 partition 消费;如果不传 --start,默认从最近 3 小时开始消费;如果不传 --limit,默认返回 10 条消息。

只消费单个 partition:

volc_flink conn kafka messages consume \
  --partition 0 \
  --instance demo_kafka \
  --endpoint private \
  --limit 10

指定结束时间:

volc_flink conn kafka messages consume \
  --partition 0 \
  --start 2024-07-01T00:00:00Z \
  --end 2024-07-01T01:00:00Z \
  --instance demo_kafka \
  --endpoint private \
  --limit 10

覆盖默认 Topic / Group:

volc_flink conn kafka messages consume \
  --partition 0 \
  --start 2024-07-01T00:00:00Z \
  --instance demo_kafka \
  --endpoint private \
  --topic other_topic \
  --group-id other_group \
  --limit 10

输出原始 JSON:

volc_flink conn kafka messages consume \
  --partition 0 \
  --start 2024-07-01T00:00:00Z \
  --instance demo_kafka \
  --endpoint private \
  --limit 10 \
  --raw

调整超时时间:

volc_flink conn kafka messages consume \
  --partition 0 \
  --start 2024-07-01T00:00:00Z \
  --instance demo_kafka \
  --endpoint private \
  --poll-timeout 5 \
  --max-idle-polls 12 \
  --offsets-timeout 30 \
  --limit 10

使用别名命令:

volc_flink connection kafka messages consume \
  --partition 0 \
  --start 2024-07-01T00:00:00Z \
  --instance demo_kafka \
  --endpoint private \
  --limit 10

说明:

  • consume 使用保存的 Kafka instance 与 endpoint 建立消费者
  • 如存在多个 instance,建议通过 --instance 显式指定
  • 如一个 instance 下存在多个 endpoint,建议通过 --endpoint 显式指定
  • --partition 不传时默认从所有 partition 消费
  • --start 支持 ISO8601 时间或时间戳;不传时默认最近 3 小时
  • --end 不传时默认消费到当前时间
  • --limit 不传时默认返回 10 条消息
  • --poll-timeout--max-idle-polls--offsets-timeout 可用于调大超时时间
  • 如未配置 Kafka instance/endpoint,命令会提示先执行 conn kafka instance addconn kafka endpoint add

AI Skills(skills)

skills/ 目录提供面向 AI 的 Flink 技能体系(统一入口 + 子技能路由 + 评测脚本),与 volc_flink CLI 配合使用:技能负责“理解/规划/路由”,CLI 负责“真实执行/查询”。

入口文档:

  • skills/README.md:技能体系说明与目录结构
  • skills/SKILL.md:统一入口技能定义(路由)

配置与安全

配置文件位置

  • Windows:C:\Users\YourUsername\.volc_flink\credentials.json(兼容旧目录:.volcano_flink
  • Linux/macOS:~/.volc_flink/credentials.json(兼容旧目录:~/.volcano_flink/credentials.json

也可通过 VOLC_FLINK_CONFIG_DIR 指定配置目录位置(便于容器/CI)。

常用环境变量(credentials 文件缺失/无效时兜底):

  • VOLCENGINE_ACCESS_KEY / VOLCENGINE_SECRET_KEY / VOLCENGINE_REGION
  • VOLCENGINE_FLINK_PROJECT / VOLCENGINE_FLINK_PROJECT_ID / VOLCENGINE_FLINK_TOS_PREFIX

安全说明

  • 密钥信息存储在用户本地目录,文件权限设置为仅当前用户可读写(0600)
  • 建议定期更换访问密钥
  • 生产环境建议使用更安全的密钥管理方案

Python API

from volcano_flink_client.auth.auth import AuthManager
from volcano_flink_client.api import ApiClient
from volcano_flink_client.projects.projects import ProjectManager

auth_manager = AuthManager()
credentials = auth_manager.login(access_key="YOUR_AK", secret_key="YOUR_SK", region="cn-beijing")
api_client = ApiClient(credentials)

project_manager = ProjectManager(api_client)
projects = project_manager.list_projects()
for p in projects:
    print(p.name, p.id)

开发说明

代码结构

skills/                  # AI 技能定义与评测(与 volc_flink CLI 整合)
volcano_flink_client/
├── api.py               # HTTP 请求与签名
├── auth/                # 认证管理
├── projects/            # 项目操作
├── resource_pools/      # 资源池操作
├── drafts/              # 草稿操作
├── jobs/                # 任务操作
├── catalogs/            # Catalog 元数据
└── sessions/            # Session 集群

volcano_flink_cli/       # CLI 命令实现(argparse 子命令)
volcano_flink.py         # CLI 入口(volc_flink)

本地验证

python integration_test.py

常见问题

Q:如何获取 AK/SK?

进入火山引擎控制台的访问控制页面创建与管理访问密钥:https://console.volcengine.com/iam/accesskey/

Q:命令忘了怎么用?

volc_flink --help
volc_flink <module> --help
volc_flink <module> <cmd> --help

Q:登录失败怎么办?

  • 确认 AK/SK 正确
  • 确认网络可访问火山引擎 OpenAPI
  • 确认账号具备对应权限

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

volc_flink_cli-1.2.0.tar.gz (74.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

volc_flink_cli-1.2.0-py3-none-any.whl (90.1 kB view details)

Uploaded Python 3

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page