aio-sf

An async Salesforce library for Python.

Features

✅ Supported APIs

  • Bulk API 2.0 - Efficient querying of large datasets
  • Describe API - Field metadata and object descriptions
  • SOQL Query API - Standard Salesforce queries
  • SObjects Collections API - CRUD on collections of SObjects (Salesforce accepts up to 200 records per request; larger lists are batched automatically)
  • Tooling API - Development and deployment tools
  • Bulk API 1.0 - Legacy bulk operations
  • Streaming API - Real-time event streaming

✅ Supported Authentication Strategies

  • OAuth Client Credentials - Automatic authentication
  • Static Token - Existing access tokens
  • Refresh Token - Refresh token flow
  • SFDX CLI - Login by grabbing a token from the SFDX CLI
  • Password Authentication - Password + security token authentication (SOAP login)

🚀 Export Features

  • Parquet Export - Efficient columnar storage with schema mapping
  • CSV Export - Simple text format export
  • Resume Support - Resume interrupted queries using job IDs
  • Streaming Processing - Memory-efficient processing of large datasets

Installation

Full Package (Default - Includes Everything)

uv add aio-sf
# or: pip install aio-sf

Core Only (Minimal Dependencies)

uv add "aio-sf[core]"
# or: pip install "aio-sf[core]"

Quick Start

Authentication & Connection

import asyncio
import os
from aio_sf import SalesforceClient, ClientCredentialsAuth

async def main():
    auth = ClientCredentialsAuth(
        client_id=os.getenv('SF_CLIENT_ID'),
        client_secret=os.getenv('SF_CLIENT_SECRET'),
        instance_url=os.getenv('SF_INSTANCE_URL'),
    )
    
    async with SalesforceClient(auth_strategy=auth) as sf:
        print(f"✅ Connected to: {sf.instance_url}")

        sobjects = await sf.describe.list_sobjects()
        print(sobjects[0]["name"])

        contact_describe = await sf.describe.sobject("Contact")

        # Take the first 5 createable fields on Contact
        creatable_fields = [
            field.get("name", "")
            for field in contact_describe["fields"]
            if field.get("createable")
        ][:5]

        query = f"SELECT {', '.join(creatable_fields)} FROM Contact LIMIT 5"
        print(query)

        query_result = await sf.query.soql(query)
        # Loop over records using async iteration
        # or: await query_result.collect_all() to collect all records into a list
        async for record in query_result:
            print(record.get("AccountId"))

        # Create a new Account
        await sf.collections.insert(
            sobject_type="Account",
            records=[{"Name": "Test Account"}]
        )

asyncio.run(main())
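
The field-selection step in the quick start can be tried without a Salesforce org. The following is a minimal sketch, assuming the describe result is a dict with a "fields" list whose entries carry "name" and "createable" keys, as the example above uses. The `build_select` helper and the sample payload are hypothetical and are not part of aio-sf.

```python
from typing import Any


def build_select(
    describe: dict[str, Any],
    sobject: str,
    max_fields: int = 5,
    row_limit: int = 5,
) -> str:
    """Build a SOQL SELECT from the first `max_fields` createable fields."""
    names = [
        field["name"]
        for field in describe.get("fields", [])
        if field.get("createable") and field.get("name")
    ][:max_fields]
    if not names:
        # An empty field list would produce invalid SOQL, so fail early.
        raise ValueError(f"no createable fields found for {sobject}")
    return f"SELECT {', '.join(names)} FROM {sobject} LIMIT {row_limit}"


# Hypothetical, trimmed-down describe payload for illustration only.
sample_describe = {
    "fields": [
        {"name": "Id", "createable": False},
        {"name": "AccountId", "createable": True},
        {"name": "LastName", "createable": True},
        {"name": "FirstName", "createable": True},
    ]
}

query = build_select(sample_describe, "Contact")
print(query)
```

Raising on an empty field list is a design choice of this sketch: it surfaces a bad describe payload before any query is sent.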

Collections API - Batch Operations

Bulk operations (insert, update, upsert, delete) with automatic batching and concurrency.

Basic Usage:

async with SalesforceClient(auth_strategy=auth) as sf:
    records = [{"Name": f"Account {i}"} for i in range(1000)]
    
    results = await sf.collections.insert(records, sobject_type="Account")
    # Also: update(), upsert(), delete()
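
To make "automatic batching and concurrency" concrete, here is a rough sketch of the general technique: split the records into fixed-size batches and send them with a bounded number of requests in flight. It is not aio-sf's implementation. `chunked`, `run_batches`, and `fake_send` are hypothetical names, and the defaults of 200 records and 5 concurrent batches are assumptions chosen to match the values used elsewhere on this page.

```python
import asyncio
from typing import Any, Awaitable, Callable

Record = dict[str, Any]


def chunked(records: list[Record], size: int) -> list[list[Record]]:
    """Split records into consecutive batches of at most `size` items."""
    if size < 1:
        raise ValueError("size must be >= 1")
    return [records[i:i + size] for i in range(0, len(records), size)]


async def run_batches(
    records: list[Record],
    send: Callable[[list[Record]], Awaitable[list[dict]]],
    batch_size: int = 200,
    max_concurrent: int = 5,
) -> list[dict]:
    """Send all batches, with at most `max_concurrent` in flight at once."""
    gate = asyncio.Semaphore(max_concurrent)

    async def guarded(batch: list[Record]) -> list[dict]:
        async with gate:
            return await send(batch)

    per_batch = await asyncio.gather(
        *(guarded(batch) for batch in chunked(records, batch_size))
    )
    # gather() preserves input order, so results line up with the records.
    return [result for batch in per_batch for result in batch]


async def fake_send(batch: list[Record]) -> list[dict]:
    """Stand-in for an HTTP call; reports every record as a success."""
    await asyncio.sleep(0)
    return [{"success": True, "name": record["Name"]} for record in batch]


records = [{"Name": f"Account {i}"} for i in range(1000)]
results = asyncio.run(run_batches(records, fake_send))
print(len(results), results[0]["name"], results[-1]["name"])
```

The semaphore caps concurrency without changing result order, which keeps each result aligned with the record that produced it.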

Advanced - With Retries, Concurrency Scaling, and Progress:

from aio_sf.api.collections import ResultInfo

async def on_result(info: ResultInfo):
    # Called after each batch completes with successes and errors split
    print(
        f"Batch: {len(info['successes'])} succeeded, {len(info['errors'])} failed | "
        f"Attempt {info['current_attempt']}, "
        f"Overall: {info['records_succeeded']} OK, {info['records_failed']} failed, "
        f"{info['records_pending']} pending"
    )
    # Inspect errors (includes both API errors and HTTP failures)
    for error in info['errors']:
        print(f"  Error: {error['errors']}")

async with SalesforceClient(auth_strategy=auth) as sf:
    results = await sf.collections.insert(
        records=records,
        sobject_type="Account",
        batch_size=[200, 100, 25],         # Shrink batch size on retry
        max_concurrent_batches=[5, 3, 1],  # Reduce concurrency on retry
        max_attempts=5,                    # Retry up to 5 times
        on_result=on_result,               # Callback with results
    )
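
The example above passes three batch sizes but allows five attempts. One plausible reading, stated here as an assumption rather than documented aio-sf behavior, is that each list entry applies to one attempt and the last entry is reused once the list runs out. The sketch below illustrates that reading with hypothetical helpers (`value_for_attempt`, `insert_with_retries`, `flaky_send`). It retries only the failed records and runs batches sequentially for brevity.

```python
import asyncio
from typing import Any, Sequence, Union

Schedule = Union[int, Sequence[int]]


def value_for_attempt(schedule: Schedule, attempt: int) -> int:
    """Pick the setting for a 1-based attempt; reuse the last entry afterwards."""
    if isinstance(schedule, int):
        return schedule
    return schedule[min(attempt, len(schedule)) - 1]


async def insert_with_retries(records, send, batch_size: Schedule, max_attempts: int):
    """Retry only failed records, re-batching them with each attempt's size."""
    pending = list(records)
    succeeded: list[dict[str, Any]] = []
    for attempt in range(1, max_attempts + 1):
        if not pending:
            break
        size = value_for_attempt(batch_size, attempt)
        failed = []
        for start in range(0, len(pending), size):
            batch = pending[start:start + size]
            outcomes = await send(batch, attempt)
            for record, ok in zip(batch, outcomes):
                (succeeded if ok else failed).append(record)
        pending = failed
    return succeeded, pending


async def flaky_send(batch, attempt):
    """Stand-in for the API: a record succeeds once attempt >= its fail_until."""
    await asyncio.sleep(0)
    return [attempt >= record["fail_until"] for record in batch]


# Each record succeeds on attempt 1, 2, or 3 depending on its index.
demo_records = [{"Name": f"Account {i}", "fail_until": 1 + i % 3} for i in range(10)]
succeeded, still_pending = asyncio.run(
    insert_with_retries(demo_records, flaky_send, batch_size=[200, 100, 25], max_attempts=5)
)
print(len(succeeded), len(still_pending))
```

Shrinking the batch on each retry isolates records that fail only as part of a large request, at the cost of more round trips.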

Exporter

The exporter module provides a streamlined, opinionated way to export Salesforce data to file formats such as Parquet and CSV.

Export to Parquet

# With full installation (default), you can import directly from aio_sf
from aio_sf import SalesforceClient, ClientCredentialsAuth, bulk_query, write_query_to_parquet

# Or import from the exporter module (both work)
# from aio_sf.exporter import bulk_query, write_query_to_parquet

async def main():
    # ... authentication code from above ...
    
    async with SalesforceClient(auth_strategy=auth) as sf:
        # Query with proper schema
        query_result = await bulk_query(
            sf=sf,
            soql_query="SELECT Id, Name, Email, CreatedDate FROM Contact"
        )
        
        # Export to Parquet
        write_query_to_parquet(
            query_result=query_result,
            file_path="contacts.parquet"
        )
        
        print(f"✅ Exported {len(query_result)} contacts to Parquet")
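
The feature list mentions CSV export and memory-efficient streaming, but this page shows no code for either. As a library-independent illustration of the idea, the sketch below writes records to CSV as they arrive from an async iterator, using only the standard library. `fake_records` and `stream_to_csv` are hypothetical names and do not reflect aio-sf's exporter API.

```python
import asyncio
import csv
import io
from typing import Any, AsyncIterator, TextIO


async def fake_records() -> AsyncIterator[dict[str, Any]]:
    """Stand-in for a streamed query result; yields one record at a time."""
    for i in range(3):
        yield {"Id": f"003{i:03d}", "Name": f"Contact {i}", "Email": None}


async def stream_to_csv(
    records: AsyncIterator[dict[str, Any]],
    out: TextIO,
    fields: list[str],
) -> int:
    """Write each record as it arrives, holding one row in memory at a time."""
    writer = csv.DictWriter(out, fieldnames=fields, extrasaction="ignore")
    writer.writeheader()
    count = 0
    async for record in records:
        # Missing or None values become empty cells.
        row = {name: "" if record.get(name) is None else record[name] for name in fields}
        writer.writerow(row)
        count += 1
    return count


buffer = io.StringIO()
written = asyncio.run(stream_to_csv(fake_records(), buffer, ["Id", "Name", "Email"]))
print(written)
print(buffer.getvalue())
```

In practice `out` would be a file opened with `newline=""`, as the `csv` module documentation recommends.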

License

MIT License

