Python client for the Netrias harmonization API

Netrias Client

A Python client for the Netrias discovery and harmonization services. Transform tabular datasets such as CSV, TSV, and XLSX files to conform to standard data models (e.g., CCDI) with AI-powered column mapping.

Installation

With uv (recommended)

curl -LsSf https://astral.sh/uv/install.sh | sh  # install uv once
uv add netrias_client

With pip

python -m pip install netrias_client

API Reference

NetriasClient(api_key)

Create a new client instance with your API key. The client is ready to use immediately with default settings.

from netrias_client import NetriasClient

# Provide your API key securely (e.g., from a secrets manager, environment variable, or config file)
client = NetriasClient(api_key="your-api-key")

Parameter  Type  Description
api_key    str   Required. Bearer token for Netrias API authentication. Store securely and never commit to version control.
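
One way to keep the key out of source control is to read it from an environment variable at startup. The sketch below is illustrative only: the variable name NETRIAS_API_KEY and the helper load_api_key are assumptions made for this example, not names defined by the package.

```python
import os


def load_api_key(env_var: str = "NETRIAS_API_KEY") -> str:
    """Return the API key stored in `env_var`, or raise if it is missing or blank.

    The default variable name is hypothetical; use whatever your deployment defines.
    """
    key = os.environ.get(env_var, "").strip()
    if not key:
        raise RuntimeError(f"Set the {env_var} environment variable before creating a client.")
    return key


# Usage (requires netrias_client to be installed):
#   from netrias_client import NetriasClient
#   client = NetriasClient(api_key=load_api_key())
```

Failing early with a clear message tends to be easier to diagnose than an authentication error returned by the first API call.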

configure(...)

Optionally adjust settings after initialization. All parameters are optional.

from pathlib import Path

client.configure(
    timeout=1200.0,                    # Optional: request timeout in seconds (default: 20 minutes)
    log_level="INFO",                  # Optional: CRITICAL, ERROR, WARNING, INFO, DEBUG
    log_directory=Path("./logs"),      # Optional: directory for log files
)

Parameter             Type               Default         Description
timeout               float | None       1200.0          Request timeout in seconds (default: 20 minutes).
log_level             str | None         "INFO"          Logging verbosity: "CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG".
log_directory         Path | str | None  None            Directory for per-client log files. When omitted, logs go to stdout only.
discovery_url         str | None         Production URL  Override discovery API URL (for staging/testing).
harmonization_url     str | None         Production URL  Override harmonization API URL (for staging/testing).
data_model_store_url  str | None         Production URL  Override Data Model Store API URL (for staging/testing).

Note: Calling configure() with partial parameters preserves previously-set values. Only the parameters you specify are updated.
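
The partial-update behavior described in the note can be pictured with a small standalone model. This is only an illustration of the semantics, not the library's implementation: the Settings class and apply_overrides function are hypothetical, and the sketch assumes that passing None (or omitting a parameter) means "leave unchanged".

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class Settings:
    """Hypothetical stand-in for the client's settings; not the library's own class."""
    timeout: float = 1200.0
    log_level: str = "INFO"


def apply_overrides(
    current: Settings,
    timeout: Optional[float] = None,
    log_level: Optional[str] = None,
) -> Settings:
    """Return a copy of `current` in which only the explicitly passed values change."""
    changes = {}
    if timeout is not None:
        changes["timeout"] = timeout
    if log_level is not None:
        changes["log_level"] = log_level
    return replace(current, **changes)


settings = apply_overrides(Settings(), timeout=300.0)
settings = apply_overrides(settings, log_level="DEBUG")
print(settings)  # timeout is still 300.0 after the second call
```

In client terms, this corresponds to calling client.configure(timeout=300.0) and later client.configure(log_level="DEBUG") without the second call resetting the timeout.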


Discovery Methods

Discover how source columns map to the target schema's Common Data Elements (CDEs) using AI recommendations.

Tabular files and column identity

CSV, TSV, and XLSX are file formats at the SDK boundary. Inside the client, data is represented as a positional tabular dataset:

from pathlib import Path

from netrias_client import read_tabular

dataset = read_tabular(Path("data/patients.tsv"))

print(dataset.source_format)          # TabularFormat.TSV
print(dataset.headers)                # display headers, duplicates preserved
print(dataset.columns[0].key)         # "col_0000"
print(dataset.rows[0])                # first data row as positional cells

For XLSX workbooks, select one worksheet at the boundary:

from pathlib import Path

from netrias_client import list_workbook_sheets, read_tabular

sheets = list_workbook_sheets(Path("data/patients.xlsx"))
dataset = read_tabular(Path("data/patients.xlsx"), sheet_name=sheets[0].name)

Duplicate headers are allowed. The stable column key (col_0000, col_0001, ...) is the identity; the header text is only the display label. This prevents data loss from duplicate column names and keeps file formats from forcing CSV-shaped assumptions into the rest of the client.
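
To see why positional keys matter, the sketch below pairs headers with keys in the col_0000 style and contrasts that with a dictionary keyed by header text. It is a standalone illustration: assign_column_keys is a hypothetical helper, and the four-digit zero padding is inferred from the examples above rather than taken from the library's code.

```python
from typing import List, Tuple


def assign_column_keys(headers: List[str]) -> List[Tuple[str, str]]:
    """Pair each header with a positional key such as col_0000 (format assumed)."""
    return [(f"col_{index:04d}", header) for index, header in enumerate(headers)]


headers = ["name", "name", ""]
keyed = assign_column_keys(headers)

# Keying by header text instead collapses the two "name" columns into one entry.
by_header = {header: index for index, header in enumerate(headers)}

print(keyed)
print(len(keyed), len(by_header))  # 3 2
```

With three columns in the file, the positional keys keep all three, while the header-keyed dictionary keeps only two.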

At the CDE recommendation boundary, the client sends display headers as column_name and relies on the ordered response to map results back to stable column keys. For this API version, request column columns[N] must map to response result results[N]; column_name is display metadata and may be blank. This keeps matching semantic while preserving duplicate and blank headers locally.
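
The position-indexed contract can be sketched as a length check followed by a positional pairing. The helper name map_results_to_keys and the shape of the result dictionaries are assumptions for illustration; the client itself reports contract violations as MappingDiscoveryError, whereas this standalone sketch raises ValueError.

```python
from typing import Dict, Sequence


def map_results_to_keys(column_keys: Sequence[str], results: Sequence[dict]) -> Dict[str, dict]:
    """Attach results[N] to column_keys[N]; refuse a response of the wrong length."""
    if len(column_keys) != len(results):
        raise ValueError(f"expected {len(column_keys)} results, got {len(results)}")
    return dict(zip(column_keys, results))


# Hypothetical response items; only their order matters here.
keys = ["col_0000", "col_0001"]
results = [{"cde_key": "participant_name"}, {"cde_key": "family_name"}]
print(map_results_to_keys(keys, results))
```

Because the pairing never consults column_name, two columns with the same header, or with no header at all, still receive their own results.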

Supported tabular formats are exposed in code:

from netrias_client import SUPPORTED_TABULAR_FORMATS, SUPPORTED_TABULAR_SUFFIXES, TabularFormat

assert tuple(SUPPORTED_TABULAR_FORMATS) == (TabularFormat.CSV, TabularFormat.TSV, TabularFormat.XLSX)
assert set(SUPPORTED_TABULAR_SUFFIXES) == {".csv", ".tsv", ".xlsx"}
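
A pre-flight suffix check can reject unsupported files before any network call. The sketch below hard-codes the suffixes from the assertion above so that it runs without the package installed; has_supported_suffix is a hypothetical helper, and treating suffixes case-insensitively is an assumption, not documented behavior.

```python
from pathlib import Path

# Copied from the documented SUPPORTED_TABULAR_SUFFIXES values.
SUPPORTED_SUFFIXES = {".csv", ".tsv", ".xlsx"}


def has_supported_suffix(path: Path) -> bool:
    """Return True when the file extension is one of the documented tabular suffixes."""
    return path.suffix.lower() in SUPPORTED_SUFFIXES


print(has_supported_suffix(Path("data/patients.tsv")))      # True
print(has_supported_suffix(Path("data/patients.parquet")))  # False
```

With the package installed, the hard-coded set could be replaced by set(SUPPORTED_TABULAR_SUFFIXES).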

discover_mapping_from_tabular(...)

Reads a supported tabular file, samples values, and returns a manifest keyed by stable source column keys.

manifest = client.discover_mapping_from_tabular(
    source_path=Path("data/patients.xlsx"),
    target_schema="ccdi",
    target_version="latest",
    sheet_name="Patients",                      # optional; XLSX defaults to the first sheet
    sample_limit=25,
    top_k=3,
    confidence_threshold=0.8,
)

Parameter             Type          Default   Description
source_path           Path          -         Required. Path to a supported tabular file (.csv, .tsv, or .xlsx).
target_schema         str           -         Required. Target schema key.
target_version        str           "latest"  Schema version to target.
sheet_name            str | None    None      Worksheet to read for XLSX input. Defaults to the first sheet.
sample_limit          int           25        Maximum rows to sample for discovery.
top_k                 int           3         Number of top recommendations to return per column.
confidence_threshold  float | None  0.8       Minimum confidence score (0-1) for keeping recommendations.

Returns: ColumnKeyedManifestPayload — A dictionary suitable for passing to harmonize().

{
    "column_mappings": {
        "col_0000": {
            "column_name": "name",
            "cde_key": "participant_name",
            "cde_id": 101,
            "harmonization": "harmonizable",
            "alternatives": [
                {"target": "participant_name", "confidence": 0.95, "harmonization": "harmonizable", "cde_id": 101}
            ],
        },
        "col_0001": {
            "column_name": "name",
            "cde_key": "family_name",
            "cde_id": 102,
            "harmonization": "harmonizable",
            "alternatives": [
                {"target": "family_name", "confidence": 0.89, "harmonization": "harmonizable", "cde_id": 102}
            ],
        },
    }
}
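
Before passing a manifest to harmonize(), it can be useful to review what was chosen for each column. The sketch below walks the structure shown above; summarize_manifest is a hypothetical helper, and it assumes every entry carries column_name and cde_key as in the example.

```python
from typing import Any, List, Mapping, Optional, Tuple


def summarize_manifest(manifest: Mapping[str, Any]) -> List[Tuple[str, str, str, Optional[float]]]:
    """Return (column key, header, chosen CDE, best confidence) for each mapped column."""
    rows = []
    for key, entry in manifest["column_mappings"].items():
        confidences = [alt["confidence"] for alt in entry.get("alternatives", [])]
        best = max(confidences) if confidences else None
        rows.append((key, entry["column_name"], entry["cde_key"], best))
    return rows


# Abbreviated copy of the example manifest above.
manifest = {
    "column_mappings": {
        "col_0000": {
            "column_name": "name",
            "cde_key": "participant_name",
            "alternatives": [{"target": "participant_name", "confidence": 0.95}],
        },
        "col_0001": {
            "column_name": "name",
            "cde_key": "family_name",
            "alternatives": [{"target": "family_name", "confidence": 0.89}],
        },
    }
}

for row in summarize_manifest(manifest):
    print(row)
```

Both columns share the header "name", so the column key is the only reliable way to tell the two rows apart.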

Harmonization Methods

Transform source tabular data using the discovered column mappings.

harmonize(...)

Execute the harmonization workflow: submit job, poll for completion, download result.

result = client.harmonize(
    source_path=Path("data/patients.xlsx"),
    manifest=manifest,                           # from discover_*
    data_commons_key="GC",                       # target data commons
    sheet_name="Patients",                       # optional; XLSX defaults to the first sheet
    output_path=Path("output/harmonized.xlsx"),  # optional
    manifest_output_path=Path("output/manifest.json"),  # optional
    use_cache=True,                              # optional; set False to bypass cached runs
)

print(result.status)       # "succeeded", "failed", or "timeout"
print(result.file_path)    # Path to the harmonized file
print(result.description)  # Human-readable status message
print(result.job_id)       # API job id for tracking

Parameter             Type                         Default  Description
source_path           Path                         -        Required. Path to the source tabular file (.csv, .tsv, or .xlsx).
manifest              Path | Mapping[str, object]  -        Required. Mapping manifest (from discovery) or path to a JSON manifest file.
data_commons_key      str                          -        Required. Target data commons identifier (e.g., "GC").
output_path           Path | None                  None     Where to write the harmonized file. When omitted, the path is auto-generated with the same suffix as the source, such as source.harmonized.tsv for TSV input.
manifest_output_path  Path | None                  None     Where to write the manifest JSON for debugging.
sheet_name            str | None                   None     Worksheet to read and update for XLSX input. Defaults to the first sheet.
use_cache             bool                         True     When False, asks the service to bypass cached harmonization results.

Returns: HarmonizationResult with these fields:

Field          Type                                Description
file_path      Path                                Path to the output file.
status         "succeeded" | "failed" | "timeout"  Job outcome.
description    str                                 Human-readable status message.
job_id         str | None                          API job identifier, when submission succeeded.
mapping_id     str | None                          Internal mapping identifier (if available).
manifest_path  Path | None                         Path to the downloaded manifest parquet file (if available). The SDK derives this path from the harmonized output path and versions it rather than overwriting existing files.
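
Since status can be "succeeded", "failed", or "timeout", callers may want to check it before using file_path. The sketch below is one possible pattern, not part of the package: output_or_raise is a hypothetical helper, and a SimpleNamespace stands in for HarmonizationResult so the example runs without the service.

```python
from pathlib import Path
from types import SimpleNamespace


def output_or_raise(result) -> Path:
    """Return the harmonized file path, or raise with the service's description."""
    if result.status == "succeeded":
        return Path(result.file_path)
    raise RuntimeError(
        f"harmonization {result.status}: {result.description} (job_id={result.job_id})"
    )


# Stand-in object exposing the documented fields.
fake_result = SimpleNamespace(
    status="succeeded",
    file_path=Path("output/harmonized.xlsx"),
    description="ok",
    job_id="job-1",
)
print(output_or_raise(fake_result))
```

Including job_id in the error message makes a failed run easier to trace when reporting it.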

Data Model Store Methods

Query reference data for validation: available data models, CDEs, and permissible values.

list_data_models(...)

Fetch available data models (data commons).

models = client.list_data_models(
    query="ccdi",              # optional: search by key/name
    include_versions=True,     # optional: include version metadata
    include_counts=True,       # optional: include CDE/PV counts
    limit=100,                 # optional: max results
    offset=0,                  # optional: skip N results
)

for model in models:
    print(f"{model.key}: {model.name}")

Parameter         Type        Default  Description
query             str | None  None     Substring search on model key or name.
include_versions  bool        False    Include version metadata per model.
include_counts    bool        False    Include CDE/PV counts per version.
limit             int | None  None     Maximum number of results.
offset            int         0        Number of results to skip (for pagination).

Returns: tuple[DataModel, ...] where each DataModel has:

Field            Type        Description
data_commons_id  int         Internal ID.
key              str         Model key (e.g., "ccdi").
name             str         Display name.
description      str | None  Optional description.
is_active        bool        Whether the model is active.

Example:

(
    DataModel(data_commons_id=1, key="ccdi", name="CCDI", description="Childhood Cancer Data Initiative", is_active=True),
    DataModel(data_commons_id=2, key="gc", name="Genomic Commons", description=None, is_active=True),
)
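
The limit and offset parameters allow results to be fetched page by page. The generator below is a generic sketch that works with any callable accepting limit and offset keywords; iter_pages is a hypothetical helper, and it assumes that a page shorter than the requested size marks the end of the results. A fake fetch function stands in for the client so the example runs offline.

```python
from typing import Callable, Iterator, Sequence, TypeVar

T = TypeVar("T")


def iter_pages(fetch: Callable[..., Sequence[T]], page_size: int = 100) -> Iterator[T]:
    """Yield every item by calling fetch(limit=..., offset=...) until a short page arrives."""
    offset = 0
    while True:
        page = fetch(limit=page_size, offset=offset)
        yield from page
        if len(page) < page_size:
            return
        offset += page_size


# Stand-in for client.list_data_models so the sketch runs without the service.
_ROWS = tuple(range(250))


def fake_fetch(limit: int, offset: int) -> tuple:
    return _ROWS[offset:offset + limit]


print(len(list(iter_pages(fake_fetch))))  # 250
```

With a real client this might be called as iter_pages(client.list_data_models), or with functools.partial(client.list_cdes, model_key="ccdi", version="v1") for methods that need extra arguments.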

list_cdes(...)

Fetch Common Data Elements for a specific model version.

cdes = client.list_cdes(
    model_key="ccdi",
    version="v1",
    include_description=True,  # optional
    query="diagnosis",         # optional: search by cde_key
    limit=100,                 # optional
    offset=0,                  # optional
)

for cde in cdes:
    print(f"{cde.cde_key}: {cde.description}")

Parameter            Type        Default  Description
model_key            str         -        Required. Data model key (e.g., "ccdi").
version              str         -        Required. Version label (e.g., "v1").
include_description  bool        False    Include CDE descriptions.
query                str | None  None     Substring search on cde_key.
limit                int | None  None     Maximum number of results.
offset               int         0        Number of results to skip.

Returns: tuple[CDE, ...] where each CDE has:

Field           Type        Description
cde_key         str         CDE identifier (e.g., "sex_at_birth").
cde_id          int         Internal CDE ID.
cde_version_id  int         Internal version ID.
description     str | None  Optional description (if include_description=True).

Example:

(
    CDE(cde_key="sex_at_birth", cde_id=12345, cde_version_id=100, description="Biological sex assigned at birth"),
    CDE(cde_key="primary_diagnosis", cde_id=12346, cde_version_id=101, description="Primary cancer diagnosis"),
)

list_pvs(...)

Fetch permissible values for a specific CDE.

pvs = client.list_pvs(
    model_key="ccdi",
    version="v1",
    cde_key="sex_at_birth",
    include_inactive=False,    # optional
    query="Male",              # optional: search by value
    limit=100,                 # optional
    offset=0,                  # optional
)

for pv in pvs:
    print(f"{pv.value} (active={pv.is_active})")

Parameter         Type        Default  Description
model_key         str         -        Required. Data model key.
version           str         -        Required. Version label.
cde_key           str         -        Required. CDE key (e.g., "sex_at_birth").
include_inactive  bool        False    Include inactive permissible values.
query             str | None  None     Substring search on PV value.
limit             int | None  None     Maximum number of results.
offset            int         0        Number of results to skip.

Returns: tuple[PermissibleValue, ...] where each PermissibleValue has:

Field        Type        Description
pv_id        int         Internal PV ID.
value        str         The permissible value string.
description  str | None  Optional description.
is_active    bool        Whether the PV is active.

Example:

(
    PermissibleValue(pv_id=1001, value="Male", description="Male sex at birth", is_active=True),
    PermissibleValue(pv_id=1002, value="Female", description="Female sex at birth", is_active=True),
    PermissibleValue(pv_id=1003, value="Unknown", description="Sex at birth unknown", is_active=True),
)

get_pv_set(...)

Fetch all permissible values as a frozenset for O(1) membership testing. Auto-paginates to retrieve all values.

pv_set = client.get_pv_set(
    model_key="ccdi",
    version="v1",
    cde_key="sex_at_birth",
    include_inactive=False,  # optional
)

# O(1) membership testing
if "Male" in pv_set:
    print("Valid value!")

Parameter         Type  Default  Description
model_key         str   -        Required. Data model key.
version           str   -        Required. Version label.
cde_key           str   -        Required. CDE key.
include_inactive  bool  False    Include inactive permissible values.

Returns: frozenset[str] — All permissible value strings for the CDE.

Example:

frozenset({"Male", "Female", "Unknown"})

validate_value(...)

Check if a single value is valid for a CDE. Convenience wrapper around get_pv_set().

is_valid = client.validate_value(
    value="Male",
    model_key="ccdi",
    version="v1",
    cde_key="sex_at_birth",
)
# Returns: True

Parameter  Type  Default  Description
value      str   -        Required. The value to validate.
model_key  str   -        Required. Data model key.
version    str   -        Required. Version label.
cde_key    str   -        Required. CDE key.

Returns: bool. True if the value is in the CDE's permissible values.

Note: This method makes a network call on each invocation. For validating multiple values against the same CDE, call get_pv_set() once and reuse the returned frozenset.
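
The fetch-once, check-many pattern from the note can be sketched as follows. find_invalid_values is a hypothetical helper; the frozenset literal stands in for the value returned by get_pv_set() so the example runs without a network call.

```python
from typing import FrozenSet, Iterable, List


def find_invalid_values(values: Iterable[str], pv_set: FrozenSet[str]) -> List[str]:
    """Return the distinct values that are not permissible, sorted for stable output."""
    return sorted({value for value in values if value not in pv_set})


# In real use: pv_set = client.get_pv_set(model_key="ccdi", version="v1", cde_key="sex_at_birth")
pv_set = frozenset({"Male", "Female", "Unknown"})
print(find_invalid_values(["Male", "male", "F", "Female", "F"], pv_set))  # ['F', 'male']
```

The comparison is exact, so "male" is reported as invalid; any case or whitespace normalization would have to be applied by the caller.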


Async Support

All methods have async variants with the _async suffix. Use these when running in an async context (FastAPI, aiohttp, etc.):

import asyncio
from pathlib import Path
from netrias_client import NetriasClient

client = NetriasClient(api_key="your-api-key")

# Sync usage (scripts, Jupyter notebooks)
manifest = client.discover_mapping_from_tabular(
    source_path=Path("data/patients.tsv"),
    target_schema="ccdi",
)
result = client.harmonize(source_path=Path("data/patients.tsv"), manifest=manifest, data_commons_key="GC")

# Async usage (FastAPI, async frameworks)
async def process_file():
    manifest = await client.discover_mapping_from_tabular_async(
        source_path=Path("data/patients.tsv"),
        target_schema="ccdi",
    )
    result = await client.harmonize_async(
        source_path=Path("data/patients.tsv"),
        manifest=manifest,
        data_commons_key="GC",
    )
    return result

# From a plain script, run the coroutine with asyncio.run:
# result = asyncio.run(process_file())

Sync Method                      Async Method
discover_mapping_from_tabular()  discover_mapping_from_tabular_async()
harmonize()                      harmonize_async()
list_data_models()               list_data_models_async()
list_cdes()                      list_cdes_async()
list_pvs()                       list_pvs_async()
get_pv_set()                     get_pv_set_async()
validate_value()                 validate_value_async()

Sync methods work correctly in Jupyter notebooks and async web frameworks without event loop conflicts.
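
One reason to prefer the async variants is that several files can be processed concurrently. The sketch below uses asyncio.gather with the documented harmonize_async keywords; harmonize_many is a hypothetical helper, and it assumes that a single client instance tolerates concurrent calls and that all files share one manifest, neither of which is stated above.

```python
import asyncio
from pathlib import Path
from typing import Any, List, Mapping, Sequence


async def harmonize_many(
    client: Any,
    paths: Sequence[Path],
    manifest: Mapping[str, object],
    data_commons_key: str,
) -> List[Any]:
    """Start one harmonize_async call per file and wait for all of them."""
    jobs = [
        client.harmonize_async(
            source_path=path,
            manifest=manifest,
            data_commons_key=data_commons_key,
        )
        for path in paths
    ]
    # return_exceptions=True returns failures as exception objects
    # instead of raising the first one.
    return await asyncio.gather(*jobs, return_exceptions=True)


# Usage, inside an async function with a configured client:
#   results = await harmonize_many(client, [Path("a.tsv"), Path("b.tsv")], manifest, "GC")
```

Each entry in the returned list is either a result or an exception, in the same order as paths, so callers can match outcomes back to files by position.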


Error Handling

The client raises typed exceptions that inherit from NetriasClientError:

Exception                 When Raised
ClientConfigurationError  Invalid configuration or configure() not called.
FileValidationError       Source file doesn't exist or is invalid.
MappingDiscoveryError     Discovery API returned a client error (4xx), or the response violated the position-indexed contract (length mismatch, reordered columns, missing/invalid harmonization).
MappingValidationError    Manifest validation failed (missing keys, wrong value types, or a tabular file with no header row).
OutputLocationError       Cannot write to the output path.
NetriasAPIUnavailable     Network error, timeout, or server error (5xx).
HarmonizationJobError     Harmonization job failed or timed out.
DataModelStoreError       Data Model Store API returned a client error (4xx).

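from pathlib import Path

from netrias_client import NetriasClient, NetriasClientError, NetriasAPIUnavailable

# `client` and `manifest` come from the earlier examples
csv_path = Path("data/patients.csv")

try:
    result = client.harmonize(source_path=csv_path, manifest=manifest, data_commons_key="GC")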
except NetriasAPIUnavailable as e:
    print(f"Service unavailable: {e}")
except NetriasClientError as e:
    print(f"Client error: {e}")
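
Because NetriasAPIUnavailable covers transient conditions such as network errors and 5xx responses, some callers may choose to retry. The helper below is a generic sketch with exponential backoff, not something the package provides: call_with_retry and its parameters are hypothetical, and whether retrying a harmonize() call is safe (for example, whether it resubmits a job) is not documented above.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retry(
    call: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...],
    attempts: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run `call`, retrying on the given exception types with exponential backoff."""
    for attempt in range(attempts):
        try:
            return call()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2 ** attempt)
    raise ValueError("attempts must be at least 1")


# Usage (requires netrias_client and a configured client):
#   models = call_with_retry(lambda: client.list_data_models(), (NetriasAPIUnavailable,))
```

Other NetriasClientError subclasses, such as FileValidationError, describe problems a retry will not fix, so they are deliberately left out of retry_on.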

Version

Access the installed package version:

from netrias_client import __version__
print(__version__)  # e.g., "0.3.0"

Logging Configuration

The client uses the netrias_client logger namespace. Configure logging externally before creating a client:

import logging

from netrias_client import NetriasClient

# Set log level for all client instances
logging.getLogger("netrias_client").setLevel(logging.WARNING)

# Or add a custom handler
handler = logging.FileHandler("netrias.log")
handler.setFormatter(logging.Formatter("%(asctime)s - %(message)s"))
logging.getLogger("netrias_client").addHandler(handler)

# Then create your client
client = NetriasClient(api_key="your-api-key")

The LOGGER_NAMESPACE constant is exported for programmatic access:

import logging

from netrias_client import LOGGER_NAMESPACE
logging.getLogger(LOGGER_NAMESPACE).setLevel(logging.DEBUG)

Future Development

Gateway Bypass Removal: The boto3 dependency and discovery_use_gateway_bypass configuration option exist as a temporary workaround for API Gateway timeout limitations during CDE discovery. Once the API Gateway timeout issues are resolved, the direct Lambda bypass will be removed and boto3 will become an optional dependency. This change will be communicated in release notes.
