A FastAPI-inspired wrapper around dlt for ELT pipelines

FastELT

FastELT brings FastAPI's developer experience to data pipelines: decorators, type hints, Pydantic v2 validation, automatic environment variable resolution — all built on top of dlt's battle-tested engine (20+ destinations, incremental loading, schema evolution, merge strategies).

Installation

pip install fastelt

With optional extras:

pip install "fastelt[cli]"        # CLI support (Typer)
pip install "fastelt[rest_api]"   # REST API source (dlt rest_api)
pip install "fastelt[filesystem]" # Filesystem sources (local, GCS)

Quickstart

import csv
from fastelt import FastELT, Source

local_data = Source(name="local")

@local_data.resource(primary_key="name", write_disposition="replace")
def users():
    with open("users.csv") as f:
        for row in csv.DictReader(f):
            yield row

app = FastELT(pipeline_name="my_pipeline", destination="duckdb")
app.include_source(local_data)
app.run()
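
The quickstart reads a users.csv from the working directory, which is not shown here. A minimal sketch, using only the standard library, that writes a small sample file and prints the dict records that csv.DictReader yields, which is the shape the users resource passes on. The column names and rows are made up for illustration.

```python
import csv
import os
import tempfile

# Hypothetical sample data; the file's real columns are not specified.
SAMPLE_ROWS = [
    {"name": "Alice", "email": "alice@example.com", "age": "30"},
    {"name": "Bob", "email": "bob@example.com", "age": "25"},
]


def write_sample_csv(path):
    """Write SAMPLE_ROWS to `path` with a header row."""
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["name", "email", "age"])
        writer.writeheader()
        writer.writerows(SAMPLE_ROWS)


def read_rows(path):
    """Yield one dict per CSV row, as the quickstart resource does."""
    with open(path, newline="") as f:
        for row in csv.DictReader(f):
            yield row


csv_path = os.path.join(tempfile.mkdtemp(), "users.csv")
write_sample_csv(csv_path)
rows = list(read_rows(csv_path))
print(rows[0])
```

Every value comes back as a string, including age, because CSV carries no type information.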

Key Concepts

Sources and Resources

A Source groups related resources with shared config — like FastAPI's APIRouter. Resources are generator functions that yield dict records:

import httpx
from fastelt import Source, Env

github = Source(
    name="github",
    base_url="https://api.github.com",
    token=Env("GH_TOKEN"),
    org="anthropics",
)

@github.resource(primary_key="id", write_disposition="merge")
def repositories():
    headers = {"Authorization": f"Bearer {github.token}"}
    resp = httpx.get(f"{github.base_url}/orgs/{github.org}/repos", headers=headers)
    resp.raise_for_status()  # surface HTTP errors instead of yielding an error payload
    yield from resp.json()

@app.source — Quick Inline

For single-resource sources, skip the Source object — like @app.get in FastAPI:

app = FastELT(pipeline_name="demo", destination="duckdb")

@app.source("users", primary_key="id")
def users():
    yield {"id": 1, "name": "Alice"}
    yield {"id": 2, "name": "Bob"}

app.run()

Environment Variables

Three ways to inject env vars — all resolved automatically:

from typing import Annotated
from fastelt import Env, Secret, Source

# 1. As a Source field value
github = Source(name="github", token=Env("GH_TOKEN"))

# 2. As an Annotated type hint on a resource function
@github.resource()
def repos(token: Annotated[str, Secret("GH_TOKEN")]):
    ...

# 3. Auto-resolved from plain str params (uppercased)
@github.resource()
def repos(gh_token: str):  # resolves from GH_TOKEN env var
    ...

Secret works like Env but masks the value in logs/repr.
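
To make the third option and the masking behaviour concrete, here is a rough sketch of how name-based resolution and a masked value could work in plain Python. It is not FastELT's implementation: resolve_str_params and MaskedSecret are hypothetical names, and a dict stands in for the real environment.

```python
import inspect
import os


class MaskedSecret:
    """Hypothetical stand-in for a secret value that hides itself in repr/str."""

    def __init__(self, value):
        self._value = value

    def get(self):
        """Return the real value for code that needs it."""
        return self._value

    def __repr__(self):
        return "MaskedSecret('****')"

    __str__ = __repr__


def resolve_str_params(func, environ=None):
    """Map each plain `str` parameter to the env var named after it, uppercased."""
    environ = os.environ if environ is None else environ
    resolved = {}
    for name, param in inspect.signature(func).parameters.items():
        if param.annotation is str and name.upper() in environ:
            resolved[name] = environ[name.upper()]
    return resolved


def repos(gh_token: str, page_size: int = 100):
    return gh_token, page_size


# A plain dict replaces os.environ so the example has no side effects.
fake_env = {"GH_TOKEN": "ghp_example"}
kwargs = resolve_str_params(repos, fake_env)
print(kwargs)
print(MaskedSecret(kwargs["gh_token"]))
```

Only gh_token is resolved; page_size is left alone because it is not annotated as str.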

Pydantic response_model

Validate, coerce types, and normalize columns — like FastAPI's response_model:

import csv

from fastelt import Source
from pydantic import BaseModel, field_validator

local_data = Source(name="local")

class UserModel(BaseModel):
    name: str
    email: str
    age: int

    @field_validator("age")
    @classmethod
    def age_must_be_positive(cls, v):
        if v <= 0:
            raise ValueError(f"age must be > 0, got {v}")
        return v

@local_data.resource(
    response_model=UserModel,
    primary_key="name",
    write_disposition="replace",
)
def users():
    # CSV yields strings — pydantic coerces age from str to int
    with open("users.csv") as f:
        for row in csv.DictReader(f):
            yield row

Use frozen=True to reject unexpected columns with SchemaFrozenError instead of a warning.

Tip: If your resource uses a -> list[Model] return annotation, response_model is set automatically — no need to specify it twice.
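
The effect of the model on individual rows can be checked with Pydantic alone, without running a pipeline. In the sketch below, validate_rows is a hypothetical helper and the rows are made up. How FastELT itself treats a failing row (raise, skip, or log) is not stated here, so collecting rejects is an assumption.

```python
from pydantic import BaseModel, ValidationError, field_validator


class UserModel(BaseModel):
    name: str
    email: str
    age: int

    @field_validator("age")
    @classmethod
    def age_must_be_positive(cls, v):
        if v <= 0:
            raise ValueError(f"age must be > 0, got {v}")
        return v


def validate_rows(rows):
    """Split rows into validated dicts and (row, error) pairs."""
    valid, rejected = [], []
    for row in rows:
        try:
            valid.append(UserModel.model_validate(row).model_dump())
        except ValidationError as exc:
            rejected.append((row, exc))
    return valid, rejected


# String values, as csv.DictReader would produce them.
raw = [
    {"name": "Alice", "email": "alice@example.com", "age": "30"},
    {"name": "Bob", "email": "bob@example.com", "age": "0"},
]
valid, rejected = validate_rows(raw)
print(valid)
print(len(rejected))
```

The first row passes with age coerced from the string "30" to the integer 30; the second is rejected by the validator.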

Parent-Child Resource Chaining

Resources can depend on other resources via type annotations — no extra decorator needed:

from fastelt import Env, Source
from pydantic import BaseModel

class User(BaseModel):
    id: int
    name: str

class Repo(BaseModel):
    id: int
    user_id: int
    name: str

github = Source(name="github", token=Env("GH_TOKEN"))

@github.resource(primary_key="id")
def users() -> list[User]:
    yield {"id": 1, "name": "Alice"}
    yield {"id": 2, "name": "Bob"}

# Auto-detected: `user: User` matches `users() -> list[User]`
@github.resource(primary_key="id")
def repos(user: User) -> list[Repo]:
    yield {"id": 100, "user_id": user.id, "name": f"repo-{user.name}"}

FastELT matches the User type annotation on repos(user: User) to the users() -> list[User] return type. Under the hood, users is built as a dlt.resource and repos as a dlt.transformer(data_from=users). The child function receives a validated Pydantic model instance with dot-access to fields.

Chains of any depth work: users → repos → commits. When running selectively (e.g., resources=["repos"]), parent resources are auto-included.
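
The matching rule and the parent auto-inclusion can be illustrated with the standard typing module. This is a sketch of the idea, not FastELT's code: find_parents and with_ancestors are hypothetical helpers, the Commit type is invented to show a three-level chain, and dataclasses stand in for Pydantic models to keep it dependency-free.

```python
from dataclasses import dataclass
from typing import get_args, get_type_hints


@dataclass
class User:
    id: int
    name: str


@dataclass
class Repo:
    id: int
    user_id: int
    name: str


@dataclass
class Commit:
    sha: str
    repo_id: int


def users() -> list[User]: ...
def repos(user: User) -> list[Repo]: ...
def commits(repo: Repo) -> list[Commit]: ...


def find_parents(resources):
    """Return {child_name: parent_name} by matching parameter types to return item types."""
    produces = {}  # item type -> name of the resource that yields it
    for func in resources:
        (item_type,) = get_args(get_type_hints(func)["return"])
        produces[item_type] = func.__name__
    parents = {}
    for func in resources:
        hints = get_type_hints(func)
        hints.pop("return", None)
        for annotation in hints.values():
            if annotation in produces:
                parents[func.__name__] = produces[annotation]
    return parents


def with_ancestors(selected, parents):
    """Expand a selection so every parent in the chain is included, root first."""
    chain = []
    for name in selected:
        lineage = [name]
        while lineage[-1] in parents:
            lineage.append(parents[lineage[-1]])
        for item in reversed(lineage):
            if item not in chain:
                chain.append(item)
    return chain


parents = find_parents([users, repos, commits])
print(parents)
print(with_ancestors(["commits"], parents))
```

Selecting only commits pulls in repos and users, ordered so that each parent runs before its child.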

REST API Source (Declarative)

For standard REST APIs, define endpoints as config — dlt handles pagination, auth, and incremental loading:

from fastelt import Env, FastELT
from fastelt.sources.rest_api import RESTAPISource, BearerTokenAuth

github = RESTAPISource(
    name="github",
    base_url="https://api.github.com",
    auth=BearerTokenAuth(token=Env("GH_TOKEN")),
    paginator="header_link",
    resources=[
        {
            "name": "repos",
            "endpoint": {
                "path": "/orgs/{org}/repos",
                "params": {"org": "anthropics", "per_page": 100},
            },
            "primary_key": "id",
            "write_disposition": "merge",
        },
    ],
)

app = FastELT(pipeline_name="github_pipeline", destination="duckdb")
app.include_source(github)
app.run()
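
The "header_link" paginator follows the URL that the API returns in its Link response header, which is how GitHub signals the next page. The sketch below shows that lookup in isolation. It is a simplified parser, not dlt's implementation: next_page_url is a hypothetical helper, the URLs are made up, and entries with extra parameters before rel are not handled.

```python
import re

# Matches one `<url>; rel="name"` entry of a Link header.
_LINK_ENTRY = re.compile(r'<([^>]+)>\s*;\s*rel="([^"]+)"')


def next_page_url(link_header):
    """Return the URL tagged rel="next", or None when there is no next page."""
    if not link_header:
        return None
    for url, rel in _LINK_ENTRY.findall(link_header):
        if rel == "next":
            return url
    return None


header = (
    '<https://api.github.com/organizations/1/repos?per_page=100&page=2>; rel="next", '
    '<https://api.github.com/organizations/1/repos?per_page=100&page=5>; rel="last"'
)
last_page_header = '<https://api.github.com/organizations/1/repos?page=1>; rel="first"'

print(next_page_url(header))
print(next_page_url(last_page_header))
```

A paginator built on this keeps requesting the returned URL until the function yields None.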

Filesystem Sources

Load files from local disk or cloud storage:

from fastelt.sources.filesystem import LocalFileSystemSource

src = LocalFileSystemSource(
    name="local_data",
    bucket_url="/path/to/data",
    resources=[
        {"name": "users", "file_glob": "users/*.csv", "format": "csv"},
        {"name": "events", "file_glob": "events/*.jsonl", "format": "jsonl"},
    ],
)

Also available: GCSFileSystemSource for Google Cloud Storage (gs:// URLs).
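
Conceptually, each entry in resources pairs a glob pattern with a parser. The standard-library sketch below mimics that for a local directory so the resulting records can be inspected. read_files is a hypothetical helper and the sample files are invented; the real source delegates to dlt and may behave differently (for example in type inference).

```python
import csv
import json
import tempfile
from pathlib import Path


def read_files(bucket_url, file_glob, fmt):
    """Yield dict records from every file under bucket_url matching file_glob."""
    for path in sorted(Path(bucket_url).glob(file_glob)):
        with open(path, newline="") as f:
            if fmt == "csv":
                yield from csv.DictReader(f)
            elif fmt == "jsonl":
                for line in f:
                    if line.strip():  # skip blank lines
                        yield json.loads(line)
            else:
                raise ValueError(f"unsupported format: {fmt}")


# Build a throwaway directory with one file per resource.
root = Path(tempfile.mkdtemp())
(root / "users").mkdir()
(root / "events").mkdir()
(root / "users" / "a.csv").write_text("id,name\n1,Alice\n2,Bob\n")
(root / "events" / "a.jsonl").write_text('{"id": 1, "name": "signup"}\n')

users = list(read_files(root, "users/*.csv", "csv"))
events = list(read_files(root, "events/*.jsonl", "jsonl"))
print(users)
print(events)
```

In this sketch CSV values stay strings while JSONL values keep their JSON types.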

Incremental Loading

Use dlt's incremental cursors for efficient syncing:

import dlt
from fastelt import Source

api = Source(name="api")  # placeholder source for this example

@api.resource(primary_key="id", write_disposition="merge")
def events(
    updated_at=dlt.sources.incremental("updated_at", initial_value="2024-01-01"),
):
    yield {"id": 1, "name": "signup", "updated_at": "2024-06-15T10:00:00"}
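
The idea behind an incremental cursor is to remember the highest cursor value seen so far and load only newer rows on the next run. The sketch below shows that logic in memory. It is a simplification, not dlt's implementation: load_increment is a hypothetical helper, the rows are made up, the comparison is strictly greater-than, and nothing is persisted between processes, whereas dlt keeps the cursor in pipeline state.

```python
def load_increment(records, cursor_field, last_value):
    """Return records newer than last_value and the advanced cursor."""
    fresh = [r for r in records if r[cursor_field] > last_value]
    new_last = max((r[cursor_field] for r in fresh), default=last_value)
    return fresh, new_last


api_rows = [
    {"id": 1, "name": "signup", "updated_at": "2024-06-15T10:00:00"},
    {"id": 2, "name": "login", "updated_at": "2023-12-31T09:00:00"},
]

# First run starts from the initial value and skips the older row.
first, cursor = load_increment(api_rows, "updated_at", "2024-01-01")

# Second run sees one additional row; only that row is returned.
api_rows.append({"id": 3, "name": "purchase", "updated_at": "2024-07-01T08:30:00"})
second, cursor2 = load_increment(api_rows, "updated_at", cursor)

print([r["id"] for r in first], cursor)
print([r["id"] for r in second], cursor2)
```

Comparing the timestamps as strings works here because ISO 8601 values sort chronologically when compared lexicographically.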

CLI

pip install "fastelt[cli]"

fastelt run --destination duckdb --source github
fastelt list
fastelt describe github:repos

The CLI auto-discovers your FastELT app instance, like fastapi run.

Why FastELT?

| Feature | FastELT | Meltano / Singer | dlt (raw) |
|---|---|---|---|
| Define pipelines | Python decorators | YAML / JSON config | Python decorators |
| Config | Inferred from Source fields | Manual definition | Manual / partial |
| Data validation | Pydantic v2 response_model | None built-in | Schema inference |
| Resource chaining | Type annotations (User → Repo) | Config-based | dlt.transformer + manual wiring |
| Env var management | Env() / Secret() + auto-resolve | .env files | dlt.secrets |
| Destinations | 20+ (via dlt) | 300+ connectors | 20+ |
| Learning curve | Familiar if you know FastAPI | Tool-specific DSL | dlt-specific API |

Documentation

Full docs: fastelt.dev

Requirements

  • Python >= 3.12
  • dlt (installed automatically)
  • Pydantic >= 2.0 (installed automatically)
  • Loguru (installed automatically)

License

MIT
