
dlt_utils

Python 3.13+ · License: MIT

Shared utilities for dlt data pipelines with multi-company support.

Features

  • PartitionedIncremental: Incremental state tracking per partition key (e.g., company_id)
  • Date utilities: Generate (year, week) and (year, month) tuples for time-based partitioning
  • Schema utilities: Ensure tables exist in destination database
  • CLI arguments: Reusable argument parser with common dlt pipeline options
  • Logging: Consistent logging configuration across pipelines
  • Pipeline utilities: Resource filtering, validation, and helper functions

Installation

# From PyPI
pip install dlt_utils

# For development
pip install -e ".[dev]"

Usage

PartitionedIncremental

Track incremental state per company (or any partition key):

import dlt
from dlt_utils import PartitionedIncremental

@dlt.resource
def sync_resource():
    state = dlt.current.resource_state()
    inc = PartitionedIncremental(
        state=state,
        state_key="sequences",
        cursor_path="sequenceNumber",
        initial_value=0,
    )

    for company_id in ["company_a", "company_b"]:
        start_seq = inc.get_last_value(company_id)
        # fetch_data is a placeholder for your own source/API call
        for record in fetch_data(company_id, since=start_seq):
            inc.track(company_id, record["sequenceNumber"])
            yield record
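
Because state is tracked per partition key, each company resumes from its own last tracked value on the next run, rather than sharing a single global cursor.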

Date utilities

Generate time periods for partitioned data extraction:

from dlt_utils import generate_year_weeks, generate_year_months

# Generate (year, week) tuples from the start of 2024 through 52 weeks from now
weeks = generate_year_weeks(start_year=2024)
# [(2024, 1), (2024, 2), ..., (2025, 52)]

# Generate months from October 2024 to February 2025
months = generate_year_months(2024, 10, 2025, 2)
# [(2024, 10), (2024, 11), (2024, 12), (2025, 1), (2025, 2)]
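
These tuples can drive a partitioned extraction loop. A minimal sketch, where fetch_month is a hypothetical stand-in for your own per-period source call:

import dlt
from dlt_utils import generate_year_months

@dlt.resource
def monthly_resource():
    for year, month in generate_year_months(2024, 10, 2025, 2):
        # fetch_month(year, month) is hypothetical; replace with your API call
        yield from fetch_month(year, month)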

Schema utilities

Ensure tables exist before running the pipeline:

from dlt_utils import ensure_all_tables_exist, ensure_tables_for_resources

# Create all tables from schema
ensure_all_tables_exist(pipeline)

# Create only specific resource tables (including child tables)
ensure_tables_for_resources(pipeline, ["trade_items", "organizations"])

CLI Arguments

Reusable argument parser with common dlt pipeline options:

from dlt_utils import create_base_parser, add_common_args, get_common_args

# Option 1: Use pre-configured base parser
parser = create_base_parser(description="My pipeline")
parser.add_argument("--my-custom-arg", type=int, default=10)
args = parser.parse_args()

# Option 2: Add common args to existing parser
import argparse
parser = argparse.ArgumentParser(description="My pipeline")
add_common_args(parser, default_destination="duckdb")
args = parser.parse_args()

# Extract common args into a typed dataclass
common = get_common_args(args)
print(common.debug)        # bool
print(common.destination)  # "mssql" or "duckdb"
print(common.resources)    # list[str] | None

Available arguments:

  • --debug: Enable debug logging
  • --destination: Target database (duckdb, mssql)
  • --dev-mode: Reset schema between runs
  • --resources: List of resources to load
  • --refresh: Refresh mode (drop_sources, drop_resources)
  • --list-resources: Show available resources and exit
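
For example, assuming an entry-point script named pipeline.py and a --resources flag that accepts space-separated names (check your parser configuration), an invocation might look like:

python pipeline.py --destination duckdb --resources orders products --debug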

Logging

Consistent logging configuration across pipelines:

from dlt_utils import configure_logging, get_logger

# Configure logging based on debug flag
configure_logging(debug=args.debug)

# Get a logger for your module
logger = get_logger(__name__)
logger.info("Pipeline started")

Pipeline Utilities

Helper functions for running dlt pipelines:

from dlt_utils import (
    validate_refresh_args,
    filter_resources,
    print_resources,
    get_resource_names,
    mask_sensitive_value,
)

# Validate refresh args (prevents drop_sources with resource filter)
validate_refresh_args(refresh=args.refresh, resources_filter=args.resources)

# Filter resources based on user selection
available = ["orders", "products", "customers"]
to_load = filter_resources(available, args.resources)

# Print available resources from a dlt source
if args.list_resources:
    print_resources(source)
    exit(0)

# Get resource names from a source
names = get_resource_names(source)

# Mask sensitive values for logging
api_key = "sk-abc123xyz789"
print(mask_sensitive_value(api_key))  # "sk..12..***"
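
Putting it together, a pipeline entry point might look like the sketch below. This is illustrative only: my_source is a hypothetical dlt source factory, and the refresh keyword on pipeline.run assumes a recent dlt version that supports refresh modes.

import dlt
from dlt_utils import (
    configure_logging,
    create_base_parser,
    filter_resources,
    get_common_args,
    get_logger,
    get_resource_names,
    print_resources,
    validate_refresh_args,
)

def main():
    args = create_base_parser(description="My pipeline").parse_args()
    common = get_common_args(args)
    configure_logging(debug=common.debug)
    logger = get_logger(__name__)

    source = my_source()  # hypothetical: your dlt source factory

    if args.list_resources:
        print_resources(source)
        return

    validate_refresh_args(refresh=args.refresh, resources_filter=common.resources)
    to_load = filter_resources(get_resource_names(source), common.resources)

    pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination=common.destination)
    # refresh=None is a no-op; "drop_sources"/"drop_resources" assume dlt's refresh modes
    info = pipeline.run(source.with_resources(*to_load), refresh=args.refresh)
    logger.info("Load complete: %s", info)

if __name__ == "__main__":
    main()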

Development

# Install dev dependencies
pip install -e ".[dev]"

# Run tests
pytest

# Run linter
ruff check dlt_utils/

CI/CD Pipeline

The pipeline runs automatically on:

  • Push to main: runs tests
  • Tag with v* prefix: runs tests and publishes to PyPI

Pipeline Workflow

┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│   Push/Tag      │────▶│   Test Stage    │────▶│  Publish Stage  │
│   to repo       │     │   (always)      │     │   (tags only)   │
└─────────────────┘     └─────────────────┘     └─────────────────┘
                              │                        │
                              ▼                        ▼
                        - Install deps           - Build package
                        - Run pytest             - Upload to PyPI
                        - Publish results

Releasing a New Version

Option 1: Via Git CLI

# 1. Make sure all changes are committed
git add .
git commit -m "Release v0.2.0"

# 2. Create a tag
git tag v0.2.0

# 3. Push the commit and the tag to the remote
git push origin main
git push origin v0.2.0

Option 2: Via Azure DevOps

  1. Go to Repos → Tags
  2. Click New tag
  3. Fill in:
    • Name: v0.2.0 (must start with v)
    • Based on: select the commit or branch (e.g. main)
    • Description: optional, e.g. "Added new feature X"
  4. Click Create

The pipeline is triggered automatically and publishes to PyPI.

Version Numbering

Use Semantic Versioning:

  • vMAJOR.MINOR.PATCH (e.g. v1.2.3)
  • MAJOR: Breaking changes
  • MINOR: New features (backwards compatible)
  • PATCH: Bug fixes

⚠️ Important: Don't forget to update the version in pyproject.toml before tagging!

License

MIT
