A pandas wrapper for Airtable that provides seamless integration between pandas DataFrames and Airtable tables.
Project description
pandas-airtable
A pandas wrapper for Airtable that provides seamless integration between pandas DataFrames and Airtable tables.
Features
pd.read_airtable()- Read Airtable tables directly into pandas DataFramesdf.airtable.to_airtable()- Write DataFrames to Airtable with append, replace, or upsert modes- Automatic schema inference - Maps pandas dtypes to Airtable field types
- Robust batching - Handles Airtable's 10-record batch limit automatically
- Rate limiting - Built-in respect for Airtable's 5 QPS limit with exponential backoff
- Table/field creation - Optionally create tables and fields on the fly
- Dry run mode - Preview changes before executing
Installation
# Using uv (recommended)
uv add pandas-airtable
# Using pip
pip install pandas-airtable
Note: This package does not install pandas as a dependency to keep the package slim. It assumes you already have pandas installed in your environment. If you don't have pandas installed, install it separately:
pip install pandas # or uv add pandasAlternatively, you can install pandas as an optional dependency:
pip install pandas-airtable[pandas] # or uv add pandas-airtable[pandas]
Configuration
Getting Your Airtable Credentials
-
API Key (Personal Access Token):
- Go to Airtable Account Settings
- Click "Create new token"
- Give it a name and select the required scopes:
data.records:read- For reading recordsdata.records:write- For writing recordsschema.bases:read- For reading schema (required for schema validation)schema.bases:write- For creating tables/fields
- Copy the token (starts with
pat...)
-
Base ID or Base Name:
- Base ID (recommended): Open your Airtable base - the Base ID is in the URL:
https://airtable.com/appXXXXXXXXXXXXXX/...(starts withapp) - Base Name: Alternatively, you can use the human-readable base name (e.g., "My Project"). The package will look up the Base ID automatically.
Note: If you have multiple bases with the same name, the package will use the first one found. In such cases, use
base_idinstead for clarity. - Base ID (recommended): Open your Airtable base - the Base ID is in the URL:
Environment Variables (Recommended)
Store your credentials in environment variables for security:
# Add to your .env file or shell profile
export AIRTABLE_API_KEY="patXXXXXXXXXXXXXX"
# Use either base_id OR base_name (not both)
export AIRTABLE_BASE_ID="appXXXXXXXXXXXXXX"
# OR
export AIRTABLE_BASE_NAME="My Project"
Then use them in your code:
import os
import pandas as pd
import pandas_airtable
api_key = os.environ["AIRTABLE_API_KEY"]
# Using base_id
base_id = os.environ["AIRTABLE_BASE_ID"]
df = pd.read_airtable(base_id, "MyTable", api_key=api_key)
# OR using base_name
base_name = os.environ["AIRTABLE_BASE_NAME"]
df = pd.read_airtable(base_name=base_name, table_name="MyTable", api_key=api_key)
Quick Start
import pandas as pd
import pandas_airtable # Registers the accessor and pd.read_airtable
# Read from Airtable (using base_id)
df = pd.read_airtable(
base_id="appXXXXXXXXXXXXXX",
table_name="Contacts",
api_key="patXXXXXXXXXXXXXX"
)
# OR read using base_name (human-readable name)
df = pd.read_airtable(
base_name="My Project",
table_name="Contacts",
api_key="patXXXXXXXXXXXXXX"
)
# Work with your data as a normal DataFrame
print(df.head())
print(df.describe())
# Write back to Airtable (works with both base_id and base_name)
df.airtable.to_airtable(
base_id="appXXXXXXXXXXXXXX",
table_name="Contacts",
api_key="patXXXXXXXXXXXXXX"
)
Reading from Airtable
Basic Read
# Read all records from a table
df = pd.read_airtable("appXXX", "Employees", api_key="patXXX")
print(df.columns)
# Index(['_airtable_id', '_airtable_created_time', 'Name', 'Email', 'Department'], dtype='object')
Filtering with Views
# Read only records from a specific view
df = pd.read_airtable(
"appXXX",
"Employees",
api_key="patXXX",
view="Active Employees"
)
Filtering with Formulas
# Filter using Airtable formula syntax
df = pd.read_airtable(
"appXXX",
"Employees",
api_key="patXXX",
formula="{Department} = 'Engineering'"
)
# Complex formula example
df = pd.read_airtable(
"appXXX",
"Orders",
api_key="patXXX",
formula="AND({Status} = 'Pending', {Amount} > 100)"
)
Pagination Control
# Adjust page size for large tables (max 100)
df = pd.read_airtable(
"appXXX",
"LargeTable",
api_key="patXXX",
page_size=50
)
Returned DataFrame Structure
Every DataFrame returned from read_airtable includes metadata columns:
| Column | Description |
|---|---|
_airtable_id |
Unique Airtable record ID (e.g., recXXXXXXXXXXXX) |
_airtable_created_time |
Timestamp when the record was created |
These columns are automatically excluded when writing back to Airtable.
Writing to Airtable
Append Mode (Default)
Add new records to an existing table:
# Create a new DataFrame
new_employees = pd.DataFrame({
"Name": ["Alice Smith", "Bob Johnson"],
"Email": ["alice@company.com", "bob@company.com"],
"Department": ["Engineering", "Marketing"]
})
# Append to existing table
result = new_employees.airtable.to_airtable(
base_id="appXXX",
table_name="Employees",
api_key="patXXX",
if_exists="append" # Default behavior
)
print(f"Created {result.created_count} records")
print(f"New record IDs: {result.created_ids}")
Replace Mode
Delete all existing records and insert new ones:
# Replace entire table contents
result = df.airtable.to_airtable(
base_id="appXXX",
table_name="Employees",
api_key="patXXX",
if_exists="replace"
)
print(f"Deleted {result.deleted_count} old records")
print(f"Created {result.created_count} new records")
Upsert Mode
Update existing records by a key field, insert new ones:
# Update employees by email, create new ones if email doesn't exist
updates = pd.DataFrame({
"Email": ["alice@company.com", "charlie@company.com"],
"Department": ["Product", "Sales"], # Alice gets updated
"Name": ["Alice Smith", "Charlie Brown"] # Charlie is new
})
result = updates.airtable.to_airtable(
base_id="appXXX",
table_name="Employees",
api_key="patXXX",
if_exists="upsert",
key_field="Email" # Required for upsert
)
print(f"Created: {result.created_count}")
print(f"Updated: {result.updated_count}")
Handling Duplicate Keys
By default, duplicate keys in upsert mode raise an error:
from pandas_airtable import AirtableDuplicateKeyError
df_with_duplicates = pd.DataFrame({
"Email": ["alice@company.com", "alice@company.com"], # Duplicate!
"Name": ["Alice V1", "Alice V2"]
})
try:
df_with_duplicates.airtable.to_airtable(
"appXXX", "Employees", api_key="patXXX",
if_exists="upsert", key_field="Email"
)
except AirtableDuplicateKeyError as e:
print(f"Duplicate keys found: {e.duplicate_values}")
To allow duplicates (uses last occurrence):
result = df_with_duplicates.airtable.to_airtable(
"appXXX", "Employees", api_key="patXXX",
if_exists="upsert",
key_field="Email",
allow_duplicate_keys=True # Uses "Alice V2"
)
Creating Tables and Fields
# Create table if it doesn't exist
result = df.airtable.to_airtable(
"appXXX", "NewTable", api_key="patXXX",
create_table=True # Default: True
)
# Create missing fields in existing table
result = df.airtable.to_airtable(
"appXXX", "ExistingTable", api_key="patXXX",
allow_new_columns=True # Default: False
)
Dry Run Mode
Preview what would happen without making changes:
preview = df.airtable.to_airtable(
base_id="appXXX",
table_name="Employees",
api_key="patXXX",
dry_run=True
)
print(f"Operation: {preview.operation}")
print(f"Records to process: {preview.record_count}")
print(f"Inferred schema: {preview.schema}")
print(f"Sample records: {preview.sample_records[:3]}")
# If satisfied, run without dry_run
result = df.airtable.to_airtable(
base_id="appXXX",
table_name="Employees",
api_key="patXXX"
)
Type Mapping
pandas-airtable automatically maps pandas dtypes to Airtable field types:
| pandas dtype | Airtable Field Type |
|---|---|
object / str |
Single line text |
int64, int32 |
Number (integer) |
float64, float32 |
Number (decimal) |
bool |
Checkbox |
datetime64[ns] |
Date |
list[str] |
Multiple select |
dict with url/filename |
Attachment |
Custom Schema Override
Override the automatic type inference:
df.airtable.to_airtable(
"appXXX", "Products", api_key="patXXX",
schema={
"description": {"type": "multilineText"},
"category": {
"type": "singleSelect",
"options": {
"choices": [
{"name": "Electronics"},
{"name": "Clothing"},
{"name": "Food"}
]
}
},
"website": {"type": "url"},
"contact_email": {"type": "email"},
"price": {"type": "currency", "options": {"symbol": "$"}}
}
)
Real-World Examples
Example 1: Sync CSV to Airtable
import pandas as pd
import pandas_airtable
# Read CSV file
df = pd.read_csv("customers.csv")
# Clean and transform data
df["email"] = df["email"].str.lower().str.strip()
df["signup_date"] = pd.to_datetime(df["signup_date"])
# Sync to Airtable (update existing by email, create new)
result = df.airtable.to_airtable(
base_id="appXXX",
table_name="Customers",
api_key="patXXX",
if_exists="upsert",
key_field="email"
)
print(f"Synced {result.created_count + result.updated_count} records")
Example 2: Export Airtable to Excel
import pandas as pd
import pandas_airtable
# Read from Airtable
df = pd.read_airtable(
"appXXX", "Sales", api_key="patXXX",
formula="YEAR({Date}) = 2024"
)
# Remove Airtable metadata columns
df = df.drop(columns=["_airtable_id", "_airtable_created_time"])
# Export to Excel
df.to_excel("sales_2024.xlsx", index=False)
Example 3: Daily Data Pipeline
import pandas as pd
import pandas_airtable
from datetime import datetime, timedelta
# Read yesterday's orders from database/API
yesterday = datetime.now() - timedelta(days=1)
orders = fetch_orders_from_database(date=yesterday)
# Convert to DataFrame
df = pd.DataFrame(orders)
# Append to Airtable orders table
result = df.airtable.to_airtable(
base_id="appXXX",
table_name="Orders",
api_key="patXXX",
if_exists="append",
schema={
"order_date": {"type": "date"},
"amount": {"type": "currency", "options": {"symbol": "$"}},
"notes": {"type": "multilineText"}
}
)
print(f"Added {result.created_count} orders for {yesterday.date()}")
Example 4: Merge Multiple Tables
import pandas as pd
import pandas_airtable
# Read from multiple Airtable tables
customers = pd.read_airtable("appXXX", "Customers", api_key="patXXX")
orders = pd.read_airtable("appXXX", "Orders", api_key="patXXX")
# Merge DataFrames
merged = customers.merge(orders, left_on="email", right_on="customer_email")
# Aggregate and write to summary table
summary = merged.groupby("customer_email").agg({
"order_amount": "sum",
"_airtable_id_y": "count" # Count of orders
}).rename(columns={
"order_amount": "total_spent",
"_airtable_id_y": "order_count"
}).reset_index()
summary.airtable.to_airtable(
"appXXX", "CustomerSummary", api_key="patXXX",
if_exists="replace"
)
Error Handling
The package provides specific exceptions for different error conditions:
from pandas_airtable import (
PandasAirtableError, # Base exception
AirtableAuthenticationError, # Invalid API key
AirtablePermissionError, # Insufficient permissions
AirtableRateLimitError, # Rate limit exceeded after retries
AirtableSchemaError, # Schema mismatch
AirtableValidationError, # Input validation failure
AirtableDuplicateKeyError, # Duplicate keys in upsert mode
AirtableTableNotFoundError, # Table doesn't exist
AirtableFieldCreationError, # Field creation failed
AirtableBatchError, # Partial batch failure
)
try:
df.airtable.to_airtable("appXXX", "Table", api_key="patXXX")
except AirtableAuthenticationError:
print("Invalid API key - check your credentials")
except AirtableTableNotFoundError as e:
print(f"Table '{e.table_name}' not found in base '{e.base_id}'")
except AirtableSchemaError as e:
print(f"Schema mismatch - missing columns: {e.missing_columns}")
except AirtableDuplicateKeyError as e:
print(f"Duplicate keys found: {e.duplicate_values}")
except AirtableRateLimitError as e:
print(f"Rate limited after {e.retries} retries")
except PandasAirtableError as e:
print(f"General error: {e}")
Logging
Enable logging for debugging:
import logging
# Basic logging
logging.basicConfig(level=logging.INFO)
# Detailed debug logging
logging.getLogger("pandas_airtable").setLevel(logging.DEBUG)
Example log output:
INFO:pandas_airtable:Creating batch 1/5 with 10 records
INFO:pandas_airtable:Creating batch 2/5 with 10 records
DEBUG:pandas_airtable:Retry attempt 1/5 after 1.2s delay: Rate limit exceeded
INFO:pandas_airtable:Creating batch 3/5 with 10 records
API Reference
pd.read_airtable()
pd.read_airtable(
base_id: str = None, # Airtable base ID (e.g., "appXXX")
table_name: str, # Table name
api_key: str, # Personal access token (e.g., "patXXX")
view: str = None, # Optional: filter by view name
formula: str = None, # Optional: filter by Airtable formula
page_size: int = 100, # Records per API page (max 100)
base_name: str = None, # Alternative to base_id: use human-readable base name
) -> pd.DataFrame
Note: Provide either
base_idorbase_name, not both. If multiple bases share the same name, the first one found will be used.
Returns: DataFrame with all records and metadata columns (_airtable_id, _airtable_created_time)
df.airtable.to_airtable()
df.airtable.to_airtable(
base_id: str = None, # Airtable base ID
table_name: str, # Table name
api_key: str, # Personal access token
if_exists: str = "append", # "append", "replace", or "upsert"
key_field: str = None, # Required for upsert mode
schema: dict = None, # Override inferred schema
batch_size: int = 10, # Records per batch (max 10)
create_table: bool = True, # Create table if it doesn't exist
allow_new_columns: bool = False, # Create missing fields in Airtable
allow_duplicate_keys: bool = False, # Allow duplicate keys in upsert
dry_run: bool = False, # Preview without making changes
base_name: str = None, # Alternative to base_id: use human-readable base name
) -> WriteResult | DryRunResult
Note: Provide either
base_idorbase_name, not both. If multiple bases share the same name, the first one found will be used.
Returns:
WriteResult- Containscreated_count,updated_count,deleted_count,created_ids,updated_ids,errors,successDryRunResult(ifdry_run=True) - Containsoperation,record_count,schema,sample_records,fields_to_create,warnings
Testing
Unit Tests (97 tests)
Run unit tests without Airtable credentials:
uv run pytest tests/ --ignore=tests/integration/ -v
Integration Tests (90 tests)
Run against the real Airtable API:
# Set credentials
export AIRTABLE_API_KEY="patXXXXXXXXXXXXXX"
# Use either base_id OR base_name
export AIRTABLE_BASE_ID="appXXXXXXXXXXXXXX"
# OR
export AIRTABLE_BASE_NAME="My Test Base"
# Run integration tests
uv run pytest tests/integration/ -v
Integration tests are automatically skipped if credentials are not set.
All Tests
uv run pytest tests/ -v
Requirements
- Python >= 3.12
- pyairtable >= 3.3.0
- pandas >= 2.3.3 (peer dependency - must be installed separately or via
pandas-airtable[pandas])
License
MIT License
Contributing
Contributions are welcome! Please ensure all tests pass before submitting a PR:
# Run tests
uv run pytest tests/ -v
# Run linter
uv run ruff check src/ tests/
# Fix lint errors automatically
uv run ruff check src/ tests/ --fix
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file pandas_airtable-1.2.1.tar.gz.
File metadata
- Download URL: pandas_airtable-1.2.1.tar.gz
- Upload date:
- Size: 46.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
5791dd4288def82def66ff858052a83af8e06ce932c686fbd02ea69cbae8302d
|
|
| MD5 |
505cb89bdd57011cb2075fc20310f78c
|
|
| BLAKE2b-256 |
e81ee3d8120f82827ea9b1afc896891cc0f043f9ec317a61f009d936ba0b29bf
|
Provenance
The following attestation bundles were made for pandas_airtable-1.2.1.tar.gz:
Publisher:
publish.yml on GitSujal/pandas-airtable
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
pandas_airtable-1.2.1.tar.gz -
Subject digest:
5791dd4288def82def66ff858052a83af8e06ce932c686fbd02ea69cbae8302d - Sigstore transparency entry: 809197439
- Sigstore integration time:
-
Permalink:
GitSujal/pandas-airtable@939a58ff42bafed75666678a2d7b44d33230b155 -
Branch / Tag:
refs/tags/v1.2.1 - Owner: https://github.com/GitSujal
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@939a58ff42bafed75666678a2d7b44d33230b155 -
Trigger Event:
release
-
Statement type:
File details
Details for the file pandas_airtable-1.2.1-py3-none-any.whl.
File metadata
- Download URL: pandas_airtable-1.2.1-py3-none-any.whl
- Upload date:
- Size: 27.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
ad4442d2043b961b0fca7193e5bfe006b4b9a6668668ef42941ce2a272706bdf
|
|
| MD5 |
bba2df4fd06a5407564a8c1f01778040
|
|
| BLAKE2b-256 |
341fa63d79424b83dd84da7399ca8cf57f9a6dda0a32ccf1637df2503dacffd6
|
Provenance
The following attestation bundles were made for pandas_airtable-1.2.1-py3-none-any.whl:
Publisher:
publish.yml on GitSujal/pandas-airtable
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
pandas_airtable-1.2.1-py3-none-any.whl -
Subject digest:
ad4442d2043b961b0fca7193e5bfe006b4b9a6668668ef42941ce2a272706bdf - Sigstore transparency entry: 809197496
- Sigstore integration time:
-
Permalink:
GitSujal/pandas-airtable@939a58ff42bafed75666678a2d7b44d33230b155 -
Branch / Tag:
refs/tags/v1.2.1 - Owner: https://github.com/GitSujal
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@939a58ff42bafed75666678a2d7b44d33230b155 -
Trigger Event:
release
-
Statement type: