Skip to main content

Python SDK for the Agimus Platform

Project description

Agimus Python SDK

Official Python SDK for the Agimus Platform.

Installation

pip install agimus

Requirements: Python 3.9+

Quick Start

from agimus import AgimusClient

client = AgimusClient(api_key="agm_your_key_here")

# Query objects (ontology)
customers = client.objects("Customer").filter(status="active").all()

# Download raw datasets
df = client.datasets.get("customers").to_pandas()

Ontology — Objects API

The Agimus Object Store is built on an ontology that defines your data model. Each element has an apiName which is what you use in the SDK:

  • Entities — Data types (e.g., Customer, Order, Product)
  • Properties — Fields on entities (e.g., customerId, name, createdAt)
  • Links — Relationships between entities, with forward and reverse API names

Use client.list_entities() and client.get_entity_schema("EntityName") to discover available entities, properties, and links.

Primary Keys

Every entity has a primary key property defined in the ontology.

Valid PK types: string, integer, long, short, byte

# These are equivalent
client.objects("Customer").get(123)
client.objects("Customer").get("123")

Property Types

Primitives:

Type Python JSON Notes
string str "text"
integer int 123 32-bit signed
long int 123 64-bit signed
short int 123 16-bit signed
byte int 123 8-bit signed
float float 1.5 32-bit
double float 1.5 64-bit
decimal str or Decimal "123.45" Arbitrary precision
boolean bool true
date str "2024-01-15" ISO 8601 date
timestamp str "2024-01-15T10:30:00Z" ISO 8601 datetime
time str "10:30:00" ISO 8601 time
bytes str "base64..." Base64 encoded

Complex types (stored as JSON objects):

Type Description
struct Nested object with defined fields
geopoint {"lat": 40.7, "lng": -74.0}
geoshape GeoJSON geometry object
attachment File attachment metadata
media_reference Media file reference

Arrays: Properties can be arrays (e.g., tags: string[]). Pass as Python lists.

Nullable: Check nullable in the schema. Non-nullable fields are required on create.

Authentication

API keys are created in the Agimus dashboard under Settings > API Access. Keys use the format agm_<prefix>_<secret> and inherit permissions from their associated Service User.

client = AgimusClient(
    api_key="agm_...",
    base_url="https://api.agimus.ai",  # Optional: override base URL
    timeout=30.0,                       # Optional: request timeout (default: 30s)
)

Querying Objects

All queries start with client.objects("EntityName") and support method chaining.

Filtering

Use Django-style double-underscore syntax for operators:

# Equals (default)
.filter(status="active")

# Comparison
.filter(age__gt=18)        # greater than
.filter(age__gte=18)       # greater than or equal
.filter(age__lt=65)        # less than
.filter(age__lte=65)       # less than or equal
.filter(age__ne=0)         # not equal
.filter(age__between=[18, 65])  # between (inclusive)

# Lists
.filter(region__in=["US", "EU"])      # in list
.filter(status__nin=["deleted"])      # not in list

# Strings
.filter(name__like="Acme%")           # SQL LIKE (case-sensitive)
.filter(name__ilike="%acme%")         # SQL LIKE (case-insensitive)
.filter(name__starts_with="A")        # starts with
.filter(name__ends_with="Corp")       # ends with

# Null checks
.filter(deletedAt__is_null=True)      # is null
.filter(verifiedAt__is_not_null=True) # is not null

# Empty checks (arrays/strings)
.filter(tags__is_empty=True)          # is empty
.filter(tags__is_not_empty=True)      # is not empty

# Array operations
.filter(tags__contains="vip")         # array contains value
.filter(tags__overlaps=["a", "b"])    # arrays overlap

# Multiple filters (AND)
.filter(status="active", region="US")

Sorting

.sort("name")                    # ascending
.sort("-createdAt")              # descending (prefix with -)
.sort("-createdAt", "name")      # multiple fields

Alias: .order_by()

Field Selection

.fields("customerId", "name", "email").all()

Alias: .select()

Expanding Relations

Include related objects inline using link apiName:

.expand("orders").all()
.expand("orders", "orders.items").all()  # nested

# Or on single object fetch
.get(123, expand=["orders"])

Alias: .include()

Pagination

# Limit total results returned
.limit(100).all()

# Set page size for API calls (max 100, default 50)
.page_size(25).all()

# Auto-pagination with iteration
for customer in client.objects("Customer").filter(status="active"):
    print(customer["name"])

Executing Queries

.all()      # Get all results as list
.first()    # Get first result (or None)
.exists()   # Check if any results exist (bool)
.count()    # Get count of matching objects
.iter()     # Iterator with auto-pagination

Single Object Operations

# Get by primary key (raises NotFoundError if not found)
customer = client.objects("Customer").get(123)

# Get by primary key (returns None if not found)
customer = client.objects("Customer").get_or_none(123)

# Get multiple by primary keys (max 100)
result = client.objects("Customer").batch_get([1, 2, 3])
# Returns: {"data": [...], "found": 3, "requested": 3}

Distinct Values

Returns distinct values for a field, sorted by frequency (most common first).

