Skip to main content

A pandas wrapper for Airtable that provides seamless integration between pandas DataFrames and Airtable tables.

Project description

pandas-airtable

PyPI version Python versions License CI codecov Downloads Ruff GitHub stars

A pandas wrapper for Airtable that provides seamless integration between pandas DataFrames and Airtable tables.

Features

  • pd.read_airtable() - Read Airtable tables directly into pandas DataFrames
  • df.airtable.to_airtable() - Write DataFrames to Airtable with append, replace, or upsert modes
  • Automatic schema inference - Maps pandas dtypes to Airtable field types
  • Robust batching - Handles Airtable's 10-record batch limit automatically
  • Rate limiting - Built-in respect for Airtable's 5 QPS limit with exponential backoff
  • Table/field creation - Optionally create tables and fields on the fly
  • Dry run mode - Preview changes before executing

Installation

# Using uv (recommended)
uv add pandas-airtable

# Using pip
pip install pandas-airtable

Note: This package does not install pandas as a dependency to keep the package slim. It assumes you already have pandas installed in your environment. If you don't have pandas installed, install it separately:

pip install pandas
# or
uv add pandas

Alternatively, you can install pandas as an optional dependency:

pip install pandas-airtable[pandas]
# or
uv add pandas-airtable[pandas]

Configuration

Getting Your Airtable Credentials

  1. API Key (Personal Access Token):

    • Go to Airtable Account Settings
    • Click "Create new token"
    • Give it a name and select the required scopes:
      • data.records:read - For reading records
      • data.records:write - For writing records
      • schema.bases:read - For reading schema (required for schema validation)
      • schema.bases:write - For creating tables/fields
    • Copy the token (starts with pat...)
  2. Base ID or Base Name:

    • Base ID (recommended): Open your Airtable base - the Base ID is in the URL: https://airtable.com/appXXXXXXXXXXXXXX/... (starts with app)
    • Base Name: Alternatively, you can use the human-readable base name (e.g., "My Project"). The package will look up the Base ID automatically.

    Note: If you have multiple bases with the same name, the package will use the first one found. In such cases, use base_id instead for clarity.

Environment Variables (Recommended)

Store your credentials in environment variables for security:

# Add to your .env file or shell profile
export AIRTABLE_API_KEY="patXXXXXXXXXXXXXX"

# Use either base_id OR base_name (not both)
export AIRTABLE_BASE_ID="appXXXXXXXXXXXXXX"
# OR
export AIRTABLE_BASE_NAME="My Project"

Then use them in your code:

import os
import pandas as pd
import pandas_airtable

api_key = os.environ["AIRTABLE_API_KEY"]

# Using base_id
base_id = os.environ["AIRTABLE_BASE_ID"]
df = pd.read_airtable(base_id, "MyTable", api_key=api_key)

# OR using base_name
base_name = os.environ["AIRTABLE_BASE_NAME"]
df = pd.read_airtable(base_name=base_name, table_name="MyTable", api_key=api_key)

Quick Start

import pandas as pd
import pandas_airtable  # Registers the accessor and pd.read_airtable

# Read from Airtable (using base_id)
df = pd.read_airtable(
    base_id="appXXXXXXXXXXXXXX",
    table_name="Contacts",
    api_key="patXXXXXXXXXXXXXX"
)

# OR read using base_name (human-readable name)
df = pd.read_airtable(
    base_name="My Project",
    table_name="Contacts",
    api_key="patXXXXXXXXXXXXXX"
)

# Work with your data as a normal DataFrame
print(df.head())
print(df.describe())

# Write back to Airtable (works with both base_id and base_name)
df.airtable.to_airtable(
    base_id="appXXXXXXXXXXXXXX",
    table_name="Contacts",
    api_key="patXXXXXXXXXXXXXX"
)

Reading from Airtable

Basic Read

# Read all records from a table
df = pd.read_airtable("appXXX", "Employees", api_key="patXXX")

print(df.columns)
# Index(['_airtable_id', '_airtable_created_time', 'Name', 'Email', 'Department'], dtype='object')

Filtering with Views

# Read only records from a specific view
df = pd.read_airtable(
    "appXXX",
    "Employees",
    api_key="patXXX",
    view="Active Employees"
)

Filtering with Formulas

# Filter using Airtable formula syntax
df = pd.read_airtable(
    "appXXX",
    "Employees",
    api_key="patXXX",
    formula="{Department} = 'Engineering'"
)

# Complex formula example
df = pd.read_airtable(
    "appXXX",
    "Orders",
    api_key="patXXX",
    formula="AND({Status} = 'Pending', {Amount} > 100)"
)

