Nexla Python SDK

A Python SDK for interacting with the Nexla API.

Installation

pip install nexla-sdk

Authentication

The Nexla SDK requires a Service Key for authentication. You can create a service key from the Nexla UI:

  1. Go to your Nexla UI instance (e.g., https://dataops.nexla.io)
  2. Navigate to the Authentication screen in the Settings section
  3. Click the Create Service Key button
  4. Store the service key securely. Treat it as highly sensitive: it is equivalent to your account password.
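Because the key is password-equivalent, it should not appear in logs or error messages. A minimal sketch of a masking helper follows; `mask_secret` is a hypothetical name used for illustration and is not part of the SDK.

```python
def mask_secret(secret, visible=4):
    """Return a log-safe form of a secret, keeping only the last few characters."""
    if len(secret) <= visible:
        # Too short to reveal anything safely: hide every character.
        return "*" * len(secret)
    return "*" * (len(secret) - visible) + secret[-visible:]


if __name__ == "__main__":
    # Placeholder value, not a real key.
    print("Using service key:", mask_secret("abcdef1234"))
```

Logging the masked form is usually enough to tell which key is in use without exposing it.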

Quick Start

from nexla_sdk import NexlaClient

# Initialize the client with your service key
client = NexlaClient(service_key="your_nexla_service_key")

# List flows — returns a list with one FlowResponse that contains flow nodes
flow_responses = client.flows.list()
for flow_response in flow_responses:
    for flow in flow_response.flows:
        print(f"Flow node: {flow.name}, ID: {flow.id}")

# List sources - returns a list of Source objects
sources = client.sources.list()
for source in sources:
    print(f"Source name: {source.name}, ID: {source.id}")

# Get a specific source - returns a Source object
source_id = sources[0].id  # assumes the list above returned at least one source
source = client.sources.get(source_id)
print(f"Source details: {source.name}, type: {source.source_type}")

# Create a credential
credential_data = {
    "name": "My S3 Credential",
    "credentials_type": "s3",
    "credentials": {
        "access_key_id": "your_access_key",
        "secret_access_key": "your_secret_key", 
        "region": "us-east-1"
    }
}
credential = client.credentials.create(credential_data)
print(f"Created credential: {credential.id}")

# Create a data source
source_data = {
    "name": "My New Source",
    "description": "Created via SDK",
    "source_type": "s3",
    "data_credentials_id": credential.id,
    "source_config": {
        "path": "bucket/path/",
        "file_format": "json"
    }
}
new_source = client.sources.create(source_data)
print(f"Created source: {new_source.id}, name: {new_source.name}")

Authentication Methods

The SDK supports two authentication methods:

1. Service Key Authentication (Recommended)

Service keys are long-lived credentials that obtain session tokens on demand (no refresh endpoint is used):

client = NexlaClient(service_key="your_service_key")

# Or use environment variables
# export NEXLA_SERVICE_KEY="your_service_key"
# Optional: export NEXLA_API_URL="https://your-nexla-instance.com/nexla-api"
client = NexlaClient()

2. Direct Access Token Authentication

For temporary access using pre-obtained tokens (no refresh available):

client = NexlaClient(access_token="your_access_token")

# Or use environment variables
# export NEXLA_ACCESS_TOKEN="your_access_token"
# Optional: export NEXLA_API_URL="https://your-nexla-instance.com/nexla-api"
client = NexlaClient()
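If an application wants to decide explicitly which of the two methods to use, the choice can be sketched as below. `resolve_auth_kwargs` is a hypothetical helper, and preferring the service key over the access token is an assumption of this sketch, not documented SDK behavior; only the `service_key` and `access_token` keyword names and the two environment variable names come from the examples above.

```python
def resolve_auth_kwargs(env):
    """Pick NexlaClient keyword arguments from a mapping of environment variables."""
    service_key = env.get("NEXLA_SERVICE_KEY", "").strip()
    access_token = env.get("NEXLA_ACCESS_TOKEN", "").strip()
    if service_key:
        # Assumption: prefer the long-lived service key when both are set.
        return {"service_key": service_key}
    if access_token:
        return {"access_token": access_token}
    raise ValueError("Set NEXLA_SERVICE_KEY or NEXLA_ACCESS_TOKEN")


if __name__ == "__main__":
    # Example mapping; a real application would pass os.environ instead.
    print(sorted(resolve_auth_kwargs({"NEXLA_ACCESS_TOKEN": "token"})))
```

The result could then be passed on as `NexlaClient(**resolve_auth_kwargs(os.environ))`.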

Core Resources

Credentials

# Create a credential
credential_data = {
    "name": "My S3 Credential",
    "credentials_type": "s3",
    "credentials": {
        "access_key_id": "xxx",
        "secret_access_key": "xxx",
        "region": "us-east-1"
    }
}
credential = client.credentials.create(credential_data)

# Test credential
probe_result = client.credentials.probe(credential.id)
print(f"Credential valid: {probe_result.get('status') in ('ok', 'success')}")

# Get credential tree structure
from nexla_sdk.models.credentials.requests import ProbeTreeRequest
tree_request = ProbeTreeRequest(depth=1, path="/")
tree = client.credentials.probe_tree(credential.id, tree_request)

# Get sample data
from nexla_sdk.models.credentials.requests import ProbeSampleRequest
sample_request = ProbeSampleRequest(path="/data/file.json")
sample = client.credentials.probe_sample(credential.id, sample_request)

Sources

# Create a source
source_data = {
    "name": "My Data Source",
    "source_type": "s3",
    "data_credentials_id": credential.id,
    "source_config": {
        "path": "bucket/path/",
        "file_format": "json"
    }
}
source = client.sources.create(source_data)

# Activate source
activated_source = client.sources.activate(source.id)

# Copy source
from nexla_sdk.models.sources.requests import SourceCopyOptions
copy_options = SourceCopyOptions(copy_access_controls=True)
copied_source = client.sources.copy(source.id, copy_options)

Nexsets (Data Sets)

# List nexsets
nexsets = client.nexsets.list()

# Get samples from a nexset
if nexsets:
    samples = client.nexsets.get_samples(
        set_id=nexsets[0].id,
        count=10,
        include_metadata=True
    )

# Create a transformed nexset (parent_nexset is an existing Nexset, e.g. nexsets[0])
nexset_data = {
    "name": "Transformed Data",
    "parent_data_set_id": parent_nexset.id,
    "has_custom_transform": True,
    "transform": {
        "version": 1,
        "operations": [...]
    }
}
transformed = client.nexsets.create(nexset_data)

Destinations

# Create a destination (nexset is the Nexset to write out, e.g. nexsets[0])
destination_data = {
    "name": "My Output",
    "sink_type": "s3",
    "data_credentials_id": credential.id,
    "data_set_id": nexset.id,
    "sink_config": {
        "path": "output/path/",
        "file_format": "parquet"
    }
}
destination = client.destinations.create(destination_data)

# Activate destination
activated_destination = client.destinations.activate(destination.id)

Flows

# Get flow by resource
flow = client.flows.get_by_resource(
    resource_type="data_sources",
    resource_id=source.id
)

# Activate entire flow
client.flows.activate(flow.flows[0].id, all=True)

# Pause flow
client.flows.pause(flow.flows[0].id, all=True)

# Copy flow
from nexla_sdk.models.flows.requests import FlowCopyOptions
copy_options = FlowCopyOptions(copy_access_controls=True)
copied_flow = client.flows.copy(flow.flows[0].id, copy_options)

Pagination

The SDK provides built-in pagination support:

# Get a paginator
paginator = client.sources.paginate(per_page=20)

# Iterate through all items
for source in paginator:
    print(source.name)

# Or iterate by pages
for page in paginator.iter_pages():
    print(f"Page {page.page_info.current_page}")
    for source in page:
        print(f"  - {source.name}")
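To make the page-by-page pattern concrete, here is a self-contained sketch of how such iteration can work in general. It is an illustration only, not the SDK's paginator: `iter_pages`, `iter_items`, and the `fetch_page(page, per_page)` callback are hypothetical names.

```python
from typing import Callable, Iterator, List, TypeVar

T = TypeVar("T")


def iter_pages(fetch_page: Callable[[int, int], List[T]], per_page: int = 20) -> Iterator[List[T]]:
    """Yield pages from fetch_page(page, per_page) until a short or empty page appears."""
    page = 1
    while True:
        items = fetch_page(page, per_page)
        if not items:
            return
        yield items
        if len(items) < per_page:
            # A short page means there is nothing left to fetch.
            return
        page += 1


def iter_items(fetch_page: Callable[[int, int], List[T]], per_page: int = 20) -> Iterator[T]:
    """Flatten the pages into a single stream of items."""
    for items in iter_pages(fetch_page, per_page):
        yield from items


if __name__ == "__main__":
    data = list(range(45))  # stand-in for remote records
    fake_fetch = lambda page, per_page: data[(page - 1) * per_page: page * per_page]
    print([len(p) for p in iter_pages(fake_fetch, per_page=20)])
```

With a real client, the callback could wrap a call such as `client.sources.list(page=..., per_page=...)`, assuming the list method accepts those parameters as in the `flows.list` example elsewhere in this README.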

Observability: OpenTelemetry Tracing (Optional)

You can instrument the Nexla SDK with OpenTelemetry to emit spans for each outgoing API request. Tracing is optional and a no‑op unless enabled.

  • Install optional tracing extras in your application environment:
pip install "nexla-sdk[tracing]"
# or install explicitly
pip install opentelemetry-distro opentelemetry-exporter-otlp
  • Auto-detection: If your app configures a global tracer provider, NexlaClient enables tracing automatically. To control it explicitly, pass trace_enabled=True or trace_enabled=False.

Example setup with console exporter:

from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter

trace.set_tracer_provider(TracerProvider(resource=Resource({"service.name": "my-nexla-app"})))
trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))

from nexla_sdk import NexlaClient

client = NexlaClient(service_key="<YOUR_SERVICE_KEY>")  # auto‑detects tracing
# or force: NexlaClient(service_key=..., trace_enabled=True)

flows = client.flows.list(page=1, per_page=1)  # emits a span

Notes:

  • Tracing is a no‑op unless OpenTelemetry is installed and enabled.
  • Spans include attributes like http.method, url.full, server.address, and http.status_code.

Access Control

Manage access to resources:

# Get current access rules
accessors = client.sources.get_accessors(source.id)

# Add user access
new_accessors = [{
    "type": "USER",
    "email": "user@example.com", 
    "access_roles": ["collaborator"]
}]
client.sources.add_accessors(source.id, new_accessors)

# Add team access (team_id is the ID of an existing team)
team_accessor = [{
    "type": "TEAM",
    "id": team_id,
    "access_roles": ["operator"]
}]
client.sources.add_accessors(source.id, team_accessor)

# Replace all accessors
client.sources.replace_accessors(source.id, new_accessors)

Organizations and Teams

# List organizations
orgs = client.organizations.list()

# Get organization members
org = orgs[0]  # assumes at least one organization was returned
members = client.organizations.get_members(org.id)

# Update organization members
from nexla_sdk.models.organizations.requests import OrgMemberList
member_list = OrgMemberList(members=[
    {"email": "new@example.com", "access_role": "admin"}
])
client.organizations.update_members(org.id, member_list)

# Create a team
team_data = {
    "name": "Data Team",
    "description": "Team for data operations"
}
team = client.teams.create(team_data)

# Add team members
from nexla_sdk.models.teams.requests import TeamMemberList
team_members = TeamMemberList(members=[
    {"email": "lead@example.com", "admin": True},
    {"email": "member@example.com", "admin": False}
])
client.teams.add_members(team.id, team_members)

Projects

# Create a project
project_data = {
    "name": "My Project",
    "description": "Data integration project"
}
project = client.projects.create(project_data)

# Add flows to project
from nexla_sdk.models.projects.requests import ProjectFlowList

# Option 1: Provide flow node IDs directly
flow_list = ProjectFlowList(flows=[flow_id])  # replace with actual flow node IDs
client.projects.add_flows(project.id, flow_list)

# Option 2: Provide resource identifiers (data flow edges)
# from nexla_sdk.models.projects.requests import ProjectFlowIdentifier
# flow_list = ProjectFlowList(data_flows=[ProjectFlowIdentifier(data_set_id=nexset.id)])
# client.projects.add_flows(project.id, flow_list)

# Get project flows
project_flows = client.projects.get_flows(project.id)

Notifications

# List unread notifications
notifications = client.notifications.list(read=0)

# Mark notification as read
notification = notifications[0]  # assumes at least one unread notification
client.notifications.mark_read([notification.id])

# Mark all notifications as read
client.notifications.mark_read("all")

# Get notification types
notification_types = client.notifications.get_types()

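# Create notification setting
# (notification_type is one entry from get_types(); channel_setting is an existing channel setting)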
setting_data = {
    "notification_type_id": notification_type.id,
    "notification_channel_setting_id": channel_setting.id,
    "status": "ACTIVE"
}
setting = client.notifications.create_setting(setting_data)

Metrics and Monitoring

# Get daily metrics for a resource
metrics = client.metrics.get_resource_daily_metrics(
    resource_type="data_sources",
    resource_id=source.id,
    from_date="2024-01-01",
    to_date="2024-01-31"
)

# Get metrics by run
run_metrics = client.metrics.get_resource_metrics_by_run(
    resource_type="data_sources",
    resource_id=source.id,
    groupby="runId"
)

# Get rate limits
limits = client.metrics.get_rate_limits()
print(f"Rate limit: {limits}")
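The daily metrics call above takes `from_date` and `to_date` as `YYYY-MM-DD` strings. A small sketch for computing a rolling window in that format follows; `date_window` is a hypothetical helper, and treating both dates as inclusive is an assumption.

```python
from datetime import date, timedelta


def date_window(days, today=None):
    """Return (from_date, to_date) ISO strings covering `days` days ending today."""
    if days < 1:
        raise ValueError("days must be at least 1")
    end = today or date.today()
    # Inclusive window: a 1-day window starts and ends on the same date.
    start = end - timedelta(days=days - 1)
    return start.isoformat(), end.isoformat()


if __name__ == "__main__":
    print(date_window(7))
```

The two values can be passed directly as `from_date=` and `to_date=`.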

Advanced Features

Lookups (Data Maps)

# Create a lookup table
lookup_data = {
    "name": "Product Mapping",
    "data_type": "string",
    "map_primary_key": "sku",
    "data_map": [
        {"sku": "ABC123", "name": "Product A", "price": 99.99},
        {"sku": "XYZ789", "name": "Product B", "price": 149.99}
    ]
}
lookup = client.lookups.create(lookup_data)

# Get lookup entries
entries = client.lookups.get_entries(lookup.id, "ABC*")

# Upsert lookup entries
new_entries = [
    {"sku": "DEF456", "name": "Product C", "price": 199.99}
]
client.lookups.upsert_entries(lookup.id, new_entries)

User Management

# Get user settings for the current authenticated user
settings = client.users.get_settings()

# Get user dashboard metrics
dashboard_metrics = client.users.get_dashboard_metrics(user_id)  # replace with a valid user_id (int)

# Update quarantine settings for a user
quarantine_config = {
    "cron_schedule": "0 0 * * *",
    "path": "/quarantine/exports/"
}
client.users.create_quarantine_settings(
    user_id=user_id,  # replace with a valid user_id (int)
    data_credentials_id=credential.id,
    config=quarantine_config
)

Additional Example Features

The SDK examples cover advanced operations such as:

  • Audit logging and compliance tracking
  • User and team permissions management
  • Data transformation and schema handling
  • Metrics collection and monitoring
  • Notification systems integration
  • Quarantine settings for data validation

See the examples/api/ directory for detailed examples of these operations.

New Resources and Usage

The SDK includes additional helpers beyond core flows/sources/sinks:

Code Containers, Transforms, Attribute Transforms

# List code containers
containers = client.code_containers.list()

# Create a reusable record transform (aliased via /transforms)
from nexla_sdk.models.transforms import TransformCreate, TransformCodeOp
create_payload = TransformCreate(
    name="Uppercase Names",
    output_type="record",
    reusable=True,
    code_type="jolt_custom",
    code_encoding="none",
    code=[TransformCodeOp(operation="nexla.custom", spec={
        "language": "python",
        "encoding": "base64",
        "script": "ZGVmIHRyYW5zZm9ybShpbnB1dCwgbWV0YWRhdGEsIGFyZ3MpOiByZXR1cm4gaW5wdXQ=",
    })],
)
transform = client.transforms.create(create_payload)

# Attribute transforms
attr_transforms = client.attribute_transforms.list()
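The `script` value in the transform example is the base64 encoding of a short Python function. A sketch of producing and inspecting such a value with the standard library follows; `encode_script` and `decode_script` are hypothetical helper names.

```python
import base64


def encode_script(source: str) -> str:
    """Base64-encode script source for use in a transform spec."""
    return base64.b64encode(source.encode("utf-8")).decode("ascii")


def decode_script(encoded: str) -> str:
    """Decode a base64 script back to readable source."""
    return base64.b64decode(encoded).decode("utf-8")


if __name__ == "__main__":
    script = "def transform(input, metadata, args): return input"
    encoded = encode_script(script)
    print(encoded)
    print(decode_script(encoded))
```

Decoding the `script` string from the example yields the identity transform shown in `script` above.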

Async Tasks

from nexla_sdk.models.async_tasks import AsyncTaskCreate

# Start an async task
task = client.async_tasks.create(AsyncTaskCreate(type="EXPORT_DATA", args={"data_set_id": 123}))

# Poll status
status = client.async_tasks.get(task.id)

# Fetch result (if available)
result = client.async_tasks.result(task.id)

# Download artifact link (may return str or DownloadLink)
link = client.async_tasks.download_link(task.id)
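Polling usually needs a loop with a timeout. The sketch below is generic and makes no assumption about the fields on the task object: the caller supplies both the fetch function and the completion check. `poll_until` is a hypothetical helper, not an SDK function.

```python
import time


def poll_until(fetch, is_done, interval=2.0, timeout=60.0,
               sleep=time.sleep, clock=time.monotonic):
    """Call fetch() repeatedly until is_done(value) is true or the timeout passes."""
    deadline = clock() + timeout
    while True:
        value = fetch()
        if is_done(value):
            return value
        if clock() >= deadline:
            raise TimeoutError("condition not met before timeout")
        sleep(interval)


if __name__ == "__main__":
    states = iter(["pending", "running", "done"])  # stand-in for task statuses
    print(poll_until(lambda: next(states), lambda s: s == "done", interval=0))
```

With the client, `fetch` could be `lambda: client.async_tasks.get(task.id)`, and `is_done` would inspect whichever status field the returned object exposes.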

Approval Requests

pending = client.approval_requests.list_pending()
if pending:
    approved = client.approval_requests.approve(pending[0].id)

Runtimes

from nexla_sdk.models.runtimes import RuntimeCreate

rt = client.runtimes.create(RuntimeCreate(name="python-3-11", language="python", version="3.11"))
client.runtimes.activate(rt.id)
client.runtimes.pause(rt.id)

Marketplace

domains = client.marketplace.list_domains()
if domains:
    items = client.marketplace.list_domain_items(domains[0].id)

Org Auth Configs

auth_configs = client.org_auth_configs.list()
if auth_configs:
    cfg = client.org_auth_configs.get(auth_configs[0].id)

GenAI

configs = client.genai.list_configs()
active = client.genai.show_active_config(gen_ai_usage="rag")

Self Signup (Admin)

signup_requests = client.self_signup.list_requests()  # named to avoid shadowing the requests library
blocked = client.self_signup.list_blocked_domains()

Doc Containers and Data Schemas

doc_audit = client.doc_containers.get_audit_log(doc_container_id=1001)
schema_audit = client.data_schemas.get_audit_log(schema_id=5001)

Coverage Matrix

Mapping of major OpenAPI areas to SDK resources. All requests set the header Accept: application/vnd.nexla.api.v1+json and, by default, use the base URL https://dataops.nexla.io/nexla-api.

  • Session Management
    • Login/Logout: handled by client auth; NexlaClient.logout() ends session
  • Flows: client.flows — list/get/get_by_resource/activate/pause/copy/delete; docs_recommendation; get_logs; get_metrics
  • Sources: client.sources — CRUD/activate/pause/copy
  • Destinations (Data Sinks): client.destinations — CRUD/activate/pause/copy
  • Nexsets (Data Sets): client.nexsets — CRUD/activate/pause/samples/copy/docs_recommendation
  • Credentials: client.credentials — CRUD/probe/probe_tree/probe_sample (async/request_id)
  • Data Maps (Lookups): client.lookups — CRUD; entries get/upsert/delete
  • Users: client.users — CRUD/settings/quarantine/metrics/audit_log/transfer
  • Organizations: client.organizations — CRUD/members/account metrics/audit log/auth settings/custodians
  • Teams: client.teams — CRUD/members
  • Projects: client.projects — CRUD/flows add/replace/remove/search/get
  • Notifications: client.notifications — list/delete/count/mark read/unread; channel/settings CRUD
  • Metrics: client.metrics — resource daily/by-run; flow logs/metrics; rate limits
  • Code Containers: client.code_containers — CRUD/copy/public list (accessors/audit via BaseResource)
  • Transforms: client.transforms — CRUD/copy/public list
  • Attribute Transforms: client.attribute_transforms — CRUD/public list
  • Async Tasks: client.async_tasks — list/create/get/delete/rerun/result/download_link/types/explain_arguments
  • Approval Requests: client.approval_requests — list_pending/list_requested/approve/reject
  • Runtimes: client.runtimes — CRUD/activate/pause
  • Marketplace: client.marketplace — domains CRUD; items list/create; custodians add/update/remove
  • Org Auth Configs: client.org_auth_configs — list/all/get/create/update/delete
  • GenAI Configurations/Org Settings: client.genai — configs CRUD; org settings CRUD; active_config
  • Doc Containers: client.doc_containers — audit_log; (access control via BaseResource helpers)
  • Data Schemas: client.data_schemas — audit_log; (access control via BaseResource helpers)
  • Webhooks: not included as a dedicated helper yet (use direct HTTP with API key per spec)
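For areas without a dedicated helper, a direct HTTP request needs the same Accept header and base URL described above. The sketch below only builds the request object with the standard library and does not send it. The authentication header is deliberately left out because its format is not described here, and `build_request` is a hypothetical helper.

```python
from urllib.request import Request

DEFAULT_BASE_URL = "https://dataops.nexla.io/nexla-api"
ACCEPT_HEADER = "application/vnd.nexla.api.v1+json"


def build_request(path, base_url=DEFAULT_BASE_URL, method="GET"):
    """Build (but do not send) a request carrying the versioned Accept header."""
    url = base_url.rstrip("/") + "/" + path.lstrip("/")
    return Request(url, headers={"Accept": ACCEPT_HEADER}, method=method)


if __name__ == "__main__":
    req = build_request("/data_sources")
    print(req.get_method(), req.full_url, req.get_header("Accept"))
```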

Error Handling

The SDK provides specific exception types:

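from nexla_sdk.exceptions import (
    NexlaError,
    AuthenticationError,
    NotFoundError,
    ValidationError,
    RateLimitError
)

try:
    source = client.sources.get(999999)
except NotFoundError as e:
    print(f"Source not found: {e}")
except AuthenticationError as e:
    print(f"Auth failed: {e}")
except ValidationError as e:
    print(f"Invalid request: {e}")
except RateLimitError as e:
    print(f"Rate limited: {e}")
except NexlaError as e:
    # Catch-all, assuming NexlaError is the base class of the SDK's exceptions
    print(f"Nexla error: {e}")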
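A common response to a rate-limit error is to retry with exponential backoff. The sketch below shows the pattern in isolation. It defines a local stand-in `RateLimitError` so it runs without the SDK installed; in real code the class would be imported from `nexla_sdk.exceptions` instead. `call_with_backoff` and its parameters are hypothetical.

```python
import time


class RateLimitError(Exception):
    """Local stand-in so this sketch runs without the SDK installed."""


def call_with_backoff(func, max_attempts=4, base_delay=1.0, sleep=time.sleep):
    """Call func(), retrying on RateLimitError with delays of 1x, 2x, 4x, ... base_delay."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except RateLimitError:
            if attempt == max_attempts:
                raise  # out of attempts: surface the error to the caller
            sleep(base_delay * 2 ** (attempt - 1))


if __name__ == "__main__":
    calls = {"n": 0}

    def flaky():
        calls["n"] += 1
        if calls["n"] < 3:
            raise RateLimitError("slow down")
        return "ok"

    print(call_with_backoff(flaky, base_delay=0.01))
```

A call such as `client.sources.get(...)` can be wrapped as `call_with_backoff(lambda: client.sources.get(source_id))`.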

Resource Path Mappings

The SDK uses the following API path mappings:

  • Credentials → /data_credentials
  • Sources → /data_sources
  • Destinations → /data_sinks
  • Nexsets → /data_sets
  • Lookups → /data_maps
  • Flows → /flows
  • Users → /users
  • Organizations → /orgs
  • Teams → /teams
  • Projects → /projects
  • Notifications → /notifications
  • Metrics → Various endpoints
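The mapping above can be written as a lookup table, which is handy when building URLs for direct HTTP calls. This is a sketch: `RESOURCE_PATHS` and `api_path` are hypothetical names, and appending the ID as a trailing path segment is an assumption based on common REST conventions.

```python
RESOURCE_PATHS = {
    "credentials": "/data_credentials",
    "sources": "/data_sources",
    "destinations": "/data_sinks",
    "nexsets": "/data_sets",
    "lookups": "/data_maps",
    "flows": "/flows",
    "users": "/users",
    "organizations": "/orgs",
    "teams": "/teams",
    "projects": "/projects",
    "notifications": "/notifications",
}


def api_path(resource, resource_id=None):
    """Return the API path for an SDK resource name, optionally for one item."""
    base = RESOURCE_PATHS[resource]  # raises KeyError for unknown resource names
    return base if resource_id is None else f"{base}/{resource_id}"


if __name__ == "__main__":
    print(api_path("sources"), api_path("destinations", 42))
```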

Common Resource Methods

All resources support these base methods:

  • list() - List all resources
  • get(id) - Get specific resource by ID
  • create(data) - Create new resource
  • update(id, data) - Update existing resource
  • delete(id) - Delete resource
  • activate(id) - Activate resource
  • pause(id) - Pause resource
  • copy(id, options) - Copy resource
  • paginate() - Get paginated results
  • get_accessors(id) - Get access control rules
  • add_accessors(id, accessors) - Add access control rules
  • replace_accessors(id, accessors) - Replace access control rules
  • delete_accessors(id, accessors=None) - Delete some or all access control rules
  • get_audit_log(id) - Get audit log entries

Development

Running Tests

# Install development dependencies
pip install -e ".[dev]"

# Run unit tests
pytest tests/

# Run integration tests (requires API credentials)
export NEXLA_SERVICE_KEY="your_service_key"
export NEXLA_API_URL="https://your-nexla-instance.com/nexla-api"
pytest tests/integration/

Setting Up Environment

# Create .env file
cat > .env << EOF
NEXLA_SERVICE_KEY=your_service_key
NEXLA_API_URL=https://dataops.nexla.io/nexla-api
EOF
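Nothing above states that the .env file is loaded automatically, so a script may need to read it before creating the client. A minimal dependency-free sketch follows; `parse_env_lines` and `load_env_file` are hypothetical helpers that handle only simple KEY=value lines (a library such as python-dotenv covers more cases).

```python
import os


def parse_env_lines(lines):
    """Parse KEY=value lines, skipping blanks, comments, and malformed lines."""
    values = {}
    for raw in lines:
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        # Drop optional surrounding quotes around the value.
        values[key.strip()] = value.strip().strip('"').strip("'")
    return values


def load_env_file(path=".env", environ=None):
    """Copy values from a .env file into environ without overriding existing keys."""
    environ = os.environ if environ is None else environ
    with open(path, encoding="utf-8") as handle:
        for key, value in parse_env_lines(handle).items():
            environ.setdefault(key, value)
    return environ


if __name__ == "__main__":
    sample = ["NEXLA_SERVICE_KEY=your_service_key\n", "# comment\n"]
    print(parse_env_lines(sample))
```

Using `setdefault` means variables already exported in the shell take precedence over the file.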

Releasing

Versions are derived from Git tags via setuptools_scm (no manual version bumps). To release:

  1. Create a GitHub Release with tag v<major>.<minor>.<patch> (e.g. v1.1.0) targeting main.
  2. The release.yml workflow automatically builds and publishes to PyPI via trusted publishing.

See CONTRIBUTING.md for the full release process, pre-release checklist, and troubleshooting.
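Since the version is derived from the tag, a malformed tag can lead to an unexpected version. The tag format above can be checked before creating a release with a sketch like this; `parse_release_tag` is a hypothetical helper and is not part of the repository's tooling.

```python
import re

# v<major>.<minor>.<patch>, e.g. v1.1.0
TAG_PATTERN = re.compile(r"v(\d+)\.(\d+)\.(\d+)")


def parse_release_tag(tag):
    """Return (major, minor, patch) for a tag like v1.1.0, or raise ValueError."""
    match = TAG_PATTERN.fullmatch(tag)
    if match is None:
        raise ValueError(f"tag {tag!r} does not match v<major>.<minor>.<patch>")
    return tuple(int(part) for part in match.groups())


if __name__ == "__main__":
    print(parse_release_tag("v1.1.0"))
```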

License

This project is licensed under the terms of the MIT license.

Support

For API documentation and support:
