Skip to main content

A Python SDK for the Nexla API

Project description

Nexla Python SDK

A Python SDK for interacting with the Nexla API.

Installation

pip install nexla-sdk

Authentication

The Nexla SDK requires a Service Key for authentication. You can create a service key from the Nexla UI:

  1. Go to your Nexla UI instance (e.g., https://dataops.nexla.io)
  2. Navigate to the Authentication screen in the Settings section
  3. Click the Create Service Key button
  4. Store the service key securely - it should be treated as highly sensitive since it is equivalent to your account password

Quick Start

from nexla_sdk import NexlaClient

# Initialize the client with your service key
client = NexlaClient(service_key="your_nexla_service_key")

# List flows — returns a list with one FlowResponse that contains flow nodes
flow_responses = client.flows.list()
for flow_response in flow_responses:
    for flow in flow_response.flows:
        print(f"Flow node: {flow.name}, ID: {flow.id}")

# List sources - returns a list of Source objects
sources = client.sources.list()
for source in sources:
    print(f"Source name: {source.name}, ID: {source.id}")

# Get a specific source - returns a Source object
source = client.sources.get(source_id)
print(f"Source details: {source.name}, type: {source.source_type}")

# Create a credential
credential_data = {
    "name": "My S3 Credential",
    "credentials_type": "s3",
    "credentials": {
        "access_key_id": "your_access_key",
        "secret_access_key": "your_secret_key", 
        "region": "us-east-1"
    }
}
credential = client.credentials.create(credential_data)
print(f"Created credential: {credential.id}")

# Create a data source
source_data = {
    "name": "My New Source",
    "description": "Created via SDK",
    "source_type": "s3",
    "data_credentials_id": credential.id,
    "source_config": {
        "path": "bucket/path/",
        "file_format": "json"
    }
}
new_source = client.sources.create(source_data)
print(f"Created source: {new_source.id}, name: {new_source.name}")

Authentication Methods

The SDK supports two authentication methods:

1. Service Key Authentication (Recommended)

Service keys are long-lived credentials that obtain session tokens on demand (no refresh endpoint is used):

client = NexlaClient(service_key="your_service_key")

# Or use environment variables
# export NEXLA_SERVICE_KEY="your_service_key"
# Optional: export NEXLA_API_URL="https://your-nexla-instance.com/nexla-api"
client = NexlaClient()

2. Direct Access Token Authentication

For temporary access using pre-obtained tokens (no refresh available):

client = NexlaClient(access_token="your_access_token")

# Or use environment variables
# export NEXLA_ACCESS_TOKEN="your_access_token"
# Optional: export NEXLA_API_URL="https://your-nexla-instance.com/nexla-api"
client = NexlaClient()

Core Resources

Credentials

# Create a credential
credential_data = {
    "name": "My S3 Credential",
    "credentials_type": "s3",
    "credentials": {
        "access_key_id": "xxx",
        "secret_access_key": "xxx",
        "region": "us-east-1"
    }
}
credential = client.credentials.create(credential_data)

# Test credential
probe_result = client.credentials.probe(credential.id)
print(f"Credential valid: {probe_result.get('status') in ('ok', 'success')}")

# Get credential tree structure
from nexla_sdk.models.credentials.requests import ProbeTreeRequest
tree_request = ProbeTreeRequest(depth=1, path="/")
tree = client.credentials.probe_tree(credential.id, tree_request)

# Get sample data
from nexla_sdk.models.credentials.requests import ProbeSampleRequest
sample_request = ProbeSampleRequest(path="/data/file.json")
sample = client.credentials.probe_sample(credential.id, sample_request)

Sources

# Create a source
source_data = {
    "name": "My Data Source",
    "source_type": "s3",
    "data_credentials_id": credential.id,
    "source_config": {
        "path": "bucket/path/",
        "file_format": "json"
    }
}
source = client.sources.create(source_data)

# Activate source
activated_source = client.sources.activate(source.id)

# Copy source
from nexla_sdk.models.sources.requests import SourceCopyOptions
copy_options = SourceCopyOptions(copy_access_controls=True)
copied_source = client.sources.copy(source.id, copy_options)

Nexsets (Data Sets)

# List nexsets
nexsets = client.nexsets.list()

# Get samples from a nexset
if nexsets:
    samples = client.nexsets.get_samples(
        set_id=nexsets[0].id,
        count=10,
        include_metadata=True
    )

# Create a transformed nexset
nexset_data = {
    "name": "Transformed Data",
    "parent_data_set_id": parent_nexset.id,
    "has_custom_transform": True,
    "transform": {
        "version": 1,
        "operations": [...]
    }
}
transformed = client.nexsets.create(nexset_data)

Destinations

# Create a destination
destination_data = {
    "name": "My Output",
    "sink_type": "s3",
    "data_credentials_id": credential.id,
    "data_set_id": nexset.id,
    "sink_config": {
        "path": "output/path/",
        "file_format": "parquet"
    }
}
destination = client.destinations.create(destination_data)

# Activate destination
activated_destination = client.destinations.activate(destination.id)

Flows

# Get flow by resource
flow = client.flows.get_by_resource(
    resource_type="data_sources",
    resource_id=source.id
)

# Activate entire flow
client.flows.activate(flow.flows[0].id, all=True)

# Pause flow
client.flows.pause(flow.flows[0].id, all=True)

# Copy flow
from nexla_sdk.models.flows.requests import FlowCopyOptions
copy_options = FlowCopyOptions(copy_access_controls=True)
copied_flow = client.flows.copy(flow.flows[0].id, copy_options)

Pagination

The SDK provides built-in pagination support:

# Get a paginator
paginator = client.sources.paginate(per_page=20)

# Iterate through all items
for source in paginator:
    print(source.name)

# Or iterate by pages
for page in paginator.iter_pages():
    print(f"Page {page.page_info.current_page}")
    for source in page:
        print(f"  - {source.name}")

Observability: OpenTelemetry Tracing (Optional)

You can instrument the Nexla SDK with OpenTelemetry to emit spans for each outgoing API request. Tracing is optional and a no‑op unless enabled.

  • Install optional tracing extras in your application environment:
pip install "nexla-sdk[tracing]"
# or install explicitly
pip install opentelemetry-distro opentelemetry-exporter-otlp
  • Auto‑detection: If your app configures a global tracer provider, NexlaClient auto‑enables tracing. To control explicitly, use trace_enabled=True|False.

Example setup with console exporter:

from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter

trace.set_tracer_provider(TracerProvider(resource=Resource({"service.name": "my-nexla-app"})))
trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))

