Amazon DynamoDB source connector for moss-connectors.

moss-connector-dynamodb

Amazon DynamoDB source connector for Moss. Reads items from a DynamoDB table and ingests them into a Moss index via boto3.

Install

pip install moss-connector-dynamodb

Installs boto3 as a dependency. Configure AWS credentials separately (environment variables, ~/.aws/credentials, or an IAM role).

Usage — full-table Scan

import asyncio
from moss import DocumentInfo
from moss_connector_dynamodb import DynamoDBConnector, ingest

async def main():
    source = DynamoDBConnector(
        table_name="articles",
        mapper=lambda item: DocumentInfo(
            id=item["sku"],
            text=item["body"],
            metadata={"title": item["title"]},
        ),
        region_name="us-east-1",
    )

    result = await ingest(
        source,
        project_id="your_project_id",
        project_key="your_project_key",
        index_name="articles",
    )
    print(f"copied {result.doc_count} items")

asyncio.run(main())

Use auto_id=True when your items have no stable primary key to map to DocumentInfo.id and you want Moss to generate UUID document IDs.

Usage — partition-key Query

Use DynamoDBQueryConnector when you only want items for a specific partition:

from boto3.dynamodb.conditions import Key
from moss import DocumentInfo
from moss_connector_dynamodb import DynamoDBQueryConnector, ingest

source = DynamoDBQueryConnector(
    table_name="events",
    key_condition_expression=Key("tenant_id").eq("acme"),
    mapper=lambda item: DocumentInfo(
        id=item["event_id"],
        text=item["description"],
        metadata={"tenant_id": item["tenant_id"]},
    ),
    region_name="us-east-1",
)

Filtering (Scan)

Pass a boto3 FilterExpression to restrict which items the connector yields:

from boto3.dynamodb.conditions import Attr

source = DynamoDBConnector(
    table_name="articles",
    filter_expression=Attr("status").eq("published"),
    mapper=...,
    region_name="us-east-1",
)

FilterExpression is applied by DynamoDB after the Scan has read each page of items, so it does not reduce consumed read capacity; it only reduces the data sent back to your Lambda or server. To lower the read cost itself, use a Query with DynamoDBQueryConnector, which reads only the items that match the key condition, or consume a DynamoDB Stream with an event filter.
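To see how much a filter actually saves, compare the Count and ScannedCount fields that DynamoDB returns with every Scan page. The following is a minimal sketch, not part of this package: scan_counts is a hypothetical helper, and table is assumed to be any object with a boto3-style scan(**kwargs) method, such as boto3.resource("dynamodb").Table("articles").

```python
from typing import Any, Dict


def scan_counts(table: Any, **scan_kwargs: Any) -> Dict[str, int]:
    """Sum Count and ScannedCount over every page of a Scan.

    Hypothetical helper: `table` only needs a boto3-style `scan(**kwargs)`
    method that returns a dict with "Count", "ScannedCount" and, when more
    pages remain, "LastEvaluatedKey".
    """
    totals = {"returned": 0, "scanned": 0}
    kwargs = dict(scan_kwargs)  # copy so the caller's arguments are not mutated
    while True:
        page = table.scan(**kwargs)
        totals["returned"] += page["Count"]        # items that passed the filter
        totals["scanned"] += page["ScannedCount"]  # items read before filtering
        last_key = page.get("LastEvaluatedKey")
        if not last_key:
            return totals
        kwargs["ExclusiveStartKey"] = last_key     # resume where the page ended
```

Calling scan_counts(table, FilterExpression=Attr("status").eq("published"), Select="COUNT") on a real table returns both totals without transferring the items themselves. A large gap between "scanned" and "returned" suggests that a Query on a suitable key would be cheaper than a filtered Scan.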

Pagination

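Both connectors automatically follow LastEvaluatedKey pagination, so you get every matching item regardless of table size. Tune page_size (default 100) to control the maximum number of items fetched per round-trip. DynamoDB also caps each response at 1 MB, so a page can hold fewer items than page_size.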
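For readers unfamiliar with LastEvaluatedKey, the loop looks roughly like the sketch below. This is an illustration of the DynamoDB pagination protocol, not the connector's actual code: iter_scan_items is a hypothetical name, and mapping page_size to the Scan Limit parameter is an assumption.

```python
from typing import Any, Dict, Iterator


def iter_scan_items(
    table: Any, page_size: int = 100, **scan_kwargs: Any
) -> Iterator[Dict[str, Any]]:
    """Yield every item from a boto3-style table, one page at a time.

    Hypothetical helper: assumes `page_size` corresponds to Scan's `Limit`.
    """
    kwargs = dict(scan_kwargs, Limit=page_size)
    while True:
        page = table.scan(**kwargs)
        # A page may be empty and still carry LastEvaluatedKey, for example
        # when a filter rejected every item that was read.
        yield from page.get("Items", [])
        last_key = page.get("LastEvaluatedKey")
        if last_key is None:
            break  # no key means the Scan reached the end of the table
        kwargs["ExclusiveStartKey"] = last_key
```

The only reliable end-of-data signal is a response without LastEvaluatedKey; an empty Items list does not mean the Scan is finished.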

Data requirements

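DocumentInfo.metadata requires Dict[str, str]. boto3's high-level resource API returns Decimal for Number attributes, a Binary wrapper object for Binary attributes, bool for Boolean attributes, and set or list for set and List attributes. Coerce non-string values in your mapper: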

mapper=lambda item: DocumentInfo(
    id=item["id"],
    text=item["content"],
    metadata={
        "price": str(item["price"]),        # Decimal → str
        "in_stock": str(item["in_stock"]),  # bool → str
        "tags": ",".join(item["tags"]),     # set/list → str
    },
)
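When many attributes need coercion, a small helper keeps the mapper short. The sketch below is one possible approach; to_metadata and to_metadata_value are hypothetical names, not part of this package, and the output formats chosen (lowercase booleans, hex for bytes, sorted sets) are assumptions you may want to change.

```python
from decimal import Decimal
from typing import Any, Dict, Iterable, Mapping


def to_metadata_value(value: Any) -> str:
    """Coerce one DynamoDB attribute value to a string (hypothetical helper)."""
    if isinstance(value, str):
        return value
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, Decimal):
        return format(value, "f")  # fixed-point, avoids forms like 1E+3
    if isinstance(value, (bytes, bytearray)):
        return bytes(value).hex()
    if isinstance(value, (set, frozenset)):
        # Sets are unordered, so sort for a deterministic result.
        return ",".join(sorted(to_metadata_value(v) for v in value))
    if isinstance(value, (list, tuple)):
        return ",".join(to_metadata_value(v) for v in value)
    return str(value)


def to_metadata(item: Mapping[str, Any], keys: Iterable[str]) -> Dict[str, str]:
    """Build a Dict[str, str] from the listed keys, skipping absent ones."""
    return {k: to_metadata_value(item[k]) for k in keys if k in item}
```

A mapper can then pass metadata=to_metadata(item, ["price", "in_stock", "tags"]). For Binary attributes, boto3's wrapper object keeps the raw bytes on its value attribute, so pass that to the helper rather than the wrapper itself.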

Connecting to DynamoDB Local / LocalStack

Pass endpoint_url to the connector:

DynamoDBConnector(
    table_name="articles",
    mapper=...,
    region_name="us-east-1",
    endpoint_url="http://localhost:8000",   # DynamoDB Local; LocalStack typically listens on http://localhost:4566
)
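To switch between real AWS and a local endpoint without editing code, the connector arguments can be derived from the environment. The sketch below is an assumption-laden example: dynamodb_kwargs is a hypothetical helper, and reusing DYNAMODB_ENDPOINT_URL (the variable the integration test reads) in application code is a choice made here, not something the package does for you.

```python
import os
from typing import Dict, Mapping, Optional


def dynamodb_kwargs(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Build region_name / endpoint_url keyword arguments from the environment.

    Hypothetical helper: falls back to us-east-1 when AWS_DEFAULT_REGION is
    unset and adds endpoint_url only when DYNAMODB_ENDPOINT_URL is non-empty.
    """
    env = os.environ if env is None else env
    kwargs = {"region_name": env.get("AWS_DEFAULT_REGION", "us-east-1")}
    endpoint = env.get("DYNAMODB_ENDPOINT_URL", "").strip()
    if endpoint:
        kwargs["endpoint_url"] = endpoint
    return kwargs
```

It would be used as DynamoDBConnector(table_name="articles", mapper=..., **dynamodb_kwargs()). Leaving the variable unset keeps the default AWS endpoint.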

Layout

src/
├── __init__.py      # re-exports DynamoDBConnector, DynamoDBQueryConnector, ingest
├── connector.py     # DynamoDBConnector and DynamoDBQueryConnector classes
└── ingest.py        # ingest() - keep in sync with the other connector packages

Tests

pip install -e ".[dev]"
pytest tests/test_dynamodb.py -v                          # mocked with moto, no AWS needed
pytest tests/test_integration_dynamodb_moss.py -v -s      # live AWS + Moss

The mocked tests use moto to simulate DynamoDB locally — no AWS credentials needed.

The integration test requires AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, MOSS_PROJECT_ID, and MOSS_PROJECT_KEY. Set DYNAMODB_ENDPOINT_URL=http://localhost:8000 to target a local DynamoDB instead of real AWS.
