
FastELT

A FastAPI-inspired wrapper around dlt for ELT pipelines.

FastELT brings FastAPI's developer experience to data pipelines: decorators, type hints, Pydantic v2 validation, automatic environment variable resolution — all built on top of dlt's battle-tested engine (20+ destinations, incremental loading, schema evolution, merge strategies).

Installation

pip install fastelt

With optional extras:

pip install fastelt[cli]        # CLI support (Typer)
pip install fastelt[rest_api]   # REST API source (dlt rest_api)
pip install fastelt[filesystem] # Filesystem sources (local, GCS)

Quickstart

import csv
from fastelt import FastELT, Source

local_data = Source(name="local")

@local_data.resource(primary_key="name", write_disposition="replace")
def users():
    with open("users.csv") as f:
        for row in csv.DictReader(f):
            yield row

app = FastELT(pipeline_name="my_pipeline", destination="duckdb")
app.include_source(local_data)
app.run()

Key Concepts

Sources and Resources

A Source groups related resources with shared config — like FastAPI's APIRouter. Resources are generator functions that yield dict records:

import httpx

from fastelt import Source, Env

github = Source(
    name="github",
    base_url="https://api.github.com",
    token=Env("GH_TOKEN"),
    org="anthropics",
)

@github.resource(primary_key="id", write_disposition="merge")
def repositories():
    headers = {"Authorization": f"Bearer {github.token}"}
    resp = httpx.get(f"{github.base_url}/orgs/{github.org}/repos", headers=headers)
    yield from resp.json()

@app.source — Quick Inline

For single-resource sources, skip the Source object — like @app.get in FastAPI:

app = FastELT(pipeline_name="demo", destination="duckdb")

@app.source("users", primary_key="id")
def users():
    yield {"id": 1, "name": "Alice"}
    yield {"id": 2, "name": "Bob"}

app.run()

Environment Variables

Three ways to inject env vars — all resolved automatically:

from typing import Annotated
from fastelt import Env, Secret, Source

# 1. As a Source field value
github = Source(name="github", token=Env("GH_TOKEN"))

# 2. As an Annotated type hint on a resource function
@github.resource()
def repos(token: Annotated[str, Secret("GH_TOKEN")]):
    ...

# 3. Auto-resolved from plain str params (uppercased)
@github.resource()
def repos_auto(gh_token: str):  # resolves from GH_TOKEN env var
    ...

Secret works like Env but masks the value in logs/repr.
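
The name-based resolution in option 3 can be sketched with the standard library. This is only an illustration of the idea, not FastELT's actual implementation (the `resolve_params` helper is hypothetical):

```python
import inspect
import os


def resolve_params(func):
    """Map each plain-str parameter name to an uppercased env var value."""
    resolved = {}
    for name, param in inspect.signature(func).parameters.items():
        if param.annotation is str:
            resolved[name] = os.environ[name.upper()]  # gh_token -> GH_TOKEN
    return resolved


def repos(gh_token: str):
    yield {"token_used": gh_token}


os.environ["GH_TOKEN"] = "abc123"
kwargs = resolve_params(repos)
print(kwargs)  # {'gh_token': 'abc123'}
```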

Pydantic response_model

Validate, coerce types, and normalize columns — like FastAPI's response_model:

from pydantic import BaseModel, field_validator

class UserModel(BaseModel):
    name: str
    email: str
    age: int

    @field_validator("age")
    @classmethod
    def age_must_be_positive(cls, v):
        if v <= 0:
            raise ValueError(f"age must be > 0, got {v}")
        return v

@local_data.resource(
    response_model=UserModel,
    primary_key="name",
    write_disposition="replace",
)
def users():
    # CSV yields strings — pydantic coerces age from str to int
    with open("users.csv") as f:
        for row in csv.DictReader(f):
            yield row

Use frozen=True to reject unexpected columns with SchemaFrozenError instead of a warning.
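
FastELT's frozen behavior presumably builds on Pydantic's handling of extra fields. A plain-Pydantic sketch of rejecting an unexpected column (`SchemaFrozenError` is FastELT's wrapper; here you see Pydantic's underlying `ValidationError`):

```python
from pydantic import BaseModel, ConfigDict, ValidationError


class FrozenUser(BaseModel):
    model_config = ConfigDict(extra="forbid")  # reject unknown fields

    name: str
    age: int


FrozenUser(name="Alice", age=30)  # validates fine

try:
    FrozenUser(name="Bob", age=25, nickname="bobby")  # unexpected column
except ValidationError as e:
    print(e.errors()[0]["type"])  # extra_forbidden
```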

REST API Source (Declarative)

For standard REST APIs, define endpoints as config — dlt handles pagination, auth, and incremental loading:

from fastelt import Env, FastELT
from fastelt.rest_api import RESTAPISource

github = RESTAPISource(
    name="github",
    base_url="https://api.github.com",
    auth={"type": "bearer", "token": Env("GH_TOKEN")},
    paginator="header_link",
    resources=[
        {
            "name": "repos",
            "endpoint": {
                "path": "/orgs/{org}/repos",
                "params": {"org": "anthropics", "per_page": 100},
            },
            "primary_key": "id",
            "write_disposition": "merge",
        },
    ],
)

app = FastELT(pipeline_name="github_pipeline", destination="duckdb")
app.include_source(github)
app.run()

Filesystem Sources

Load files from local disk or cloud storage:

from fastelt.sources.filesystem import LocalFileSystemSource

src = LocalFileSystemSource(
    name="local_data",
    bucket_url="/path/to/data",
    resources=[
        {"name": "users", "file_glob": "users/*.csv", "format": "csv"},
        {"name": "events", "file_glob": "events/*.jsonl", "format": "jsonl"},
    ],
)

Also available: GCSFileSystemSource for Google Cloud Storage (gs:// URLs).

Incremental Loading

Use dlt's incremental cursors for efficient syncing:

import dlt

@api.resource(primary_key="id", write_disposition="merge")
def events(
    updated_at=dlt.sources.incremental("updated_at", initial_value="2024-01-01"),
):
    # query only records newer than updated_at.last_value
    yield {"id": 1, "name": "signup", "updated_at": "2024-06-15T10:00:00"}
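
Between runs, dlt persists the cursor's high-water mark in pipeline state and skips already-seen records. A simplified stdlib sketch of that mechanism (the `Cursor` class is illustrative, not dlt's implementation):

```python
class Cursor:
    """Toy incremental cursor: keep records newer than the stored high-water mark."""

    def __init__(self, field, initial_value):
        self.field = field
        self.last_value = initial_value  # dlt persists this in pipeline state

    def filter(self, records):
        for record in records:
            if record[self.field] > self.last_value:
                self.last_value = max(self.last_value, record[self.field])
                yield record


cursor = Cursor("updated_at", "2024-01-01")
batch = [
    {"id": 1, "updated_at": "2023-12-31"},  # older than cursor: skipped
    {"id": 2, "updated_at": "2024-06-15"},  # newer: loaded, advances cursor
]
loaded = list(cursor.filter(batch))
print([r["id"] for r in loaded], cursor.last_value)  # [2] 2024-06-15
```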

CLI

pip install fastelt[cli]

fastelt run --destination duckdb --source github
fastelt list
fastelt describe github:repos

The CLI auto-discovers your FastELT app instance, like fastapi run.

Why FastELT?

| Feature            | FastELT                          | Meltano / Singer   | dlt (raw)         |
| ------------------ | -------------------------------- | ------------------ | ----------------- |
| Define pipelines   | Python decorators                | YAML / JSON config | Python decorators |
| Config             | Inferred from Source fields      | Manual definition  | Manual / partial  |
| Data validation    | Pydantic v2 response_model       | None built-in      | Schema inference  |
| Env var management | Env() / Secret() + auto-resolve  | .env files         | dlt.secrets       |
| Destinations       | 20+ (via dlt)                    | 300+ connectors    | 20+               |
| Learning curve     | Familiar if you know FastAPI     | Tool-specific DSL  | dlt-specific API  |

Documentation

Full docs: fastelt.dev

Requirements

  • Python >= 3.12
  • dlt (installed automatically)
  • Pydantic >= 2.0 (installed automatically)
  • Loguru (installed automatically)

License

MIT


