Skip to main content

Shared utilities for dlt data pipelines with multi-company support

Project description

dlt_utils

PyPI version Python 3.13+ License: MIT

Shared utilities for dlt data pipelines with multi-company support.

Features

  • PartitionedIncremental: Incremental state tracking per partition key (e.g., company_id)
  • Date utilities: Generate (year, week) and (year, month) tuples for time-based partitioning
  • Schema utilities: Ensure tables exist in destination database
  • CLI arguments: Reusable argument parser with common dlt pipeline options
  • Logging: Consistent logging configuration across pipelines
  • Pipeline utilities: Resource filtering, validation, and helper functions

Installation

# From PyPI
pip install dlt_utils

# For development
pip install -e ".[dev]"

Usage

PartitionedIncremental

Track incremental state per company (or any partition key):

import dlt
from dlt_utils import PartitionedIncremental

@dlt.resource
def sync_resource():
    state = dlt.current.resource_state()
    inc = PartitionedIncremental(
        state=state,
        state_key="sequences",
        cursor_path="sequenceNumber",
        initial_value=0,
    )

    for company_id in ["company_a", "company_b"]:
        start_seq = inc.get_last_value(company_id)
        for record in fetch_data(company_id, since=start_seq):
            inc.track(company_id, record["sequenceNumber"])
            yield record

Date utilities

Generate time periods for partitioned data extraction:

from dlt_utils import generate_year_weeks, generate_year_months

# Generate weeks from 2024 to now + 52 weeks
weeks = generate_year_weeks(start_year=2024)
# [(2024, 1), (2024, 2), ..., (2025, 52)]

# Generate months from October 2024 to February 2025
months = generate_year_months(2024, 10, 2025, 2)
# [(2024, 10), (2024, 11), (2024, 12), (2025, 1), (2025, 2)]

Schema utilities

Ensure tables exist before running pipeline:

from dlt_utils import ensure_all_tables_exist, ensure_tables_for_resources

# Create all tables from schema
ensure_all_tables_exist(pipeline)

# Create only specific resource tables (including child tables)
ensure_tables_for_resources(pipeline, ["trade_items", "organizations"])

CLI Arguments

Reusable argument parser with common dlt pipeline options:

from dlt_utils import create_base_parser, add_common_args, get_common_args

# Option 1: Use pre-configured base parser
parser = create_base_parser(description="My pipeline")
parser.add_argument("--my-custom-arg", type=int, default=10)
args = parser.parse_args()

# Option 2: Add common args to existing parser
import argparse
parser = argparse.ArgumentParser(description="My pipeline")
add_common_args(parser, default_destination="duckdb")
args = parser.parse_args()

# Extract common args into a typed dataclass
common = get_common_args(args)
print(common.debug)        # bool
print(common.destination)  # "mssql" or "duckdb"
print(common.resources)    # list[str] | None

Available arguments:

  • --debug: Enable debug logging
  • --destination: Target database (duckdb, mssql)
  • --dev-mode: Reset schema between runs
  • --resources: List of resources to load
  • --refresh: Refresh mode (drop_sources, drop_resources)
  • --list-resources: Show available resources and exit

Logging

Consistent logging configuration across pipelines:

from dlt_utils import configure_logging, get_logger

# Configure logging based on debug flag
configure_logging(debug=args.debug)

# Get a logger for your module
logger = get_logger(__name__)
logger.info("Pipeline started")

Pipeline Utilities

Helper functions for running dlt pipelines:

from dlt_utils import (
    validate_refresh_args,
    filter_resources,
    print_resources,
    get_resource_names,
    mask_sensitive_value,
)

# Validate refresh args (prevents drop_sources with resource filter)
validate_refresh_args(refresh=args.refresh, resources_filter=args.resources)

# Filter resources based on user selection
available = ["orders", "products", "customers"]
to_load = filter_resources(available, args.resources)

# Print available resources from a dlt source
if args.list_resources:
    print_resources(source)
    exit(0)

# Get resource names from a source
names = get_resource_names(source)

# Mask sensitive values for logging
api_key = "sk-abc123xyz789"
print(mask_sensitive_value(api_key))  # "sk..12..***"

Development

# Install dev dependencies
pip install -e ".[dev]"

# Run tests
pytest

# Run linter
ruff check dlt_utils/

CI/CD Pipeline

De pipeline draait automatisch bij:

  • Push naar main: Voert tests uit
  • Tag met v* prefix: Voert tests uit én publiceert naar PyPI

Pipeline Workflow

┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│   Push/Tag      │────▶│   Test Stage    │────▶│  Publish Stage  │
│   naar repo     │     │   (altijd)      │     │  (alleen tags)  │
└─────────────────┘     └─────────────────┘     └─────────────────┘
                              │                        │
                              ▼                        ▼
                        - Install deps           - Build package
                        - Run pytest             - Upload to PyPI
                        - Publish results

Nieuwe Versie Releasen

Optie 1: Via Git CLI

# 1. Zorg dat alle changes gecommit zijn
git add .
git commit -m "Release v0.2.0"

# 2. Maak een tag aan
git tag v0.2.0

# 3. Push commit én tag naar remote
git push origin main
git push origin v0.2.0

Optie 2: Via Azure DevOps

  1. Ga naar ReposTags
  2. Klik op New tag
  3. Vul in:
    • Name: v0.2.0 (moet beginnen met v)
    • Based on: selecteer de commit of branch (bijv. main)
    • Description: optioneel, bijv. "Added new feature X"
  4. Klik op Create

De pipeline wordt automatisch getriggered en publiceert naar PyPI.

Versienummering

Gebruik Semantic Versioning:

  • vMAJOR.MINOR.PATCH (bijv. v1.2.3)
  • MAJOR: Breaking changes
  • MINOR: Nieuwe features (backwards compatible)
  • PATCH: Bugfixes

⚠️ Belangrijk: Vergeet niet de versie in pyproject.toml bij te werken vóór het taggen!

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dlt_utils-0.8.0.tar.gz (27.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dlt_utils-0.8.0-py3-none-any.whl (22.1 kB view details)

Uploaded Python 3

File details

Details for the file dlt_utils-0.8.0.tar.gz.

File metadata

  • Download URL: dlt_utils-0.8.0.tar.gz
  • Upload date:
  • Size: 27.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.12

File hashes

Hashes for dlt_utils-0.8.0.tar.gz
Algorithm Hash digest
SHA256 e9c1b43ae35c85212e5e16f14845924ecbbba570cb087c0a6737c32cf94c3d53
MD5 b33e90acb084247f82a8d75e0dcf4f3c
BLAKE2b-256 2543ee760552fbacd5e530cbb8e98ec2ba9abc211b9d1f236744cb29d7adcab1

See more details on using hashes here.

File details

Details for the file dlt_utils-0.8.0-py3-none-any.whl.

File metadata

  • Download URL: dlt_utils-0.8.0-py3-none-any.whl
  • Upload date:
  • Size: 22.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.12

File hashes

Hashes for dlt_utils-0.8.0-py3-none-any.whl
Algorithm Hash digest
SHA256 013a3ec03cd7e765a62ed1915113c53c3b17ca43527ca773fd4e3161cb48af70
MD5 e4621691a4c799b9eb701319fd638551
BLAKE2b-256 362f4ad2b060131cc6c01372f97c48941210357a481945320aaef2a006bdae55

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page