Skip to main content

A dagster plugin for the DingTalk

Project description

钉钉与 Dagster 集成

该 Dagster 集成是为了更便捷的调用钉钉(DingTalk)的API,集成提供了两个 Dagster Resource。

安装

要安装库,请在现有的Dagster环境中使用pip。

pip install dagster-dingtalk

资源

DingTalkWebhookResource


该资源允许定义单个钉钉自定义机器人的 Webhook 端点,以便于发送文本、Markdown、Link、 ActionCard、FeedCard 消息,消息具体样式可参考 钉钉开放平台 | 自定义机器人发送消息的消息类型

配置:

  • access_token (str): 机器人 Webhook 地址中的 access_token 值。
  • secret (str, optional): 如使用加签安全配置,则需传签名密钥。默认值为 None。
  • alias (str, optional): 别名,仅用作标记。默认值为 None。
  • base_url (str, optional): 通用地址,一般无需更改。默认值为 “https://oapi.dingtalk.com/robot/send”。

用例 1:使用单个资源

from dagster_dingtalk import DingTalkWebhookResource
from dagster import op, In, OpExecutionContext, job, Definitions

@op(required_resource_keys={"dingtalk_webhook"}, ins={"text": In(str)})
def op_send_text(context:OpExecutionContext, text:str):
    dingtalk_webhook:DingTalkWebhookResource = context.resources.dingtalk_webhook
    dingtalk_webhook.send_text(text)

@job
def job_send_text():
    op_send_text()

defs = Definitions(
    jobs=[job_send_text],
    resources={"dingtalk_webhook": DingTalkWebhookResource(access_token = "<access_token>", secret = "<secret>")}
)

用例 2:启动时动态构建 Webhook 资源

如果你事先不确定会用到哪个 webhook 机器人,或是需要根据代码逻辑动态选择 webhook ,dagster 提供了一种 在运行时配置资源 的原生支持。以下是示例:

from dagster_dingtalk import DingTalkWebhookResource
from dagster import op, In, OpExecutionContext, job, Definitions, schedule, RunRequest, RunConfig

@op(required_resource_keys={"dingtalk_webhook"}, ins={"text": In(str)})
def op_send_text(context:OpExecutionContext, text:str):
    dingtalk_webhook:DingTalkWebhookResource = context.resources.dingtalk_webhook
    dingtalk_webhook.send_text(text)

@job
def job_send_text():
    op_send_text()

dingtalk_webhooks = {
    "Group1" : DingTalkWebhookResource(access_token="<access_token>", secret="<secret>", alias="Group1"),
    "Group2" : DingTalkWebhookResource(access_token="<access_token>", secret="<secret>", alias="Group2")
}

defs = Definitions(
    jobs=[job_send_text], 
    resources={"dingtalk_webhook": DingTalkWebhookResource.configure_at_launch()}
)

@schedule(cron_schedule="20 9 * * *", job=job_send_text)
def schedule_user_info():
    return RunRequest(run_config=RunConfig(
        ops={"op_send_text": {"inputs": {"text": "This a test text."}}},
        resources={"dingtalk": dingtalk_webhooks["Group1"]},
    ))

DingTalkAppResource


该 Dagster 资源允许定义一个可以调用 钉钉服务端 API 的 Client, 具有一些常用 HTTP API 的封装。你可以在 IDE 中通过引入 DingTalkAppClient 类来查看 IDE 提示:

from dagster_dingtalk import DingTalkAppClient

dingtalk: DingTalkAppClient

请注意:DingTalkAppClient 未使用钉钉官方 SDK 实现,并采用了 ASCII 字符来命名实例方法。

这是为了与 钉钉服务端 API 文档 里的中文 API 保持完全一致命名,以便于更符合直觉地进行调用和快速查阅文档。因此,可以按 钉钉服务端 API 文档 中的层级,通过链式调用来发起 API 请求。例如:

dingtalk.智能人事.花名册.获取花名册元数据()

配置:

  • AppID (str): 应用应用唯一标识 AppID,作为缓存标识符使用。不传入则不缓存鉴权。
  • AgentID (int, optional): 原企业内部应用 AgentId ,部分 API 会使用到。默认值为 None
  • AppName (str, optional): 应用名。
  • ClientId (str): 应用的 Client ID ,原 AppKey 和 SuiteKey
  • ClientSecret (str): 应用的 Client Secret ,原 AppSecret 和 SuiteSecret

用例 1:使用确定的企业内部应用配置资源

