Python SDK for the Agimus Platform
Agimus Python SDK
Official Python SDK for the Agimus Platform.
Installation
pip install agimus
Requirements: Python 3.9+
Quick Start
from agimus import AgimusClient
client = AgimusClient(api_key="agm_your_key_here")
# Query objects (ontology)
customers = client.objects("Customer").filter(status="active").all()
# Download raw datasets
df = client.datasets.get("customers").to_pandas()
Ontology — Objects API
The Agimus Object Store is built on an ontology that defines your data model. Each element has an apiName, which is the identifier you use in the SDK:
- Entities: data types (e.g., Customer, Order, Product)
- Properties: fields on entities (e.g., customerId, name, createdAt)
- Links: relationships between entities, with forward and reverse API names
Use client.list_entities() and client.get_entity_schema("EntityName") to discover available entities, properties, and links.
Primary Keys
Every entity has a primary key property defined in the ontology.
Valid PK types: string, integer, long, short, byte
# These are equivalent
client.objects("Customer").get(123)
client.objects("Customer").get("123")
Property Types
Primitives:
| Type | Python | JSON | Notes |
|---|---|---|---|
| string | str | "text" | |
| integer | int | 123 | 32-bit signed |
| long | int | 123 | 64-bit signed |
| short | int | 123 | 16-bit signed |
| byte | int | 123 | 8-bit signed |
| float | float | 1.5 | 32-bit |
| double | float | 1.5 | 64-bit |
| decimal | str or Decimal | "123.45" | Arbitrary precision |
| boolean | bool | true | |
| date | str | "2024-01-15" | ISO 8601 date |
| timestamp | str | "2024-01-15T10:30:00Z" | ISO 8601 datetime |
| time | str | "10:30:00" | ISO 8601 time |
| bytes | str | "base64..." | Base64 encoded |
Complex types (stored as JSON objects):
| Type | Description |
|---|---|
| struct | Nested object with defined fields |
| geopoint | {"lat": 40.7, "lng": -74.0} |
| geoshape | GeoJSON geometry object |
| attachment | File attachment metadata |
| media_reference | Media file reference |
Arrays: Properties can be arrays (e.g., tags: string[]). Pass as Python lists.
Nullable: Check nullable in the schema. Non-nullable fields are required on create.
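The JSON column above implies that some Python values need converting before they go into a create or update payload. The following is a minimal sketch of such a conversion, based only on that table. The helper name `to_json_value` is hypothetical and not part of the SDK, which may already perform some of these conversions itself. Naive datetimes are assumed to be UTC.

```python
import base64
from datetime import date, datetime, time, timezone
from decimal import Decimal


def to_json_value(value):
    """Convert a Python value to the JSON form listed in the property table.

    Hypothetical helper for illustration; not part of the agimus package.
    """
    if isinstance(value, Decimal):
        # decimal -> string, so arbitrary precision survives serialization
        return str(value)
    if isinstance(value, datetime):
        # Checked before date because datetime is a subclass of date.
        # Assumption: a naive datetime is interpreted as UTC.
        if value.tzinfo is None:
            value = value.replace(tzinfo=timezone.utc)
        return value.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    if isinstance(value, (date, time)):
        # date -> "2024-01-15", time -> "10:30:00"
        return value.isoformat()
    if isinstance(value, (bytes, bytearray)):
        # bytes -> Base64-encoded string
        return base64.b64encode(bytes(value)).decode("ascii")
    if isinstance(value, (list, tuple)):
        # Array properties are passed as Python lists
        return [to_json_value(item) for item in value]
    if isinstance(value, dict):
        # struct and other complex types are JSON objects
        return {key: to_json_value(item) for key, item in value.items()}
    return value


payload = to_json_value({
    "total": Decimal("123.45"),
    "orderedOn": date(2024, 1, 15),
    "tags": ["vip", "eu"],
})
```

Running every payload through one function like this keeps the type mapping in a single place instead of scattering `str()` and `isoformat()` calls across call sites.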
Authentication
API keys are created in the Agimus dashboard under Settings > API Access. Keys use the format agm_<prefix>_<secret> and inherit permissions from their associated Service User.
client = AgimusClient(
    api_key="agm_...",
    base_url="https://api.agimus.ai",  # Optional: override base URL
    timeout=30.0,                      # Optional: request timeout (default: 30s)
)
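Because keys follow the agm_<prefix>_<secret> format, a log line can show the prefix while hiding the secret. A small sketch of that idea follows; `mask_api_key` is a hypothetical helper, not part of the SDK, and it assumes the prefix itself contains no underscore.

```python
def mask_api_key(key: str) -> str:
    """Return the key with its secret part replaced by asterisks.

    Hypothetical helper; assumes the documented agm_<prefix>_<secret> layout
    and that the prefix contains no underscore.
    """
    # Split into at most three parts so underscores inside the secret are kept together
    parts = key.split("_", 2)
    if len(parts) != 3 or parts[0] != "agm" or not parts[1] or not parts[2]:
        raise ValueError("expected a key of the form agm_<prefix>_<secret>")
    return f"agm_{parts[1]}_{'*' * 8}"


masked = mask_api_key("agm_abc123_s3cr3t_value")
```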
Querying Objects
All queries start with client.objects("EntityName") and support method chaining.
Filtering
Use Django-style double-underscore syntax for operators:
# Equals (default)
.filter(status="active")
# Comparison
.filter(age__gt=18) # greater than
.filter(age__gte=18) # greater than or equal
.filter(age__lt=65) # less than
.filter(age__lte=65) # less than or equal
.filter(age__ne=0) # not equal
.filter(age__between=[18, 65]) # between (inclusive)
# Lists
.filter(region__in=["US", "EU"]) # in list
.filter(status__nin=["deleted"]) # not in list
# Strings
.filter(name__like="Acme%") # SQL LIKE (case-sensitive)
.filter(name__ilike="%acme%") # SQL LIKE (case-insensitive)
.filter(name__starts_with="A") # starts with
.filter(name__ends_with="Corp") # ends with
# Null checks
.filter(deletedAt__is_null=True) # is null
.filter(verifiedAt__is_not_null=True) # is not null
# Empty checks (arrays/strings)
.filter(tags__is_empty=True) # is empty
.filter(tags__is_not_empty=True) # is not empty
# Array operations
.filter(tags__contains="vip") # array contains value
.filter(tags__overlaps=["a", "b"]) # arrays overlap
# Multiple filters (AND)
.filter(status="active", region="US")
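To make the naming convention concrete, the sketch below splits a filter keyword into a field name and an operator. It only illustrates the double-underscore syntax listed above and is not the SDK's actual parser; the function name and the `"eq"` label for the default operator are assumptions.

```python
# Operator suffixes taken from the filter examples above
OPERATORS = {
    "gt", "gte", "lt", "lte", "ne", "between",
    "in", "nin",
    "like", "ilike", "starts_with", "ends_with",
    "is_null", "is_not_null", "is_empty", "is_not_empty",
    "contains", "overlaps",
}


def split_filter_key(key: str):
    """Split 'age__gte' into ('age', 'gte'); a bare field name means equality."""
    field, separator, operator = key.rpartition("__")
    if separator and operator in OPERATORS:
        return field, operator
    # No recognized suffix: treat the whole key as a field name ("eq" is an assumed label)
    return key, "eq"


def describe_filters(**filters):
    """Turn filter keyword arguments into (field, operator, value) triples."""
    return [(*split_filter_key(key), value) for key, value in filters.items()]


triples = describe_filters(status="active", age__gte=18, name__starts_with="A")
```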
Sorting
.sort("name") # ascending
.sort("-createdAt") # descending (prefix with -)
.sort("-createdAt", "name") # multiple fields
Alias: .order_by()
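The ordering that a multi-field sort describes can be reproduced locally, which is handy for checking expectations in tests. This is a sketch of the semantics only (first key is the primary ordering, a leading `-` means descending); `sort_records` is a hypothetical helper that operates on plain dicts and does not call the API.

```python
def sort_records(records, *keys):
    """Sort dicts the way .sort("-createdAt", "name") is described to order results."""
    result = list(records)
    # Apply keys from last to first; Python's sort is stable, so earlier keys win ties
    for key in reversed(keys):
        descending = key.startswith("-")
        field = key[1:] if descending else key
        result.sort(key=lambda record: record[field], reverse=descending)
    return result


rows = [
    {"name": "b", "createdAt": "2024-01-02"},
    {"name": "a", "createdAt": "2024-01-02"},
    {"name": "c", "createdAt": "2024-01-03"},
]
ordered = sort_records(rows, "-createdAt", "name")
```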
Field Selection
.fields("customerId", "name", "email").all()
Alias: .select()
Expanding Relations
Include related objects inline using link apiName:
.expand("orders").all()
.expand("orders", "orders.items").all() # nested
# Or on single object fetch
.get(123, expand=["orders"])
Alias: .include()
Pagination
# Limit total results returned
.limit(100).all()
# Set page size for API calls (max 100, default 50)
.page_size(25).all()
# Auto-pagination with iteration
for customer in client.objects("Customer").filter(status="active"):
    print(customer["name"])
Executing Queries
.all() # Get all results as list
.first() # Get first result (or None)
.exists() # Check if any results exist (bool)
.count() # Get count of matching objects
.iter() # Iterator with auto-pagination
Single Object Operations
# Get by primary key (raises NotFoundError if not found)
customer = client.objects("Customer").get(123)
# Get by primary key (returns None if not found)
customer = client.objects("Customer").get_or_none(123)
# Get multiple by primary keys (max 100)
result = client.objects("Customer").batch_get([1, 2, 3])
# Returns: {"data": [...], "found": 3, "requested": 3}
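Since batch_get accepts at most 100 primary keys per call, longer key lists need to be split first. A minimal sketch of that chunking follows; `chunked` is a hypothetical helper, and the commented usage assumes a `client` created as shown in Quick Start.

```python
def chunked(items, size=100):
    """Yield consecutive slices of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    items = list(items)
    for start in range(0, len(items), size):
        yield items[start:start + size]


chunks = list(chunked(range(250)))

# Usage with the SDK (not executed here):
# customers = []
# for pks in chunked(all_pks):
#     result = client.objects("Customer").batch_get(pks)
#     customers.extend(result["data"])
```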
Distinct Values
Returns distinct values for a field, sorted by frequency (most common first).
regions = client.objects("Customer").distinct("region")
# Returns: ["US", "EU", "APAC", ...]
# With filter
regions = client.objects("Customer").filter(status="active").distinct("region")
# With counts (returns [{"value": v, "count": n}, ...] sorted by count DESC)
distribution = client.objects("Customer").distinct("region", with_counts=True)
# Returns: [{"value": "US", "count": 1245}, {"value": "EU", "count": 308}, ...]
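The with_counts result is easy to post-process locally. As an illustration, the sketch below adds each value's share of the total to a list shaped like the documented return value; `with_share` is a hypothetical helper and the sample numbers are made up.

```python
def with_share(distribution):
    """Add a 'share' key (fraction of the total count) to each row."""
    total = sum(row["count"] for row in distribution)
    return [
        {**row, "share": row["count"] / total if total else 0.0}
        for row in distribution
    ]


# Sample data in the documented shape: [{"value": v, "count": n}, ...]
sample = [{"value": "US", "count": 3}, {"value": "EU", "count": 1}]
shares = with_share(sample)
```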
Aggregation
result = client.objects("Order").filter(status="completed").aggregate(
    metrics=[
        {"op": "count", "alias": "orderCount"},
        {"op": "sum", "field": "total", "alias": "revenue"},
        {"op": "avg", "field": "total", "alias": "avgOrder"},
    ],
    group_by=[
        {"field": "region"},
        {"field": "createdAt", "granularity": "month"}
    ],
    sort=["-revenue"],
    limit=100
)
Operators: count, count_distinct, sum, avg, min, max, first, last
Time Granularities: year, quarter, month, week, day, hour
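A request can be checked against the operator and granularity lists above before it is sent. The following is a sketch of such a client-side check, not something the SDK is known to provide; `validate_aggregation` is a hypothetical name, and the rule that only count may omit a field is an assumption drawn from the example.

```python
AGG_OPERATORS = {"count", "count_distinct", "sum", "avg", "min", "max", "first", "last"}
GRANULARITIES = {"year", "quarter", "month", "week", "day", "hour"}


def validate_aggregation(metrics, group_by=()):
    """Return a list of problems found; an empty list means none were detected."""
    problems = []
    for index, metric in enumerate(metrics):
        op = metric.get("op")
        if op not in AGG_OPERATORS:
            problems.append(f"metrics[{index}]: unknown op {op!r}")
        elif op != "count" and "field" not in metric:
            # Assumption: every operator except count needs a field
            problems.append(f"metrics[{index}]: op {op!r} needs a field")
    for index, group in enumerate(group_by):
        granularity = group.get("granularity")
        if granularity is not None and granularity not in GRANULARITIES:
            problems.append(f"group_by[{index}]: unknown granularity {granularity!r}")
    return problems


problems = validate_aggregation(
    metrics=[{"op": "median", "field": "total"}, {"op": "sum", "alias": "revenue"}],
    group_by=[{"field": "createdAt", "granularity": "fortnight"}],
)
```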
Link Traversal
Navigate relationships using the link's API name. Pagination is cursor-based (keyset on the target entity's primary key) — performance stays constant regardless of how deep you page.
# Customer -> orders (forward link)
orders = client.objects("Customer").links(123, "orders")
# Returns: {"data": [...], "cursor": "<opaque>" or None, "hasMore": False}
# Order -> customer (reverse link)
customer = client.objects("Order").links(456, "customer")
# Walk every page
cursor = None
while True:
    page = client.objects("Customer").links(123, "orders", page_size=50, cursor=cursor)
    for order in page["data"]:
        process(order)
    if not page["hasMore"]:
        break
    cursor = page["cursor"]
# Count related objects
count = client.objects("Customer").count_links(123, "orders")
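The page-walking loop above can be wrapped in a generator so callers iterate over related objects directly. This is a sketch built on the documented response shape (`data`, `cursor`, `hasMore`); `iter_linked` is a hypothetical helper, and `FakeQuery` is a stand-in for `client.objects("Customer")` so the example runs without a server.

```python
def iter_linked(query, pk, link, page_size=50):
    """Yield every related object, following cursors until hasMore is false."""
    cursor = None
    while True:
        page = query.links(pk, link, page_size=page_size, cursor=cursor)
        yield from page["data"]
        if not page["hasMore"]:
            return
        cursor = page["cursor"]


class FakeQuery:
    """Stand-in for client.objects(...); serves a fixed list in pages."""

    def __init__(self, rows):
        self._rows = rows

    def links(self, pk, link, page_size=50, cursor=None):
        # The real cursor is opaque; this fake simply encodes the next offset
        start = int(cursor) if cursor else 0
        end = start + page_size
        has_more = end < len(self._rows)
        return {
            "data": self._rows[start:end],
            "cursor": str(end) if has_more else None,
            "hasMore": has_more,
        }


orders = list(iter_linked(FakeQuery([{"orderId": i} for i in range(5)]), 123, "orders", page_size=2))
```

With a real client, the first argument would be `client.objects("Customer")` instead of the fake.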
Write Operations
Create
customer = client.objects("Customer").create({
    "customerId": 1,              # PK required
    "name": "Acme Corp",          # non-nullable fields required
    "email": "contact@acme.com",
    "status": "active"
})
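Because non-nullable fields are required on create, a payload can be checked against the entity schema first. The sketch below assumes the schema shape shown under Schema Discovery (a `properties` list whose items carry `apiName` and `nullable`); `missing_required` is a hypothetical helper and the sample schema is made up.

```python
def missing_required(schema, data):
    """Return apiNames of non-nullable properties that are absent or None in data."""
    return [
        prop["apiName"]
        for prop in schema["properties"]
        if not prop["nullable"] and data.get(prop["apiName"]) is None
    ]


# Made-up schema in the shape returned by client.get_entity_schema("Customer")
sample_schema = {
    "primaryKey": "customerId",
    "properties": [
        {"apiName": "customerId", "baseType": "integer", "nullable": False},
        {"apiName": "name", "baseType": "string", "nullable": False},
        {"apiName": "email", "baseType": "string", "nullable": True},
    ],
}

missing = missing_required(sample_schema, {"customerId": 1, "email": "contact@acme.com"})
```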
Update
# Partial update - only specified fields change
updated = client.objects("Customer").update(1, {"status": "premium"})
Upsert
# Create if not exists, update if exists
customer = client.objects("Customer").upsert(1, {
    "name": "Acme Corp",
    "status": "active"
})
Delete
deleted = client.objects("Customer").delete(1) # Returns: True
Batch Operations
result = client.objects("Customer").batch([
    {"op": "create", "data": {"customerId": 1, "name": "Customer 1"}},
    {"op": "update", "pk": 2, "data": {"status": "active"}},
    {"op": "delete", "pk": 3},
])
# Returns: {"results": [...], "succeeded": 2, "failed": 1}
Datasets — Raw Data Access
Read raw dataset data as DataFrames. Data transfer uses Arrow Flight (gRPC + columnar streaming) — column selection is pushed down to the server so only requested columns are read and transferred.
List Datasets
datasets = client.datasets.list()
for ds in datasets:
    print(f"{ds.name}: {ds.total_rows:,} rows ({ds.source_type})")
# Filter by source type
external = client.datasets.list(source_type="external")
uploads = client.datasets.list(source_type="file_upload")
pipeline = client.datasets.list(source_type="pipeline")
# Search by name
results = client.datasets.list(search="customer")
Get Dataset
# By name
dataset = client.datasets.get("customers")
# By ID
dataset = client.datasets.get("a1b2c3d4-...")
# Inspect metadata
print(dataset.name) # "customers"
print(dataset.total_rows) # 7800000
print(dataset.total_size_bytes) # 524288000
print(dataset.source_type) # "external"
print(dataset.column_names) # ["customer_id", "name", "email", ...]
print(dataset.columns) # [{"name": "customer_id", "type": "string", ...}, ...]
Download as DataFrame
# Full dataset
df = dataset.to_pandas()
# Column selection (only the requested columns are read and transferred)
df = dataset.to_pandas(columns=["customer_id", "revenue", "region"])
# Polars
df = dataset.to_polars(columns=["customer_id", "revenue"])
# Raw PyArrow Table
table = dataset.to_arrow()
Stream Large Datasets
For datasets too large to fit in memory, use iter_batches():
for batch in dataset.iter_batches():
    # batch is a pyarrow.RecordBatch
    chunk_df = batch.to_pandas()
    process(chunk_df)
# With column selection
for batch in dataset.iter_batches(columns=["customer_id", "revenue"]):
    process(batch)
Schema Discovery
Entities
# List all accessible entities (compact summaries with counts)
entities = client.list_entities()
for e in entities:
    print(
        f"{e['apiName']}: {e['displayName']} "
        f"({e['propertyCount']} props, {e['linkCount']} links, pk={e['primaryKey']})"
    )
# Get full entity schema
schema = client.get_entity_schema("Customer")
print(f"Primary key: {schema['primaryKey']}")
for prop in schema["properties"]:
    print(f"  {prop['apiName']}: {prop['baseType']} (nullable: {prop['nullable']})")
for link in schema["links"]:
    print(
        f"  -> {link['apiName']} ({link['cardinality']}): "
        f"{link['targetEntity']} — {link['description'] or ''}"
    )
Each link in the schema includes the fields needed to write correct write-side code:
| Field | Meaning |
|---|---|
| apiName, displayName, pluralDisplayName, description | Naming + semantics for this direction |
| targetEntity, cardinality, direction | Where the link points + multiplicity |
| implementationType | "foreign_key" or "junction": how the link is realized |
| foreignKeyEntity, foreignKeyProperty | Which entity holds the FK column and the property name (FK links only) |
| onSourceDelete, onTargetDelete | "block" / "cascade" / "set_null": what happens when a side is deleted |
Links
# Compact view of every link in the ontology (both directions per link)
for link in client.list_links():
    print(
        f"{link['sourceEntity']} --{link['apiName']}--> "
        f"{link['targetEntity']} ({link['cardinality']}, {link['direction']})"
    )
# Full details for a single link by api_name. Returns a list, because an
# api_name can match more than one direction or, across the ontology,
# distinct link types that share a name.
matches = client.get_link_schema("orders")
for m in matches:
    print(f"{m['sourceEntity']}.{m['apiName']} -> {m['targetEntity']}")
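Before deleting objects, it can help to see which links carry which delete policy. The sketch below groups link entries by their `onSourceDelete` value. It assumes each entry has the `apiName` and `onSourceDelete` fields listed in the link field table, and works on any list of such dicts, for example `schema["links"]`; `group_by_delete_policy` is a hypothetical helper and the sample links are made up.

```python
from collections import defaultdict


def group_by_delete_policy(links, side="onSourceDelete"):
    """Map each delete policy ("block", "cascade", "set_null") to link apiNames."""
    grouped = defaultdict(list)
    for link in links:
        # Entries without the field are collected under None rather than dropped
        grouped[link.get(side)].append(link["apiName"])
    return dict(grouped)


# Made-up link entries using the documented field names
sample_links = [
    {"apiName": "orders", "onSourceDelete": "block", "onTargetDelete": "set_null"},
    {"apiName": "addresses", "onSourceDelete": "cascade", "onTargetDelete": "block"},
    {"apiName": "invoices", "onSourceDelete": "block", "onTargetDelete": "block"},
]

policies = group_by_delete_policy(sample_links)
```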
Convenience methods
properties = client.get_properties("Customer")
pk = client.get_primary_key("Customer")
links = client.get_links("Customer")
Async Client
AsyncAgimusClient mirrors the sync API for everything that's genuinely
non-blocking: object queries, schema discovery, link traversal, aggregation,
and dataset metadata.
import asyncio
from agimus import AsyncAgimusClient

async def main():
    async with AsyncAgimusClient(api_key="agm_...") as client:
        # Objects
        customers = await client.objects("Customer").filter(status="active").all()
        customer = await client.objects("Customer").get(123)
        async for c in client.objects("Customer").filter(status="active"):
            print(c["name"])
        # Schema
        entities = await client.list_entities()
        schema = await client.get_entity_schema("Customer")
        all_links = await client.list_links()
        # Dataset metadata (list and inspect)
        for ds in await client.datasets.list():
            print(ds.name, ds.total_rows)
        meta = await client.datasets.get_metadata("customers")
        print(meta.column_names)

asyncio.run(main())
Why async datasets are metadata-only
AsyncAgimusClient.datasets exposes list() and get_metadata() only —
no to_pandas, to_polars, to_arrow, or iter_batches. The underlying
pyarrow.flight Python bindings are synchronous, so any "async download"
would just be a thread-pool wrapper pretending to be async.
For data transfer, use the synchronous AgimusClient:
from agimus import AgimusClient
with AgimusClient(api_key="agm_...") as client:
    df = client.datasets.get("customers").to_pandas()
This split keeps the async surface honest and the sync surface fully Flight-backed.
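Inside an async application, a synchronous download can still run without blocking the event loop by handing it to a worker thread explicitly. The sketch below uses the standard library's `asyncio.to_thread` (Python 3.9+); `blocking_download` is a stand-in for a call such as `client.datasets.get("customers").to_pandas()` and returns dummy data so the example runs on its own.

```python
import asyncio
import time


def blocking_download():
    """Stand-in for a synchronous dataset download."""
    time.sleep(0.05)  # simulate blocking I/O
    return [1, 2, 3]


async def heartbeat(ticks):
    """Runs concurrently to show the event loop stays responsive."""
    for _ in range(3):
        ticks.append("tick")
        await asyncio.sleep(0.01)


async def main():
    ticks = []
    # The download runs in a worker thread while heartbeat runs on the loop
    rows, _ = await asyncio.gather(
        asyncio.to_thread(blocking_download),
        heartbeat(ticks),
    )
    return rows, ticks


rows, ticks = asyncio.run(main())
```

Making the thread hop visible at the call site keeps it clear which parts of the program are truly non-blocking.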
Error Handling
All errors inherit from AgimusError:
from agimus import (
    AgimusError,          # Base class for all errors
    AuthenticationError,  # Invalid or missing API key (401)
    AccessDeniedError,    # Permission denied (403)
    NotFoundError,        # Entity or object not found (404)
    ValidationError,      # Invalid request data (400/422)
    RateLimitError,       # Rate limit exceeded (429)
    ServerError,          # Server error (5xx)
)
try:
    df = client.datasets.get("customers").to_pandas()
except NotFoundError as e:
    print(f"Not found: {e.message}")
except AuthenticationError:
    print("Invalid API key")
except ServerError as e:
    print(f"Server error ({e.status_code}): {e.message}")
except AgimusError as e:
    print(f"Error: {e.message}")
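Rate-limit and server errors are often transient, so callers may want to retry them with a growing delay. The following is a generic sketch, not a feature the SDK is known to provide: `call_with_retry` is a hypothetical helper, and `TransientError` stands in for `RateLimitError` and `ServerError` so the example runs without the package installed.

```python
import time


class TransientError(Exception):
    """Stand-in for agimus.RateLimitError / agimus.ServerError."""


def call_with_retry(func, retry_on, attempts=3, base_delay=0.5, sleep=time.sleep):
    """Call func(); on a retryable error, wait base_delay * 2**attempt and try again."""
    for attempt in range(attempts):
        try:
            return func()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2 ** attempt)


# Demonstration with a function that fails twice, then succeeds
calls = {"count": 0}
delays = []


def flaky():
    calls["count"] += 1
    if calls["count"] < 3:
        raise TransientError("try again")
    return "ok"


outcome = call_with_retry(flaky, retry_on=(TransientError,), sleep=delays.append)

# Usage with the SDK (not executed here):
# df = call_with_retry(
#     lambda: client.datasets.get("customers").to_pandas(),
#     retry_on=(RateLimitError, ServerError),
# )
```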
Utility Methods
# Health check
client.health()
# {"status": "healthy", "version": "..."}
# Current API key info
client.me()
# {"tenantName": "...", "scope": "read_write", ...}
Context Manager
with AgimusClient(api_key="agm_...") as client:
    customers = client.objects("Customer").all()
    df = client.datasets.get("orders").to_pandas()
# Connections automatically closed
License
MIT