Microframework for Python AWS lambdas
Project description
braincube-aws-core
Microframework for Python AWS lambdas.
Installation
pip install braincube-aws-core-alpha
Built With
- asyncpg - A fast PostgreSQL Database Client Library for Python/asyncio.
- pydantic - Data validation using Python type hints.
- pypika - Python Query Builder.
Application Controllers Example
import asyncio
from uuid import uuid4
from http import HTTPStatus
from core.rest.data import HTTPRequest, HTTPResponse
from core.rest.app_module import AppModule
from core.rest.app_controller import AppController
from pydantic import BaseModel
class AccountDto(BaseModel):
    """Account payload for the example endpoints: an IBAN and a BBAN, both strings."""

    iban: str
    bban: str
# In-memory stand-in for a datastore: account id (string) -> AccountDto.
data = {
    "a0a412d9-87ef-474b-9ac8-b682ec5e0fb3": AccountDto(iban="EUR27100777770209299700", bban="EURC12345612345678"),
    "5ebc25bd-e152-4a70-b251-d68e43be581e": AccountDto(iban="GR27100777770209299700", bban="GRC12345612345678"),
}
# Controller whose routes are declared below with @app.get / @app.post;
# "/accounts" is presumably the common path prefix — confirm against AppController.
app = AppController("/accounts")
@app.get("/{id}")
async def get_account(request: HTTPRequest) -> HTTPResponse:
    """Return the account stored under the ``id`` path parameter.

    Responds with 200 and the account when it exists in ``data``,
    otherwise with 204 and an empty body.
    """
    account_id = request.path_parameters["id"]
    account = data.get(account_id)
    if account:
        status = HTTPStatus.OK
    else:
        status = HTTPStatus.NO_CONTENT
    return HTTPResponse(status, account)
@app.post()
async def create_account(request: HTTPRequest[AccountDto]) -> HTTPResponse:
    """Store the posted account under a freshly generated id and reply 201.

    The id is stored as a string: ``data`` is keyed by strings and
    ``get_account`` looks accounts up by the string ``id`` path parameter,
    so a raw ``UUID`` key would never be found again.
    """
    account_id = str(uuid4())
    data[account_id] = request.body
    return HTTPResponse(HTTPStatus.CREATED)
# Create the event loop explicitly at import time and keep it at module level,
# so every call to main() in this process reuses it.
# asyncio.get_event_loop() is deprecated when no loop is set and raises
# RuntimeError on Python 3.14+, so the loop is created and registered here.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
module = AppModule([app])


def main(event, context):
    """Lambda entry point: run the async module handler to completion.

    ``event`` and ``context`` are passed through unchanged to
    ``module.serve``; its result is returned to the caller.
    """
    return loop.run_until_complete(module.serve(event, context))
Dependency Injection Example
from core.di.injector import inject
from core.dal.postgres_connection import get_pool, Pool
@inject("data_warehouse_pool")
async def provide_warehouse_pool() -> Pool:
    """Provider registered via ``@inject("data_warehouse_pool")``.

    Awaits ``get_pool()`` and returns the resulting pool.
    """
    warehouse_pool = await get_pool()
    return warehouse_pool
# NOTE(review): the qualifier presumably binds the `pool` constructor argument
# to the provider registered as "data_warehouse_pool" — confirm against core.di.injector.
@inject(qualifier="pool:data_warehouse_pool")
class BankService:
    """Example service that receives a connection pool through its constructor."""

    def __init__(self, pool: Pool):
        # Keep the pool for later use by the service's methods.
        self._pool = pool
Postgres Repository Example
from core.rest.data import HTTPRequest
from core.utils.data import Order, OrderType
from core.dal.data import Key, Schema, Column, Relation, SimpleColumn, JoinType, JoinThrough, StatementField
from core.dal.postgres_connection import get_pool, Pool
from core.dal.postgres_repository import PostgresRepository
# schema definition
equities = Schema(
    table="equities",
    alias="e",  # the statement field below refers to the table through this alias ("e.type")
    primary_key=["id"],
    columns=[
        # Column aliases are the camelCase names used in the insert/update payloads
        # and in the `fields` query parameter shown later in this README.
        # NOTE(review): updatable/insertable=False presumably exclude `id` from writes — confirm.
        Column("id", updatable=False, insertable=False),
        Column("name"),
        Column("type"),
        Column("issuer_id", alias="issuerId"),
        Column("industry_sector", alias="industrySector"),
        Column("isin"),
        Column("reference"),
        Column("bloomberg_code", alias="bloombergCode"),
        Column("market_symbol", alias="marketSymbol"),
        Column("currency"),
        Column("country", ),
        Column("min_amount", alias="minAmount"),
    ],
    statement_fields=[
        # Field backed by a raw SQL expression instead of a table column.
        StatementField("isTypeOne", statement="CASE WHEN e.type = 1 then True else False END")
    ],
    order=[
        # Ascending order on the "name" alias (presumably the default ordering — confirm).
        Order(type=OrderType.asc, alias="name")
    ],
    relations=[
        # Left join from equities.issuer_id to parties.id.
        Relation(
            table="parties",
            alias="p",
            columns=[
                SimpleColumn("name"),
                SimpleColumn("short_name", alias="shortName"),
            ],
            join_forced=False,
            join_type=JoinType.left,
            join_through=JoinThrough(from_column_name="issuer_id", to_column_name="id")
        )
    ]
)
# repository definition
class EquitiesRepo(PostgresRepository):
    """Repository that pairs a connection pool with the ``equities`` schema."""

    def __init__(self, pool: Pool):
        # Hand the pool and the module-level `equities` schema to the base repository.
        super().__init__(pool, equities)
