


dagster_rudderstack

A Dagster library for triggering Reverse ETL syncs and Profiles runs in RudderStack.

Installation

Use pip to install the library.

pip install dagster_rudderstack

Configuration

Set up the RudderStack resource with your service access token.

Note: For production use cases, RudderStack recommends using a service access token instead of a personal access token.

# resources.py
from dagster_rudderstack.resources.rudderstack import RudderStackRETLResource

rudderstack_retl_resource = RudderStackRETLResource(
    access_token="access_token",  # replace with your service access token
)
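Hardcoding the token in resources.py makes it easy to leak through version control. A minimal sketch, assuming you export the token in an environment variable (the name RUDDERSTACK_ACCESS_TOKEN and the helper load_access_token are hypothetical, not part of dagster_rudderstack), is to read it at load time and fail fast if it is missing:

```python
import os

# Hypothetical environment variable name; use whatever your deployment defines.
TOKEN_ENV_VAR = "RUDDERSTACK_ACCESS_TOKEN"


def load_access_token(env_var: str = TOKEN_ENV_VAR) -> str:
    """Return the RudderStack access token from the environment.

    Raises RuntimeError if the variable is unset or blank, so a misconfigured
    deployment fails when the code location loads instead of at the first API call.
    """
    token = os.environ.get(env_var, "").strip()
    if not token:
        raise RuntimeError(f"Environment variable {env_var} is not set or is empty")
    return token
```

You would then pass access_token=load_access_token() to RudderStackRETLResource. If the resource is a Pythonic Dagster ConfigurableResource (its keyword-argument configuration suggests so, but this is an assumption), Dagster's EnvVar("RUDDERSTACK_ACCESS_TOKEN") is an alternative that defers reading the value until run time.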

RudderStackRETLResource also exposes the following configurable parameters. In most cases, the default values are recommended.

  • rs_cloud_url: RudderStack cloud endpoint.
  • request_max_retries: The maximum number of times a request to the RudderStack API is retried before failing.
  • request_retry_delay: Time (in seconds) to wait between request retries.
  • request_timeout: Time (in seconds) after which a request to RudderStack is declared timed out.
  • poll_interval: Time (in seconds) to wait between checks of a triggered job's status.
  • poll_timeout: Time (in seconds) after which polling for a triggered job is declared timed out.
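To make the retry and polling parameters concrete, here is a minimal plain-Python model of how they typically interact. It is an assumption about their semantics based only on the descriptions above, not the library's implementation: call_with_retries and poll_until_finished are hypothetical helpers, the numeric defaults are illustrative rather than the resource's real defaults, and request_timeout (a per-request HTTP timeout) is not modelled.

```python
import time
from typing import Callable, Optional, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retries(
    request: Callable[[], T],
    request_max_retries: int = 3,      # illustrative value, not the library default
    request_retry_delay: float = 1.0,  # illustrative value, not the library default
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `request`; on a transient error, retry up to `request_max_retries` more times."""
    retries = 0
    while True:
        try:
            return request()
        except retry_on:
            if retries >= request_max_retries:
                raise  # retries exhausted: surface the last error
            retries += 1
            sleep(request_retry_delay)


def poll_until_finished(
    is_finished: Callable[[], bool],
    poll_interval: float = 10.0,           # illustrative value
    poll_timeout: Optional[float] = None,  # None means poll indefinitely
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> float:
    """Check `is_finished` every `poll_interval` seconds; return elapsed seconds once done."""
    start = clock()
    while True:
        if is_finished():
            return clock() - start
        if poll_timeout is not None and clock() - start >= poll_timeout:
            raise TimeoutError(f"job did not finish within {poll_timeout} seconds")
        sleep(poll_interval)
```

Passing sleep and clock as parameters keeps the model testable without real waiting. In this model, a request is attempted at most request_max_retries + 1 times, and a job that never finishes is abandoned after roughly poll_timeout seconds plus one poll_interval.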

Similarly, if you need to define ops and jobs for Profiles, define a Profiles resource:

# resources.py
from dagster_rudderstack.resources.rudderstack import RudderStackProfilesResource

rudderstack_profiles_resource = RudderStackProfilesResource(
    access_token="access_token",  # replace with your service access token
)

Ops and Jobs

Define a job and a schedule for it. Provide the ID of the Reverse ETL connection to sync.

# jobs.py
from dagster import DefaultScheduleStatus, RunConfig, ScheduleDefinition, job
from dagster_rudderstack.ops.retl import rudderstack_sync_op, RudderStackRETLOpConfig
from .resources import rudderstack_retl_resource

@job(
    resource_defs={
        "retl_resource": rudderstack_retl_resource
    }
)
def rs_retl_sync_job():
    rudderstack_sync_op()

rudderstack_sync_schedule = ScheduleDefinition(
    job=rs_retl_sync_job,
    cron_schedule="* * * * *",  # Runs every minute
    # Replace "connection_id" with the ID of your Reverse ETL connection
    run_config=RunConfig(ops={"rudderstack_sync_op": RudderStackRETLOpConfig(connection_id="connection_id")}),
    # Start the schedule automatically instead of requiring it to be turned on in the UI
    default_status=DefaultScheduleStatus.RUNNING,
)
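Dagster ultimately treats run config as a nested dictionary in which each op's settings sit under a "config" key, and a config object such as RudderStackRETLOpConfig is converted into that shape. The sketch below spells out the equivalent dictionary for the schedule above, assuming connection_id is the only field you need to set; the same structure is what you would enter as YAML in the Dagster UI launchpad when launching the job manually.

```python
# Fully spelled-out run config for rs_retl_sync_job (a sketch, not generated by the library).
# "connection_id" is a placeholder for the ID of your Reverse ETL connection.
run_config = {
    "ops": {
        "rudderstack_sync_op": {
            "config": {"connection_id": "connection_id"},
        }
    }
}
```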

Similarly, you can define a job for Profiles. Provide the ID of the Profiles project to run.

from dagster import job
# RudderStackProfilesOpConfig supplies profile_id when the job is scheduled or launched
from dagster_rudderstack.ops.profiles import rudderstack_profiles_op, RudderStackProfilesOpConfig
from .resources import rudderstack_profiles_resource

@job(
    resource_defs={
        "profiles_resource": rudderstack_profiles_resource
    }
)
def rs_profiles_job():
    rudderstack_profiles_op()

RudderStackProfilesOpConfig also accepts an optional list of extra parameters, which are used for the Profiles run through the Profiles API.

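To run a sequence of ops as one job, for example a Profiles run followed by a Reverse ETL sync, chain them as shown below. If one op fails, the job raises an exception and the next op does not run. You can change this behavior as needed; for example, catch the exception with try/except inside your own op so that the second op still runs. Wrapping an op call in try/except inside the job function has no effect, because Dagster executes the job function only once, when the job is defined, to build the op graph; the ops themselves run later.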

from dagster import DefaultScheduleStatus, RunConfig, ScheduleDefinition, job
from dagster_rudderstack.ops.retl import rudderstack_sync_op, RudderStackRETLOpConfig
from dagster_rudderstack.ops.profiles import rudderstack_profiles_op, RudderStackProfilesOpConfig
from .resources import rudderstack_retl_resource, rudderstack_profiles_resource

@job(
    resource_defs={
        "profiles_resource": rudderstack_profiles_resource,
        "retl_resource": rudderstack_retl_resource
    }
)
def rs_profiles_then_retl_run():
    profiles_op = rudderstack_profiles_op()
    # The sync starts only after the Profiles run completes successfully
    rudderstack_sync_op(start_after=profiles_op)

rudderstack_profiles_then_retl_schedule = ScheduleDefinition(
    job=rs_profiles_then_retl_run,
    cron_schedule="0 0 * * *",  # Runs once a day, at midnight
    run_config=RunConfig(
        ops={
            # parameters is optional: add any extra parameters for the Profiles run to this list
            "rudderstack_profiles_op": RudderStackProfilesOpConfig(profile_id="profile_id", parameters=[]),
            "rudderstack_sync_op": RudderStackRETLOpConfig(connection_id="connection_id"),
        }
    ),
    default_status=DefaultScheduleStatus.RUNNING,
)
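The fail-fast versus continue-on-failure behavior described before this job can be illustrated in plain Python, independent of Dagster. In the sketch below, run_in_sequence and the two step functions are hypothetical stand-ins for the Profiles run and the Reverse ETL sync; in a real Dagster job, the tolerant variant corresponds to catching the exception inside your own op rather than around the op call in the job function.

```python
from typing import Callable, List, Tuple


def run_in_sequence(
    steps: List[Tuple[str, Callable[[], None]]],
    continue_on_failure: bool = False,
) -> List[Tuple[str, str]]:
    """Run steps in order and return (name, outcome) pairs.

    With continue_on_failure=False (the default job behavior), the first failure
    is re-raised and later steps never run. With continue_on_failure=True, the
    failure is recorded and the next step still runs.
    """
    results: List[Tuple[str, str]] = []
    for name, step in steps:
        try:
            step()
        except Exception as exc:
            if not continue_on_failure:
                raise  # fail fast: downstream steps are skipped
            results.append((name, f"failed: {exc}"))
        else:
            results.append((name, "succeeded"))
    return results
```

The tolerant mode trades safety for availability: a Reverse ETL sync that runs after a failed Profiles run may sync stale data, which is why failing fast is the default.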



Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dagster_rudderstack-1.2.1.tar.gz (16.1 kB)

Uploaded Source

Built Distribution

dagster_rudderstack-1.2.1-py3-none-any.whl (15.9 kB)

Uploaded Python 3

File details

Details for the file dagster_rudderstack-1.2.1.tar.gz.

File metadata

  • Download URL: dagster_rudderstack-1.2.1.tar.gz
  • Upload date:
  • Size: 16.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.0.1 CPython/3.12.8

File hashes

Hashes for dagster_rudderstack-1.2.1.tar.gz

  • SHA256: 96253aeb935abd218f9ddaea69709df17235d974e2a65280dcd1f8d15d62be57
  • MD5: c6091c36036d18f12497434d5cfa4671
  • BLAKE2b-256: 0e3d93dd1abd23613e3796bed1ad037026e11eb9e88de83fc0a9a8afd18da056
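To check a downloaded archive against the SHA256 digest listed above, you can hash it locally. A minimal standard-library sketch follows (sha256_of_file and verify_download are illustrative names). Alternatively, pip's hash-checking mode does this automatically when a requirements file pins dagster_rudderstack==1.2.1 --hash=sha256: followed by the digest and you install with pip install --require-hashes -r requirements.txt; in that mode every requirement, including dependencies, must carry a hash.

```python
import hashlib

# SHA256 digest of dagster_rudderstack-1.2.1.tar.gz, as listed above.
EXPECTED_SHA256 = "96253aeb935abd218f9ddaea69709df17235d974e2a65280dcd1f8d15d62be57"


def sha256_of_file(path: str, chunk_size: int = 65536) -> str:
    """Hash the file in chunks so large archives need not fit in memory."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def verify_download(path: str, expected: str = EXPECTED_SHA256) -> bool:
    """Return True if the file's SHA256 matches the expected hex digest (case-insensitive)."""
    return sha256_of_file(path) == expected.strip().lower()
```

For example, verify_download("dagster_rudderstack-1.2.1.tar.gz") should return True for an untampered copy of the source distribution.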


Provenance

The following attestation bundles were made for dagster_rudderstack-1.2.1.tar.gz:

Publisher: release.yaml on rudderlabs/dagster-rudderstack

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file dagster_rudderstack-1.2.1-py3-none-any.whl.

File metadata

File hashes

Hashes for dagster_rudderstack-1.2.1-py3-none-any.whl

  • SHA256: 5151627737f81b2e535afcc8e8999c0166c1aa5626d78ccfa3a5a85e8a387c88
  • MD5: 9d568b719a4628e406291bfc3241c13d
  • BLAKE2b-256: 5cccc41809717123a0ae5f1cd1938d86b242d6bbaa6175496ee2a575e9bdaec5


Provenance

The following attestation bundles were made for dagster_rudderstack-1.2.1-py3-none-any.whl:

Publisher: release.yaml on rudderlabs/dagster-rudderstack

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.
