
Airflow operators for PostgreSQL <-> CSV file transfers using COPY

Project description

airflow-postgres-csv

Airflow 3 operators for bulk PostgreSQL <-> CSV transfers using COPY.

Operators

  • PostgresToCsvOperator - Run a SQL query and export results to a CSV file
  • CsvToPostgresOperator - Load a CSV file into a PostgreSQL table

Both use PostgreSQL's COPY command for maximum throughput.
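COPY streams an entire result set between server and client in one bulk transfer instead of row-by-row INSERTs. As a rough illustration, here is a sketch of the export-side statement such an operator might issue; the helper name and exact option list are assumptions, not this package's actual API:

```python
def build_export_copy(sql: str, has_header: bool = True) -> str:
    """Wrap a query in a COPY ... TO STDOUT statement that streams the
    result set to the client as CSV in a single bulk transfer."""
    header = "true" if has_header else "false"
    return f"COPY ({sql}) TO STDOUT WITH (FORMAT csv, HEADER {header})"

print(build_export_copy("SELECT * FROM users WHERE active = true"))
# COPY (SELECT * FROM users WHERE active = true) TO STDOUT WITH (FORMAT csv, HEADER true)
```

With psycopg2 (the driver behind the Postgres provider), a statement like this is typically handed to cursor.copy_expert together with an open file object.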

Installation

pip install airflow-postgres-csv

Usage

from airflow_postgres_csv import PostgresToCsvOperator, CsvToPostgresOperator

# Export query results to CSV
export_task = PostgresToCsvOperator(
    task_id="export_users",
    conn_id="my_postgres",
    sql="SELECT * FROM users WHERE active = %(active)s",
    parameters={"active": True},
    csv_file_path="/tmp/users.csv",
)

# Load CSV into a table (with truncate)
import_task = CsvToPostgresOperator(
    task_id="import_users",
    conn_id="my_postgres",
    table_name="staging.users",
    csv_file_path="/tmp/users.csv",
    truncate=True,
)

SQL from file

The sql parameter supports multiple formats:

# Inline SQL
PostgresToCsvOperator(sql="SELECT * FROM users", ...)

# Relative path (loaded by Airflow from DAG folder or template_searchpath)
PostgresToCsvOperator(sql="sql/export_users.sql", ...)

# Absolute path (loaded directly)
PostgresToCsvOperator(sql="/opt/airflow/sql/export_users.sql", ...)
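The three formats above can be distinguished by a simple convention: a string ending in .sql is treated as a file path, anything else as inline SQL. The sketch below mirrors that convention (the function and its search_path parameter are hypothetical, not this package's resolution code):

```python
from pathlib import Path

def resolve_sql(sql: str, search_path: str = ".") -> str:
    """Return the SQL text for an inline query, a relative .sql path
    (resolved against search_path, e.g. the DAG folder), or an absolute
    .sql path. Hypothetical sketch of the convention described above."""
    if sql.strip().endswith(".sql"):
        path = Path(sql)
        if not path.is_absolute():
            path = Path(search_path) / path  # relative to the DAG folder
        return path.read_text()
    return sql  # inline SQL is used as-is
```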

Gzip compression

Both operators support gzip compression for large files:

# Export to gzip
PostgresToCsvOperator(
    sql="SELECT * FROM large_table",
    csv_file_path="/tmp/data.csv.gz",
    compression="gzip",
    ...
)

# Import from gzip
CsvToPostgresOperator(
    csv_file_path="/tmp/data.csv.gz",
    compression="gzip",
    ...
)
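Mechanically, the gzip path just streams CSV text through a compressing file object, so only compressed bytes ever touch disk. A minimal stdlib round-trip showing the idea (file location and rows are illustrative, not tied to this package's internals):

```python
import csv
import gzip
import os
import tempfile

path = os.path.join(tempfile.gettempdir(), "data.csv.gz")
rows = [["id", "name"], ["1", "alice"], ["2", "bob"]]

# Export side: write CSV rows into a gzip-compressed file.
with gzip.open(path, "wt", newline="") as f:
    csv.writer(f).writerows(rows)

# Import side: transparently decompress while reading.
with gzip.open(path, "rt", newline="") as f:
    assert list(csv.reader(f)) == rows
```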

Parameters

PostgresToCsvOperator

| Parameter | Description | Default |
| --- | --- | --- |
| conn_id | Airflow Postgres connection ID | required |
| csv_file_path | Output file path (templated) | required |
| sql | SQL query string, or path to a .sql file | required |
| parameters | Dict passed to cursor.mogrify | {} |
| has_header | Include CSV header row | True |
| compression | Compression format ("gzip" or None) | None |
| timeout | Query timeout in minutes | 60 |

CsvToPostgresOperator

| Parameter | Description | Default |
| --- | --- | --- |
| conn_id | Airflow Postgres connection ID | required |
| table_name | Target table (templated, supports schema.table) | required |
| csv_file_path | Input file path (templated) | required |
| columns | Explicit column list | None |
| has_header | CSV has header row | True |
| truncate | Truncate table before loading | False |
| compression | Compression format ("gzip" or None) | None |
| delimiter | CSV delimiter | "," |
| quote_char | CSV quote character | '"' |
| null_string | String representing NULL | "" |
| timeout | Query timeout in minutes | 60 |
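To show how the loader options fit together, here is a sketch of how they could map onto a COPY ... FROM STDIN statement (the helper is hypothetical, not this package's code, and it does no quoting of the option values):

```python
def build_copy_from(table_name, columns=None, has_header=True,
                    delimiter=",", quote_char='"', null_string=""):
    """Assemble a COPY ... FROM STDIN statement from loader options.
    Hypothetical sketch; assumes option values need no SQL escaping."""
    cols = f" ({', '.join(columns)})" if columns else ""
    opts = ", ".join([
        "FORMAT csv",
        f"HEADER {'true' if has_header else 'false'}",
        f"DELIMITER '{delimiter}'",
        f"QUOTE '{quote_char}'",
        f"NULL '{null_string}'",
    ])
    return f"COPY {table_name}{cols} FROM STDIN WITH ({opts})"

print(build_copy_from("staging.users", columns=["id", "email"]))
```

With truncate=True, the operator would presumably run a TRUNCATE on the target table before issuing the COPY.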

Requirements

  • Apache Airflow >= 3.0.0
  • apache-airflow-providers-postgres >= 6.0.0

License

MIT

Download files

Download the file for your platform.

Source Distribution

airflow_postgres_csv-0.2.0.tar.gz (181.1 kB)

Uploaded Source

Built Distribution


airflow_postgres_csv-0.2.0-py3-none-any.whl (5.9 kB)

Uploaded Python 3

File details

Details for the file airflow_postgres_csv-0.2.0.tar.gz.

File metadata

  • Download URL: airflow_postgres_csv-0.2.0.tar.gz
  • Upload date:
  • Size: 181.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for airflow_postgres_csv-0.2.0.tar.gz
| Algorithm | Hash digest |
| --- | --- |
| SHA256 | 935a0fadded05f17c845315d9a976dccb2d8c863cebe4a431e2d5c3f2e3b8fb1 |
| MD5 | 87fd322ab0eee98fbac237339f4797c8 |
| BLAKE2b-256 | c505b797a2de90661faa714ee23c69e8676c08b0a508d45bd450c949aff54c39 |


Provenance

The following attestation bundles were made for airflow_postgres_csv-0.2.0.tar.gz:

Publisher: publish.yml on Redevil10/airflow-postgres-csv

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file airflow_postgres_csv-0.2.0-py3-none-any.whl.

File metadata

File hashes

Hashes for airflow_postgres_csv-0.2.0-py3-none-any.whl
| Algorithm | Hash digest |
| --- | --- |
| SHA256 | ed34310b754b552f6acc753eb40638c8b915a2258eb9a72220eeb3c85fa1b7a4 |
| MD5 | 9bb4ce6b27d627c6bf676ce0ff907b60 |
| BLAKE2b-256 | 0461461fca73959ff6fc3691d4f25eb35fac04d28a516197a91a4dbeb7e930c8 |


Provenance

The following attestation bundles were made for airflow_postgres_csv-0.2.0-py3-none-any.whl:

Publisher: publish.yml on Redevil10/airflow-postgres-csv

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.
