Official Python SDK for the Rosenbound clinical-AI platform
Project description
rosenbound
Official Python SDK for the Rosenbound clinical-AI platform — a causal, auditable clinical decision-support platform for structured medical and pharmacological data.
The SDK is a typed, Pydantic-based wrapper over the platform's REST API (platform API v1). It is sync-first; an asynchronous client is reserved for a future release.
Status — v0.1.0 (alpha). This release implements the Cohorts (list / get / create), Studies (list / get / create / run / results), and Certificates (reproducibility certificate + methodology PDF) surfaces — the full cohort → study → run → result → certificate loop. An asynchronous client, SSO auth, certificate-chain
verify(), and per-feature resources (MedDRA, E2B, drug-coder, …) are reserved for a future release.
Installation
pip install rosenbound
Requires Python ≥ 3.10. Runtime dependencies: httpx, pydantic (v2),
typing-extensions.
During pre-release the package installs from a local checkout:
cd code/sdk/python
pip install -e ".[dev]"
Authentication
The SDK authenticates with a platform API token, sent on every request as
Authorization: Bearer <token>. Tokens are issued from the in-product API-key
management surface (which requires an admin role). The token is opaque to the
SDK; the platform validates it and resolves your tenant + role from it.
from rosenbound import Client
client = Client(api_key="rk_your_token_here")
By default the client talks to https://api.rosenbound.com. Point it elsewhere
(e.g. a staging host) with base_url:
client = Client(api_key="rk_...", base_url="https://staging.api.rosenbound.com")
Quick start — cohorts
from rosenbound import Client
with Client(api_key="rk_...") as client:
# Create a cohort from a CSV file plus a cohort-definition DSL.
cohort = client.cohorts.create(
name="metformin_t2dm_2026",
archetype="treatment_policy",
dsl_yaml="path/to/definition.yaml", # YAML text or a Path to a file
csv_path="my_cohort.csv",
)
print(cohort.id, cohort.status)
# List cohorts (paginated, filterable by archetype + name search).
page = client.cohorts.list(archetype="treatment_policy", page=1, page_size=50)
print(f"{len(page.items)} of {page.total} cohorts")
for c in page.items:
print(c.id, c.name, c.archetype)
# Fetch one cohort by id.
detail = client.cohorts.get(cohort.id)
print(detail.cohort_def_hash, detail.data_hash)
Cohort archetypes
archetype is one of:
| value | meaning |
|---|---|
pv |
pharmacovigilance pentagon cohort |
longitudinal |
repeated-measures / longitudinal cohort |
treatment_policy |
treatment-policy (A-vs-B) cohort |
time_to_event |
survival / time-to-event cohort |
The dsl_yaml cohort definition must validate against the chosen archetype,
and every column it references must exist in the uploaded CSV header — the
platform rejects a mismatch with a ValidationError at upload time.
Studies
A study runs a causal protocol against a cohort and produces a 5-method (Pentagon) estimate set. The workflow is create → run → fetch results:
with Client(api_key="rk_...") as client:
study = client.studies.create(
cohort_id=cohort.id,
protocol_id="pv_pentagon",
estimator_selection={}, # per-protocol config; {} = protocol defaults
)
print(study.id, study.status) # -> "draft"
# run() executes synchronously: it returns once the Pentagon run reaches a
# terminal state, so the returned study is normally status="complete".
completed = client.studies.run(study.id, seed=17) # seed pins reproducibility
print(completed.status) # -> "complete"
result = client.studies.get_results(study.id)
print(result.pentagon_result_payload) # the full 5-method estimate set
# List + filter (single status value, optional cohort filter, paginated).
page = client.studies.list(cohort_id=cohort.id, status="complete")
print(f"{len(page.items)} of {page.total} studies")
protocol_id is one of:
| value | meaning |
|---|---|
pv_pentagon |
pharmacovigilance 5-method Pentagon |
cr_longitudinal |
clinical-research longitudinal protocol |
cr_treatment_policy |
treatment-policy (A-vs-B) protocol |
cr_tte |
time-to-event / survival protocol |
run() is synchronous today. If a run is already in progress it raises
ConflictError (409); a server-side run failure raises ServerError (500).
When the platform later adds an asynchronous job model, a blocking
wait_for_completion poller will land alongside the async client (v0.2).
Certificates
Every completed run mints a reproducibility certificate — the hashes and
environment pins an external auditor re-derives the result from. The
certificate is embedded in the study response; certificates.get() surfaces it:
with Client(api_key="rk_...") as client:
cert = client.certificates.get(study.id)
print(cert.code_version) # the code version that produced the result
print(cert.cohort_def_yaml_hash, cert.cohort_data_hash)
print(cert.library_versions) # {package: version} pin map
print(cert.generated_at)
# Download the human-readable methodology PDF.
client.certificates.download_pdf(study.id, output_path="methodology.pdf")
| field | meaning |
|---|---|
cohort_def_yaml_hash |
SHA-256 of the cohort-definition YAML |
cohort_data_hash |
SHA-256 of the input cohort data |
code_version |
resolved code version (git SHA) that produced the result |
library_versions |
{package: version} environment pin map |
generated_at |
server-side timestamp the certificate was minted |
certificates.get() raises NotFoundError if the study has no completed run
yet. download_pdf() raises ServerError if the methodology PDF has not been
materialized for the latest run (the platform returns 503 until the generator
has produced the artifact). Re-validating the certificate chain against the
platform's authoritative ledger — certificates.verify() — is a v0.2 stub that
currently raises NotImplementedError.
End-to-end example
The full loop in one block — upload a cohort, define and run a study, read the result, and fetch its reproducibility certificate:
from rosenbound import Client
with Client(api_key="rk_...") as client:
cohort = client.cohorts.create(
name="metformin_t2dm_2026",
archetype="treatment_policy",
dsl_yaml="definition.yaml",
csv_path="my_cohort.csv",
)
study = client.studies.create(
cohort_id=cohort.id,
protocol_id="pv_pentagon",
)
client.studies.run(study.id, seed=17)
result = client.studies.get_results(study.id)
cert = client.certificates.get(study.id)
print("estimates:", result.pentagon_result_payload)
print("reproducible under code version:", cert.code_version)
Error handling
Every SDK error derives from RosenboundError. The HTTP layer maps response
status codes to specific subclasses:
from rosenbound import (
Client,
AuthenticationError, # 401 — invalid / missing API key
NotFoundError, # 404 — absent, or not visible to your tenant
ValidationError, # 422 — payload rejected (carries .errors)
ConflictError, # 409 — state conflict (e.g. study already running)
RateLimitError, # 429 — rate limit (carries .retry_after_seconds)
ServerError, # 5xx — platform fault, after retries are exhausted
RosenboundError, # base class — catch-all
)
with Client(api_key="rk_...") as client:
try:
client.cohorts.create(
name="bad",
archetype="pv",
dsl_yaml=open("definition.yaml").read(),
csv_path="cohort.csv",
)
except ValidationError as exc:
for err in exc.errors: # the platform's structured error body
print(err)
except RateLimitError as exc:
print("retry after", exc.retry_after_seconds, "s")
A 404 means "not found for you" — the platform returns 404 (never 403) for cross-tenant reads so resource existence never leaks across tenants.
Retries and timeouts
The client retries transient failures (HTTP 429 and 5xx) up to
max_retries times with deterministic exponential backoff
(0.5 * 2 ** attempt seconds). A 429 carrying a numeric Retry-After header
honors that value instead. Non-retryable errors (4xx other than 429) raise
immediately.
client = Client(
api_key="rk_...",
timeout=30.0, # per-request timeout, seconds (default 30)
max_retries=3, # additional attempts on 429 / 5xx (default 3)
)
Network-layer failures (connection refused, DNS, read timeout) propagate as the
underlying httpx transport exceptions and are not wrapped.
Resource management
Client owns a connection pool. Use it as a context manager, or call
close() explicitly:
with Client(api_key="rk_...") as client:
... # pool is released on exit
# or
client = Client(api_key="rk_...")
try:
...
finally:
client.close()
A single client is safe for serial use from one thread; construct one client per thread for concurrent use.
Compatibility
- Python: ≥ 3.10
- Platform API: v1
- Concurrency: synchronous client only (async reserved for v0.2)
- SDK version: see
rosenbound.__version__
License
Apache-2.0. See LICENSE.
Contact
Issues and questions: the Rosenbound platform team.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file rosenbound-0.1.0.tar.gz.
File metadata
- Download URL: rosenbound-0.1.0.tar.gz
- Upload date:
- Size: 25.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
b1ae616f886e94ad0e1ec7c446fb6c1e79e3c05e413beccc6b9a99f21a388120
|
|
| MD5 |
4df93271ed5567e332b84172302d5001
|
|
| BLAKE2b-256 |
f976252b3e3d8f2ecbf9e0cde7a49b4012350baa14e50354021a7595833ef62f
|
File details
Details for the file rosenbound-0.1.0-py3-none-any.whl.
File metadata
- Download URL: rosenbound-0.1.0-py3-none-any.whl
- Upload date:
- Size: 25.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
279b8ff03de07ab277ce7df0ec2648a21343b02ec5b8459c9d2e261a29da317a
|
|
| MD5 |
fa65ee718a7eec5e02eee72d5b11e864
|
|
| BLAKE2b-256 |
004744c26480b712d816a6f07216fb89d8ec4922e6a52f1d797db22b0b81c264
|