Skip to main content

Salesforce connector for aisquare.pipe

Project description

aisquare-pipe-salesforce

Salesforce source and sink connectors for aisquare.pipe. Ships with the aisquare-pipe[full] extras bundle.

Install

pip install aisquare-pipe-salesforce

aisquare-pipe is pulled in transitively — you don't need to install it separately.

For development (from the addon directory):

cd connectors/salesforce
pip install -e ".[dev]"

Configuration

Two authentication flows are supported. The connector picks the right one based on which keys are present in config.

# Flow 1: Username + password + security token (quick start / sandboxes)
config = {
    "username": "you@example.com",
    "password": "your-password",
    "security_token": "XXXX",
    "domain": "login",       # optional — "test" for sandboxes, default "login"
}

# Flow 2: OAuth2 refresh-token (recommended for production)
config = {
    "client_id":     "CONNECTED_APP_CONSUMER_KEY",
    "client_secret": "CONNECTED_APP_CONSUMER_SECRET",
    "refresh_token": "REFRESH_TOKEN",
    "instance_url":  "https://your-org.my.salesforce.com",
}

Usage

Pull records (generic SOQL — any standard or custom SObject)

from aisquare.pipe import PullParams
from aisquare_pipe_salesforce import SalesforceSource

source = SalesforceSource()
params = PullParams(params={
    "object_type": "Account",                 # required
    "fields": ["Id", "Name", "Industry"],     # optional, default: Id/Name/CreatedDate/LastModifiedDate
    "where": "Industry = 'Technology'",       # optional
    "order_by": "CreatedDate DESC",           # optional
    "limit": 100,                             # optional
    "modified_after": "2024-01-01T00:00:00Z", # optional — incremental sync
})

for envelope in source.pull(config, params):
    print(envelope.metadata["salesforce_id"], envelope.data)

Need a more complex query? Pass soql directly and the connector will use it verbatim:

params = PullParams(params={
    "object_type": "Account",  # still required for metadata tagging
    "soql": "SELECT Id, Name, (SELECT Id FROM Contacts) FROM Account LIMIT 10",
})

Push records (insert / update / upsert)

from aisquare.pipe import DataEnvelope, PushParams
from aisquare_pipe_salesforce import SalesforceSink

sink = SalesforceSink()

# Insert (no salesforce_id in metadata)
envelope = DataEnvelope(
    content_type="application/json",
    data={"Name": "Acme Corp", "Industry": "Technology"},
    source_id="my-app",
    metadata={"object_type": "Account"},
)
result = sink.push(envelope, config)
print(f"Created Account: {result.ref}")

# Update (salesforce_id in metadata → inferred as update)
envelope = DataEnvelope(
    content_type="application/json",
    data={"Industry": "Healthcare"},
    source_id="my-app",
    metadata={"object_type": "Account", "salesforce_id": "0011x0000XXXXXX"},
)
sink.push(envelope, config)

# Upsert by external id
envelope = DataEnvelope(
    content_type="application/json",
    data={"Name": "Acme Corp", "Industry": "Tech"},
    source_id="my-app",
    metadata={"object_type": "Account", "external_id_field": "External_Id__c"},
)
sink.push(envelope, config, PushParams(params={"external_id_value": "ext-123"}))

Pipeline (Salesforce → local)

from aisquare.pipe import Pipeline, PullParams

result = Pipeline(source=SalesforceSource(), sink=LocalSink()).run(
    {
        "salesforce-source": config,
        "local-sink": {"output_dir": "./accounts"},
    },
    pull_params=PullParams(params={"object_type": "Account", "limit": 50}),
)

Features

  • Generic SObject support — Account, Contact, Lead, Opportunity, custom objects (Foo__c), all via one connector
  • Dual auth — username/password+token (dev) and OAuth2 refresh-token (production)
  • Incremental syncmodified_after param appends LastModifiedDate > <ts> to the SOQL
  • SOQL escape hatch — pass soql verbatim for joins, sub-queries, aggregates
  • Insert / update / upsert dispatch — inferred from metadata, or set explicitly via params["operation"]
  • Rate-limit-aware — exponential backoff on REQUEST_LIMIT_EXCEEDED / HTTP 503

Notes

  • For very large pulls, set a reasonable limit and use modified_after for incremental syncs to avoid hitting per-call governor limits.
  • OAuth2 access tokens are exchanged at client creation; for long-running pulls that span the access-token TTL, re-create the source.
  • Salesforce attributes keys (the SDK's metadata noise) are stripped from each record before yielding.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

aisquare_pipe_salesforce-0.1.0.tar.gz (13.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

aisquare_pipe_salesforce-0.1.0-py3-none-any.whl (10.1 kB view details)

Uploaded Python 3

File details

Details for the file aisquare_pipe_salesforce-0.1.0.tar.gz.

File metadata

  • Download URL: aisquare_pipe_salesforce-0.1.0.tar.gz
  • Upload date:
  • Size: 13.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for aisquare_pipe_salesforce-0.1.0.tar.gz
Algorithm Hash digest
SHA256 c9a1c64e8d6c1172a8936bbeef164854a9cfda589f44b9d26c235fe9076275ad
MD5 8f063353dcb24d3edb5ac9292800de20
BLAKE2b-256 dd24384cd068ae559cf66e70161897f89588806de3948482da74562da24d6103

See more details on using hashes here.

Provenance

The following attestation bundles were made for aisquare_pipe_salesforce-0.1.0.tar.gz:

Publisher: publish.yml on AISquare-Studio/pipe

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file aisquare_pipe_salesforce-0.1.0-py3-none-any.whl.

File metadata

File hashes

Hashes for aisquare_pipe_salesforce-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 157c25cfb03ad00dea6a65465d092ce3f5926fe817be1d71848c8ccfc1ad0142
MD5 408201372483445e6019b534c154772e
BLAKE2b-256 5342eb84bdeeb7eb8cab344313904150cc63a4957b91abf77f5800fa6f16a8c5

See more details on using hashes here.

Provenance

The following attestation bundles were made for aisquare_pipe_salesforce-0.1.0-py3-none-any.whl:

Publisher: publish.yml on AISquare-Studio/pipe

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page