Salesforce connector for aisquare.pipe
aisquare-pipe-salesforce
Salesforce source and sink connectors for aisquare.pipe. Ships with the aisquare-pipe[full] extras bundle.
Install
pip install aisquare-pipe-salesforce
aisquare-pipe is pulled in transitively — you don't need to install it separately.
For development (from the addon directory):
cd connectors/salesforce
pip install -e ".[dev]"
Configuration
Two authentication flows are supported. The connector picks the right one based on which keys are present in config.
# Flow 1: Username + password + security token (quick start / sandboxes)
config = {
    "username": "you@example.com",
    "password": "your-password",
    "security_token": "XXXX",
    "domain": "login",  # optional: "test" for sandboxes, default "login"
}
# Flow 2: OAuth2 refresh-token (recommended for production)
config = {
    "client_id": "CONNECTED_APP_CONSUMER_KEY",
    "client_secret": "CONNECTED_APP_CONSUMER_SECRET",
    "refresh_token": "REFRESH_TOKEN",
    "instance_url": "https://your-org.my.salesforce.com",
}
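To make the "picks the right one based on which keys are present" rule concrete, here is a minimal sketch of how such a selection could work. It is not the connector's actual code: the function name `detect_auth_flow`, the precedence when both key sets are present, and the error message are all assumptions made for illustration.

```python
from typing import Mapping

# Key sets copied from the two config examples above. Everything else in this
# sketch (names, precedence, error text) is a hypothetical illustration.
PASSWORD_KEYS = {"username", "password", "security_token"}
OAUTH_KEYS = {"client_id", "client_secret", "refresh_token", "instance_url"}


def detect_auth_flow(config: Mapping[str, str]) -> str:
    """Return "oauth2" or "password" depending on which keys are present."""
    keys = set(config)
    # Assumption: if both key sets are complete, prefer OAuth2.
    if OAUTH_KEYS <= keys:
        return "oauth2"
    if PASSWORD_KEYS <= keys:
        return "password"
    raise ValueError(
        "config matches neither flow; missing for password flow: "
        f"{sorted(PASSWORD_KEYS - keys)}, missing for OAuth2 flow: "
        f"{sorted(OAUTH_KEYS - keys)}"
    )
```

A check like this can be run before handing `config` to the connector, so that a half-filled config fails early with a readable message.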
Usage
Pull records (generic SOQL — any standard or custom SObject)
from aisquare.pipe import PullParams
from aisquare_pipe_salesforce import SalesforceSource
source = SalesforceSource()
params = PullParams(params={
    "object_type": "Account",                   # required
    "fields": ["Id", "Name", "Industry"],       # optional, default: Id/Name/CreatedDate/LastModifiedDate
    "where": "Industry = 'Technology'",         # optional
    "order_by": "CreatedDate DESC",             # optional
    "limit": 100,                               # optional
    "modified_after": "2024-01-01T00:00:00Z",   # optional: incremental sync
})
for envelope in source.pull(config, params):
    print(envelope.metadata["salesforce_id"], envelope.data)
Need a more complex query? Pass soql directly and the connector will use it verbatim:
params = PullParams(params={
    "object_type": "Account",  # still required for metadata tagging
    "soql": "SELECT Id, Name, (SELECT Id FROM Contacts) FROM Account LIMIT 10",
})
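As a rough mental model of how the pull parameters could map onto a query, the sketch below assembles a SOQL string from the same keys. The helper `build_soql` is hypothetical and the connector's real query builder may differ (for example in how it combines `where` with `modified_after`); only the parameter names and the default field list come from the examples above.

```python
from typing import Any, Mapping

# Default field list as stated in the pull example above.
DEFAULT_FIELDS = ["Id", "Name", "CreatedDate", "LastModifiedDate"]


def build_soql(params: Mapping[str, Any]) -> str:
    """Hypothetical sketch: turn pull params into a SOQL string."""
    # A verbatim query wins over every other parameter.
    if params.get("soql"):
        return params["soql"]

    fields = params.get("fields") or DEFAULT_FIELDS
    query = f"SELECT {', '.join(fields)} FROM {params['object_type']}"

    conditions = []
    if params.get("where"):
        # Parenthesise the user filter so AND-ing another clause is safe.
        conditions.append(f"({params['where']})")
    if params.get("modified_after"):
        # SOQL datetime literals are written without quotes.
        conditions.append(f"LastModifiedDate > {params['modified_after']}")
    if conditions:
        query += " WHERE " + " AND ".join(conditions)

    if params.get("order_by"):
        query += f" ORDER BY {params['order_by']}"
    if params.get("limit") is not None:
        query += f" LIMIT {int(params['limit'])}"
    return query
```

The sketch does no escaping of `where` or `order_by`; treat those values as trusted input or validate them before building a query this way.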
Push records (insert / update / upsert)
from aisquare.pipe import DataEnvelope, PushParams
from aisquare_pipe_salesforce import SalesforceSink
sink = SalesforceSink()
# Insert (no salesforce_id in metadata)
envelope = DataEnvelope(
    content_type="application/json",
    data={"Name": "Acme Corp", "Industry": "Technology"},
    source_id="my-app",
    metadata={"object_type": "Account"},
)
result = sink.push(envelope, config)
print(f"Created Account: {result.ref}")
# Update (salesforce_id in metadata → inferred as update)
envelope = DataEnvelope(
    content_type="application/json",
    data={"Industry": "Healthcare"},
    source_id="my-app",
    metadata={"object_type": "Account", "salesforce_id": "0011x0000XXXXXX"},
)
sink.push(envelope, config)
# Upsert by external id
envelope = DataEnvelope(
    content_type="application/json",
    data={"Name": "Acme Corp", "Industry": "Tech"},
    source_id="my-app",
    metadata={"object_type": "Account", "external_id_field": "External_Id__c"},
)
sink.push(envelope, config, PushParams(params={"external_id_value": "ext-123"}))
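The three push examples rely on the operation being inferred from the envelope metadata. One plausible way to express that rule is sketched below. `infer_operation` is a hypothetical helper, and the precedence between `external_id_field` and `salesforce_id` when both are set is an assumption; the connector may resolve that case differently.

```python
from typing import Any, Mapping, Optional

VALID_OPERATIONS = {"insert", "update", "upsert"}


def infer_operation(
    metadata: Mapping[str, Any],
    params: Optional[Mapping[str, Any]] = None,
) -> str:
    """Hypothetical sketch of insert / update / upsert dispatch."""
    params = params or {}

    # An explicit params["operation"] overrides any inference.
    explicit = params.get("operation")
    if explicit:
        if explicit not in VALID_OPERATIONS:
            raise ValueError(f"unknown operation: {explicit!r}")
        return explicit

    # Assumption: an external id field takes precedence over a record id.
    if metadata.get("external_id_field"):
        return "upsert"
    if metadata.get("salesforce_id"):
        return "update"
    return "insert"
```

When the inferred choice is ambiguous for your data, setting `params["operation"]` explicitly removes the guesswork.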
Pipeline (Salesforce → local)
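from aisquare.pipe import Pipeline, PullParams
from aisquare_pipe_salesforce import SalesforceSource
# LocalSink is not imported in the original snippet; import it from whichever
# module provides it in your aisquare.pipe installation.
result = Pipeline(source=SalesforceSource(), sink=LocalSink()).run(
    {
        "salesforce-source": config,
        "local-sink": {"output_dir": "./accounts"},
    },
    pull_params=PullParams(params={"object_type": "Account", "limit": 50}),
)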
Features
- Generic SObject support: Account, Contact, Lead, Opportunity, custom objects (Foo__c), all via one connector
- Dual auth: username/password+token (dev) and OAuth2 refresh-token (production)
- Incremental sync: the modified_after param appends LastModifiedDate > <ts> to the SOQL
- SOQL escape hatch: pass soql verbatim for joins, sub-queries, aggregates
- Insert / update / upsert dispatch: inferred from metadata, or set explicitly via params["operation"]
- Rate-limit-aware: exponential backoff on REQUEST_LIMIT_EXCEEDED / HTTP 503
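For readers unfamiliar with the rate-limit behaviour listed above, the following is a generic sketch of exponential backoff with a little jitter. It is not the connector's implementation: `RateLimited`, `with_backoff`, and all default values are hypothetical stand-ins chosen for illustration.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class RateLimited(Exception):
    """Hypothetical stand-in for REQUEST_LIMIT_EXCEEDED or an HTTP 503."""


def with_backoff(
    call: Callable[[], T],
    max_attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Retry `call`, doubling the wait after each rate-limit error."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return call()
        except RateLimited:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the error to the caller
            # 1s, 2s, 4s, ... capped at max_delay, plus up to 10% jitter.
            delay = min(max_delay, base_delay * 2 ** attempt)
            sleep(delay + random.uniform(0, delay * 0.1))
    raise RuntimeError("unreachable")
```

The `sleep` parameter is injectable so the retry schedule can be tested without actually waiting.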
Notes
- For very large pulls, set a reasonable limit and use modified_after for incremental syncs to avoid hitting per-call governor limits.
- OAuth2 access tokens are exchanged at client creation; for long-running pulls that span the access-token TTL, re-create the source.
- Salesforce attributes keys (the SDK's metadata noise) are stripped from each record before yielding.
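To illustrate the last note, here is a small sketch of what stripping `attributes` keys from a record might look like. `strip_attributes` is a hypothetical helper, and whether the connector also strips nested sub-query records (as this sketch does) is an assumption.

```python
from typing import Any


def strip_attributes(value: Any) -> Any:
    """Return a copy of a record with every "attributes" key removed."""
    if isinstance(value, dict):
        return {
            key: strip_attributes(item)
            for key, item in value.items()
            if key != "attributes"
        }
    if isinstance(value, list):
        return [strip_attributes(item) for item in value]
    return value


# Example record shaped like a Salesforce REST query result with a sub-query.
record = {
    "attributes": {"type": "Account", "url": "/services/data/..."},
    "Id": "0011x0000XXXXXX",
    "Name": "Acme Corp",
    "Contacts": {
        "totalSize": 1,
        "done": True,
        "records": [
            {"attributes": {"type": "Contact"}, "Id": "0031x0000XXXXXX"},
        ],
    },
}
clean = strip_attributes(record)
```

The function returns a new structure rather than mutating its input, so the raw API response stays available for debugging.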