Skip to main content

OData filter and projection helpers for AWS DynamoDB

Project description

dynamo-odata

DynamoDB-focused OData toolkit: build filters, projections, and query DynamoDB tables using OData expressions — with full async support and no eval().

Features:

  • OData $filter expressions → boto3 ConditionBase (no string eval, fully type-safe)
  • OData $selectProjectionExpression with reserved keyword handling
  • DynamoDB CRUD operations with sync and async (aioboto3) parity
  • Single-table design helpers (1#/0# active prefix, soft/hard delete)
  • 133 tests, lark-based parser, Python 3.10+

What this is: A focused DynamoDB library. Not an ORM, not a general SQL tool, not a full OData server.

What's NOT included (by design): SQL backends, table creation, schema migrations, Athena, SQLite.


Installation

# Core library (sync only)
pip install dynamo-odata

# With async support (aioboto3)
pip install dynamo-odata[async]

# Development
pip install dynamo-odata[dev]

Setup

AWS Credentials

dynamo-odata uses boto3, so configure AWS credentials as you normally would:

# Option 1: Environment variables
export AWS_ACCESS_KEY_ID=...
export AWS_SECRET_ACCESS_KEY=...
export AWS_DEFAULT_REGION=us-west-2

# Option 2: AWS credentials file
~/.aws/credentials
~/.aws/config

# Option 3: IAM role (on EC2, Lambda, etc.)
# Automatic if running in AWS

Quick Test

Verify the installation works:

from dynamo_odata import build_filter

# Parse an OData filter expression
condition = build_filter("status eq 'active'")
print(condition)  # Attr('status').eq('active')

Quickstart

Sync Operations

from dynamo_odata import DynamoDb, build_filter, build_projection

# Initialize the client
db = DynamoDb(table_name="users-table", region="us-west-2")

# Get a single item by partition and sort key
item = db.get(pk="user::tenant1", sk="1#user123", item_only=True)

# Query all items for a tenant, with OData filter
items = db.get_all(
    pk="user::tenant1",
    filter="status eq 'active' and age gt 18",
    item_only=True
)

# Batch get multiple items
items = db.batch_get(pk="user::tenant1", sks=["user1", "user2", "user3"], item_only=True)

# Create or update an item
db.put(pk="user::tenant1", sk="1#user123", data={"name": "Ada", "status": "active"})

# Soft delete (moves to inactive prefix `0#`)
db.soft_delete(pk="user::tenant1", sk="1#user123")

# Hard delete (permanent)
db.hard_delete(pk="user::tenant1", sk="1#user123")

# Paginate through results
for page in db.scan_all_paginated(pk="user::tenant1", page_size=50):
    print(f"Got {len(page)} items")

Async Operations

All methods have async equivalents. Use with asyncio or async frameworks like FastAPI:

from dynamo_odata import DynamoDb

db = DynamoDb(table_name="users-table", region="us-west-2")

# Async reads (with native aioboto3)
item = await db.get_async(pk="user::tenant1", sk="1#user123", item_only=True)
items = await db.get_all_async(pk="user::tenant1", filter="status eq 'active'")
items = await db.batch_get_async(pk="user::tenant1", sks=["user1", "user2"])

# Async writes
await db.put_async(pk="user::tenant1", sk="1#user123", data={"name": "Ada"})
await db.soft_delete_async(pk="user::tenant1", sk="1#user123")

# Async pagination
async for page in db.scan_all_paginated_async(pk="user::tenant1", page_size=50):
    print(f"Got {len(page)} items")

Key Schema Configuration

The default key attribute names are lowercase pk and sk. For projects that use uppercase attributes such as PK and SK, pass a schema preset when constructing the client:

from dynamo_odata import DynamoDb, UPPERCASE_KEY_SCHEMA

db = DynamoDb(
    table_name="main-table",
    region="us-east-1",
    key_schema=UPPERCASE_KEY_SCHEMA,
)

item = db.get(pk="TENANT#tenant1", sk="1#USER#123", item_only=True)

You can also define a custom schema if your table uses different key names or separators:

from dynamo_odata import DynamoDb, KeySchema

custom_schema = KeySchema(
    pk_name="PK",
    sk_name="SK",
    pk_separator="::",
    sk_separator="#",
)

db = DynamoDb(table_name="custom-table", key_schema=custom_schema)

Guardrails For API Usage

For API-facing workloads, you can opt into partition-key validation and filter-policy validation so callers cannot query unexpected partitions or use unrestricted filters.

from dynamo_odata import DynamoDb, FilterPolicy, PartitionKeyGuard, UPPERCASE_KEY_SCHEMA

db = DynamoDb(
    table_name="main-table",
    key_schema=UPPERCASE_KEY_SCHEMA,
    partition_key_guard=PartitionKeyGuard(("TENANT#",)),
    filter_policy=FilterPolicy(
        allowed_fields=frozenset({"status", "specialty", "lsis3"}),
        allowed_comparators=frozenset({"eq", "ne", "gt", "ge", "lt", "le"}),
        allowed_functions=frozenset({"contains", "startswith", "tolower"}),
        max_predicates=4,
        max_depth=6,
    ),
)

# Allowed
items = db.get_all("TENANT#tenant1", filter="status eq 'active'", item_only=True)

# Rejected before query execution
# db.get_all("DISEASE#123", filter="contains(notes, 'x')", item_only=True)

Regulated Environment Profile Helpers

For API layers that need repeatable controls, use the optional regulated profile helpers:

These helpers provide policy primitives only. PHI/PII identification and enforcement rules remain the responsibility of the consuming application.

from dynamo_odata import (
    apply_response_allowlist,
    apply_response_field_policy,
    build_regulated_profile,
    validate_regulated_query,
)

profile = build_regulated_profile(
    partition_prefixes=("TENANT#",),
    allowed_filter_fields=frozenset({"status", "name"}),
    max_page_size=50,
)

normalized_limit = validate_regulated_query(
    profile,
    partition_key="TENANT#tenant1",
    filter_text="status eq 'active'",
    limit=25,
)

items = [
    {"PK": "TENANT#tenant1", "SK": "1#USER#1", "name": "Ada", "status": "active"},
]
safe_items = apply_response_field_policy(items, profile.forbidden_response_fields)
public_items = apply_response_allowlist(safe_items, frozenset({"name", "status"}))

Building Filters and Projections

Use build_filter() and build_projection() as standalone utilities (no database connection needed):

from dynamo_odata import build_filter, build_projection

# Parse OData filter into boto3 ConditionBase
condition = build_filter("status eq 'active' and age gt 18")
# Returns: Attr('status').eq('active') & Attr('age').gt(18)

# Build projection expression (field list)
# All fields are aliased because many common names are DynamoDB reserved keywords
projection_expr, attr_names = build_projection(["id", "name", "status"])
# Returns: ("#id,#name,#status", {"#id": "id", "#name": "name", "#status": "status"})

Filter Expressions (OData)

Supported Operators

Comparison:

build_filter("name eq 'John'")       # equals
build_filter("age ne 30")             # not equals
build_filter("price lt 100")          # less than
build_filter("price le 100")          # less than or equal
build_filter("score gt 50")           # greater than
build_filter("score ge 50")           # greater than or equal

Logical:

build_filter("status eq 'active' and age gt 18")      # AND
build_filter("role eq 'admin' or role eq 'mod'")      # OR
build_filter("not deleted eq true")                    # NOT

Membership:

build_filter("status in ('active', 'pending', 'review')")  # IN list
build_filter("age between 18 and 65")                      # BETWEEN

String Functions:

build_filter("email contains '@example.com'")    # substring match
build_filter("email startswith 'admin'")         # prefix match

Special:

build_filter("last_seen exists")         # attribute exists
build_filter("deleted not_exists")       # attribute missing
build_filter("status eq null")           # null checks (special handling in DynamoDB)

Unsupported (by design)

These are not supported in DynamoDB OData queries:

  • endswith, concat, indexof, length, substring, toupper, trim
  • datetime helpers: year, month, day, hour, minute, date, now
  • math helpers: round, floor, ceiling

Attempting to use unsupported functions raises UnsupportedFunctionException.

Common Patterns

Multi-tenant queries (single-table design):

# Query all active users in a tenant
db.get_all(
    pk="user::tenant123",
    filter="status eq 'active'",
    item_only=True
)

Combining filters:

# Complex filter expression
db.get_all(
    pk="user::tenant1",
    filter="(status eq 'active' or status eq 'trial') and age gt 18 and premium eq true",
    item_only=True
)

Projecting specific fields:

# Return only certain fields
projection_expr, attr_names = build_projection(["id", "email", "name", "created_at"])

items = db.get_all(
    pk="user::tenant1",
    projection_expression=projection_expr,
    expression_attribute_names=attr_names,
    item_only=True
)

Single-Table Pattern

dynamo-odata supports the common single-table DynamoDB design with prefixed sort keys for managing record status.

Active/Inactive Records

By convention, records use a 1# prefix for active records and 0# for inactive (soft-deleted):

# Create/put an item (automatically gets 1# prefix)
db.put(pk="user::tenant1", sk="user123", data={"email": "alice@example.com"})
# Stored as: pk="user::tenant1", sk="1#user123"

# Query only active records (default behavior)
items = db.get_all(pk="user::tenant1", item_only=True)
# Only returns records with sk starting with "1#"

# Soft delete (moves record to inactive)
db.soft_delete(pk="user::tenant1", sk="1#user123")
# Record now: pk="user::tenant1", sk="0#user123"

# Query both active and inactive
items = db.get_all(pk="user::tenant1", include_inactive=True, item_only=True)

Hard Delete vs Soft Delete

Operation Effect Query Impact Recovery
soft_delete() Moves 1#0# prefix Item still in table, excluded from default queries Can restore by moving back to 1#
hard_delete() Removes item entirely Item permanently gone Not recoverable

When to use each:

  • Soft delete: User deletions, content removal, audit trails
  • Hard delete: GDPR compliance, purging test data, final cleanup

Querying Soft-Deleted Items

# By default, get_all excludes soft-deleted items
items = db.get_all(pk="user::tenant1")  # Only `1#` records

# Include soft-deleted items explicitly
all_items = db.get_all(pk="user::tenant1", include_inactive=True)

# Query only soft-deleted items
deleted_items = db.get_all(
    pk="user::tenant1",
    filter="sk_begins_with('0#')"  # Low-level filter if needed
)

API Reference

DynamoDb Client

Initialization:

db = DynamoDb(
    table_name="users",           # Required
    region="us-west-2",           # Optional, defaults to us-west-2
    pk_separator="::",            # Optional, default partition key separator
    sk_separator="#",             # Optional, default sort key status separator
)

Methods (Sync/Async pairs):

Method Args Returns Notes
get / get_async pk, sk, [item_only] dict or Item Single item lookup
get_all / get_all_async pk, [filter, select, item_only, include_inactive] list[dict] Query with filter
batch_get / batch_get_async pk, sks, [item_only] list[dict] Multiple items, auto-chunked
put / put_async pk, sk, data None Create or update
delete / delete_async pk, sk None Hard delete
soft_delete / soft_delete_async pk, sk None Soft delete (prefix move)
hard_delete / hard_delete_async pk, sk None Permanent delete
scan_all_paginated / scan_all_paginated_async [pk, filter, page_size] Iterator[list[dict]] Paginated scan

Utility Functions:

Function Args Returns Notes
build_filter(expr) OData filter string ConditionBase Parse filter expression
build_projection(fields) list[str] (expr, attr_names_dict) Build projection + name map

Key Helpers (DynamoDb methods):

Method Args Returns Notes
build_pk *parts str Joins key parts with pk_separator
build_active_sk value str Ensures active SK prefix (1# by default)
build_inactive_sk value str Ensures inactive SK prefix (0# by default)
is_active_sk value bool Checks active prefix
is_inactive_sk value bool Checks inactive prefix

Error Handling

Common exceptions you may encounter:

from dynamo_odata import DynamoDb
from botocore.exceptions import ClientError

db = DynamoDb(table_name="users")

try:
    item = db.get(pk="user::t1", sk="1#user1")
except ClientError as e:
    if e.response['Error']['Code'] == 'ResourceNotFoundException':
        print("Table does not exist")
    else:
        print(f"DynamoDB error: {e}")

For filter parsing errors:

from dynamo_odata import build_filter
from dynamo_odata.odata_query.exceptions import InvalidQueryException

try:
    condition = build_filter("invalid filter syntax @@")
except InvalidQueryException as e:
    print(f"Filter syntax error: {e}")

Sync vs Async: When to Use Each

Use sync if:

  • Running in a synchronous context (Flask, Django, scripts)
  • You need simpler code and don't mind blocking I/O
  • Testing or scripting

Use async if:

  • Running in an async framework (FastAPI, asyncio)
  • You need to handle many concurrent requests
  • Integrating with other async libraries

Performance note: Async has minimal overhead but shines when combined with other async operations. For single isolated queries, sync and async have similar latency.


Repository layout

  • plan/ — implementation plans and roadmap
  • src/dynamo_odata/ — library source code
    • db.py — DynamoDb client class
    • dynamo_filter.py — OData filter building
    • projection.py — projection expression building
    • odata_query/ — vendored OData parser and AST
  • tests/ — automated test suite (133 passing tests)

Development

Running Tests

# Install dev dependencies
pip install -e ".[dev]"

# Run all tests
pytest tests/

# Run with coverage
pytest tests/ --cov=src/dynamo_odata

# Run specific test file
pytest tests/test_filter.py -v

Project Status

Phase Status Next
Core library ✅ Complete Phase 4: Upstream contribution
Parser (lark) ✅ Complete
Async support ✅ Complete
Test coverage ✅ 133 tests
FastAPI layer 📅 v1.1 ODataService, ODataRouter
PyPI publish 📅 Soon Need upstream contribution first

License

MIT. See LICENSE for details.

Attribution

This package includes a vendored and modified version of the OData AST, visitor, and grammar from odata-query by Gorillini NV, used under the MIT License. The DynamoDB backend is original work.


What's Next?

Planned for upcoming releases:

  • v1.1: FastAPI integration layer (ODataService, ODataRouter, Pydantic models)
  • v1.2: $expand support with dotted $select (N+1 optimization)
  • Contribute DynamoDB visitor back to upstream odata-query project
  • Integration with consumer_sdk package

See plan/DYNAMO_ODATA_STANDALONE_PLAN.md for detailed implementation roadmap.


Contributing

Contributions welcome! Please:

  1. Fork the repository
  2. Create a feature branch
  3. Write tests for new functionality
  4. Run pytest and ensure all tests pass
  5. Open a pull request with a clear description

See CONTRIBUTING.md for more details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dynamo_odata-0.2.0.tar.gz (45.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dynamo_odata-0.2.0-py3-none-any.whl (34.3 kB view details)

Uploaded Python 3

File details

Details for the file dynamo_odata-0.2.0.tar.gz.

File metadata

  • Download URL: dynamo_odata-0.2.0.tar.gz
  • Upload date:
  • Size: 45.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.7

File hashes

Hashes for dynamo_odata-0.2.0.tar.gz
Algorithm Hash digest
SHA256 a8bdd1f71081b1df4603de092694c3fd0d8464a7809528b881407ceb7ef99de1
MD5 bd35f3e7e6b4146ecc33bbb4a14ba68e
BLAKE2b-256 84654137fa3d2b8139e3873672257c0ebe626481384dcb4de63bfc2d026da624

See more details on using hashes here.

File details

Details for the file dynamo_odata-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: dynamo_odata-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 34.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.7

File hashes

Hashes for dynamo_odata-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 45464dffff37e76a7f0e32e6ea9ca73fa715fada1650bacfca9134b0636e7ad6
MD5 13103c26a64bf986bf4f001a9edfc5c9
BLAKE2b-256 a6baf2adc4885a7f069e5aa1d2f84a5ea7af8f12c0947b59566908f1892c3dd2

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page