Amazon DynamoDB source connector for moss-connectors.
Project description
moss-connector-dynamodb
Amazon DynamoDB source connector for Moss. Reads items from a DynamoDB table
and ingests them into a Moss index via boto3.
Install
pip install moss-connector-dynamodb
Pulls boto3 as a dependency. AWS credentials must be configured separately
(environment variables, ~/.aws/credentials, or an IAM role).
Usage — full-table Scan
import asyncio
from moss import DocumentInfo
from moss_connector_dynamodb import DynamoDBConnector, ingest
async def main():
source = DynamoDBConnector(
table_name="articles",
mapper=lambda item: DocumentInfo(
id=item["sku"],
text=item["body"],
metadata={"title": item["title"]},
),
region_name="us-east-1",
)
result = await ingest(
source,
project_id="your_project_id",
project_key="your_project_key",
index_name="articles",
)
print(f"copied {result.doc_count} items")
asyncio.run(main())
Use auto_id=True when your mapper does not have a stable primary key and you
want Moss to generate UUID document IDs.
Usage — partition-key Query
Use DynamoDBQueryConnector when you only want items for a specific partition:
from boto3.dynamodb.conditions import Key
from moss import DocumentInfo
from moss_connector_dynamodb import DynamoDBQueryConnector, ingest
source = DynamoDBQueryConnector(
table_name="events",
key_condition_expression=Key("tenant_id").eq("acme"),
mapper=lambda item: DocumentInfo(
id=item["event_id"],
text=item["description"],
metadata={"tenant_id": item["tenant_id"]},
),
region_name="us-east-1",
)
Filtering (Scan)
Pass a boto3 FilterExpression to restrict which items the connector yields:
from boto3.dynamodb.conditions import Attr
source = DynamoDBConnector(
table_name="articles",
filter_expression=Attr("status").eq("published"),
mapper=...,
region_name="us-east-1",
)
FilterExpression is applied server-side by DynamoDB after the Scan reads
items — it does not reduce consumed capacity, but it does reduce the data
your Lambda / server has to process. For true server-side filtering, use
a Query with DynamoDBQueryConnector or a DynamoDB Stream / Filter Policy.
Pagination
Both connectors automatically follow LastEvaluatedKey pagination so you get
every item in the table regardless of size. Tune page_size (default 100)
to control how many items are fetched per round-trip.
Data requirements
DocumentInfo.metadata requires Dict[str, str]. DynamoDB's high-level
resource API returns Decimal for numbers and bytes for Binary. Coerce
non-string values in your mapper:
mapper=lambda item: DocumentInfo(
id=item["id"],
text=item["content"],
metadata={
"price": str(item["price"]), # Decimal → str
"in_stock": str(item["in_stock"]), # bool → str
"tags": ",".join(item["tags"]), # set/list → str
},
)
Connecting to DynamoDB Local / LocalStack
Pass endpoint_url to the connector:
DynamoDBConnector(
table_name="articles",
mapper=...,
region_name="us-east-1",
endpoint_url="http://localhost:8000", # DynamoDB Local
)
Layout
src/
├── __init__.py # re-exports DynamoDBConnector, DynamoDBQueryConnector, ingest
├── connector.py # DynamoDBConnector and DynamoDBQueryConnector classes
└── ingest.py # ingest() - keep in sync with the other connector packages
Tests
pip install -e ".[dev]"
pytest tests/test_dynamodb.py -v # mocked with moto, no AWS needed
pytest tests/test_integration_dynamodb_moss.py -v -s # live AWS + Moss
The mocked tests use moto to simulate DynamoDB locally — no AWS credentials needed.
The integration test requires AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY,
MOSS_PROJECT_ID, and MOSS_PROJECT_KEY. Set DYNAMODB_ENDPOINT_URL=http://localhost:8000
to target a local DynamoDB instead of real AWS.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file moss_connector_dynamodb-0.0.1.tar.gz.
File metadata
- Download URL: moss_connector_dynamodb-0.0.1.tar.gz
- Upload date:
- Size: 9.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.15
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
b8fc682963d099c43e10031b6c508bbabdf59f23f408589d91f07645314cf5a0
|
|
| MD5 |
cbd8654a7b6aac3c465fdf7a934d03d1
|
|
| BLAKE2b-256 |
7c07bb4f312e254ce0680673ae0f5debeb3a8aa43a8a77181efc924bb7de69ab
|
File details
Details for the file moss_connector_dynamodb-0.0.1-py3-none-any.whl.
File metadata
- Download URL: moss_connector_dynamodb-0.0.1-py3-none-any.whl
- Upload date:
- Size: 6.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.15
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
3b961eed116193769f80b3ad891829860c5f88de8c76f7b3e1440a9529981d77
|
|
| MD5 |
db9f97d53c3ef88e4493de7b8a803fea
|
|
| BLAKE2b-256 |
e7135220cf05a0e2ab8cef838312492f57b223af3b98f7aacdbb22fb52b2b2e1
|