# DL Pipeline Connector

HubSpot to BigQuery ETL pipeline using dlt and Dagster.

A data pipeline connector for ETL processes built with dlt (data load tool) and Dagster, focused on extracting data from HubSpot CRM and loading it into BigQuery.

## Features
- HubSpot CRM data extraction with incremental loading
- BigQuery destination support
- Dagster orchestration integration
- Multiple HubSpot resources: contacts, companies, deals, engagement activities, and more
- Incremental data loading with state management
- Automated pipeline scheduling and monitoring
## Project Structure

```
dl-pipeline-connector/
├── src/
│   ├── pipelines/           # ETL pipeline implementations
│   │   ├── hubspot.py       # HubSpot to BigQuery pipeline
│   │   └── __init__.py
│   ├── dagster/             # Dagster orchestration
│   │   ├── assets.py        # Dagster asset definitions
│   │   ├── definitions.py   # Dagster definitions
│   │   ├── jobs.py          # Dagster jobs
│   │   ├── schedulers.py    # Dagster schedulers
│   │   ├── sensors.py       # Dagster sensors
│   │   └── __init__.py
│   ├── dagster_test/        # Dagster testing utilities
│   │   └── __init__.py
│   ├── constants/           # Constants and configuration
│   │   └── urls.py          # API URLs
│   └── __init__.py
├── .github/                 # GitHub configuration
├── .venv/                   # Virtual environment
├── pyproject.toml           # Project configuration & dependencies
├── .env.example             # Environment variables template
├── .env                     # Environment variables (gitignored)
├── .gitignore               # Git ignore rules
├── UV_GUIDE.md              # uv package manager guide
└── README.md
```
## Setup

### Prerequisites

- Python 3.10 or higher
- uv package manager
- HubSpot account with API access
- Google Cloud Platform account with BigQuery enabled

### Install uv

```bash
# On macOS/Linux
curl -LsSf https://astral.sh/uv/install.sh | sh

# On Windows
powershell -c "irm https://astral.sh/uv/install.ps1 | iex"
```
### Project Setup

1. Clone the repository and navigate to the project directory:

   ```bash
   cd dl-pipeline-connector
   ```

2. Sync the project (creates the virtual environment and installs dependencies):

   ```bash
   uv sync
   ```

3. Install the core dependencies:

   ```bash
   uv add "dlt[bigquery]" dagster dagster-embedded-elt dagster-webserver dagster-slack
   ```

4. Install development dependencies:

   ```bash
   uv add --dev pytest pytest-cov ruff mypy
   ```

5. Set up environment variables:

   ```bash
   cp .env.example .env
   # Edit .env with your actual credentials
   ```

Required environment variables in `.env`:

- `HUBSPOT_PRIVATE_APP_ACCESS_TOKEN` - Your HubSpot private app access token
- `DESTINATION__BIGQUERY__LOCATION` - BigQuery dataset location (e.g., US, EU)
- `DESTINATION__BIGQUERY__CREDENTIALS__PROJECT_ID` - GCP project ID
- `DESTINATION__BIGQUERY__CREDENTIALS__PRIVATE_KEY` - GCP service account private key
- `DESTINATION__BIGQUERY__CREDENTIALS__CLIENT_EMAIL` - GCP service account email
## Usage

### Running the HubSpot Pipeline

Activate your virtual environment first:

```bash
# On Windows
.venv\Scripts\activate

# On macOS/Linux
source .venv/bin/activate
```

Run the HubSpot pipeline directly:

```bash
python -m pipelines.hubspot
```

Or import and run it programmatically:

```python
from pipelines.hubspot import pipeline_hubspot

# Run the pipeline
pipeline_hubspot()
```

### Running with Dagster

Start the Dagster web server:

```bash
dagster dev
```

This starts the Dagster UI at http://localhost:3000, where you can:

- View pipeline definitions and assets
- Trigger manual runs
- Monitor pipeline execution
- View run history and logs
- Configure schedules and sensors
## Available HubSpot Resources

The pipeline extracts the following HubSpot resources:

**Core Objects:**

- `owners` - HubSpot users and owners
- `companies` - Company records (incremental)
- `contacts` - Contact records (incremental)
- `deals` - Deal records (incremental)
- `leads` - Lead records (incremental)
- `quotes` - Quote records (incremental)
- `tickets` - Ticket records (incremental)
- `goal_targets` - Goal target records (incremental)

**Pipelines:**

- `deals_pipelines` - Deal pipeline configurations

**Associations:**

- `deals_contacts` - Deal-to-contact associations
- `contacts_companies` - Contact-to-company associations

**Engagement Activities:**

- `engagement_calls` - Call activities (incremental)
- `engagement_communications` - Communication activities (incremental)
- `engagement_meetings` - Meeting activities (incremental)
- `engagement_notes` - Note activities (incremental)
- `engagement_tasks` - Task activities (incremental)
## Pipeline Configuration

### Incremental Loading

The pipeline uses dlt's incremental loading feature to fetch only records that have been updated since the last run. The state is managed automatically by dlt.

**Incremental Key:** Most resources use `updatedAt` as the incremental cursor field.

**Batch Limits:** The pipeline fetches up to 10,000 records per incremental run to avoid API rate limits and timeouts. If more records exist, the pipeline will run multiple times until all data is synchronized.
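The mechanics of cursor-plus-batch-limit loading can be illustrated with a plain-Python sketch. This is purely conceptual (the actual pipeline relies on dlt's built-in incremental state; the function and field names below are illustrative):

```python
def incremental_batch(records, last_cursor, batch_limit=10_000):
    """Return up to `batch_limit` records updated after `last_cursor`,
    plus the new cursor value to persist for the next run."""
    fresh = sorted(
        (r for r in records if r["updatedAt"] > last_cursor),
        key=lambda r: r["updatedAt"],
    )
    batch = fresh[:batch_limit]
    # If the batch is empty, keep the old cursor so state never moves backwards.
    new_cursor = batch[-1]["updatedAt"] if batch else last_cursor
    return batch, new_cursor


records = [
    {"id": 1, "updatedAt": "2024-01-01"},
    {"id": 2, "updatedAt": "2024-02-01"},
    {"id": 3, "updatedAt": "2024-03-01"},
]

# Only one record fits in this batch; the next run resumes from `cursor`.
batch, cursor = incremental_batch(records, last_cursor="2024-01-15", batch_limit=1)
```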
### Write Disposition

All resources use `write_disposition='merge'`, which means:

- New records are inserted
- Existing records (matched on primary key) are updated
- No data is deleted from the destination
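The effect of a merge load can be sketched in plain Python (illustrative only; dlt performs the equivalent upsert inside the warehouse using each resource's primary key):

```python
def merge_load(destination: dict, new_rows: list[dict], primary_key: str = "id") -> dict:
    """Upsert `new_rows` into `destination`, keyed by `primary_key`.
    Existing rows are overwritten, new rows are inserted, nothing is deleted."""
    for row in new_rows:
        destination[row[primary_key]] = row
    return destination


# Simulated destination table keyed by primary key
table = {1: {"id": 1, "name": "Acme"}, 2: {"id": 2, "name": "Globex"}}
merge_load(table, [{"id": 2, "name": "Globex Corp"}, {"id": 3, "name": "Initech"}])
```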
### BigQuery Dataset

Data is loaded into the `hubspot_raw` dataset in BigQuery. Each resource becomes a separate table.
## Managing Dependencies

### Current Dependencies

**Core ETL:**

- `dlt[bigquery]` - Data load tool with BigQuery support
- `dagster` - Orchestration framework
- `dagster-embedded-elt` - Embedded ELT capabilities
- `dagster-webserver` - Web UI for Dagster
- `dagster-slack` - Slack notifications

**Development:**

- `pytest` - Testing framework
- `pytest-cov` - Test coverage
- `ruff` - Python linter and formatter
- `mypy` - Static type checking

### Adding New Dependencies

Use `uv add` to install packages and update `pyproject.toml` automatically:

```bash
# Add a production dependency
uv add <package-name>

# Add a development dependency
uv add --dev <package-name>

# Add a specific version
uv add <package-name>==1.2.3

# Add with extras
uv add "package[extra1,extra2]"
```

No manual `pyproject.toml` editing needed - `uv add` handles it automatically.
## Testing

```bash
# Run all tests
pytest

# Run with coverage
pytest --cov=src --cov-report=html

# Run a specific test file
pytest tests/test_specific.py

# View the coverage report
open htmlcov/index.html   # macOS
start htmlcov/index.html  # Windows
```
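Tests under `tests/` follow standard pytest conventions: plain functions named `test_*` with bare `assert` statements. A hypothetical example, assuming a small transform helper that is not an actual function in this project:

```python
# tests/test_transforms.py (hypothetical example)

def normalize_email(value: str) -> str:
    """Example helper: trim whitespace and lowercase an email address."""
    return value.strip().lower()


def test_normalize_email():
    assert normalize_email("  Jane.Doe@Example.COM ") == "jane.doe@example.com"


def test_normalize_email_is_idempotent():
    assert normalize_email(normalize_email("X@Y.Z")) == "x@y.z"
```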
## Code Quality

The project uses modern Python tooling for code quality:

```bash
# Format code with ruff
ruff format .

# Lint and auto-fix issues
ruff check . --fix

# Type checking with mypy
mypy src/

# Run all quality checks
ruff check . && ruff format --check . && mypy src/
```

### Ruff Configuration

Configured in `pyproject.toml`:

- Line length: 100 characters
- Target: Python 3.10+
- Selected rules: Errors (E), Pyflakes (F), Import sorting (I), Naming (N), Warnings (W), Pyupgrade (UP)
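A `pyproject.toml` section matching these settings might look like the following sketch (the project's actual configuration may differ in detail):

```toml
[tool.ruff]
line-length = 100
target-version = "py310"

[tool.ruff.lint]
select = ["E", "F", "I", "N", "W", "UP"]
```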
## Development Workflow

1. Create a new branch for your feature/fix:

   ```bash
   git checkout -b feature/your-feature-name
   ```

2. Make your changes to the codebase.

3. Add tests for new functionality in `tests/`.

4. Run code quality checks:

   ```bash
   ruff check . --fix
   ruff format .
   mypy src/
   pytest
   ```

5. Commit your changes:

   ```bash
   git add .
   git commit -m "Description of changes"
   ```

6. Push and create a pull request:

   ```bash
   git push origin feature/your-feature-name
   ```
## Troubleshooting

### Common Issues

**ImportError: No module named 'pipelines'**

- Make sure you're in the project root directory
- Activate your virtual environment
- Run with `python -m pipelines.hubspot` instead of executing the file directly

**BigQuery Authentication Error**

- Verify your service account credentials in `.env`
- Ensure the private key is properly formatted with `\n` for line breaks
- Check that the service account has the BigQuery Data Editor and Job User roles

**HubSpot API Rate Limits**

- The pipeline implements pagination and batch limits
- Rate limit errors are logged - wait and retry
- Consider adjusting batch sizes in `hubspot.py`
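If rate-limit errors are frequent, wrapping the API call in exponential backoff is one common option. A generic, illustrative sketch (not code from this project; substitute your client's actual rate-limit exception for `RuntimeError`):

```python
import time


def with_backoff(call, max_retries=5, base_delay=1.0, sleep=time.sleep):
    """Retry `call` with exponential backoff; re-raise after max_retries."""
    for attempt in range(max_retries):
        try:
            return call()
        except RuntimeError:  # substitute the client's rate-limit exception
            if attempt == max_retries - 1:
                raise
            # Wait 1s, 2s, 4s, ... between attempts
            sleep(base_delay * (2 ** attempt))


# Demo: a call that fails twice with a simulated 429, then succeeds
attempts = []

def flaky():
    attempts.append(1)
    if len(attempts) < 3:
        raise RuntimeError("429 Too Many Requests")
    return "ok"

result = with_backoff(flaky, sleep=lambda s: None)  # no real sleeping in the demo
```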
**dlt State Issues**

- Pipeline state is stored in the `.dlt/` directory
- To reset incremental state, delete `.dlt/` and run a full refresh
- State is also persisted in BigQuery in the `_dlt_loads` and `_dlt_pipeline_state` tables
## Project Links

- dlt Documentation
- Dagster Documentation
- HubSpot API Reference
- BigQuery Documentation
- uv Package Manager

## License

[Your License Here]