Nexus
A Python library for code-first data ingestion, transformation, and loading.
```python
Pipeline(source, steps, sink).run()
```
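The one-liner above is the whole mental model: a source produces data, each step transforms it in order, and a sink writes the result. As a rough illustration only, the same flow can be sketched in plain Python. `MiniPipeline` is a hypothetical class, not Nexus's `Pipeline`, and in Nexus the source, steps, and sink are objects rather than bare functions.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class MiniPipeline:
    """Hypothetical source -> steps -> sink runner (illustration only)."""
    source: Callable[[], Any]               # produces the initial payload
    steps: list[Callable[[Any], Any]]       # each step transforms the payload
    sink: Callable[[Any], None]             # consumes the final payload

    def run(self) -> Any:
        data = self.source()
        for step in self.steps:             # apply transformations in order
            data = step(data)
        self.sink(data)
        return data


collected: list[str] = []
result = MiniPipeline(
    source=lambda: b"name,age\nAlice,30\n",
    steps=[bytes.decode, str.upper],        # bytes -> str -> uppercased str
    sink=collected.append,
).run()
```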
Installation
Nexus requires Python 3.12+. Clone the repo and install dependencies with uv:
```bash
git clone https://github.com/avaneeshdevkota/nexus
cd nexus
uv sync
```
Note: Nexus uses a flat layout. Set `PYTHONPATH=.` when running scripts outside of `pytest`:

```bash
PYTHONPATH=. uv run python examples/stripe_customers_to_sqlite.py
```

Tests set this automatically via `pyproject.toml`.
Quick start
Fetch a CSV over HTTP and load it into SQLite:
```python
from pipelines.pipeline import Pipeline
from sources.http.http_config import HTTPSourceConfig
from sources.http.http_source import HTTPSource
from steps.bytes_to_csv import BytesToCSV
from sinks.sqlite.sqlite_config import SQLiteSinkConfig
from sinks.sqlite.sqlite_sink import SQLiteSink

Pipeline(
    source=HTTPSource(HTTPSourceConfig(url="https://example.com/data.csv")),
    steps=[BytesToCSV()],
    sink=SQLiteSink(SQLiteSinkConfig(
        db_path="output.db",
        table_name="data",
        schema={"name": "TEXT", "age": "TEXT"},
    )),
).run()
```
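Conceptually, the quick-start pipeline fetches bytes, parses them as CSV into `list[dict[str, str]]`, and inserts the rows into a SQLite table. The stdlib-only sketch below mirrors those stages so the data shape at each step is visible. It replaces the HTTP fetch with an in-memory byte string and is not Nexus's implementation; `parse_csv_bytes` and `load_sqlite` are hypothetical helpers.

```python
import csv
import io
import sqlite3


def parse_csv_bytes(payload: bytes) -> list[dict[str, str]]:
    """Decode bytes and parse CSV into one dict per row (header row gives the keys)."""
    return list(csv.DictReader(io.StringIO(payload.decode("utf-8"))))


def load_sqlite(conn: sqlite3.Connection, table: str, schema: dict[str, str],
                rows: list[dict[str, str]]) -> None:
    """Create the table from a column -> SQL type mapping, then insert the rows."""
    cols = ", ".join(f'"{name}" {sql_type}' for name, sql_type in schema.items())
    conn.execute(f'CREATE TABLE IF NOT EXISTS "{table}" ({cols})')
    placeholders = ", ".join("?" for _ in schema)
    conn.executemany(
        f'INSERT INTO "{table}" VALUES ({placeholders})',
        [tuple(row[name] for name in schema) for row in rows],  # schema order
    )
    conn.commit()


payload = b"name,age\nAlice,30\nBob,25\n"  # stands in for the HTTP response body
rows = parse_csv_bytes(payload)
conn = sqlite3.connect(":memory:")         # the quick start writes to output.db instead
load_sqlite(conn, "data", {"name": "TEXT", "age": "TEXT"}, rows)
print(conn.execute('SELECT name, age FROM "data"').fetchall())
# [('Alice', '30'), ('Bob', '25')]
```

Every CSV value arrives as a string, which is why the quick-start schema declares both columns as `TEXT`.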
Sources
| Source | Description |
|---|---|
| `HTTPSource` | Fetch from any HTTP endpoint, with optional pagination |
| `FileSource` | Read a single local file |
| `GlobSource` | Read multiple local files matching a glob pattern |
| `S3Source` | Fetch a single object from S3 (authenticated or anonymous) |
| `S3GlobSource` | Fetch multiple S3 objects matching a prefix and pattern |
| `GoogleSheetsSource` | Read a Google Sheet (service account or public) |
| `StripeSource` | List any Stripe resource (customers, payouts, etc.) |
| `StripeSearchSource` | Search Stripe resources using Stripe's search API |
| `PostgresSource` | Query a Postgres database with raw SQL |
| `MySQLSource` | Query a MySQL database with raw SQL |
HTTPSource with pagination
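```python
from sources.http.http_config import HTTPSourceConfig, PaginationConfig
from sources.http.http_source import HTTPSource

HTTPSource(HTTPSourceConfig(
    url="https://api.example.com/items",
    pagination=PaginationConfig(
        # No previous response yet: start at page 1; otherwise request
        # the page named by the previous response.
        get_params=lambda resp: {"page": resp.json()["next_page"]} if resp is not None else {"page": 1},
        # Keep paging while the API reports a next page.
        has_more=lambda resp: resp.json()["next_page"] is not None,
    ),
))
```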
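The two callbacks split the pagination logic: `get_params` turns the previous response (or nothing, before the first request) into query parameters, and `has_more` decides whether another page is needed. The sketch below shows one plausible loop driving such callbacks against a fake three-page API. `paginate`, `FakeResponse`, and the `fetch` function are hypothetical, and the exact way `HTTPSource` invokes the callbacks may differ.

```python
from typing import Any, Callable, Iterator, Optional


class FakeResponse:
    """Minimal stand-in for an HTTP response object with a .json() method."""
    def __init__(self, body: dict[str, Any]) -> None:
        self._body = body

    def json(self) -> dict[str, Any]:
        return self._body


def paginate(
    fetch: Callable[[dict[str, Any]], FakeResponse],
    get_params: Callable[[Optional[FakeResponse]], dict[str, Any]],
    has_more: Callable[[FakeResponse], bool],
) -> Iterator[FakeResponse]:
    """Request pages until has_more() returns False, yielding each response."""
    resp = fetch(get_params(None))         # first request: no previous response
    yield resp
    while has_more(resp):
        resp = fetch(get_params(resp))     # next params derived from the last response
        yield resp


# Fake API: three pages, each pointing at the next until next_page is None.
PAGES = {
    1: {"items": ["a", "b"], "next_page": 2},
    2: {"items": ["c"], "next_page": 3},
    3: {"items": ["d"], "next_page": None},
}
responses = paginate(
    fetch=lambda params: FakeResponse(PAGES[params["page"]]),
    get_params=lambda resp: {"page": resp.json()["next_page"]} if resp is not None else {"page": 1},
    has_more=lambda resp: resp.json()["next_page"] is not None,
)
items = [item for resp in responses for item in resp.json()["items"]]
print(items)  # ['a', 'b', 'c', 'd']
```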
S3Source (anonymous public bucket)
```python
from sources.s3.s3_config import S3SourceConfig
from sources.s3.s3_source import S3Source

S3Source(S3SourceConfig(
    bucket="noaa-ghcn-pds",
    key="csv/by_station/ACW00011604.csv",
    region_name="us-east-1",
))
```
StripeSource
```python
import os

from sources.stripe.stripe_config import StripeSourceConfig
from sources.stripe.stripe_source import StripeSource

StripeSource(StripeSourceConfig(
    api_key=os.environ["STRIPE_API_KEY"],
    resource="Customer",
))
```
StripeSearchSource
```python
import os

from sources.stripe.stripe_search_config import StripeSearchSourceConfig
from sources.stripe.stripe_search_source import StripeSearchSource

StripeSearchSource(StripeSearchSourceConfig(
    api_key=os.environ["STRIPE_API_KEY"],
    resource="Customer",
    query=["email:'alice@example.com'", "name:'Alice'"],
    operator="AND",
))
```
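Stripe's search API takes a single query string in which clauses are joined with `AND` or `OR`. Assuming `StripeSearchSource` combines the `query` list using `operator` (a plausible reading of the config above, not confirmed here), the resulting query string would match the output of this hypothetical helper:

```python
def build_search_query(clauses: list[str], operator: str = "AND") -> str:
    """Join Stripe search clauses with one boolean operator (hypothetical helper)."""
    op = operator.upper()
    if op not in {"AND", "OR"}:
        raise ValueError(f"operator must be 'AND' or 'OR', got {operator!r}")
    return f" {op} ".join(clauses)


print(build_search_query(["email:'alice@example.com'", "name:'Alice'"], "AND"))
# email:'alice@example.com' AND name:'Alice'
```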
PostgresSource
```python
import os

from sources.postgres.postgres_config import PostgresSourceConfig
from sources.postgres.postgres_source import PostgresSource

PostgresSource(PostgresSourceConfig(
    connection_string=os.environ["POSTGRES_CONNECTION_STRING"],
    query="SELECT * FROM transactions WHERE status = 'completed'",
))
```
MySQLSource
```python
import os

from sources.mysql.mysql_config import MySQLSourceConfig
from sources.mysql.mysql_source import MySQLSource

MySQLSource(MySQLSourceConfig(
    connection_string=os.environ["MYSQL_CONNECTION_STRING"],
    query="SELECT * FROM orders WHERE created_at > '2024-01-01'",
))
```
Steps
| Step | Description |
|---|---|
| `BytesToCSV` | Parse raw bytes into `list[dict[str, str]]` |
| `BytesToJSON` | Parse raw bytes into a JSON value |
| `JSONToRows` | Store a JSON payload as a single row under a named column |
| `CastTypes` | Coerce column values to Python types (`int`, `float`, `str`, …) |
| `FilterRows` | Drop rows that don't match a predicate |
| `SelectColumns` | Keep only specified columns |
| `RenameColumns` | Rename columns via a mapping |
| `AddColumn` | Derive a new column from existing ones via a callable |
Example: filter + select + cast + rename
```python
steps = [
    BytesToCSV(),
    FilterRows(lambda row: row["status"] == "completed"),
    SelectColumns(["id", "amount", "created_at"]),
    CastTypes({"amount": float}),
    RenameColumns({"created_at": "date"}),
]
```
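Each step receives the rows produced by the previous one, so order matters: the filter on `status` has to run before `SelectColumns` drops that column. The plain-Python sketch below mirrors the same chain with comprehensions to make the intermediate shapes explicit; it illustrates the semantics described in the table and is not how the Nexus step classes are implemented.

```python
rows = [  # what BytesToCSV would yield: every value is a string
    {"id": "1", "amount": "9.99", "status": "completed", "created_at": "2024-01-02"},
    {"id": "2", "amount": "5.00", "status": "refunded", "created_at": "2024-01-03"},
]

# FilterRows: keep rows matching the predicate (uses "status" before it is dropped)
rows = [row for row in rows if row["status"] == "completed"]

# SelectColumns: keep only the listed columns
rows = [{key: row[key] for key in ["id", "amount", "created_at"]} for row in rows]

# CastTypes: apply a converter to the configured columns, leave others unchanged
casts = {"amount": float}
rows = [{key: casts[key](value) if key in casts else value for key, value in row.items()}
        for row in rows]

# RenameColumns: map old names to new ones, leave others unchanged
renames = {"created_at": "date"}
rows = [{renames.get(key, key): value for key, value in row.items()} for row in rows]

print(rows)  # [{'id': '1', 'amount': 9.99, 'date': '2024-01-02'}]
```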
AddColumn
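```python
from steps.add_column import AddColumn

# price and quantity must already be numeric (e.g. cast with CastTypes);
# values produced by BytesToCSV are strings.
AddColumn("total", lambda row: row["price"] * row["quantity"])
```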
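`AddColumn` applies its callable to each row and stores the result under the new column name. The hypothetical `add_column` helper below sketches that per-row behaviour in plain Python (not Nexus's implementation) and shows what happens when the inputs are still CSV strings: multiplying two strings raises `TypeError`.

```python
from typing import Any, Callable

Row = dict[str, Any]


def add_column(rows: list[Row], name: str, fn: Callable[[Row], Any]) -> list[Row]:
    """Return new rows with an extra column computed from each existing row."""
    return [{**row, name: fn(row)} for row in rows]


raw = [{"price": "2.50", "quantity": "4"}]  # CSV values arrive as strings

try:
    add_column(raw, "total", lambda row: row["price"] * row["quantity"])
except TypeError as exc:
    print("uncast:", exc)  # str * str is not defined

# Cast first (the job CastTypes does in a pipeline), then derive the column.
cast = [{"price": float(r["price"]), "quantity": int(r["quantity"])} for r in raw]
print(add_column(cast, "total", lambda row: row["price"] * row["quantity"]))
# [{'price': 2.5, 'quantity': 4, 'total': 10.0}]
```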
Sinks
| Sink | Description |
|---|---|
| `SQLiteSink` | Write rows into a SQLite table |
| `FileSink` | Write rows to a local CSV or JSON file |
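`FileSink` writes rows as either CSV or JSON. This section does not say how the format is selected, so the stdlib sketch below simply assumes it is picked from the file extension. `write_rows` is a hypothetical helper that shows what the two output shapes look like; it is not `FileSink`'s API.

```python
import csv
import json
import tempfile
from pathlib import Path


def write_rows(path: Path, rows: list[dict[str, object]]) -> None:
    """Write rows as CSV or JSON, choosing the format by extension (assumption)."""
    if path.suffix == ".json":
        path.write_text(json.dumps(rows, indent=2), encoding="utf-8")
    elif path.suffix == ".csv":
        with path.open("w", newline="", encoding="utf-8") as fh:
            # Column order comes from the first row's keys.
            writer = csv.DictWriter(fh, fieldnames=list(rows[0]) if rows else [])
            writer.writeheader()
            writer.writerows(rows)
    else:
        raise ValueError(f"unsupported extension: {path.suffix!r}")


with tempfile.TemporaryDirectory() as tmp:
    rows = [{"id": 1, "name": "Alice"}]
    write_rows(Path(tmp) / "out.csv", rows)
    write_rows(Path(tmp) / "out.json", rows)
    csv_text = (Path(tmp) / "out.csv").read_text(encoding="utf-8")
    json_rows = json.loads((Path(tmp) / "out.json").read_text(encoding="utf-8"))

print(csv_text)   # header line "id,name", then "1,Alice"
print(json_rows)  # [{'id': 1, 'name': 'Alice'}]
```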
Incremental loading
`StatefulPipeline` runs a pipeline incrementally: it remembers where it left off between runs and fetches only new data.
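```python
import os

from pipelines.pipeline import Pipeline
from pipelines.stateful_pipeline import StatefulPipeline
from sources.stripe.stripe_config import StripeSourceConfig
from sources.stripe.stripe_source import StripeSource

# SelectColumns import omitted. `sink`, `store`, and `max_created_from_db` are
# placeholders for your own sink instance, key-value state store, and checkpoint query.
StatefulPipeline(
    build_pipeline=lambda since: Pipeline(
        source=StripeSource(StripeSourceConfig(
            api_key=os.environ["STRIPE_API_KEY"],
            resource="Customer",
            # Only fetch customers created after the last checkpoint.
            params={"created[gt]": since} if since else {},
        )),
        steps=[SelectColumns(["id", "email", "name", "created"])],
        sink=sink,
    ),
    get_state=lambda: store.get("stripe_customers"),
    save_state=lambda value: store.set("stripe_customers", value),
    advance_state=lambda: max_created_from_db(),
).run()
```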
- `build_pipeline`: builds the pipeline given the current checkpoint
- `get_state`: returns the last saved checkpoint
- `save_state`: persists the new checkpoint after a run
- `advance_state`: derives the new checkpoint (called after the pipeline runs)
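Read together, these callbacks imply a checkpoint cycle: load the last checkpoint, run a pipeline that fetches only newer records, derive the new checkpoint from what was loaded, and persist it. The sketch below walks through two such cycles with an in-memory dict as the state store and SQLite as the sink. `run_incremental`, `FakePipeline`, and the `created` column are hypothetical, and the call order is inferred from the descriptions above rather than taken from `StatefulPipeline`'s source.

```python
import sqlite3
from typing import Callable, Optional


def run_incremental(
    build_pipeline: Callable[[Optional[int]], "FakePipeline"],
    get_state: Callable[[], Optional[int]],
    save_state: Callable[[int], None],
    advance_state: Callable[[], Optional[int]],
) -> None:
    """One incremental run: read checkpoint, run, advance, persist."""
    since = get_state()              # None on the very first run
    build_pipeline(since).run()
    new_state = advance_state()      # derived after the pipeline has written its rows
    if new_state is not None:
        save_state(new_state)


# Hypothetical upstream records with an increasing "created" timestamp.
UPSTREAM = [{"id": "a", "created": 100}, {"id": "b", "created": 200}]
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (id TEXT, created INTEGER)")
store: dict[str, int] = {}           # stands in for a persistent state store


class FakePipeline:
    """Stands in for Pipeline: copies upstream rows newer than `since` into SQLite."""
    def __init__(self, since: Optional[int]) -> None:
        self.since = since

    def run(self) -> None:
        new = [r for r in UPSTREAM if self.since is None or r["created"] > self.since]
        conn.executemany("INSERT INTO customers VALUES (:id, :created)", new)
        conn.commit()


def max_created_from_db() -> Optional[int]:
    return conn.execute("SELECT MAX(created) FROM customers").fetchone()[0]


def save_checkpoint(value: int) -> None:
    store["stripe_customers"] = value


def sync() -> None:
    run_incremental(
        build_pipeline=FakePipeline,
        get_state=lambda: store.get("stripe_customers"),
        save_state=save_checkpoint,
        advance_state=max_created_from_db,
    )


sync()                                        # first run loads a and b
UPSTREAM.append({"id": "c", "created": 300})
sync()                                        # second run loads only c
print(store, conn.execute("SELECT COUNT(*) FROM customers").fetchone()[0])
# {'stripe_customers': 300} 3
```

Deriving the checkpoint from the destination (here `MAX(created)`) rather than from the source means a failed run simply leaves the checkpoint where it was.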
Running tests
```bash
# Unit tests only
uv run pytest

# Integration tests only (these hit real external endpoints)
uv run pytest -m integration
```
Examples
See the `examples/` directory for end-to-end pipelines:

- `http_csv_to_file.py`: HTTP → CSV → local file
- `file_csv_to_sqlite.py`: local file → SQLite
- `glob_csv_to_sqlite.py`: multiple local files → SQLite
- `s3_csv_to_sqlite.py`: S3 object → SQLite
- `s3_glob_csv_to_sqlite.py`: multiple S3 objects → SQLite
- `s3_filter_select_to_sqlite.py`: S3 → filter → select → SQLite
- `google_sheets_to_sqlite.py`: public Google Sheet → SQLite
- `github_paginated.py`: paginated GitHub API → SQLite
- `stripe_customers_to_sqlite.py`: Stripe customers → SQLite
- `stripe_payouts_to_sqlite.py`: Stripe payouts → SQLite
- `stripe_search_to_sqlite.py`: Stripe search → SQLite
- `stripe_customers_incremental.py`: incremental Stripe sync with `StatefulPipeline`
- `postgres_transactions_to_sqlite.py`: Postgres → SQLite
Download files
File details
Details for the file nexus_core-0.1.0.tar.gz.
File metadata
- Download URL: nexus_core-0.1.0.tar.gz
- Size: 77.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.26
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 26cec8274a80dc8c02caa3c334a1c1fd9a1f5a1a282b1a8ee5d01823fe52f5e6 |
| MD5 | b0ba92a1235c68c133fb953b89a2875c |
| BLAKE2b-256 | 82dce302a677146d2e290171fd711526f9a98c0f39f6acf6c7082b8bc6f4c026 |
File details
Details for the file nexus_core-0.1.0-py3-none-any.whl.
File metadata
- Download URL: nexus_core-0.1.0-py3-none-any.whl
- Size: 24.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.26
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 1ec39aa8c3a893453849cbb393918ab271de78f941e675a1d8ce5efafbc3af4c |
| MD5 | 5c1032dfe872dc403faad65f84b868e8 |
| BLAKE2b-256 | 527c760252cb7c0bd3dc9eeeb8b4fe6204f39b8d7b7e7275e274ef76fa258b66 |