airflow provider for intersystems

airflow-provider-iris

Table of Contents

  • Overview
  • Installation
  • Quick Start
  • IrisSQLOperator Parameters
  • Examples
    • 01_IRIS_Raw_SQL_Demo
    • 02_IRIS_ORM_Demo
    • 03_IRIS_Load_Synthetic_Data_Demo

Overview

InterSystems IRIS Provider for Apache Airflow enables seamless integration between Airflow workflows and the InterSystems IRIS data platform. It provides native connection support and operators for executing IRIS SQL and automating IRIS-driven tasks within modern ETL/ELT pipelines.

Designed for reliability and ease of use, this provider helps data engineers and developers build scalable, production-ready workflows on InterSystems IRIS for healthcare, interoperability, analytics, and enterprise data processing.

Features

  • ✔️ IrisHook – for managing IRIS connections
  • ✔️ IrisSQLOperator – for running SQL queries
  • ✔️ Support for both SELECT/CTE and DML statements
  • ✔️ Native Airflow connection UI customization
  • ✔️ Examples for real-world ETL patterns

Installation

pip install airflow-provider-iris
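
To confirm that the package landed in the same environment Airflow runs in, a small check like the following can help. It is a minimal sketch that relies only on the standard library's importlib.metadata; the helper name installed_version is illustrative and not part of the provider.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_version(distribution: str):
    """Return the installed version string, or None if the distribution is missing."""
    try:
        return version(distribution)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    found = installed_version("airflow-provider-iris")
    print(found or "airflow-provider-iris is not installed in this environment")
```

Run it with the same Python interpreter that the Airflow scheduler and workers use, since a package installed in a different virtual environment will not be visible to them.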

Quick Start

Configure Connection in Airflow UI

Go to Admin → Connections → Add Connection and fill in the following fields:

  • Conn Id: Connection ID
  • Description: Connection description
  • Conn Type: InterSystems IRIS
  • Host: IRIS server hostname
  • Username: User name
  • Password: Password
  • Port: IRIS superserver port
  • Namespace: Namespace
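
Airflow can also read a connection from an environment variable named AIRFLOW_CONN_<CONN_ID> that holds a connection URI, which is handy for containers and CI. The sketch below builds such a URI from the fields listed above. Two things here are assumptions rather than documented behaviour of this provider: that the connection type is spelled iris in the URI scheme, and that the namespace travels in the URI path. Check the provider source before relying on either.

```python
from urllib.parse import quote


def build_conn_uri(host, port, namespace, login, password, scheme="iris"):
    """Build an Airflow-style connection URI.

    ASSUMPTION: the scheme "iris" and the namespace-in-path layout are
    illustrative guesses, not confirmed by the provider documentation.
    """
    user = quote(login, safe="")
    secret = quote(password, safe="")  # percent-encode characters such as "@" or "/"
    return f"{scheme}://{user}:{secret}@{host}:{port}/{quote(namespace, safe='')}"


def env_var_name(conn_id):
    """Airflow looks up AIRFLOW_CONN_ followed by the upper-cased connection ID."""
    return f"AIRFLOW_CONN_{conn_id.upper()}"


if __name__ == "__main__":
    uri = build_conn_uri("localhost", 1972, "USER", "_SYSTEM", "p@ss")
    print(f"export {env_var_name('ContainerInstance')}='{uri}'")
```

The host, port, namespace, and credentials in the usage lines are placeholders; substitute the values of your own IRIS instance.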

Use your InterSystems IRIS connection by setting the iris_conn_id parameter in any of the provided operators.

In the example below, the IrisSQLOperator is given the iris_conn_id of the connection it should use to reach the IRIS instance when the task runs:

from datetime import datetime
from airflow import DAG
from airflow_provider_iris.operators.iris_operator import IrisSQLOperator

with DAG(
    dag_id="01_IRIS_Raw_SQL_Demo_Local",
    start_date=datetime(2025, 12, 1),
    schedule=None,
    catchup=False,
    tags=["iris-contest"],
) as dag:
    
    create_table = IrisSQLOperator(
        task_id="create_table",
        iris_conn_id="ContainerInstance",
        sql="""CREATE TABLE IF NOT EXISTS Test.AirflowDemo (
               ID INTEGER IDENTITY PRIMARY KEY,
               Message VARCHAR(200),
               RunDate TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )""",
    )

IrisSQLOperator Parameters

| Parameter | Description | Type / Default | Required |
|---|---|---|---|
| sql | SQL query or template | str | Yes |
| iris_conn_id | IRIS connection identifier | str / iris_default | No |
| task_id | DAG task name | str | Yes |
| autocommit | Commit changes automatically | bool / True | No |
| **kwargs | Airflow BaseOperator arguments | -- | No |
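
To make the effect of an autocommit flag concrete, the sketch below uses Python's built-in sqlite3 module as a stand-in for IRIS. It is not the provider's implementation. It only illustrates the general idea: with autocommit on, a write is visible to other connections immediately, while with autocommit off it stays pending until an explicit commit.

```python
import os
import sqlite3
import tempfile


def rows_seen_by_second_connection(autocommit: bool) -> int:
    """Insert one row, then count rows from a separate connection."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.db")
        # isolation_level=None puts the sqlite3 module in autocommit mode;
        # "DEFERRED" opens an implicit transaction before the INSERT.
        writer = sqlite3.connect(path, isolation_level=None if autocommit else "DEFERRED")
        writer.execute("CREATE TABLE demo (message TEXT)")
        writer.commit()  # make sure the table itself is committed in both modes
        writer.execute("INSERT INTO demo (message) VALUES ('hello')")

        reader = sqlite3.connect(path)
        count = reader.execute("SELECT COUNT(*) FROM demo").fetchone()[0]

        reader.close()
        writer.close()  # closing without commit discards any pending insert
        return count


if __name__ == "__main__":
    print("autocommit on :", rows_seen_by_second_connection(True))
    print("autocommit off:", rows_seen_by_second_connection(False))
```

With the default of autocommit=True listed in the table, DML tasks such as an INSERT do not need a separate commit step.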

Examples

1. IRIS Raw SQL Demo

Runs raw SQL statements (DDL, INSERT, and SELECT) with the IrisSQLOperator.

# dags/01_IRIS_Raw_SQL_Demo.py
from datetime import datetime
from airflow import DAG
from airflow_provider_iris.operators.iris_operator import IrisSQLOperator

# ---------------------------------------------------------------------
# Example DAG showing how to run raw SQL statements on InterSystems IRIS
# using the IrisSQLOperator included in the provider.
# ---------------------------------------------------------------------

with DAG(
    dag_id="01_IRIS_Raw_SQL_Demo",
    start_date=datetime(2025, 12, 1),
    schedule=None,        # Run manually – no recurring schedule
    catchup=False,
    tags=["iris-contest"],
) as dag:
    
    # Create a simple table to store demo entries.
    # IF NOT EXISTS ensures the DAG can be re-run without errors.
    create_table = IrisSQLOperator(
        task_id="create_table",
        sql="""
            CREATE TABLE IF NOT EXISTS AirflowDemo.Test (
                ID INTEGER IDENTITY PRIMARY KEY,
                Message VARCHAR(200),
                RunDate TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """,
    )

    # Insert a sample row into the table.
    # This demonstrates a basic INSERT operation using the operator.
    insert = IrisSQLOperator(
        task_id="insert_row",
        sql="""
            INSERT INTO AirflowDemo.Test (Message)
            VALUES ('Hello from raw SQL operator')
        """,
    )

    # Retrieve all rows so the results appear in the Airflow logs.
    # Useful to confirm end-to-end connectivity with IRIS.
    select = IrisSQLOperator(
        task_id="select_rows",
        sql="""
            SELECT ID, Message, RunDate
            FROM AirflowDemo.Test
            ORDER BY ID DESC
        """,
    )

    # Task order: create table → insert row → select rows
    create_table >> insert >> select
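
The re-run behaviour of the DAG above can be tried locally without an IRIS instance. The following sketch uses sqlite3 as a stand-in, so the DDL is adapted to SQLite syntax (AUTOINCREMENT instead of IDENTITY, and a table name without a schema). It is an illustration of the create, insert, select pattern, not a substitute for running the DAG against IRIS.

```python
import sqlite3

# SQLite-flavoured versions of the statements used in the DAG above.
STATEMENTS = [
    """CREATE TABLE IF NOT EXISTS AirflowDemoTest (
           ID INTEGER PRIMARY KEY AUTOINCREMENT,
           Message VARCHAR(200),
           RunDate TIMESTAMP DEFAULT CURRENT_TIMESTAMP
       )""",
    "INSERT INTO AirflowDemoTest (Message) VALUES ('Hello from raw SQL operator')",
]


def run_once(conn):
    """Run create + insert, then return the IDs newest first, like the select task."""
    for statement in STATEMENTS:
        conn.execute(statement)
    conn.commit()
    cursor = conn.execute("SELECT ID FROM AirflowDemoTest ORDER BY ID DESC")
    return [row[0] for row in cursor]


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    print(run_once(conn))  # first run creates the table and inserts one row
    print(run_once(conn))  # second run reuses the table and adds another row
```

Because the CREATE statement is guarded by IF NOT EXISTS, the second run does not fail; each run simply appends one more row.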

2. IRIS ORM Demo

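Uses SQLAlchemy and pandas. Rows are written with pandas.DataFrame.to_sql() using method="multi" and chunksize=1, which the author reports as the most reliable way to insert DataFrame rows into IRIS.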

# dags/02_IRIS_ORM_Demo.py

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
import pandas as pd
import numpy as np

# Import IRIS hook and SQLAlchemy components
from airflow_provider_iris.hooks.iris_hook import IrisHook
from sqlalchemy import Column, Integer, String, DateTime, Float
from sqlalchemy.orm import declarative_base

Base = declarative_base()

# ---------------------------------------------------------------------
# ORM MODEL
# Defines the structure of the AirflowDemo.ORMSales table in IRIS.
# ---------------------------------------------------------------------
class SalesRecord(Base):
    __tablename__ = "ORMSales"
    __table_args__ = {"schema": "AirflowDemo"}

    id        = Column(Integer, primary_key=True)
    region    = Column(String(50))
    amount    = Column(Float)
    sale_date = Column(DateTime)


# ---------------------------------------------------------------------
# TASK 1: Create table and insert synthetic sample records.
# Uses pandas.to_sql() with chunksize=1 → most consistent for IRIS.
# ---------------------------------------------------------------------
def create_and_insert_orm(**context):
    # If you use a non-default connection → ALWAYS pass iris_conn_id explicitly,
    # e.g. hook = IrisHook(iris_conn_id="iris_Connection_ID")
    hook = IrisHook()
    engine = hook.get_engine()

    # Ensure the table exists
    Base.metadata.create_all(engine)

    # ---- Generate synthetic generic sample data ----
    num_records = 5
    regions = [f"Region {i}" for i in range(1, num_records + 1)]

    sample_data = [
        {
            "region": region,
            "amount": round(np.random.uniform(5000, 50000), 2),
            "sale_date": pd.Timestamp("2025-12-01") + pd.Timedelta(days=i)
        }
        for i, region in enumerate(regions)
    ]

    df = pd.DataFrame(sample_data)

    # Insert rows one per statement (chunksize=1) → most reliable for IRIS
    df.to_sql(
        name="ORMSales",
        con=engine,
        schema="AirflowDemo",
        if_exists="append",
        index=False,
        method="multi",
        chunksize=1,   # key setting for IRIS compatibility
    )

    print(f"Inserted {len(df)} generated rows into AirflowDemo.ORMSales")


# ---------------------------------------------------------------------
# TASK 2: Query back data and print rows in Airflow logs.
# ---------------------------------------------------------------------
def query_orm(**context):
    hook = IrisHook()
    engine = hook.get_engine()

    df = pd.read_sql(
        "SELECT * FROM AirflowDemo.ORMSales ORDER BY id",
        engine
    )

    for _, r in df.iterrows():
        print(
            f"ORM → {int(r.id):>3} | "
            f"{r.region:<15} | "
            f"${r.amount:>10,.2f} | "
            f"{r.sale_date.date()}"
        )


# ---------------------------------------------------------------------
# DAG DEFINITION
# ---------------------------------------------------------------------
with DAG(
    dag_id="02_IRIS_ORM_Demo",
    start_date=datetime(2025, 12, 1),
    schedule=None,
    catchup=False,
    tags=["iris-contest", "orm"],
) as dag:

    orm_create = PythonOperator(
        task_id="orm_create_and_insert",
        python_callable=create_and_insert_orm,
    )

    orm_read = PythonOperator(
        task_id="orm_read",
        python_callable=query_orm,
    )

    orm_create >> orm_read
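
What chunksize does to the number of INSERT statements can be sketched without pandas or IRIS. The helper below is hypothetical and uses sqlite3 as a stand-in. It mimics the idea behind method="multi", where each chunk becomes one multi-row INSERT, and is not pandas' actual implementation.

```python
import sqlite3


def insert_in_chunks(conn, rows, chunksize):
    """Insert rows as multi-row INSERTs of at most `chunksize` rows each.

    Returns the number of INSERT statements that were executed.
    """
    statements = 0
    for start in range(0, len(rows), chunksize):
        chunk = rows[start:start + chunksize]
        placeholders = ", ".join(["(?, ?)"] * len(chunk))
        params = [value for row in chunk for value in row]  # flatten the chunk
        conn.execute(
            f"INSERT INTO ORMSales (region, amount) VALUES {placeholders}",
            params,
        )
        statements += 1
    conn.commit()
    return statements


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE ORMSales (region TEXT, amount REAL)")
    rows = [(f"Region {i}", 1000.0 * i) for i in range(1, 6)]
    print(insert_in_chunks(conn, rows, chunksize=1))  # one statement per row
    print(insert_in_chunks(conn, rows, chunksize=5))  # one statement for all rows
```

With chunksize=1 every statement carries exactly one row, which trades throughput for the simplest possible statement shape.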

3. Synthetic Sales Pipeline

Generates synthetic sales data with NumPy and pandas, then bulk loads it into IRIS with DataFrame.to_sql().

# dags/03_IRIS_Load_Synthetic_Data_Demo.py

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
import pandas as pd
import numpy as np
from airflow_provider_iris.hooks.iris_hook import IrisHook
from sqlalchemy import Column, Integer, String, DateTime, Float
from sqlalchemy.orm import declarative_base

Base = declarative_base()

# ---------------------------------------------------------------------
# ORM model representing a generic sales table in the "AirflowDemo" schema.
# Reusable for both table creation and data insertion.
# ---------------------------------------------------------------------
class SalesRecord(Base):
    __tablename__ = "BulkSales"
    __table_args__ = {"schema": "AirflowDemo"}

    id        = Column(Integer, primary_key=True)
    region    = Column(String(50))
    amount    = Column(Float)
    sale_date = Column(DateTime)


# ---------------------------------------------------------------------
# Generate synthetic sales data for testing or demo purposes.
# Supports dynamic number of rows and produces realistic random data.
# ---------------------------------------------------------------------
def generate_synthetic_sales(num_rows=500):
    """Create synthetic sales data as a pandas DataFrame."""
    
    regions = [
        "North America", "South America", "Europe",
        "Asia-Pacific", "Middle East", "Africa"
    ]

    # Randomly pick a region for each row
    region_data = np.random.choice(regions, size=num_rows)

    # Random sales amounts between 10k and 120k
    amounts = np.random.uniform(10000, 120000, size=num_rows).round(2)

    # Random sale dates within the 30 days starting 2025-11-01
    start_date = datetime(2025, 11, 1)
    sale_dates = [
        start_date + timedelta(days=int(x)) 
        for x in np.random.randint(0, 30, size=num_rows)
    ]

    # Construct a DataFrame
    df = pd.DataFrame({
        "region": region_data,
        "amount": amounts,
        "sale_date": sale_dates
    })

    return df


# ---------------------------------------------------------------------
# Airflow task: bulk load synthetic sales data into IRIS.
# ---------------------------------------------------------------------
def bulk_load_synthetic_sales(**context):

    # Generate synthetic dataset
    df = generate_synthetic_sales(num_rows=200)

    # Create SQLAlchemy engine via IRIS hook
    # If you use a non-default connection → ALWAYS pass iris_conn_id explicitly,
    # e.g. hook = IrisHook(iris_conn_id="iris_Connection_ID")
    hook = IrisHook()
    engine = hook.get_engine()

    # Ensure table exists
    Base.metadata.create_all(engine)

    # Bulk insert into IRIS
    df.to_sql(
        "BulkSales",
        con=engine,
        schema="AirflowDemo",
        if_exists="append",
        index=False
    )

    print(f"Bulk loaded {len(df)} synthetic rows into AirflowDemo.BulkSales")


# ---------------------------------------------------------------------
# DAG definition
# Demonstrates ETL-style bulk load of synthetic sales data into IRIS.
# ---------------------------------------------------------------------
with DAG(
    dag_id="03_IRIS_Load_Synthetic_Data_Demo",
    start_date=datetime(2025, 12, 1),
    schedule=None,
    catchup=False,
    tags=["iris-contest", "etl", "synthetic"],
) as dag:

    bulk_task = PythonOperator(
        task_id="bulk_load_synthetic_sales",
        python_callable=bulk_load_synthetic_sales
    )
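
The generator in this DAG produces different data on every run. When a reproducible dataset is useful, for example to compare two loads, a seeded generator is one option. The sketch below is a hypothetical variant that deliberately swaps NumPy for the standard library random module so it runs without extra dependencies; the regions, amount range, and date window mirror the DAG above.

```python
import random
from datetime import datetime, timedelta

REGIONS = [
    "North America", "South America", "Europe",
    "Asia-Pacific", "Middle East", "Africa",
]


def generate_rows(num_rows, seed):
    """Return a reproducible list of synthetic sales rows for the given seed."""
    rng = random.Random(seed)  # private generator, so the global state is untouched
    start_date = datetime(2025, 11, 1)
    return [
        {
            "region": rng.choice(REGIONS),
            "amount": round(rng.uniform(10000, 120000), 2),
            # randrange(0, 30) yields offsets 0..29, i.e. 2025-11-01 to 2025-11-30
            "sale_date": start_date + timedelta(days=rng.randrange(0, 30)),
        }
        for _ in range(num_rows)
    ]


if __name__ == "__main__":
    for row in generate_rows(3, seed=42):
        print(row)
```

The resulting list of dictionaries can be passed straight to pandas.DataFrame() if the rest of the pipeline expects a DataFrame.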
