Airflow operators for PostgreSQL <-> CSV file transfers using COPY

airflow-postgres-csv

Airflow operators for bulk PostgreSQL <-> CSV transfers using COPY. Supports Airflow 2.9+ and Airflow 3.

Operators

  • PostgresToCsvOperator - Run a SQL query and export results to a CSV file
  • CsvToPostgresOperator - Load a CSV file into a PostgreSQL table

Both use PostgreSQL's COPY command, which streams rows in bulk instead of issuing per-row INSERT statements.
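
For context, a COPY-based export usually wraps the query in a `COPY (...) TO STDOUT` statement and streams the result into a file. The sketch below only builds that statement as a string. The helper name `build_copy_to_sql` is hypothetical and not part of this package, whose internals may differ.

```python
def build_copy_to_sql(query: str, has_header: bool = True) -> str:
    """Wrap a SELECT in a COPY ... TO STDOUT statement (illustrative only)."""
    # A trailing semicolon is not allowed inside the parentheses, so drop it.
    inner = query.strip().rstrip(";").strip()
    options = ["FORMAT csv"]
    if has_header:
        options.append("HEADER true")
    return f"COPY ({inner}) TO STDOUT WITH ({', '.join(options)})"


copy_sql = build_copy_to_sql("SELECT * FROM users WHERE active;")
print(copy_sql)
```

With psycopg2, a statement of this shape can be passed to `cursor.copy_expert(copy_sql, file_obj)` to stream the rows into an open file.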

Installation

pip install airflow-postgres-csv

Usage

from airflow_postgres_csv import PostgresToCsvOperator, CsvToPostgresOperator

# Export query results to CSV
export_task = PostgresToCsvOperator(
    task_id="export_users",
    conn_id="my_postgres",
    sql="SELECT * FROM users WHERE active = %(active)s",
    parameters={"active": True},
    csv_file_path="/tmp/users.csv",
)

# Load CSV into a table (with truncate)
import_task = CsvToPostgresOperator(
    task_id="import_users",
    conn_id="my_postgres",
    table_name="staging.users",
    csv_file_path="/tmp/users.csv",
    truncate=True,
)

SQL from file

The sql parameter accepts three forms:

# Inline SQL
PostgresToCsvOperator(sql="SELECT * FROM users", ...)

# Relative path (loaded by Airflow from DAG folder or template_searchpath)
PostgresToCsvOperator(sql="sql/export_users.sql", ...)

# Absolute path (loaded directly)
PostgresToCsvOperator(sql="/opt/airflow/sql/export_users.sql", ...)
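
The three forms can be told apart roughly as follows. This is a sketch under the assumption that only values ending in `.sql` are treated as paths. The function `classify_sql` is hypothetical, and the operator's actual detection logic may differ.

```python
import posixpath


def classify_sql(value: str) -> str:
    """Return 'inline', 'relative_path', or 'absolute_path' (illustrative)."""
    # Assumption: only values ending in .sql are treated as file paths.
    if not value.strip().lower().endswith(".sql"):
        return "inline"
    # POSIX rules are used because the example paths are POSIX-style.
    return "absolute_path" if posixpath.isabs(value) else "relative_path"


for candidate in (
    "SELECT * FROM users",
    "sql/export_users.sql",
    "/opt/airflow/sql/export_users.sql",
):
    print(f"{candidate!r} -> {classify_sql(candidate)}")
```

In Airflow generally, a templated field whose value ends with one of the operator's `template_ext` extensions is loaded as a template file, which is consistent with the relative-path behavior described above.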

Gzip compression

Both operators support gzip compression for large files:

# Export to gzip
PostgresToCsvOperator(
    sql="SELECT * FROM large_table",
    csv_file_path="/tmp/data.csv.gz",
    compression="gzip",
    ...
)

# Import from gzip
CsvToPostgresOperator(
    csv_file_path="/tmp/data.csv.gz",
    compression="gzip",
    ...
)
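
To inspect a gzip-compressed export outside Airflow, Python's standard `gzip` and `csv` modules are enough. The sketch below first writes a small sample file to a temporary directory so that it is self-contained; the rows are made-up illustration data, not output from the operator.

```python
import csv
import gzip
import os
import tempfile

# Made-up sample rows standing in for an exported file.
rows = [["id", "name"], ["1", "alice"], ["2", "bob"]]

path = os.path.join(tempfile.mkdtemp(), "data.csv.gz")

# Write the CSV through gzip in text mode; newline="" lets csv control line endings.
with gzip.open(path, "wt", newline="", encoding="utf-8") as fh:
    csv.writer(fh).writerows(rows)

# Read it back the same way.
with gzip.open(path, "rt", newline="", encoding="utf-8") as fh:
    loaded = list(csv.reader(fh))

print(loaded)
```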

Parameters

PostgresToCsvOperator

Parameter     | Description                                                 | Default
------------- | ----------------------------------------------------------- | --------
conn_id       | Airflow Postgres connection ID                              | required
csv_file_path | Output file path (templated)                                | required
sql           | SQL query string, or path to .sql file                      | required
parameters    | Dict passed to cursor.mogrify                               | {}
has_header    | Include CSV header row                                      | True
compression   | Compression format ("gzip" or None)                         | None
timeout       | Query timeout in minutes                                    | 60
count_lines   | Count and log the number of lines in the CSV after writing  | True
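
As an illustration of what count_lines implies, the sketch below counts physical lines in a CSV file, with or without gzip compression. The helper `count_csv_lines` is hypothetical, and this README does not say whether the operator's logged count includes the header row, so the sketch simply counts every line.

```python
import gzip
import os
import tempfile
from typing import Optional


def count_csv_lines(path: str, compression: Optional[str] = None) -> int:
    """Count physical lines in a CSV file, optionally gzip-compressed."""
    opener = gzip.open if compression == "gzip" else open
    # Binary mode skips decoding; iterating the file yields one item per line.
    with opener(path, "rb") as fh:
        return sum(1 for _ in fh)


# Made-up sample data: one header row plus two data rows.
sample = b"id,name\n1,alice\n2,bob\n"
tmp_dir = tempfile.mkdtemp()

plain_path = os.path.join(tmp_dir, "users.csv")
with open(plain_path, "wb") as fh:
    fh.write(sample)

gz_path = os.path.join(tmp_dir, "users.csv.gz")
with gzip.open(gz_path, "wb") as fh:
    fh.write(sample)

plain_count = count_csv_lines(plain_path)
gz_count = count_csv_lines(gz_path, compression="gzip")
print(plain_count, gz_count)
```

A physical line count can exceed the row count when quoted fields contain embedded newlines.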

CsvToPostgresOperator

Parameter     | Description                                                 | Default
------------- | ----------------------------------------------------------- | --------
conn_id       | Airflow Postgres connection ID                              | required
table_name    | Target table (templated, supports schema.table)             | required
csv_file_path | Input file path (templated)                                 | required
columns       | Explicit column list                                        | None
has_header    | CSV has header row                                          | True
truncate      | Truncate table before loading                               | False
compression   | Compression format ("gzip" or None)                         | None
delimiter     | CSV delimiter                                               | ","
quote_char    | CSV quote character                                         | '"'
null_string   | String representing NULL                                    | ""
timeout       | Query timeout in minutes                                    | 60
count_lines   | Count and log the number of lines in the CSV before loading | True
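
The CSV-related parameters correspond closely to options of PostgreSQL's `COPY ... FROM STDIN`. The sketch below shows one plausible mapping. The helper `build_copy_from_sql` is hypothetical, the statement the operator actually generates may differ, and the table and column names are interpolated without quoting, so this is only suitable for trusted input.

```python
from typing import Optional, Sequence


def build_copy_from_sql(
    table_name: str,
    columns: Optional[Sequence[str]] = None,
    has_header: bool = True,
    delimiter: str = ",",
    quote_char: str = '"',
    null_string: str = "",
) -> str:
    """Build a COPY ... FROM STDIN statement from CSV options (illustrative)."""

    def literal(value: str) -> str:
        # Render a SQL string literal, doubling any embedded single quotes.
        return "'" + value.replace("'", "''") + "'"

    column_list = f" ({', '.join(columns)})" if columns else ""
    options = [
        "FORMAT csv",
        f"HEADER {'true' if has_header else 'false'}",
        f"DELIMITER {literal(delimiter)}",
        f"QUOTE {literal(quote_char)}",
        f"NULL {literal(null_string)}",
    ]
    return f"COPY {table_name}{column_list} FROM STDIN WITH ({', '.join(options)})"


default_sql = build_copy_from_sql("staging.users")
custom_sql = build_copy_from_sql(
    "staging.users", columns=["id", "name"], delimiter=";", null_string="NULL"
)
print(default_sql)
print(custom_sql)
```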

Development

Running tests

Tests can be run against both supported Airflow versions using tox:

pip install tox

tox -e airflow2   # test against Airflow 2.x
tox -e airflow3   # test against Airflow 3.x
tox               # run both

Each environment installs the matching Airflow and provider versions automatically, so no manual dependency management is needed.

Requirements

Dependency                        | Airflow 2    | Airflow 3
--------------------------------- | ------------ | -----------
apache-airflow                    | >=2.9, <3.0  | >=3.0
apache-airflow-providers-postgres | >=5.0, <6.0  | >=6.0
Python                            | 3.10 – 3.12  | 3.10 – 3.12

License

MIT