regions = client.objects("Customer").distinct("region")
# Returns: ["US", "EU", "APAC", ...]

# With filter
regions = client.objects("Customer").filter(status="active").distinct("region")

# With counts (returns [{"value": v, "count": n}, ...] sorted by count DESC)
distribution = client.objects("Customer").distinct("region", with_counts=True)
# Returns: [{"value": "US", "count": 1245}, {"value": "EU", "count": 308}, ...]

Aggregation

result = client.objects("Order").filter(status="completed").aggregate(
    metrics=[
        {"op": "count", "alias": "orderCount"},
        {"op": "sum", "field": "total", "alias": "revenue"},
        {"op": "avg", "field": "total", "alias": "avgOrder"},
    ],
    group_by=[
        {"field": "region"},
        {"field": "createdAt", "granularity": "month"}
    ],
    sort=["-revenue"],
    limit=100
)

Operators: count, count_distinct, sum, avg, min, max, first, last

Time Granularities: year, quarter, month, week, day, hour

Link Traversal

Navigate relationships using the link's API name. Pagination is cursor-based (keyset on the target entity's primary key) — performance stays constant regardless of how deep you page.

# Customer -> orders (forward link)
orders = client.objects("Customer").links(123, "orders")
# Returns: {"data": [...], "cursor": "<opaque>" or None, "hasMore": False}

# Order -> customer (reverse link)
customer = client.objects("Order").links(456, "customer")

# Walk every page
cursor = None
while True:
    page = client.objects("Customer").links(123, "orders", page_size=50, cursor=cursor)
    for order in page["data"]:
        process(order)
    if not page["hasMore"]:
        break
    cursor = page["cursor"]

# Count related objects
count = client.objects("Customer").count_links(123, "orders")

Write Operations

Create

customer = client.objects("Customer").create({
    "customerId": 1,        # PK required
    "name": "Acme Corp",    # non-nullable fields required
    "email": "contact@acme.com",
    "status": "active"
})

Update

# Partial update - only specified fields change
updated = client.objects("Customer").update(1, {"status": "premium"})

Upsert

# Create if not exists, update if exists
customer = client.objects("Customer").upsert(1, {
    "name": "Acme Corp",
    "status": "active"
})

Delete

deleted = client.objects("Customer").delete(1)  # Returns: True

Batch Operations

result = client.objects("Customer").batch([
    {"op": "create", "data": {"customerId": 1, "name": "Customer 1"}},
    {"op": "update", "pk": 2, "data": {"status": "active"}},
    {"op": "delete", "pk": 3},
])
# Returns: {"results": [...], "succeeded": 2, "failed": 1}

Datasets — Raw Data Access

Read raw dataset data as DataFrames. Data transfer uses Arrow Flight (gRPC + columnar streaming) — column selection is pushed down to the server so only requested columns are read and transferred.

List Datasets

datasets = client.datasets.list()
for ds in datasets:
    print(f"{ds.name}: {ds.total_rows:,} rows ({ds.source_type})")

# Filter by source type
external = client.datasets.list(source_type="external")
uploads = client.datasets.list(source_type="file_upload")
pipeline = client.datasets.list(source_type="pipeline")

# Search by name
results = client.datasets.list(search="customer")

Get Dataset

# By name
dataset = client.datasets.get("customers")

# By ID
dataset = client.datasets.get("a1b2c3d4-...")

# Inspect metadata
print(dataset.name)             # "customers"
print(dataset.total_rows)       # 7800000
print(dataset.total_size_bytes) # 524288000
print(dataset.source_type)      # "external"
print(dataset.column_names)     # ["customer_id", "name", "email", ...]
print(dataset.columns)          # [{"name": "customer_id", "type": "string", ...}, ...]

Download as DataFrame

# Full dataset
df = dataset.to_pandas()

# Column selection (only the requested columns are read and transferred)
df = dataset.to_pandas(columns=["customer_id", "revenue", "region"])

# Polars
df = dataset.to_polars(columns=["customer_id", "revenue"])

# Raw PyArrow Table
table = dataset.to_arrow()

Stream Large Datasets

For datasets too large to fit in memory, use iter_batches():

for batch in dataset.iter_batches():
    # batch is a pyarrow.RecordBatch
    chunk_df = batch.to_pandas()
    process(chunk_df)

# With column selection
for batch in dataset.iter_batches(columns=["customer_id", "revenue"]):
    process(batch)

Schema Discovery

Entities

# List all accessible entities (compact summaries with counts)
entities = client.list_entities()
for e in entities:
    print(
        f"{e['apiName']}: {e['displayName']} "
        f"({e['propertyCount']} props, {e['linkCount']} links, pk={e['primaryKey']})"
    )

# Get full entity schema
schema = client.get_entity_schema("Customer")
print(f"Primary key: {schema['primaryKey']}")
for prop in schema["properties"]:
    print(f"  {prop['apiName']}: {prop['baseType']} (nullable: {prop['nullable']})")
for link in schema["links"]:
    print(
        f"  -> {link['apiName']} ({link['cardinality']}): "
        f"{link['targetEntity']}{link['description'] or ''}"
    )

