dlt_utils
Shared utilities for dlt data pipelines with multi-company support.
Features
- PartitionedIncremental: Incremental state tracking per partition key (e.g., company_id)
- Date utilities: Generate (year, week) and (year, month) tuples for time-based partitioning
- Schema utilities: Ensure tables exist in destination database
- CLI arguments: Reusable argument parser with common dlt pipeline options
- Logging: Consistent logging configuration across pipelines
- Pipeline utilities: Resource filtering, validation, and helper functions
Installation
# From PyPI
pip install dlt_utils
# For development
pip install -e ".[dev]"
Usage
PartitionedIncremental
Track incremental state per company (or any partition key):
import dlt
from dlt_utils import PartitionedIncremental

@dlt.resource
def sync_resource():
    state = dlt.current.resource_state()
    inc = PartitionedIncremental(
        state=state,
        state_key="sequences",
        cursor_path="sequenceNumber",
        initial_value=0,
    )
    for company_id in ["company_a", "company_b"]:
        start_seq = inc.get_last_value(company_id)
        for record in fetch_data(company_id, since=start_seq):
            inc.track(company_id, record["sequenceNumber"])
            yield record
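The example assumes a fetch_data helper that yields each company's records in ascending sequenceNumber order; if records arrive out of order, the tracked cursor can skip data. A minimal stub, with a purely hypothetical HTTP endpoint, might look like:

import requests

def fetch_data(company_id: str, since: int):
    # Illustrative only: the endpoint, query parameter, and payload
    # shape are placeholders, not part of dlt_utils.
    resp = requests.get(
        f"https://api.example.com/companies/{company_id}/events",
        params={"sinceSequenceNumber": since},
        timeout=30,
    )
    resp.raise_for_status()
    yield from resp.json()["items"]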
Date utilities
Generate time periods for partitioned data extraction:
from dlt_utils import generate_year_weeks, generate_year_months
# Generate weeks from 2024 up to 52 weeks from now
weeks = generate_year_weeks(start_year=2024)
# [(2024, 1), (2024, 2), ..., (2025, 52)]
# Generate months from October 2024 to February 2025
months = generate_year_months(2024, 10, 2025, 2)
# [(2024, 10), (2024, 11), (2024, 12), (2025, 1), (2025, 2)]
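These tuples work well as the outer loop of a partitioned resource. A minimal sketch, where fetch_week is a hypothetical extraction function:

import dlt
from dlt_utils import generate_year_weeks

@dlt.resource
def weekly_sales():
    # One extraction call per (year, week) partition.
    for year, week in generate_year_weeks(start_year=2024):
        yield from fetch_week(year, week)  # fetch_week is a placeholder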
Schema utilities
Ensure tables exist before running pipeline:
from dlt_utils import ensure_all_tables_exist, ensure_tables_for_resources
# Create all tables from schema
ensure_all_tables_exist(pipeline)
# Create only specific resource tables (including child tables)
ensure_tables_for_resources(pipeline, ["trade_items", "organizations"])
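For example, you might create the tables up front so downstream views and jobs can reference them before the first load completes. A sketch, assuming the pipeline already carries a schema (the pipeline name is illustrative):

import dlt
from dlt_utils import ensure_all_tables_exist

pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
# Create every table defined in the pipeline's schema before loading.
ensure_all_tables_exist(pipeline)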
CLI Arguments
Reusable argument parser with common dlt pipeline options:
from dlt_utils import create_base_parser, add_common_args, get_common_args
# Option 1: Use pre-configured base parser
parser = create_base_parser(description="My pipeline")
parser.add_argument("--my-custom-arg", type=int, default=10)
args = parser.parse_args()
# Option 2: Add common args to existing parser
import argparse
parser = argparse.ArgumentParser(description="My pipeline")
add_common_args(parser, default_destination="duckdb")
args = parser.parse_args()
# Extract common args into a typed dataclass
common = get_common_args(args)
print(common.debug) # bool
print(common.destination) # "mssql" or "duckdb"
print(common.resources) # list[str] | None
Available arguments:
- --debug: Enable debug logging
- --destination: Target database (duckdb, mssql)
- --dev-mode: Reset schema between runs
- --resources: List of resources to load
- --refresh: Refresh mode (drop_sources, drop_resources)
- --list-resources: Show available resources and exit
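A sketch of feeding the parsed arguments into a pipeline; the pipeline name is illustrative, and dev_mode assumes a recent dlt version:

import dlt
from dlt_utils import create_base_parser, get_common_args

parser = create_base_parser(description="My pipeline")
args = parser.parse_args()
common = get_common_args(args)

pipeline = dlt.pipeline(
    pipeline_name="my_pipeline",  # illustrative name
    destination=common.destination,
    dev_mode=args.dev_mode,  # resets schema between runs when set
)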
Logging
Consistent logging configuration across pipelines:
from dlt_utils import configure_logging, get_logger
# Configure logging based on debug flag
configure_logging(debug=args.debug)
# Get a logger for your module
logger = get_logger(__name__)
logger.info("Pipeline started")
Pipeline Utilities
Helper functions for running dlt pipelines:
from dlt_utils import (
validate_refresh_args,
filter_resources,
print_resources,
get_resource_names,
mask_sensitive_value,
)
# Validate refresh args (prevents drop_sources with resource filter)
validate_refresh_args(refresh=args.refresh, resources_filter=args.resources)
# Filter resources based on user selection
available = ["orders", "products", "customers"]
to_load = filter_resources(available, args.resources)
# Print available resources from a dlt source
if args.list_resources:
print_resources(source)
exit(0)
# Get resource names from a source
names = get_resource_names(source)
# Mask sensitive values for logging
api_key = "sk-abc123xyz789"
print(mask_sensitive_value(api_key)) # "sk..12..***"
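Put together, a pipeline entrypoint might look like the sketch below; my_source stands in for your own dlt source:

import sys

import dlt
from dlt_utils import (
    configure_logging,
    create_base_parser,
    filter_resources,
    get_common_args,
    get_resource_names,
    print_resources,
    validate_refresh_args,
)

def main() -> None:
    args = create_base_parser(description="My pipeline").parse_args()
    common = get_common_args(args)
    configure_logging(debug=common.debug)

    source = my_source()  # my_source is a placeholder for your dlt source
    if args.list_resources:
        print_resources(source)
        sys.exit(0)

    validate_refresh_args(refresh=args.refresh, resources_filter=args.resources)
    to_load = filter_resources(get_resource_names(source), args.resources)

    pipeline = dlt.pipeline(
        pipeline_name="my_pipeline", destination=common.destination
    )
    pipeline.run(source.with_resources(*to_load))

if __name__ == "__main__":
    main()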
Development
# Install dev dependencies
pip install -e ".[dev]"
# Run tests
pytest
# Run linter
ruff check dlt_utils/
CI/CD Pipeline
The pipeline runs automatically on:
- Push to main: runs tests
- Tag with v* prefix: runs tests and publishes to PyPI
Pipeline Workflow
┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│    Push/Tag     │────▶│   Test Stage    │────▶│  Publish Stage  │
│    to repo      │     │    (always)     │     │   (tags only)   │
└─────────────────┘     └─────────────────┘     └─────────────────┘
                                │                       │
                                ▼                       ▼
                          - Install deps          - Build package
                          - Run pytest            - Upload to PyPI
                          - Publish results
Releasing a New Version
Option 1: Via the Git CLI
# 1. Make sure all changes are committed
git add .
git commit -m "Release v0.2.0"
# 2. Create the tag
git tag v0.2.0
# 3. Push the commit and the tag to the remote
git push origin main
git push origin v0.2.0
Option 2: Via Azure DevOps
- Go to Repos → Tags
- Click New tag
- Fill in:
  - Name: v0.2.0 (must start with v)
  - Based on: select the commit or branch (e.g. main)
  - Description: optional, e.g. "Added new feature X"
- Click Create
The pipeline is triggered automatically and publishes to PyPI.
Versioning
Use Semantic Versioning: vMAJOR.MINOR.PATCH (e.g. v1.2.3)
- MAJOR: breaking changes
- MINOR: new features (backwards compatible)
- PATCH: bug fixes
⚠️ Important: don't forget to update the version in pyproject.toml before tagging!
License
MIT