Agimus Python SDK
Official Python SDK for the Agimus Platform.
Installation
pip install agimus
Requirements: Python 3.9+
Quick Start
from agimus import AgimusClient
client = AgimusClient(api_key="agm_your_key_here")
# Query objects (ontology)
customers = client.objects("Customer").filter(status="active").all()
# Download raw datasets
df = client.datasets.get("customers").to_pandas()
Ontology — Objects API
The Agimus Object Store is built on an ontology that defines your data model. Each element has an apiName, which is the identifier you use in the SDK:
- Entities: Data types (e.g., Customer, Order, Product)
- Properties: Fields on entities (e.g., customerId, name, createdAt)
- Links: Relationships between entities, with forward and reverse API names
Use client.list_entities() and client.get_entity_schema("EntityName") to discover available entities, properties, and links.
Primary Keys
Every entity has a primary key property defined in the ontology.
Valid PK types: string, integer, long, short, byte
# These are equivalent
client.objects("Customer").get(123)
client.objects("Customer").get("123")
Property Types
Primitives:
| Type | Python | JSON | Notes |
|---|---|---|---|
| string | str | "text" | |
| integer | int | 123 | 32-bit signed |
| long | int | 123 | 64-bit signed |
| short | int | 123 | 16-bit signed |
| byte | int | 123 | 8-bit signed |
| float | float | 1.5 | 32-bit |
| double | float | 1.5 | 64-bit |
| decimal | str or Decimal | "123.45" | Arbitrary precision |
| boolean | bool | true | |
| date | str | "2024-01-15" | ISO 8601 date |
| timestamp | str | "2024-01-15T10:30:00Z" | ISO 8601 datetime |
| time | str | "10:30:00" | ISO 8601 time |
| bytes | str | "base64..." | Base64 encoded |
Complex types (stored as JSON objects):
| Type | Description |
|---|---|
| struct | Nested object with defined fields |
| geopoint | {"lat": 40.7, "lng": -74.0} |
| geoshape | GeoJSON geometry object |
| attachment | File attachment metadata |
| media_reference | Media file reference |
Arrays: Properties can be arrays (e.g., tags: string[]). Pass as Python lists.
Nullable: Check nullable in the schema. Non-nullable fields are required on create.
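The JSON column of the primitives table suggests what Python values need to look like before they are sent. The helper below is a minimal sketch of that mapping using only the standard library. `to_json_value` is a hypothetical name, not part of the SDK, and treating naive datetimes as UTC is an assumption. Whether the SDK performs any of these conversions itself is not stated here.

```python
import base64
from datetime import date, datetime, time, timezone
from decimal import Decimal


def to_json_value(value):
    """Convert a Python value to the JSON form listed in the table (hypothetical helper)."""
    if isinstance(value, Decimal):
        return str(value)  # decimal -> "123.45"
    if isinstance(value, (bytes, bytearray)):
        return base64.b64encode(bytes(value)).decode("ascii")  # bytes -> Base64 text
    if isinstance(value, datetime):  # checked before date: datetime subclasses date
        if value.tzinfo is None:
            value = value.replace(tzinfo=timezone.utc)  # assumption: naive means UTC
        return value.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    if isinstance(value, (date, time)):
        return value.isoformat()
    if isinstance(value, (list, tuple)):
        return [to_json_value(item) for item in value]  # array properties
    return value  # str, int, float, bool, None pass through unchanged


payload = {
    "total": to_json_value(Decimal("123.45")),
    "signupDate": to_json_value(date(2024, 1, 15)),
    "tags": to_json_value(["vip", "beta"]),
}
```

The property names in `payload` are illustrative only; use the apiName values from your own schema.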
Authentication
API keys are created in the Agimus dashboard under Settings > API Access. Keys use the format agm_<prefix>_<secret> and inherit permissions from their associated Service User.
client = AgimusClient(
    api_key="agm_...",
    base_url="https://api.agimus.ai",  # Optional: override base URL
    timeout=30.0,  # Optional: request timeout (default: 30s)
)
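To keep keys out of source code, one option is to read the key from the environment and sanity-check its shape before building the client. This is a sketch only: the `AGIMUS_API_KEY` variable name and both helper functions are hypothetical, and the check assumes nothing beyond the agm_<prefix>_<secret> format described above (three non-empty parts, the first being `agm`).

```python
import os


def looks_like_agimus_key(key):
    """Return True if key has the shape agm_<prefix>_<secret> (format check only)."""
    parts = key.split("_", 2)  # the secret itself may contain underscores
    return len(parts) == 3 and parts[0] == "agm" and all(parts[1:])


def load_api_key(env=os.environ, name="AGIMUS_API_KEY"):
    """Read the key from an environment mapping; the variable name is an assumption."""
    key = env.get(name, "")
    if not looks_like_agimus_key(key):
        raise ValueError(f"{name} is missing or not in agm_<prefix>_<secret> form")
    return key


# Usage (requires the SDK and a real key):
# client = AgimusClient(api_key=load_api_key())
```

A format check like this only catches copy-paste mistakes; an expired or revoked key still surfaces as an authentication error from the API.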
Querying Objects
All queries start with client.objects("EntityName") and support method chaining.
Filtering
Use Django-style double-underscore syntax for operators:
# Equals (default)
.filter(status="active")
# Comparison
.filter(age__gt=18) # greater than
.filter(age__gte=18) # greater than or equal
.filter(age__lt=65) # less than
.filter(age__lte=65) # less than or equal
.filter(age__ne=0) # not equal
.filter(age__between=[18, 65]) # between (inclusive)
# Lists
.filter(region__in=["US", "EU"]) # in list
.filter(status__nin=["deleted"]) # not in list
# Strings
.filter(name__like="Acme%") # SQL LIKE (case-sensitive)
.filter(name__ilike="%acme%") # SQL LIKE (case-insensitive)
.filter(name__starts_with="A") # starts with
.filter(name__ends_with="Corp") # ends with
# Null checks
.filter(deletedAt__is_null=True) # is null
.filter(verifiedAt__is_not_null=True) # is not null
# Empty checks (arrays/strings)
.filter(tags__is_empty=True) # is empty
.filter(tags__is_not_empty=True) # is not empty
# Array operations
.filter(tags__contains="vip") # array contains value
.filter(tags__overlaps=["a", "b"]) # arrays overlap
# Multiple filters (AND)
.filter(status="active", region="US")
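To make the double-underscore convention concrete, the sketch below splits keyword arguments into (field, operator, value) triples. It illustrates the naming scheme only and is not the SDK's implementation; `split_lookup`, `parse_filters`, and the `eq` label for the default operator are hypothetical.

```python
# Operator suffixes taken from the examples in this section.
KNOWN_OPERATORS = {
    "gt", "gte", "lt", "lte", "ne", "between", "in", "nin",
    "like", "ilike", "starts_with", "ends_with",
    "is_null", "is_not_null", "is_empty", "is_not_empty",
    "contains", "overlaps",
}


def split_lookup(key):
    """Split 'age__gte' into ('age', 'gte'); a bare field name means equality."""
    field, sep, op = key.rpartition("__")
    if sep and op in KNOWN_OPERATORS:
        return field, op
    return key, "eq"


def parse_filters(**lookups):
    """Turn filter keyword arguments into (field, operator, value) triples."""
    return [(*split_lookup(key), value) for key, value in lookups.items()]


triples = parse_filters(status="active", age__gte=18, tags__is_not_empty=True)
# triples == [("status", "eq", "active"), ("age", "gte", 18),
#             ("tags", "is_not_empty", True)]
```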
Sorting
.sort("name") # ascending
.sort("-createdAt") # descending (prefix with -)
.sort("-createdAt", "name") # multiple fields
Alias: .order_by()
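The multi-field form is easiest to read as "first field is the primary sort key, later fields break ties". The sketch below applies the same spec strings to an in-memory list so the ordering can be checked locally. It assumes that interpretation, which the text above does not spell out, and `sort_records` is a hypothetical helper rather than an SDK function.

```python
def sort_records(records, *specs):
    """Sort dicts by spec strings such as '-createdAt' or 'name' (local illustration)."""
    ordered = list(records)
    # Apply the least significant key first; list.sort is stable, so earlier
    # specs end up taking precedence over later ones.
    for spec in reversed(specs):
        descending = spec.startswith("-")
        field = spec[1:] if descending else spec
        ordered.sort(key=lambda record: record[field], reverse=descending)
    return ordered


rows = [
    {"name": "b", "createdAt": "2024-01-02"},
    {"name": "a", "createdAt": "2024-01-02"},
    {"name": "c", "createdAt": "2024-01-03"},
]
names = [row["name"] for row in sort_records(rows, "-createdAt", "name")]
# names == ["c", "a", "b"]
```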
Field Selection
.fields("customerId", "name", "email").all()
Alias: .select()
Expanding Relations
Include related objects inline using link apiName:
.expand("orders").all()
.expand("orders", "orders.items").all() # nested
# Or on single object fetch
.get(123, expand=["orders"])
Alias: .include()
Pagination
# Limit total results returned
.limit(100).all()
# Set page size for API calls (max 100, default 50)
.page_size(25).all()
# Auto-pagination with iteration
for customer in client.objects("Customer").filter(status="active"):
    print(customer["name"])
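As a rough way to reason about how .limit() and .page_size() interact, the sketch below lists the page sizes a client would request to collect a given number of results. It assumes pages are requested at the configured size until the limit is met, which this README does not confirm; `planned_page_sizes` is a hypothetical helper, not an SDK call.

```python
def planned_page_sizes(limit, page_size=50):
    """List the per-request sizes needed to collect `limit` rows (illustration only)."""
    if not 1 <= page_size <= 100:
        raise ValueError("page_size must be between 1 and 100")
    sizes = []
    remaining = limit
    while remaining > 0:
        take = min(page_size, remaining)  # the last page may be smaller
        sizes.append(take)
        remaining -= take
    return sizes


plan = planned_page_sizes(120, page_size=50)
# plan == [50, 50, 20]
```

Under that assumption, a larger page size means fewer round trips at the cost of bigger responses.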
Executing Queries
.all() # Get all results as list
.first() # Get first result (or None)
.exists() # Check if any results exist (bool)
.count() # Get count of matching objects
.iter() # Iterator with auto-pagination
Single Object Operations
# Get by primary key (raises NotFoundError if not found)
customer = client.objects("Customer").get(123)
# Get by primary key (returns None if not found)
customer = client.objects("Customer").get_or_none(123)
# Get multiple by primary keys (max 100)
result = client.objects("Customer").batch_get([1, 2, 3])
# Returns: {"data": [...], "found": 3, "requested": 3}
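Because batch_get accepts at most 100 primary keys per call, longer key lists need to be split first. The following is a minimal sketch of that splitting; `chunked` is a hypothetical helper, and the commented usage assumes the response shape shown above.

```python
def chunked(items, size=100):
    """Yield consecutive slices of `items`, each holding at most `size` elements."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield items[start:start + size]


primary_keys = list(range(1, 251))
group_sizes = [len(group) for group in chunked(primary_keys)]
# group_sizes == [100, 100, 50]

# Usage (requires the SDK and a configured client):
# found = []
# for group in chunked(primary_keys):
#     found.extend(client.objects("Customer").batch_get(group)["data"])
```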
Distinct Values
regions = client.objects("Customer").distinct("region")
# Returns: ["US", "EU", "APAC", ...]
# With filter
regions = client.objects("Customer").filter(status="active").distinct("region")
Aggregation
result = client.objects("Order").filter(status="completed").aggregate(
    metrics=[
        {"op": "count", "alias": "orderCount"},
        {"op": "sum", "field": "total", "alias": "revenue"},
        {"op": "avg", "field": "total", "alias": "avgOrder"},
    ],
    group_by=[
        {"field": "region"},
        {"field": "createdAt", "granularity": "month"},
    ],
    sort=["-revenue"],
    limit=100,
)
Operators: count, count_distinct, sum, avg, min, max, first, last
Time Granularities: year, quarter, month, week, day, hour
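Typos in operator or granularity names are easy to make when writing these dictionaries by hand. The small builders below check names against the two lists above before a request is sent. They are hypothetical helpers, not part of the SDK, and they assume the dictionary keys shown in the example (`op`, `field`, `alias`, `granularity`).

```python
AGG_OPERATORS = {"count", "count_distinct", "sum", "avg", "min", "max", "first", "last"}
GRANULARITIES = {"year", "quarter", "month", "week", "day", "hour"}


def metric(op, alias, field=None):
    """Build one metrics entry, rejecting unknown operators."""
    if op not in AGG_OPERATORS:
        raise ValueError(f"unknown aggregation operator: {op!r}")
    spec = {"op": op, "alias": alias}
    if field is not None:
        spec["field"] = field
    return spec


def group(field, granularity=None):
    """Build one group_by entry, rejecting unknown time granularities."""
    spec = {"field": field}
    if granularity is not None:
        if granularity not in GRANULARITIES:
            raise ValueError(f"unknown granularity: {granularity!r}")
        spec["granularity"] = granularity
    return spec


metrics = [metric("count", "orderCount"), metric("sum", "revenue", field="total")]
group_by = [group("region"), group("createdAt", "month")]

# Usage (requires the SDK and a configured client):
# result = client.objects("Order").aggregate(metrics=metrics, group_by=group_by)
```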
Link Traversal
Navigate relationships using the link's API name:
# Customer -> orders (forward link)
orders = client.objects("Customer").links(123, "orders")
# Returns: {"data": [...], "hasMore": False}
# Order -> customer (reverse link)
customer = client.objects("Order").links(456, "customer")
# With pagination
orders = client.objects("Customer").links(123, "orders", page_size=10, offset=0)
# Count related objects
count = client.objects("Customer").count_links(123, "orders")
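When an object has many related objects, the page_size, offset, and hasMore fields can be combined into a loop. The sketch below is written against a generic `fetch_page` callable so it can be tried without a server. It assumes `offset` counts items rather than pages, which is not confirmed above, and `iter_all_links` is a hypothetical helper.

```python
def iter_all_links(fetch_page, page_size=50):
    """Yield every related object by following hasMore across pages."""
    offset = 0
    while True:
        page = fetch_page(page_size=page_size, offset=offset)
        data = page["data"]
        yield from data
        # Stop when the server reports no more pages, or on an empty page
        # (guards against looping forever on an unexpected response).
        if not page.get("hasMore") or not data:
            break
        offset += len(data)  # assumption: offset is item-based


# Usage (requires the SDK and a configured client):
# orders = list(iter_all_links(
#     lambda page_size, offset: client.objects("Customer").links(
#         123, "orders", page_size=page_size, offset=offset
#     )
# ))
```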
Write Operations
Create
customer = client.objects("Customer").create({
    "customerId": 1,  # PK required
    "name": "Acme Corp",  # non-nullable fields required
    "email": "contact@acme.com",
    "status": "active",
})
Update
# Partial update - only specified fields change
updated = client.objects("Customer").update(1, {"status": "premium"})
Upsert
# Create if not exists, update if exists
customer = client.objects("Customer").upsert(1, {
    "name": "Acme Corp",
    "status": "active",
})
Delete
deleted = client.objects("Customer").delete(1) # Returns: True
Batch Operations
result = client.objects("Customer").batch([
    {"op": "create", "data": {"customerId": 1, "name": "Customer 1"}},
    {"op": "update", "pk": 2, "data": {"status": "active"}},
    {"op": "delete", "pk": 3},
])
# Returns: {"results": [...], "succeeded": 2, "failed": 1}
Datasets — Raw Data Access
Read raw dataset data as DataFrames. Data transfer uses Arrow Flight (gRPC + columnar streaming) — column selection is pushed down to the server so only requested columns are read and transferred.
List Datasets
datasets = client.datasets.list()
for ds in datasets:
    print(f"{ds.name}: {ds.total_rows:,} rows ({ds.source_type})")
# Filter by source type
external = client.datasets.list(source_type="external")
uploads = client.datasets.list(source_type="file_upload")
pipeline = client.datasets.list(source_type="pipeline")
# Search by name
results = client.datasets.list(search="customer")
Get Dataset
# By name
dataset = client.datasets.get("customers")
# By ID
dataset = client.datasets.get("a1b2c3d4-...")
# Inspect metadata
print(dataset.name) # "customers"
print(dataset.total_rows) # 7800000
print(dataset.total_size_bytes) # 524288000
print(dataset.source_type) # "external"
print(dataset.column_names) # ["customer_id", "name", "email", ...]
print(dataset.columns) # [{"name": "customer_id", "type": "string", ...}, ...]
Download as DataFrame
# Full dataset
df = dataset.to_pandas()
# Column selection (only the requested columns are read and transferred)
df = dataset.to_pandas(columns=["customer_id", "revenue", "region"])
# Polars
df = dataset.to_polars(columns=["customer_id", "revenue"])
# Raw PyArrow Table
table = dataset.to_arrow()
Stream Large Datasets
For datasets too large to fit in memory, use iter_batches():
for batch in dataset.iter_batches():
    # batch is a pyarrow.RecordBatch
    chunk_df = batch.to_pandas()
    process(chunk_df)
# With column selection
for batch in dataset.iter_batches(columns=["customer_id", "revenue"]):
    process(batch)
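The `process` function above is left to the reader. One common shape for it is a running aggregate that never holds more than one batch in memory. The sketch below sums a single column across batches; `streaming_sum` is a hypothetical helper that relies only on the `to_pydict()` method and `num_rows` attribute of pyarrow.RecordBatch, and it skips null values.

```python
def streaming_sum(batches, column):
    """Sum one column across record batches, one batch at a time."""
    total = 0
    rows = 0
    for batch in batches:
        values = batch.to_pydict()[column]  # plain Python list for this batch
        total += sum(value for value in values if value is not None)
        rows += batch.num_rows
    return total, rows


# Usage (requires the SDK, pyarrow, and a dataset with a numeric "revenue" column):
# total, rows = streaming_sum(dataset.iter_batches(columns=["revenue"]), "revenue")
```

Converting each batch to Python objects keeps the example dependency-free but is slow for wide batches; a vectorized reduction over the Arrow column would be the natural next step.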
Schema Discovery
# List all accessible entities
entities = client.list_entities()
for e in entities:
    print(f"{e['apiName']}: {e['displayName']}")
# Get full entity schema
schema = client.get_entity_schema("Customer")
print(f"Primary key: {schema['primaryKey']}")
for prop in schema["properties"]:
    print(f"  {prop['apiName']}: {prop['baseType']} (nullable: {prop['nullable']})")
for link in schema["links"]:
    print(f"  -> {link['apiName']}: {link['targetEntity']}")
# Individual schema methods
properties = client.get_properties("Customer")
pk = client.get_primary_key("Customer")
links = client.get_links("Customer")
Async Client
For async/await applications (object operations only):
import asyncio
from agimus import AsyncAgimusClient
async def main():
    async with AsyncAgimusClient(api_key="agm_...") as client:
        customers = await client.objects("Customer").filter(status="active").all()
        customer = await client.objects("Customer").get(123)
        # Auto-pagination with async iteration
        async for customer in client.objects("Customer").filter(status="active"):
            print(customer["name"])
asyncio.run(main())
Error Handling
All errors inherit from AgimusError:
from agimus import (
    AgimusError,  # Base class for all errors
    AuthenticationError,  # Invalid or missing API key (401)
    AccessDeniedError,  # Permission denied (403)
    NotFoundError,  # Entity or object not found (404)
    ValidationError,  # Invalid request data (400/422)
    RateLimitError,  # Rate limit exceeded (429)
    ServerError,  # Server error (5xx)
)
try:
    df = client.datasets.get("customers").to_pandas()
except NotFoundError as e:
    print(f"Not found: {e.message}")
except AuthenticationError:
    print("Invalid API key")
except ServerError as e:
    print(f"Server error ({e.status_code}): {e.message}")
except AgimusError as e:
    print(f"Error: {e.message}")
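Rate-limit and server errors are often transient, so callers may want to retry them with a growing delay. The sketch below is a generic retry wrapper, not an SDK feature. `call_with_retry` and its parameters are hypothetical, and whether the SDK already retries internally is not stated in this README.

```python
import time


def call_with_retry(func, retry_on, attempts=3, base_delay=0.5, sleep=time.sleep):
    """Call func(), retrying on the given exception types with exponential backoff."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return func()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2 ** attempt)  # 0.5s, 1s, 2s, ...


# Usage (requires the SDK and a configured client):
# df = call_with_retry(
#     lambda: client.datasets.get("customers").to_pandas(),
#     retry_on=(RateLimitError, ServerError),
# )
```

Errors such as AuthenticationError or ValidationError are deliberately left out of `retry_on` in the usage line, since repeating the same request is unlikely to fix them.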
Utility Methods
# Health check
client.health()
# {"status": "healthy", "version": "..."}
# Current API key info
client.me()
# {"tenantName": "...", "scope": "read_write", ...}
Context Manager
with AgimusClient(api_key="agm_...") as client:
    customers = client.objects("Customer").all()
    df = client.datasets.get("orders").to_pandas()
# Connections automatically closed
License
MIT