erns-shared

Shared Python utilities for AWS + AI projects. Provides reusable Domain-Driven Design (DDD) primitives, AWS service wrappers, and Lambda infrastructure patterns used across Lab products and Domenxa.

Only code that has been reused in 2+ real projects enters this library.

Installation

uv add erns-shared
# or
pip install erns-shared

Optional extras

# Quote the extras so shells such as zsh do not expand the square brackets

# SSE streaming for FastAPI
uv add "erns-shared[http]"

# AI providers: pick what you need
uv add "erns-shared[ai-anthropic]"
uv add "erns-shared[ai-openai]"
uv add "erns-shared[ai-google]"
uv add "erns-shared[ai-ollama]"

# All AI providers at once
uv add "erns-shared[ai]"

Modules

Module               Status       Description
erns_shared.ddd      Available    DDD primitives, DynamoDB UoW, EventBridge publisher
erns_shared.aws      Available    S3, DynamoDB query helpers, SSM Parameter Store, Lambda Powertools
erns_shared.ai       Available    Multi-provider AI client with cost tracking and error handling
erns_shared.http     Available    SSE streaming response for FastAPI
erns_shared.parsers  Coming soon  PDF extraction, CSV parsing

erns_shared.ddd

Battle-tested building blocks for DDD-style Python services on AWS.

Define a domain model

from erns_shared.ddd import (
    EntityId, DomainAggregate, DomainEvent, Command,
    ValueObject, EpochTime, update_last_update_date,
)

class OrderId(EntityId):
    value: str

class OrderPlaced(DomainEvent):
    domain_name: str = "orders"
    order_id: str
    total: float

class Order(DomainAggregate):
    id: OrderId
    total: float
    status: str = "pending"

    @update_last_update_date
    def place(self) -> None:
        self.status = "placed"
        self.add_event(OrderPlaced(order_id=self.id._key(), total=self.total))
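
The aggregate above buffers events with add_event and refreshes a timestamp through a decorator. A minimal standard-library sketch of that pattern follows. SketchAggregate, the touch decorator, and the last_update_date field are hypothetical stand-ins chosen for illustration; they are not the erns_shared implementation, which may differ.

```python
import functools
import time
from dataclasses import dataclass, field


def touch(method):
    """Hypothetical stand-in for a decorator such as update_last_update_date."""
    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        result = method(self, *args, **kwargs)
        self.last_update_date = int(time.time())  # epoch seconds
        return result
    return wrapper


@dataclass
class SketchAggregate:
    """Collects domain events in memory until a caller pulls them."""
    id: str
    status: str = "pending"
    last_update_date: int = 0
    _events: list = field(default_factory=list)

    def add_event(self, event: dict) -> None:
        self._events.append(event)

    def pull_events(self) -> list:
        # Hand over the pending events and reset the buffer.
        events, self._events = self._events, []
        return events

    @touch
    def place(self) -> None:
        self.status = "placed"
        self.add_event({"type": "OrderPlaced", "order_id": self.id})


order = SketchAggregate(id="ord-123")
order.place()
pulled = order.pull_events()  # a second call would return an empty list
```

Pulling the events empties the buffer, so the same event is not handed to a publisher twice.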

Persist with DynamoDB (single-table) + publish to EventBridge

from erns_shared.ddd.adapters.unit_of_work import DynamoDbUnitOfWork
from erns_shared.ddd.adapters.persistence.dynamodb_repository import DynamoDbRepository

uow = DynamoDbUnitOfWork()
repo = DynamoDbRepository(session=uow.session, table_name="my-table", entity_type=Order)

order = Order(id=OrderId(value="ord-123"), total=99.99)
order.place()

with uow.transaction():
    repo.put(order)
    uow.publish_events(order.pull_events())
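
The unit-of-work idea behind the snippet above can be sketched without AWS: writes and events are buffered inside the transaction block, applied together on a clean exit, and dropped if the block raises. SketchUnitOfWork and its attributes are hypothetical, and the ordering shown (commit first, then release events) is an assumption rather than documented erns_shared behavior.

```python
from contextlib import contextmanager


class SketchUnitOfWork:
    """Hypothetical in-memory unit of work; lists stand in for DynamoDB and EventBridge."""

    def __init__(self) -> None:
        self.pending_writes: list[dict] = []
        self.pending_events: list[dict] = []
        self.committed: list[dict] = []
        self.published: list[dict] = []

    def put(self, item: dict) -> None:
        self.pending_writes.append(item)

    def publish_events(self, events: list[dict]) -> None:
        self.pending_events.extend(events)

    def _reset(self) -> None:
        self.pending_writes.clear()
        self.pending_events.clear()

    @contextmanager
    def transaction(self):
        try:
            yield self
        except Exception:
            self._reset()  # roll back: nothing is written or published
            raise
        # Clean exit: commit the writes first, then release the events.
        self.committed.extend(self.pending_writes)
        self.published.extend(self.pending_events)
        self._reset()


uow = SketchUnitOfWork()
with uow.transaction():
    uow.put({"pk": "order#ord-123", "status": "placed"})
    uow.publish_events([{"type": "OrderPlaced"}])

try:
    with uow.transaction():
        uow.put({"pk": "order#ord-456", "status": "placed"})
        raise RuntimeError("simulated failure")
except RuntimeError:
    pass  # the second order was discarded, not committed
```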

Structured Lambda logger

from erns_shared.aws import get_lambda_logger

logger = get_lambda_logger()
logger.info("Order placed")

Set the LOG_LEVEL environment variable to DEBUG to enable debug logging.
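
For readers wondering how an environment-driven log level usually works, here is a small sketch using only the standard logging module. The resolve_log_level helper is hypothetical, and the fallback to INFO for a missing or unknown value is an assumption; get_lambda_logger may resolve the level differently.

```python
import logging
import os


def resolve_log_level(default: str = "INFO") -> int:
    """Map the LOG_LEVEL environment variable to a numeric logging level."""
    name = os.environ.get("LOG_LEVEL", default).upper()
    level = getattr(logging, name, None)
    # Unknown names (or non-level attributes) fall back to INFO.
    return level if isinstance(level, int) else logging.INFO


logging.basicConfig(level=resolve_log_level())
logging.getLogger("orders").debug("Only visible when LOG_LEVEL=DEBUG")
```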


erns_shared.aws

S3Client

from erns_shared.aws import S3Client

s3 = S3Client()

# pdf_bytes holds the document content as bytes
s3.upload("my-bucket", "docs/report.pdf", body=pdf_bytes, content_type="application/pdf")
data = s3.download("my-bucket", "docs/report.pdf")

if s3.key_exists("my-bucket", "docs/report.pdf"):
    url = s3.presigned_get_url("my-bucket", "docs/report.pdf", expiration=3600)

for key in s3.list_keys("my-bucket", prefix="docs/"):
    print(key)

DynamoDBTable

High-level query helpers for the single-table pattern. Works with plain dicts — complements the DDD persistence layer.

from boto3.dynamodb.conditions import Attr, Key
from erns_shared.aws import DynamoDBTable

table = DynamoDBTable("my-table")

# query — flexible sort key conditions
orders = table.query_by_pk("pk", "user#1", sk_condition=Key("sk").begins_with("order#"))
recent = table.query_by_pk("pk", "user#1", sk_condition=Key("sk").between("order#2024", "order#2025"))

# query + filter on non-key attributes
shipped = table.query_by_pk(
    "pk", "user#1",
    sk_condition=Key("sk").begins_with("order#"),
    filter_expression=Attr("status").eq("shipped"),
)

# convenience prefix helper
orders = table.query_by_pk_sk_prefix("pk", "user#1", "sk", "order#")

# scan the full table
active = table.scan(filter_expression=Attr("active").eq(True))

# batch read (auto-chunks at 100)
items = table.batch_get(keys=[{"pk": "user#1", "sk": "profile"}])

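# single-item writes with optional condition + return previous value
item = {"pk": "user#1", "sk": "order#1001", "status": "pending", "version": 3}
key = {"pk": "user#1", "sk": "order#1001"}

old = table.put_item(item, return_values="ALL_OLD")
table.put_item(item, condition=Attr("version").eq(3))
table.delete_item(key, condition=Attr("status").eq("pending"))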
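
The sort key conditions used above rely on DynamoDB comparing string keys lexicographically. The sketch below mimics begins_with and between on an in-memory list to show which items each condition selects. The sample items and the two helper functions are made up for illustration, and Python string comparison matches DynamoDB's byte ordering only for ASCII keys like these.

```python
# Hypothetical items in a single-table layout: one user, several orders, one profile.
items = [
    {"pk": "user#1", "sk": "order#2023-12-30"},
    {"pk": "user#1", "sk": "order#2024-03-01"},
    {"pk": "user#1", "sk": "order#2024-11-15"},
    {"pk": "user#1", "sk": "order#2025-01-02"},
    {"pk": "user#1", "sk": "profile"},
]


def begins_with(rows: list[dict], prefix: str) -> list[str]:
    """Sort keys that start with the prefix, like Key("sk").begins_with(prefix)."""
    return [row["sk"] for row in rows if row["sk"].startswith(prefix)]


def between(rows: list[dict], low: str, high: str) -> list[str]:
    """Sort keys in the inclusive range [low, high], like Key("sk").between(low, high)."""
    return [row["sk"] for row in rows if low <= row["sk"] <= high]


all_orders = begins_with(items, "order#")
orders_2024 = between(items, "order#2024", "order#2025")
```

Because "order#2025-01-02" sorts after the bare string "order#2025", the range selects only the 2024 orders; an item whose sort key is exactly "order#2025" would be included.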

Batch writer — not atomic, auto-chunks at 25 per call

with table.batch_writer() as w:
    w.put({"pk": "user#1", "sk": "profile", "name": "Alice"})
    w.delete({"pk": "user#old", "sk": "profile"})
# flushes on exit, clears ops on exception
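
The auto-chunking mentioned above exists because a single BatchWriteItem call accepts at most 25 requests. A minimal sketch of the splitting step, with a hypothetical chunked helper that is not taken from erns_shared:

```python
from itertools import islice
from typing import Iterable, Iterator

BATCH_WRITE_LIMIT = 25  # DynamoDB BatchWriteItem accepts at most 25 requests per call


def chunked(ops: Iterable[dict], size: int = BATCH_WRITE_LIMIT) -> Iterator[list[dict]]:
    """Yield consecutive lists of at most `size` operations."""
    iterator = iter(ops)
    while batch := list(islice(iterator, size)):
        yield batch


ops = [{"pk": f"user#{n}", "sk": "profile"} for n in range(60)]
batch_sizes = [len(batch) for batch in chunked(ops)]  # one entry per API call
```

Each chunk succeeds or fails independently of the others, which is why the batch writer is described as not atomic.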

Transaction writer — fully atomic, max 100 operations

with table.transaction_writer() as w:
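    w.put({"pk": "order#1", "sk": "meta", "status": "placed"})
    # DynamoDB rejects a transaction that targets the same item twice, so each operation uses a distinct key
    w.put({"pk": "order#2", "sk": "meta", "status": "placed"}, condition=Attr("version").eq(2))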
    w.delete({"pk": "draft#1", "sk": "meta"})
# all land or none do
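
DynamoDB transactions carry two constraints worth checking before a request is sent: at most 100 operations, and no item targeted more than once. The sketch below validates a list of pending operations against both. The validate_transaction helper and the operation dict shape are hypothetical; whether transaction_writer performs such checks client-side is not stated here.

```python
MAX_TRANSACTION_OPS = 100  # DynamoDB TransactWriteItems limit


def validate_transaction(ops: list[dict]) -> None:
    """Raise ValueError if the operations cannot form a single transaction."""
    if len(ops) > MAX_TRANSACTION_OPS:
        raise ValueError(f"{len(ops)} operations exceed the limit of {MAX_TRANSACTION_OPS}")
    seen = set()
    for op in ops:
        key = (op["key"]["pk"], op["key"]["sk"])
        if key in seen:
            raise ValueError(f"item {key} appears more than once")
        seen.add(key)


ops = [
    {"action": "put", "key": {"pk": "order#1", "sk": "meta"}},
    {"action": "delete", "key": {"pk": "draft#1", "sk": "meta"}},
]
validate_transaction(ops)  # passes: two operations on two distinct items
```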

SSMClient

from erns_shared.aws import SSMClient

ssm = SSMClient()

# single parameter — cached by default
db_url = ssm.get_parameter("/app/db_url")

# load an entire path into cache in one call
params = ssm.get_parameters_by_path("/app/prod")

# write
ssm.put_parameter("/app/feature_flag", "true", param_type="String", overwrite=True)

# cache management
ssm.invalidate_cache("/app/db_url")  # single key
ssm.invalidate_cache()               # full flush
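
The caching behavior described above can be pictured as a dictionary in front of the remote lookup. In this sketch, CachedParameters and the injected fetch function are hypothetical, standing in for SSMClient and the real Parameter Store call; the actual cache may add expiry or other behavior not shown.

```python
from typing import Callable, Optional


class CachedParameters:
    """Hypothetical read-through cache in front of a parameter lookup."""

    def __init__(self, fetch: Callable[[str], str]) -> None:
        self._fetch = fetch
        self._cache: dict[str, str] = {}

    def get_parameter(self, name: str) -> str:
        if name not in self._cache:
            self._cache[name] = self._fetch(name)  # only the first read hits the backend
        return self._cache[name]

    def invalidate_cache(self, name: Optional[str] = None) -> None:
        if name is None:
            self._cache.clear()          # full flush
        else:
            self._cache.pop(name, None)  # single key


fetch_calls: list[str] = []


def fake_fetch(name: str) -> str:
    fetch_calls.append(name)
    return f"value-of-{name}"


params = CachedParameters(fake_fetch)
params.get_parameter("/app/db_url")
params.get_parameter("/app/db_url")   # served from the cache
params.invalidate_cache("/app/db_url")
params.get_parameter("/app/db_url")   # fetched again after invalidation
```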

Lambda Powertools

from erns_shared.aws import build_powertools, LambdaContext

logger, tracer, metrics = build_powertools(service="order-service", namespace="MyApp")
# falls back to POWERTOOLS_SERVICE_NAME / POWERTOOLS_METRICS_NAMESPACE env vars

@tracer.capture_lambda_handler
@logger.inject_lambda_context
def handler(event: dict, context: LambdaContext) -> dict:
    logger.info("Handling event")
    return {"statusCode": 200}

erns_shared.ai

Multi-provider AI client with a unified interface, cost tracking, and provider-agnostic error handling.

Requires: uv add erns-shared[ai-anthropic] (or whichever provider you use).

One-shot completion

from erns_shared.ai import get_ai_client, ProviderError

client = get_ai_client(provider="anthropic", model="claude-sonnet-4-6")
# API key falls back to ANTHROPIC_API_KEY env var if not passed explicitly

try:
    response = client.complete(
        system="You are a helpful assistant.",
        user="Summarise this document in 3 bullet points.",
        max_tokens=1024,
    )
    print(response.text)
    print(f"Tokens: {response.input_tokens} in / {response.output_tokens} out")
    print(f"Cost: ${response.estimated_cost_usd:.6f}")
except ProviderError as e:
    # e.status_code, e.retryable, e.public_message
    print(e.public_message)

Streaming (SSE)

stream() is an async generator that yields text chunks as they arrive — designed to feed directly into sse_stream():

from erns_shared.ai import get_ai_client
from erns_shared.http import SSEEvent, sse_stream
from fastapi import FastAPI

app = FastAPI()
client = get_ai_client(provider="anthropic", model="claude-sonnet-4-6")

@app.post("/chat")
async def chat(prompt: str):
    async def generate():
        async for chunk in client.stream(system="You are helpful.", user=prompt):
            yield SSEEvent(data={"text": chunk}, event="delta")
        yield SSEEvent(data="[DONE]", event="done")

    return sse_stream(generate())

Supported providers: anthropic, openai, google, ollama.


erns_shared.http

SSE streaming for FastAPI

Requires: uv add erns-shared[http].

from fastapi import FastAPI
from erns_shared.http import SSEEvent, sse_stream

app = FastAPI()

@app.post("/chat")
async def chat(prompt: str):
    async def generate():
        # some_ai_stream is a placeholder for any async generator of text chunks
        async for chunk in some_ai_stream(prompt):
            yield SSEEvent(data={"text": chunk}, event="delta")
        yield SSEEvent(data="[DONE]", event="done")

    return sse_stream(generate())

SSEEvent fields: data (auto-serialized to JSON for dicts/lists), event, id, retry.
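
To show what these fields look like on the wire, here is a sketch that renders one event in the text/event-stream format. The format_sse function is hypothetical and only illustrates the standard framing (field lines followed by a blank line); the output produced by sse_stream may differ in detail.

```python
import json
from typing import Any, Optional


def format_sse(data: Any, event: Optional[str] = None,
               event_id: Optional[str] = None, retry: Optional[int] = None) -> str:
    """Render one server-sent event as text/event-stream lines."""
    if isinstance(data, (dict, list)):
        data = json.dumps(data)  # dicts and lists are sent as JSON
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if retry is not None:
        lines.append(f"retry: {retry}")  # reconnection delay in milliseconds
    # A payload containing newlines becomes several data: lines.
    for part in str(data).split("\n"):
        lines.append(f"data: {part}")
    return "\n".join(lines) + "\n\n"  # the blank line terminates the event


delta = format_sse({"text": "Hi"}, event="delta")
done = format_sse("[DONE]", event="done")
```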


Requirements

  • Python 3.11+
  • pydantic >= 2.0
  • pydantic-settings >= 2.0
  • boto3 >= 1.34
  • backoff >= 2.0
  • aws-lambda-powertools >= 2.0

Contributing

See GUIDE.md for setup instructions, project structure, conventions, and the release workflow.
