OData filter and projection helpers for AWS DynamoDB

dynamo-odata

DynamoDB-focused OData toolkit: build filters, projections, and query DynamoDB tables using OData expressions — with full async support and no eval().

Features:

  • OData $filter expressions → boto3 ConditionBase (no string eval, fully type-safe)
  • OData $select → ProjectionExpression with reserved keyword handling
  • OData $expand → batch FK resolution with dotted $select support
  • FastAPI integration: ODataQueryParams + ODataService wires all OData params in one call
  • DynamoDB CRUD operations with sync and async (aioboto3) parity
  • Single-table design helpers (1#/0# active prefix, soft/hard delete, restore)
  • Atomic multi-item writes via transact_write
  • GSI queries with OData filter support
  • Pydantic-friendly: call model.model_dump(exclude_none=True) before writes — see Pydantic integration
  • 197 tests, lark-based parser, Python 3.10+

What this is: A focused DynamoDB library. Not an ORM, not a general SQL tool, not a full OData server.

What's NOT included (by design): SQL backends, table creation, schema migrations, Athena, SQLite.


Why dynamo-odata?

Unreadable filter expressions

# boto3 raw
Attr('status').eq('active') & Attr('age').gt(18)

# dynamo-odata — pass the OData string straight from the query param
filter="status eq 'active' and age gt 18"

FK resolution N+1 boilerplate

# boto3 raw — ~50 lines: collect, deduplicate, chunk into 100-key batches,
# retry UnprocessedKeys, join results back onto base items

# dynamo-odata — one call, concurrent batches, automatic retry
await expand_items_async(items, {"owner": owner_cfg}, db)

FastAPI query-param wiring

# boto3 raw — ~30 lines of Query() params + manual threading to DynamoDB

# dynamo-odata
@app.get("/items")
async def list_items(params: ODataQueryParams = Depends(ODataQueryParams)):
    return await item_service.query_items(db, pk, params)

| Task | Raw boto3 | dynamo-odata |
| --- | --- | --- |
| OData filter from query string | ~15 lines (parse, validate, build ConditionBase) | filter="status eq 'active'" |
| Resolve FKs for 25 items | ~50 lines (collect, deduplicate, chunk, retry, join) | expand_items_async(items, specs, db) |
| Wire filter/select/expand/pagination in FastAPI | ~30 lines of Query params + manual threading | Depends(ODataQueryParams) + query_items |

Installation

# Core library (sync only)
pip install dynamo-odata

# With async support (aioboto3)
pip install dynamo-odata[async]

# With FastAPI integration (ODataQueryParams, ODataService)
pip install dynamo-odata[fastapi]

# Development
pip install dynamo-odata[dev]

Setup

AWS Credentials

dynamo-odata uses boto3, so configure AWS credentials as you normally would:

# Option 1: Environment variables
export AWS_ACCESS_KEY_ID=...
export AWS_SECRET_ACCESS_KEY=...
export AWS_DEFAULT_REGION=us-west-2

# Option 2: AWS credentials file
~/.aws/credentials
~/.aws/config

# Option 3: IAM role (on EC2, Lambda, etc.)
# Automatic if running in AWS

Quick Test

Verify the installation works:

from dynamo_odata import build_filter

# Parse an OData filter expression
condition = build_filter("status eq 'active'")
print(condition)  # Attr('status').eq('active')

Quickstart

Sync Operations

from dynamo_odata import DynamoDb, build_filter, build_projection

# Initialize the client
db = DynamoDb(table_name="users-table", region="us-west-2")

# Get a single item by partition and sort key
item = db.get(pk="user::tenant1", sk="1#user123", item_only=True)

# Query all items for a tenant, with OData filter
items = db.get_all(
    pk="user::tenant1",
    filter="status eq 'active' and age gt 18",
    item_only=True
)

# Batch get multiple items
items = db.batch_get(pk="user::tenant1", sks=["user1", "user2", "user3"], item_only=True)

# Create or update an item
db.put(pk="user::tenant1", sk="1#user123", data={"name": "Ada", "status": "active"})

# Soft delete (moves to inactive prefix `0#`)
db.soft_delete(pk="user::tenant1", sk="1#user123")

# Hard delete (permanent)
db.hard_delete(pk="user::tenant1", sk="1#user123")

# Paginate through results
for page in db.scan_all_paginated(pk="user::tenant1", page_size=50):
    print(f"Got {len(page)} items")

Async Operations

All methods have async equivalents. Use with asyncio or async frameworks like FastAPI:

from dynamo_odata import DynamoDb

db = DynamoDb(table_name="users-table", region="us-west-2")

# Async reads (with native aioboto3)
item = await db.get_async(pk="user::tenant1", sk="1#user123", item_only=True)
items = await db.get_all_async(pk="user::tenant1", filter="status eq 'active'")
items = await db.batch_get_async(pk="user::tenant1", sks=["user1", "user2"])

# Async writes
await db.put_async(pk="user::tenant1", sk="1#user123", data={"name": "Ada"})
await db.soft_delete_async(pk="user::tenant1", sk="1#user123")

# Async pagination
async for page in db.scan_all_paginated_async(pk="user::tenant1", page_size=50):
    print(f"Got {len(page)} items")

Key Schema Configuration

The default key attribute names are lowercase pk and sk. For projects that use uppercase attributes such as PK and SK, pass a schema preset when constructing the client:

from dynamo_odata import DynamoDb, UPPERCASE_KEY_SCHEMA

db = DynamoDb(
    table_name="main-table",
    region="us-east-1",
    key_schema=UPPERCASE_KEY_SCHEMA,
)

item = db.get(pk="TENANT#tenant1", sk="1#USER#123", item_only=True)

You can also define a custom schema if your table uses different key names or separators:

from dynamo_odata import DynamoDb, KeySchema

custom_schema = KeySchema(
    pk_name="PK",
    sk_name="SK",
    pk_separator="::",
    sk_separator="#",
)

db = DynamoDb(table_name="custom-table", key_schema=custom_schema)

Guardrails For API Usage

For API-facing workloads, you can opt into partition-key validation and filter-policy validation so callers cannot query unexpected partitions or use unrestricted filters.

from dynamo_odata import DynamoDb, FilterPolicy, PartitionKeyGuard, UPPERCASE_KEY_SCHEMA

db = DynamoDb(
    table_name="main-table",
    key_schema=UPPERCASE_KEY_SCHEMA,
    partition_key_guard=PartitionKeyGuard(("TENANT#",)),
    filter_policy=FilterPolicy(
        allowed_fields=frozenset({"status", "specialty", "lsis3"}),
        allowed_comparators=frozenset({"eq", "ne", "gt", "ge", "lt", "le"}),
        allowed_functions=frozenset({"contains", "startswith", "tolower"}),
        max_predicates=4,
        max_depth=6,
    ),
)

# Allowed
items = db.get_all("TENANT#tenant1", filter="status eq 'active'", item_only=True)

# Rejected before query execution
# db.get_all("DISEASE#123", filter="contains(notes, 'x')", item_only=True)
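
The partition-key guard can be pictured as a prefix check that runs before any request is sent. The sketch below is a pure-Python illustration only: `check_partition_key` is a hypothetical name, and raising `ValueError` is an assumption, since the exception type the library raises is not stated here.

```python
def check_partition_key(pk: str, allowed_prefixes: tuple[str, ...]) -> None:
    """Raise ValueError unless pk starts with one of the allowed prefixes."""
    # str.startswith accepts a tuple and returns True if any prefix matches.
    if not pk.startswith(allowed_prefixes):
        raise ValueError(f"partition key {pk!r} is outside the allowed prefixes")


# Allowed: matches the "TENANT#" prefix, so nothing is raised.
check_partition_key("TENANT#tenant1", ("TENANT#",))

# Rejected before any query would run.
try:
    check_partition_key("DISEASE#123", ("TENANT#",))
    rejected = None
except ValueError as exc:
    rejected = str(exc)

print(rejected)
```

Failing fast on the key keeps a caller-supplied value from ever reaching the table, which is the point of opting into the guard.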

Regulated Environment Profile Helpers

For API layers that need repeatable controls, use the optional regulated profile helpers. They provide policy primitives only; PHI/PII identification and enforcement rules remain the responsibility of the consuming application.

from dynamo_odata import (
    apply_response_allowlist,
    apply_response_field_policy,
    build_regulated_profile,
    validate_regulated_query,
)

profile = build_regulated_profile(
    partition_prefixes=("TENANT#",),
    allowed_filter_fields=frozenset({"status", "name"}),
    max_page_size=50,
)

normalized_limit = validate_regulated_query(
    profile,
    partition_key="TENANT#tenant1",
    filter_text="status eq 'active'",
    limit=25,
)

items = [
    {"PK": "TENANT#tenant1", "SK": "1#USER#1", "name": "Ada", "status": "active"},
]
safe_items = apply_response_field_policy(items, profile.forbidden_response_fields)
public_items = apply_response_allowlist(safe_items, frozenset({"name", "status"}))
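
As a rough mental model, the two response helpers can be read as a denylist pass followed by an allowlist pass. The sketch below assumes exactly that behaviour; `drop_fields`, `keep_fields`, and the `ssn` field are hypothetical and are not part of dynamo-odata.

```python
def drop_fields(items: list[dict], forbidden: frozenset[str]) -> list[dict]:
    """Denylist pass: remove any attribute whose name is forbidden."""
    return [{k: v for k, v in item.items() if k not in forbidden} for item in items]


def keep_fields(items: list[dict], allowed: frozenset[str]) -> list[dict]:
    """Allowlist pass: keep only attributes whose name is explicitly allowed."""
    return [{k: v for k, v in item.items() if k in allowed} for item in items]


rows = [
    {"PK": "TENANT#tenant1", "SK": "1#USER#1", "name": "Ada",
     "status": "active", "ssn": "000-00-0000"},  # "ssn" is an illustrative field
]
safe_rows = drop_fields(rows, frozenset({"ssn"}))
public_rows = keep_fields(safe_rows, frozenset({"name", "status"}))
print(public_rows)  # [{'name': 'Ada', 'status': 'active'}]
```

Running the allowlist last means a newly added attribute stays hidden until someone deliberately exposes it.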

Building Filters and Projections

Use build_filter() and build_projection() as standalone utilities (no database connection needed):

from dynamo_odata import build_filter, build_projection

# Parse OData filter into boto3 ConditionBase
condition = build_filter("status eq 'active' and age gt 18")
# Returns: Attr('status').eq('active') & Attr('age').gt(18)

# Build projection expression (field list)
# All fields are aliased because many common names are DynamoDB reserved keywords
projection_expr, attr_names = build_projection(["id", "name", "status"])
# Returns: ("#id,#name,#status", {"#id": "id", "#name": "name", "#status": "status"})
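
To make the aliasing concrete, here is a minimal pure-Python sketch that reproduces the return shape shown above. `sketch_projection` is a hypothetical name used for illustration; it is not the library's implementation, and the real `build_projection` may treat nested paths or duplicate fields differently.

```python
def sketch_projection(fields: list[str]) -> tuple[str, dict[str, str]]:
    """Alias every field as #field so reserved words (name, status, ...) are safe."""
    # Map each placeholder to the real attribute name (ExpressionAttributeNames).
    attr_names = {f"#{field}": field for field in fields}
    # Join the placeholders into a ProjectionExpression string.
    projection_expr = ",".join(attr_names)
    return projection_expr, attr_names


expr, names = sketch_projection(["id", "name", "status"])
print(expr)   # #id,#name,#status
print(names)  # {'#id': 'id', '#name': 'name', '#status': 'status'}
```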

$expand

Resolve foreign-key fields to full objects with batched BatchGetItem requests per alias (split into batches of up to 100 keys) instead of one lookup per item: no N+1 queries, automatic deduplication, concurrent fetches.

from dynamo_odata import DynamoDb
from dynamo_odata.expand import ExpandConfig, expand_items_async, apply_dotted_select

db = DynamoDb(table_name="main-table", region="us-west-2")

# Declare the FK relationship once
owner_cfg = ExpandConfig(
    local_key="owner_user_id",   # FK field on each base item
    target_pk="USER#tenant1",    # DynamoDB PK for the target entity
    remote_key="user_id",        # Field on expanded item that matches the FK
    target_sk_prefix="USER#",    # Prepended to FK value to form the SK body
    fields=("name", "email"),    # Optional: limit returned fields (None = all)
)

# Fetch base items
items, _ = await db.get_all_async(pk="ITEM#tenant1", select="id,status,owner_user_id")

# Resolve FKs — concurrent BatchGetItem per alias, auto-retry on UnprocessedKeys
items = await expand_items_async(items, {"owner": owner_cfg}, db)
# items[0] == {"id": "...", "status": "active", "owner_user_id": "alice",
#               "owner": {"user_id": "alice", "name": "Alice", "email": "alice@ex.com"}}

# Trim expanded objects to only the requested dotted sub-fields
items = apply_dotted_select(items, "id,status,owner.name,owner.email")
# items[0]["owner"] == {"name": "Alice", "email": "alice@ex.com"}

Known limitation: Expanded lookups are active-items only. DynamoDb._normalize_sks prepends the active prefix (1#) to every sort key, so FK values pointing to soft-deleted items silently resolve to None.
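
The collect, deduplicate, chunk, and join steps behind an expand can be sketched without touching DynamoDB. Everything below is illustrative: `expand_in_memory`, `chunked`, and `fake_fetch` are hypothetical names, the fetch is an in-memory stand-in for BatchGetItem, and retry of UnprocessedKeys is left out.

```python
from itertools import islice

BATCH_LIMIT = 100  # BatchGetItem accepts at most 100 keys per request


def chunked(values, size):
    """Yield successive lists of at most `size` values."""
    it = iter(values)
    while batch := list(islice(it, size)):
        yield batch


def expand_in_memory(items, alias, local_key, remote_key, fetch_batch):
    """Attach the object referenced by item[local_key] under item[alias]."""
    # Collect FK values, dropping duplicates and missing values but keeping order.
    fks = list(dict.fromkeys(i[local_key] for i in items if i.get(local_key) is not None))
    # One fetch per batch of keys instead of one fetch per item.
    found = {}
    for batch in chunked(fks, BATCH_LIMIT):
        for target in fetch_batch(batch):
            found[target[remote_key]] = target
    # Join back onto the base items; unresolved FKs become None.
    return [{**i, alias: found.get(i.get(local_key))} for i in items]


# Stand-in for the table: only "alice" exists.
USERS = {"alice": {"user_id": "alice", "name": "Alice"}}
calls = []


def fake_fetch(keys):
    calls.append(list(keys))  # record each batch so the call count is visible
    return [USERS[k] for k in keys if k in USERS]


base = [
    {"id": "1", "owner_user_id": "alice"},
    {"id": "2", "owner_user_id": "alice"},
    {"id": "3", "owner_user_id": "bob"},
]
expanded = expand_in_memory(base, "owner", "owner_user_id", "user_id", fake_fetch)
print(calls)                 # [['alice', 'bob']]
print(expanded[2]["owner"])  # None
```

Three base items produce a single fetch with two distinct keys, and the FK with no matching target resolves to None, mirroring the soft-deleted case described above.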


FastAPI integration

pip install dynamo-odata[fastapi]

ODataQueryParams binds all five OData query params as a Depends argument. ODataService.query_items wires filter, select, expand, and pagination in one call.

from fastapi import Depends, FastAPI
from dynamo_odata import DynamoDb
from dynamo_odata.expand import ExpandConfig
from dynamo_odata.fastapi import ODataQueryParams, ODataService

app = FastAPI()
db = DynamoDb(table_name="main-table", region="us-west-2")

item_service = ODataService(
    expand_config={
        "owner": ExpandConfig(
            local_key="owner_user_id",
            target_pk="USER#tenant1",
            remote_key="user_id",
            target_sk_prefix="USER#",
        ),
    }
)

@app.get("/items")
async def list_items(params: ODataQueryParams = Depends(ODataQueryParams)):
    return await item_service.query_items(db, "ITEM#tenant1", params)

Supported query params on any route wired this way:

| Param | Example | Effect |
| --- | --- | --- |
| $filter | status eq 'active' | OData filter expression |
| $select | id,name,owner.email | Field projection; dotted paths trim expanded objects |
| $expand | owner | Resolve FK alias(es) via BatchGetItem |
| $top | 25 | Page size (default 25) |
| $skipToken | (opaque cursor) | Pagination token from @odata.nextLink |

Dotted $select fields (e.g. owner.email) automatically add the implied expand — no explicit $expand=owner needed.
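
One way to picture the implied expand is to split the $select string into plain fields and per-alias sub-fields. The helper below, `split_select`, is a hypothetical sketch of that idea and not the parser the library uses.

```python
def split_select(select: str) -> tuple[list[str], dict[str, list[str]]]:
    """Split a $select string into base fields and per-alias sub-fields."""
    base: list[str] = []
    expands: dict[str, list[str]] = {}
    for raw in select.split(","):
        field = raw.strip()
        if not field:
            continue  # tolerate stray commas
        alias, dot, sub = field.partition(".")
        if dot:
            # "owner.email" implies expanding "owner" and keeping "email".
            expands.setdefault(alias, []).append(sub)
        else:
            base.append(field)
    return base, expands


base_fields, implied = split_select("id,name,owner.email")
print(base_fields)  # ['id', 'name']
print(implied)      # {'owner': ['email']}
```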

See examples/fastapi_expand.py for a complete runnable app with Swagger UI.

[Figure: $expand flow diagram]


Filter Expressions (OData)

Supported Operators

Comparison:

build_filter("name eq 'John'")       # equals
build_filter("age ne 30")             # not equals
build_filter("price lt 100")          # less than
build_filter("price le 100")          # less than or equal
build_filter("score gt 50")           # greater than
build_filter("score ge 50")           # greater than or equal

Logical:

build_filter("status eq 'active' and age gt 18")      # AND
build_filter("role eq 'admin' or role eq 'mod'")      # OR
build_filter("not deleted eq true")                    # NOT

Membership:

build_filter("status in ('active', 'pending', 'review')")  # IN list
build_filter("age between 18 and 65")                      # BETWEEN

String Functions:

build_filter("email contains '@example.com'")    # substring match
build_filter("email startswith 'admin'")         # prefix match

Special:

build_filter("last_seen exists")         # attribute exists
build_filter("deleted not_exists")       # attribute missing
build_filter("status eq null")           # null checks (special handling in DynamoDB)

Grouping with Parentheses

Parentheses control evaluation order and are essential when mixing and / or. Without them, and binds more tightly than or — the same precedence rules as Python and SQL.

# Without parentheses: `and` binds tighter than `or`, so this is parsed as
#   (status eq 'active' and role eq 'admin') or role eq 'mod'
# and NOT as status eq 'active' and (role eq 'admin' or role eq 'mod')
build_filter("status eq 'active' and role eq 'admin' or role eq 'mod'")

# With parentheses: intent is explicit and correct
build_filter("status eq 'active' and (role eq 'admin' or role eq 'mod')")

# Multiple groups
build_filter("(status eq 'active' or status eq 'trial') and (age gt 18 or override eq true)")

# Negating a group
build_filter("not (status eq 'deleted' or status eq 'banned')")

# Deeply nested
build_filter("(a eq 1 and (b eq 2 or c eq 3)) or (d eq 4 and e eq 5)")

Rule of thumb: any time you combine and with or in the same expression, use parentheses to make the grouping explicit.
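
Because the precedence matches Python's, the difference can be checked with plain booleans. This sketch uses a made-up record and evaluates both groupings directly; it does not call the library.

```python
user = {"status": "suspended", "role": "mod"}

active = user["status"] == "active"  # False
admin = user["role"] == "admin"      # False
mod = user["role"] == "mod"          # True

# No parentheses: evaluated as (active and admin) or mod
ungrouped = active and admin or mod

# Explicit grouping: active and (admin or mod)
grouped = active and (admin or mod)

print(ungrouped)  # True
print(grouped)    # False
```

A suspended moderator matches the ungrouped expression but not the grouped one, which is the kind of accidental match the parentheses prevent.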

Unsupported (by design)

These are not supported in DynamoDB OData queries:

  • endswith, concat, indexof, length, substring, toupper, trim
  • datetime helpers: year, month, day, hour, minute, date, now
  • math helpers: round, floor, ceiling

Attempting to use unsupported functions raises UnsupportedFunctionException.

Common Patterns

Multi-tenant queries (single-table design):

# Query all active users in a tenant
db.get_all(
    pk="user::tenant123",
    filter="status eq 'active'",
    item_only=True
)

Combining filters:

# Complex filter expression
db.get_all(
    pk="user::tenant1",
    filter="(status eq 'active' or status eq 'trial') and age gt 18 and premium eq true",
    item_only=True
)

Projecting specific fields:

# Return only certain fields
projection_expr, attr_names = build_projection(["id", "email", "name", "created_at"])

items = db.get_all(
    pk="user::tenant1",
    projection_expression=projection_expr,
    expression_attribute_names=attr_names,
    item_only=True
)

Single-Table Pattern

dynamo-odata supports the common single-table DynamoDB design with prefixed sort keys for managing record status.

Active/Inactive Records

By convention, records use a 1# prefix for active records and 0# for inactive (soft-deleted):

# Create/put an item (automatically gets 1# prefix)
db.put(pk="user::tenant1", sk="user123", data={"email": "alice@example.com"})
# Stored as: pk="user::tenant1", sk="1#user123"

# Query only active records (default behavior)
items = db.get_all(pk="user::tenant1", item_only=True)
# Only returns records with sk starting with "1#"

# Soft delete (moves record to inactive)
db.soft_delete(pk="user::tenant1", sk="1#user123")
# Record now: pk="user::tenant1", sk="0#user123"

# Query both active and inactive
items = db.get_all(pk="user::tenant1", include_inactive=True, item_only=True)
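
The prefix convention itself is simple string handling. The following sketch shows one way the active/inactive swap could work; `sk_body`, `to_active_sk`, and `to_inactive_sk` are hypothetical names, and the library's own key helpers may behave differently at the edges.

```python
ACTIVE, INACTIVE = "1#", "0#"


def sk_body(sk: str) -> str:
    """Strip a leading status prefix, if any, and return the bare sort-key body."""
    for prefix in (ACTIVE, INACTIVE):
        if sk.startswith(prefix):
            return sk[len(prefix):]
    return sk


def to_active_sk(sk: str) -> str:
    """Return the sort key with the active prefix (used on put and restore)."""
    return ACTIVE + sk_body(sk)


def to_inactive_sk(sk: str) -> str:
    """Return the sort key with the inactive prefix (used on soft delete)."""
    return INACTIVE + sk_body(sk)


print(to_active_sk("user123"))      # 1#user123
print(to_inactive_sk("1#user123"))  # 0#user123
print(to_active_sk("0#user123"))    # 1#user123
```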

Hard Delete vs Soft Delete

| Operation | Effect | Query Impact | Recovery |
| --- | --- | --- | --- |
| soft_delete() | Moves 1# → 0# prefix | Item still in table, excluded from default queries | Can restore by moving back to 1# |
| hard_delete() | Removes item entirely | Item permanently gone | Not recoverable |

When to use each:

  • Soft delete: User deletions, content removal, audit trails
  • Hard delete: GDPR compliance, purging test data, final cleanup

Querying Soft-Deleted Items

# By default, get_all excludes soft-deleted items
items = db.get_all(pk="user::tenant1")  # Only `1#` records

# Include soft-deleted items explicitly
all_items = db.get_all(pk="user::tenant1", include_inactive=True)

# Query only soft-deleted items
deleted_items = db.get_all(
    pk="user::tenant1",
    filter="sk_begins_with('0#')"  # Low-level filter if needed
)

API Reference

DynamoDb Client

Initialization:

db = DynamoDb(
    table_name="users",           # Required
    region="us-west-2",           # Optional, defaults to us-west-2
    pk_separator="::",            # Optional, default partition key separator
    sk_separator="#",             # Optional, default sort key status separator
)

Methods (Sync/Async pairs):

| Method | Args | Returns | Notes |
| --- | --- | --- | --- |
| get / get_async | pk, sk, [item_only] | dict or Item | Single item lookup |
| get_all / get_all_async | pk, [filter, select, item_only, include_inactive] | list[dict] | Query with filter |
| batch_get / batch_get_async | pk, sks, [item_only] | list[dict] | Multiple items, auto-chunked |
| put / put_async | pk, sk, data | None | Unconditional full-item replace (PUT semantics) |
| put_item / put_item_async | pk, sk, item | None | Unconditional write; does not strip key attrs |
| create_item / create_item_async | pk, sk, item | None | Conditional write; raises if item already exists |
| update_item / update_item_async | pk, sk, updates | dict | Partial update (PATCH); returns full item after write |
| query_gsi / query_gsi_async | index_name, pk_attr, pk_value, [sk_*, filter, limit, cursor] | (list[dict], cursor) | GSI query with optional SK conditions and pagination |
| transact_write / transact_write_async | operations | None | Atomic multi-item write (up to 25 ops); TableName injected automatically |
| restore / restore_async | pk, sk_body, [restore_data] | None | Swap SK from 0# → 1#, clear deleted_* attrs |
| delete / delete_async | pk, sk | None | Hard delete |
| soft_delete / soft_delete_async | pk, sk | None | Soft delete (prefix move) |
| hard_delete / hard_delete_async | pk, sk | None | Permanent delete |
| scan_all_paginated / scan_all_paginated_async | [pk, filter, page_size] | Iterator[list[dict]] | Paginated scan |

Utility Functions:

| Function | Args | Returns | Notes |
| --- | --- | --- | --- |
| build_filter(expr) | OData filter string | ConditionBase | Parse filter expression |
| build_projection(fields) | list[str] | (expr, attr_names_dict) | Build projection + name map |

Key Helpers (DynamoDb methods):

| Method | Args | Returns | Notes |
| --- | --- | --- | --- |
| build_pk | *parts | str | Joins key parts with pk_separator |
| build_active_sk | value | str | Ensures active SK prefix (1# by default) |
| build_inactive_sk | value | str | Ensures inactive SK prefix (0# by default) |
| is_active_sk | value | bool | Checks active prefix |
| is_inactive_sk | value | bool | Checks inactive prefix |

Pydantic Integration

dynamo-odata is Pydantic-agnostic — it works with plain dicts. The recommended pattern when using Pydantic models is to serialise to dict before writing and deserialise after reading:

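from dynamo_odata import DynamoDb, UPPERCASE_KEY_SCHEMA
from pydantic import BaseModel

# Example model; the field names are illustrative
class MyModel(BaseModel):
    name: str
    nickname: str | None = None

db = DynamoDb(table_name="main", key_schema=UPPERCASE_KEY_SCHEMA)
pk, sk = "TENANT#tenant1", "1#USER#123"
my_model = MyModel(name="Ada")

# Write — exclude_none=True prevents boto3 rejecting null attribute values
item = my_model.model_dump(exclude_none=True)
db.put_item(pk, sk, item)

# Read
raw = db.get(pk, sk, item_only=True)
model = MyModel(**raw)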

For models with optional fields, always pass exclude_none=True (or exclude_unset=True for partial updates). DynamoDB's resource interface rejects None attribute values with a TypeError.


Error Handling

Common exceptions you may encounter:

from dynamo_odata import DynamoDb
from botocore.exceptions import ClientError

db = DynamoDb(table_name="users")

try:
    item = db.get(pk="user::t1", sk="1#user1")
except ClientError as e:
    if e.response['Error']['Code'] == 'ResourceNotFoundException':
        print("Table does not exist")
    else:
        print(f"DynamoDB error: {e}")

For filter parsing errors:

from dynamo_odata import build_filter
from dynamo_odata.odata_query.exceptions import InvalidQueryException

try:
    condition = build_filter("invalid filter syntax @@")
except InvalidQueryException as e:
    print(f"Filter syntax error: {e}")

Sync vs Async: When to Use Each

Use sync if:

  • Running in a synchronous context (Flask, Django, scripts)
  • You need simpler code and don't mind blocking I/O
  • Testing or scripting

Use async if:

  • Running in an async framework (FastAPI, asyncio)
  • You need to handle many concurrent requests
  • Integrating with other async libraries

Performance note: Async has minimal overhead but shines when combined with other async operations. For single isolated queries, sync and async have similar latency.
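
The concurrency benefit can be seen with simulated I/O. In this sketch `fake_query` is a hypothetical stand-in that only sleeps; it is not a dynamo-odata call, and real latencies will differ.

```python
import asyncio
import time


async def fake_query(name: str, delay: float = 0.1) -> str:
    """Pretend to wait on the network, then return the name that was queried."""
    await asyncio.sleep(delay)
    return name


async def main() -> tuple[list[str], float]:
    start = time.perf_counter()
    # gather() runs the three waits concurrently and preserves argument order.
    results = await asyncio.gather(
        *(fake_query(n) for n in ("users", "orders", "items"))
    )
    return list(results), time.perf_counter() - start


results, elapsed = asyncio.run(main())
print(results)  # ['users', 'orders', 'items']
print(f"{elapsed:.2f}s elapsed for three 0.1s waits")
```

Run one after another, the three waits would take at least 0.3 seconds; gathered, the total stays close to the longest single wait.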


Repository layout

  • plan/ — implementation plans and roadmap
  • docs/ — diagrams and supplementary docs
  • examples/ — runnable example apps
  • src/dynamo_odata/ — library source code
    • db.py — DynamoDb client class
    • dynamo_filter.py — OData filter building
    • projection.py — projection expression building
    • expand.py — $expand FK resolution (ExpandConfig, expand_items_async, apply_dotted_select, parse_expand)
    • fastapi/ — FastAPI integration (ODataQueryParams, ODataService)
    • odata_query/ — vendored OData parser and AST
  • tests/ — automated test suite

Development

Running Tests

# Install dev dependencies
pip install -e ".[dev]"

# Run all tests
pytest tests/

# Run with coverage
pytest tests/ --cov=src/dynamo_odata

# Run specific test file
pytest tests/test_filter.py -v

Project Status

| Phase | Status | Version |
| --- | --- | --- |
| Core library | ✅ Complete | 0.1.0 |
| Parser (lark) | ✅ Complete | 0.1.0 |
| Async support | ✅ Complete | 0.5.0 |
| $expand + FastAPI layer | ✅ Complete | 0.7.0 |
| PyPI publish | 📅 Pending | |

License

MIT. See LICENSE for details.

Attribution

This package includes a vendored and modified version of the OData AST, visitor, and grammar from odata-query by Gorillini NV, used under the MIT License. The DynamoDB backend is original work.


What's Next?

  • Contribute DynamoDB visitor back to upstream odata-query project
  • PyPI publish (pip install dynamo-odata) — see Phase 3 of the expand plan

Contributing

Contributions welcome! Please:

  1. Fork the repository
  2. Create a feature branch
  3. Write tests for new functionality
  4. Run pytest and ensure all tests pass
  5. Open a pull request with a clear description

See CONTRIBUTING.md for more details.
