Skip to main content

Shared utilities for dlt data pipelines with multi-company support

Project description

dlt_utils

PyPI version Python 3.13+ License: MIT

Shared utilities for dlt data pipelines with multi-company support.

Features

  • PartitionedIncremental: Incremental state tracking per partition key (e.g., company_id)
  • Date utilities: Generate (year, week) and (year, month) tuples for time-based partitioning
  • Schema utilities: Ensure tables exist in destination database
  • CLI arguments: Reusable argument parser with common dlt pipeline options
  • Logging: Consistent logging configuration across pipelines
  • Pipeline utilities: Resource filtering, validation, and helper functions

Installation

# From PyPI
pip install dlt_utils

# For development
pip install -e ".[dev]"

Usage

PartitionedIncremental

Track incremental state per company (or any partition key):

import dlt
from dlt_utils import PartitionedIncremental

@dlt.resource
def sync_resource():
    state = dlt.current.resource_state()
    inc = PartitionedIncremental(
        state=state,
        state_key="sequences",
        cursor_path="sequenceNumber",
        initial_value=0,
    )

    for company_id in ["company_a", "company_b"]:
        start_seq = inc.get_last_value(company_id)
        for record in fetch_data(company_id, since=start_seq):
            inc.track(company_id, record["sequenceNumber"])
            yield record

Date utilities

Generate time periods for partitioned data extraction:

from dlt_utils import generate_year_weeks, generate_year_months

# Generate weeks from 2024 to now + 52 weeks
weeks = generate_year_weeks(start_year=2024)
# [(2024, 1), (2024, 2), ..., (2025, 52)]

# Generate months from October 2024 to February 2025
months = generate_year_months(2024, 10, 2025, 2)
# [(2024, 10), (2024, 11), (2024, 12), (2025, 1), (2025, 2)]

Schema utilities

Ensure tables exist before running pipeline:

from dlt_utils import ensure_all_tables_exist, ensure_tables_for_resources

# Create all tables from schema
ensure_all_tables_exist(pipeline)

# Create only specific resource tables (including child tables)
ensure_tables_for_resources(pipeline, ["trade_items", "organizations"])

CLI Arguments

Reusable argument parser with common dlt pipeline options:

from dlt_utils import create_base_parser, add_common_args, get_common_args

# Option 1: Use pre-configured base parser
parser = create_base_parser(description="My pipeline")
parser.add_argument("--my-custom-arg", type=int, default=10)
args = parser.parse_args()

# Option 2: Add common args to existing parser
import argparse
parser = argparse.ArgumentParser(description="My pipeline")
add_common_args(parser, default_destination="duckdb")
args = parser.parse_args()

# Extract common args into a typed dataclass
common = get_common_args(args)
print(common.debug)        # bool
print(common.destination)  # "mssql" or "duckdb"
print(common.resources)    # list[str] | None

Available arguments:

  • --debug: Enable debug logging
  • --destination: Target database (duckdb, mssql)
  • --dev-mode: Reset schema between runs
  • --resources: List of resources to load
  • --refresh: Refresh mode (drop_sources, drop_resources)
  • --list-resources: Show available resources and exit

Logging

Consistent logging configuration across pipelines:

from dlt_utils import configure_logging, get_logger

# Configure logging based on debug flag
configure_logging(debug=args.debug)

# Get a logger for your module
logger = get_logger(__name__)
logger.info("Pipeline started")

Pipeline Utilities

Helper functions for running dlt pipelines:

from dlt_utils import (
    validate_refresh_args,
    filter_resources,
    print_resources,
    get_resource_names,
    mask_sensitive_value,
)

# Validate refresh args (prevents drop_sources with resource filter)
validate_refresh_args(refresh=args.refresh, resources_filter=args.resources)

# Filter resources based on user selection
available = ["orders", "products", "customers"]
to_load = filter_resources(available, args.resources)

# Print available resources from a dlt source
if args.list_resources:
    print_resources(source)
    exit(0)

# Get resource names from a source
names = get_resource_names(source)

# Mask sensitive values for logging
api_key = "sk-abc123xyz789"
print(mask_sensitive_value(api_key))  # "sk..12..***"

Development

# Install dev dependencies
pip install -e ".[dev]"

# Run tests
pytest

# Run linter
ruff check dlt_utils/

CI/CD Pipeline

De pipeline draait automatisch bij:

  • Push naar main: Voert tests uit
  • Tag met v* prefix: Voert tests uit én publiceert naar PyPI

Pipeline Workflow

┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│   Push/Tag      │────▶│   Test Stage    │────▶│  Publish Stage  │
│   naar repo     │     │   (altijd)      │     │  (alleen tags)  │
└─────────────────┘     └─────────────────┘     └─────────────────┘
                              │                        │
                              ▼                        ▼
                        - Install deps           - Build package
                        - Run pytest             - Upload to PyPI
                        - Publish results

Nieuwe Versie Releasen

Optie 1: Via Git CLI

# 1. Zorg dat alle changes gecommit zijn
git add .
git commit -m "Release v0.2.0"

# 2. Maak een tag aan
git tag v0.2.0

# 3. Push commit én tag naar remote
git push origin main
git push origin v0.2.0

Optie 2: Via Azure DevOps

  1. Ga naar ReposTags
  2. Klik op New tag
  3. Vul in:
    • Name: v0.2.0 (moet beginnen met v)
    • Based on: selecteer de commit of branch (bijv. main)
    • Description: optioneel, bijv. "Added new feature X"
  4. Klik op Create

De pipeline wordt automatisch getriggered en publiceert naar PyPI.

Versienummering

Gebruik Semantic Versioning:

  • vMAJOR.MINOR.PATCH (bijv. v1.2.3)
  • MAJOR: Breaking changes
  • MINOR: Nieuwe features (backwards compatible)
  • PATCH: Bugfixes

⚠️ Belangrijk: Vergeet niet de versie in pyproject.toml bij te werken vóór het taggen!

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dlt_utils-0.5.2.tar.gz (20.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dlt_utils-0.5.2-py3-none-any.whl (17.1 kB view details)

Uploaded Python 3

File details

Details for the file dlt_utils-0.5.2.tar.gz.

File metadata

  • Download URL: dlt_utils-0.5.2.tar.gz
  • Upload date:
  • Size: 20.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.11

File hashes

Hashes for dlt_utils-0.5.2.tar.gz
Algorithm Hash digest
SHA256 c173225208e0a95aceb0f977dc5ecfcc209c28679138ba91c981f16586e59521
MD5 95363e371cbb157e8b26c36962290a41
BLAKE2b-256 b890498967fd72a9b6b6178934fa80de85e84295e46cc56fcb8a443ff5856f20

See more details on using hashes here.

File details

Details for the file dlt_utils-0.5.2-py3-none-any.whl.

File metadata

  • Download URL: dlt_utils-0.5.2-py3-none-any.whl
  • Upload date:
  • Size: 17.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.11

File hashes

Hashes for dlt_utils-0.5.2-py3-none-any.whl
Algorithm Hash digest
SHA256 b12b27fb6e2dda380f5d7f1c3195b688995fd062d846b65b64074d01a4a6b4cb
MD5 13c9b8db9ae376e9811be5982e48b9b4
BLAKE2b-256 ba7fd978acfa00c25720d13f3accb680694351b6bf1a80709978b46747a5fd20

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page