The elegance of Airflow + the power of AWS
Project description
examples/hello_orkestra.py
import random
from typing import *
from uuid import uuid4
from aws_lambda_powertools import Logger, Tracer
from pydantic import BaseModel
from orkestra import compose
from orkestra.interfaces import Duration
def dag():
    """Compose the example tasks into one pipeline using ``>>``.

    The tasks are chained in the order they should run. Two steps pass a
    group of tasks as a single operand: a tuple ``(do_nothing, assert_false)``
    and a list ``[random_int, random_float]``.

    NOTE(review): how orkestra treats a tuple versus a list operand is not
    visible in this file -- confirm against the orkestra documentation.

    The composed expression is discarded and nothing is returned, so the
    wiring presumably happens as a side effect of the ``>>`` operators.
    """
    tuple_group = (do_nothing, assert_false)
    list_group = [random_int, random_float]

    (
        generate_item
        >> add_price
        >> copy_item
        >> double_price
        >> tuple_group
        >> say_hello
        >> list_group
        >> say_goodbye
    )
class Item(BaseModel):
    """Item record passed between the example tasks."""

    # Identifier; ``random()`` fills it with a UUID4 string.
    id: str
    # Display name; ``random()`` picks it from a small fixed list.
    name: str
    # Optional price; stays ``None`` until a task assigns it.
    price: Optional[float] = None

    @classmethod
    def random(cls):
        """Build an item with a fresh UUID4 id and a randomly chosen name.

        The price is left at its default of ``None``.
        """
        candidate_names = ["potato", "moon rock", "hat"]
        return cls(
            id=str(uuid4()),
            name=random.choice(candidate_names),
        )
# Module-level Powertools logger and tracer shared by the task functions.
logger = Logger()
tracer = Tracer()

# Keyword arguments unpacked into every ``compose`` decorator in this module.
default_args = {
    "enable_powertools": True,
    "timeout": Duration.seconds(6),
}
@compose(**default_args)
def generate_item(event, context):
    """Create a random ``Item`` and return it as a plain dictionary.

    Logs a start message and the generated payload, and records a
    ``GenerateItem`` / ``SUCCESS`` metadata entry on the tracer.
    ``event`` and ``context`` are accepted but not used.
    """
    logger.info("generating random item")
    payload = Item.random().dict()
    logger.info(payload)
    tracer.put_metadata("GenerateItem", "SUCCESS")
    return payload
@compose(model=Item, **default_args)
def add_price(item: Item, context):
    """Set a fixed price of 3.14 on the item and return it as a dictionary.

    The item (as it was before the price is assigned) and the new price are
    logged as extra fields. ``context`` is not used.

    NOTE(review): ``model=Item`` presumably makes orkestra parse the incoming
    event into an ``Item`` before this function runs -- confirm.
    """
    new_price = 3.14
    log_fields = {
        "item": item.dict(),
        "price": new_price,
    }
    logger.info("adding price to item", extra=log_fields)
    item.price = new_price
    return item.dict()
@compose(model=Item, **default_args)
def copy_item(item: Item, context) -> list:
    """Log the item and return ten dictionary copies of it.

    Args:
        item: The item to duplicate.
        context: Invocation context; not used.

    Returns:
        A list of ten equal but independent dictionaries built from ``item``.
    """
    logger.info(item.dict())
    # Build each entry separately: ``[d] * 10`` repeats a single dict object
    # ten times, so mutating one entry would silently change all of them.
    return [item.dict() for _ in range(10)]
@compose(model=Item, is_map_job=True, **default_args)
def double_price(item: Item, context):
    """Double the item's price and return the item as a dictionary.

    The price must already be set; multiplying a ``None`` price raises
    ``TypeError``. ``context`` is not used.

    NOTE(review): ``is_map_job=True`` presumably runs this task once per
    element of the list produced by the previous step -- confirm.
    """
    doubled = item.price * 2
    item.price = doubled
    return item.dict()
@compose(**default_args)
def assert_false(event, context):
    """Always fail by raising ``AssertionError``.

    Args:
        event: Incoming event; not used.
        context: Invocation context; not used.

    Raises:
        AssertionError: Unconditionally.
    """
    # Raise explicitly instead of ``assert False``: assert statements are
    # removed when Python runs with ``-O``, which would make this task
    # silently return ``None`` instead of failing.
    raise AssertionError
