aio-sf

An async Salesforce library for Python.

Features

✅ Supported APIs

  • Bulk API 2.0 - Efficient querying of large datasets
  • Describe API - Field metadata and object descriptions
  • SOQL Query API - Standard Salesforce queries
  • SObjects Collections API - CRUD on collections of SObjects, batched automatically (Salesforce caps create, update, upsert, and delete requests at 200 records each)
  • Tooling API - Development and deployment tools
  • Bulk API 1.0 - Legacy bulk operations
  • Streaming API - Real-time event streaming

✅ Supported Authentication Strategies

  • OAuth Client Credentials - Automatic authentication
  • Static Token - Existing access tokens
  • Refresh Token - Refresh token flow
  • SFDX CLI - Login by grabbing a token from the SFDX CLI
  • Password Authentication - Username/password + security token authentication (SOAP login)

🚀 Export Features

  • Parquet Export - Efficient columnar storage with schema mapping
  • CSV Export - Simple text format export
  • Resume Support - Resume interrupted queries using job IDs
  • Streaming Processing - Memory-efficient processing of large datasets
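To make the streaming idea concrete, here is a minimal sketch of writing records to CSV one at a time, so that memory use stays flat regardless of result size. It uses only the standard library. `fake_records` and `stream_to_csv` are hypothetical names invented for this illustration and are not part of aio-sf; the fake generator stands in for any async-iterable query result.

```python
import asyncio
import csv
import io
from typing import Any, AsyncIterator, Dict, Iterable, TextIO


async def fake_records(n: int) -> AsyncIterator[Dict[str, Any]]:
    """Hypothetical stand-in for an async query result."""
    for i in range(n):
        await asyncio.sleep(0)  # yield control, as a network read would
        yield {"Id": f"003{i:03d}", "Name": f"Contact {i}"}


async def stream_to_csv(
    records: AsyncIterator[Dict[str, Any]],
    out: TextIO,
    fieldnames: Iterable[str],
) -> int:
    """Write records as they arrive; only one record is held at a time."""
    writer = csv.DictWriter(out, fieldnames=list(fieldnames), extrasaction="ignore")
    writer.writeheader()
    count = 0
    async for record in records:
        writer.writerow(record)
        count += 1
    return count


def demo() -> str:
    """Run the sketch against an in-memory buffer and return the CSV text."""
    buffer = io.StringIO()
    asyncio.run(stream_to_csv(fake_records(3), buffer, ["Id", "Name"]))
    return buffer.getvalue()
```

In real use the `out` argument would be an open file rather than a `StringIO`, and the record source would be a query result instead of the fake generator.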

Installation

Full Package (Default - Includes Everything)

uv add aio-sf
# or: pip install aio-sf

Core Only (Minimal Dependencies)

uv add "aio-sf[core]"
# or: pip install "aio-sf[core]"

Quick Start

Authentication & Connection

import asyncio
import os
from aio_sf import SalesforceClient, ClientCredentialsAuth

async def main():
    auth = ClientCredentialsAuth(
        client_id=os.getenv('SF_CLIENT_ID'),
        client_secret=os.getenv('SF_CLIENT_SECRET'),
        instance_url=os.getenv('SF_INSTANCE_URL'),
    )
    
    async with SalesforceClient(auth_strategy=auth) as sf:
        print(f"✅ Connected to: {sf.instance_url}")

        sobjects = await sf.describe.list_sobjects()
        print(sobjects[0]["name"])

        contact_describe = await sf.describe.sobject("Contact")

        # Pick the first 5 createable fields on Contact
        createable_fields = [
            field["name"]
            for field in contact_describe["fields"]
            if field.get("createable") and field.get("name")
        ][:5]

        query = f"SELECT {', '.join(createable_fields)} FROM Contact LIMIT 5"
        print(query)

        query_result = await sf.query.soql(query)
        # Loop over records using async iteration
        # or: await query_result.collect_all() to collect all records into a list
        async for record in query_result:
            # AccountId is only present if it was among the selected fields
            print(record.get("AccountId"))

        # Create a new Account
        await sf.collections.insert(
            sobject_type="Account",
            records=[{"Name": "Test Account"}]
        )

asyncio.run(main())
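The query-building step in the example above can be pulled into a small pure function, which makes it easy to test without a Salesforce connection. This is a sketch only: `build_select` is a hypothetical helper, not part of aio-sf, and it assumes the describe result is a dict with a `fields` list whose entries carry `name` and boolean flags such as `createable`, as in the example.

```python
from typing import Any, Callable, Dict, List


def build_select(
    describe: Dict[str, Any],
    sobject: str,
    keep: Callable[[Dict[str, Any]], bool],
    max_fields: int = 5,
    limit: int = 5,
) -> str:
    """Build a SOQL SELECT from the first `max_fields` fields that pass `keep`."""
    names: List[str] = [
        field["name"]
        for field in describe.get("fields", [])
        if field.get("name") and keep(field)
    ][:max_fields]
    if not names:
        # An empty field list would produce invalid SOQL, so fail early.
        raise ValueError(f"no matching fields found on {sobject}")
    return f"SELECT {', '.join(names)} FROM {sobject} LIMIT {limit}"


# A trimmed, made-up describe payload for illustration.
sample_describe = {
    "fields": [
        {"name": "Id", "createable": False},
        {"name": "LastName", "createable": True},
        {"name": "Email", "createable": True},
    ]
}

query = build_select(sample_describe, "Contact", lambda f: bool(f.get("createable")))
```

Raising on an empty selection avoids sending a malformed `SELECT  FROM Contact` query to the API.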

Collections API - Batch Operations

Batch operations (insert, update, upsert, delete) over the SObjects Collections API, with automatic batching and concurrency.

Basic Usage:

async with SalesforceClient(auth_strategy=auth) as sf:
    records = [{"Name": f"Account {i}"} for i in range(1000)]
    
    results = await sf.collections.insert(records, sobject_type="Account")
    # Also: update(), upsert(), delete()
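For intuition about what "automatic batching and concurrency" involves, the sketch below splits a record list into fixed-size batches and sends them with a cap on how many are in flight at once. It is not aio-sf's implementation. `chunk`, `send_in_batches`, and `fake_send` are hypothetical names, and `fake_send` replaces the real HTTP call so the example runs offline.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Sequence

Record = Dict[str, Any]


def chunk(records: Sequence[Record], size: int) -> List[List[Record]]:
    """Split records into consecutive batches of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    return [list(records[i:i + size]) for i in range(0, len(records), size)]


async def send_in_batches(
    records: Sequence[Record],
    send_batch: Callable[[List[Record]], Awaitable[List[Any]]],
    batch_size: int = 200,
    max_concurrent: int = 5,
) -> List[Any]:
    """Send all batches, with at most `max_concurrent` requests in flight."""
    limiter = asyncio.Semaphore(max_concurrent)

    async def guarded(batch: List[Record]) -> List[Any]:
        async with limiter:
            return await send_batch(batch)

    # gather() returns results in submission order, so the flattened
    # output lines up with the input records.
    per_batch = await asyncio.gather(*(guarded(b) for b in chunk(records, batch_size)))
    return [result for batch in per_batch for result in batch]


async def fake_send(batch: List[Record]) -> List[Any]:
    """Hypothetical stand-in for one Collections API request."""
    await asyncio.sleep(0)
    return [{"success": True, "name": record["Name"]} for record in batch]
```

Calling `asyncio.run(send_in_batches(records, fake_send))` with 450 records would issue three batches (200, 200, 50) and return 450 results in input order.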

Advanced - With Retries, Concurrency Scaling, and Progress:

from aio_sf.api.collections import ProgressInfo

async def on_progress(info: ProgressInfo):
    print(
        f"Attempt {info['current_attempt']}: "
        f"{info['records_succeeded']} succeeded, "
        f"{info['records_failed']} failed, "
        f"{info['records_pending']} pending"
    )

# `records` is the list built in the basic example above
async with SalesforceClient(auth_strategy=auth) as sf:
    results = await sf.collections.insert(
        records=records,
        sobject_type="Account",
        batch_size=[200, 100, 25],         # Shrink batch size on retry
        max_concurrent_batches=[5, 3, 1],  # Reduce concurrency on retry
        max_attempts=5,                    # Retry up to 5 times
        on_batch_complete=on_progress,     # Progress callback
    )
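The example above passes three-element lists together with `max_attempts=5`, and the README does not say what happens on attempts four and five. One plausible reading, shown here purely as an assumption, is that the last list value is reused once the list runs out. `value_for_attempt` and `plan` are hypothetical helpers written for this sketch; check the library's own documentation for its actual behavior.

```python
from typing import List, Sequence, Tuple, Union

Schedule = Union[int, Sequence[int]]


def value_for_attempt(schedule: Schedule, attempt: int) -> int:
    """Return the setting for a 1-based attempt number.

    ASSUMPTION: a list shorter than the attempt count repeats its last value.
    """
    if attempt < 1:
        raise ValueError("attempt numbers start at 1")
    if isinstance(schedule, int):
        return schedule  # a plain int applies to every attempt
    if not schedule:
        raise ValueError("schedule must not be empty")
    return schedule[min(attempt, len(schedule)) - 1]


def plan(
    batch_size: Schedule,
    max_concurrent_batches: Schedule,
    max_attempts: int,
) -> List[Tuple[int, int, int]]:
    """List (attempt, batch size, concurrency) for every attempt."""
    return [
        (
            attempt,
            value_for_attempt(batch_size, attempt),
            value_for_attempt(max_concurrent_batches, attempt),
        )
        for attempt in range(1, max_attempts + 1)
    ]


schedule = plan([200, 100, 25], [5, 3, 1], max_attempts=5)
```

Under this assumption, attempts three through five would all run with a batch size of 25 and a single concurrent batch, which is the most conservative setting in the lists.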

Exporter

The exporter module provides a streamlined, opinionated way to export Salesforce data to formats such as Parquet and CSV.

Export to Parquet

# With full installation (default), you can import directly from aio_sf
from aio_sf import SalesforceClient, ClientCredentialsAuth, bulk_query, write_query_to_parquet

# Or import from the exporter module (both work)
# from aio_sf.exporter import bulk_query, write_query_to_parquet

async def main():
    # ... authentication code from above ...
    
    async with SalesforceClient(auth_strategy=auth) as sf:
        # Query with proper schema
        query_result = await bulk_query(
            sf=sf,
            soql_query="SELECT Id, Name, Email, CreatedDate FROM Contact"
        )
        
        # Export to Parquet
        write_query_to_parquet(
            query_result=query_result,
            file_path="contacts.parquet"
        )
        
        print(f"✅ Exported {len(query_result)} contacts to Parquet")
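"Schema mapping" here means deciding a column type for each selected field before writing columnar output. The sketch below shows one way that decision could be derived from a describe result. It is an illustration and not aio-sf's actual mapping: `SF_TO_COLUMN_TYPE` and `column_types` are hypothetical, and the target type labels are Arrow-style names chosen for readability. Anything not listed falls back to a string column.

```python
from typing import Any, Dict, List, Sequence, Tuple

# ASSUMPTION: an illustrative mapping from Salesforce describe field types
# to Arrow-style column type names. Unlisted types default to "string".
SF_TO_COLUMN_TYPE: Dict[str, str] = {
    "boolean": "bool",
    "int": "int64",
    "double": "float64",
    "currency": "float64",
    "percent": "float64",
    "date": "date32",
    "datetime": "timestamp[us, tz=UTC]",
}


def column_types(describe: Dict[str, Any], selected: Sequence[str]) -> List[Tuple[str, str]]:
    """Return (field name, column type) pairs in the order fields were selected."""
    by_name = {field["name"]: field for field in describe.get("fields", [])}
    columns: List[Tuple[str, str]] = []
    for name in selected:
        field = by_name.get(name)
        if field is None:
            raise KeyError(f"field {name!r} is not in the describe result")
        columns.append((name, SF_TO_COLUMN_TYPE.get(field.get("type", ""), "string")))
    return columns


# A trimmed, made-up describe payload matching the query above.
contact_describe = {
    "fields": [
        {"name": "Id", "type": "id"},
        {"name": "Name", "type": "string"},
        {"name": "Email", "type": "email"},
        {"name": "CreatedDate", "type": "datetime"},
    ]
}

columns = column_types(contact_describe, ["Id", "Name", "Email", "CreatedDate"])
```

Fixing the column types up front, rather than inferring them from the first rows, keeps the output schema stable even when early records contain only null values.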

License

MIT License