from dagster_dingtalk import DingTalkAppResource, DingTalkAppClient
from dagster import op, In, OpExecutionContext, job, Definitions, EnvVar

@op(required_resource_keys={"dingtalk"}, ins={"user_id": In(str)})
def op_user_info(context:OpExecutionContext, user_id:str):
    dingtalk:DingTalkAppClient = context.resources.dingtalk
    result = dingtalk.通讯录管理.用户管理.查询用户详情(user_id).get('result')
    context.log.info(result)

@job
def job_user_info():
    op_user_info()

defs = Definitions(
    jobs=[job_user_info], 
    resources={"dingtalk": DingTalkAppResource(
        AppID = "<the-app-id>", 
        ClientId = "<the-client-id>",
        ClientSecret = EnvVar("<the-client-secret-env-name>"),
    )})

用例 2:运行时动态构建企业内部应用资源

可参考 Dagster文档 | 在启动时配置资源

from dagster_dingtalk import DingTalkAppResource, DingTalkAppClient
from dagster import op, In, OpExecutionContext, job, Definitions, schedule, RunRequest, RunConfig, EnvVar

@op(required_resource_keys={"dingtalk"}, ins={"user_id": In(str)})
def op_user_info(context:OpExecutionContext, user_id:str):
    dingtalk:DingTalkAppClient = context.resources.dingtalk
    result = dingtalk.通讯录管理.用户管理.查询用户详情(user_id).get('result')
    context.log.info(result)

@job
def job_user_info():
    op_user_info()
    
dingtalk_apps = {
    "App1" : DingTalkAppResource(
        AppID = "<app-1-app-id>",
        ClientId = "<app-1-client-id>",
        ClientSecret = EnvVar("<app-1-client-secret-env-name>"),
    ),
    "App2" : DingTalkAppResource(
        AppID = "<app-2-app-id>",
        ClientId = "<app-2-client-id>",
        ClientSecret = EnvVar("<app-2-client-secret-env-name>"),
    )
}

defs = Definitions(jobs=[job_user_info], resources={"dingtalk": DingTalkAppResource.configure_at_launch()})

@schedule(cron_schedule="20 9 * * *", job=job_user_info)
def schedule_user_info():
    return RunRequest(run_config=RunConfig(
        ops={"op_user_info": {"inputs": {"user_id": "<the-user-id>"}}},
        resources={"dingtalk": dingtalk_apps["App1"]},
    ))

提醒:

应该永远避免直接将密钥字符串直接配置给资源,这会导致在 dagster 前端用户界面暴露密钥。 应当从环境变量中读取密钥。你可以在代码中注册临时的环境变量,或从系统中引入环境变量。

import os
from dagster import EnvVar
from dagster_dingtalk import DingTalkWebhookResource

# 直接在代码中注册临时的环境变量
os.environ.update({'access_token_name': "<your-access_token>"})
os.environ.update({'secret_name': "<your-secret>"})

webhook = DingTalkWebhookResource(access_token=EnvVar("access_token_name"), secret=EnvVar("secret_name"))

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dagster_dingtalk-0.1.16b2.tar.gz (13.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dagster_dingtalk-0.1.16b2-py3-none-any.whl (16.1 kB view details)

Uploaded Python 3

File details

Details for the file dagster_dingtalk-0.1.16b2.tar.gz.

File metadata

  • Download URL: dagster_dingtalk-0.1.16b2.tar.gz
  • Upload date:
  • Size: 13.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.12.7

File hashes

Hashes for dagster_dingtalk-0.1.16b2.tar.gz
Algorithm Hash digest
SHA256 79901c1d7e0a24ebb02c3cbd663c26c6b331958f3ee51b9e7bbe148906bd5998
MD5 ccc2da5953ef25388ff89e154350c32a
BLAKE2b-256 af57347af442117db1b36ae9719a69b347558b42b5bfa0e879cdbb9ba2d28545

See more details on using hashes here.

File details

Details for the file dagster_dingtalk-0.1.16b2-py3-none-any.whl.

File metadata

File hashes

Hashes for dagster_dingtalk-0.1.16b2-py3-none-any.whl
Algorithm Hash digest
SHA256 c86df05e89154ec73c3a2e07e8ed2a2d69fb9701f29090c742f4931c5291e1ed
MD5 8b0b6c78158a03a4d283f67cd0351005
BLAKE2b-256 41a613fc7e34aa280c6376ff1a808528a997f10b1299c1cdb7abea837e990ba7

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page