@compose(**default_args)
def do_nothing(event, context):
    """Log a marker message and return ``None``.

    ``event`` and ``context`` are accepted but not used.
    """
    marker = {"doing": "nothing"}
    logger.info(marker)
@compose(**default_args)
def say_hello(event, context):
    """Return a fixed greeting; ``event`` and ``context`` are not used."""
    greeting = "hello, world"
    return greeting
@compose(**default_args)
def say_goodbye(event, context):
    """Return a fixed farewell; ``event`` and ``context`` are not used."""
    farewell = "goodbye"
    return farewell
@compose(**default_args)
def random_int(event, context):
    """Return a random integer from 0 up to, but not including, 100.

    ``event`` and ``context`` are accepted but not used.
    """
    exclusive_upper_bound = 100
    return random.randrange(exclusive_upper_bound)
@compose(**default_args)
def random_float(event, context):
    """Return the result of ``random_int`` converted to ``float``.

    ``event`` and ``context`` are forwarded unchanged to ``random_int``,
    which is called here through its ``compose``-decorated name.
    """
    whole_number = random_int(event, context)
    return float(whole_number)
# Run the composition at import time so the ``>>`` chain in ``dag`` is
# evaluated whenever this module is imported.
dag()
app.py
#!/usr/bin/env python3
from aws_cdk import core as cdk
from examples.hello_orkestra import generate_item
class HelloOrkestra(cdk.Stack):
    """CDK stack that schedules the workflow starting at ``generate_item``."""

    def __init__(self, scope, id, **kwargs):
        """Initialise the stack and register the schedule.

        ``scope``, ``id`` and any extra keyword arguments are passed straight
        to ``cdk.Stack``. ``generate_item.schedule`` is then called on this
        stack with a ``rate(5 minutes)`` expression and the state machine
        name ``hello_orkestra``.
        """
        super().__init__(scope, id, **kwargs)

        schedule_options = {
            "expression": "rate(5 minutes)",
            "state_machine_name": "hello_orkestra",
        }
        generate_item.schedule(self, **schedule_options)
app = cdk.App()

# Instantiate the stack so it is attached to the app; without this the
# HelloOrkestra class is never used and synth() produces an app with no stacks.
HelloOrkestra(app, "HelloOrkestra")

app.synth()
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
orkestra-0.9.4.tar.gz (15.6 kB, view details)
Built Distribution
orkestra-0.9.4-py3-none-any.whl (11.6 kB, view details)
File details
Details for the file orkestra-0.9.4.tar.gz.
File metadata
- Download URL: orkestra-0.9.4.tar.gz
- Upload date:
- Size: 15.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.4.1 importlib_metadata/4.4.0 pkginfo/1.7.0 requests/2.25.1 requests-toolbelt/0.9.1 tqdm/4.61.0 CPython/3.8.10
File hashes
Algorithm | Hash digest
---|---
SHA256 | b29d29fcf81acb4e176d0aeb8e5f38529ddb44da6e8f453d34a8e41b9c549bdf
MD5 | 3e3bad58aa2ebbfb602a4f624adad080
BLAKE2b-256 | ea50e08ef9722c268590cee3ea0985b3bf2aeb14d7d3cda714130bfdc43cef21
File details
Details for the file orkestra-0.9.4-py3-none-any.whl.
File metadata
- Download URL: orkestra-0.9.4-py3-none-any.whl
- Upload date:
- Size: 11.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.4.1 importlib_metadata/4.4.0 pkginfo/1.7.0 requests/2.25.1 requests-toolbelt/0.9.1 tqdm/4.61.0 CPython/3.8.10
File hashes
Algorithm | Hash digest
---|---
SHA256 | 60f29d427fc9cc679b093a738358443db01b9146576533cefbfe184706c846db
MD5 | 4a85c70920cbbe9b3053cb969a96de3c
BLAKE2b-256 | 7442fc42aadefaa96b3f5c8001425e02bceba561271ca51f02d8d99cf696417c