Neo4j ETL Toolbox
Building blocks for ETL pipelines.
A robust Python library of building blocks for assembling efficient, scalable ETL pipelines for Neo4j.
It simplifies moving data from SQL, CSV, and Parquet sources into Neo4j by taking care of common concerns such as batching, parallelism, logging, and error handling.
Key Features
- Task-Based Architecture: Compose pipelines from reusable units of work.
- Parallel Loading: Optimized strategies for high-performance loading without locking issues.
- Data Validation: Integrated Pydantic support for ensuring data quality before loading.
- Detailed Reporting: Built-in tracking of execution time and row counts.
- Flexible Sources: Support for SQL (via SQLAlchemy), CSV, Neo4j and Parquet (via PyArrow).
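Validation happens before rows are batched to Neo4j, so bad rows can be diverted to an error file instead of aborting the load. The library integrates Pydantic for this; the idea can be sketched dependency-free with a stdlib dataclass (field names mirror the taxi example below and are otherwise illustrative — this is not the library's API):

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass
class Trip:
    pu_location: int
    do_location: int
    pickup_datetime: datetime
    dropoff_datetime: datetime

    def __post_init__(self):
        # Reject rows with inconsistent timestamps, mimicking a Pydantic
        # validator; the caller catches the error and routes the row to
        # an error file instead of failing the whole load.
        if self.dropoff_datetime < self.pickup_datetime:
            raise ValueError("dropoff before pickup")


def validate_rows(rows):
    """Split raw dicts into (valid Trip objects, rejected rows)."""
    good, bad = [], []
    for row in rows:
        try:
            good.append(Trip(**row))
        except (TypeError, ValueError):
            bad.append(row)
    return good, bad
```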
Parallel Loading Example
The library provides specialized tasks for parallel data loading. By using a "mix-and-batch" strategy, it can load relationships in parallel while minimizing deadlocks.
Here is an example of defining a parallel CSV loader task (taken from the examples/nyc-taxi project):
```python
from pathlib import Path

from etl_lib.core.ETLContext import ETLContext
from etl_lib.core.SplittingBatchProcessor import dict_id_extractor
from etl_lib.task.data_loading.ParallelCSVLoad2Neo4jTask import ParallelCSVLoad2Neo4jTask

from model.trip import Trip  # Your Pydantic model


class LoadTripsParallelTask(ParallelCSVLoad2Neo4jTask):

    def __init__(self, context: ETLContext, csv_path: Path):
        super().__init__(
            context,
            file=csv_path,
            model=Trip,
            error_file=Path('errors_parallel.json'),
            batch_size=5000,
            max_workers=10
        )

    def _query(self):
        return """
        UNWIND $batch AS row
        MATCH (pu:Location {id: row.pu_location})
        MATCH (do:Location {id: row.do_location})
        CREATE (t:Trip {
            id: randomUUID(),
            pickup_datetime: row.pickup_datetime,
            dropoff_datetime: row.dropoff_datetime,
            ...
        })
        CREATE (t)-[:STARTED_AT]->(pu)
        CREATE (t)-[:ENDED_AT]->(do)
        """

    def _id_extractor(self):
        # Defines how to route rows to avoid locking on start/end nodes
        return dict_id_extractor(table_size=10, start_key='pu_location', end_key='do_location')
```
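The `dict_id_extractor` above routes each row into a cell of a `table_size` × `table_size` grid based on its start and end node ids. The routing idea behind mix-and-batch can be sketched generically: cells on the same diagonal share no start bucket and no end bucket, so their batches can be written by parallel transactions without contending for the same node locks. This is an illustration of the strategy, not the library's internal implementation:

```python
from collections import defaultdict


def bucket(row, table_size=10, start_key="pu_location", end_key="do_location"):
    # Hash the start and end node ids into a table_size x table_size grid;
    # rows landing in the same cell compete for the same node locks.
    return (hash(row[start_key]) % table_size, hash(row[end_key]) % table_size)


def diagonal_batches(rows, table_size=10):
    """Group rows by grid cell, then yield cells one diagonal at a time.

    Cells (i, (i + d) % n) on the same diagonal d share no row index and
    no column index, so their relationship batches touch disjoint node
    buckets and can be committed in parallel without deadlocking.
    """
    cells = defaultdict(list)
    for row in rows:
        cells[bucket(row, table_size)].append(row)
    for d in range(table_size):
        diagonal = [cells[(i, (i + d) % table_size)]
                    for i in range(table_size)
                    if cells.get((i, (i + d) % table_size))]
        if diagonal:
            yield diagonal  # each inner list can go to its own worker
```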
Documentation & Examples
Complete documentation is available at https://neo-technology-field.github.io/python-etl-lib/index.html
See the examples directory for complete projects.
Installation
The library can be installed via:

```shell
pip install neo4j-etl-lib
```
System Dependencies
Some components or documentation tools require additional system-level packages.
Graphviz
If you are building the documentation locally and want to generate diagrams (e.g., using make docs), you need Graphviz installed.
Debian/Ubuntu:

```shell
sudo apt install graphviz
```

Fedora/RHEL/CentOS:

```shell
sudo dnf install graphviz
```

Arch Linux / CachyOS:

```shell
sudo pacman -S graphviz
```
Podman + Testcontainers (Linux)
Not recommended: I could not get this to work without a brittle setup. I currently run my tests against a running database instance configured via .env, and on CI I use Docker, which works without any extra setup.
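For the local setup, a .env file along these lines is enough; the variable names here are assumptions for illustration, so use whatever names your test configuration actually reads:

```
NEO4J_URI=bolt://localhost:7687
NEO4J_USER=neo4j
NEO4J_PASSWORD=change-me
```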
File details
Details for the file neo4j_etl_lib-0.4.0.tar.gz.
File metadata
- Download URL: neo4j_etl_lib-0.4.0.tar.gz
- Upload date:
- Size: 41.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.1
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | a2a99ba4a66963a341767032d8d83b07290c47452f3ad9a6ef55d2fbd184cfd7 |
| MD5 | a0ffdf877a16a435fc5018fa1497b331 |
| BLAKE2b-256 | bbf65a1eb14175ec0078ba4fb141eb77adc60ddc9336eaa236ad999fbd501210 |
File details
Details for the file neo4j_etl_lib-0.4.0-py3-none-any.whl.
File metadata
- Download URL: neo4j_etl_lib-0.4.0-py3-none-any.whl
- Upload date:
- Size: 56.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.1
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | abcdb7ce879da15f94b17843349df087c527dc160933618e06a97b27d588c3f6 |
| MD5 | 162d2769872966fcc4d7dc56358cea79 |
| BLAKE2b-256 | f06e4aa197d312304185836d73bc78e00e37bcf0815d094bd0ed9b28aa877abd |