dagster_rudderstack
A Dagster library for triggering Reverse ETL syncs and Profiles runs in RudderStack.
Installation
Use pip to install the library.
pip install dagster_rudderstack
Configuration
Set up the RudderStack resource with your workspace access token.
# resources.py
from dagster_rudderstack.resources.rudderstack import RudderStackRETLResource

rudderstack_retl_resource = RudderStackRETLResource(
    access_token="access_token")
RudderStackRETLResource exposes other configurable parameters as well. The default values are recommended in most cases.
- rs_cloud_url: RudderStack cloud endpoint.
- request_max_retries: The maximum number of times requests to the RudderStack API should be retried before failing.
- request_retry_delay: Time (in seconds) to wait between each request retry.
- request_timeout: Time (in seconds) after which the requests to RudderStack are declared timed out.
- poll_interval: Time (in seconds) to wait between status checks of a triggered job.
- poll_timeout: Time (in seconds) after which the polling for a triggered job is declared timed out.
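To make the retry and polling parameters above more concrete, here is a minimal sketch of how such settings commonly interact. It is not the library's implementation: the helper names `call_with_retries` and `poll_until_done`, the status strings, and the choice to count retries after the first attempt are all assumptions made for illustration.

```python
import time
from typing import Callable


def call_with_retries(request: Callable[[], str], max_retries: int, retry_delay: float) -> str:
    """Call `request`; after a failure, retry up to `max_retries` more times (assumed semantics)."""
    attempt = 0
    while True:
        try:
            return request()
        except ConnectionError:
            if attempt >= max_retries:
                raise  # retries exhausted, surface the last error
            attempt += 1
            time.sleep(retry_delay)  # plays the role of request_retry_delay


def poll_until_done(get_status: Callable[[], str], poll_interval: float, poll_timeout: float) -> str:
    """Check status every `poll_interval` seconds; give up once `poll_timeout` seconds have passed."""
    deadline = time.monotonic() + poll_timeout
    while True:
        status = get_status()
        if status in ("finished", "failed"):  # hypothetical terminal states
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job still '{status}' after {poll_timeout} seconds")
        time.sleep(poll_interval)
```

In this sketch a short poll_interval detects completion sooner at the cost of more API requests, while poll_timeout bounds the total time spent waiting on one triggered job.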
Similarly, if you need to define ops and jobs for Profiles, define a resource for Profiles.
# resources.py
from dagster_rudderstack.resources.rudderstack import RudderStackProfilesResource

rudderstack_profiles_resource = RudderStackProfilesResource(
    access_token="access_token")
Ops and Jobs
Define ops and jobs with a schedule. Provide the connection ID for the sync job.
# jobs.py
from dagster import DefaultScheduleStatus, RunConfig, ScheduleDefinition, job
from dagster_rudderstack.ops.retl import rudderstack_sync_op, RudderStackRETLOpConfig
from .resources import rudderstack_retl_resource

@job(
    resource_defs={
        "retl_resource": rudderstack_retl_resource
    }
)
def rs_retl_sync_job():
    rudderstack_sync_op()

rudderstack_sync_schedule = ScheduleDefinition(
    job=rs_retl_sync_job,
    cron_schedule="* * * * *",  # Runs every minute
    # Per-op config is keyed by op name
    run_config=RunConfig(
        ops={"rudderstack_sync_op": RudderStackRETLOpConfig(connection_id="connection_id")}
    ),
    default_status=DefaultScheduleStatus.RUNNING
)
Similarly, you can define ops for a Profiles job. Provide the profile ID of the Profiles project to run.
from dagster import job
from dagster_rudderstack.ops.profiles import rudderstack_profiles_op, RudderStackProfilesOpConfig
from .resources import rudderstack_profiles_resource

@job(
    resource_defs={
        "profiles_resource": rudderstack_profiles_resource
    }
)
def rs_profiles_job():
    rudderstack_profiles_op()
You can also define a job as a sequence of ops, for example a Profiles run followed by a Reverse ETL sync. Note that if one of the ops fails, the job raises an exception without running the next op. You can configure the job as needed. For example, you can catch the exception (try/except) to ignore an op failure and still run the second op.
from dagster import DefaultScheduleStatus, RunConfig, ScheduleDefinition, job
from dagster_rudderstack.ops.retl import rudderstack_sync_op, RudderStackRETLOpConfig
from dagster_rudderstack.ops.profiles import rudderstack_profiles_op, RudderStackProfilesOpConfig
from .resources import rudderstack_retl_resource, rudderstack_profiles_resource

@job(
    resource_defs={
        "profiles_resource": rudderstack_profiles_resource,
        "retl_resource": rudderstack_retl_resource
    }
)
def rs_profiles_then_retl_run():
    # The sync op starts only after the Profiles op has completed
    profiles_op = rudderstack_profiles_op()
    rudderstack_sync_op(start_after=profiles_op)

rudderstack_sync_schedule = ScheduleDefinition(
    job=rs_profiles_then_retl_run,
    cron_schedule="0 0 * * *",  # Runs daily at midnight
    run_config=RunConfig(
        ops={
            "rudderstack_profiles_op": RudderStackProfilesOpConfig(profile_id="profile_id"),
            "rudderstack_sync_op": RudderStackRETLOpConfig(connection_id="connection_id"),
        }
    ),
    default_status=DefaultScheduleStatus.RUNNING
)
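The note above about ignoring a failed op can be illustrated with a plain-Python sketch of the try/except pattern. This is an assumption-laden illustration, not part of dagster_rudderstack: `run_in_sequence` and its parameters are hypothetical names, and the two callables stand in for a Profiles run and a sync. In a Dagster job, this kind of handling would sit inside an op body, since the function decorated with @job only wires ops together and does not execute them.

```python
import logging
from typing import Callable

logger = logging.getLogger(__name__)


def run_in_sequence(
    first: Callable[[], None],
    second: Callable[[], None],
    ignore_first_failure: bool = False,
) -> None:
    """Run `first`, then `second`; optionally continue when `first` raises."""
    try:
        first()
    except Exception:
        if not ignore_first_failure:
            raise  # default behavior: the second step never runs
        logger.exception("first step failed; continuing with the second step")
    second()
```

With ignore_first_failure left at False, the sketch mirrors the default behavior described above: the failure propagates and the second step is skipped.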