Pagination Control

# Adjust page size for large tables (max 100)
df = pd.read_airtable(
    "appXXX",
    "LargeTable",
    api_key="patXXX",
    page_size=50
)

Returned DataFrame Structure

Every DataFrame returned from read_airtable includes metadata columns:

Column Description
_airtable_id Unique Airtable record ID (e.g., recXXXXXXXXXXXX)
_airtable_created_time Timestamp when the record was created

These columns are automatically excluded when writing back to Airtable.

Writing to Airtable

Append Mode (Default)

Add new records to an existing table:

# Create a new DataFrame
new_employees = pd.DataFrame({
    "Name": ["Alice Smith", "Bob Johnson"],
    "Email": ["alice@company.com", "bob@company.com"],
    "Department": ["Engineering", "Marketing"]
})

# Append to existing table
result = new_employees.airtable.to_airtable(
    base_id="appXXX",
    table_name="Employees",
    api_key="patXXX",
    if_exists="append"  # Default behavior
)

print(f"Created {result.created_count} records")
print(f"New record IDs: {result.created_ids}")

Replace Mode

Delete all existing records and insert new ones:

# Replace entire table contents
result = df.airtable.to_airtable(
    base_id="appXXX",
    table_name="Employees",
    api_key="patXXX",
    if_exists="replace"
)

print(f"Deleted {result.deleted_count} old records")
print(f"Created {result.created_count} new records")

Upsert Mode

Update existing records by a key field, insert new ones:

# Update employees by email, create new ones if email doesn't exist
updates = pd.DataFrame({
    "Email": ["alice@company.com", "charlie@company.com"],
    "Department": ["Product", "Sales"],  # Alice gets updated
    "Name": ["Alice Smith", "Charlie Brown"]  # Charlie is new
})

result = updates.airtable.to_airtable(
    base_id="appXXX",
    table_name="Employees",
    api_key="patXXX",
    if_exists="upsert",
    key_field="Email"  # Required for upsert
)

print(f"Created: {result.created_count}")
print(f"Updated: {result.updated_count}")

Handling Duplicate Keys

By default, duplicate keys in upsert mode raise an error:

from pandas_airtable import AirtableDuplicateKeyError

df_with_duplicates = pd.DataFrame({
    "Email": ["alice@company.com", "alice@company.com"],  # Duplicate!
    "Name": ["Alice V1", "Alice V2"]
})

try:
    df_with_duplicates.airtable.to_airtable(
        "appXXX", "Employees", api_key="patXXX",
        if_exists="upsert", key_field="Email"
    )
except AirtableDuplicateKeyError as e:
    print(f"Duplicate keys found: {e.duplicate_values}")

To allow duplicates (uses last occurrence):

result = df_with_duplicates.airtable.to_airtable(
    "appXXX", "Employees", api_key="patXXX",
    if_exists="upsert",
    key_field="Email",
    allow_duplicate_keys=True  # Uses "Alice V2"
)

Creating Tables and Fields

# Create table if it doesn't exist
result = df.airtable.to_airtable(
    "appXXX", "NewTable", api_key="patXXX",
    create_table=True  # Default: True
)

# Create missing fields in existing table
result = df.airtable.to_airtable(
    "appXXX", "ExistingTable", api_key="patXXX",
    allow_new_columns=True  # Default: False
)

Dry Run Mode

Preview what would happen without making changes:

preview = df.airtable.to_airtable(
    base_id="appXXX",
    table_name="Employees",
    api_key="patXXX",
    dry_run=True
)

print(f"Operation: {preview.operation}")
print(f"Records to process: {preview.record_count}")
print(f"Inferred schema: {preview.schema}")
print(f"Sample records: {preview.sample_records[:3]}")

# If satisfied, run without dry_run
result = df.airtable.to_airtable(
    base_id="appXXX",
    table_name="Employees",
    api_key="patXXX"
)

Type Mapping

pandas-airtable automatically maps pandas dtypes to Airtable field types:

pandas dtype Airtable Field Type
object / str Single line text
int64, int32 Number (integer)
float64, float32 Number (decimal)
bool Checkbox
datetime64[ns] Date
list[str] Multiple select
dict with url/filename Attachment

Custom Schema Override

Override the automatic type inference:

df.airtable.to_airtable(
    "appXXX", "Products", api_key="patXXX",
    schema={
        "description": {"type": "multilineText"},
        "category": {
            "type": "singleSelect",
            "options": {
                "choices": [
                    {"name": "Electronics"},
                    {"name": "Clothing"},
                    {"name": "Food"}
                ]
            }
        },
        "website": {"type": "url"},
        "contact_email": {"type": "email"},
        "price": {"type": "currency", "options": {"symbol": "$"}}
    }
)