from nexla_sdk import NexlaClient

client = NexlaClient(service_key="<YOUR_SERVICE_KEY>")  # auto‑detects tracing
# or force: NexlaClient(service_key=..., trace_enabled=True)

flows = client.flows.list(page=1, per_page=1)  # emits a span

Notes:

  • Tracing is a no‑op unless OpenTelemetry is installed and enabled.
  • Spans include attributes like http.method, url.full, server.address, and http.status_code.

## Access Control

Manage access to resources:

```python
# Get current access rules
accessors = client.sources.get_accessors(source.id)

# Add user access
new_accessors = [{
    "type": "USER",
    "email": "user@example.com", 
    "access_roles": ["collaborator"]
}]
client.sources.add_accessors(source.id, new_accessors)

# Add team access
team_accessor = [{
    "type": "TEAM",
    "id": team_id,
    "access_roles": ["operator"]
}]
client.sources.add_accessors(source.id, team_accessor)

# Replace all accessors
client.sources.replace_accessors(source.id, new_accessors)

Organizations and Teams

# List organizations
orgs = client.organizations.list()

# Get organization members
members = client.organizations.get_members(org.id)

# Update organization members
from nexla_sdk.models.organizations.requests import OrgMemberList
member_list = OrgMemberList(members=[
    {"email": "new@example.com", "access_role": "admin"}
])
client.organizations.update_members(org.id, member_list)

# Create a team
team_data = {
    "name": "Data Team",
    "description": "Team for data operations"
}
team = client.teams.create(team_data)

# Add team members
from nexla_sdk.models.teams.requests import TeamMemberList
team_members = TeamMemberList(members=[
    {"email": "lead@example.com", "admin": True},
    {"email": "member@example.com", "admin": False}
])
client.teams.add_members(team.id, team_members)

Projects

# Create a project
project_data = {
    "name": "My Project",
    "description": "Data integration project"
}
project = client.projects.create(project_data)

# Add flows to project
from nexla_sdk.models.projects.requests import ProjectFlowList

# Option 1: Provide flow node IDs directly
flow_list = ProjectFlowList(flows=[flow_id])  # replace with actual flow node IDs
client.projects.add_flows(project.id, flow_list)

# Option 2: Provide resource identifiers (data flow edges)
# from nexla_sdk.models.projects.requests import ProjectFlowIdentifier
# flow_list = ProjectFlowList(data_flows=[ProjectFlowIdentifier(data_set_id=nexset.id)])
# client.projects.add_flows(project.id, flow_list)

# Get project flows
project_flows = client.projects.get_flows(project.id)

Notifications

# List unread notifications
notifications = client.notifications.list(read=0)

# Mark notification as read
client.notifications.mark_read([notification.id])

# Mark all notifications as read
client.notifications.mark_read("all")

# Get notification types
notification_types = client.notifications.get_types()

# Create notification setting
setting_data = {
    "notification_type_id": notification_type.id,
    "notification_channel_setting_id": channel_setting.id,
    "status": "ACTIVE"
}
setting = client.notifications.create_setting(setting_data)

Metrics and Monitoring

# Get daily metrics for a resource
metrics = client.metrics.get_resource_daily_metrics(
    resource_type="data_sources",
    resource_id=source.id,
    from_date="2024-01-01",
    to_date="2024-01-31"
)

# Get metrics by run
run_metrics = client.metrics.get_resource_metrics_by_run(
    resource_type="data_sources",
    resource_id=source.id,
    groupby="runId"
)

# Get rate limits
limits = client.metrics.get_rate_limits()
print(f"Rate limit: {limits}")

Advanced Features

Lookups (Data Maps)

# Create a lookup table
lookup_data = {
    "name": "Product Mapping",
    "data_type": "string",
    "map_primary_key": "sku",
    "data_map": [
        {"sku": "ABC123", "name": "Product A", "price": 99.99},
        {"sku": "XYZ789", "name": "Product B", "price": 149.99}
    ]
}
lookup = client.lookups.create(lookup_data)

# Get lookup entries
entries = client.lookups.get_entries(lookup.id, "ABC*")

# Upsert lookup entries
new_entries = [
    {"sku": "DEF456", "name": "Product C", "price": 199.99}
]
client.lookups.upsert_entries(lookup.id, new_entries)

User Management

# Get user settings for the current authenticated user
settings = client.users.get_settings()

# Get user dashboard metrics
dashboard_metrics = client.users.get_dashboard_metrics(user_id)  # replace with a valid user_id (int)

# Update quarantine settings for a user
quarantine_config = {
    "cron_schedule": "0 0 * * *",
    "path": "/quarantine/exports/"
}
client.users.create_quarantine_settings(
    user_id=user_id,  # replace with a valid user_id (int)
    data_credentials_id=credential.id,
    config=quarantine_config
)

Additional Example Features

The SDK examples cover advanced operations such as:

  • Audit logging and compliance tracking
  • User and team permissions management
  • Data transformation and schema handling
  • Metrics collection and monitoring
  • Notification systems integration
  • Quarantine settings for data validation

See the examples/api/ directory for detailed examples of these operations.

Error Handling

The SDK provides specific exception types:

from nexla_sdk.exceptions import (
    NexlaError,
    AuthenticationError,
    NotFoundError,
    ValidationError,
    RateLimitError
)

try:
    source = client.sources.get(999999)
except NotFoundError as e:
    print(f"Source not found: {e}")
except AuthenticationError as e:
    print(f"Auth failed: {e}")
except RateLimitError as e:
    print(f"Rate limited: {e}")

Resource Path Mappings

The SDK uses the following API path mappings:

  • Credentials/data_credentials
  • Sources/data_sources
  • Destinations/data_sinks
  • Nexsets/data_sets
  • Lookups/data_maps
  • Flows/flows
  • Users/users
  • Organizations/orgs
  • Teams/teams
  • Projects/projects
  • Notifications/notifications
  • Metrics → Various endpoints

Common Resource Methods

All resources support these base methods:

  • list() - List all resources
  • get(id) - Get specific resource by ID
  • create(data) - Create new resource
  • update(id, data) - Update existing resource
  • delete(id) - Delete resource
  • activate(id) - Activate resource
  • pause(id) - Pause resource
  • copy(id, options) - Copy resource
  • paginate() - Get paginated results
  • get_accessors(id) - Get access control rules
  • add_accessors(id, accessors) - Add access control rules
  • get_audit_log(id) - Get audit log entries
  • replace_accessors(id, accessors) - Replace access control rules
  • delete_accessors(id, accessors=None) - Delete some or all access control rules

Development

Running Tests

# Install development dependencies
pip install -e ".[dev]"

# Run unit tests
pytest tests/

# Run integration tests (requires API credentials)
export NEXLA_SERVICE_KEY="your_service_key"
export NEXLA_API_URL="https://your-nexla-instance.com/nexla-api"
pytest tests/integration/

Setting Up Environment

# Create .env file
cat > .env << EOF
NEXLA_SERVICE_KEY=your_service_key
NEXLA_API_URL=https://dataops.nexla.io/nexla-api
EOF

License

This project is licensed under the terms of the MIT license.

Support

For API documentation and support:

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

nexla_sdk-1.0.0.tar.gz (415.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

nexla_sdk-1.0.0-py3-none-any.whl (68.2 kB view details)

Uploaded Python 3

File details

Details for the file nexla_sdk-1.0.0.tar.gz.

File metadata

  • Download URL: nexla_sdk-1.0.0.tar.gz
  • Upload date:
  • Size: 415.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for nexla_sdk-1.0.0.tar.gz
Algorithm Hash digest
SHA256 44b59dec48776e58c83eebbf0ff090fb7a0a7cfadc25c670f6168d0116352ffc
MD5 9b1577f0f26d2dccf1b9ebabd4ad9fa4
BLAKE2b-256 a7038598fee8da52f67acd65c21fd7beffae1ab6629371783f252cda0e9c3fad

See more details on using hashes here.

Provenance

The following attestation bundles were made for nexla_sdk-1.0.0.tar.gz:

Publisher: release.yml on nexla-opensource/nexla-sdk

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file nexla_sdk-1.0.0-py3-none-any.whl.

File metadata

  • Download URL: nexla_sdk-1.0.0-py3-none-any.whl
  • Upload date:
  • Size: 68.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for nexla_sdk-1.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 9d5aeabd89b079d12bb142d9209821802ab1756d8aaa32c19e3be5362db9f0f7
MD5 2f21a30a6bfb856cf225ae525a3498f3
BLAKE2b-256 f6d86cb876dbb74eca31d8768344265a48fe0003e6f07af587f425091092bb40

See more details on using hashes here.

Provenance

The following attestation bundles were made for nexla_sdk-1.0.0-py3-none-any.whl:

Publisher: release.yml on nexla-opensource/nexla-sdk

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page