Agimus Python SDK

Official Python SDK for the Agimus Platform.

Installation

pip install agimus

Requirements: Python 3.9+

Quick Start

from agimus import AgimusClient

client = AgimusClient(api_key="agm_your_key_here")

# Query objects (ontology)
customers = client.objects("Customer").filter(status="active").all()

# Download raw datasets
df = client.datasets.get("customers").to_pandas()

Ontology — Objects API

The Agimus Object Store is built on an ontology that defines your data model. Each element has an apiName, which is the identifier you use in the SDK:

  • Entities — Data types (e.g., Customer, Order, Product)
  • Properties — Fields on entities (e.g., customerId, name, createdAt)
  • Links — Relationships between entities, with forward and reverse API names

Use client.list_entities() and client.get_entity_schema("EntityName") to discover available entities, properties, and links.

Primary Keys

Every entity has a primary key property defined in the ontology.

Valid PK types: string, integer, long, short, byte

# These are equivalent
client.objects("Customer").get(123)
client.objects("Customer").get("123")

Property Types

The SDK auto-fetches each entity's schema on first access (cached on the client) and uses it to convert wire values into typed Python objects on read. Most users never see the wire format: decimal values come back as Decimal, bytes values come back as bytes, and you can pass the matching Python type directly on writes.

Primitives:

| Type | Read returns | Write accepts | Wire (JSON) | Notes |
| --- | --- | --- | --- | --- |
| string | str | str | "text" | |
| integer | int | int | 123 | 32-bit signed |
| long | int | int | 123 | 64-bit signed |
| short | int | int | 123 | 16-bit signed |
| byte | int | int | 123 | 8-bit signed |
| float | float | float | 1.5 | 32-bit |
| double | float | float | 1.5 | 64-bit |
| decimal | Decimal | Decimal or numeric str | "123.45" | Arbitrary precision; wire format is a JSON string to preserve precision |
| boolean | bool | bool | true | |
| date | str | datetime.date or ISO str | "2024-01-15" | ISO 8601 date |
| timestamp | str | datetime.datetime or ISO str | "2024-01-15T10:30:00Z" | ISO 8601 datetime |
| time | str | datetime.time or ISO str | "10:30:00" | ISO 8601 time |
| bytes | bytes | bytes or base64 str | "base64..." | Base64 on the wire; SDK decodes on read |
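
Because date, timestamp, and time values are read back as ISO 8601 strings, code that needs real datetime objects has to parse them. The following is a minimal standard-library sketch; parse_timestamp is a hypothetical helper, not part of the SDK. It works around the fact that datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11 onward, while the SDK supports Python 3.9+.

```python
from datetime import date, datetime, time


def parse_timestamp(value: str) -> datetime:
    """Parse an ISO 8601 timestamp string such as "2024-01-15T10:30:00Z"."""
    # Older Pythons reject the "Z" suffix, so normalise it to an explicit
    # UTC offset before parsing.
    if value.endswith("Z"):
        value = value[:-1] + "+00:00"
    return datetime.fromisoformat(value)


ts = parse_timestamp("2024-01-15T10:30:00Z")  # timezone-aware, UTC
d = date.fromisoformat("2024-01-15")
t = time.fromisoformat("10:30:00")
```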

UUID: primary key values typed as string accept uuid.UUID directly on write — encoded as the canonical 8-4-4-4-12 string form.
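
To make the wire formats above concrete, here is a small standard-library sketch of the same encodings. It does not call the SDK and is not its implementation; it only illustrates why decimals travel as JSON strings, what base64 bytes look like, and what the canonical UUID form is.

```python
import base64
import json
import uuid
from decimal import Decimal

# decimal: a JSON string keeps every digit, while a JSON number is parsed
# as a float first and loses precision.
exact = Decimal(json.loads('"0.10000000000000000001"'))
lossy = Decimal(json.loads("0.10000000000000000001"))

# bytes: base64 text on the wire, raw bytes in Python.
raw = b"\x00\x01binary"
wire_bytes = base64.b64encode(raw).decode("ascii")
round_tripped = base64.b64decode(wire_bytes)

# UUID: str() produces the canonical 8-4-4-4-12 form.
key = uuid.UUID("12345678123456781234567812345678")
wire_uuid = str(key)

print(exact == lossy)  # False
print(wire_uuid)       # 12345678-1234-5678-1234-567812345678
```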

Complex types (stored as JSON objects):

| Type | Description |
| --- | --- |
| struct | Nested object with defined fields |
| geopoint | {"lat": 40.7, "lng": -74.0} |
| geoshape | GeoJSON geometry object |
| attachment | File attachment metadata |
| media_reference | Media file reference |

Arrays: Properties can be arrays (e.g., tags: string[]). Pass as Python lists. Arrays of decimal and bytes are coerced element-wise on read, the same way scalars are.

Nullable: Check nullable in the schema. Non-nullable fields are required on create.
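
One way to use the nullable flag is to check a payload before calling create(). The sketch below assumes the schema shape printed in the Schema Discovery section (a "properties" list whose items carry apiName and nullable). The helper name missing_required and the sample schema are made up for illustration, and the server may apply rules this check does not know about.

```python
def missing_required(schema: dict, payload: dict) -> list:
    """Return apiNames of non-nullable properties that are absent or None."""
    return [
        prop["apiName"]
        for prop in schema["properties"]
        if not prop["nullable"] and payload.get(prop["apiName"]) is None
    ]


# Hypothetical schema for the example; a real one would come from
# client.get_entity_schema("Customer").
schema = {
    "primaryKey": "customerId",
    "properties": [
        {"apiName": "customerId", "baseType": "integer", "nullable": False},
        {"apiName": "name", "baseType": "string", "nullable": False},
        {"apiName": "email", "baseType": "string", "nullable": True},
    ],
}

print(missing_required(schema, {"customerId": 1}))  # ['name']
```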

Authentication

API keys are created in the Agimus dashboard under Settings > API Access. Keys use the format agm_<prefix>_<secret> and inherit permissions from their associated Service User.

client = AgimusClient(
    api_key="agm_...",
    base_url="https://api.agimus.ai",  # Optional: override base URL
    timeout=30.0,                       # Optional: request timeout (default: 30s)
)
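
Rather than hard-coding the key, you may prefer to read it from the environment. This is a minimal sketch: the variable name AGIMUS_API_KEY and the helper load_api_key are assumptions for illustration, not something the SDK defines, and the format check only mirrors the agm_<prefix>_<secret> shape described above.

```python
import os


def load_api_key(env=os.environ, var="AGIMUS_API_KEY"):
    """Read the key from the environment and sanity-check its shape."""
    key = env.get(var, "")
    # Split at most twice: the secret part may itself contain underscores.
    parts = key.split("_", 2)
    if len(parts) != 3 or parts[0] != "agm" or not all(parts[1:]):
        raise ValueError(f"{var} is missing or not in agm_<prefix>_<secret> form")
    return key


# Intended use (requires the agimus package and a real key):
#   client = AgimusClient(api_key=load_api_key())
```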

Querying Objects

All queries start with client.objects("EntityName") and support method chaining.

Filtering

Use Django-style double-underscore syntax for operators:

# Equals (default)
.filter(status="active")

# Comparison
.filter(age__gt=18)        # greater than
.filter(age__gte=18)       # greater than or equal
.filter(age__lt=65)        # less than
.filter(age__lte=65)       # less than or equal
.filter(age__ne=0)         # not equal
.filter(age__between=[18, 65])  # between (inclusive)

# Lists
.filter(region__in=["US", "EU"])      # in list
.filter(status__nin=["deleted"])      # not in list

# Strings
.filter(name__like="Acme%")           # SQL LIKE (case-sensitive)
.filter(name__ilike="%acme%")         # SQL LIKE (case-insensitive)
.filter(name__starts_with="A")        # starts with
.filter(name__ends_with="Corp")       # ends with

# Null checks
.filter(deletedAt__is_null=True)      # is null
.filter(verifiedAt__is_not_null=True) # is not null

# Empty checks (arrays/strings)
.filter(tags__is_empty=True)          # is empty
.filter(tags__is_not_empty=True)      # is not empty

# Array operations
.filter(tags__contains="vip")         # array contains value
.filter(tags__overlaps=["a", "b"])    # arrays overlap

# Multiple filters (AND)
.filter(status="active", region="US")
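
Since filters are plain keyword arguments, they can be assembled in a dict and unpacked with **. The sketch below shows that pattern plus a small function that reads a key the way the double-underscore convention suggests. Both helpers are hypothetical, and "eq" is just the label this sketch uses for the default equality case; none of this is the SDK's own parsing code.

```python
def split_filter_key(key: str):
    """Split "age__gte" into ("age", "gte"); a bare name means equality."""
    field, sep, op = key.rpartition("__")
    return (field, op) if sep else (key, "eq")


def build_filters(min_age=None, regions=None, name_prefix=None) -> dict:
    """Collect only the filters the caller actually asked for."""
    filters = {}
    if min_age is not None:
        filters["age__gte"] = min_age
    if regions:
        filters["region__in"] = list(regions)
    if name_prefix:
        filters["name__starts_with"] = name_prefix
    return filters


filters = build_filters(min_age=18, regions=["US", "EU"])
# Would be passed on as: client.objects("Customer").filter(**filters)
```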

Sorting

.sort("name")                    # ascending
.sort("-createdAt")              # descending (prefix with -)
.sort("-createdAt", "name")      # multiple fields

Alias: .order_by()
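
To show how a multi-field sort spec reads, here is a client-side sketch that applies the same "-" prefix convention to a list of dicts. It is only an illustration of the ordering; sort_rows is a hypothetical helper, and server-side details such as null handling or string collation may differ.

```python
def sort_rows(rows, *fields):
    """Sort dicts by several keys; a leading "-" means descending."""
    ordered = list(rows)
    # Python's sort is stable, so applying the keys from last to first
    # gives the same result as one multi-key sort.
    for field in reversed(fields):
        descending = field.startswith("-")
        name = field[1:] if descending else field
        ordered.sort(key=lambda row: row[name], reverse=descending)
    return ordered


rows = [
    {"name": "B", "createdAt": "2024-01-02"},
    {"name": "A", "createdAt": "2024-01-02"},
    {"name": "C", "createdAt": "2024-03-01"},
]
print([r["name"] for r in sort_rows(rows, "-createdAt", "name")])  # ['C', 'A', 'B']
```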

Field Selection

.fields("customerId", "name", "email").all()

Alias: .select()

Expanding Relations

Include related objects inline using the link's apiName:

.expand("orders").all()
.expand("orders", "orders.items").all()  # nested

# Or on single object fetch
.get(123, expand=["orders"])

Alias: .include()

Pagination

# Limit total results returned
.limit(100).all()

# Set page size for API calls (max 100, default 50)
.page_size(25).all()

# Auto-pagination with iteration
for customer in client.objects("Customer").filter(status="active"):
    print(customer["name"])
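
Page size mainly affects how many API calls a full scan takes. As a rough sketch, assuming one request per page, the arithmetic looks like this. requests_needed is a hypothetical helper, and raising on an out-of-range page size is this sketch's choice, not documented SDK behavior.

```python
import math

MAX_PAGE_SIZE = 100  # documented upper bound for page_size


def requests_needed(total_rows: int, page_size: int = 50) -> int:
    """Estimate API calls for a full scan, assuming one request per page."""
    if not 1 <= page_size <= MAX_PAGE_SIZE:
        raise ValueError("page_size must be between 1 and 100")
    # Even an empty result costs one request to discover that it is empty.
    return max(1, math.ceil(total_rows / page_size))


print(requests_needed(1245))       # 25
print(requests_needed(1245, 100))  # 13
```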

Executing Queries

.all()      # Get all results as list
.first()    # Get first result (or None)
.exists()   # Check if any results exist (bool)
.count()    # Get count of matching objects
.iter()     # Iterator with auto-pagination

Single Object Operations

# Get by primary key (raises NotFoundError if not found)
customer = client.objects("Customer").get(123)

# Get by primary key (returns None if not found)
customer = client.objects("Customer").get_or_none(123)

# Get multiple by primary keys (max 100)
result = client.objects("Customer").batch_get([1, 2, 3])
# Returns: {"data": [...], "found": 3, "requested": 3}
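
Because batch_get accepts at most 100 keys, longer key lists need to be split. A minimal sketch, assuming the result shape shown above ("data", "found", "requested"); chunked and batch_get_all are hypothetical helpers, and the batch_get callable is passed in so the sketch runs without the SDK.

```python
def chunked(items, size=100):
    """Yield consecutive slices of at most `size` items."""
    items = list(items)
    for start in range(0, len(items), size):
        yield items[start:start + size]


def batch_get_all(batch_get, pks):
    """Call batch_get once per chunk of keys and merge the results."""
    merged = {"data": [], "found": 0, "requested": 0}
    for chunk in chunked(pks):
        result = batch_get(chunk)
        merged["data"].extend(result["data"])
        merged["found"] += result["found"]
        merged["requested"] += result["requested"]
    return merged


# Intended use:
#   result = batch_get_all(client.objects("Customer").batch_get, range(1, 251))
#   missing = result["requested"] - result["found"]
```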

Distinct Values

Returns distinct values for a field, sorted by frequency (most common first).

regions = client.objects("Customer").distinct("region")
# Returns: ["US", "EU", "APAC", ...]

# With filter
regions = client.objects("Customer").filter(status="active").distinct("region")

# With counts (returns [{"value": v, "count": n}, ...] sorted by count DESC)
distribution = client.objects("Customer").distinct("region", with_counts=True)
# Returns: [{"value": "US", "count": 1245}, {"value": "EU", "count": 308}, ...]
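
The with_counts result is easy to post-process. For example, a short sketch that turns counts into shares of the total; with_shares is a hypothetical helper, and the sample uses only the two rows shown above, so the percentages are relative to those two rows.

```python
def with_shares(distribution):
    """Add each value's share of the total to a with_counts=True result."""
    total = sum(item["count"] for item in distribution)
    return [
        {**item, "share": item["count"] / total if total else 0.0}
        for item in distribution
    ]


sample = [{"value": "US", "count": 1245}, {"value": "EU", "count": 308}]
for row in with_shares(sample):
    print(f"{row['value']}: {row['share']:.1%}")
# US: 80.2%
# EU: 19.8%
```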

Aggregation

result = client.objects("Order").filter(status="completed").aggregate(
    metrics=[
        {"op": "count", "alias": "orderCount"},
        {"op": "sum", "field": "total", "alias": "revenue"},
        {"op": "avg", "field": "total", "alias": "avgOrder"},
    ],
    group_by=[
        {"field": "region"},
        {"field": "createdAt", "granularity": "month"}
    ],
    sort=["-revenue"],
    limit=100
)

Operators: count, count_distinct, sum, avg, min, max, first, last

Time Granularities: year, quarter, month, week, day, hour

Link Traversal

Navigate relationships using the link's API name. Pagination is cursor-based (keyset on the target entity's primary key) — performance stays constant regardless of how deep you page.

# Customer -> orders (forward link)
orders = client.objects("Customer").links(123, "orders")
# Returns: {"data": [...], "cursor": "<opaque>" or None, "hasMore": False}

# Order -> customer (reverse link)
customer = client.objects("Order").links(456, "customer")

# Walk every page
cursor = None
while True:
    page = client.objects("Customer").links(123, "orders", page_size=50, cursor=cursor)
    for order in page["data"]:
        process(order)
    if not page["hasMore"]:
        break
    cursor = page["cursor"]

# Count related objects
count = client.objects("Customer").count_links(123, "orders")
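
The page-walking loop above can be wrapped in a generator so callers just iterate. This is a sketch under the page shape documented above ("data", "cursor", "hasMore"); iter_linked is a hypothetical helper, and the page-fetching callable is injected so the example runs against a stand-in instead of the real API.

```python
def iter_linked(fetch_page):
    """Yield every linked object, following cursors until hasMore is False."""
    cursor = None
    while True:
        page = fetch_page(cursor)
        yield from page["data"]
        if not page["hasMore"]:
            return
        cursor = page["cursor"]


# Stand-in pages keyed by cursor, used here in place of real API calls.
fake_pages = {
    None: {"data": ["order-1", "order-2"], "cursor": "next", "hasMore": True},
    "next": {"data": ["order-3"], "cursor": None, "hasMore": False},
}
print(list(iter_linked(fake_pages.__getitem__)))  # ['order-1', 'order-2', 'order-3']

# With the SDK, fetch_page would be something like:
#   lambda cur: client.objects("Customer").links(123, "orders", page_size=50, cursor=cur)
```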

Write Operations

Create

customer = client.objects("Customer").create({
    "customerId": 1,        # PK required
    "name": "Acme Corp",    # non-nullable fields required
    "email": "contact@acme.com",
    "status": "active"
})

Update

# Partial update - only specified fields change
updated = client.objects("Customer").update(1, {"status": "premium"})

Upsert

# Create if not exists, update if exists
customer = client.objects("Customer").upsert(1, {
    "name": "Acme Corp",
    "status": "active"
})

Delete

deleted = client.objects("Customer").delete(1)  # Returns: True

Batch Operations

result = client.objects("Customer").batch([
    {"op": "create", "data": {"customerId": 1, "name": "Customer 1"}},
    {"op": "update", "pk": 2, "data": {"status": "active"}},
    {"op": "delete", "pk": 3},
])
# Returns: {"results": [...], "succeeded": 2, "failed": 1}

Datasets — Raw Data Access

Read raw dataset data as DataFrames. Data transfer uses Arrow Flight (gRPC + columnar streaming) — column selection is pushed down to the server so only requested columns are read and transferred.

List Datasets

datasets = client.datasets.list()
for ds in datasets:
    print(f"{ds.name}: {ds.total_rows:,} rows ({ds.source_type})")

# Filter by source type
external = client.datasets.list(source_type="external")
uploads = client.datasets.list(source_type="file_upload")
pipeline = client.datasets.list(source_type="pipeline")

# Search by name
results = client.datasets.list(search="customer")

Get Dataset

# By name
dataset = client.datasets.get("customers")

# By ID
dataset = client.datasets.get("a1b2c3d4-...")

# Inspect metadata
print(dataset.name)             # "customers"
print(dataset.total_rows)       # 7800000
print(dataset.total_size_bytes) # 524288000
print(dataset.source_type)      # "external"
print(dataset.column_names)     # ["customer_id", "name", "email", ...]
print(dataset.columns)          # [{"name": "customer_id", "type": "string", ...}, ...]

Download as DataFrame

# Full dataset
df = dataset.to_pandas()

# Column selection (only the requested columns are read and transferred)
df = dataset.to_pandas(columns=["customer_id", "revenue", "region"])

# Polars
df = dataset.to_polars(columns=["customer_id", "revenue"])

# Raw PyArrow Table
table = dataset.to_arrow()

Stream Large Datasets

For datasets too large to fit in memory, use iter_batches():

for batch in dataset.iter_batches():
    # batch is a pyarrow.RecordBatch
    chunk_df = batch.to_pandas()
    process(chunk_df)

# With column selection
for batch in dataset.iter_batches(columns=["customer_id", "revenue"]):
    process(batch)

Schema Discovery

Entities

# List all accessible entities (compact summaries with counts)
entities = client.list_entities()
for e in entities:
    print(
        f"{e['apiName']}: {e['displayName']} "
        f"({e['propertyCount']} props, {e['linkCount']} links, pk={e['primaryKey']})"
    )

# Get full entity schema
schema = client.get_entity_schema("Customer")
print(f"Primary key: {schema['primaryKey']}")
for prop in schema["properties"]:
    print(f"  {prop['apiName']}: {prop['baseType']} (nullable: {prop['nullable']})")
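for link in schema["links"]:
    # Keep the optional description visually separate from the target entity
    description = f" - {link['description']}" if link["description"] else ""
    print(
        f"  -> {link['apiName']} ({link['cardinality']}): "
        f"{link['targetEntity']}{description}"
    )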

Each link in the schema carries the details you need to write correct write-side code:

| Field | Meaning |
| --- | --- |
| apiName, displayName, pluralDisplayName, description | Naming + semantics for this direction |
| targetEntity, cardinality, direction | Where the link points + multiplicity |
| implementationType | "foreign_key" or "junction": how the link is realized |
| foreignKeyEntity, foreignKeyProperty | Which entity holds the FK column and the property name (FK links only) |
| onSourceDelete, onTargetDelete | "block" / "cascade" / "set_null": what happens when a side is deleted |
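
As an illustration of how these fields might drive write-side code, the sketch below reads a link schema entry and works out where the foreign key lives and whether a delete would be blocked. The helper names and the sample link are hypothetical; only the field names and their allowed values come from the table above.

```python
def fk_write_target(link: dict):
    """Return (entity, property) to write for an FK link, or None for a junction."""
    if link["implementationType"] == "foreign_key":
        return link["foreignKeyEntity"], link["foreignKeyProperty"]
    return None


def blocks_delete(link: dict, side: str) -> bool:
    """True when deleting the given side ("source" or "target") is blocked."""
    field = "onSourceDelete" if side == "source" else "onTargetDelete"
    return link[field] == "block"


# Made-up link entry for the example; a real one would come from
# client.get_entity_schema("Customer")["links"].
orders_link = {
    "apiName": "orders",
    "targetEntity": "Order",
    "implementationType": "foreign_key",
    "foreignKeyEntity": "Order",
    "foreignKeyProperty": "customerId",
    "onSourceDelete": "block",
    "onTargetDelete": "set_null",
}

print(fk_write_target(orders_link))          # ('Order', 'customerId')
print(blocks_delete(orders_link, "source"))  # True
```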

Links

# Compact view of every link in the ontology (both directions per link)
for link in client.list_links():
    print(
        f"{link['sourceEntity']} --{link['apiName']}--> "
        f"{link['targetEntity']} ({link['cardinality']}, {link['direction']})"
    )

# Full details for a single link by api_name. Returns a list, because one
# api_name can match more than one direction or, across the ontology,
# distinct link types that share a name.
matches = client.get_link_schema("orders")
for m in matches:
    print(f"{m['sourceEntity']}.{m['apiName']} -> {m['targetEntity']}")

Convenience methods

properties = client.get_properties("Customer")
pk = client.get_primary_key("Customer")
links = client.get_links("Customer")

Async Client

AsyncAgimusClient mirrors the sync API for everything that's genuinely non-blocking: object queries, schema discovery, link traversal, aggregation, and dataset metadata.

from agimus import AsyncAgimusClient

async with AsyncAgimusClient(api_key="agm_...") as client:
    # Objects
    customers = await client.objects("Customer").filter(status="active").all()
    customer = await client.objects("Customer").get(123)

    async for c in client.objects("Customer").filter(status="active"):
        print(c["name"])

    # Schema
    entities = await client.list_entities()
    schema = await client.get_entity_schema("Customer")
    all_links = await client.list_links()

    # Dataset metadata (list and inspect)
    for ds in await client.datasets.list():
        print(ds.name, ds.total_rows)

    meta = await client.datasets.get_metadata("customers")
    print(meta.column_names)

Why async datasets are metadata-only

AsyncAgimusClient.datasets exposes list() and get_metadata() only — no to_pandas, to_polars, to_arrow, or iter_batches. The underlying pyarrow.flight Python bindings are synchronous, so any "async download" would just be a thread-pool wrapper pretending to be async.

For data transfer, use the synchronous AgimusClient:

from agimus import AgimusClient

with AgimusClient(api_key="agm_...") as client:
    df = client.datasets.get("customers").to_pandas()

This split keeps the async surface honest and the sync surface fully Flight-backed.
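
If you do need a DataFrame inside an async application, one option is to run the synchronous download in a worker thread yourself, so the thread hop is explicit in your code. A minimal sketch using asyncio.to_thread (Python 3.9+); download_blocking is a stand-in that simulates the blocking call so the example runs without the SDK.

```python
import asyncio
import time


def download_blocking():
    """Stand-in for a synchronous download.

    In real code this would be something like:
        with AgimusClient(api_key="agm_...") as client:
            return client.datasets.get("customers").to_pandas()
    """
    time.sleep(0.05)  # simulate blocking I/O
    return ["row-1", "row-2"]


async def main():
    # to_thread() runs the blocking call in a worker thread, so the event
    # loop stays free for other tasks while the download is in flight.
    download = asyncio.create_task(asyncio.to_thread(download_blocking))
    ticks = 0
    while not download.done():
        ticks += 1  # other async work could happen here
        await asyncio.sleep(0.01)
    return await download, ticks


rows, ticks = asyncio.run(main())
```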

Error Handling

All errors inherit from AgimusError:

from agimus import (
    AgimusError,          # Base class for all errors
    AuthenticationError,  # Invalid or missing API key (401)
    AccessDeniedError,    # Permission denied (403)
    NotFoundError,        # Entity or object not found (404)
    ValidationError,      # Invalid request data (400/422)
    RateLimitError,       # Rate limit exceeded (429)
    ServerError,          # Server error (5xx)
)

try:
    df = client.datasets.get("customers").to_pandas()
except NotFoundError as e:
    print(f"Not found: {e.message}")
except AuthenticationError:
    print("Invalid API key")
except ServerError as e:
    print(f"Server error ({e.status_code}): {e.message}")
except AgimusError as e:
    print(f"Error: {e.message}")
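
RateLimitError and ServerError are usually transient, so callers often retry them. The source does not say whether the SDK retries on its own, so treat this as a sketch of caller-side exponential backoff. The two exception classes are local stand-ins so the example runs without the package, and with_retries is a hypothetical helper.

```python
import time


class RateLimitError(Exception):
    """Stand-in for agimus.RateLimitError so the sketch runs on its own."""


class ServerError(Exception):
    """Stand-in for agimus.ServerError."""


def with_retries(call, attempts=4, base_delay=0.5, sleep=time.sleep):
    """Run call(), retrying transient failures with exponential backoff."""
    for attempt in range(attempts):
        try:
            return call()
        except (RateLimitError, ServerError):
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2 ** attempt)  # 0.5s, 1s, 2s, ...


# Intended use, with the real exception classes imported from agimus:
#   df = with_retries(lambda: client.datasets.get("customers").to_pandas())
```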

Utility Methods

# Health check
client.health()
# {"status": "healthy", "version": "..."}

# Current API key info
client.me()
# {"tenantName": "...", "scope": "read_write", ...}

Context Manager

with AgimusClient(api_key="agm_...") as client:
    customers = client.objects("Customer").all()
    df = client.datasets.get("orders").to_pandas()
# Connections automatically closed

License

MIT
