aio-sf

An async Salesforce library for Python.

Features

✅ Supported APIs

  • Bulk API 2.0 - Efficient querying of large datasets
  • Describe API - Field metadata and object descriptions
  • SOQL Query API - Standard Salesforce queries
  • SObjects Collections API - CRUD on collections of SObjects (up to 2000 records at a time)
  • Tooling API - Development and deployment tools
  • Bulk API 1.0 - Legacy bulk operations
  • Streaming API - Real-time event streaming

✅ Supported Authentication Strategies

  • OAuth Client Credentials - Automatic authentication
  • Static Token - Existing access tokens
  • Refresh Token - Refresh token flow
  • SFDX CLI - Login by grabbing a token from the SFDX CLI
  • Password Authentication - Username and password plus security token (SOAP login)

🚀 Export Features

  • Parquet Export - Efficient columnar storage with schema mapping
  • CSV Export - Simple text format export
  • Resume Support - Resume interrupted queries using job IDs
  • Streaming Processing - Memory-efficient processing of large datasets

Installation

Full Package (Default - Includes Everything)

uv add aio-sf
# or: pip install aio-sf

Core Only (Minimal Dependencies)

uv add "aio-sf[core]"
# or: pip install "aio-sf[core]"

Quick Start

Authentication & Connection

import asyncio
import os
from aio_sf import SalesforceClient, ClientCredentialsAuth

async def main():
    auth = ClientCredentialsAuth(
        client_id=os.getenv('SF_CLIENT_ID'),
        client_secret=os.getenv('SF_CLIENT_SECRET'),
        instance_url=os.getenv('SF_INSTANCE_URL'),
    )
    
    async with SalesforceClient(auth_strategy=auth) as sf:
        print(f"✅ Connected to: {sf.instance_url}")

        sobjects = await sf.describe.list_sobjects()
        print(sobjects[0]["name"])

        contact_describe = await sf.describe.sobject("Contact")

        # Take the first 5 createable fields on Contact
        createable_fields = [
            field.get("name", "")
            for field in contact_describe["fields"]
            if field.get("createable")
        ][:5]

        query = f"SELECT {', '.join(createable_fields)} FROM Contact LIMIT 5"
        print(query)

        query_result = await sf.query.soql(query)
        # Loop over records using async iteration
        # or: await query_result.collect_all() to collect all records into a list
        async for record in query_result:
            print(record.get("AccountId"))

        # Create a new Account
        await sf.collections.insert(
            sobject_type="Account",
            records=[{"Name": "Test Account"}]
        )

asyncio.run(main())
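
The field-selection step in the example above can be pulled into a small pure function, which makes it testable without a Salesforce connection. This is a minimal sketch, assuming the describe result is a dict with a "fields" list whose entries carry "name" and "createable" keys, as the example uses. `build_soql` is a hypothetical helper and not part of aio-sf.

```python
import re

# Salesforce API names use letters, digits and underscores and start with a letter.
_API_NAME = re.compile(r"^[A-Za-z][A-Za-z0-9_]*$")


def build_soql(describe: dict, sobject: str, limit: int = 5, max_fields: int = 5) -> str:
    """Build a SELECT over the first `max_fields` createable fields (hypothetical helper)."""
    names = [
        field["name"]
        for field in describe.get("fields", [])
        if field.get("createable") and field.get("name")
    ][:max_fields]
    if not names:
        raise ValueError(f"no createable fields found on {sobject}")
    # Reject anything that does not look like an API name before interpolating it.
    for name in [sobject, *names]:
        if not _API_NAME.match(name):
            raise ValueError(f"unexpected API name: {name!r}")
    return f"SELECT {', '.join(names)} FROM {sobject} LIMIT {int(limit)}"
```

The returned string could then be passed to `sf.query.soql(...)` as in the example above.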

Collections API - Batch Operations

Bulk operations (insert, update, upsert, delete) with automatic batching and concurrency.

Basic Usage:

async with SalesforceClient(auth_strategy=auth) as sf:
    records = [{"Name": f"Account {i}"} for i in range(1000)]
    
    results = await sf.collections.insert(records, sobject_type="Account")
    # Also: update(), upsert(), delete()
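
"Automatic batching" here means the caller hands over the full record list and the client splits it into requests. The sketch below only illustrates that idea; it is not aio-sf's implementation, and the batch size of 200 is borrowed from the advanced example in this section.

```python
from typing import Iterator, Sequence


def chunked(records: Sequence[dict], batch_size: int) -> Iterator[list]:
    """Yield consecutive slices of `records`, each at most `batch_size` long."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(records), batch_size):
        yield list(records[start:start + batch_size])


records = [{"Name": f"Account {i}"} for i in range(1000)]
batches = list(chunked(records, 200))
print(len(batches), len(batches[-1]))
```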

Advanced - With Retries, Concurrency Scaling, and Progress:

from aio_sf.api.collections import ResultInfo

async def on_result(info: ResultInfo):
    # Called after each batch completes with successes and errors split
    print(
        f"Batch: {len(info['successes'])} succeeded, {len(info['errors'])} failed | "
        f"Attempt {info['current_attempt']}, "
        f"Overall: {info['records_succeeded']} OK, {info['records_failed']} failed, "
        f"{info['records_pending']} pending"
    )
    # Inspect errors (includes both API errors and HTTP failures)
    for error in info['errors']:
        print(f"  Error: {error['errors']}")

async with SalesforceClient(auth_strategy=auth) as sf:
    results = await sf.collections.insert(
        records=records,
        sobject_type="Account",
        batch_size=[200, 100, 25],         # Shrink batch size on retry
        max_concurrent_batches=[5, 3, 1],  # Reduce concurrency on retry
        max_attempts=5,                    # Retry up to 5 times
        on_result=on_result,               # Callback with results
    )
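
The list-valued `batch_size` and `max_concurrent_batches` arguments read as per-attempt schedules. The example passes three values but allows five attempts, and the text does not say what happens after the list runs out. The sketch below assumes the last value is reused and that attempts are numbered from 1; both are assumptions, and `value_for_attempt` is a hypothetical helper rather than library code.

```python
from typing import Sequence, Union

Schedule = Union[int, Sequence[int]]


def value_for_attempt(schedule: Schedule, attempt: int) -> int:
    """Pick the setting for a 1-based attempt number (assumed semantics)."""
    if attempt < 1:
        raise ValueError("attempt numbers start at 1")
    if isinstance(schedule, int):
        return schedule  # a plain int applies to every attempt
    if not schedule:
        raise ValueError("schedule must not be empty")
    # Assumption: attempts beyond the end of the list reuse the last value.
    index = min(attempt, len(schedule)) - 1
    return schedule[index]


sizes = [value_for_attempt([200, 100, 25], attempt) for attempt in range(1, 6)]
print(sizes)
```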

Exporter

The exporter module provides a streamlined, opinionated way to export Salesforce data to formats such as Parquet and CSV.

Export to Parquet

# With full installation (default), you can import directly from aio_sf
from aio_sf import SalesforceClient, ClientCredentialsAuth, bulk_query, write_query_to_parquet

# Or import from the exporter module (both work)
# from aio_sf.exporter import bulk_query, write_query_to_parquet

async def main():
    # ... authentication code from above ...
    
    async with SalesforceClient(auth_strategy=auth) as sf:
        # Query with proper schema
        query_result = await bulk_query(
            sf=sf,
            soql_query="SELECT Id, Name, Email, CreatedDate FROM Contact"
        )
        
        # Export to Parquet
        write_query_to_parquet(
            query_result=query_result,
            file_path="contacts.parquet"
        )
        
        print(f"✅ Exported {len(query_result)} contacts to Parquet")
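
To illustrate the streaming idea behind the export features, the sketch below writes records from any async iterator to CSV one row at a time, so the full result set never has to sit in memory. It uses only the standard library and is independent of the library's own CSV export; `write_records_to_csv` is a hypothetical helper, and the record shape (plain dicts) is an assumption based on the Quick Start example.

```python
import asyncio
import csv
from typing import AsyncIterable, Iterable


async def write_records_to_csv(
    records: AsyncIterable[dict], path: str, fieldnames: Iterable[str]
) -> int:
    """Stream dict records into a CSV file and return the number of rows written."""
    count = 0
    # Note: file writes here are blocking; acceptable for a sketch.
    with open(path, "w", newline="", encoding="utf-8") as handle:
        # extrasaction="ignore" drops keys that are not in `fieldnames`.
        writer = csv.DictWriter(handle, fieldnames=list(fieldnames), extrasaction="ignore")
        writer.writeheader()
        async for record in records:
            writer.writerow(record)  # missing keys are written as empty strings
            count += 1
    return count
```

Assuming the result of `sf.query.soql(...)` is async-iterable as shown in the Quick Start, it could be passed in directly, for example `await write_records_to_csv(query_result, "contacts.csv", ["Id", "Name"])`.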

License

MIT License

Download files

  • Source Distribution: aio_sf-0.1.0b12.tar.gz (95.4 kB)
  • Built Distribution: aio_sf-0.1.0b12-py3-none-any.whl (45.1 kB)

Publisher: publish.yml on callawaycloud/aio-salesforce
