Provider for Apache Airflow. Implements apache-airflow-providers-mnb package
Project description
MNB Provider for Apache Airflow
This package contains all the necessary operators, hooks and sensors necessary to access the daily exchange rates published by MNB (Central Bank of Hungary) in Apache Airflow.
Installation
You can install it with any package manager compatible with PyPI:
pip install apache-airflow-provider-mnb
Components
Component | Notes |
---|---|
MnbHook |
Implements low-level functions to interact with MNB's API |
MnbExchangeRateOperator |
Provides access to the exchange rates published on a specific day |
MnbExchangeRateSensor |
Senses whether the rates were already published for a specific day |
Example
This following example demonstrates a typical use case:
- Start running at 8:00AM every day
- Check if the exchange rates were already published (repeat every 10 minutes)
- Permanently fail after 3 hours if the rates were never published (there are no rates available on the weekends and public holidays)
- Generate the SQL commands
- Run them against a PostgreSQL database
from datetime import date
import json
import pendulum
from airflow.decorators import dag, task
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.mnb.sensors.mnb import MnbExchangeRateSensor
from airflow.providers.mnb.operators.mnb import MnbExchangeRateOperator
@dag(
dag_id="refresh_currency_rates",
description="Refresh currency exchange rates",
schedule="0 8 * * *",
start_date=pendulum.datetime(2023, 1, 16, tz="Europe/Budapest"),
catchup=False
)
def mnb():
is_exchange_rate_available = MnbExchangeRateSensor(
task_id="is_exchange_rate_available",
timeout=10800,
poke_interval=600,
date="{{ ds }}"
)
exchange_rates = MnbExchangeRateOperator(
task_id="get_exchange_rates",
date="{{ ds }}"
)
@task
def generate_queries(exchange_rates: str):
rates = json.loads(exchange_rates)
queries = ""
mnb_date = rates["date"]
for rate in rates["rates"]:
currency_id = rate["currency"]
mnb_rate = rate["rate"]
queries += f"UPDATE finance.currency SET mnb_rate = '{mnb_rate}', mnb_date = '{mnb_date}' WHERE currency_id = '{currency_id}';\n"
return queries
is_exchange_rate_available >> exchange_rates
queries = generate_queries(exchange_rates.output)
PostgresOperator(
task_id="update_exchange_rates",
postgres_conn_id="postgres_default",
database="erp",
sql=queries)
mnb()
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file apache_airflow_providers_mnb-1.0.2.tar.gz
.
File metadata
- Download URL: apache_airflow_providers_mnb-1.0.2.tar.gz
- Upload date:
- Size: 4.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.3.2 CPython/3.10.6 Linux/5.15.79.1-microsoft-standard-WSL2
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 8a286cbdc16d8115a03ca9813e5926730b92ae77af455dd82f0033c63e806c6f |
|
MD5 | a5b752ec090d16e024ef04159b64337d |
|
BLAKE2b-256 | c8b56ccb474582cc77620af0ff369843298d5beebf1cc29cd9025954f209eb2c |
File details
Details for the file apache_airflow_providers_mnb-1.0.2-py3-none-any.whl
.
File metadata
- Download URL: apache_airflow_providers_mnb-1.0.2-py3-none-any.whl
- Upload date:
- Size: 6.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.3.2 CPython/3.10.6 Linux/5.15.79.1-microsoft-standard-WSL2
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 8b0e188aa082ccb55b0457e111ac72058fc7894ecf6650ede26e18f32fd7c873 |
|
MD5 | 51d142b9552ce84edf60faecc11ee69d |
|
BLAKE2b-256 | 50beaaf1fd6135d0e867db1238fc4b092402cfc8f1c6f7148d251e3253b3f76a |