A flexible ETL framework with a database-driven scheduler, extensible pipeline blocs, and a RESTful API.
th2etl
thaink2's in-house ETL and automation library.
Design
A pipeline is modeled as a set of interdependent blocs. Each bloc is one of:
- LoaderBloc: loads or extracts raw data
- TransformerBloc: transforms data between stages
- ExporterBloc: exports or writes output
Dependencies between blocs are resolved before execution, so the pipeline runs in dependency order.
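Dependency-ordered execution amounts to a topological sort of the bloc graph. The sketch below shows the idea with Python's standard graphlib module. It is not th2etl's own resolver, and the execution_order helper is a hypothetical name used only for illustration.

```python
from graphlib import TopologicalSorter

# Bloc names mapped to the blocs they depend on (names borrowed from the Quickstart).
dependencies = {
    "example_loader": [],
    "example_transformer": ["example_loader"],
    "example_exporter": ["example_transformer"],
}


def execution_order(deps):
    """Return bloc names ordered so that every bloc follows its dependencies."""
    # TopologicalSorter expects {node: predecessors} and raises CycleError on cycles.
    return list(TopologicalSorter(deps).static_order())


order = execution_order(dependencies)
print(order)
```

A circular dependency makes static_order() raise graphlib.CycleError, which is one way a resolver can reject an invalid pipeline before any bloc runs.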
Loader Blocs
The following loader blocs are available:
- CsvLoaderBloc: Loads data from a CSV file.
  - bloc_type: csv_loader
  - Config:
    - file_path (required): The path to the CSV file.
    - delimiter (optional): The delimiter character (default: ,).
- PostgresLoaderBloc: Loads data from a PostgreSQL database.
  - bloc_type: postgres_loader
  - Config:
    - query (required): The SQL query to execute.
- ApiLoaderBloc: Loads data from a web API.
  - bloc_type: api_loader
  - Config:
    - url (required): The API endpoint URL.
    - method (optional): The HTTP method (default: GET).
    - params (optional): A dictionary of URL parameters.
    - headers (optional): A dictionary of HTTP headers.
    - json (optional): A dictionary for the JSON request body.
Transformer Blocs
The following transformer blocs are available:
- RunAdkAgentsBloc: Runs an ADK agent via an API call.
  - bloc_type: run_adk_agents
  - Config:
    - base_url (required): The base URL of the agent API (e.g., https://api-agent-dev.thaink2.fr).
    - agent_id (required): The ID of the agent to run (e.g., database_assistant).
    - user_id (required): The user's ID (e.g., email), used for authentication.
    - message_text (required): The text message to send to the agent.
- RefreshWebhooksBloc: Refreshes webhooks via an API call.
  - bloc_type: refresh_webhooks
  - Config:
    - url (required): The API endpoint for refreshing webhooks.
    - user_id (required): The user's ID (e.g., email), used for authentication.
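Before storing a bloc, it can help to check its config against the required keys listed for the loader and transformer blocs above. The following is a minimal sketch: REQUIRED_CONFIG_KEYS and missing_config_keys are hypothetical names, not part of th2etl, and the library may validate configs differently.

```python
# Required config keys per bloc_type, transcribed from the lists above.
REQUIRED_CONFIG_KEYS = {
    "csv_loader": {"file_path"},
    "postgres_loader": {"query"},
    "api_loader": {"url"},
    "run_adk_agents": {"base_url", "agent_id", "user_id", "message_text"},
    "refresh_webhooks": {"url", "user_id"},
}


def missing_config_keys(bloc_type, config):
    """Return the sorted required keys that are absent from config."""
    if bloc_type not in REQUIRED_CONFIG_KEYS:
        raise ValueError(f"unknown bloc_type: {bloc_type!r}")
    return sorted(REQUIRED_CONFIG_KEYS[bloc_type] - set(config))


# A complete csv_loader config and an incomplete run_adk_agents config.
print(missing_config_keys("csv_loader", {"file_path": "data.csv", "delimiter": ";"}))
print(missing_config_keys("run_adk_agents", {"base_url": "https://api-agent-dev.thaink2.fr"}))
```

Optional keys such as delimiter or headers are deliberately ignored here; only the keys marked required are checked.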
API Service
The application includes a FastAPI-based API for managing resources.
To run the API server, pass the --serve-api flag:
th2etl --serve-api
You can also specify the host and port:
th2etl --serve-api --host 0.0.0.0 --port 8080
The API documentation will be available at http://127.0.0.1:8000/docs when the server is running.
When you run the API server, the scheduler manager will also start automatically in the background.
Health Check
You can monitor the status of the service, including its connection to the database and the status of the scheduler, by sending a GET request to the /health endpoint.
curl http://127.0.0.1:8000/health
If the service is running and all components are healthy, it will return a 200 OK response. If any component is down, it will return a 503 Service Unavailable error.
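The same check can be scripted with the standard library only. This sketch relies solely on the status codes described above (200 and 503); the service_status helper and its return labels are hypothetical, and the default base URL assumes the address used in the curl example.

```python
import urllib.error
import urllib.request


def service_status(base_url="http://127.0.0.1:8000", opener=urllib.request.urlopen):
    """Classify the /health response as 'healthy', 'unhealthy', or 'unreachable'."""
    try:
        with opener(f"{base_url}/health", timeout=5) as response:
            return "healthy" if response.status == 200 else "unhealthy"
    except urllib.error.HTTPError:
        # urlopen raises HTTPError for 4xx/5xx responses, including 503.
        return "unhealthy"
    except OSError:
        # URLError is an OSError subclass; it covers refused connections and timeouts.
        return "unreachable"
```

Calling service_status() with no arguments queries the local server. The opener parameter exists only so the function can be exercised without a running service.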
Usage
Run the scheduler as a standalone process:
python -m th2etl.runner
Run the scheduler in a separate isolated session:
python -m th2etl.runner --background
Pass environment variables into the isolated session:
python -m th2etl.runner --background --env SOURCE=prod --env DESTINATION=warehouse
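When launching the runner from another Python program, the command line above can be assembled from a dictionary. This is a small sketch that uses only the --background and --env flags shown above; runner_command is a hypothetical helper, not part of th2etl.

```python
import sys


def runner_command(env=None, background=True):
    """Build the argument list for `python -m th2etl.runner`."""
    command = [sys.executable, "-m", "th2etl.runner"]
    if background:
        command.append("--background")
    for key, value in (env or {}).items():
        # Each variable becomes one `--env KEY=VALUE` pair.
        command += ["--env", f"{key}={value}"]
    return command


command = runner_command({"SOURCE": "prod", "DESTINATION": "warehouse"})
print(" ".join(command[1:]))
```

The resulting list can be passed to subprocess.run(command, check=True).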
If the package is installed, use the CLI entry point:
th2etl
This will start the scheduler by default.
To start the scheduler in the background:
th2etl --background
To start the API service in the background:
th2etl --serve-api --background
Quickstart
Create pipeline metadata from the command line and then start the ETL service.
- Set your PostgreSQL database settings in environment variables (PowerShell syntax shown):
$env:DATABASE_HOST = "localhost"
$env:DATABASE_PORT = "5432"
$env:DATABASE_NAME = "th2etl"
$env:DATABASE_USER = "etl_user"
$env:DATABASE_PASSWORD = "secret"
- Create blocs in the database:
python -c "from th2etl import DatabaseStorage; from th2etl.configs.settings import get_settings; s = get_settings();
with DatabaseStorage.from_settings(s) as storage:
    storage.create_bloc('example_loader','example_loader',dependencies=[],config={'source':'csv'})
    storage.create_bloc('example_transformer','example_transformer',dependencies=['example_loader'],config={'factor':2})
    storage.create_bloc('example_exporter','example_exporter',dependencies=['example_transformer'],config={'destination':'stdout'})"
- Create a trigger for your pipeline:
python -c "from th2etl import DatabaseStorage; from th2etl.configs.settings import get_settings; s = get_settings();
with DatabaseStorage.from_settings(s) as storage:
    storage.create_trigger('every_hour','example_pipeline','0 * * * *')"
- Create the pipeline and optional scheduler:
python -c "from th2etl import DatabaseStorage; from th2etl.configs.settings import get_settings; s = get_settings();
with DatabaseStorage.from_settings(s) as storage:
    storage.create_pipeline('example_pipeline',['example_loader','example_transformer','example_exporter'])
    storage.create_scheduler('example_scheduler','example_pipeline','every_hour')"
- Start the ETL service:
th2etl
Or in the background:
th2etl --background
Persistent Storage
Use DatabaseStorage to persist bloc, pipeline, trigger, and scheduler definitions.
from th2etl import DatabaseStorage
from th2etl.configs.settings import get_settings
settings = get_settings()
with DatabaseStorage.from_settings(settings) as storage:
    storage.create_bloc("example_loader", "example_loader", dependencies=[], config={"source": "csv"})
    storage.create_bloc("example_transformer", "example_transformer", dependencies=["example_loader"], config={"factor": 2})
    storage.create_bloc("example_exporter", "example_exporter", dependencies=["example_transformer"], config={"destination": "stdout"})
    storage.create_pipeline("example_pipeline", ["example_loader", "example_transformer", "example_exporter"])
    storage.create_trigger("every_hour", "example_pipeline", "0 * * * *")
    storage.create_scheduler("example_scheduler", "example_pipeline", "every_hour")
    print(storage.list_pipelines())
    print(storage.list_schedulers())
The Settings object reads database connection details from environment variables such as DATABASE_HOST, DATABASE_PORT, DATABASE_NAME, DATABASE_USER, DATABASE_PASSWORD, and optionally DATABASE_SSL_MODE. You can also provide DATABASE_URL directly.
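To make the relationship between these variables concrete, the sketch below assembles a PostgreSQL connection URL from them. It is illustrative only: database_url_from_env is a hypothetical helper, and both the precedence given to DATABASE_URL and the localhost/5432 defaults are assumptions rather than documented behavior of the Settings object.

```python
from urllib.parse import quote


def database_url_from_env(env):
    """Build a PostgreSQL connection URL from DATABASE_* variables."""
    # Assumption: an explicit DATABASE_URL wins over the individual parts.
    if env.get("DATABASE_URL"):
        return env["DATABASE_URL"]
    # Percent-encode credentials so characters such as '@' or '/' stay unambiguous.
    user = quote(env["DATABASE_USER"], safe="")
    password = quote(env["DATABASE_PASSWORD"], safe="")
    host = env.get("DATABASE_HOST", "localhost")  # assumed default
    port = env.get("DATABASE_PORT", "5432")  # assumed default
    url = f"postgresql://{user}:{password}@{host}:{port}/{env['DATABASE_NAME']}"
    if env.get("DATABASE_SSL_MODE"):
        url += f"?sslmode={env['DATABASE_SSL_MODE']}"
    return url


example_env = {
    "DATABASE_HOST": "localhost",
    "DATABASE_PORT": "5432",
    "DATABASE_NAME": "th2etl",
    "DATABASE_USER": "etl_user",
    "DATABASE_PASSWORD": "secret",
}
print(database_url_from_env(example_env))
```

Passing os.environ instead of example_env reads the values set in the current shell.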
Scheduler
Use CronTrigger and CronScheduler to run a pipeline on a cron-like schedule:
from th2etl.pipelines import build_example_pipeline
from th2etl.scheduler import CronScheduler, CronTrigger
pipeline = build_example_pipeline()
trigger = CronTrigger("*/5 * * * *")
scheduler = CronScheduler(pipeline, trigger)
scheduler.start()
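For readers unfamiliar with cron syntax, the five fields are minute, hour, day of month, month, and day of week. The sketch below tests whether a given moment matches an expression such as "*/5 * * * *". It is a simplified stand-in and not how CronTrigger is implemented: it handles only `*`, `*/n`, integers, and comma lists, and it requires all five fields to match.

```python
from datetime import datetime


def field_matches(field, value, minimum=0):
    """Match one cron field; `minimum` is the lowest legal value for that field."""
    for part in field.split(","):
        if part == "*":
            return True
        if part.startswith("*/"):
            # Steps count from the start of the field's range.
            if (value - minimum) % int(part[2:]) == 0:
                return True
        elif int(part) == value:
            return True
    return False


def cron_matches(expression, moment):
    """Return True when `moment` satisfies a five-field cron expression."""
    minute, hour, day, month, weekday = expression.split()
    # Cron numbers weekdays from Sunday=0; datetime.weekday() uses Monday=0.
    cron_weekday = (moment.weekday() + 1) % 7
    return (
        field_matches(minute, moment.minute)
        and field_matches(hour, moment.hour)
        and field_matches(day, moment.day, minimum=1)
        and field_matches(month, moment.month, minimum=1)
        and field_matches(weekday, cron_weekday)
    )


print(cron_matches("*/5 * * * *", datetime(2026, 5, 15, 10, 30)))
print(cron_matches("0 * * * *", datetime(2026, 5, 15, 10, 30)))
```

Ranges such as 1-5 and names such as MON are left out to keep the example short.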
For multiple pipelines with independent trigger schedules, use SchedulerManager so each pipeline can run on its own cadence in parallel:
from th2etl.pipelines import build_example_pipeline
from th2etl.scheduler import CronTrigger, CronScheduler, SchedulerManager
pipeline1 = build_example_pipeline()
pipeline2 = build_example_pipeline()
scheduler1 = CronScheduler(pipeline1, CronTrigger("0 * * * *"), name="hourly_pipeline")
scheduler2 = CronScheduler(pipeline2, CronTrigger("*/5 * * * *"), name="five_minute_pipeline")
manager = SchedulerManager([scheduler1, scheduler2])
manager.start()
If you persist scheduler metadata in the database, load scheduled pipelines dynamically from DatabaseStorage:
from th2etl import DatabaseStorage
from th2etl.configs.settings import get_settings
from th2etl.scheduler import load_scheduler_manager
settings = get_settings()
with DatabaseStorage.from_settings(settings) as storage:
    manager = load_scheduler_manager(storage)
    manager.start()
Or create a scheduler helper directly:
from th2etl.pipelines import build_example_pipeline
from th2etl.scheduler import schedule_pipeline
pipeline = build_example_pipeline()
scheduler = schedule_pipeline(pipeline, "0 * * * *")
scheduler.start()
Logging
The application uses Python's standard logging module. You can control the log verbosity and output location using environment variables.
Environment Variables
- LOG_DIR: If set to a path (e.g., logs), separate log files will be created in that directory for each main module (scheduler.log, api.log, etc.), along with a general th2etl.log file.
- LOG_LEVEL: Sets the global log level. Defaults to INFO. Can be set to DEBUG, INFO, WARNING, or ERROR.
- LOG_LEVELS: Provides fine-grained control over different parts of the application. This is a comma-separated list of logger_name:LEVEL pairs.
Example Usage
To save logs to a logs directory with separate files for each module, you can set the following environment variables:
export LOG_DIR="logs"
export LOG_LEVELS="th2etl.scheduler:INFO,th2etl:WARNING"
This configuration will:
- Create a logs directory.
- Create log files like scheduler.log, api.log, and th2etl.log inside it.
- Log detailed messages from the scheduler to scheduler.log.
- Only show warnings and errors from other modules in their respective files and the console.
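The logger_name:LEVEL format used by LOG_LEVELS maps directly onto Python's logging module. The sketch below shows one way such a value could be parsed and applied. parse_log_levels and apply_log_levels are hypothetical helpers; th2etl's own handling may differ.

```python
import logging


def parse_log_levels(spec):
    """Parse 'logger_name:LEVEL,...' into a {logger_name: numeric_level} dict."""
    levels = {}
    for entry in spec.split(","):
        entry = entry.strip()
        if not entry:
            continue
        name, _, level_name = entry.rpartition(":")
        # getLevelName maps a known level name to its number, else returns a string.
        level = logging.getLevelName(level_name.strip().upper())
        if not name.strip() or not isinstance(level, int):
            raise ValueError(f"invalid LOG_LEVELS entry: {entry!r}")
        levels[name.strip()] = level
    return levels


def apply_log_levels(spec):
    """Set each named logger to its configured level."""
    for name, level in parse_log_levels(spec).items():
        logging.getLogger(name).setLevel(level)


apply_log_levels("th2etl.scheduler:INFO,th2etl:WARNING")
print(logging.getLogger("th2etl.scheduler").getEffectiveLevel())
```

Because loggers are hierarchical, th2etl:WARNING applies to every th2etl.* logger that has no more specific entry of its own.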
Output Storage
When pipelines are run by the scheduler, their output can be stored in a directory for later review.
pipelines_logs_dir: Set this environment variable to the path of a directory where you want to store the output of each pipeline run.
If this variable is set, a new subdirectory will be created for each run, named with the scheduler and a timestamp (e.g., five_minute_scheduler/20260515_103000). This folder is passed to the pipeline in the RunContext, and blocs can be designed to write their output there.
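The naming scheme in the example (scheduler name, then a YYYYMMDD_HHMMSS timestamp) can be reproduced as follows. This is a sketch of the layout only: run_output_dir and the pipeline_output base directory are hypothetical, and the timestamp format is inferred from the example above.

```python
from datetime import datetime
from pathlib import Path


def run_output_dir(base_dir, scheduler_name, started_at):
    """Return <base_dir>/<scheduler_name>/<YYYYMMDD_HHMMSS> for one run."""
    return Path(base_dir) / scheduler_name / started_at.strftime("%Y%m%d_%H%M%S")


run_dir = run_output_dir(
    "pipeline_output", "five_minute_scheduler", datetime(2026, 5, 15, 10, 30, 0)
)
print(run_dir.as_posix())  # pipeline_output/five_minute_scheduler/20260515_103000
```

A bloc that receives such a directory could create it with run_dir.mkdir(parents=True, exist_ok=True) before writing files into it.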
API Client
A generic HttpClient is available in th2etl.helpers.client to simplify making API calls to external services.
Usage
You can create a client for any service by providing its base URL.
from th2etl.helpers.client import HttpClient
# Create a client for the JSONPlaceholder API
client = HttpClient(base_url="https://jsonplaceholder.typicode.com")
# Make a GET request
posts = client.get("/posts")
print(f"Found {len(posts)} posts.")
# Make a POST request
new_post = {
    "title": "foo",
    "body": "bar",
    "userId": 1,
}
created_post = client.post("/posts", json_data=new_post)
print(f"Created new post with ID: {created_post['id']}")
You can also include an authentication token when creating the client:
secure_client = HttpClient(
    base_url="https://api.example.com",
    auth_token="your-secret-token",
)
th2etl API Client
A dedicated client for the th2etl API is available in th2etl.helpers.th2etl_client. This client provides convenient methods for all the API's endpoints.
from th2etl.helpers.th2etl_client import Th2etlClient
client = Th2etlClient()
# Check the health of the service
health = client.health_check()
print(f"Service status: {health['status']}")
# Create a new bloc
new_bloc = client.create_bloc(
    name="my-new-bloc",
    bloc_type="csv_loader",
    config={"file_path": "data.csv"},
)
print(f"Created bloc: {new_bloc}")
# List all pipelines
pipelines = client.list_pipelines()
print(f"Found {len(pipelines)} pipelines.")