Async Salesforce library for Python

aio-sf

An async Salesforce library for Python.

Features

✅ Supported APIs

  • Bulk API 2.0 - Efficient querying of large datasets
  • Describe API - Field metadata and object descriptions
  • SOQL Query API - Standard Salesforce queries
  • SObjects Collections API - CRUD on collections of SObjects (up to 2000 records at a time)
  • Tooling API - Development and deployment tools
  • Bulk API 1.0 - Legacy bulk operations
  • Streaming API - Real-time event streaming

✅ Supported Authentication Strategies

  • OAuth Client Credentials - Automatic authentication
  • Static Token - Existing access tokens
  • Refresh Token - Refresh token flow
  • SFDX CLI - Reuse an access token obtained from the SFDX CLI
  • Password Authentication - Username, password, and security token (SOAP login)

🚀 Export Features

  • Parquet Export - Efficient columnar storage with schema mapping
  • CSV Export - Simple text format export
  • Resume Support - Resume interrupted queries using job IDs
  • Streaming Processing - Memory-efficient processing of large datasets

Installation

Full Package (Default - Includes Everything)

uv add aio-sf
# or: pip install aio-sf

Core Only (Minimal Dependencies)

uv add "aio-sf[core]"
# or: pip install "aio-sf[core]"

Quick Start

Authentication & Connection

import asyncio
import os
from aio_sf import SalesforceClient, ClientCredentialsAuth

async def main():
    auth = ClientCredentialsAuth(
        client_id=os.getenv('SF_CLIENT_ID'),
        client_secret=os.getenv('SF_CLIENT_SECRET'),
        instance_url=os.getenv('SF_INSTANCE_URL'),
    )
    
    async with SalesforceClient(auth_strategy=auth) as sf:
        print(f"✅ Connected to: {sf.instance_url}")

        sobjects = await sf.describe.list_sobjects()
        print(sobjects[0]["name"])

        contact_describe = await sf.describe.sobject("Contact")

        # Retrieve the first 5 createable fields on Contact
        createable_fields = [
            field.get("name", "")
            for field in contact_describe["fields"]
            if field.get("createable")
        ][:5]

        query = f"SELECT {', '.join(createable_fields)} FROM Contact LIMIT 5"
        print(query)

        query_result = await sf.query.soql(query)
        # Loop over records using async iteration
        # or: await query_result.collect_all() to collect all records into a list
        async for record in query_result:
            print(record.get("AccountId"))

        # Create a new Account
        await sf.collections.insert(
            sobject_type="Account",
            records=[{"Name": "Test Account"}]
        )

asyncio.run(main())
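
The Quick Start mentions two ways to consume a query result: iterating with `async for`, or calling `collect_all()`. The sketch below illustrates the trade-off without a Salesforce connection. `FakeQueryResult` is a hypothetical stand-in written for this example, not the aio-sf class; only the `async for` and `collect_all()` usage comes from the snippet above.

```python
import asyncio
from typing import Any, AsyncIterator


class FakeQueryResult:
    """Hypothetical stand-in for a query result; not the aio-sf class."""

    def __init__(self, records: list[dict[str, Any]]) -> None:
        self._records = records

    def __aiter__(self) -> AsyncIterator[dict[str, Any]]:
        return self._iterate()

    async def _iterate(self) -> AsyncIterator[dict[str, Any]]:
        for record in self._records:
            # Yield control to the event loop, as a page fetch would.
            await asyncio.sleep(0)
            yield record

    async def collect_all(self) -> list[dict[str, Any]]:
        # Materializes every record in memory at once.
        return [record async for record in self]


async def count_with_account(result: FakeQueryResult) -> int:
    # Streaming: each record is handled and then discarded.
    count = 0
    async for record in result:
        if record.get("AccountId"):
            count += 1
    return count


async def demo() -> tuple[int, int]:
    result = FakeQueryResult([
        {"Id": "003A", "AccountId": "001A"},
        {"Id": "003B", "AccountId": None},
        {"Id": "003C", "AccountId": "001C"},
    ])
    streamed = await count_with_account(result)
    collected = await result.collect_all()
    return streamed, len(collected)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Iteration is usually the better fit when the result set may be large, while collecting into a list is convenient for small results that need random access.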

Collections API - Batch Operations

The Collections API runs bulk insert, update, upsert, and delete operations with automatic batching and concurrency.

Basic Usage:

# Assumes `auth` is the auth strategy created in the Quick Start
async with SalesforceClient(auth_strategy=auth) as sf:
    records = [{"Name": f"Account {i}"} for i in range(1000)]
    
    results = await sf.collections.insert(records, sobject_type="Account")
    # Also: update(), upsert(), delete()
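
To make "automatic batching and concurrency" concrete, here is a minimal sketch of the general pattern: split the records into fixed-size batches and send them with a cap on how many requests run at once. It is not the library's implementation. `chunk`, `send_in_batches`, and `fake_send` are hypothetical names for this example, and the defaults of 200 and 5 are illustrative.

```python
import asyncio
from typing import Any, Awaitable, Callable

Record = dict[str, Any]


def chunk(records: list[Record], size: int) -> list[list[Record]]:
    """Split records into consecutive batches of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    return [records[i:i + size] for i in range(0, len(records), size)]


async def send_in_batches(
    records: list[Record],
    send_batch: Callable[[list[Record]], Awaitable[list[Record]]],
    batch_size: int = 200,
    max_concurrent: int = 5,
) -> list[Record]:
    """Send batches concurrently, never more than `max_concurrent` at once."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def guarded(batch: list[Record]) -> list[Record]:
        async with semaphore:
            return await send_batch(batch)

    # gather() preserves input order, so results line up with the records.
    per_batch = await asyncio.gather(
        *(guarded(batch) for batch in chunk(records, batch_size))
    )
    return [result for batch_results in per_batch for result in batch_results]


async def fake_send(batch: list[Record]) -> list[Record]:
    """Hypothetical stand-in for one HTTP request; always reports success."""
    await asyncio.sleep(0)
    return [{"success": True, "name": record["Name"]} for record in batch]


async def demo() -> list[Record]:
    records = [{"Name": f"Account {i}"} for i in range(1000)]
    return await send_in_batches(records, fake_send)
```

With 1000 records and a batch size of 200, this produces five batches, all of which may run at the same time under the default cap of five.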

Advanced - With Retries, Concurrency Scaling, and Progress:

from aio_sf.api.collections import ResultInfo

async def on_result(info: ResultInfo):
    # Called after each batch completes with successes and errors split
    print(
        f"Batch: {len(info['successes'])} succeeded, {len(info['errors'])} failed | "
        f"Attempt {info['current_attempt']}, "
        f"Overall: {info['records_succeeded']} OK, {info['records_failed']} failed, "
        f"{info['records_pending']} pending"
    )
    # Inspect errors (includes both API errors and HTTP failures)
    for error in info['errors']:
        print(f"  Error: {error['errors']}")

# Assumes `auth` and `records` are defined as in the earlier examples
async with SalesforceClient(auth_strategy=auth) as sf:
    results = await sf.collections.insert(
        records=records,
        sobject_type="Account",
        batch_size=[200, 100, 25],         # Shrink batch size on retry
        max_concurrent_batches=[5, 3, 1],  # Reduce concurrency on retry
        max_attempts=5,                    # Retry up to 5 times
        on_result=on_result,               # Callback with results
    )
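
The advanced example passes three-element lists together with `max_attempts=5`. The sketch below shows one way such per-attempt settings could be resolved. It assumes that the last list entry is reused once the list runs out; the text above does not say how aio-sf handles this, so treat that rule as a guess. `value_for_attempt` and `schedule` are hypothetical helpers.

```python
from typing import Sequence, Union

Setting = Union[int, Sequence[int]]


def value_for_attempt(setting: Setting, attempt: int) -> int:
    """Pick the value for a 1-based attempt number.

    ASSUMPTION: once the list runs out, the last entry is reused.
    This may not match what aio-sf actually does.
    """
    if attempt < 1:
        raise ValueError("attempt is 1-based")
    if isinstance(setting, int):
        return setting
    return setting[min(attempt, len(setting)) - 1]


def schedule(
    batch_size: Setting,
    max_concurrent_batches: Setting,
    max_attempts: int,
) -> list[tuple[int, int, int]]:
    """Return (attempt, batch_size, concurrency) for every attempt."""
    return [
        (
            attempt,
            value_for_attempt(batch_size, attempt),
            value_for_attempt(max_concurrent_batches, attempt),
        )
        for attempt in range(1, max_attempts + 1)
    ]


if __name__ == "__main__":
    for row in schedule([200, 100, 25], [5, 3, 1], 5):
        print(row)
```

Under that assumption, attempts 4 and 5 would run with the most conservative settings, a batch size of 25 and a single batch in flight.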

Exporter

The exporter module provides a streamlined, opinionated way to export Salesforce data to several file formats.

Export to Parquet

# With full installation (default), you can import directly from aio_sf
from aio_sf import SalesforceClient, ClientCredentialsAuth, bulk_query, write_query_to_parquet

# Or import from the exporter module (both work)
# from aio_sf.exporter import bulk_query, write_query_to_parquet

async def main():
    # ... authentication code from above ...
    
    async with SalesforceClient(auth_strategy=auth) as sf:
        # Query with proper schema
        query_result = await bulk_query(
            sf=sf,
            soql_query="SELECT Id, Name, Email, CreatedDate FROM Contact"
        )
        
        # Export to Parquet
        write_query_to_parquet(
            query_result=query_result,
            file_path="contacts.parquet"
        )
        
        print(f"✅ Exported {len(query_result)} contacts to Parquet")
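
Schema mapping matters for columnar formats because bulk query results arrive as text, while a Parquet column needs a single type. The sketch below is one way to convert string values using Salesforce describe field types. It is not the exporter's code: `CONVERTERS` and `convert_row` are hypothetical, the type table is deliberately incomplete, and it assumes empty strings mean null and datetimes look like `2024-01-15T10:30:00.000Z`.

```python
from datetime import date, datetime
from typing import Any, Callable


def parse_datetime(value: str) -> datetime:
    # Assumed format: 2024-01-15T10:30:00.000Z (or a +0000 style offset).
    return datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%f%z")


# Describe field type -> converter. Types not listed stay as strings.
CONVERTERS: dict[str, Callable[[str], Any]] = {
    "boolean": lambda value: value.lower() == "true",
    "int": int,
    "double": float,
    "currency": float,
    "date": date.fromisoformat,
    "datetime": parse_datetime,
}


def convert_row(row: dict[str, str], field_types: dict[str, str]) -> dict[str, Any]:
    """Convert one row of string values into typed Python values."""
    typed: dict[str, Any] = {}
    for name, raw in row.items():
        if raw == "":
            # ASSUMPTION: an empty string represents a null value.
            typed[name] = None
            continue
        convert = CONVERTERS.get(field_types.get(name, "string"), str)
        typed[name] = convert(raw)
    return typed


if __name__ == "__main__":
    field_types = {"Id": "id", "Name": "string", "Email": "email", "CreatedDate": "datetime"}
    row = {"Id": "003xx", "Name": "Ada", "Email": "", "CreatedDate": "2024-01-15T10:30:00.000Z"}
    print(convert_row(row, field_types))
```

In practice the field types would come from a describe call such as the one in the Quick Start, so that every column keeps a consistent type across rows.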

License

MIT License