# repository usage
# NOTE: `await` is only valid inside a coroutine; run these calls from within an `async def`.
request = HTTPRequest()
repo = EquitiesRepo(await get_pool())

# Reads: fields, conditions, order and page all come from the request's query parameters.
await repo.find_by_pk(Key(request.path_parameters["id"]), request.query_parameters.fields)
await repo.exists_by_pk(Key("9448a57b-f686-4935-b152-566baab712db"))
await repo.find_one(
    request.query_parameters.fields,
    conditions=request.query_parameters.conditions,
    order=request.query_parameters.order)
await repo.find_all(
    request.query_parameters.fields,
    conditions=request.query_parameters.conditions,
    order=request.query_parameters.order)
await repo.find_all_by_pk(
    [
        Key("9448a57b-f686-4935-b152-566baab712db"),
        Key("43c8ec37-9a59-44eb-be90-def391ba2f02")
    ],
    aliases=request.query_parameters.fields,
    order=request.query_parameters.order)
await repo.find_many(
    request.query_parameters.fields,
    conditions=request.query_parameters.conditions,
    page=request.query_parameters.page,
    order=request.query_parameters.order)

# Writes: payload keys are the column aliases declared in the schema.
await repo.insert({
    "name": "Bursa de Valori Bucuresti SA",
    "type": 1,
    "industrySector": 40,
    "isin": "ROBVBAACNOR0",
    "bloombergCode": "BBG000BBWMC5",
    "marketSymbol": "BVB RO Equity",
    "currency": "RON",
    "country": "RO",
})
# Each row in `data` lists its values in the same order as `aliases`.
await repo.insert_bulk(
    aliases=["name", "type", "industrySector", "isin", "bloombergCode", "marketSymbol", "currency", "country"],
    data=[
        ["Bursa de Valori Bucuresti SA", 1, 40, "ROBVBAACNOR0", "BBG000BBWMC5", "BVB RO Equity", "RON", "RO"],
        ["Citigroup Inc", 1, 40, "US1729674242", "BBG000FY4S11", "C US Equity", "USD", "US"],
        ["Coca-Cola HBC AG", 1, 49, "CH0198251305", "BBG004HJV2T1", "EEE GA Equity", "EUR", "GR"],
    ]
)
# NOTE(review): "isin" is a string everywhere else in this README, while 40 matches the
# "industrySector" values used above — the key here may be a typo; confirm.
await repo.update({
    "type": 1,
    "isin": 40,
}, request.query_parameters.conditions, request.query_parameters.fields)
await repo.update_by_pk(Key("9448a57b-f686-4935-b152-566baab712db"), {
    "type": 1,
    "isin": 40
})
# The list argument is presumably the set of aliases returned for the deleted rows — confirm.
await repo.delete(request.query_parameters.conditions, ["id", "name", "type"])
await repo.delete_by_pk(Key("9448a57b-f686-4935-b152-566baab712db"), ["id", "name", "type"])

# Raw SQL: values are passed separately from the statement and bound to $1, $2, ...
await repo.fetch("SELECT * FROM equities WHERE type = $1 and isin = $2", [1, "TREEGYO00017"])
await repo.fetch_one("SELECT * FROM equities WHERE id = $1", ["2b67122a-f47e-41b1-b7f7-53be5ca381a0"])
await repo.execute("DELETE FROM equities WHERE id = $1", ["2b67122a-f47e-41b1-b7f7-53be5ca381a0"])
Query params format
fields=name, type, industrySector, isin, bloombergCode, parties_name, parties_shortName
type=1
isin=range(40, 49)
id=any(9448a57b-f686-4935-b152-566baab712db, 43c8ec37-9a59-44eb-be90-def391ba2f02)
page_no=1
page_size=50
top_size=50
order=name, id DESC
Local Development Requirements
To use the SAM CLI, you need the following tools.
Run server locally
# open ssh tunnel
sudo sh ssh_tunnel_Analog_JBox.sh
# apply code changes to docker image
sam-api$ sam build
# start server locally on http://127.0.0.1:3000
sam-api$ sam local start-api --warm-containers EAGER
# or run function locally using event.json as parameter
sam-api$ sam local invoke ApiFunction --event events/event.json
Deploy to AWS
sam build --use-container
sam deploy --guided --profile analog_user --region eu-west-1
Build and deploy new package version using twine
python3 -m pip install --upgrade pip
python3 -m pip install --upgrade build
python3 -m pip install --upgrade twine
python3 -m build
twine upload --skip-existing dist/*
Resources
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Close
Hashes for braincube-aws-core-alpha-0.0.30.tar.gz
Algorithm | Hash digest
---|---
SHA256 | 794ff88862599d54d3cd763f539b6d90937ccd39baa577672e9bb78e855754c6
MD5 | 6dd54fc23b09287cca611351dfc1b154
BLAKE2b-256 | e5482d2c4b694d5e64e364c5cada55fafab93d8de1fba7df21545c57c0694684
Close
Hashes for braincube_aws_core_alpha-0.0.30-py3-none-any.whl
Algorithm | Hash digest
---|---
SHA256 | 2429f333bc52fdb400f33fbfc95264c4b246e8b9de3c54e268cef4dd328d33bb
MD5 | 395a5288ad4522a879ae373e97fd9c85
BLAKE2b-256 | 272a6c567e5b9d8e992ebf896f0e8f3b6dec8cf1ac34dbbadf0a21b7c90a2cec