Skip to main content

No project description provided

Project description

Dataverk airflow

Enkelt wrapperbibliotek rundt KubernetesPodOperator som lager Airflow task som kjører i en Kubernetes pod.

Våre operators

Alle våre operators lar deg klone et annet repo enn der DAGene er definert, bare legg det til med repo="navikt/<repo>.

Vi har også støtte for å installere Python pakker ved oppstart av Airflow task, spesifiser requirements.txt-filen din med requirements_path="/path/to/requirements.txt". Merk at hvis du kombinerer repo og requirements_path, må requirements.txt ligge i repoet nevnt i repo.

Quarto operator (datafortelling)

Denne kjører quarto render for deg, som lager en HTML-fil som kan lastes opp til Datamarkedsplassen.

Vi har støtte for enkeltfiler, og kataloger, dette kan du spesifisere med path for enkeltfiler, og folder hvis du har et Quarto prosjekt i en katalog. Quarto prosjekter brukes hovedsakelig for book, website, eller dashboard. Enkeltfiler bygges self-contained, som betyr at HTML-filen blir bygd med alle sine eksterne avhengighter (Javascript, CSS, og bilder).

For å laste opp filer til Datamarkedsplassen må man ha et Quarto-token, som er unikt per team. Dette finner man under Mine teams token i menyen.

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.models import Variable
from dataverk_airflow import quarto_operator


with DAG('navn-dag', start_date=days_ago(1), schedule_interval="*/10 * * * *") as dag:
    t1 = quarto_operator(dag=dag,
                         name="<navn-på-task>",
                         repo="navikt/<repo>",
                         quarto={
                             "path": "/path/to/index.qmd",
                             "env": "dev/prod",
                             "id":"uuid",
                             "token": Variable.get("quarto_token"),
                         },
                         slack_channel="<#slack-alarm-kanal>")

Har du behov for å rendre noe annet enn html, kan du bruke verdien format. Dette må du for eksempel gjøre hvis du ønsker å lage et dashboard.

with DAG('navn-dag', start_date=days_ago(1), schedule_interval="*/10 * * * *") as dag:
    t1 = quarto_operator(dag=dag,
                         name="<navn-på-task>",
                         repo="navikt/<repo>",
                         quarto={
                             "folder": "/path/to/book",
                             "format": "dashboard",
                             "env": "dev/prod",
                             "id":"uuid",
                             "token": Variable.get("quarto_token"),
                         },
                         slack_channel="<#slack-alarm-kanal>")

