Nexus

A Python library for code-first data ingestion, transformation, and loading.

Pipeline(source, steps, sink).run()

Installation

Nexus requires Python 3.12+. Clone the repo and install dependencies with uv:

git clone https://github.com/avaneeshdevkota/nexus
cd nexus
uv sync

Note: Nexus uses a flat layout. Set PYTHONPATH=. when running scripts outside of pytest:

PYTHONPATH=. uv run python examples/stripe_customers_to_sqlite.py

Tests set this automatically via pyproject.toml.

Quick start

Fetch a CSV over HTTP and load it into SQLite:

from pipelines.pipeline import Pipeline
from sources.http.http_config import HTTPSourceConfig
from sources.http.http_source import HTTPSource
from steps.bytes_to_csv import BytesToCSV
from sinks.sqlite.sqlite_config import SQLiteSinkConfig
from sinks.sqlite.sqlite_sink import SQLiteSink

Pipeline(
    source=HTTPSource(HTTPSourceConfig(url="https://example.com/data.csv")),
    steps=[BytesToCSV()],
    sink=SQLiteSink(SQLiteSinkConfig(
        db_path="output.db",
        table_name="data",
        schema={"name": "TEXT", "age": "TEXT"},
    )),
).run()
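
To make the source → steps → sink flow concrete, here is a minimal sketch of the pattern in plain Python. MiniPipeline and the stand-in callables are hypothetical and are not Nexus's actual classes; the sketch only assumes that a source produces data, each step transforms the previous step's output in order, and the sink receives the final result.

```python
import csv
import io
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class MiniPipeline:
    """Hypothetical stand-in illustrating the source -> steps -> sink pattern."""

    source: Callable[[], Any]
    steps: list[Callable[[Any], Any]]
    sink: Callable[[Any], None]

    def run(self) -> None:
        data = self.source()          # 1. fetch raw data
        for step in self.steps:       # 2. apply each step in order
            data = step(data)
        self.sink(data)               # 3. hand the result to the sink


def bytes_to_csv(raw: bytes) -> list[dict[str, str]]:
    """Parse CSV bytes into a list of string-valued row dicts."""
    return list(csv.DictReader(io.StringIO(raw.decode("utf-8"))))


collected: list[dict[str, str]] = []

MiniPipeline(
    source=lambda: b"name,age\nAlice,30\nBob,25\n",
    steps=[bytes_to_csv],
    sink=collected.extend,
).run()
```

After the run, collected holds one dict per CSV row, with every value still a string.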

Sources

Source              Description
HTTPSource          Fetch from any HTTP endpoint, with optional pagination
FileSource          Read a single local file
GlobSource          Read multiple local files matching a glob pattern
S3Source            Fetch a single object from S3 (authenticated or anonymous)
S3GlobSource        Fetch multiple S3 objects matching a prefix and pattern
GoogleSheetsSource  Read a Google Sheet (service account or public)
StripeSource        List any Stripe resource (customers, payouts, etc.)
StripeSearchSource  Search Stripe resources using Stripe's search API
PostgresSource      Query a Postgres database with raw SQL
MySQLSource         Query a MySQL database with raw SQL

HTTPSource with pagination

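from sources.http.http_config import HTTPSourceConfig, PaginationConfig
from sources.http.http_source import HTTPSource

HTTPSource(HTTPSourceConfig(
    url="https://api.example.com/items",
    pagination=PaginationConfig(
        # Query parameters for the next request; the else branch covers the
        # first request, when there is no previous response yet.
        get_params=lambda resp: {"page": resp.json()["next_page"]} if resp else {"page": 1},
        # Keep paging while the API reports another page.
        has_more=lambda resp: resp.json()["next_page"] is not None,
    ),
))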
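
The two callables suggest a simple loop: ask get_params for the next request's parameters, fetch, then ask has_more whether to continue. The sketch below shows one way such a loop might work. FakeResponse, fetch, PAGES, and paginate are hypothetical names invented for this illustration, and the assumption that get_params receives None before the first request is inferred from the lambda above rather than confirmed by Nexus.

```python
from typing import Any, Callable, Iterator, Optional


class FakeResponse:
    """Hypothetical stand-in for an HTTP response exposing .json()."""

    def __init__(self, payload: dict[str, Any]) -> None:
        self._payload = payload

    def json(self) -> dict[str, Any]:
        return self._payload


# Three fake pages keyed by page number; the last has no next_page.
PAGES = {
    1: {"items": ["a", "b"], "next_page": 2},
    2: {"items": ["c"], "next_page": 3},
    3: {"items": ["d"], "next_page": None},
}


def fetch(params: dict[str, Any]) -> FakeResponse:
    """Pretend to issue a GET request with the given query parameters."""
    return FakeResponse(PAGES[params["page"]])


def paginate(
    get_params: Callable[[Optional[FakeResponse]], dict[str, Any]],
    has_more: Callable[[FakeResponse], bool],
) -> Iterator[FakeResponse]:
    """Yield responses until has_more reports that no pages remain."""
    resp: Optional[FakeResponse] = None
    while True:
        resp = fetch(get_params(resp))  # resp is None on the first call
        yield resp
        if not has_more(resp):
            break


responses = list(paginate(
    get_params=lambda resp: {"page": resp.json()["next_page"]} if resp else {"page": 1},
    has_more=lambda resp: resp.json()["next_page"] is not None,
))
items = [item for resp in responses for item in resp.json()["items"]]
```

The loop always issues at least one request, and has_more is only consulted after a response has arrived.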

S3Source (anonymous public bucket)

from sources.s3.s3_config import S3SourceConfig
from sources.s3.s3_source import S3Source

S3Source(S3SourceConfig(
    bucket="noaa-ghcn-pds",
    key="csv/by_station/ACW00011604.csv",
    region_name="us-east-1",
))

StripeSource

import os

from sources.stripe.stripe_config import StripeSourceConfig
from sources.stripe.stripe_source import StripeSource

StripeSource(StripeSourceConfig(
    api_key=os.environ["STRIPE_API_KEY"],  # read the key from the environment
    resource="Customer",
))

StripeSearchSource

import os

from sources.stripe.stripe_search_config import StripeSearchSourceConfig
from sources.stripe.stripe_search_source import StripeSearchSource

StripeSearchSource(StripeSearchSourceConfig(
    api_key=os.environ["STRIPE_API_KEY"],
    resource="Customer",
    query=["email:'alice@example.com'", "name:'Alice'"],
    operator="AND",
))
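
One plausible reading of the query and operator fields is that the clauses are joined with the operator into a single Stripe search query string. The helper below, build_search_query, is hypothetical and only illustrates that reading; how StripeSearchSource actually combines the clauses is not shown in this README.

```python
def build_search_query(clauses: list[str], operator: str = "AND") -> str:
    """Join search clauses with AND/OR (hypothetical helper, not Nexus API)."""
    op = operator.upper()
    if op not in {"AND", "OR"}:
        raise ValueError(f"unsupported operator: {operator!r}")
    if not clauses:
        raise ValueError("at least one clause is required")
    return f" {op} ".join(clauses)


query = build_search_query(
    ["email:'alice@example.com'", "name:'Alice'"],
    operator="AND",
)
print(query)  # email:'alice@example.com' AND name:'Alice'
```

A single operator for the whole list fits Stripe's search query language, which does not allow AND and OR to be mixed in one query.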

PostgresSource

import os

from sources.postgres.postgres_config import PostgresSourceConfig
from sources.postgres.postgres_source import PostgresSource

PostgresSource(PostgresSourceConfig(
    connection_string=os.environ["POSTGRES_CONNECTION_STRING"],
    query="SELECT * FROM transactions WHERE status = 'completed'",
))

MySQLSource

import os

from sources.mysql.mysql_config import MySQLSourceConfig
from sources.mysql.mysql_source import MySQLSource

MySQLSource(MySQLSourceConfig(
    connection_string=os.environ["MYSQL_CONNECTION_STRING"],
    query="SELECT * FROM orders WHERE created_at > '2024-01-01'",
))

Steps

Step           Description
BytesToCSV     Parse raw bytes into list[dict[str, str]]
BytesToJSON    Parse raw bytes into a JSON value
JSONToRows     Store a JSON payload as a single row under a named column
CastTypes      Coerce column values to Python types (int, float, str, …)
FilterRows     Drop rows that don't match a predicate
SelectColumns  Keep only specified columns
RenameColumns  Rename columns via a mapping
AddColumn      Derive a new column from existing ones via a callable

Example: filter + select + cast + rename

steps = [
    BytesToCSV(),
    FilterRows(lambda row: row["status"] == "completed"),
    SelectColumns(["id", "amount", "created_at"]),
    CastTypes({"amount": float}),
    RenameColumns({"created_at": "date"}),
]
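
For readers who want to see what this chain does to the data, the sketch below reproduces the same transformations with plain list and dict operations on sample rows. It does not use the Nexus step classes; the sample data and variable names are made up for illustration.

```python
import csv
import io

raw = (
    b"id,amount,status,created_at\n"
    b"1,9.50,completed,2024-01-02\n"
    b"2,3.00,pending,2024-01-03\n"
    b"3,12.25,completed,2024-01-04\n"
)

# BytesToCSV equivalent: every value is parsed as a string.
rows = list(csv.DictReader(io.StringIO(raw.decode("utf-8"))))

# FilterRows: keep only rows matching the predicate.
rows = [row for row in rows if row["status"] == "completed"]

# SelectColumns: keep only the listed columns.
keep = ["id", "amount", "created_at"]
rows = [{col: row[col] for col in keep} for row in rows]

# CastTypes: coerce "amount" from str to float.
rows = [{**row, "amount": float(row["amount"])} for row in rows]

# RenameColumns: rename "created_at" to "date".
renames = {"created_at": "date"}
rows = [{renames.get(col, col): val for col, val in row.items()} for row in rows]
```

Order matters here: the filter reads status, so it has to run before the select step drops that column.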

AddColumn

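from steps.add_column import AddColumn

# price and quantity must already be numeric (for example via CastTypes),
# since BytesToCSV yields string values.
AddColumn("total", lambda row: row["price"] * row["quantity"])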
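
Because BytesToCSV produces string values, a callable that multiplies two columns needs numeric inputs. The sketch below shows the idea in plain Python with a hypothetical add_column helper (not the Nexus implementation), casting first and then deriving the new column.

```python
from typing import Any, Callable

Row = dict[str, Any]


def add_column(rows: list[Row], name: str, fn: Callable[[Row], Any]) -> list[Row]:
    """Return new rows with `name` set to fn(row); hypothetical helper."""
    return [{**row, name: fn(row)} for row in rows]


# String-valued rows, as a CSV parser would produce them.
rows: list[Row] = [
    {"sku": "A1", "price": "2.50", "quantity": "4"},
    {"sku": "B2", "price": "10.00", "quantity": "1"},
]

# Cast first: multiplying two str values raises TypeError.
typed = [
    {**row, "price": float(row["price"]), "quantity": int(row["quantity"])}
    for row in rows
]

with_total = add_column(typed, "total", lambda row: row["price"] * row["quantity"])
```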

Sinks

Sink        Description
SQLiteSink  Write rows into a SQLite table
FileSink    Write rows to a local CSV or JSON file
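
As a rough picture of what writing rows into a SQLite table with a column-to-type schema involves, here is a sketch using only the standard-library sqlite3 module. write_rows is a hypothetical helper, not SQLiteSink's implementation, and the create-if-missing behaviour is an assumption.

```python
import sqlite3


def write_rows(
    conn: sqlite3.Connection,
    table_name: str,
    schema: dict[str, str],
    rows: list[dict[str, object]],
) -> None:
    """Create the table if needed and insert rows (hypothetical helper)."""
    # Identifiers cannot be bound as parameters, so quote them explicitly.
    columns = list(schema)
    col_defs = ", ".join(f'"{col}" {sql_type}' for col, sql_type in schema.items())
    col_names = ", ".join(f'"{col}"' for col in columns)
    placeholders = ", ".join("?" for _ in columns)

    conn.execute(f'CREATE TABLE IF NOT EXISTS "{table_name}" ({col_defs})')
    conn.executemany(
        f'INSERT INTO "{table_name}" ({col_names}) VALUES ({placeholders})',
        [tuple(row[col] for col in columns) for row in rows],
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
write_rows(
    conn,
    table_name="data",
    schema={"name": "TEXT", "age": "TEXT"},
    rows=[{"name": "Alice", "age": "30"}, {"name": "Bob", "age": "25"}],
)
stored = conn.execute('SELECT name, age FROM "data" ORDER BY name').fetchall()
```

Row values go through ? placeholders, while table and column names are interpolated and should therefore come from trusted configuration only.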

Incremental loading

StatefulPipeline runs a pipeline incrementally: it saves a checkpoint between runs and uses it so that each run fetches only new data.

import os

from pipelines.pipeline import Pipeline
from pipelines.stateful_pipeline import StatefulPipeline

# StripeSource and StripeSourceConfig are imported as shown earlier; SelectColumns,
# sink, store, and max_created_from_db are assumed to be in scope.
StatefulPipeline(
    build_pipeline=lambda since: Pipeline(
        source=StripeSource(StripeSourceConfig(
            api_key=os.environ["STRIPE_API_KEY"],
            resource="Customer",
            # On the first run there is no checkpoint, so no filter is applied.
            params={"created[gt]": since} if since else {},
        )),
        steps=[SelectColumns(["id", "email", "name", "created"])],
        sink=sink,
    ),
    get_state=lambda: store.get("stripe_customers"),
    save_state=lambda value: store.set("stripe_customers", value),
    advance_state=lambda: max_created_from_db(),
).run()

  • build_pipeline: builds the pipeline given the current checkpoint
  • get_state: returns the last saved checkpoint
  • save_state: persists the new checkpoint after a run
  • advance_state: derives the new checkpoint (called after the pipeline runs)
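
The order in which these four callables interact can be sketched as follows. run_incremental and the in-memory stand-ins are hypothetical; the sketch assumes the checkpoint is read, passed to build_pipeline, and replaced by the value of advance_state once the run has finished, as the descriptions above suggest.

```python
from typing import Any, Callable, Optional


def run_incremental(
    build_pipeline: Callable[[Optional[int]], Callable[[], None]],
    get_state: Callable[[], Optional[int]],
    save_state: Callable[[int], None],
    advance_state: Callable[[], Optional[int]],
) -> None:
    """Hypothetical sketch of one incremental run."""
    since = get_state()                 # last checkpoint, or None on first run
    run = build_pipeline(since)         # pipeline scoped to data after `since`
    run()                               # source -> steps -> sink
    new_state = advance_state()         # derive checkpoint from what was loaded
    if new_state is not None:
        save_state(new_state)           # persist for the next run


# In-memory stand-ins for an upstream API, a sink table, and a state store.
upstream = [{"id": "c1", "created": 100}, {"id": "c2", "created": 200}]
loaded: list[dict[str, Any]] = []
store: dict[str, int] = {}


def build(since: Optional[int]) -> Callable[[], None]:
    def run() -> None:
        loaded.extend(r for r in upstream if since is None or r["created"] > since)
    return run


def sync() -> None:
    run_incremental(
        build_pipeline=build,
        get_state=lambda: store.get("customers"),
        save_state=lambda value: store.__setitem__("customers", value),
        advance_state=lambda: max((r["created"] for r in loaded), default=None),
    )


sync()                                        # first run loads c1 and c2
upstream.append({"id": "c3", "created": 300})
sync()                                        # second run loads only c3
```

Saving the checkpoint only after the run completes means a failed run leaves the old checkpoint in place, so the next run retries the same window.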

Running tests

# Unit tests only
uv run pytest

# Integration tests, selected by marker (these hit real external endpoints)
uv run pytest -m integration

Examples

See the examples/ directory for end-to-end pipelines:

  • http_csv_to_file.py — HTTP → CSV → local file
  • file_csv_to_sqlite.py — local file → SQLite
  • glob_csv_to_sqlite.py — multiple local files → SQLite
  • s3_csv_to_sqlite.py — S3 object → SQLite
  • s3_glob_csv_to_sqlite.py — multiple S3 objects → SQLite
  • s3_filter_select_to_sqlite.py — S3 → filter → select → SQLite
  • google_sheets_to_sqlite.py — public Google Sheet → SQLite
  • github_paginated.py — paginated GitHub API → SQLite
  • stripe_customers_to_sqlite.py — Stripe customers → SQLite
  • stripe_payouts_to_sqlite.py — Stripe payouts → SQLite
  • stripe_search_to_sqlite.py — Stripe search → SQLite
  • stripe_customers_incremental.py — incremental Stripe sync with StatefulPipeline
  • postgres_transactions_to_sqlite.py — Postgres → SQLite
