airflow-provider-google-sheets
Apache Airflow provider for Google Sheets API v4. Read, write, and manage Google Sheets spreadsheets from Airflow DAGs.
AI Disclosure: This provider was developed with the assistance of Claude Code (Anthropic, model Claude Opus 4.6). The code, tests, and documentation were co-authored by a human developer and an LLM. Please evaluate the code quality on its own merits and make informed decisions about whether to use it in your projects.
Features
- Read data from Google Sheets with chunked streaming, schema-based type conversion, and CSV/JSON/JSONL/XCom output
- Write data in three modes: overwrite, append, and smart merge (upsert by key)
- Smart merge — update, insert, and delete rows based on a key column with correct index recalculation
- Manage spreadsheets — create spreadsheets, add sheets, and list sheets with filtering
- Large datasets — streaming read/write without loading everything into memory
- Schema support — automatic type conversion (date, int, float, bool) on read and write
- Header processing — deduplication, Cyrillic transliteration (on by default), special character removal, lowercase conversion, snake_case normalization
Installation

```bash
pip install airflow-provider-google-sheets
```

With Cyrillic header transliteration support:

```bash
pip install airflow-provider-google-sheets[transliterate]
```
Requirements
- Python >= 3.10
- Apache Airflow 2.x (>= 2.7, tested on 2.9.1; Airflow 3.x not tested)
- Google service account with Sheets API access
Connection Setup
- Create a Google Cloud service account with Google Sheets API enabled.
- Download the JSON key file.
- In the Airflow UI, create a connection using one of the supported configurations:
Option A: Standard Google Cloud connection (recommended)
Use this if you already have a `google_cloud_platform` connection configured in Airflow.

- Conn Id: `google_cloud_default`
- Conn Type: `Google Cloud`
- Keyfile Path: `/path/to/service-account.json`
- Scopes: `https://www.googleapis.com/auth/spreadsheets` (add more if needed)
Option B: Inline JSON key
- Conn Id: `google_cloud_default`
- Conn Type: `Google Cloud` or `google_sheets`
- Keyfile JSON: paste the full service account JSON
Option C: JSON in Extra field
- Conn Id: `google_cloud_default`
- Conn Type: `google_sheets`
- Extra: paste the full JSON key, or use `{"keyfile_dict": <JSON key>}`
The hook checks credentials in this order: `key_path` / `keyfile_path` (file on disk) → `keyfile_dict` (inline JSON) → raw Extra JSON.
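For environments provisioned from code rather than the UI, here is a minimal sketch of registering the same connection programmatically (the conn id, keyfile path, and bootstrap context are illustrative assumptions, not part of the provider):

```python
# Sketch: building the connection from a keyfile on disk.
# Whether keyfile_dict should be a nested dict or a JSON string may depend
# on your Airflow version; verify against your deployment.
import json

from airflow.models.connection import Connection

with open("/path/to/service-account.json") as f:
    keyfile_dict = json.load(f)

conn = Connection(
    conn_id="google_cloud_default",
    conn_type="google_cloud_platform",
    extra=json.dumps({"keyfile_dict": keyfile_dict}),
)

# The URI form can be exported as AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT
print(conn.get_uri())
```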
Operators
GoogleSheetsReadOperator
Read data from a spreadsheet.
```python
from airflow_provider_google_sheets.operators.read import GoogleSheetsReadOperator

# Basic read — returns list[dict] via XCom
read = GoogleSheetsReadOperator(
    task_id="read_sheets",
    spreadsheet_id="your-spreadsheet-id",
    sheet_name="Sheet1",
)

# Stream a large sheet to a CSV file (no memory accumulation)
read_csv = GoogleSheetsReadOperator(
    task_id="read_to_csv",
    spreadsheet_id="your-spreadsheet-id",
    output_type="csv",
    output_path="/tmp/export.csv",
    chunk_size=10000,
)

# Stream to a JSONL file (one JSON object per line, memory-efficient)
read_jsonl = GoogleSheetsReadOperator(
    task_id="read_to_jsonl",
    spreadsheet_id="your-spreadsheet-id",
    output_type="jsonl",
    output_path="/tmp/export.jsonl",
    chunk_size=10000,
)

# Stream to a JSON array file
read_json = GoogleSheetsReadOperator(
    task_id="read_to_json",
    spreadsheet_id="your-spreadsheet-id",
    output_type="json",
    output_path="/tmp/export.json",
)

# Read with type conversion
read_typed = GoogleSheetsReadOperator(
    task_id="read_typed",
    spreadsheet_id="your-spreadsheet-id",
    schema={
        "date": {"type": "date", "format": "%Y-%m-%d"},
        "revenue": {"type": "float", "required": True},
        "quantity": {"type": "int"},
    },
)

# Default behavior: headers are transliterated, sanitized, and lowercased.
# "Дата отчёта" → "data_otchyota", "Клиент (ФИО)" → "klient_fio"
read_default = GoogleSheetsReadOperator(
    task_id="read_default",
    spreadsheet_id="your-spreadsheet-id",
)

# column_mapping takes priority — all other header processing is skipped;
# mapping keys use the original raw header names from the spreadsheet.
read_mapped = GoogleSheetsReadOperator(
    task_id="read_mapped",
    spreadsheet_id="your-spreadsheet-id",
    output_type="jsonl",
    output_path="/tmp/export.jsonl",
    column_mapping={
        "Дата": "report_date",
        "Клиент": "client",
        "Сумма": "amount",
    },
)

# Disable all header processing to keep original names
read_raw = GoogleSheetsReadOperator(
    task_id="read_raw",
    spreadsheet_id="your-spreadsheet-id",
    transliterate_headers=False,
    sanitize_headers=False,
    lowercase_headers=False,
)

# Skip rows where status is "deleted"; stop reading at "ИТОГО" ("TOTAL" in Russian)
read_filtered = GoogleSheetsReadOperator(
    task_id="read_filtered",
    spreadsheet_id="your-spreadsheet-id",
    row_skip={"column": "status", "value": "deleted"},
    row_stop={"column": "name", "value": "ИТОГО"},
)

# Skip on multiple conditions (OR logic)
read_multi_skip = GoogleSheetsReadOperator(
    task_id="read_multi_skip",
    spreadsheet_id="your-spreadsheet-id",
    row_skip=[
        {"column": "status", "value": "deleted"},
        {"column": "status", "value": "archived"},
        {"column": "amount", "op": "empty"},
    ],
)
```
Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `gcp_conn_id` | `str` | `"google_cloud_default"` | Airflow connection ID |
| `spreadsheet_id` | `str` | — | Spreadsheet ID |
| `sheet_name` | `str` | `None` | Sheet name (`None` = first sheet) |
| `cell_range` | `str` | `None` | A1-notation range (`None` = entire sheet) |
| `has_headers` | `bool` | `True` | First row contains headers |
| `transliterate_headers` | `bool` | `True` | Transliterate Cyrillic to Latin |
| `sanitize_headers` | `bool` | `True` | Remove spaces and special characters (keep letters, digits, `_`) |
| `lowercase_headers` | `bool` | `True` | Convert headers to lowercase |
| `normalize_headers` | `bool` | `False` | Normalize to snake_case (overrides sanitize + lowercase) |
| `column_mapping` | `dict` | `None` | Rename headers using raw names: `{"Original": "new_name"}`. Skips all other processing |
| `schema` | `dict` | `None` | Column type schema |
| `strip_strings` | `bool` | `False` | Strip leading/trailing whitespace from string cell values |
| `row_skip` | `dict \| list[dict]` | `None` | Condition(s) to skip rows: `{"column": "status", "value": "deleted", "op": "equals"}`. Multiple dicts = OR logic |
| `row_stop` | `dict \| list[dict]` | `None` | Condition(s) to stop reading: rows from the first match onward are discarded and no further API calls are made |
| `chunk_size` | `int` | `5000` | Rows per API request |
| `output_type` | `str` | `"xcom"` | `"xcom"`, `"csv"`, `"json"` (JSON array), or `"jsonl"` (one object per line) |
| `output_path` | `str` | `None` | File path for csv/json/jsonl output |
| `max_xcom_rows` | `int` | `50000` | Max rows for XCom output |
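A sketch of consuming the XCom output in a downstream task (the `read` task comes from the examples above; the `revenue` column and summing logic are illustrative):

```python
from airflow.decorators import task


@task
def summarize(rows: list[dict]) -> float:
    # rows is the list[dict] the read operator pushed to XCom
    return sum(float(r.get("revenue") or 0) for r in rows)


summarize(read.output)
```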
GoogleSheetsWriteOperator
Write data to a spreadsheet.
```python
from airflow_provider_google_sheets.operators.write import GoogleSheetsWriteOperator

# Overwrite with list[dict]
write = GoogleSheetsWriteOperator(
    task_id="write_sheets",
    spreadsheet_id="your-spreadsheet-id",
    sheet_name="Output",
    write_mode="overwrite",
    data=[{"date": "2024-01-01", "value": 100}],
)

# Append rows
append = GoogleSheetsWriteOperator(
    task_id="append_sheets",
    spreadsheet_id="your-spreadsheet-id",
    write_mode="append",
    data=[{"event": "login", "user": "alice"}],
)

# Smart merge by key
merge = GoogleSheetsWriteOperator(
    task_id="smart_merge",
    spreadsheet_id="your-spreadsheet-id",
    write_mode="smart_merge",
    merge_key="date",
    data=[
        {"date": "2024-01-01", "value": 110},  # update existing
        {"date": "2024-01-03", "value": 200},  # append new
    ],
)

# Table starting at a non-default cell (e.g. C3).
# Headers are written to C3 on the first run; the key column is resolved
# relative to column C.
merge_offset = GoogleSheetsWriteOperator(
    task_id="smart_merge_offset",
    spreadsheet_id="your-spreadsheet-id",
    sheet_name="Report",
    write_mode="smart_merge",
    merge_key="date",
    table_start="C3",  # table header lives at C3
    data=[{"date": "2024-01-01", "revenue": 110}],
)
```
Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `gcp_conn_id` | `str` | `"google_cloud_default"` | Airflow connection ID |
| `spreadsheet_id` | `str` | — | Spreadsheet ID |
| `sheet_name` | `str` | `None` | Sheet name |
| `cell_range` | `str` | `None` | Target A1 range (overwrite mode) |
| `write_mode` | `str` | `"overwrite"` | `"overwrite"`, `"append"`, or `"smart_merge"` |
| `clear_mode` | `str` | `"sheet"` | Overwrite clearing strategy: `"sheet"` clears the entire sheet and trims extra rows; `"range"` clears only the data columns |
| `data` | `Any` | `None` | Data: `list[list]`, `list[dict]`, or a file path |
| `data_xcom_task_id` | `str` | `None` | Pull data from this task's XCom |
| `data_xcom_key` | `str` | `"return_value"` | XCom key |
| `has_headers` | `bool` | `True` | Data contains headers |
| `write_headers` | `bool` | `True` | Write the header row. In append/smart_merge modes, headers are written automatically when the sheet is empty |
| `schema` | `dict` | `None` | Schema for formatting values |
| `batch_size` | `int` | `1000` | Rows per API request |
| `pause_between_batches` | `float` | `1.0` | Seconds between batches |
| `merge_key` | `str` | `None` | Key column for smart_merge |
| `table_start` | `str` | `"A1"` | Top-left cell of the table (e.g. `"C3"`). Used by append and smart_merge to locate the header and resolve column positions |
Data input formats:

- `list[dict]` — headers auto-detected from keys
- `list[list]` — raw rows (set `has_headers=True` if the first row is a header)
- `str` — file path; format is auto-detected by extension (`.csv` → CSV, everything else → JSONL)
- XCom — set `data_xcom_task_id`

To read a JSON array file, pass `source_type="json"` to `normalize_input_data()`, or write the data as JSONL instead.
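Two sketches of the file and XCom sources (task ids are illustrative; the CSV path matches the read example above):

```python
from airflow_provider_google_sheets.operators.write import GoogleSheetsWriteOperator

# File source: the .csv extension selects the CSV parser
write_from_file = GoogleSheetsWriteOperator(
    task_id="write_from_file",
    spreadsheet_id="your-spreadsheet-id",
    sheet_name="Imported",
    write_mode="overwrite",
    data="/tmp/export.csv",
)

# XCom source: pulls return_value from the named task by default
write_from_xcom = GoogleSheetsWriteOperator(
    task_id="write_from_xcom",
    spreadsheet_id="your-spreadsheet-id",
    write_mode="append",
    data_xcom_task_id="read_sheets",
)
```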
Smart Merge Algorithm
Smart merge reads the key column from the sheet, compares with incoming data, and generates minimal operations:
1. Read the key column to build an index `{key_value: [row_numbers]}`.
2. Compare each key: same row count → update; more incoming rows → insert; fewer → delete; new key → append.
3. Sort structural operations bottom-up (descending row number) to prevent index corruption.
4. Execute inserts/deletes via `batchUpdate`, then recalculate row indices for the value updates.
5. Write values via `batch_update_values` for efficiency.
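A toy illustration of why step 3 sorts bottom-up (standalone Python, not the provider's internal implementation):

```python
# key -> sheet row numbers currently holding that key
existing = {"2024-01-01": [2, 3], "2024-01-02": [5, 6, 7]}
# key -> how many rows the incoming data has for that key
incoming = {"2024-01-01": 1, "2024-01-02": 1}

# Surplus rows beyond the incoming count must be deleted
deletes = []
for key, rows in existing.items():
    deletes.extend(rows[incoming.get(key, 0):])

# Descending order: deleting row 7 first leaves rows 6 and 3 still valid;
# deleting top-down would shift every index below the deleted row.
for row in sorted(deletes, reverse=True):
    print(f"delete row {row}")  # prints 7, 6, 3
```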
GoogleSheetsCreateSpreadsheetOperator
```python
from airflow_provider_google_sheets.operators.manage import GoogleSheetsCreateSpreadsheetOperator

create = GoogleSheetsCreateSpreadsheetOperator(
    task_id="create_spreadsheet",
    title="Monthly Report",
    sheet_titles=["Summary", "Details"],
)
# Returns spreadsheet_id via XCom
```
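The returned id can feed a downstream write. A sketch, assuming `spreadsheet_id` accepts a Jinja template (verify that the field is templated in your version of the provider):

```python
from airflow_provider_google_sheets.operators.write import GoogleSheetsWriteOperator

write_summary = GoogleSheetsWriteOperator(
    task_id="write_summary",
    spreadsheet_id="{{ ti.xcom_pull(task_ids='create_spreadsheet') }}",
    sheet_name="Summary",
    data=[{"metric": "rows_loaded", "value": 42}],
)

create >> write_summary
```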
GoogleSheetsCreateSheetOperator
```python
from airflow_provider_google_sheets.operators.manage import GoogleSheetsCreateSheetOperator

add_sheet = GoogleSheetsCreateSheetOperator(
    task_id="add_sheet",
    spreadsheet_id="your-spreadsheet-id",
    sheet_title="NewSheet",
)
```
GoogleSheetsListSheetsOperator
List the sheet (tab) names of a spreadsheet, with optional filtering. Returns `list[str]`, which is compatible with Airflow dynamic task mapping.
```python
from airflow_provider_google_sheets.operators.manage import GoogleSheetsListSheetsOperator
from airflow_provider_google_sheets.operators.read import GoogleSheetsReadOperator

# List all sheets
list_sheets = GoogleSheetsListSheetsOperator(
    task_id="list_sheets",
    spreadsheet_id="your-spreadsheet-id",
)

# Filter by regex and use with dynamic task mapping
list_data_sheets = GoogleSheetsListSheetsOperator(
    task_id="list_data_sheets",
    spreadsheet_id="your-spreadsheet-id",
    name_pattern=r"^Data",         # include only sheets starting with "Data"
    exclude_pattern=r"_archive$",  # exclude sheets ending with "_archive"
    index_range=(0, 10),           # only the first 10 sheets
)

# Dynamic task mapping — read each sheet in parallel
read_each = GoogleSheetsReadOperator.partial(
    task_id="read_each",
    spreadsheet_id="your-spreadsheet-id",
).expand(sheet_name=list_data_sheets.output)
```
Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `gcp_conn_id` | `str` | `"google_cloud_default"` | Airflow connection ID |
| `spreadsheet_id` | `str` | — | Spreadsheet ID |
| `name_pattern` | `str` | `None` | Regex to include sheets by name (`re.search`) |
| `exclude_pattern` | `str` | `None` | Regex to exclude sheets by name (`re.search`) |
| `index_range` | `tuple[int, int]` | `None` | Positional slice `(start, end)`, 0-based, start inclusive, end exclusive |
Schema
Define column types for automatic conversion on read/write:
```python
schema = {
    "date": {"type": "date", "format": "%Y-%m-%d", "required": True},
    "revenue": {"type": "float", "required": True},
    "quantity": {"type": "int"},
    "comment": {"type": "str"},
    "is_active": {"type": "bool"},
}
```

Supported types: `str`, `int`, `float`, `date`, `datetime`, `bool`
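The same schema dict can be passed to the write operator, where it formats Python values into cell values (a sketch; the values are illustrative):

```python
import datetime

from airflow_provider_google_sheets.operators.write import GoogleSheetsWriteOperator

write_typed = GoogleSheetsWriteOperator(
    task_id="write_typed",
    spreadsheet_id="your-spreadsheet-id",
    write_mode="append",
    schema=schema,  # the schema dict defined above
    data=[{
        "date": datetime.date(2024, 1, 31),
        "revenue": 99.5,
        "quantity": 3,
        "comment": "ok",
        "is_active": True,
    }],
)
```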
Robust numeric parsing
For numeric columns (`int`, `float`), add `"default"` to enable lenient parsing: non-numeric values are replaced with the default instead of raising an error.

```python
schema = {
    "revenue": {"type": "float", "default": None},  # "n/a", "-", "" → None
    "quantity": {"type": "int", "default": 0},      # "n/a", "-", "" → 0
}
```
Lenient mode also handles:

- Comma as decimal separator: `"1,2"` → `1.2`
- Prefix/suffix stripping: `"1000.4 р."` → `1000.4`, `"10.2%"` → `10.2`
Without `"default"`, the strict behavior is preserved (invalid values raise an error).
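A minimal sketch of these lenient rules (illustrative only, not the provider's internal parser):

```python
import re


def lenient_float(value: str, default=None):
    # Normalize the decimal comma, then keep the first numeric token,
    # dropping prefixes/suffixes such as "р." or "%"
    match = re.search(r"-?\d+(?:\.\d+)?", value.replace(",", "."))
    return float(match.group()) if match else default


assert lenient_float("1,2") == 1.2
assert lenient_float("1000.4 р.") == 1000.4
assert lenient_float("10.2%") == 10.2
assert lenient_float("n/a") is None
```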
Examples
See the `examples/` directory for complete DAG examples:

- `example_read.py` — reading with various configurations
- `example_write.py` — overwrite and append modes
- `example_smart_merge.py` — smart merge scenarios
- `example_manage.py` — creating spreadsheets and sheets
- `example_sheets_to_bigquery.py` — Google Sheets → BigQuery (overwrite, append, date-range update)
- `example_bigquery_to_sheets.py` — BigQuery → Google Sheets (overwrite, smart merge by date)
License
MIT License