
HubSpot to BigQuery ETL pipeline using dlt and Dagster


DL Pipeline Connector

A data pipeline connector for ETL processes built with dlt (data load tool) and Dagster, focusing on extracting data from HubSpot CRM and loading it into BigQuery.

Features

  • HubSpot CRM data extraction with incremental loading
  • BigQuery destination support
  • Dagster orchestration integration
  • Multiple HubSpot resources: contacts, companies, deals, engagement activities, and more
  • Incremental data loading with state management
  • Automated pipeline scheduling and monitoring

Project Structure

dl-pipeline-connector/
├── src/
│   ├── pipelines/                 # ETL pipeline implementations
│   │   ├── hubspot.py            # HubSpot to BigQuery pipeline
│   │   └── __init__.py
│   ├── dagster/                   # Dagster orchestration
│   │   ├── assets.py             # Dagster assets definitions
│   │   ├── definitions.py        # Dagster definitions
│   │   ├── jobs.py               # Dagster jobs
│   │   ├── schedulers.py         # Dagster schedulers
│   │   ├── sensors.py            # Dagster sensors
│   │   └── __init__.py
│   ├── dagster_test/              # Dagster testing utilities
│   │   └── __init__.py
│   ├── constants/                 # Constants and configuration
│   │   └── urls.py               # API URLs
│   └── __init__.py
├── .github/                       # GitHub configuration
├── .venv/                         # Virtual environment
├── pyproject.toml                 # Project configuration & dependencies
├── .env.example                   # Environment variables template
├── .env                           # Environment variables (gitignored)
├── .gitignore                     # Git ignore rules
├── UV_GUIDE.md                    # UV package manager guide
└── README.md

Setup

Prerequisites

  • Python 3.10 or higher
  • uv package manager
  • HubSpot account with API access
  • Google Cloud Platform account with BigQuery enabled

Install uv

# On macOS/Linux
curl -LsSf https://astral.sh/uv/install.sh | sh

# On Windows
powershell -c "irm https://astral.sh/uv/install.ps1 | iex"

Project Setup

  1. Clone the repository and navigate to the project directory:
cd dl-pipeline-connector
  2. Sync the project (creates the virtual environment and installs dependencies):
uv sync
  3. Install the core dependencies:
uv add "dlt[bigquery]" dagster dagster-embedded-elt dagster-webserver dagster-slack
  4. Install development dependencies:
uv add --dev pytest pytest-cov ruff mypy
  5. Set up environment variables:
cp .env.example .env
# Edit .env with your actual credentials

Required environment variables in .env:

  • HUBSPOT_PRIVATE_APP_ACCESS_TOKEN - Your HubSpot private app access token
  • DESTINATION__BIGQUERY__LOCATION - BigQuery dataset location (e.g., US, EU)
  • DESTINATION__BIGQUERY__CREDENTIALS__PROJECT_ID - GCP project ID
  • DESTINATION__BIGQUERY__CREDENTIALS__PRIVATE_KEY - GCP service account private key
  • DESTINATION__BIGQUERY__CREDENTIALS__CLIENT_EMAIL - GCP service account email
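A filled-in .env might look like the following (placeholder values, not real credentials; the double-underscore names follow dlt's environment-variable configuration convention):

```
HUBSPOT_PRIVATE_APP_ACCESS_TOKEN="pat-na1-xxxxxxxx"
DESTINATION__BIGQUERY__LOCATION="US"
DESTINATION__BIGQUERY__CREDENTIALS__PROJECT_ID="my-gcp-project"
DESTINATION__BIGQUERY__CREDENTIALS__PRIVATE_KEY="-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n"
DESTINATION__BIGQUERY__CREDENTIALS__CLIENT_EMAIL="etl-runner@my-gcp-project.iam.gserviceaccount.com"
```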

Usage

Running the HubSpot Pipeline

Activate your virtual environment first:

# On Windows
.venv\Scripts\activate

# On macOS/Linux
source .venv/bin/activate

Run the HubSpot pipeline directly:

python -m pipelines.hubspot

Or import and run programmatically:

from pipelines.hubspot import pipeline_hubspot

# Run the pipeline
pipeline_hubspot()

Running with Dagster

Start the Dagster web server:

dagster dev

This will start the Dagster UI at http://localhost:3000 where you can:

  • View pipeline definitions and assets
  • Trigger manual runs
  • Monitor pipeline execution
  • View run history and logs
  • Configure schedules and sensors

Available HubSpot Resources

The pipeline extracts the following HubSpot resources:

Core Objects:

  • owners - HubSpot users and owners
  • companies - Company records (incremental)
  • contacts - Contact records (incremental)
  • deals - Deal records (incremental)
  • leads - Lead records (incremental)
  • quotes - Quote records (incremental)
  • tickets - Ticket records (incremental)
  • goal_targets - Goal target records (incremental)

Pipelines:

  • deals_pipelines - Deal pipeline configurations

Associations:

  • deals_contacts - Deal-to-contact associations
  • contacts_companies - Contact-to-company associations

Engagement Activities:

  • engagement_calls - Call activities (incremental)
  • engagement_communications - Communication activities (incremental)
  • engagement_meetings - Meeting activities (incremental)
  • engagement_notes - Note activities (incremental)
  • engagement_tasks - Task activities (incremental)

Pipeline Configuration

Incremental Loading

The pipeline uses dlt's incremental loading feature to only fetch records that have been updated since the last run. The state is automatically managed by dlt.

Incremental Key: Most resources use updatedAt as the incremental cursor field.

Batch Limits: The pipeline fetches up to 10,000 records per incremental run to avoid API rate limits and timeouts. If more records remain, subsequent runs continue from the saved cursor until all data is synchronized.
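dlt handles the cursor bookkeeping internally, but the logic is roughly the following (a pure-Python sketch of cursor-based incremental extraction with a batch cap, not the actual dlt API):

```python
BATCH_LIMIT = 10_000  # mirrors the pipeline's per-run cap

def fetch_incremental(records, last_cursor, limit=BATCH_LIMIT):
    """Return records updated after last_cursor, plus the new cursor value."""
    # Only records newer than the stored cursor are considered, oldest first.
    fresh = sorted(
        (r for r in records if r["updatedAt"] > last_cursor),
        key=lambda r: r["updatedAt"],
    )[:limit]
    # The new cursor is the latest updatedAt seen; unchanged if nothing was fetched.
    new_cursor = fresh[-1]["updatedAt"] if fresh else last_cursor
    return fresh, new_cursor

records = [
    {"id": 1, "updatedAt": "2024-01-01T00:00:00Z"},
    {"id": 2, "updatedAt": "2024-03-01T00:00:00Z"},
]
batch, cursor = fetch_incremental(records, "2024-02-01T00:00:00Z")
# Only record 2 is newer than the cursor, so it alone is fetched.
```

On the next run, `cursor` would be passed back in, so already-loaded records are skipped.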

Write Disposition

All resources use write_disposition='merge' which means:

  • New records are inserted
  • Existing records (based on primary key) are updated
  • No data is deleted from the destination
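In other words, merge behaves like an upsert keyed on the primary key. A pure-Python sketch of the semantics (not how dlt implements it against BigQuery):

```python
def merge_write(destination, batch, key="id"):
    """Upsert batch into destination: insert new keys, update existing ones."""
    table = {row[key]: row for row in destination}
    for row in batch:
        table[row[key]] = row  # insert or overwrite; nothing is ever deleted
    return list(table.values())

existing = [{"id": 1, "name": "Acme"}, {"id": 2, "name": "Globex"}]
incoming = [{"id": 2, "name": "Globex Corp"}, {"id": 3, "name": "Initech"}]
merged = merge_write(existing, incoming)
# → ids 1, 2, 3 all present; id 2 updated; id 1 untouched
```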

BigQuery Dataset

Data is loaded into the hubspot_raw dataset in BigQuery. Each resource becomes a separate table.

Managing Dependencies

Current Dependencies

Core ETL:

  • dlt[bigquery] - Data load tool with BigQuery support
  • dagster - Orchestration framework
  • dagster-embedded-elt - Embedded ELT capabilities
  • dagster-webserver - Web UI for Dagster
  • dagster-slack - Slack notifications

Development:

  • pytest - Testing framework
  • pytest-cov - Test coverage
  • ruff - Python linter and formatter
  • mypy - Static type checking

Adding New Dependencies

Use uv add to automatically install and update pyproject.toml:

# Add a production dependency
uv add <package-name>

# Add a development dependency
uv add --dev <package-name>

# Add a specific version
uv add <package-name>==1.2.3

# Add with extras
uv add "package[extra1,extra2]"

No manual pyproject.toml editing is needed; uv add handles it automatically.

Testing

# Run all tests
pytest

# Run with coverage
pytest --cov=src --cov-report=html

# Run specific test file
pytest tests/test_specific.py

# View coverage report
open htmlcov/index.html  # macOS
start htmlcov/index.html  # Windows

Code Quality

The project uses modern Python tooling for code quality:

# Format code with ruff
ruff format .

# Lint and auto-fix issues
ruff check . --fix

# Type checking with mypy
mypy src/

# Run all quality checks
ruff check . && ruff format --check . && mypy src/

Ruff Configuration

Configured in pyproject.toml:

  • Line length: 100 characters
  • Target: Python 3.10+
  • Selected rules: Errors (E), Pyflakes (F), Import sorting (I), Naming (N), Warnings (W), Pyupgrade (UP)
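The corresponding pyproject.toml section looks roughly like this (a sketch matching the settings above, not copied verbatim from the repository):

```toml
[tool.ruff]
line-length = 100
target-version = "py310"

[tool.ruff.lint]
select = ["E", "F", "I", "N", "W", "UP"]
```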

Development Workflow

  1. Create a new branch for your feature/fix:
git checkout -b feature/your-feature-name
  2. Make your changes to the codebase

  3. Add tests for new functionality in tests/

  4. Run code quality checks:

ruff check . --fix
ruff format .
mypy src/
pytest
  5. Commit your changes:
git add .
git commit -m "Description of changes"
  6. Push and create a pull request:
git push origin feature/your-feature-name

Troubleshooting

Common Issues

ImportError: No module named 'pipelines'

  • Make sure you're in the project root directory
  • Activate your virtual environment
  • Run with python -m pipelines.hubspot instead of direct execution

BigQuery Authentication Error

  • Verify your service account credentials in .env
  • Ensure the private key is properly formatted with \n for line breaks
  • Check that the service account has BigQuery Data Editor and Job User roles
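If the key was pasted with literal backslash-n sequences, they can be normalized before the value reaches the BigQuery client (a small illustrative snippet, not part of the pipeline itself):

```python
def normalize_private_key(raw_key: str) -> str:
    """Turn literal backslash-n sequences into real newlines."""
    return raw_key.replace("\\n", "\n")

key = normalize_private_key(
    "-----BEGIN PRIVATE KEY-----\\nABC\\n-----END PRIVATE KEY-----\\n"
)
# key now contains real line breaks instead of literal "\n" characters
```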

HubSpot API Rate Limits

  • The pipeline implements pagination and batch limits
  • Rate limit errors are logged - wait and retry
  • Consider adjusting batch sizes in hubspot.py

dlt State Issues

  • Pipeline state is stored in .dlt/ directory
  • To reset incremental state, delete .dlt/ and run full refresh
  • State is also persisted in BigQuery in _dlt_loads and _dlt_pipeline_state tables


License

[Your License Here]
