Skip to main content

SDK for authoring Improvado custom pipeline workflows on Temporal

Project description

improvado-pipeline-sdk

SDK for authoring custom data pipeline workflows on Temporal.

Provides type definitions, helper functions, and constants for building ETL/ELT pipelines that run on Improvado's Temporal infrastructure.

Install

pip install improvado-pipeline-sdk

Usage

Workflow code

import dataclasses
from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    from pipeline_sdk.workflow import (
        les_extract,
        DataRef,
        DateRangeRequest,
        DateRangeType,
        prepare_credentials,
        cleanup_credentials,
        CUSTOM_ACTIVITY_QUEUE,
    )


@dataclasses.dataclass(frozen=True)
class MyPipelineParams:
    connector_id: int


@workflow.defn(sandboxed=False, name="MyPipeline")
class MyPipelineWorkflow:
    @workflow.run
    async def run(self, params: MyPipelineParams) -> dict:
        result = await les_extract(
            connector_id=params.connector_id,
            data_source="facebook",
            report_type="ad_insights",
            date_range=DateRangeRequest(
                date_range_type=DateRangeType.MANUAL,
                params={"date_from": "2026-01-01", "date_to": "2026-03-31"},
            ),
        )
        return {"data_ref": result.data_ref.uri}

Activity code

from temporalio import activity
from pipeline_sdk.activity import read_pipeline_secret, get_current_tenant


@activity.defn
async def my_activity(creds) -> dict:
    tenant = get_current_tenant()
    secret = await read_pipeline_secret(creds)
    # secret.s3, secret.storage, secret.connections
    return {"tenant": tenant.agency_uuid}

Type checking

pyright my_workflow.py
ruff check my_workflow.py
ruff format my_workflow.py

Both pyright and ruff are included as dependencies.

Package structure

Module Purpose
pipeline_sdk.workflow Types and helpers for @workflow.defn code (sandbox-safe)
pipeline_sdk.activity Types and helpers for @activity.defn code (I/O)
pipeline_sdk.schedule @scheduled decorator for cron workflows
pipeline_sdk.types All shared dataclasses and enums
pipeline_sdk.credentials Credential types and Vault helpers

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

improvado_pipeline_sdk-0.1.0.tar.gz (7.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

improvado_pipeline_sdk-0.1.0-py3-none-any.whl (9.9 kB view details)

Uploaded Python 3

File details

Details for the file improvado_pipeline_sdk-0.1.0.tar.gz.

File metadata

  • Download URL: improvado_pipeline_sdk-0.1.0.tar.gz
  • Upload date:
  • Size: 7.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.13

File hashes

Hashes for improvado_pipeline_sdk-0.1.0.tar.gz
Algorithm Hash digest
SHA256 052e5d08fefebef6e08c01783a66c338be620388e3c5442d06427aee95e6afbe
MD5 bf7e911a08138c3088ddf2e1e7fd3245
BLAKE2b-256 7d7a9c02395d2476a640eb8302892d9596eaeb17ad26250c78dcbc339d9f6f57

See more details on using hashes here.

File details

Details for the file improvado_pipeline_sdk-0.1.0-py3-none-any.whl.

File metadata

File hashes

Hashes for improvado_pipeline_sdk-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 7644fef9598cfed4598ecd95739a2e5fbc1e99f817ecad5baa472fb182c75688
MD5 52bcad2d3cecc185a1a73ba2d2a9f487
BLAKE2b-256 f1fa61a84e1b93d5c090be7d7b60b279a9ceb84f9c294d474f61939b20232e1c

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page