Python client for the Netrias harmonization API
Netrias Client
A Python client for the Netrias discovery and harmonization services. Transform tabular datasets such as CSV, TSV, and XLSX files to conform to standard data models (e.g., CCDI) with AI-powered column mapping.
Installation
With uv (recommended)
curl -LsSf https://astral.sh/uv/install.sh | sh # install uv once
uv add netrias_client
With pip
python -m pip install netrias_client
API Reference
NetriasClient(api_key)
Create a new client instance with your API key. The client is ready to use immediately with default settings.
from netrias_client import NetriasClient
# Provide your API key securely (e.g., from a secrets manager, environment variable, or config file)
client = NetriasClient(api_key="your-api-key")
| Parameter | Type | Description |
|---|---|---|
| api_key | str | Required. Bearer token for Netrias API authentication. Store securely and never commit to version control. |
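One way to keep the key out of source code is to read it from an environment variable at startup. The sketch below assumes a variable named NETRIAS_API_KEY; that name is hypothetical, chosen for illustration, and is not something the client is documented to read on its own.

```python
import os


def load_api_key(env_var: str = "NETRIAS_API_KEY") -> str:
    """Return the API key from the environment, or fail with a clear message.

    The variable name is an assumption made for this sketch.
    """
    api_key = os.environ.get(env_var, "").strip()
    if not api_key:
        raise RuntimeError(f"Set the {env_var} environment variable before creating the client")
    return api_key


# Usage (requires netrias_client to be installed):
# client = NetriasClient(api_key=load_api_key())
```

Failing early with a named variable tends to be easier to debug than an authentication error from the first API call.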
configure(...)
Optionally adjust settings after initialization. All parameters are optional.
from pathlib import Path

client.configure(
    timeout=1200.0,  # Optional: request timeout in seconds (default: 20 minutes)
    log_level="INFO",  # Optional: CRITICAL, ERROR, WARNING, INFO, DEBUG
    log_directory=Path("./logs"),  # Optional: directory for log files
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| timeout | float \| None | 1200.0 | Request timeout in seconds (default: 20 minutes). |
| log_level | str \| None | "INFO" | Logging verbosity: "CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG". |
| log_directory | Path \| str \| None | None | Directory for per-client log files. When omitted, logs go to stdout only. |
| discovery_url | str \| None | Production URL | Override discovery API URL (for staging/testing). |
| harmonization_url | str \| None | Production URL | Override harmonization API URL (for staging/testing). |
| data_model_store_url | str \| None | Production URL | Override Data Model Store API URL (for staging/testing). |
Note: Calling configure() with partial parameters preserves previously-set values. Only the parameters you specify are updated.
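The partial-update behavior can be pictured as merging overrides into an existing settings object. This is a minimal sketch, not the client's implementation: Settings and apply_overrides are hypothetical names, and treating None as "not specified" is an assumption.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class Settings:
    """Hypothetical stand-in for the client's internal settings."""

    timeout: float = 1200.0
    log_level: str = "INFO"
    log_directory: Optional[str] = None


def apply_overrides(current: Settings, **overrides: object) -> Settings:
    """Return new settings where only non-None overrides replace current values."""
    changed = {name: value for name, value in overrides.items() if value is not None}
    return replace(current, **changed)


settings = Settings()
settings = apply_overrides(settings, timeout=300.0)     # only timeout changes
settings = apply_overrides(settings, log_level="DEBUG")  # timeout stays at 300.0
print(settings)
```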
Discovery Methods
Discover how source columns map to the target schema's Common Data Elements (CDEs) using AI recommendations.
Tabular files and column identity
CSV, TSV, and XLSX are file formats at the SDK boundary. Inside the client, data is represented as a positional tabular dataset:
from pathlib import Path
from netrias_client import read_tabular
dataset = read_tabular(Path("data/patients.tsv"))
print(dataset.source_format) # TabularFormat.TSV
print(dataset.headers) # display headers, duplicates preserved
print(dataset.columns[0].key) # "col_0000"
print(dataset.rows[0]) # first data row as positional cells
For XLSX workbooks, select one worksheet at the boundary:
from netrias_client import list_workbook_sheets, read_tabular
sheets = list_workbook_sheets(Path("data/patients.xlsx"))
dataset = read_tabular(Path("data/patients.xlsx"), sheet_name=sheets[0].name)
Duplicate headers are allowed. The stable column key (col_0000, col_0001, ...) is the identity; the header text is only the display label. This prevents data loss from duplicate column names and keeps file formats from forcing CSV-shaped assumptions into the rest of the client.
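To illustrate why positional keys avoid data loss, the sketch below derives keys in the col_0000 style from a header row that contains duplicates and a blank. Column and assign_column_keys are hypothetical names for this example; the client's own types may differ.

```python
from typing import NamedTuple


class Column(NamedTuple):
    key: str     # stable identity, derived from position
    header: str  # display label; may repeat or be blank


def assign_column_keys(headers: list[str]) -> list[Column]:
    """Give every column a zero-padded positional key, regardless of header text."""
    return [Column(key=f"col_{index:04d}", header=header) for index, header in enumerate(headers)]


columns = assign_column_keys(["name", "name", "", "age"])
for column in columns:
    print(column.key, repr(column.header))
```

A dictionary keyed by header text would have collapsed the two "name" columns into one entry; keying by position keeps all four.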
At the CDE recommendation boundary, the client sends each display header as column_name and relies on the order of the response to map results back to stable column keys. For this API version, request column columns[N] must correspond to response result results[N]; column_name is display metadata only and may be blank. Because results are paired by position, matching never depends on header text, and duplicate and blank headers are preserved locally.
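The position-indexed contract can be sketched as a zip over two equal-length sequences, with a length check first. The function name and the result dictionaries are illustrative assumptions, not the service's actual response schema.

```python
def map_results_to_keys(column_keys: list[str], results: list[dict]) -> dict[str, dict]:
    """Pair results[N] with column_keys[N]; reject a response of the wrong length."""
    if len(results) != len(column_keys):
        raise ValueError(f"expected {len(column_keys)} results, got {len(results)}")
    return dict(zip(column_keys, results))


# Two columns share the display header "name"; position keeps them apart.
column_keys = ["col_0000", "col_0001"]
results = [
    {"column_name": "name", "target": "participant_name"},
    {"column_name": "name", "target": "family_name"},
]
mapped = map_results_to_keys(column_keys, results)
print(mapped["col_0001"]["target"])
```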
Supported tabular formats are exposed in code:
from netrias_client import SUPPORTED_TABULAR_FORMATS, SUPPORTED_TABULAR_SUFFIXES, TabularFormat
assert tuple(SUPPORTED_TABULAR_FORMATS) == (TabularFormat.CSV, TabularFormat.TSV, TabularFormat.XLSX)
assert set(SUPPORTED_TABULAR_SUFFIXES) == {".csv", ".tsv", ".xlsx"}
discover_mapping_from_tabular(...)
Reads a supported tabular file, samples values, and returns a manifest keyed by stable source column keys.
manifest = client.discover_mapping_from_tabular(
    source_path=Path("data/patients.xlsx"),
    target_schema="ccdi",
    target_version="latest",
    sheet_name="Patients",  # optional; XLSX defaults to the first sheet
    sample_limit=25,
    top_k=3,
    confidence_threshold=0.8,
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| source_path | Path | - | Required. Path to a supported tabular file (.csv, .tsv, or .xlsx). |
| target_schema | str | - | Required. Target schema key. |
| target_version | str | "latest" | Schema version to target. |
| sheet_name | str \| None | None | Worksheet to read for XLSX input. Defaults to the first sheet. |
| sample_limit | int | 25 | Maximum rows to sample for discovery. |
| top_k | int | 3 | Number of top recommendations to return per column. |
| confidence_threshold | float \| None | 0.8 | Minimum confidence score (0-1) for keeping recommendations. |
Returns: ColumnKeyedManifestPayload — A dictionary suitable for passing to harmonize().
{
    "column_mappings": {
        "col_0000": {
            "column_name": "name",
            "cde_key": "participant_name",
            "cde_id": 101,
            "harmonization": "harmonizable",
            "alternatives": [
                {"target": "participant_name", "confidence": 0.95, "harmonization": "harmonizable", "cde_id": 101}
            ],
        },
        "col_0001": {
            "column_name": "name",
            "cde_key": "family_name",
            "cde_id": 102,
            "harmonization": "harmonizable",
            "alternatives": [
                {"target": "family_name", "confidence": 0.89, "harmonization": "harmonizable", "cde_id": 102}
            ],
        },
    }
}
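Before passing a manifest to harmonize(), it can be useful to flag columns whose best recommendation is weak. The sketch below works on the dictionary shape shown above; columns_needing_review and the 0.9 cutoff are hypothetical choices for illustration.

```python
def columns_needing_review(manifest: dict, min_confidence: float = 0.9) -> list[str]:
    """Return column keys whose best alternative scores below min_confidence."""
    flagged = []
    for column_key, mapping in manifest["column_mappings"].items():
        confidences = [alt["confidence"] for alt in mapping.get("alternatives", [])]
        if not confidences or max(confidences) < min_confidence:
            flagged.append(column_key)
    return flagged


# Abbreviated copy of the example manifest above.
manifest = {
    "column_mappings": {
        "col_0000": {
            "column_name": "name",
            "cde_key": "participant_name",
            "alternatives": [{"target": "participant_name", "confidence": 0.95}],
        },
        "col_0001": {
            "column_name": "name",
            "cde_key": "family_name",
            "alternatives": [{"target": "family_name", "confidence": 0.89}],
        },
    }
}
print(columns_needing_review(manifest))  # ['col_0001']
```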
Harmonization Methods
Transform source tabular data using the discovered column mappings.
harmonize(...)
Execute the harmonization workflow: submit job, poll for completion, download result.
result = client.harmonize(
    source_path=Path("data/patients.xlsx"),
    manifest=manifest,  # from discover_*
    data_commons_key="GC",  # target data commons
    sheet_name="Patients",  # optional; XLSX defaults to the first sheet
    output_path=Path("output/harmonized.xlsx"),  # optional
    manifest_output_path=Path("output/manifest.json"),  # optional
    use_cache=True,  # optional; set False to bypass cached runs
)
print(result.status)  # "succeeded", "failed", or "timeout"
print(result.file_path)  # Path to the harmonized file
print(result.description)  # Human-readable status message
print(result.job_id)  # API job id for tracking
| Parameter | Type | Default | Description |
|---|---|---|---|
| source_path | Path | - | Required. Path to the source tabular file (.csv, .tsv, or .xlsx). |
| manifest | Path \| Mapping[str, object] | - | Required. Mapping manifest (from discovery) or path to a JSON manifest file. |
| data_commons_key | str | - | Required. Target data commons identifier (e.g., "GC"). |
| output_path | Path \| None | None | Where to write the harmonized file. When omitted, a path is auto-generated with the same suffix as the source, such as source.harmonized.tsv for TSV input. |
| manifest_output_path | Path \| None | None | Where to write the manifest JSON for debugging. |
| sheet_name | str \| None | None | Worksheet to read and update for XLSX input. Defaults to the first sheet. |
| use_cache | bool | True | When False, asks the service to bypass cached harmonization results. |
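The auto-generated output name can be approximated with pathlib. This sketch infers the pattern from the single example above (source.harmonized.tsv); the function name is hypothetical, and the client's actual naming, including how it handles existing files, may differ.

```python
from pathlib import Path


def default_output_path(source_path: Path) -> Path:
    """Insert '.harmonized' before the suffix, keeping the source directory and suffix."""
    return source_path.with_name(f"{source_path.stem}.harmonized{source_path.suffix}")


print(default_output_path(Path("data/patients.tsv")))
```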
Returns: HarmonizationResult with these fields:
| Field | Type | Description |
|---|---|---|
| file_path | Path | Path to the output file. |
| status | "succeeded" \| "failed" \| "timeout" | Job outcome. |
| description | str | Human-readable status message. |
| job_id | str \| None | API job identifier, when submission succeeded. |
| mapping_id | str \| None | Internal mapping identifier (if available). |
| manifest_path | Path \| None | Path to the downloaded manifest parquet file (if available). The SDK derives this path from the harmonized output path and versions it rather than overwriting existing files. |
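Since status can be "failed" or "timeout", callers may want to check it before using file_path. The helper below is a hypothetical sketch; it uses a SimpleNamespace as a stand-in with the documented fields so it runs without the package installed.

```python
from pathlib import Path
from types import SimpleNamespace


def harmonized_file_or_raise(result) -> Path:
    """Return result.file_path when the job succeeded, else raise with the description."""
    if result.status != "succeeded":
        raise RuntimeError(f"Harmonization {result.status} (job {result.job_id}): {result.description}")
    return result.file_path


# Stand-in object; a real HarmonizationResult comes from client.harmonize().
fake_result = SimpleNamespace(
    status="timeout",
    file_path=Path("output/harmonized.xlsx"),
    description="Job did not finish in time",
    job_id="job-123",
)

try:
    harmonized_file_or_raise(fake_result)
except RuntimeError as exc:
    print(exc)
```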
Data Model Store Methods
Query reference data for validation: available data models, CDEs, and permissible values.
list_data_models(...)
Fetch available data models (data commons).
models = client.list_data_models(
    query="ccdi",  # optional: search by key/name
    include_versions=True,  # optional: include version metadata
    include_counts=True,  # optional: include CDE/PV counts
    limit=100,  # optional: max results
    offset=0,  # optional: skip N results
)
for model in models:
    print(f"{model.key}: {model.name}")
| Parameter | Type | Default | Description |
|---|---|---|---|
| query | str \| None | None | Substring search on model key or name. |
| include_versions | bool | False | Include version metadata per model. |
| include_counts | bool | False | Include CDE/PV counts per version. |
| limit | int \| None | None | Maximum number of results. |
| offset | int | 0 | Number of results to skip (for pagination). |
Returns: tuple[DataModel, ...] where each DataModel has:
| Field | Type | Description |
|---|---|---|
| data_commons_id | int | Internal ID. |
| key | str | Model key (e.g., "ccdi"). |
| name | str | Display name. |
| description | str \| None | Optional description. |
| is_active | bool | Whether the model is active. |
Example:
(
    DataModel(data_commons_id=1, key="ccdi", name="CCDI", description="Childhood Cancer Data Initiative", is_active=True),
    DataModel(data_commons_id=2, key="gc", name="Genomic Commons", description=None, is_active=True),
)
list_cdes(...)
Fetch Common Data Elements for a specific model version.
cdes = client.list_cdes(
    model_key="ccdi",
    version="v1",
    include_description=True,  # optional
    query="diagnosis",  # optional: search by cde_key
    limit=100,  # optional
    offset=0,  # optional
)
for cde in cdes:
    print(f"{cde.cde_key}: {cde.description}")
| Parameter | Type | Default | Description |
|---|---|---|---|
| model_key | str | - | Required. Data model key (e.g., "ccdi"). |
| version | str | - | Required. Version label (e.g., "v1"). |
| include_description | bool | False | Include CDE descriptions. |
| query | str \| None | None | Substring search on cde_key. |
| limit | int \| None | None | Maximum number of results. |
| offset | int | 0 | Number of results to skip. |
Returns: tuple[CDE, ...] where each CDE has:
| Field | Type | Description |
|---|---|---|
| cde_key | str | CDE identifier (e.g., "sex_at_birth"). |
| cde_id | int | Internal CDE ID. |
| cde_version_id | int | Internal version ID. |
| description | str \| None | Optional description (if include_description=True). |
Example:
(
    CDE(cde_key="sex_at_birth", cde_id=12345, cde_version_id=100, description="Biological sex assigned at birth"),
    CDE(cde_key="primary_diagnosis", cde_id=12346, cde_version_id=101, description="Primary cancer diagnosis"),
)
list_pvs(...)
Fetch permissible values for a specific CDE.
pvs = client.list_pvs(
    model_key="ccdi",
    version="v1",
    cde_key="sex_at_birth",
    include_inactive=False,  # optional
    query="Male",  # optional: search by value
    limit=100,  # optional
    offset=0,  # optional
)
for pv in pvs:
    print(f"{pv.value} (active={pv.is_active})")
| Parameter | Type | Default | Description |
|---|---|---|---|
| model_key | str | - | Required. Data model key. |
| version | str | - | Required. Version label. |
| cde_key | str | - | Required. CDE key (e.g., "sex_at_birth"). |
| include_inactive | bool | False | Include inactive permissible values. |
| query | str \| None | None | Substring search on PV value. |
| limit | int \| None | None | Maximum number of results. |
| offset | int | 0 | Number of results to skip. |
Returns: tuple[PermissibleValue, ...] where each PermissibleValue has:
| Field | Type | Description |
|---|---|---|
| pv_id | int | Internal PV ID. |
| value | str | The permissible value string. |
| description | str \| None | Optional description. |
| is_active | bool | Whether the PV is active. |
Example:
(
    PermissibleValue(pv_id=1001, value="Male", description="Male sex at birth", is_active=True),
    PermissibleValue(pv_id=1002, value="Female", description="Female sex at birth", is_active=True),
    PermissibleValue(pv_id=1003, value="Unknown", description="Sex at birth unknown", is_active=True),
)
get_pv_set(...)
Fetch all permissible values as a frozenset for O(1) membership testing. Auto-paginates to retrieve all values.
pv_set = client.get_pv_set(
    model_key="ccdi",
    version="v1",
    cde_key="sex_at_birth",
    include_inactive=False,  # optional
)
# O(1) membership testing
if "Male" in pv_set:
    print("Valid value!")
| Parameter | Type | Default | Description |
|---|---|---|---|
| model_key | str | - | Required. Data model key. |
| version | str | - | Required. Version label. |
| cde_key | str | - | Required. CDE key. |
| include_inactive | bool | False | Include inactive permissible values. |
Returns: frozenset[str] — All permissible value strings for the CDE.
Example:
frozenset({"Male", "Female", "Unknown"})
validate_value(...)
Check if a single value is valid for a CDE. Convenience wrapper around get_pv_set().
is_valid = client.validate_value(
    value="Male",
    model_key="ccdi",
    version="v1",
    cde_key="sex_at_birth",
)
# Returns: True
| Parameter | Type | Default | Description |
|---|---|---|---|
| value | str | - | Required. The value to validate. |
| model_key | str | - | Required. Data model key. |
| version | str | - | Required. Version label. |
| cde_key | str | - | Required. CDE key. |
Returns: bool — True if the value is in the CDE's permissible values.
Note: This method makes a network call on each invocation. To validate multiple values against the same CDE, call get_pv_set() once and reuse the returned frozenset.
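Reusing one frozenset for a whole column might look like the sketch below. invalid_values is a hypothetical helper, and the literal frozenset stands in for the return value of client.get_pv_set(...). Membership here is an exact, case-sensitive string match, which is how Python sets behave; whether the service normalizes values is not stated in this document.

```python
def invalid_values(values: list[str], pv_set: frozenset[str]) -> list[str]:
    """Return the distinct values that are not in pv_set, in first-seen order."""
    seen: set[str] = set()
    rejected: list[str] = []
    for value in values:
        if value not in pv_set and value not in seen:
            seen.add(value)
            rejected.append(value)
    return rejected


pv_set = frozenset({"Male", "Female", "Unknown"})  # stand-in for client.get_pv_set(...)
print(invalid_values(["Male", "male", "F", "Female", "F"], pv_set))  # ['male', 'F']
```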
Async Support
All methods have async variants with the _async suffix. Use these when running in an async context (FastAPI, aiohttp, etc.):
import asyncio
from pathlib import Path
from netrias_client import NetriasClient

client = NetriasClient(api_key="your-api-key")

# Sync usage (scripts, Jupyter notebooks)
manifest = client.discover_mapping_from_tabular(
    source_path=Path("data/patients.tsv"),
    target_schema="ccdi",
)
result = client.harmonize(source_path=Path("data/patients.tsv"), manifest=manifest, data_commons_key="GC")

# Async usage (FastAPI, async frameworks)
async def process_file():
    manifest = await client.discover_mapping_from_tabular_async(
        source_path=Path("data/patients.tsv"),
        target_schema="ccdi",
    )
    result = await client.harmonize_async(
        source_path=Path("data/patients.tsv"),
        manifest=manifest,
        data_commons_key="GC",
    )
    return result
| Sync Method | Async Method |
|---|---|
| discover_mapping_from_tabular() | discover_mapping_from_tabular_async() |
| harmonize() | harmonize_async() |
| list_data_models() | list_data_models_async() |
| list_cdes() | list_cdes_async() |
| list_pvs() | list_pvs_async() |
| get_pv_set() | get_pv_set_async() |
| validate_value() | validate_value_async() |
Sync methods work correctly in Jupyter notebooks and async web frameworks without event loop conflicts.
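One reason to reach for the async variants is to run independent requests concurrently with asyncio.gather. The sketch below uses a local stand-in coroutine in place of client.get_pv_set_async so it runs offline; "vital_status" and its values are hypothetical, and whether a single client instance tolerates concurrent calls is an assumption this document does not confirm.

```python
import asyncio

# Stand-in data; real values would come from the Data Model Store.
_FAKE_PVS = {
    "sex_at_birth": frozenset({"Male", "Female", "Unknown"}),
    "vital_status": frozenset({"Alive", "Dead"}),  # hypothetical CDE key and values
}


async def get_pv_set_async(cde_key: str) -> frozenset[str]:
    """Stand-in for client.get_pv_set_async(...); sleeps instead of calling the API."""
    await asyncio.sleep(0.01)
    return _FAKE_PVS[cde_key]


async def fetch_pv_sets(cde_keys: list[str]) -> dict[str, frozenset[str]]:
    """Fetch several PV sets concurrently; gather returns results in input order."""
    pv_sets = await asyncio.gather(*(get_pv_set_async(key) for key in cde_keys))
    return dict(zip(cde_keys, pv_sets))


pv_sets = asyncio.run(fetch_pv_sets(["sex_at_birth", "vital_status"]))
print(sorted(pv_sets["sex_at_birth"]))
```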
Error Handling
The client raises typed exceptions that inherit from NetriasClientError:
| Exception | When Raised |
|---|---|
| ClientConfigurationError | Invalid configuration or configure() not called. |
| FileValidationError | Source file doesn't exist or is invalid. |
| MappingDiscoveryError | Discovery API returned a client error (4xx), or the response violated the position-indexed contract (length mismatch, reordered columns, missing/invalid harmonization). |
| MappingValidationError | Manifest validation failed (missing keys, wrong value types, or a tabular file with no header row). |
| OutputLocationError | Cannot write to the output path. |
| NetriasAPIUnavailable | Network error, timeout, or server error (5xx). |
| HarmonizationJobError | Harmonization job failed or timed out. |
| DataModelStoreError | Data Model Store API returned a client error (4xx). |
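from pathlib import Path
from netrias_client import NetriasClient, NetriasClientError, NetriasAPIUnavailable

# client and manifest are created as shown in the earlier sections
csv_path = Path("data/patients.csv")
try:
    result = client.harmonize(source_path=csv_path, manifest=manifest, data_commons_key="GC")
except NetriasAPIUnavailable as e:
    # Network error, timeout, or server error (5xx)
    print(f"Service unavailable: {e}")
except NetriasClientError as e:
    # Any other typed client exception
    print(f"Client error: {e}")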
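Because NetriasAPIUnavailable covers transient conditions such as timeouts and 5xx responses, a caller may choose to retry with exponential backoff. This is a sketch of that pattern, not a feature of the client: call_with_retry is hypothetical, and the exception class is redefined locally as a stand-in so the example runs without the package installed.

```python
import time


class NetriasAPIUnavailable(Exception):
    """Local stand-in; in real code import this from netrias_client."""


def call_with_retry(operation, attempts: int = 3, base_delay: float = 1.0, sleep=time.sleep):
    """Call operation(); on NetriasAPIUnavailable wait base_delay * 2**n, then retry."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return operation()
        except NetriasAPIUnavailable:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2 ** attempt)


# Demo: fails twice, then succeeds. Delays are recorded instead of slept.
calls = {"count": 0}
delays = []


def flaky():
    calls["count"] += 1
    if calls["count"] < 3:
        raise NetriasAPIUnavailable("temporary outage")
    return "ok"


print(call_with_retry(flaky, sleep=delays.append), delays)
```

With the real client, operation could be a lambda that wraps client.harmonize(...). Other NetriasClientError subclasses are deliberately not retried here, since a 4xx response or a validation failure is unlikely to succeed on a second attempt.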
Version
Access the installed package version:
from netrias_client import __version__
print(__version__) # e.g., "0.6.1"
Logging Configuration
The client uses the netrias_client logger namespace. Configure logging externally before creating a client:
import logging
# Set log level for all client instances
logging.getLogger("netrias_client").setLevel(logging.WARNING)
# Or add a custom handler
handler = logging.FileHandler("netrias.log")
handler.setFormatter(logging.Formatter("%(asctime)s - %(message)s"))
logging.getLogger("netrias_client").addHandler(handler)
# Then create your client
client = NetriasClient(api_key="your-api-key")
The LOGGER_NAMESPACE constant is exported for programmatic access:
from netrias_client import LOGGER_NAMESPACE
logging.getLogger(LOGGER_NAMESPACE).setLevel(logging.DEBUG)
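Setting a level or handler on the namespace logger affects everything beneath it because of how the standard logging hierarchy works. The sketch below demonstrates that with the standard library only; the child logger name netrias_client.example is hypothetical, and whether the client emits through child loggers is an assumption.

```python
import logging


class ListHandler(logging.Handler):
    """Collect formatted messages in memory so they can be inspected."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(self.format(record))


parent = logging.getLogger("netrias_client")
parent.setLevel(logging.WARNING)
handler = ListHandler()
parent.addHandler(handler)

# A child logger with no level of its own inherits WARNING from the parent.
child = logging.getLogger("netrias_client.example")
child.info("dropped: below WARNING")
child.warning("kept: propagates to the parent handler")

print(handler.messages)
```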
Future Development
Gateway Bypass Removal: The boto3 dependency and discovery_use_gateway_bypass configuration option exist as a temporary workaround for API Gateway timeout limitations during CDE discovery. Once the API Gateway timeout issues are resolved, the direct Lambda bypass will be removed and boto3 will become an optional dependency. This change will be communicated in release notes.