Real-World Examples

Example 1: Sync CSV to Airtable

import pandas as pd
import pandas_airtable

# Read CSV file
df = pd.read_csv("customers.csv")

# Clean and transform data
df["email"] = df["email"].str.lower().str.strip()
df["signup_date"] = pd.to_datetime(df["signup_date"])

# Sync to Airtable (update existing by email, create new)
result = df.airtable.to_airtable(
    base_id="appXXX",
    table_name="Customers",
    api_key="patXXX",
    if_exists="upsert",
    key_field="email"
)

print(f"Synced {result.created_count + result.updated_count} records")

Example 2: Export Airtable to Excel

import pandas as pd
import pandas_airtable

# Read from Airtable
df = pd.read_airtable(
    "appXXX", "Sales", api_key="patXXX",
    formula="YEAR({Date}) = 2024"
)

# Remove Airtable metadata columns
df = df.drop(columns=["_airtable_id", "_airtable_created_time"])

# Export to Excel
df.to_excel("sales_2024.xlsx", index=False)

Example 3: Daily Data Pipeline

import pandas as pd
import pandas_airtable
from datetime import datetime, timedelta

# Read yesterday's orders from database/API
yesterday = datetime.now() - timedelta(days=1)
orders = fetch_orders_from_database(date=yesterday)

# Convert to DataFrame
df = pd.DataFrame(orders)

# Append to Airtable orders table
result = df.airtable.to_airtable(
    base_id="appXXX",
    table_name="Orders",
    api_key="patXXX",
    if_exists="append",
    schema={
        "order_date": {"type": "date"},
        "amount": {"type": "currency", "options": {"symbol": "$"}},
        "notes": {"type": "multilineText"}
    }
)

print(f"Added {result.created_count} orders for {yesterday.date()}")

Example 4: Merge Multiple Tables

import pandas as pd
import pandas_airtable

# Read from multiple Airtable tables
customers = pd.read_airtable("appXXX", "Customers", api_key="patXXX")
orders = pd.read_airtable("appXXX", "Orders", api_key="patXXX")

# Merge DataFrames
merged = customers.merge(orders, left_on="email", right_on="customer_email")

# Aggregate and write to summary table
summary = merged.groupby("customer_email").agg({
    "order_amount": "sum",
    "_airtable_id_y": "count"  # Count of orders
}).rename(columns={
    "order_amount": "total_spent",
    "_airtable_id_y": "order_count"
}).reset_index()

summary.airtable.to_airtable(
    "appXXX", "CustomerSummary", api_key="patXXX",
    if_exists="replace"
)

Error Handling

The package provides specific exceptions for different error conditions:

from pandas_airtable import (
    PandasAirtableError,          # Base exception
    AirtableAuthenticationError,   # Invalid API key
    AirtablePermissionError,       # Insufficient permissions
    AirtableRateLimitError,        # Rate limit exceeded after retries
    AirtableSchemaError,           # Schema mismatch
    AirtableValidationError,       # Input validation failure
    AirtableDuplicateKeyError,     # Duplicate keys in upsert mode
    AirtableTableNotFoundError,    # Table doesn't exist
    AirtableFieldCreationError,    # Field creation failed
    AirtableBatchError,            # Partial batch failure
)

try:
    df.airtable.to_airtable("appXXX", "Table", api_key="patXXX")
except AirtableAuthenticationError:
    print("Invalid API key - check your credentials")
except AirtableTableNotFoundError as e:
    print(f"Table '{e.table_name}' not found in base '{e.base_id}'")
except AirtableSchemaError as e:
    print(f"Schema mismatch - missing columns: {e.missing_columns}")
except AirtableDuplicateKeyError as e:
    print(f"Duplicate keys found: {e.duplicate_values}")
except AirtableRateLimitError as e:
    print(f"Rate limited after {e.retries} retries")
except PandasAirtableError as e:
    print(f"General error: {e}")

Logging

Enable logging for debugging:

import logging

# Basic logging
logging.basicConfig(level=logging.INFO)

# Detailed debug logging
logging.getLogger("pandas_airtable").setLevel(logging.DEBUG)

Example log output:

INFO:pandas_airtable:Creating batch 1/5 with 10 records
INFO:pandas_airtable:Creating batch 2/5 with 10 records
DEBUG:pandas_airtable:Retry attempt 1/5 after 1.2s delay: Rate limit exceeded
INFO:pandas_airtable:Creating batch 3/5 with 10 records

