A flexible ETL framework with a database-driven scheduler, extensible pipeline blocs, and a RESTful API.

th2etl

thaink2's in-house ETL and automation library.

Design

A pipeline is modeled as a set of interdependent blocs. Each bloc is one of:

  • LoaderBloc — loads or extracts raw data
  • TransformerBloc — transforms data between stages
  • ExporterBloc — exports or writes output

Dependencies between blocs are resolved before execution, so the pipeline runs in dependency order.
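Running blocs "in dependency order" amounts to a topological sort of the bloc graph. The sketch below uses Python's standard-library graphlib on the three example blocs from the Quickstart. The `resolve_order` helper is hypothetical, and th2etl's own resolver may be implemented differently.

```python
from graphlib import CycleError, TopologicalSorter


def resolve_order(dependencies: dict[str, list[str]]) -> list[str]:
    """Return bloc names so that each bloc comes after every bloc it depends on.

    `dependencies` maps a bloc name to the names it depends on, mirroring the
    dependencies=[...] argument used when creating blocs. Raises
    graphlib.CycleError if the dependencies form a cycle.
    """
    return list(TopologicalSorter(dependencies).static_order())


blocs = {
    "example_loader": [],
    "example_transformer": ["example_loader"],
    "example_exporter": ["example_transformer"],
}
print(resolve_order(blocs))
# ['example_loader', 'example_transformer', 'example_exporter']

try:
    resolve_order({"a": ["b"], "b": ["a"]})
except CycleError as exc:
    # The second element of args holds the nodes that form the cycle.
    print("cycle detected:", exc.args[1])
```

Detecting cycles up front means a misconfigured pipeline fails before any bloc has run, rather than partway through.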

Loader Blocs

The following loader blocs are available:

  • CsvLoaderBloc: Loads data from a CSV file.
    • bloc_type: csv_loader
    • Config:
      • file_path (required): The path to the CSV file.
      • delimiter (optional): The delimiter character (default: ,).
  • PostgresLoaderBloc: Loads data from a PostgreSQL database.
    • bloc_type: postgres_loader
    • Config:
      • query (required): The SQL query to execute.
  • ApiLoaderBloc: Loads data from a web API.
    • bloc_type: api_loader
    • Config:
      • url (required): The API endpoint URL.
      • method (optional): The HTTP method (default: GET).
      • params (optional): A dictionary of URL parameters.
      • headers (optional): A dictionary of HTTP headers.
      • json (optional): A dictionary for the JSON request body.
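To make the config keys concrete, here is a standard-library sketch of how the csv_loader and api_loader configs could be interpreted. The helpers `read_csv_rows` and `build_api_request` are hypothetical: th2etl's real blocs may use other libraries (for example, a third-party HTTP client) and return data in a different shape.

```python
import csv
import json
import urllib.parse
import urllib.request
from typing import Any


def read_csv_rows(config: dict[str, Any]) -> list[dict[str, str]]:
    """Read the file named by a csv_loader-style config into a list of row dicts."""
    path = config["file_path"]                 # required
    delimiter = config.get("delimiter", ",")   # optional, defaults to ","
    with open(path, newline="", encoding="utf-8") as handle:
        return list(csv.DictReader(handle, delimiter=delimiter))


def build_api_request(config: dict[str, Any]) -> urllib.request.Request:
    """Turn an api_loader-style config into an HTTP request (built, not sent)."""
    url = config["url"]                              # required
    method = config.get("method", "GET").upper()     # optional, defaults to GET
    params = config.get("params") or {}
    headers = dict(config.get("headers") or {})
    body = None
    if config.get("json") is not None:
        # Serialize the JSON body and label it, unless the caller set a Content-Type.
        body = json.dumps(config["json"]).encode("utf-8")
        headers.setdefault("Content-Type", "application/json")
    if params:
        # Append URL parameters, keeping any query string already in the URL.
        sep = "&" if urllib.parse.urlsplit(url).query else "?"
        url = f"{url}{sep}{urllib.parse.urlencode(params)}"
    return urllib.request.Request(url, data=body, headers=headers, method=method)


req = build_api_request({
    "url": "https://api.example.com/items",
    "method": "post",
    "params": {"page": 2},
    "json": {"status": "active"},
})
print(req.get_method(), req.full_url)
# POST https://api.example.com/items?page=2
```

Sending the request would then be a matter of passing it to urllib.request.urlopen; keeping construction separate makes the config mapping easy to check without network access.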

Transformer Blocs

The following transformer blocs are available:

  • RunAdkAgentsBloc: Runs an ADK agent via an API call.
    • bloc_type: run_adk_agents
    • Config:
      • url (optional): The API endpoint for the agent. Defaults to the development server.
      • agent_id (required): The ID of the agent to run (e.g., database_assistant).
      • user_id (required): The user's ID (e.g., email), used for authentication.
      • message_text (required): The text message to send to the agent.
  • RefreshWebhooksBloc: Refreshes webhooks via an API call.
    • bloc_type: refresh_webhooks
    • Config:
      • url (required): The API endpoint for refreshing webhooks.
      • user_id (required): The user's ID (e.g., email), used for authentication.
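Because bloc configs are stored as free-form dictionaries, it can help to check them against the keys documented above before saving them with create_bloc. The `BLOC_CONFIG_KEYS` table and `validate_bloc_config` function are a hypothetical sketch derived from this README, not part of th2etl's API; the library may validate configs itself, differently, or not at all.

```python
from typing import Any

# Required and optional config keys per bloc_type, as documented in this README.
BLOC_CONFIG_KEYS: dict[str, dict[str, set[str]]] = {
    "csv_loader": {"required": {"file_path"}, "optional": {"delimiter"}},
    "postgres_loader": {"required": {"query"}, "optional": set()},
    "api_loader": {"required": {"url"}, "optional": {"method", "params", "headers", "json"}},
    "run_adk_agents": {"required": {"agent_id", "user_id", "message_text"}, "optional": {"url"}},
    "refresh_webhooks": {"required": {"url", "user_id"}, "optional": set()},
}


def validate_bloc_config(bloc_type: str, config: dict[str, Any]) -> list[str]:
    """Return a list of problems; an empty list means the config looks valid."""
    spec = BLOC_CONFIG_KEYS.get(bloc_type)
    if spec is None:
        return [f"unknown bloc_type: {bloc_type!r}"]
    keys = set(config)
    problems = [f"missing required key: {key!r}" for key in sorted(spec["required"] - keys)]
    allowed = spec["required"] | spec["optional"]
    # Flag unknown keys too, since a typo such as "delimeter" would otherwise be ignored.
    problems += [f"unexpected key: {key!r}" for key in sorted(keys - allowed)]
    return problems


print(validate_bloc_config("refresh_webhooks", {"url": "https://example.com/hooks"}))
# ["missing required key: 'user_id'"]
```

The example_* bloc types used in the Quickstart are not listed here, so this sketch reports them as unknown.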

API Service

The application includes a FastAPI-based API for managing resources.

To run the API server, pass the --serve-api flag:

th2etl --serve-api

You can also specify the host and port:

th2etl --serve-api --host 0.0.0.0 --port 8080

While the server is running with the default host and port, the interactive API documentation is available at http://127.0.0.1:8000/docs. If you pass --host or --port, adjust the URL accordingly.

Health Check

You can monitor the status of the service, including its connection to the database, by sending a GET request to the /health endpoint.

curl http://127.0.0.1:8000/health

If the service is running and connected to the database, it will return a 200 OK response with {"status": "ok"}. If the database connection fails, it will return a 503 Service Unavailable error.
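For scripted monitoring, such as a container health probe, this contract can be wrapped in a small client. This is a standard-library sketch: `check_health` and `is_healthy` are hypothetical helpers, and the default base URL assumes the server runs on the default host and port.

```python
import json
import urllib.error
import urllib.request


def is_healthy(status: int, body: bytes) -> bool:
    """True only for 200 OK with a JSON body of {"status": "ok"}."""
    if status != 200:
        return False
    try:
        payload = json.loads(body.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return False
    return isinstance(payload, dict) and payload.get("status") == "ok"


def check_health(base_url: str = "http://127.0.0.1:8000", timeout: float = 5.0) -> bool:
    """GET <base_url>/health; False on 503, other errors, or an unreachable server."""
    url = base_url.rstrip("/") + "/health"
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return is_healthy(response.status, response.read())
    except urllib.error.HTTPError:
        # Error statuses such as 503 Service Unavailable (database down) land here.
        return False
    except OSError:
        # Connection refused, DNS failure, or timeout (URLError subclasses OSError).
        return False
```

A probe script could exit with status 0 when check_health() returns True and 1 otherwise. Treating anything other than a 200 with {"status": "ok"} as unhealthy is a deliberately conservative choice.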

Usage

Run the scheduler in the current process:

python -m th2etl.runner

Run the scheduler in a separate isolated session:

python -m th2etl.runner --background

Pass environment variables into the isolated session:

python -m th2etl.runner --background --env SOURCE=prod --env DESTINATION=warehouse
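Each --env flag carries one KEY=VALUE pair and can be repeated. The sketch below shows one way such pairs can be collected with argparse; it is illustrative only, not th2etl's actual argument parser. Splitting on the first "=" (so values may contain "=") and letting a later duplicate key win are assumptions.

```python
import argparse


def parse_env_pair(text: str) -> tuple[str, str]:
    """Split KEY=VALUE on the first '=' so that values may themselves contain '='."""
    key, sep, value = text.partition("=")
    if not sep or not key:
        raise argparse.ArgumentTypeError(f"expected KEY=VALUE, got {text!r}")
    return key, value


parser = argparse.ArgumentParser(prog="th2etl-sketch")
parser.add_argument("--background", action="store_true")
parser.add_argument("--env", action="append", type=parse_env_pair, default=[],
                    metavar="KEY=VALUE", help="variable to set in the isolated session")

args = parser.parse_args(["--background", "--env", "SOURCE=prod", "--env", "DESTINATION=warehouse"])
env_overrides = dict(args.env)  # a later duplicate key overwrites an earlier one
# A launcher could then start the child process with {**os.environ, **env_overrides}.
print(args.background, env_overrides)
# True {'SOURCE': 'prod', 'DESTINATION': 'warehouse'}
```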

If the package is installed, use the CLI entry point:

th2etl

This will start the scheduler by default.

To start the scheduler in the background:

th2etl --background

To start the API service in the background:

th2etl --serve-api --background

Quickstart

Create pipeline metadata from the command line and then start the ETL service.

  1. Set your PostgreSQL database settings in environment variables (PowerShell syntax shown; in a POSIX shell, use export DATABASE_HOST=localhost and so on):
$env:DATABASE_HOST = "localhost"
$env:DATABASE_PORT = "5432"
$env:DATABASE_NAME = "th2etl"
$env:DATABASE_USER = "etl_user"
$env:DATABASE_PASSWORD = "secret"
  2. Create blocs in the database:
python -c "from th2etl import DatabaseStorage; from th2etl.configs.settings import get_settings; s = get_settings();
with DatabaseStorage.from_settings(s) as storage:
    storage.create_bloc('example_loader','example_loader',dependencies=[],config={'source':'csv'})
    storage.create_bloc('example_transformer','example_transformer',dependencies=['example_loader'],config={'factor':2})
    storage.create_bloc('example_exporter','example_exporter',dependencies=['example_transformer'],config={'destination':'stdout'})"
  3. Create the pipeline from those blocs:
python -c "from th2etl import DatabaseStorage; from th2etl.configs.settings import get_settings; s = get_settings();
with DatabaseStorage.from_settings(s) as storage:
    storage.create_pipeline('example_pipeline',['example_loader','example_transformer','example_exporter'])"
  4. Create a trigger for the pipeline and, optionally, a scheduler:
python -c "from th2etl import DatabaseStorage; from th2etl.configs.settings import get_settings; s = get_settings();
with DatabaseStorage.from_settings(s) as storage:
    storage.create_trigger('every_hour','example_pipeline','0 * * * *')
    storage.create_scheduler('example_scheduler','example_pipeline','every_hour')"
  5. Start the ETL service:
th2etl

Or in the background:

th2etl --background

Persistent Storage

Use DatabaseStorage to persist bloc, pipeline, trigger, and scheduler definitions.

from th2etl import DatabaseStorage
from th2etl.configs.settings import get_settings

settings = get_settings()
with DatabaseStorage.from_settings(settings) as storage:
    storage.create_bloc("example_loader", "example_loader", dependencies=[], config={"source": "csv"})
    storage.create_bloc("example_transformer", "example_transformer", dependencies=["example_loader"], config={"factor": 2})
    storage.create_bloc("example_exporter", "example_exporter", dependencies=["example_transformer"], config={"destination": "stdout"})
    storage.create_pipeline("example_pipeline", ["example_loader", "example_transformer", "example_exporter"])
    storage.create_trigger("every_hour", "example_pipeline", "0 * * * *")
    storage.create_scheduler("example_scheduler", "example_pipeline", "every_hour")

    print(storage.list_pipelines())
    print(storage.list_schedulers())

The Settings object reads database connection details from environment variables such as DATABASE_HOST, DATABASE_PORT, DATABASE_NAME, DATABASE_USER, DATABASE_PASSWORD, and optionally DATABASE_SSL_MODE. You can also provide DATABASE_URL directly.
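The mapping from these variables to a PostgreSQL connection string can be sketched as follows. The `database_url_from_env` helper, the postgresql:// URL layout, the localhost/5432 fallbacks, mapping DATABASE_SSL_MODE to libpq's sslmode parameter, and DATABASE_URL taking precedence are all assumptions for illustration; th2etl's Settings class may behave differently.

```python
import os
from collections.abc import Mapping
from urllib.parse import quote, urlencode


def database_url_from_env(env: Mapping[str, str] = os.environ) -> str:
    """Build a PostgreSQL URL from DATABASE_* variables (assumed: DATABASE_URL wins)."""
    if env.get("DATABASE_URL"):
        return env["DATABASE_URL"]
    # Percent-encode credentials so characters such as '@' or '/' stay unambiguous.
    user = quote(env["DATABASE_USER"], safe="")
    password = quote(env.get("DATABASE_PASSWORD", ""), safe="")
    host = env.get("DATABASE_HOST", "localhost")  # assumed fallback
    port = env.get("DATABASE_PORT", "5432")       # assumed fallback
    name = quote(env["DATABASE_NAME"], safe="")
    auth = f"{user}:{password}" if password else user
    url = f"postgresql://{auth}@{host}:{port}/{name}"
    if env.get("DATABASE_SSL_MODE"):
        url += "?" + urlencode({"sslmode": env["DATABASE_SSL_MODE"]})
    return url


quickstart = {
    "DATABASE_HOST": "localhost", "DATABASE_PORT": "5432", "DATABASE_NAME": "th2etl",
    "DATABASE_USER": "etl_user", "DATABASE_PASSWORD": "secret",
}
print(database_url_from_env(quickstart))
# postgresql://etl_user:secret@localhost:5432/th2etl
```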

Scheduler

Use CronTrigger and CronScheduler to run a pipeline on a cron-like schedule:

from th2etl.pipelines import build_example_pipeline
from th2etl.scheduler import CronScheduler, CronTrigger

pipeline = build_example_pipeline()
trigger = CronTrigger("*/5 * * * *")
scheduler = CronScheduler(pipeline, trigger)
scheduler.start()

For multiple pipelines with independent trigger schedules, use SchedulerManager so each pipeline can run on its own cadence in parallel:

from th2etl.pipelines import build_example_pipeline
from th2etl.scheduler import CronTrigger, CronScheduler, SchedulerManager

pipeline1 = build_example_pipeline()
pipeline2 = build_example_pipeline()

scheduler1 = CronScheduler(pipeline1, CronTrigger("0 * * * *"), name="hourly_pipeline")
scheduler2 = CronScheduler(pipeline2, CronTrigger("*/5 * * * *"), name="five_minute_pipeline")

manager = SchedulerManager([scheduler1, scheduler2])
manager.start()

If you persist scheduler metadata in the database, load scheduled pipelines dynamically from DatabaseStorage:

from th2etl import DatabaseStorage
from th2etl.configs.settings import get_settings
from th2etl.scheduler import load_scheduler_manager

settings = get_settings()
with DatabaseStorage.from_settings(settings) as storage:
    manager = load_scheduler_manager(storage)
    manager.start()

Or use the schedule_pipeline helper to create a scheduler directly:

from th2etl.pipelines import build_example_pipeline
from th2etl.scheduler import schedule_pipeline

pipeline = build_example_pipeline()
scheduler = schedule_pipeline(pipeline, "0 * * * *")
scheduler.start()
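The cron strings above use the standard five fields: minute, hour, day of month, month, and day of week. To see which times an expression such as */5 * * * * selects, here is a small standalone matcher. It is a sketch that supports only *, single numbers, a-b ranges, */n and a-b/n steps, and comma lists (no month or weekday names, and 0 is the only value for Sunday); th2etl's CronTrigger may parse expressions differently.

```python
from datetime import datetime, timedelta

# Allowed (low, high) values for: minute, hour, day of month, month, day of week (0 = Sunday).
FIELD_BOUNDS = [(0, 59), (0, 23), (1, 31), (1, 12), (0, 6)]


def parse_field(field: str, low: int, high: int) -> set[int]:
    """Expand one cron field (e.g. '*/15' or '1,10-20/5') into the values it allows."""
    values: set[int] = set()
    for part in field.split(","):
        base, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if base == "*":
            start, end = low, high
        elif "-" in base:
            start_text, end_text = base.split("-", 1)
            start, end = int(start_text), int(end_text)
        elif step_text:
            raise ValueError(f"a step needs '*' or a range: {part!r}")
        else:
            start = end = int(base)
        if step < 1 or not (low <= start <= end <= high):
            raise ValueError(f"out-of-range cron field: {field!r}")
        values.update(range(start, end + 1, step))
    return values


class CronExpression:
    """A parsed five-field cron expression."""

    def __init__(self, expression: str) -> None:
        fields = expression.split()
        if len(fields) != 5:
            raise ValueError("expected 5 fields: minute hour day-of-month month day-of-week")
        self.minutes, self.hours, self.days, self.months, self.weekdays = (
            parse_field(f, lo, hi) for f, (lo, hi) in zip(fields, FIELD_BOUNDS)
        )
        # Classic cron rule: when both day fields are restricted, matching either one is enough.
        self.day_or = fields[2] != "*" and fields[4] != "*"

    def matches(self, moment: datetime) -> bool:
        cron_weekday = (moment.weekday() + 1) % 7  # Python: Monday = 0; cron: Sunday = 0
        day_ok = moment.day in self.days
        weekday_ok = cron_weekday in self.weekdays
        days_match = (day_ok or weekday_ok) if self.day_or else (day_ok and weekday_ok)
        return (moment.minute in self.minutes and moment.hour in self.hours
                and moment.month in self.months and days_match)

    def next_after(self, moment: datetime) -> datetime:
        """Return the first whole minute strictly after `moment` that matches."""
        candidate = moment.replace(second=0, microsecond=0) + timedelta(minutes=1)
        for _ in range(5 * 366 * 24 * 60):  # give up after roughly five years of minutes
            if self.matches(candidate):
                return candidate
            candidate += timedelta(minutes=1)
        raise ValueError("no matching time found")


start = datetime(2026, 5, 15, 10, 3, 30)
print(CronExpression("*/5 * * * *").next_after(start))  # 2026-05-15 10:05:00
print(CronExpression("0 * * * *").next_after(start))    # 2026-05-15 11:00:00
```

Scanning minute by minute keeps the logic obviously correct at the cost of speed; a production scheduler would typically jump field by field instead.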

Logging

The application uses Python's standard logging module. You can control the log verbosity using environment variables.

  • TH2ETL_LOG_LEVEL: Sets the global log level. Defaults to INFO; accepted values are DEBUG, INFO, WARNING, and ERROR.
  • TH2ETL_LOG_LEVELS: Provides fine-grained control over different parts of the application. This is a comma-separated list of logger_name:LEVEL.

For example, to see the scheduler's INFO-level activity but only warnings and errors from the pipelines and blocs, set:

export TH2ETL_LOG_LEVELS="th2etl.scheduler:INFO,th2etl:WARNING"

This sets the logger for the th2etl.scheduler module to INFO, while setting the base th2etl logger (which other modules inherit from) to WARNING. This is useful for focusing on the scheduler's activity without being overwhelmed by pipeline execution details.
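The logger_name:LEVEL format maps directly onto the standard logging API. Here is a minimal sketch of parsing and applying such a value, assuming whitespace around entries is ignored; `apply_log_levels` is a hypothetical helper, not th2etl's own configuration code.

```python
import logging


def apply_log_levels(spec: str) -> dict[str, int]:
    """Apply a 'logger_name:LEVEL,...' spec and return the levels that were set."""
    applied: dict[str, int] = {}
    for entry in spec.split(","):
        entry = entry.strip()
        if not entry:
            continue
        name, sep, level_name = entry.rpartition(":")
        name = name.strip()
        # getLevelName maps a known name such as "INFO" to its number, else returns a str.
        level = logging.getLevelName(level_name.strip().upper())
        if not sep or not name or not isinstance(level, int):
            raise ValueError(f"invalid entry {entry!r}; expected logger_name:LEVEL")
        logging.getLogger(name).setLevel(level)
        applied[name] = level
    return applied


apply_log_levels("th2etl.scheduler:INFO,th2etl:WARNING")
print(logging.getLogger("th2etl.scheduler").getEffectiveLevel())  # 20 (INFO)
print(logging.getLogger("th2etl.pipelines").getEffectiveLevel())  # 30 (WARNING), inherited from th2etl
```

In an application, the spec would come from os.environ.get("TH2ETL_LOG_LEVELS", ""). Because each entry sets the level on one named logger, the order of entries does not matter: the more specific logger keeps its own level, and loggers without one inherit from their nearest configured ancestor.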

Output Storage

When pipelines are run by the scheduler, their output can be stored in a directory for later review.

  • pipelines_logs_dir: Set this environment variable to the path of a directory where you want to store the output of each pipeline run.

If this variable is set, each run gets its own subdirectory, grouped under the scheduler name and named with a timestamp (e.g., five_minute_scheduler/20260515_103000). This folder is passed to the pipeline in the RunContext, and blocs can be written to save their output there.
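The per-run folder layout can be reproduced with pathlib. This sketch assumes the YYYYMMDD_HHMMSS timestamp format inferred from the example above and local time; `make_run_dir` is hypothetical, and th2etl's own implementation may differ, for example in time zone or name-collision handling.

```python
import os
from datetime import datetime
from pathlib import Path
from typing import Optional


def make_run_dir(scheduler_name: str, now: Optional[datetime] = None) -> Optional[Path]:
    """Create <pipelines_logs_dir>/<scheduler_name>/<YYYYMMDD_HHMMSS> for one run.

    Returns None when pipelines_logs_dir is not set, i.e. output storage is off.
    """
    base = os.environ.get("pipelines_logs_dir")
    if not base:
        return None
    stamp = (now or datetime.now()).strftime("%Y%m%d_%H%M%S")
    run_dir = Path(base) / scheduler_name / stamp
    run_dir.mkdir(parents=True, exist_ok=True)  # creates the scheduler folder on first run
    return run_dir


# With pipelines_logs_dir=/var/log/th2etl, a run at 10:30:00 on 2026-05-15 would get
# /var/log/th2etl/five_minute_scheduler/20260515_103000
```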

Download files

Source Distribution

th2etl-0.0.2.tar.gz (21.2 kB)

Uploaded Source

Built Distribution

th2etl-0.0.2-py3-none-any.whl (31.7 kB)

Uploaded Python 3

File details

Details for the file th2etl-0.0.2.tar.gz.

File metadata

  • Download URL: th2etl-0.0.2.tar.gz
  • Upload date:
  • Size: 21.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for th2etl-0.0.2.tar.gz
Algorithm Hash digest
SHA256 5d02af8aeefb747167f537b5ac98e96b27bd15dbda72c44aedde65c55902398e
MD5 fb837e3655c5230b9891f925e126066f
BLAKE2b-256 5351c6a4e61c196e76c92702ef9c38ffcf81da8a4bda19c0de77b5273e99e48b

Provenance

The following attestation bundles were made for th2etl-0.0.2.tar.gz:

Publisher: publish.yaml on thaink2/th2etl

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file th2etl-0.0.2-py3-none-any.whl.

File metadata

  • Download URL: th2etl-0.0.2-py3-none-any.whl
  • Upload date:
  • Size: 31.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for th2etl-0.0.2-py3-none-any.whl
Algorithm Hash digest
SHA256 33aa4e893ba8565a8da59125705e4cffb62af6208d516d6ea16450953d833378
MD5 94257f493f52a46116b4f6f1d1e42c03
BLAKE2b-256 f2e7cfb03bdfe161027cf44e176066f46772b6374d3ba9b6cbbf0e0e4dc98f86

Provenance

The following attestation bundles were made for th2etl-0.0.2-py3-none-any.whl:

Publisher: publish.yaml on thaink2/th2etl

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.