# improvado-pipeline-sdk

SDK for authoring custom data pipeline workflows on Temporal. Provides type definitions, helper functions, and constants for building ETL/ELT pipelines that run on Improvado's Temporal infrastructure.
## Install

```shell
pip install improvado-pipeline-sdk
```
## Usage

### Workflow code

```python
import dataclasses

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    from pipeline_sdk.types import (
        DataRef,
        DateRangeRequest,
        DateRangeType,
    )
    from pipeline_sdk.activities import (
        les_extract,
        ch_load,
        prepare_credentials,
        cleanup_credentials,
    )


@dataclasses.dataclass(frozen=True)
class MyPipelineParams:
    connector_id: int


@workflow.defn(sandboxed=False, name="MyPipeline")
class MyPipelineWorkflow:
    @workflow.run
    async def run(self, params: MyPipelineParams) -> dict:
        result = await les_extract(
            connector_id=params.connector_id,
            data_source="facebook",
            report_type="ad_insights",
            date_range=DateRangeRequest(
                date_range_type=DateRangeType.MANUAL,
                params={"date_from": "2026-01-01", "date_to": "2026-03-31"},
            ),
        )
        return {"data_ref": result.data_ref.uri}
```
### Activity code

```python
from temporalio import activity

from pipeline_sdk.runtime import (
    read_pipeline_secret,
    get_current_tenant,
    PipelineState,
)
from pipeline_sdk.types import PipelineCredentials


@activity.defn
async def my_activity(creds: PipelineCredentials) -> dict:
    tenant = get_current_tenant()
    secret = await read_pipeline_secret(creds)
    # secret.s3, secret.storage, secret.connections
    return {"tenant": tenant.agency_uuid}
```
### Worker / client setup

Add `TenantInterceptor` to your worker's interceptors and `TenantClientInterceptor` to your Temporal client's, so tenant identity propagates with every workflow and activity call:

```python
from pipeline_sdk.tenant import (
    TenantInterceptor,        # worker side
    TenantClientInterceptor,  # client side
    TenantID,
)
```
## Run a workflow locally against production Temporal

For inner-loop debugging: write your workflow on your laptop and run it against production Temporal under your tenant identity. Activities defined in your file run on the laptop; cross-queue activities such as `les_extract`, `ch_load`, and `prepare_credentials` are dispatched to Improvado shared queues and execute on Improvado infrastructure.
```shell
pip install improvado-pipeline-sdk

# Mint a Temporal JWT — ask your platform team for the runbook.
export TEMPORAL_API_URL=flow-api.tools.improvado.io:443
export TEMPORAL_AUTH_JWT=$(cat /tmp/temporal-token)
export TENANT_AGENCY_UUID=fc010783-7a09-449b-84ab-0c1660b15245
export TENANT_WORKSPACE_ID=42
# optional: TENANT_USER_ID=99   # ADR-010 user-scope tenants
# optional: TEMPORAL_TLS=false  # debug only — local docker-compose
# optional: VAULT_ADDR / VAULT_MOUNT_POINT — point read_pipeline_secret at prod
```
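Before launching the runner, it can save a failed round-trip to check that the required variables are actually set. A minimal sketch (the variable names come from the export block above; the checker itself is illustrative, not part of the SDK):

```python
import os


def missing_tenant_env(env: dict) -> list:
    """Return the names of required variables absent or empty in *env*."""
    required = [
        "TEMPORAL_API_URL",
        "TEMPORAL_AUTH_JWT",
        "TENANT_AGENCY_UUID",
        "TENANT_WORKSPACE_ID",
    ]
    return [name for name in required if not env.get(name)]


# Check the current process environment before starting the runner.
print(missing_tenant_env(dict(os.environ)))
```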
One-shot mode (default): point at a workflow file and the runner spins up a worker on the tenant queue, starts the workflow, prints the result, and exits:

```shell
python -m pipeline_sdk hello_sdk.py
python -m pipeline_sdk hello_sdk.py --params '{"label": "hi"}'
python -m pipeline_sdk ./pipelines/ --name HelloSdkWorkflow --params-file params.json
```
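The `--params` JSON is deserialized into the workflow's params dataclass. A minimal sketch of that mapping, reusing the `MyPipelineParams` dataclass from the workflow example above (the `parse_params` helper is hypothetical and only illustrates the assumed behavior, not the runner's actual code):

```python
import dataclasses
import json


@dataclasses.dataclass(frozen=True)
class MyPipelineParams:
    connector_id: int


def parse_params(raw: str) -> MyPipelineParams:
    # Assumed behavior: JSON-decode the --params string and build the
    # workflow's params dataclass from the top-level keys.
    return MyPipelineParams(**json.loads(raw))


params = parse_params('{"connector_id": 123}')
print(params)
```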
Worker-only mode: a long-running worker that polls the tenant queue `tenant-{agency}-{workspace}[-{user}]`. Trigger runs through the Workflow Manager API or the Temporal CLI:

```shell
python -m pipeline_sdk ./pipelines/ --worker-only
# or use the dedicated entry point:
improvado-pipeline-run ./pipelines/ --worker-only
```

Ctrl-C drains in-flight work and exits in both modes.
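The queue name is derived from the tenant environment variables above. A small sketch of the documented naming scheme `tenant-{agency}-{workspace}[-{user}]` (this helper is illustrative, not an SDK function):

```python
def tenant_queue(agency_uuid: str, workspace_id: int, user_id=None) -> str:
    # tenant-{agency}-{workspace}, with an optional -{user} suffix
    # for ADR-010 user-scope tenants.
    queue = f"tenant-{agency_uuid}-{workspace_id}"
    if user_id is not None:
        queue += f"-{user_id}"
    return queue


print(tenant_queue("fc010783-7a09-449b-84ab-0c1660b15245", 42))
```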
## Type checking and linting

```shell
pyright my_workflow.py
ruff check my_workflow.py
ruff format my_workflow.py
```

Both pyright and ruff are included as dependencies.
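If you prefer repo-wide settings over per-file invocations, both tools read `pyproject.toml`. A minimal sketch (the section names are standard pyright/ruff configuration; the specific values here are illustrative, not project defaults):

```toml
[tool.pyright]
include = ["pipelines"]
typeCheckingMode = "basic"

[tool.ruff]
line-length = 100

[tool.ruff.lint]
# E: pycodestyle errors, F: pyflakes, I: import sorting
select = ["E", "F", "I"]
```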
## Package structure

| Module | Purpose |
|---|---|
| `pipeline_sdk.types` | Pure, Temporal-serializable data types and enums: `DataRef`, `TenantID`, `Cluster`, `DateRangeRequest`, `DateRangeType`, `PipelineCredentials`, `PipelineSecret`, `S3Credentials`, `StorageCredentials`, `LesActivityWithS3Result`, `PipelineLoadResult`. Safe to import from either workflow or activity code. |
| `pipeline_sdk.activities` | Predefined workflow-side wrappers: `les_extract`, `ch_load`, `prepare_credentials`, `cleanup_credentials`. Import inside `workflow.unsafe.imports_passed_through()`. Custom activities defined in your own module inherit the workflow's task queue; do not pass `task_queue` to `workflow.execute_activity`. |
| `pipeline_sdk.runtime` | Activity-side helpers: `read_pipeline_secret`, `PipelineState`, `get_current_tenant`, plus `VAULT_ADDR` / `VAULT_MOUNT_POINT` for Vault config overrides. |
| `pipeline_sdk.tenant` | Tenant propagation infrastructure: `TenantInterceptor`, `TenantClientInterceptor`, `build_tenant_headers`, plus `TenantID` / `get_current_tenant` re-exports. |
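"Temporal-serializable" means these types survive a round trip through Temporal's JSON payload converter. A minimal sketch of what that implies, using illustrative mirror classes (the real SDK definitions and enum member values may differ):

```python
import dataclasses
import enum
import json


class DateRangeType(enum.Enum):
    # Illustrative mirror of the SDK enum.
    MANUAL = "manual"


@dataclasses.dataclass(frozen=True)
class DateRangeRequest:
    # Illustrative mirror of the SDK dataclass, not its actual definition.
    date_range_type: DateRangeType
    params: dict


req = DateRangeRequest(
    date_range_type=DateRangeType.MANUAL,
    params={"date_from": "2026-01-01", "date_to": "2026-03-31"},
)

# Serialize to JSON (enums via their value) and back: no live handles,
# no open connections, nothing that cannot cross a process boundary.
payload = json.dumps(
    {"date_range_type": req.date_range_type.value, "params": req.params}
)
restored = json.loads(payload)
print(restored)
```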
## File details

### improvado_pipeline_sdk-0.9.0.tar.gz (source distribution)

- Size: 38.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.13

| Algorithm | Hash digest |
|---|---|
| SHA256 | `81854d6c2c622464d4cc8b6ec7cb3714f3e675407d3b68f7eded4f4216609566` |
| MD5 | `870c22e9ebd2793502b804e96ebe5f20` |
| BLAKE2b-256 | `f07bc682802ab0c0eacc5c14ada459d6a7af5f5d21bf15453a6a8180335e76b6` |
### improvado_pipeline_sdk-0.9.0-py3-none-any.whl (built distribution)

- Size: 50.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.13

| Algorithm | Hash digest |
|---|---|
| SHA256 | `e010bf840f0563e1b2d97826921a7c0674333c672a4806e6bea0b454e706c10f` |
| MD5 | `2df7edae4941224e69d3049cc0aa16f7` |
| BLAKE2b-256 | `924ac34059d70b2fd2856ed0f25d6b55bfae48ba23547d5268adf05dc647bde1` |