Skip to main content

Shared utilities for dlt data pipelines with multi-company support

Project description

dlt_utils

PyPI version Python 3.13+ License: MIT

Shared utilities for dlt data pipelines with multi-company support.

Features

  • PartitionedIncremental: Incremental state tracking per partition key (e.g., company_id)
  • Date utilities: Generate (year, week) and (year, month) tuples for time-based partitioning
  • Schema utilities: Ensure tables exist in destination database
  • CLI arguments: Reusable argument parser with common dlt pipeline options
  • Logging: Consistent logging configuration across pipelines
  • Pipeline utilities: Resource filtering, validation, and helper functions

Installation

# From PyPI
pip install dlt_utils

# For development
pip install -e ".[dev]"

Usage

PartitionedIncremental

Track incremental state per company (or any partition key):

import dlt
from dlt_utils import PartitionedIncremental

@dlt.resource
def sync_resource():
    state = dlt.current.resource_state()
    inc = PartitionedIncremental(
        state=state,
        state_key="sequences",
        cursor_path="sequenceNumber",
        initial_value=0,
    )

    for company_id in ["company_a", "company_b"]:
        start_seq = inc.get_last_value(company_id)
        for record in fetch_data(company_id, since=start_seq):
            inc.track(company_id, record["sequenceNumber"])
            yield record

Date utilities

Generate time periods for partitioned data extraction:

from dlt_utils import generate_year_weeks, generate_year_months

# Generate weeks from 2024 to now + 52 weeks
weeks = generate_year_weeks(start_year=2024)
# [(2024, 1), (2024, 2), ..., (2025, 52)]

# Generate months from October 2024 to February 2025
months = generate_year_months(2024, 10, 2025, 2)
# [(2024, 10), (2024, 11), (2024, 12), (2025, 1), (2025, 2)]

Schema utilities

Ensure tables exist before running pipeline:

from dlt_utils import ensure_all_tables_exist, ensure_tables_for_resources

# Create all tables from schema
ensure_all_tables_exist(pipeline)

# Create only specific resource tables (including child tables)
ensure_tables_for_resources(pipeline, ["trade_items", "organizations"])

CLI Arguments

Reusable argument parser with common dlt pipeline options:

from dlt_utils import create_base_parser, add_common_args, get_common_args

# Option 1: Use pre-configured base parser
parser = create_base_parser(description="My pipeline")
parser.add_argument("--my-custom-arg", type=int, default=10)
args = parser.parse_args()

# Option 2: Add common args to existing parser
import argparse
parser = argparse.ArgumentParser(description="My pipeline")
add_common_args(parser, default_destination="duckdb")
args = parser.parse_args()

# Extract common args into a typed dataclass
common = get_common_args(args)
print(common.debug)        # bool
print(common.destination)  # "mssql" or "duckdb"
print(common.resources)    # list[str] | None

Available arguments:

  • --debug: Enable debug logging
  • --destination: Target database (duckdb, mssql)
  • --dev-mode: Reset schema between runs
  • --resources: List of resources to load
  • --refresh: Refresh mode (drop_sources, drop_resources)
  • --list-resources: Show available resources and exit

Logging

Consistent logging configuration across pipelines:

from dlt_utils import configure_logging, get_logger

# Configure logging based on debug flag
configure_logging(debug=args.debug)

# Get a logger for your module
logger = get_logger(__name__)
logger.info("Pipeline started")

Pipeline Utilities

Helper functions for running dlt pipelines:

from dlt_utils import (
    validate_refresh_args,
    filter_resources,
    print_resources,
    get_resource_names,
    mask_sensitive_value,
)

# Validate refresh args (prevents drop_sources with resource filter)
validate_refresh_args(refresh=args.refresh, resources_filter=args.resources)

# Filter resources based on user selection
available = ["orders", "products", "customers"]
to_load = filter_resources(available, args.resources)

# Print available resources from a dlt source
if args.list_resources:
    print_resources(source)
    exit(0)

# Get resource names from a source
names = get_resource_names(source)

# Mask sensitive values for logging
api_key = "sk-abc123xyz789"
print(mask_sensitive_value(api_key))  # "sk..12..***"

Development

# Install dev dependencies
pip install -e ".[dev]"

# Run tests
pytest

# Run linter
ruff check dlt_utils/

CI/CD Pipeline

De pipeline draait automatisch bij:

  • Push naar main: Voert tests uit
  • Tag met v* prefix: Voert tests uit én publiceert naar PyPI

Pipeline Workflow

┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│   Push/Tag      │────▶│   Test Stage    │────▶│  Publish Stage  │
│   naar repo     │     │   (altijd)      │     │  (alleen tags)  │
└─────────────────┘     └─────────────────┘     └─────────────────┘
                              │                        │
                              ▼                        ▼
                        - Install deps           - Build package
                        - Run pytest             - Upload to PyPI
                        - Publish results

Nieuwe Versie Releasen

Optie 1: Via Git CLI

# 1. Zorg dat alle changes gecommit zijn
git add .
git commit -m "Release v0.2.0"

# 2. Maak een tag aan
git tag v0.2.0

# 3. Push commit én tag naar remote
git push origin main
git push origin v0.2.0

Optie 2: Via Azure DevOps

  1. Ga naar ReposTags
  2. Klik op New tag
  3. Vul in:
    • Name: v0.2.0 (moet beginnen met v)
    • Based on: selecteer de commit of branch (bijv. main)
    • Description: optioneel, bijv. "Added new feature X"
  4. Klik op Create

De pipeline wordt automatisch getriggered en publiceert naar PyPI.

Versienummering

Gebruik Semantic Versioning:

  • vMAJOR.MINOR.PATCH (bijv. v1.2.3)
  • MAJOR: Breaking changes
  • MINOR: Nieuwe features (backwards compatible)
  • PATCH: Bugfixes

⚠️ Belangrijk: Vergeet niet de versie in pyproject.toml bij te werken vóór het taggen!

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dlt_utils-0.4.0.tar.gz (20.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dlt_utils-0.4.0-py3-none-any.whl (17.0 kB view details)

Uploaded Python 3

File details

Details for the file dlt_utils-0.4.0.tar.gz.

File metadata

  • Download URL: dlt_utils-0.4.0.tar.gz
  • Upload date:
  • Size: 20.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.11

File hashes

Hashes for dlt_utils-0.4.0.tar.gz
Algorithm Hash digest
SHA256 b41986474b394d79f03655c424f9849a900afc3e14c43d9b006acb0eef281b58
MD5 1091cd233da84d2ab3d41ab0ef069be3
BLAKE2b-256 62dce0f3122524dd00d0ac74799c35069e2eb7b92157df85ad202d2731236c60

See more details on using hashes here.

File details

Details for the file dlt_utils-0.4.0-py3-none-any.whl.

File metadata

  • Download URL: dlt_utils-0.4.0-py3-none-any.whl
  • Upload date:
  • Size: 17.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.11

File hashes

Hashes for dlt_utils-0.4.0-py3-none-any.whl
Algorithm Hash digest
SHA256 745a426a6a91226642e926d6e9713a49c79d1100efe749da4c2e554c63a44b54
MD5 f79ad0d1f1cdf9c9a24602590da4540f
BLAKE2b-256 8849e3d4713fc17f61e5c4b7690e83ea31448fa9223ca1e6cbfee9008c4a2f5e

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page