Skip to main content

Python SDK for NomadicAI's DriveMonitor API

Project description

Nomadic Python SDK

A Python client library for the Nomadic DriveMonitor API, allowing you to upload and analyze driving videos programmatically.

Installation

From PyPI (for users)

pip install nomadic

For Development (from source)

To install the package in development mode, where changes to the code will be immediately reflected without reinstallation:

# Clone the repository
git clone https://github.com/nomadic-ml/drivemonitor.git
cd sdk

# For development: Install in editable mode
pip install -e .

With this installation, any changes you make to the code will be immediately available when you import the package.

Quick Start

from nomadic import NomadicAI

# Initialize the client with your API key
client = NomadicAI(api_key="your_api_key")

# Upload a video and analyze it in one step
result = client.video.upload_and_analyze("path/to/your/video.mp4")

# Print the detected events
for event in result["events"]:
    print(f"Event: {event['type']} at {event['time']}s - {event['description']}")
# For a batch upload

videos_list = ["path/to/video-1.mp4", "path/to/video-2.mp4"]
batch_results = client.video.upload_and_analyze_videos(videos_list, wait_for_completion=False)

    
video_ids = [
    res.get("video_id")
    for res in batch_results
    if res                                         # safety for None
    ]

    
full_results = client.video.wait_for_analyses(video_ids)

Authentication

You need an API key to use the Nomadic API. You can get one by:

  1. Log in to your DriveMonitor account
  2. Go to Profile > API Key
  3. Generate a new API key

Then use this key when initializing the client:

client = NomadicAI(api_key="your_api_key")

Video Upload and Analysis

Upload a video

# Preferred: upload with the high-level helper
upload_result = client.video.upload(
    "path/to/video.mp4",
    metadata_file="path/to/overlay_schema.json",  # optional
    wait_for_uploaded=True,
)
video_id = upload_result["video_id"]

# Legacy helpers remain available if you need fine-grained control
result = client.video.upload_video(
    source="file",
    file_path="path/to/video.mp4"
)

The metadata_file argument is optional and accepts any of the following:

  • Path to a JSON metadata file describing per-frame overlay fields
  • A Python dict that can be serialised to the Nomadic overlay schema
  • Raw JSON string or UTF-8 bytes containing the schema

When provided, the SDK sends the schema to /api/upload-video so the backend can extract on-screen telemetry (timestamps, GPS, speed, etc.) during later analyses. If you specify metadata_file while uploading multiple videos at once, the SDK will raise a ValidationError—attach metadata on single uploads only.

Upload videos stored in Google Cloud Storage

You can import .mp4 objects directly from GCS once you have saved their credentials as a cloud integration:

# Trigger imports without re-downloading files locally
upload_result = client.video.upload([
    "gs://drive-monitor/uploads/trip-042/video_front.mp4",
    "gs://drive-monitor/uploads/trip-042/video_rear.mp4",
],
    folder="Fleet Library",
    wait_for_uploaded=False,    # async import – poll later if you prefer
)

# Provide an explicit integration id when you have multiple saved credentials
upload_result = client.video.upload([
    "gs://drive-monitor/uploads/trip-042/video_front.mp4",
],
    integration_id="gcs_int_123",
)

Rules for the GCS path:

  • Only .mp4 objects are accepted today.
  • All URIs within a single call must share the same bucket.
  • Pass either a single string or a list of literal blob URIs—wildcards are not supported.
  • If you omit integration_id, the SDK tries each saved integration whose bucket matches the URI until one succeeds. Provide the id explicitly when multiple integrations share the bucket.

To discover the ids you have already saved (for example, those created through the DriveMonitor UI) call:

for item in client.cloud_integrations.list(type="gcs"):
    print(item["name"], item["bucket"], item["id"])

Import videos from Hugging Face Buckets

You can save a Hugging Face bucket integration and then use that integration with client.upload("hf://buckets/..."):

integration = client.cloud_integrations.add_hf_bucket(
    name="HF footage",
    bucket="JohnnyMnenonic/test",
    token="hf_xxx",
    prefix="incoming/",
)

result = client.upload(
    "hf://buckets/JohnnyMnenonic/test/incoming/front.mp4",
    integration_id=integration["id"],
    wait_for_uploaded=False,
)

print(result["import_job_id"])

NomadicAI stores the Hugging Face token for saved integrations. Prefer a fine-grained token if Hugging Face supports the required bucket access; if not, use a dedicated token or account reserved for storage imports.

You can also call client.upload("hf://buckets/namespace/name/path.mp4") without integration_id. The backend will first try a saved hf_bucket integration for that bucket and then fall back to public access.

Import MCAPs from S3

MCAP cloud ingest uses AWS IAM role federation and Google Storage Transfer Service instead of downloading the file through the backend. Save the role once, then call client.upload("s3://...mcap"):

integration = client.cloud_integrations.add_s3_storage_transfer(
    name="MCAP archive",
    bucket="drive-archive",
    prefix="mcap/",
    role_arn="arn:aws:iam::123456789012:role/NomadicMcapTransferRole",
)