API Reference

pd.read_airtable()

pd.read_airtable(
    base_id: str = None,    # Airtable base ID (e.g., "appXXX")
    table_name: str,        # Table name
    api_key: str,           # Personal access token (e.g., "patXXX")
    view: str = None,       # Optional: filter by view name
    formula: str = None,    # Optional: filter by Airtable formula
    page_size: int = 100,   # Records per API page (max 100)
    base_name: str = None,  # Alternative to base_id: use human-readable base name
) -> pd.DataFrame

Note: Provide either base_id or base_name, not both. If multiple bases share the same name, the first one found will be used.

Returns: DataFrame with all records and metadata columns (_airtable_id, _airtable_created_time)

df.airtable.to_airtable()

df.airtable.to_airtable(
    base_id: str = None,                       # Airtable base ID
    table_name: str,                           # Table name
    api_key: str,                              # Personal access token
    if_exists: str = "append",                 # "append", "replace", or "upsert"
    key_field: str = None,                     # Required for upsert mode
    schema: dict = None,                       # Override inferred schema
    batch_size: int = 10,                      # Records per batch (max 10)
    create_table: bool = True,                 # Create table if it doesn't exist
    allow_new_columns: bool = False,           # Create missing fields in Airtable
    allow_duplicate_keys: bool = False,        # Allow duplicate keys in upsert
    dry_run: bool = False,                     # Preview without making changes
    base_name: str = None,                     # Alternative to base_id: use human-readable base name
) -> WriteResult | DryRunResult

Note: Provide either base_id or base_name, not both. If multiple bases share the same name, the first one found will be used.

Returns:

  • WriteResult - Contains created_count, updated_count, deleted_count, created_ids, updated_ids, errors, success
  • DryRunResult (if dry_run=True) - Contains operation, record_count, schema, sample_records, fields_to_create, warnings

Testing

Unit Tests (97 tests)

Run unit tests without Airtable credentials:

uv run pytest tests/ --ignore=tests/integration/ -v

Integration Tests (90 tests)

Run against the real Airtable API:

# Set credentials
export AIRTABLE_API_KEY="patXXXXXXXXXXXXXX"

# Use either base_id OR base_name
export AIRTABLE_BASE_ID="appXXXXXXXXXXXXXX"
# OR
export AIRTABLE_BASE_NAME="My Test Base"

# Run integration tests
uv run pytest tests/integration/ -v

Integration tests are automatically skipped if credentials are not set.

All Tests

uv run pytest tests/ -v

Requirements

  • Python >= 3.12
  • pyairtable >= 3.3.0
  • pandas >= 2.3.3 (peer dependency - must be installed separately or via pandas-airtable[pandas])

License

MIT License

Contributing

Contributions are welcome! Please ensure all tests pass before submitting a PR:

# Run tests
uv run pytest tests/ -v

# Run linter
uv run ruff check src/ tests/

# Fix lint errors automatically
uv run ruff check src/ tests/ --fix

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pandas_airtable-1.3.3.tar.gz (48.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pandas_airtable-1.3.3-py3-none-any.whl (28.1 kB view details)

Uploaded Python 3

File details

Details for the file pandas_airtable-1.3.3.tar.gz.

File metadata

  • Download URL: pandas_airtable-1.3.3.tar.gz
  • Upload date:
  • Size: 48.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for pandas_airtable-1.3.3.tar.gz
Algorithm Hash digest
SHA256 58f3b4d55b76a68e7a042e63328d741031b7155bc4829395c477d30068150e75
MD5 ee0106c6108aca5dd8c1ff090b53080e
BLAKE2b-256 33735ae70e5b0fbb5f76d1725ffc53938abe3098a0d3ee599c8910ef989fb3aa

See more details on using hashes here.

Provenance

The following attestation bundles were made for pandas_airtable-1.3.3.tar.gz:

Publisher: publish.yml on GitSujal/pandas-airtable

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file pandas_airtable-1.3.3-py3-none-any.whl.

File metadata

File hashes

Hashes for pandas_airtable-1.3.3-py3-none-any.whl
Algorithm Hash digest
SHA256 48f8b72a7afa06c203cf7f3924485e41df00ced5ce462612e7dfdbd765f6faff
MD5 f5bc26989f51032b23881fd408f58bf3
BLAKE2b-256 bd19e0f5c93f9bae787dc478ab10e5d67106b9f4ad418766a896bdf46e0db600

See more details on using hashes here.

Provenance

The following attestation bundles were made for pandas_airtable-1.3.3-py3-none-any.whl:

Publisher: publish.yml on GitSujal/pandas-airtable

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page