OData filter and projection helpers for AWS DynamoDB
Project description
dynamo-odata
DynamoDB-focused OData toolkit: build filters, projections, and query DynamoDB tables using OData expressions — with full async support and no eval().
Features:
- OData
$filterexpressions → boto3ConditionBase(no string eval, fully type-safe) - OData
$select→ProjectionExpressionwith reserved keyword handling - DynamoDB CRUD operations with sync and async (
aioboto3) parity - Single-table design helpers (
1#/0#active prefix, soft/hard delete) - 133 tests, lark-based parser, Python 3.10+
What this is: A focused DynamoDB library. Not an ORM, not a general SQL tool, not a full OData server.
What's NOT included (by design): SQL backends, table creation, schema migrations, Athena, SQLite.
Installation
# Core library (sync only)
pip install dynamo-odata
# With async support (aioboto3)
pip install dynamo-odata[async]
# Development
pip install dynamo-odata[dev]
Setup
AWS Credentials
dynamo-odata uses boto3, so configure AWS credentials as you normally would:
# Option 1: Environment variables
export AWS_ACCESS_KEY_ID=...
export AWS_SECRET_ACCESS_KEY=...
export AWS_DEFAULT_REGION=us-west-2
# Option 2: AWS credentials file
~/.aws/credentials
~/.aws/config
# Option 3: IAM role (on EC2, Lambda, etc.)
# Automatic if running in AWS
Quick Test
Verify the installation works:
from dynamo_odata import build_filter
# Parse an OData filter expression
condition = build_filter("status eq 'active'")
print(condition) # Attr('status').eq('active')
Quickstart
Sync Operations
from dynamo_odata import DynamoDb, build_filter, build_projection
# Initialize the client
db = DynamoDb(table_name="users-table", region="us-west-2")
# Get a single item by partition and sort key
item = db.get(pk="user::tenant1", sk="1#user123", item_only=True)
# Query all items for a tenant, with OData filter
items = db.get_all(
pk="user::tenant1",
filter="status eq 'active' and age gt 18",
item_only=True
)
# Batch get multiple items
items = db.batch_get(pk="user::tenant1", sks=["user1", "user2", "user3"], item_only=True)
# Create or update an item
db.put(pk="user::tenant1", sk="1#user123", data={"name": "Ada", "status": "active"})
# Soft delete (moves to inactive prefix `0#`)
db.soft_delete(pk="user::tenant1", sk="1#user123")
# Hard delete (permanent)
db.hard_delete(pk="user::tenant1", sk="1#user123")
# Paginate through results
for page in db.scan_all_paginated(pk="user::tenant1", page_size=50):
print(f"Got {len(page)} items")
Async Operations
All methods have async equivalents. Use with asyncio or async frameworks like FastAPI:
from dynamo_odata import DynamoDb
db = DynamoDb(table_name="users-table", region="us-west-2")
# Async reads (with native aioboto3)
item = await db.get_async(pk="user::tenant1", sk="1#user123", item_only=True)
items = await db.get_all_async(pk="user::tenant1", filter="status eq 'active'")
items = await db.batch_get_async(pk="user::tenant1", sks=["user1", "user2"])
# Async writes
await db.put_async(pk="user::tenant1", sk="1#user123", data={"name": "Ada"})
await db.soft_delete_async(pk="user::tenant1", sk="1#user123")
# Async pagination
async for page in db.scan_all_paginated_async(pk="user::tenant1", page_size=50):
print(f"Got {len(page)} items")
Key Schema Configuration
The default key attribute names are lowercase pk and sk. For projects that use uppercase
attributes such as PK and SK, pass a schema preset when constructing the client:
from dynamo_odata import DynamoDb, UPPERCASE_KEY_SCHEMA
db = DynamoDb(
table_name="main-table",
region="us-east-1",
key_schema=UPPERCASE_KEY_SCHEMA,
)
item = db.get(pk="TENANT#tenant1", sk="1#USER#123", item_only=True)
You can also define a custom schema if your table uses different key names or separators:
from dynamo_odata import DynamoDb, KeySchema
custom_schema = KeySchema(
pk_name="PK",
sk_name="SK",
pk_separator="::",
sk_separator="#",
)
db = DynamoDb(table_name="custom-table", key_schema=custom_schema)
Guardrails For API Usage
For API-facing workloads, you can opt into partition-key validation and filter-policy validation so callers cannot query unexpected partitions or use unrestricted filters.
from dynamo_odata import DynamoDb, FilterPolicy, PartitionKeyGuard, UPPERCASE_KEY_SCHEMA
db = DynamoDb(
table_name="main-table",
key_schema=UPPERCASE_KEY_SCHEMA,
partition_key_guard=PartitionKeyGuard(("TENANT#",)),
filter_policy=FilterPolicy(
allowed_fields=frozenset({"status", "specialty", "lsis3"}),
allowed_comparators=frozenset({"eq", "ne", "gt", "ge", "lt", "le"}),
allowed_functions=frozenset({"contains", "startswith", "tolower"}),
max_predicates=4,
max_depth=6,
),
)
# Allowed
items = db.get_all("TENANT#tenant1", filter="status eq 'active'", item_only=True)
# Rejected before query execution
# db.get_all("DISEASE#123", filter="contains(notes, 'x')", item_only=True)
Regulated Environment Profile Helpers
For API layers that need repeatable controls, use the optional regulated profile helpers:
These helpers provide policy primitives only. PHI/PII identification and enforcement rules remain the responsibility of the consuming application.
from dynamo_odata import (
apply_response_allowlist,
apply_response_field_policy,
build_regulated_profile,
validate_regulated_query,
)
profile = build_regulated_profile(
partition_prefixes=("TENANT#",),
allowed_filter_fields=frozenset({"status", "name"}),
max_page_size=50,
)
normalized_limit = validate_regulated_query(
profile,
partition_key="TENANT#tenant1",
filter_text="status eq 'active'",
limit=25,
)
items = [
{"PK": "TENANT#tenant1", "SK": "1#USER#1", "name": "Ada", "status": "active"},
]
safe_items = apply_response_field_policy(items, profile.forbidden_response_fields)
public_items = apply_response_allowlist(safe_items, frozenset({"name", "status"}))
Building Filters and Projections
Use build_filter() and build_projection() as standalone utilities (no database connection needed):
from dynamo_odata import build_filter, build_projection
# Parse OData filter into boto3 ConditionBase
condition = build_filter("status eq 'active' and age gt 18")
# Returns: Attr('status').eq('active') & Attr('age').gt(18)
# Build projection expression (field list)
# All fields are aliased because many common names are DynamoDB reserved keywords
projection_expr, attr_names = build_projection(["id", "name", "status"])
# Returns: ("#id,#name,#status", {"#id": "id", "#name": "name", "#status": "status"})
Filter Expressions (OData)
Supported Operators
Comparison:
build_filter("name eq 'John'") # equals
build_filter("age ne 30") # not equals
build_filter("price lt 100") # less than
build_filter("price le 100") # less than or equal
build_filter("score gt 50") # greater than
build_filter("score ge 50") # greater than or equal
Logical:
build_filter("status eq 'active' and age gt 18") # AND
build_filter("role eq 'admin' or role eq 'mod'") # OR
build_filter("not deleted eq true") # NOT
Membership:
build_filter("status in ('active', 'pending', 'review')") # IN list
build_filter("age between 18 and 65") # BETWEEN
String Functions:
build_filter("email contains '@example.com'") # substring match
build_filter("email startswith 'admin'") # prefix match
Special:
build_filter("last_seen exists") # attribute exists
build_filter("deleted not_exists") # attribute missing
build_filter("status eq null") # null checks (special handling in DynamoDB)
Unsupported (by design)
These are not supported in DynamoDB OData queries:
endswith,concat,indexof,length,substring,toupper,trim- datetime helpers:
year,month,day,hour,minute,date,now - math helpers:
round,floor,ceiling
Attempting to use unsupported functions raises UnsupportedFunctionException.
Common Patterns
Multi-tenant queries (single-table design):
# Query all active users in a tenant
db.get_all(
pk="user::tenant123",
filter="status eq 'active'",
item_only=True
)
Combining filters:
# Complex filter expression
db.get_all(
pk="user::tenant1",
filter="(status eq 'active' or status eq 'trial') and age gt 18 and premium eq true",
item_only=True
)
Projecting specific fields:
# Return only certain fields
projection_expr, attr_names = build_projection(["id", "email", "name", "created_at"])
items = db.get_all(
pk="user::tenant1",
projection_expression=projection_expr,
expression_attribute_names=attr_names,
item_only=True
)
Single-Table Pattern
dynamo-odata supports the common single-table DynamoDB design with prefixed sort keys for managing record status.
Active/Inactive Records
By convention, records use a 1# prefix for active records and 0# for inactive (soft-deleted):
# Create/put an item (automatically gets 1# prefix)
db.put(pk="user::tenant1", sk="user123", data={"email": "alice@example.com"})
# Stored as: pk="user::tenant1", sk="1#user123"
# Query only active records (default behavior)
items = db.get_all(pk="user::tenant1", item_only=True)
# Only returns records with sk starting with "1#"
# Soft delete (moves record to inactive)
db.soft_delete(pk="user::tenant1", sk="1#user123")
# Record now: pk="user::tenant1", sk="0#user123"
# Query both active and inactive
items = db.get_all(pk="user::tenant1", include_inactive=True, item_only=True)
Hard Delete vs Soft Delete
| Operation | Effect | Query Impact | Recovery |
|---|---|---|---|
soft_delete() |
Moves 1# → 0# prefix |
Item still in table, excluded from default queries | Can restore by moving back to 1# |
hard_delete() |
Removes item entirely | Item permanently gone | Not recoverable |
When to use each:
- Soft delete: User deletions, content removal, audit trails
- Hard delete: GDPR compliance, purging test data, final cleanup
Querying Soft-Deleted Items
# By default, get_all excludes soft-deleted items
items = db.get_all(pk="user::tenant1") # Only `1#` records
# Include soft-deleted items explicitly
all_items = db.get_all(pk="user::tenant1", include_inactive=True)
# Query only soft-deleted items
deleted_items = db.get_all(
pk="user::tenant1",
filter="sk_begins_with('0#')" # Low-level filter if needed
)
API Reference
DynamoDb Client
Initialization:
db = DynamoDb(
table_name="users", # Required
region="us-west-2", # Optional, defaults to us-west-2
pk_separator="::", # Optional, default partition key separator
sk_separator="#", # Optional, default sort key status separator
)
Methods (Sync/Async pairs):
| Method | Args | Returns | Notes |
|---|---|---|---|
get / get_async |
pk, sk, [item_only] |
dict or Item | Single item lookup |
get_all / get_all_async |
pk, [filter, select, item_only, include_inactive] |
list[dict] | Query with filter |
batch_get / batch_get_async |
pk, sks, [item_only] |
list[dict] | Multiple items, auto-chunked |
put / put_async |
pk, sk, data |
None | Create or update |
delete / delete_async |
pk, sk |
None | Hard delete |
soft_delete / soft_delete_async |
pk, sk |
None | Soft delete (prefix move) |
hard_delete / hard_delete_async |
pk, sk |
None | Permanent delete |
scan_all_paginated / scan_all_paginated_async |
[pk, filter, page_size] |
Iterator[list[dict]] | Paginated scan |
Utility Functions:
| Function | Args | Returns | Notes |
|---|---|---|---|
build_filter(expr) |
OData filter string | ConditionBase | Parse filter expression |
build_projection(fields) |
list[str] | (expr, attr_names_dict) | Build projection + name map |
Key Helpers (DynamoDb methods):
| Method | Args | Returns | Notes |
|---|---|---|---|
build_pk |
*parts |
str | Joins key parts with pk_separator |
build_active_sk |
value |
str | Ensures active SK prefix (1# by default) |
build_inactive_sk |
value |
str | Ensures inactive SK prefix (0# by default) |
is_active_sk |
value |
bool | Checks active prefix |
is_inactive_sk |
value |
bool | Checks inactive prefix |
Error Handling
Common exceptions you may encounter:
from dynamo_odata import DynamoDb
from botocore.exceptions import ClientError
db = DynamoDb(table_name="users")
try:
item = db.get(pk="user::t1", sk="1#user1")
except ClientError as e:
if e.response['Error']['Code'] == 'ResourceNotFoundException':
print("Table does not exist")
else:
print(f"DynamoDB error: {e}")
For filter parsing errors:
from dynamo_odata import build_filter
from dynamo_odata.odata_query.exceptions import InvalidQueryException
try:
condition = build_filter("invalid filter syntax @@")
except InvalidQueryException as e:
print(f"Filter syntax error: {e}")
Sync vs Async: When to Use Each
Use sync if:
- Running in a synchronous context (Flask, Django, scripts)
- You need simpler code and don't mind blocking I/O
- Testing or scripting
Use async if:
- Running in an async framework (FastAPI, asyncio)
- You need to handle many concurrent requests
- Integrating with other async libraries
Performance note: Async has minimal overhead but shines when combined with other async operations. For single isolated queries, sync and async have similar latency.
Repository layout
plan/— implementation plans and roadmapsrc/dynamo_odata/— library source codedb.py— DynamoDb client classdynamo_filter.py— OData filter buildingprojection.py— projection expression buildingodata_query/— vendored OData parser and AST
tests/— automated test suite (133 passing tests)
Development
Running Tests
# Install dev dependencies
pip install -e ".[dev]"
# Run all tests
pytest tests/
# Run with coverage
pytest tests/ --cov=src/dynamo_odata
# Run specific test file
pytest tests/test_filter.py -v
Project Status
| Phase | Status | Next |
|---|---|---|
| Core library | ✅ Complete | Phase 4: Upstream contribution |
| Parser (lark) | ✅ Complete | — |
| Async support | ✅ Complete | — |
| Test coverage | ✅ 133 tests | — |
| FastAPI layer | 📅 v1.1 | ODataService, ODataRouter |
| PyPI publish | 📅 Soon | Need upstream contribution first |
License
MIT. See LICENSE for details.
Attribution
This package includes a vendored and modified version of the OData AST, visitor, and grammar from odata-query by Gorillini NV, used under the MIT License. The DynamoDB backend is original work.
What's Next?
Planned for upcoming releases:
- v1.1: FastAPI integration layer (ODataService, ODataRouter, Pydantic models)
- v1.2:
$expandsupport with dotted$select(N+1 optimization) - Contribute DynamoDB visitor back to upstream
odata-queryproject - Integration with
consumer_sdkpackage
See plan/DYNAMO_ODATA_STANDALONE_PLAN.md for detailed implementation roadmap.
Contributing
Contributions welcome! Please:
- Fork the repository
- Create a feature branch
- Write tests for new functionality
- Run
pytestand ensure all tests pass - Open a pull request with a clear description
See CONTRIBUTING.md for more details.
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file dynamo_odata-0.2.0.tar.gz.
File metadata
- Download URL: dynamo_odata-0.2.0.tar.gz
- Upload date:
- Size: 45.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
a8bdd1f71081b1df4603de092694c3fd0d8464a7809528b881407ceb7ef99de1
|
|
| MD5 |
bd35f3e7e6b4146ecc33bbb4a14ba68e
|
|
| BLAKE2b-256 |
84654137fa3d2b8139e3873672257c0ebe626481384dcb4de63bfc2d026da624
|
File details
Details for the file dynamo_odata-0.2.0-py3-none-any.whl.
File metadata
- Download URL: dynamo_odata-0.2.0-py3-none-any.whl
- Upload date:
- Size: 34.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
45464dffff37e76a7f0e32e6ea9ca73fa715fada1650bacfca9134b0636e7ad6
|
|
| MD5 |
13103c26a64bf986bf4f001a9edfc5c9
|
|
| BLAKE2b-256 |
a6baf2adc4885a7f069e5aa1d2f84a5ea7af8f12c0947b59566908f1892c3dd2
|