biomapper

Python client for the BioMapper2 API — map biological entity names to standardized knowledge-graph identifiers (CHEBI, HMDB, PubChem, RefMet, and more).

from biomapper import map_entity

result = map_entity("L-Histidine")
print(result.primary_curie)     # RM:0129894
print(result.confidence_tier)   # high
print(result.ids_for("CHEBI"))  # ['15971']
print(result.equivalent_ids_for("HMDB"))  # ['HMDB0000177']

Installation

# Core (async HTTP client + Pydantic models)
pip install biomapper

Getting an API key

The BioMapper2 API requires an API key. To request access, email trent.leslie@phenomehealth.org.

Once you have a key, set it in your environment:

export BIOMAPPER_API_KEY=your-key-here

Or add it to a .env file in your project root:

BIOMAPPER_API_KEY=your-key-here

biomapper will pick it up automatically from either location.
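
To confirm that a key is actually visible before making a call, a small diagnostic along these lines can help. This is a minimal sketch, not biomapper's own loader: the helper name `find_api_key` is hypothetical, and the precedence shown (environment variable first, then a plain `KEY=value` line in `.env`) is an assumption rather than documented behaviour.

```python
import os
from pathlib import Path
from typing import Mapping, Optional

KEY_NAME = "BIOMAPPER_API_KEY"


def find_api_key(
    environ: Mapping[str, str] = os.environ,
    dotenv_path: Path = Path(".env"),
) -> Optional[str]:
    """Return the key from the environment, else from a .env file, else None.

    Hypothetical diagnostic helper; the environment-first precedence is an
    assumption, not something the library documents.
    """
    value = environ.get(KEY_NAME)
    if value:
        return value
    if dotenv_path.is_file():
        for raw in dotenv_path.read_text(encoding="utf-8").splitlines():
            line = raw.strip()
            # Skip blanks, comments, and lines that are not KEY=value pairs.
            if not line or line.startswith("#") or "=" not in line:
                continue
            name, _, val = line.partition("=")
            if name.strip() == KEY_NAME:
                return val.strip().strip("'\"") or None
    return None


if __name__ == "__main__":
    print("API key found" if find_api_key() else "API key missing")
```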


Quick start

Single lookup (synchronous)

from biomapper import map_entity

result = map_entity("L-Histidine")

print(result.resolved)          # True
print(result.primary_curie)     # RM:0129894
print(result.chosen_kg_id)      # CHEBI:15971
print(result.confidence_score)  # 2.489
print(result.confidence_tier)   # high  (≥2.0)
print(result.ids_for("CHEBI"))  # ['15971']
print(result.ids_for("refmet_id"))  # ['RM0129894']

# KG equivalent IDs — all identifiers from the resolved knowledge graph node
print(result.kg_equivalent_ids)            # {'CHEBI': ['15971', '44637'], 'HMDB': ['HMDB0000177'], ...}
print(result.equivalent_ids_for("HMDB"))   # ['HMDB0000177']

Batch mapping (synchronous)

from biomapper import map_entities, summarize

records = [
    {"name": "L-Histidine"},
    {"name": "Glucose", "identifiers": {"HMDB": "HMDB00122"}},
    {"name": "Sphinganine"},
]

results = map_entities(records, progress=True)  # tqdm progress bar (with the [notebook] extra installed)
summary = summarize(results)

print(f"{summary.resolved}/{summary.total_queried} resolved")
print(f"Resolution rate: {summary.resolution_rate:.1%}")
print(summary.vocabulary_coverage)

Inputs are auto-chunked at 1000 entities per request against the native POST /map/batch endpoint, so 10,000 records cost 10 round-trips.
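
The round-trip arithmetic can be checked with a plain chunking helper. This is only an illustration of fixed-size chunking under the 1000-per-request limit stated above; `chunked` is a hypothetical name and not part of biomapper, which does this internally.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def chunked(items: Sequence[T], size: int = 1000) -> Iterator[List[T]]:
    """Yield consecutive slices of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield list(items[start:start + size])


records = [{"name": f"compound-{i}"} for i in range(10_000)]
batches = list(chunked(records))
print(len(batches))      # 10
print(len(batches[-1]))  # 1000
```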

Dataset upload (synchronous)

For larger inputs, hand the server a TSV/CSV file directly and stream results back. The server processes the file row-by-row over the POST /map/dataset/stream endpoint:

from pathlib import Path
from biomapper import map_dataset_file_sync

result = map_dataset_file_sync(
    Path("compounds.tsv"),
    name_column="name",
    provided_id_columns=["hmdb_id"],
    progress=True,         # tqdm bar
    total_hint=1000,       # optional; enables % progress
)
result.raise_for_error()   # opt-in: raise BioMapperError if the stream truncated
print(f"resolved {sum(1 for r in result.results if r.resolved)} of {len(result.results)}")

name_column and provided_id_columns are required — the server uses them to map your file's columns to entity names and identifier hints. For per-result streaming into a UI or custom processing, use the async BioMapperClient.map_dataset_file_iter method (see the tutorial notebook in notebooks/).
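
For reference, a file matching the call above could be produced as follows. This is a sketch under stated assumptions: a tab-delimited file with a header row whose column names match `name_column` and `provided_id_columns`, and an empty cell where no identifier hint is known (how the server treats empty cells is not stated here).

```python
import csv
from pathlib import Path

rows = [
    {"name": "L-Histidine", "hmdb_id": "HMDB00177"},
    {"name": "Glucose", "hmdb_id": "HMDB00122"},
    {"name": "Sphinganine", "hmdb_id": ""},  # no identifier hint available
]

path = Path("compounds.tsv")
with path.open("w", newline="", encoding="utf-8") as fh:
    # Header names must match name_column / provided_id_columns in the call.
    writer = csv.DictWriter(fh, fieldnames=["name", "hmdb_id"], delimiter="\t")
    writer.writeheader()
    writer.writerows(rows)
```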

Discovering what the API supports

from biomapper import list_annotators, list_vocabularies, list_entity_types

for a in list_annotators():
    print(f"{a.slug:30s} {a.name}")

# 300+ supported vocabularies (CHEBI, HMDB, PubChem, …)
vocabs = list_vocabularies()
print(f"{len(vocabs)} vocabularies supported")

# Biolink entity types with their known aliases
for et in list_entity_types():
    print(f"{et.type}: {', '.join(et.aliases)}")

Async usage

import asyncio
from pathlib import Path

from biomapper import BioMapperClient

async def main() -> None:
    async with BioMapperClient() as client:
        # Verify connectivity
        health = await client.health_check()
        print(health)  # {'status': 'healthy', ...}

        # Single lookup, with an identifier hint
        result = await client.map_entity(
            "L-Histidine",
            identifiers={"HMDB": "HMDB00177"},
        )

        # Batch: auto-chunked at 1000 entities per request
        results = await client.map_entities(
            [{"name": "L-Histidine"}, {"name": "Glucose"}],
            progress=True,
        )

        # Stream from a file, handling each result as it arrives
        async for r in client.map_dataset_file_iter(
            Path("compounds.tsv"),
            name_column="name",
            provided_id_columns=["hmdb_id"],
        ):
            print(r.query_name, r.primary_curie)

asyncio.run(main())

map_dataset_file_iter is the primitive for UIs and custom processing that want per-result reactivity. Callers needing a blocking, fully-collected result should use map_dataset_file_sync instead (see above).

Jupyter notebooks

Apply nest_asyncio before using sync helpers inside a running event loop:

import nest_asyncio
nest_asyncio.apply()  # required in Jupyter

from biomapper import map_entities
results = map_entities([{"name": "L-Histidine"}], progress=True)

Preprocessing functions

from biomapper.extras.metabolon import clean_compound_name, extract_hmdb_id

# Strip quotes and collision-energy suffixes
clean_compound_name('"1,3-Diphenylguanidine_CE45"')  # '1,3-Diphenylguanidine'
clean_compound_name('"4,6-DIOXOHEPTANOIC ACID"')     # '4,6-DIOXOHEPTANOIC ACID'
clean_compound_name('L-Histidine')                   # 'L-Histidine'  (unchanged)

# Extract HMDB accessions from ms1_compound_name format
extract_hmdb_id('HMDB:HMDB03349-2257 L-Dihydroorotic acid')  # 'HMDB03349'
extract_hmdb_id('HMDB00177')                                  # 'HMDB00177'
extract_hmdb_id(None)                                         # None

API reference

MappingResult

Attribute          Type                   Description
query_name         str                    Name submitted to the API
resolved           bool                   Whether any identifier was returned
primary_curie      str | None             First CURIE in the response
chosen_kg_id       str | None             Resolver-selected knowledge graph ID
confidence_score   float | None           Highest score across annotators
confidence_tier    str                    "high" (≥2.0) / "medium" (≥1.0 and <2.0) / "low" (<1.0) / "unknown"
identifiers        dict[str, list[str]]   Vocabulary → IDs, e.g. {"CHEBI": ["15971"]}
kg_equivalent_ids  dict[str, list[str]]   All equivalent IDs from the resolved KG node, by CURIE prefix
hmdb_hint          str | None             HMDB hint passed in the request
error              str | None             Error message if mapping failed

result.ids_for("CHEBI")        # ['15971']
result.ids_for("refmet_id")    # ['RM0129894']
result.ids_for("PUBCHEM.COMPOUND")  # []

# KG equivalent IDs — all identifiers from the resolved knowledge graph node
result.equivalent_ids_for("HMDB")  # ['HMDB0000177']
result.equivalent_ids_for("LM")   # ['ST01010001', 'ST01010093']

DatasetMappingResult

Return type of map_dataset_file_sync. Captures per-row results plus an opt-in error signal for partial runs.

Attribute  Type                  Description
results    list[MappingResult]   Per-row mapping outcomes in server-emitted order
stats      dict[str, Any]        Server-provided summary. Empty unless the stream emits a terminal summary line
metadata   ApiMetadata           Request metadata; stays at defaults when the stream truncates before completion
error      str | None            Mid-stream transport failure text. None on clean runs

result.raise_for_error()   # raises BioMapperError if .error is set; else no-op

raise_for_error mirrors httpx.Response.raise_for_status and turns the partial-result contract into an explicit caller opt-in — silent consumption of a truncated run (using .results without checking .error) is the footgun this model is designed to prevent.
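
The opt-in pattern can be illustrated without touching the network. The classes below are hypothetical stand-ins (`PartialResult` for DatasetMappingResult, `StreamError` for BioMapperError) and only sketch the contract described above; they are not the library's implementation.

```python
from dataclasses import dataclass, field
from typing import List, Optional


class StreamError(Exception):
    """Stand-in for BioMapperError in this sketch."""


@dataclass
class PartialResult:
    """Hypothetical stand-in for DatasetMappingResult."""

    results: List[str] = field(default_factory=list)
    error: Optional[str] = None

    def raise_for_error(self) -> None:
        # No-op on clean runs; raise only when a failure was recorded.
        if self.error is not None:
            raise StreamError(self.error)


clean = PartialResult(results=["row-1", "row-2"])
clean.raise_for_error()  # does nothing

truncated = PartialResult(results=["row-1"], error="connection reset")
try:
    truncated.raise_for_error()
except StreamError as exc:
    # Rows received before the failure are still available on .results.
    print(f"kept {len(truncated.results)} row(s); stream failed: {exc}")
```

Callers who prefer "accept partial, log the rest" can skip `raise_for_error()` and branch on `.error` directly.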

Note: confidence_score on dataset-stream results is always None — the /map/dataset/stream endpoint emits a slimmer per-row payload than /map/batch and does not include the annotator assigned_ids block. Use map_entity / map_entities if you need confidence tiers.

Confidence tiers

Score              Tier     Recommended action
≥ 2.0              high     Accept without review
1.0 ≤ score < 2.0  medium   Quick sanity check
< 1.0              low      Manual review recommended
None               unknown  No score returned (e.g. HMDB-hint resolved)
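
The thresholds in the table above translate into a short function, which can be handy when re-deriving tiers from stored scores. This is a sketch: `tier_for` is a hypothetical name, and the library already exposes the tier as `confidence_tier`.

```python
from typing import Optional


def tier_for(score: Optional[float]) -> str:
    """Map a confidence score to the tier names used in the table."""
    if score is None:
        return "unknown"
    if score >= 2.0:
        return "high"
    if score >= 1.0:
        return "medium"
    return "low"


print(tier_for(2.489))  # high
print(tier_for(None))   # unknown
```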

Error handling

from biomapper import (
    map_entity,
    BioMapperError,           # base class
    BioMapperAuthError,       # 401/403: bad API key
    BioMapperRateLimitError,  # 429: throttled
    BioMapperServerError,     # 5xx
    BioMapperTimeoutError,    # request timeout
    BioMapperConfigError,     # missing API key / bad config
)

try:
    result = map_entity("Glucose")
except BioMapperRateLimitError as e:
    print(f"Throttled. Retry after: {e.retry_after}s")
except BioMapperAuthError:
    print("Check your BIOMAPPER_API_KEY")

In batch mode (map_entities), per-record errors are caught and returned as MappingResult(error=...) rather than aborting the batch.
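
Because failures come back as ordinary results, a batch usually needs one pass to separate them out. The sketch below relies only on the `.error` and `.query_name` attributes listed under MappingResult; `split_by_error` is a hypothetical helper, and the demo uses stand-in objects in place of a real map_entities call.

```python
from types import SimpleNamespace
from typing import Any, Iterable, List, Tuple


def split_by_error(results: Iterable[Any]) -> Tuple[List[Any], List[Any]]:
    """Separate results whose `.error` is None from those carrying a message."""
    ok: List[Any] = []
    failed: List[Any] = []
    for r in results:
        (failed if r.error is not None else ok).append(r)
    return ok, failed


# Stand-in objects; real code would pass the list returned by map_entities.
demo = [
    SimpleNamespace(query_name="L-Histidine", error=None),
    SimpleNamespace(query_name="Glucose", error="timeout"),
]
ok, failed = split_by_error(demo)
for r in failed:
    print(f"{r.query_name}: {r.error}")
```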

Dataset streaming (map_dataset_file_sync) uses a two-tier contract:

  • Initial-request errors (401, 422, 500, connect timeout) raise as typed exceptions — these happen before any row is processed, so partial results don't exist to preserve.
  • Mid-stream transport failures are captured into DatasetMappingResult.error with the partial results preserved in .results. Call .raise_for_error() to get exception semantics, or inspect .error directly for "accept partial, log the rest" workflows.

Callback exceptions raised from on_result propagate unwrapped and replace the return value — partial results collected up to that point are lost. For UI consumers with failure-prone callbacks, wrap the callback body in your own try/except if you want partial data to survive.
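
One way to wrap a failure-prone callback is a small guard that records exceptions instead of letting them propagate. This is a sketch: `guarded` and `update_ui` are hypothetical names, and the callback is assumed to take a single result argument, which should be checked against the actual on_result signature.

```python
import logging
from typing import Any, Callable, List

logger = logging.getLogger(__name__)


def guarded(
    callback: Callable[[Any], None],
    failures: List[Exception],
) -> Callable[[Any], None]:
    """Wrap `callback` so its exceptions are recorded instead of propagating."""

    def wrapper(result: Any) -> None:
        try:
            callback(result)
        except Exception as exc:  # deliberately broad: keep the stream going
            failures.append(exc)
            logger.warning("on_result callback failed: %s", exc)

    return wrapper


def update_ui(result: Any) -> None:
    # Hypothetical failure-prone callback.
    if result == "bad-row":
        raise ValueError("cannot render row")


failures: List[Exception] = []
safe_update = guarded(update_ui, failures)
for row in ["row-1", "bad-row", "row-3"]:
    safe_update(row)  # never raises
print(len(failures))  # 1
```

The wrapped function (`safe_update` here) is what would be passed as on_result, so a rendering failure on one row does not discard the rows already collected.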


Development

git clone https://github.com/trentleslie/biomapper
cd biomapper
poetry install --with dev --extras all

make check          # format → lint → type-check → test
make test           # tests only
make coverage       # HTML coverage report

License

MIT — see LICENSE.


Related

  • BioMapper2 API: https://biomapper.expertintheloop.io

Download files

Source Distribution

biomapper-1.1.0.tar.gz (27.4 kB)

Uploaded Source

Built Distribution

biomapper-1.1.0-py3-none-any.whl (29.9 kB)

Uploaded Python 3

File details

Details for the file biomapper-1.1.0.tar.gz.

File metadata

  • Download URL: biomapper-1.1.0.tar.gz
  • Upload date:
  • Size: 27.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.7

File hashes

Hashes for biomapper-1.1.0.tar.gz
Algorithm    Hash digest
SHA256       312b208e6120cf8f3314afc8d4a1b6af5011d69617acc3ec510befa11cc94f09
MD5          a21121029c74bc96dfb5c2cd49a3cedd
BLAKE2b-256  1b24513acfbe18b09ce0f3ecbce4211a882fd956d2891b43f2563517d457f903

File details

Details for the file biomapper-1.1.0-py3-none-any.whl.

File metadata

  • Download URL: biomapper-1.1.0-py3-none-any.whl
  • Upload date:
  • Size: 29.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.7

File hashes

Hashes for biomapper-1.1.0-py3-none-any.whl
Algorithm    Hash digest
SHA256       6a5a4f28a47f1b5b549a2cb7e209ee79cf759e81b3c138ac944325ecf4978bfc
MD5          1809ce296d9f1120afd9f858e8a407c3
BLAKE2b-256  148f9f4a0b2e195b6a5621bb325f935bfd3ef987ffe09dbf2c81ec8f678d5509
