# Dataverk Airflow
A simple wrapper library around KubernetesPodOperator that creates an Airflow task which runs in a Kubernetes pod.
## Our operators
All our operators let you clone a repository other than the one where the DAGs are defined; just add it with `repo="navikt/<repo>"`.

We also support installing Python packages when the Airflow task starts up; specify your `requirements.txt` file with `requirements_path="/path/to/requirements.txt"`.

Note that if you combine `repo` and `requirements_path`, the `requirements.txt` file must live in the repository given by `repo`.
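For example, a minimal sketch (repo and paths are placeholders) that clones another repo and installs its requirements before running a notebook:

```python
from airflow import DAG
from airflow.utils.dates import days_ago
from dataverk_airflow import notebook_operator

with DAG('navn-dag', start_date=days_ago(1), schedule_interval=None) as dag:
    # Both notebook.ipynb and requirements.txt must live in navikt/<repo>,
    # not in the repository where this DAG file is defined.
    t1 = notebook_operator(dag=dag,
                           name="<navn-på-task>",
                           repo="navikt/<repo>",
                           nb_path="/path/to/notebook.ipynb",
                           requirements_path="/path/to/requirements.txt")
```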
### Quarto operator
This one runs Quarto render for you. You find the Quarto token for your team in Datamarkedsplassen.
In the example below we store the token in an Airflow Variable, which is then used in the DAG task. See the official Airflow documentation for how to use `Variable.get()` in a task.
```python
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.models import Variable
from dataverk_airflow import quarto_operator

with DAG('navn-dag', start_date=days_ago(1), schedule_interval="*/10 * * * *") as dag:
    t1 = quarto_operator(dag=dag,
                         name="<navn-på-task>",
                         repo="navikt/<repo>",
                         quarto={
                             "path": "/path/to/index.qmd",
                             "env": "dev/prod",
                             "id": "uuid",
                             "token": Variable.get("quarto_token"),
                         },
                         slack_channel="<#slack-alarm-kanal>")
```
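For `Variable.get("quarto_token")` to work, the Variable must exist first. A minimal sketch of creating it programmatically (the token value is a placeholder; in practice the Variable is usually created once through the Airflow UI or CLI rather than from code):

```python
from airflow.models import Variable

# One-off setup: store the team's Quarto token from Datamarkedsplassen
# as an Airflow Variable so DAG tasks can read it with Variable.get().
Variable.set("quarto_token", "<quarto-token>")
```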
### Notebook operator
This one lets you run a Jupyter notebook.
```python
from airflow import DAG
from airflow.utils.dates import days_ago
from dataverk_airflow import notebook_operator

with DAG('navn-dag', start_date=days_ago(1), schedule_interval="*/10 * * * *") as dag:
    t1 = notebook_operator(dag=dag,
                           name="<navn-på-task>",
                           repo="navikt/<repo>",
                           nb_path="/path/to/notebook.ipynb",
                           slack_channel="<#slack-alarm-kanal>")
```
### Python operator
This one lets you run arbitrary Python scripts.
```python
from airflow import DAG
from airflow.utils.dates import days_ago
from dataverk_airflow import python_operator

with DAG('navn-dag', start_date=days_ago(1), schedule_interval="*/10 * * * *") as dag:
    t1 = python_operator(dag=dag,
                         name="<navn-på-task>",
                         repo="navikt/<repo>",
                         script_path="/path/to/script.py",
                         slack_channel="<#slack-alarm-kanal>")
```
### Kubernetes operator
We also offer our own Kubernetes operator, which clones a repo of your choice into the container.
```python
from airflow import DAG
from airflow.utils.dates import days_ago
from dataverk_airflow import kubernetes_operator

with DAG('navn-dag', start_date=days_ago(1), schedule_interval="*/10 * * * *") as dag:
    t1 = kubernetes_operator(dag=dag,
                             name="<navn-på-task>",
                             repo="navikt/<repo>",
                             cmds=["/path/to/bin/", "script-name.sh", "argument1", "argument2"],
                             image="europe-north1-docker.pkg.dev/nais-management-233d/ditt-team/ditt-image:din-tag",
                             slack_channel="<#slack-alarm-kanal>")
```
This operator supports two extra arguments that are not available on the others (see the sketch after the list):

- `cmds`: list: Command to run in the pod
- `working_dir`: str: Path to the working directory
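A hedged sketch of how `working_dir` might be combined with `cmds`; the paths and the assumption that relative commands resolve against `working_dir` inside the cloned repo are illustrative, not confirmed by the docs above:

```python
from airflow import DAG
from airflow.utils.dates import days_ago
from dataverk_airflow import kubernetes_operator

with DAG('navn-dag', start_date=days_ago(1), schedule_interval=None) as dag:
    # Assumed behaviour: the command runs with working_dir as its
    # current directory inside the cloned repository.
    t1 = kubernetes_operator(dag=dag,
                             name="<navn-på-task>",
                             repo="navikt/<repo>",
                             working_dir="subfolder/in/repo",
                             cmds=["./script-name.sh", "argument1"],
                             image="europe-north1-docker.pkg.dev/nais-management-233d/ditt-team/ditt-image:din-tag")
```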
## Common arguments

All our operators support the following arguments in the function call (a combined example follows the list).

- `dag`: DAG: The owning DAG
- `name`: str: Name of the task
- `repo`: str: GitHub repo
- `image`: str: Docker image the pod should use
- `branch`: str: Branch in the repo, default "main"
- `email`: str: Email of the owner
- `slack_channel`: str: Name of the Slack channel, default None (no Slack notification)
- `extra_envs`: dict: Environment variables, e.g. `{"key": "value", "key2": "value2"}`
- `allowlist`: list: Hosts and ports the task needs to reach, on the format `host:port`
- `requirements_path`: str: Path (including filename) to your requirements.txt
- `resources`: dict: Required CPU and memory (keys: `requests`, `limits`; see the example below), default None
- `startup_timeout_seconds`: int: Pod startup timeout
- `retries`: int: Number of retries for the task before the DAG fails, default 3
- `delete_on_finish`: bool: Whether to delete the pod on completion
- `retry_delay`: timedelta: Time between retries, default 5 seconds
- `do_xcom_push`: bool: Enable XCom push of the contents of the file "/airflow/xcom/return.json", default False
- `on_success_callback`: func: A function to be called when a task instance of this task succeeds
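For illustration, a sketch combining several of the arguments above with `python_operator`; all values are placeholders:

```python
from datetime import timedelta

from airflow import DAG
from airflow.utils.dates import days_ago
from dataverk_airflow import python_operator

with DAG('navn-dag', start_date=days_ago(1), schedule_interval="@daily") as dag:
    t1 = python_operator(dag=dag,
                         name="<navn-på-task>",
                         repo="navikt/<repo>",
                         script_path="/path/to/script.py",
                         branch="main",
                         extra_envs={"ENVIRONMENT": "dev"},
                         allowlist=["<host>:<port>"],
                         retries=2,
                         retry_delay=timedelta(minutes=1),
                         slack_channel="<#slack-alarm-kanal>")
```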
## Setting resource requirements
We support setting `requests` and `limits` for each operator.
Note that you do not need to set a CPU `limits` value, as this is handled automatically by the platform.
By using `ephemeral-storage` you can request extra disk space for storage in a task.
```python
from airflow import DAG
from airflow.utils.dates import days_ago
from dataverk_airflow import python_operator

with DAG('navn-dag', start_date=days_ago(1), schedule_interval="*/10 * * * *") as dag:
    t1 = python_operator(dag=dag,
                         name="<navn-på-task>",
                         repo="navikt/<repo>",
                         script_path="/path/to/script.py",
                         resources={
                             "requests": {
                                 "memory": "50Mi",
                                 "cpu": "100m",
                                 "ephemeral-storage": "1Gi"
                             },
                             "limits": {
                                 "memory": "100Mi"
                             }
                         })
```