I eksemplene over lagrer vi tokenet i en Airflow variable som så brukes i DAG tasken under. Se offisiell Airflow dokumentasjon for hvordan man bruker `Variable.get()´ i en task.

Notebook operator

Denne lar deg kjøre en Jupyter notebook.

from airflow import DAG
from airflow.utils.dates import days_ago
from dataverk_airflow import notebook_operator


with DAG('navn-dag', start_date=days_ago(1), schedule_interval="*/10 * * * *") as dag:
    t1 = notebook_operator(dag=dag,
                           name="<navn-på-task>",
                           repo="navikt/<repo>",
                           nb_path="/path/to/notebook.ipynb",
                           slack_channel="<#slack-alarm-kanal>")

Python operator

Denne lar deg kjøre vilkårlig Python-scripts.

from airflow import DAG
from airflow.utils.dates import days_ago
from dataverk_airflow import python_operator


with DAG('navn-dag', start_date=days_ago(1), schedule_interval="*/10 * * * *") as dag:
    t1 = python_operator(dag=dag,
                         name="<navn-på-task>",
                         repo="navikt/<repo>",
                         script_path="/path/to/script.py",
                         slack_channel="<#slack-alarm-kanal>")

Kubernetes operator

Vi tilbyr også vår egen Kubernetes operator som kloner et valg repo inn i containeren.

from airflow import DAG
from airflow.utils.dates import days_ago
from dataverk_airflow import kubernetes_operator


with DAG('navn-dag', start_date=days_ago(1), schedule_interval="*/10 * * * *") as dag:
    t1 = kubernetes_operator(dag=dag,
                             name="<navn-på-task>",
                             repo="navikt/<repo>",
                             cmds=["/path/to/bin/", "script-name.sh", "argument1", "argument2"],
                             image="europe-north1-docker.pkg.dev/nais-management-233d/ditt-team/ditt-image:din-tag",
                             slack_channel="<#slack-alarm-kanal>")

Denne operatoren har støtte for to ekstra flagg som ikke er tilgjengelig fra de andre.

cmds: str: Command to run in pod
working_dir: str: Path to working directory

Allow list

Alle operators støtter å sette allow list, men det er noen adresser som blir lagt til av Dataverk Airflow.

Hvis du bruker slack_channel argumentet, vil vi legge til:

  • hooks.slack.com

Hvis du bruker email argumentet, vil vi legge til:

  • Riktig SMTP-adresse

Hvis du bruker requirements_path argumentet, vil vi legge til:

  • pypi.org
  • files.pythonhosted.org
  • pypi.python.org

For quarto_operator vil vi legge til:

  • Adressen til riktig Datamarkedsplass
  • cdnjs.cloudflare.com

Felles argumenter

Alle operatorene våre har støtte for disse argumentene i funksjonskallet.

dag: DAG: owner DAG
name: str: Name of task
repo: str: Github repo
image: str: Dockerimage the pod should use
branch: str: Branch in repo, default "main"
email: str: Email of owner
slack_channel: str: Name of Slack channel, default None (no Slack notification)
extra_envs: dict: dict with environment variables example: {"key": "value", "key2": "value2"}
allowlist: list: list of hosts and port the task needs to reach on the format host:port
requirements_path: bool: Path (including filename) to your requirements.txt
python_version: str: Desired Python version for the environment your code will be running in when using the default image. We offer only supported versions of Python, and default to the latest version if this parameter is omitted. See https://devguide.python.org/versions/ for available versions.
resources: dict: Specify required cpu and memory requirements (keys in dict: request_memory, request_cpu, limit_memory, limit_cpu), default None
startup_timeout_seconds: int: pod startup timeout
retries: int: Number of retries for task before DAG fails, default 3
delete_on_finish: bool: Whether to delete pod on completion
retry_delay: timedelta: Time inbetween retries, default 5 seconds
do_xcom_push: bool: Enable xcom push of content in file "/airflow/xcom/return.json", default False
on_success_callback:: func: a function to be called when a task instance of this task succeeds

Sette resource requirements

Vi har støtte for å sette requests og limits for hver operator. Merk at man ikke trenger å sette limits på CPU da dette blir automatisk løst av plattformen.

Ved å bruke ephemeral-storage kan man be om ekstra diskplass for lagring i en task.

from airflow import DAG
from airflow.utils.dates import days_ago
from dataverk_airflow import python_operator


with DAG('navn-dag', start_date=days_ago(1), schedule_interval="*/10 * * * *") as dag:
    t1 = python_operator(dag=dag,
                         name="<navn-på-task>",
                         repo="navikt/<repo>",
                         script_path="/path/to/script.py",
                         resources={
                             "requests": {
                                 "memory": "50Mi",
                                 "cpu": "100m",
                                 "ephemeral-storage": "1Gi"
                             },
                             "limits": {
                                 "memory": "100Mi"
                             }
                         })

Project details


Release history Release notifications | RSS feed

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dataverk_airflow-1.4.4.tar.gz (11.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dataverk_airflow-1.4.4-py3-none-any.whl (15.4 kB view details)

Uploaded Python 3

File details

Details for the file dataverk_airflow-1.4.4.tar.gz.

File metadata

  • Download URL: dataverk_airflow-1.4.4.tar.gz
  • Upload date:
  • Size: 11.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/4.0.2 CPython/3.11.7

File hashes

Hashes for dataverk_airflow-1.4.4.tar.gz
Algorithm Hash digest
SHA256 922537c94645959c865d017a41eae48123e692429d671d45bf61852390221cd1
MD5 16478bfd3130ac943dac26cf6bda5d3b
BLAKE2b-256 d1c04067e92913641c6923027500ecc3dd8a1b3bdbb0761241a3dff4e7c0b896

See more details on using hashes here.

File details

Details for the file dataverk_airflow-1.4.4-py3-none-any.whl.

File metadata

File hashes

Hashes for dataverk_airflow-1.4.4-py3-none-any.whl
Algorithm Hash digest
SHA256 6a352434ae783876c4f41e61cf33fccbbabd7079d1cfb96965b130c8e1e1a58a
MD5 91a22fbe8d4a587e461edfa4bb49719e
BLAKE2b-256 1bec7371257cebdb79f5ab12afcb1f090750dc64976fa5a96970ce0e5234c185

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page