job = client.upload(
    "s3://drive-archive/mcap/run-042.mcap",
    integration_id=integration["id"],
    wait_for_uploaded=False,
)

print(job["mcap_import_job_id"])
print(job["ingests"][0]["mcap_ingest_id"])

final = client.wait_for_mcap_import_job(job["mcap_import_job_id"])
print(final["status"], final["video_ids"])

If you omit integration_id, the SDK looks for a saved s3_storage_transfer integration with the same bucket. MCAP cloud ingest currently supports S3 only; gs://...mcap and mixed .mp4/.mcap calls are rejected.

Analyze a video

analysis = client.video.analyze(
    video_id,
    prompt="Did the driver stop before the crosswalk? Include timestamp and GPS evidence if available.",
)

events = analysis.get("events", [])

Telemetry extraction is prompt-driven:

  • Upload metadata alongside the video when you have structured per-frame telemetry.
  • Request the telemetry you need directly in the prompt, such as timestamps, GPS, speed, altitude, or other fields supplied in metadata.

Each event returned by the SDK now includes an overlay dictionary. Overlay entries are keyed by the field name (for example frame_timestamp, frame_speed, etc.) and map to {"start": ..., "end": ...} pairs with the values that were read from the video frames or metadata.

Generate an ASAM OpenODD CSV

The client exposes a top-level helper, client.generate_structured_odd(...), that mirrors the DriveMonitor UI workflow and accepts the same column schema. You can reuse the SDK’s built-in DEFAULT_STRUCTURED_ODD_COLUMNS constant or pass your own list of definitions.

from nomadic import NomadicAI, DEFAULT_STRUCTURED_ODD_COLUMNS

client = NomadicAI(api_key="your_api_key")

# Optionally customise the column schema before calling the export.
columns = [
    {
        "name": "timestamp",
        "prompt": "Log the timestamp in ISO 8601 format (placeholder date 2024-01-01).",
        "type": "YYYY-MM-DDTHH:MM:SSZ",
    },
    {
        "name": "scenery.road.type",
        "prompt": "The type of road the vehicle is on.",
        "type": "categorical",
        "literals": ["motorway", "rural", "urban_street", "parking_lot", "unpaved", "unknown"],
    },
    # ...add or tweak additional columns...
]

odd = client.generate_structured_odd(
    video_id="VIDEO_ID_FROM_UPLOAD",
    columns=columns or DEFAULT_STRUCTURED_ODD_COLUMNS,
)

csv_text = odd["csv"]
share_url = odd.get("share_url")
print(csv_text.splitlines()[0])  # Header row

If you customise the schema in the DriveMonitor UI, use the Copy SDK snippet button to paste a ready-made Python snippet that mirrors the on-screen column configuration. The SDK automatically mirrors the Firestore reasoning trace path and returns any generated share links together with the CSV data.

Upload and analyze in one step

# Upload and analyze a video, waiting for results
analysis = client.video.upload_and_analyze("path/to/video.mp4")

# Or just start the process without waiting
result = client.video.upload_and_analyze("path/to/video.mp4", wait_for_completion=False)

Advanced Usage

Filter events by severity or type

# Get only high severity events
high_severity_events = client.video.get_video_events(
    video_id=video_id,
    severity="high"
)

# Get only traffic violation events
traffic_violations = client.video.get_video_events(
    video_id=video_id,
    event_type="Traffic Violation"
)

Custom timeout and polling interval

# Wait for analysis with a custom timeout and polling interval
client.video.wait_for_analysis(
    video_id=video_id,
    timeout=1200,  # 20 minutes
    poll_interval=10  # Check every 10 seconds
)

Batch analyses across many videos

When you provide a list of video IDs to client.video.analyze(...), the SDK now creates a backend batch automatically for prompt analysis, then keeps polling the /batch/{batch_id}/status endpoint until the orchestrator finishes. The return value is a dictionary with two keys:

  • batch_metadata — contains the batch_id, a fully-qualified batch_viewer_url pointing at the Batch Results Viewer, and a batch_type flag. Prompt-analysis batches may currently appear as the legacy internal "ask" category.
  • results — the list of per-video analysis dictionaries (exactly the same schema you would get from calling analyze() on a single video).

List videos in a folder

Use my_videos() to list videos and check their upload status:

# List all videos in a folder
videos = client.my_videos(folder="My-Fleet-Videos")

# Check which videos are ready for analysis
for video in videos:
    print(f"{video['video_name']}: {video['status']}")

# Filter to only uploaded (ready) videos
ready_videos = [v for v in videos if v["status"] == "uploaded"]

Each video dict contains:

Field Description
video_id Unique identifier
video_name Original filename
duration_s Video duration in seconds
folder_id Folder identifier
status Upload status (see below)
folder_name Folder name (if in a folder)
org_id Organization ID (if org-scoped)

Upload status values:

Status Meaning
processing Upload in progress
uploading_failed Upload failed
uploaded Ready for analysis

Manage cloud integrations

The SDK exposes a dedicated helper to manage saved cloud credentials:

# List every integration visible to your user/org
integrations = client.cloud_integrations.list()