Each link in the schema includes everything needed to write correct write-side code:

Field Meaning
apiName, displayName, pluralDisplayName, description Naming + semantics for this direction
targetEntity, cardinality, direction Where the link points + multiplicity
implementationType "foreign_key" or "junction" — how the link is realized
foreignKeyEntity, foreignKeyProperty Which entity holds the FK column and the property name (FK links only)
onSourceDelete, onTargetDelete "block" / "cascade" / "set_null" — what happens when a side is deleted

Links

# Compact view of every link in the ontology (both directions per link)
for link in client.list_links():
    print(
        f"{link['sourceEntity']} --{link['apiName']}--> "
        f"{link['targetEntity']} ({link['cardinality']}, {link['direction']})"
    )

# Full details for a single link by api_name (returns a list — an api_name
# can match more than one direction or — across the ontology — distinct
# link types that share a name).
matches = client.get_link_schema("orders")
for m in matches:
    print(f"{m['sourceEntity']}.{m['apiName']} -> {m['targetEntity']}")

Convenience methods

properties = client.get_properties("Customer")
pk = client.get_primary_key("Customer")
links = client.get_links("Customer")

Async Client

AsyncAgimusClient mirrors the sync API for everything that's genuinely non-blocking: object queries, schema discovery, link traversal, aggregation, and dataset metadata.

from agimus import AsyncAgimusClient

async with AsyncAgimusClient(api_key="agm_...") as client:
    # Objects
    customers = await client.objects("Customer").filter(status="active").all()
    customer = await client.objects("Customer").get(123)

    async for c in client.objects("Customer").filter(status="active"):
        print(c["name"])

    # Schema
    entities = await client.list_entities()
    schema = await client.get_entity_schema("Customer")
    all_links = await client.list_links()

    # Dataset metadata (list and inspect)
    for ds in await client.datasets.list():
        print(ds.name, ds.total_rows)

    meta = await client.datasets.get_metadata("customers")
    print(meta.column_names)

Why async datasets are metadata-only

AsyncAgimusClient.datasets exposes list() and get_metadata() only — no to_pandas, to_polars, to_arrow, or iter_batches. The underlying pyarrow.flight Python bindings are synchronous, so any "async download" would just be a thread-pool wrapper pretending to be async.

For data-transfer use the synchronous AgimusClient:

from agimus import AgimusClient

with AgimusClient(api_key="agm_...") as client:
    df = client.datasets.get("customers").to_pandas()

This split keeps the async surface honest and the sync surface fully Flight-backed.

Error Handling

All errors inherit from AgimusError:

from agimus import (
    AgimusError,          # Base class for all errors
    AuthenticationError,  # Invalid or missing API key (401)
    AccessDeniedError,    # Permission denied (403)
    NotFoundError,        # Entity or object not found (404)
    ValidationError,      # Invalid request data (400/422)
    RateLimitError,       # Rate limit exceeded (429)
    ServerError,          # Server error (5xx)
)

try:
    df = client.datasets.get("customers").to_pandas()
except NotFoundError as e:
    print(f"Not found: {e.message}")
except AuthenticationError:
    print("Invalid API key")
except ServerError as e:
    print(f"Server error ({e.status_code}): {e.message}")
except AgimusError as e:
    print(f"Error: {e.message}")

Utility Methods

# Health check
client.health()
# {"status": "healthy", "version": "..."}

# Current API key info
client.me()
# {"tenantName": "...", "scope": "read_write", ...}

Context Manager

with AgimusClient(api_key="agm_...") as client:
    customers = client.objects("Customer").all()
    df = client.datasets.get("orders").to_pandas()
# Connections automatically closed

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

agimus-0.4.0.tar.gz (36.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

agimus-0.4.0-py3-none-any.whl (35.3 kB view details)

Uploaded Python 3

File details

Details for the file agimus-0.4.0.tar.gz.

File metadata

  • Download URL: agimus-0.4.0.tar.gz
  • Upload date:
  • Size: 36.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.8

File hashes

Hashes for agimus-0.4.0.tar.gz
Algorithm Hash digest
SHA256 5d03775f8bff79e8c8c8e984c5bfd2898517311dec5abe0cf2c84a571cd8f898
MD5 a1f6c8aac3eb5491385e2d730d3b43e4
BLAKE2b-256 1ab8dcb084ba292f326f7985d8ed111df486fb09f833e8cff13d6ece78401bf6

See more details on using hashes here.

File details

Details for the file agimus-0.4.0-py3-none-any.whl.

File metadata

  • Download URL: agimus-0.4.0-py3-none-any.whl
  • Upload date:
  • Size: 35.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.8

File hashes

Hashes for agimus-0.4.0-py3-none-any.whl
Algorithm Hash digest
SHA256 12eaf81d9661129dfbb2cbc1370de345a95a6657474bc840b878650f17a058ab
MD5 9945ae794b24b58fc2e3332956a616e6
BLAKE2b-256 0c555673cf0b30bfcb7cd94de5345e12f968d4110e5e8a5e304bfd8d25dd0ec2

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page