A dagster plugin for the DingTalk
Project description
钉钉与 Dagster 集成
介绍
该 Dagster 集成是为了更便捷的调用钉钉(DingTalk)的API,集成提供了两个 Dagster Resource。
DingTalkWebhookResource
该资源允许定义单个钉钉自定义机器人的 Webhook 端点,以便于发送文本、Markdown、Link、 ActionCard、FeedCard 消息,消息具体样式可参考 钉钉开放平台 | 自定义机器人发送消息的消息类型。
配置项:
- access_token (str): 机器人 Webhook 地址中的 access_token 值。
- secret (str, optional): 如使用加签安全配置,则需传签名密钥。默认值为 None。
- alias (str, optional):
如提供别名,可以在使用
MultiDingTalkWebhookResource中使用别名进行 webhook 选择。默认值为 None。 - base_url (str, optional): 通用地址,一般无需更改。默认值为 “https://oapi.dingtalk.com/robot/send”。
用例:
1. 使用单个资源:
from dagster_dingtalk import DingTalkWebhookResource
from dagster import op, In, OpExecutionContext, job, Definitions
@op(required_resource_keys={"dingtalk_webhook"}, ins={"text": In(str)})
def op_send_text(context:OpExecutionContext, text:str):
dingtalk_webhook:DingTalkWebhookResource = context.resources.dingtalk_webhook
dingtalk_webhook.send_text(text)
@job
def job_send_text():
op_send_text()
defs = Definitions(
jobs=[job_send_text],
resources={"dingtalk_webhook": DingTalkWebhookResource(access_token = "<access_token>", secret = "<secret>")}
)
2. 启动时动态构建企业内部应用资源, 可参考 Dagster文档 | 在启动时配置资源
from dagster_dingtalk import DingTalkWebhookResource
from dagster import op, In, OpExecutionContext, job, Definitions, schedule, RunRequest, RunConfig
@op(required_resource_keys={"dingtalk_webhook"}, ins={"text": In(str)})
def op_send_text(context:OpExecutionContext, text:str):
dingtalk_webhook:DingTalkWebhookResource = context.resources.dingtalk_webhook
dingtalk_webhook.send_text(text)
@job
def job_send_text():
op_send_text()
dingtalk_webhooks = {
"Group1" : DingTalkWebhookResource(access_token="<access_token>", secret="<secret>", alias="Group1"),
"Group2" : DingTalkWebhookResource(access_token="<access_token>", secret="<secret>", alias="Group2")
}
defs = Definitions(
jobs=[job_send_text],
resources={"dingtalk_webhook": DingTalkWebhookResource.configure_at_launch()}
)
@schedule(cron_schedule="20 9 * * *", job=job_send_text)
def schedule_user_info():
return RunRequest(run_config=RunConfig(
ops={"op_send_text": {"inputs": {"text": "This a test text."}}},
resources={"dingtalk": dingtalk_webhooks["Group1"]},
))
注意:
应该永远避免直接将密钥字符串直接配置给资源,这会导致在 dagster 前端用户界面暴露密钥。 应当从环境变量中读取密钥。你可以在代码中注册临时的环境变量,或从系统中引入环境变量。
import os
from dagster import EnvVar
from dagster_dingtalk import DingTalkWebhookResource
# 直接在代码中注册临时的环境变量
os.environ.update({'access_token_name': "<your-access_token>"})
os.environ.update({'secret_name': "<your-secret>"})
webhook = DingTalkWebhookResource(access_token=EnvVar("access_token_name"), secret=EnvVar("secret_name"))
DingTalkAppResource
该 Dagster 资源允许定义一个钉钉的 API Client,更加便捷地调用钉钉服务端企业内部应用 API
钉钉服务端 API 企业内部应用部分的第三方封装。
通过此资源,可以调用部分钉钉服务端 API。具体封装的 API 可以在 IDE 中通过引入 DingTalkAppClient 类来查看 IDE 提示:
from dagster_dingtalk import DingTalkAppClient
dingtalk: DingTalkAppClient
配置项:
- AppID (str): 应用应用唯一标识 AppID,作为缓存标识符使用。不传入则不缓存鉴权。
- AgentID (int, optional): 原企业内部应用 AgentId ,部分 API 会使用到。默认值为 None
- AppName (str, optional): 应用名。
- ClientId (str): 应用的 Client ID ,原 AppKey 和 SuiteKey
- ClientSecret (str): 应用的 Client Secret ,原 AppSecret 和 SuiteSecret
用例
1. 使用单一的企业内部应用资源。
from dagster_dingtalk import DingTalkAppResource, DingTalkAppClient
from dagster import op, In, OpExecutionContext, job, Definitions, EnvVar
@op(required_resource_keys={"dingtalk"}, ins={"user_id": In(str)})
def op_user_info(context:OpExecutionContext, user_id:str):
dingtalk:DingTalkAppClient = context.resources.dingtalk
result = dingtalk.通讯录管理.用户管理.查询用户详情(user_id).get('result')
context.log.info(result)
@job
def job_user_info():
op_user_info()
defs = Definitions(
jobs=[job_user_info],
resources={"dingtalk": DingTalkAppResource(
AppID = "<the-app-id>",
ClientId = "<the-client-id>",
ClientSecret = EnvVar("<the-client-secret-env-name>"),
)})
2. 启动时动态构建企业内部应用资源, 可参考 Dagster文档 | 在启动时配置资源
from dagster_dingtalk import DingTalkAppResource, DingTalkAppClient
from dagster import op, In, OpExecutionContext, job, Definitions, schedule, RunRequest, RunConfig, EnvVar
@op(required_resource_keys={"dingtalk"}, ins={"user_id": In(str)})
def op_user_info(context:OpExecutionContext, user_id:str):
dingtalk:DingTalkAppClient = context.resources.dingtalk
result = dingtalk.通讯录管理.用户管理.查询用户详情(user_id).get('result')
context.log.info(result)
@job
def job_user_info():
op_user_info()
dingtalk_apps = {
"App1" : DingTalkAppResource(
AppID = "<app-1-app-id>",
ClientId = "<app-1-client-id>",
ClientSecret = EnvVar("<app-1-client-secret-env-name>"),
),
"App2" : DingTalkAppResource(
AppID = "<app-2-app-id>",
ClientId = "<app-2-client-id>",
ClientSecret = EnvVar("<app-2-client-secret-env-name>"),
)
}
defs = Definitions(jobs=[job_user_info], resources={"dingtalk": DingTalkAppResource.configure_at_launch()})
@schedule(cron_schedule="20 9 * * *", job=job_user_info)
def schedule_user_info():
return RunRequest(run_config=RunConfig(
ops={"op_user_info": {"inputs": {"user_id": "<the-user-id>"}}},
resources={"dingtalk": dingtalk_apps["App1"]},
))
注意:
应该永远避免直接将密钥字符串直接配置给资源,这会导致在 dagster 前端用户界面暴露密钥。你可以在代码中注册临时的环境变量,或从系统中引入环境变量。
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file dagster_dingtalk-0.1.14.tar.gz.
File metadata
- Download URL: dagster_dingtalk-0.1.14.tar.gz
- Upload date:
- Size: 10.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.1 CPython/3.12.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
a59fe11fed29507bf30aa2d068bc679087d799bd291bac77a88347651f46e10a
|
|
| MD5 |
50d3109ae2b5c6ff3151f3e389d9ab23
|
|
| BLAKE2b-256 |
8e2553958d9ad486cb80ee5c518272bb0d004156f13408686c2eb11dab23e8e3
|
File details
Details for the file dagster_dingtalk-0.1.14-py3-none-any.whl.
File metadata
- Download URL: dagster_dingtalk-0.1.14-py3-none-any.whl
- Upload date:
- Size: 13.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.1 CPython/3.12.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
78df0dce0d6bc22cd74e30ed5553492a50e45ae2bf0222d78df617d7788bc6b4
|
|
| MD5 |
9bc4f3cff05c0ec4bdde05f19585a4e5
|
|
| BLAKE2b-256 |
d5e969b496a64037ff66cf21fa43d2aa3bd6c414813f55dcab06d133a9c9dfb2
|