# Filter by provider (either "gcs" or "s3")
gcs_only = client.cloud_integrations.list(type="gcs")

# Fetch the Google identity to trust in an AWS IAM role for MCAP transfer
setup = client.cloud_integrations.get_s3_storage_transfer_setup()
print(setup["google_service_account_subject_id"])

# Add a new S3 integration using AWS keys
client.cloud_integrations.add(
    type="s3",
    name="AWS archive",
    bucket="drive-archive",
    prefix="raw/",
    region="us-east-1",
    credentials={
        "accessKeyId": "...",
        "secretAccessKey": "...",
        "sessionToken": "...",  # optional
    },
)

# Add a Cloudflare R2 integration using the same S3 helper
client.cloud_integrations.add(
    type="s3",
    name="Customer R2",
    bucket="customer-r2",
    prefix="incoming/",
    endpoint_url="https://<account>.r2.cloudflarestorage.com",
    region="auto",
    credentials={
        "accessKeyId": "...",
        "secretAccessKey": "...",
    },
)

# Add a role-based S3 integration for MCAP cloud ingest
client.cloud_integrations.add_s3_storage_transfer(
    name="MCAP archive",
    bucket="drive-archive",
    prefix="mcap/",
    role_arn="arn:aws:iam::123456789012:role/NomadicMcapTransferRole",
)

Once an integration exists, you only need its id when pulling files directly from the bucket. Call client.upload("gs://bucket/path.mp4", integration_id="...") or client.upload("s3://bucket/path.mp4", integration_id="...") for videos, or client.upload("s3://bucket/path.mcap", integration_id="...") for MCAPs. The SDK hands the request to the correct backend importer. Credentials are never embedded in the upload request body.

Running SDK integration tests locally

The integration suite is tagged with calls_api and exercises the live backend endpoints. Make sure you have a valid API key and a backend domain reachable from your environment, then run:

cd sdk
export NOMADICML_API_KEY=YOUR_API_KEY
export VITE_BACKEND_DOMAIN=http://127.0.0.1:8099
python -u -m pytest -m calls_api -vvs -rPfE --durations=0 --capture=no tests/test_integration.py

The command disables pytest's output capture so you can follow streaming logs while the long-running tests execute.

batch = client.video.analyze(
    ["video_id_1", "video_id_2", "video_id_3"],
    prompt="Did the driver stop before the crosswalk?",
)

print(batch["batch_metadata"])
for item in batch["results"]:
    print(item["video_id"], item["analysis_id"], len(item.get("events", [])))

Custom API endpoint

If you're using a custom deployment of the DriveMonitor backend:

# Connect to a local or custom deployment
client = NomadicAI(
    api_key="your_api_key",
    base_url="http://localhost:8099"
)

Search across videos

Run a semantic search on several of your videos at once:

results = client.video.search(
    "red pickup truck overtaking",
    ["video_id_1", "video_id_2"]
)
for match in results["matches"]:
    print(match["videoId"], match["eventIndex"], match["similarity"])

Error Handling

The SDK provides specific exceptions for different error types:

from nomadic import NomadicAIError, AuthenticationError, VideoUploadError

try:
    client.video.upload_and_analyze("path/to/video.mp4")
except AuthenticationError:
    print("API key is invalid or expired")
except VideoUploadError as e:
    print(f"Failed to upload video: {e}")
except NomadicAIError as e:
    print(f"An error occurred: {e}")

Development

Setup

Clone the repository and install development dependencies:

git clone https://github.com/nomadic-ml/drivemonitor.git
cd drivemonitor/sdk
pip install -e ".[dev]"

Running tests

pytest

License

MIT License. See LICENSE file for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

nomadic-0.1.54.tar.gz (110.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

nomadic-0.1.54-py3-none-any.whl (112.7 kB view details)

Uploaded Python 3

File details

Details for the file nomadic-0.1.54.tar.gz.

File metadata

  • Download URL: nomadic-0.1.54.tar.gz
  • Upload date:
  • Size: 110.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.14.5

File hashes

Hashes for nomadic-0.1.54.tar.gz
Algorithm Hash digest
SHA256 21fa2017545bb341b87bf267f0b49d0cb067989a7e5937f8772750c65deb4f0a
MD5 c608e1ed44e890527914244c387b93e4
BLAKE2b-256 5dd53c3a0e8654d0ee67ad2bbef14919966ad552dccb5768df4809f33b40355d

See more details on using hashes here.

File details

Details for the file nomadic-0.1.54-py3-none-any.whl.

File metadata

  • Download URL: nomadic-0.1.54-py3-none-any.whl
  • Upload date:
  • Size: 112.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.14.5

File hashes

Hashes for nomadic-0.1.54-py3-none-any.whl
Algorithm Hash digest
SHA256 a9672b39a56718c83e7d76b3492170f3ba59ec4e993df2ea4c8124ae049a7d4f
MD5 8ebe1638ffc779bfddb4f28800bc4dda
BLAKE2b-256 13807fd406b8a5790a2137175b98d9a0cf78aa0d1955ceb20cff7327cb082c8d

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page