Dagster integration library for Salesforce
Project description
dagster-salesforce
A Dagster integration for Salesforce that provides resources for interacting with the Salesforce API, including querying data, managing records, and performing bulk operations.
Features
- Multiple authentication methods: Password, OAuth 2.0, JWT Bearer Token, and Session-based authentication
- Bulk operations: Efficiently handle large-scale data operations using Salesforce Bulk API 2.0
- Query support: Execute SOQL queries and export results to CSV
- CRUD operations: Create, read, update, and delete individual records
- Upsert support: Update or insert records based on external IDs
- Object metadata: Access Salesforce object schemas and field information
- Type-safe: Full type hints and Pydantic models for data validation
Installation
uv pip install dagster-salesforce
Example Usage
Password Authentication
The simplest authentication method using username, password, and security token:
import dagster as dg
from dagster_salesforce import SalesforceResource
from dagster_salesforce.credentials import SalesforceUserPasswordCredentials
from pathlib import Path
@dg.asset
def salesforce_accounts(context: dg.AssetExecutionContext, salesforce: SalesforceResource):
"""Download all Account records from Salesforce."""
# Get the Account object client
account_obj = salesforce.get_object_client("Account")
# Query accounts to CSV
output_dir = Path("/tmp/salesforce_accounts")
results = account_obj.query_to_csv(
query="SELECT Id, Name, Type, Industry FROM Account",
output_directory=output_dir,
batch_size=10000
)
context.log.info(f"Downloaded {sum(r.number_of_records for r in results)} accounts")
return {"output_files": [r.file for r in results if r.file]}
defs = dg.Definitions(
assets=[salesforce_accounts],
resources={
"salesforce": SalesforceResource(
credentials=SalesforceUserPasswordCredentials(
username=dg.EnvVar("SALESFORCE_USERNAME"),
password=dg.EnvVar("SALESFORCE_PASSWORD"),
security_token=dg.EnvVar("SALESFORCE_SECURITY_TOKEN"),
domain="login" # or "test" for sandbox
)
)
}
)
JWT Bearer Token Authentication (Recommended for Production)
For production environments, JWT authentication provides secure, token-based access:
import dagster as dg
from dagster_salesforce import SalesforceResource
from dagster_salesforce.credentials import SalesforceJWTOAuthCredentials
from pathlib import Path
@dg.asset
def salesforce_contacts(salesforce: SalesforceResource):
"""Upsert contacts into Salesforce."""
contact_obj = salesforce.get_object_client("Contact")
# Prepare records to upsert
records = [
{"Email": "john.doe@example.com", "FirstName": "John", "LastName": "Doe"},
{"Email": "jane.smith@example.com", "FirstName": "Jane", "LastName": "Smith"},
]
# Upsert using email as external ID
result = contact_obj.upsert_records(
result_output_directory=Path("/tmp/upsert_results"),
records=records,
external_id_field="Email",
wait=10
)
return {
"successful": result.successfulRecords,
"failed": result.failedRecords,
"files": result.files
}
defs = dg.Definitions(
assets=[salesforce_contacts],
resources={
"salesforce": SalesforceResource(
credentials=SalesforceJWTOAuthCredentials(
username=dg.EnvVar("SALESFORCE_USERNAME"),
consumer_key=dg.EnvVar("SALESFORCE_CONSUMER_KEY"),
privatekey_file=dg.EnvVar("SALESFORCE_PRIVATE_KEY_PATH")
# Or use privatekey=dg.EnvVar("SALESFORCE_PRIVATE_KEY") for key content
)
)
}
)
OAuth 2.0 Connected App Authentication
For web applications using OAuth 2.0 flow:
import dagster as dg
from dagster_salesforce import SalesforceResource
from dagster_salesforce.credentials import SalesforceConnectedAppOAuthCredentials
defs = dg.Definitions(
resources={
"salesforce": SalesforceResource(
credentials=SalesforceConnectedAppOAuthCredentials(
consumer_key=dg.EnvVar("SALESFORCE_CONSUMER_KEY"),
consumer_secret=dg.EnvVar("SALESFORCE_CONSUMER_SECRET"),
domain="login"
)
)
}
)
Direct Session Authentication
For using an existing session token:
import dagster as dg
from dagster_salesforce import SalesforceResource
from dagster_salesforce.credentials import SalesforceSessionCredentials
defs = dg.Definitions(
resources={
"salesforce": SalesforceResource(
credentials=SalesforceSessionCredentials(
session_id=dg.EnvVar("SALESFORCE_SESSION_ID"),
instance_url="https://your-instance.salesforce.com"
)
)
}
)
Working with Salesforce Objects
Query Data
import dagster as dg
from dagster_salesforce import SalesforceResource
from datetime import datetime, timedelta
from pathlib import Path
@dg.asset
def query_opportunities(salesforce: SalesforceResource):
"""Query opportunities with filtering."""
opportunity_obj = salesforce.get_object_client("Opportunity")
# Build query with date filter
last_month = datetime.now() - timedelta(days=30)
query = opportunity_obj.get_query(
incremental_key="LastModifiedDate",
incremental_from=last_month,
limit=1000
)
# Execute query and save to CSV
results = opportunity_obj.query_to_csv(
query=query,
output_directory=Path("/tmp/opportunities")
)
return results
Create, Update, and Delete Records
import dagster as dg
from dagster_salesforce import SalesforceResource
@dg.asset
def manage_leads(salesforce: SalesforceResource):
"""Demonstrate CRUD operations on Lead records."""
lead_obj = salesforce.get_object_client("Lead")
# Create a new lead
lead_id = lead_obj.create_record({
"FirstName": "Test",
"LastName": "Lead",
"Company": "Test Company",
"Email": "test@example.com"
})
# Update the lead
lead_obj.update_record(lead_id, {
"Status": "Qualified",
"Rating": "Hot"
})
# Delete the lead (if needed)
# lead_obj.delete_record(lead_id)
return {"created_lead_id": lead_id}
Bulk Operations
import dagster as dg
from dagster_salesforce import SalesforceResource
from pathlib import Path
@dg.asset
def bulk_import_accounts(salesforce: SalesforceResource):
"""Import accounts using Bulk API 2.0."""
account_obj = salesforce.get_object_client("Account")
# For large datasets, you can use CSV files
csv_file = Path("/path/to/accounts.csv")
result = account_obj.upsert_records(
result_output_directory=Path("/tmp/bulk_results"),
csv_file=str(csv_file),
external_id_field="External_Id__c",
batch_size=10000, # Salesforce automatically handles batching
wait=30
)
return {
"total_processed": result.successfulRecords + result.failedRecords,
"success_rate": result.successfulRecords / (result.successfulRecords + result.failedRecords)
if (result.successfulRecords + result.failedRecords) > 0 else 0
}
Access Field Metadata
import dagster as dg
from dagster_salesforce import SalesforceResource
@dg.asset
def analyze_object_schema(salesforce: SalesforceResource):
"""Analyze Salesforce object schema and fields."""
# Get Account object metadata
account_obj = salesforce.get_object_client("Account")
# Access field information
field_summary = {}
for field_name, field in account_obj.fields.items():
if field.custom: # Only custom fields
field_summary[field_name] = {
"label": field.label,
"type": field.type,
"required": not field.nillable,
"updateable": field.updateable
}
return field_summary
Notes
- Bulk API 2.0: This integration uses Salesforce Bulk API 2.0, which automatically handles internal batching. When uploading data, Salesforce automatically splits it into batches of 10,000 records.
- Rate Limits: The resource includes built-in handling for Salesforce API rate limits. Use
get_limits()to monitor your API usage. - Field Types: The integration preserves Salesforce field types and handles type conversions appropriately.
- Error Handling: Failed records in bulk operations are captured and reported in the result objects.
API Reference
SalesforceResource
The main resource for interacting with Salesforce:
get_object_client(sobject: str) -> SalesforceObject: Get a client for a specific Salesforce objectget_available_objects() -> List[str]: List all available Salesforce objectsget_limits() -> Dict: Get API usage limitscheck_connection() -> Dict: Verify connection and get instance information
SalesforceObject
Represents a Salesforce object (table) with methods for data operations:
query_to_csv(query: str, output_directory: Path, batch_size: int) -> List[SalesforceQueryResult]: Execute SOQL query and save results to CSV filescreate_record(data: Dict) -> str: Create a single recordupdate_record(record_id: str, data: Dict): Update a single recorddelete_record(record_id: str): Delete a single recordupsert_records(...): Bulk upsert operationget_query(incremental_key: str, incremental_from: datetime, limit: int) -> str: Build SOQL query with filters
Credentials Classes
SalesforceUserPasswordCredentials: Username/password authenticationSalesforceJWTOAuthCredentials: JWT Bearer Token authenticationSalesforceConnectedAppOAuthCredentials: OAuth 2.0 Connected App authenticationSalesforceSessionCredentials: Direct session access
Testing
# Run tests
make test
# Run linting and formatting
make ruff
# Run type checking
make check
Development
# Install development dependencies
make install
# Build the package
make build
License
See LICENSE file in the repository.
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file dagster_salesforce-0.0.2.tar.gz.
File metadata
- Download URL: dagster_salesforce-0.0.2.tar.gz
- Upload date:
- Size: 13.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.8
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
972b6dd93c352c1ef00d6bcbb0c9793e16e340a7577c7d600efb73010f49ca35
|
|
| MD5 |
14b6c43333c9728862e531b08c8357cb
|
|
| BLAKE2b-256 |
2f32858f361b57aeb55731bc4dc3c86c8ae1278a1875dc783afb285378057cb7
|
File details
Details for the file dagster_salesforce-0.0.2-py3-none-any.whl.
File metadata
- Download URL: dagster_salesforce-0.0.2-py3-none-any.whl
- Upload date:
- Size: 12.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.8
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
06050b026c0fbd9b735182840632a4346e30d24027036f3c5e2bf61c6a64ba5d
|
|
| MD5 |
349303c82bc29b5950d30a3abfbb704d
|
|
| BLAKE2b-256 |
d51d5d61b2c567153d6206ea555997a781555a5505024c03c6031528085c6db3
|