Skip to main content

The elegance of Airflow + the power of AWS

Project description

Orkestra

The elegance of Airflow + the power of AWS

Docs Codecov

PyPI PyPI - Downloads PyPI - License PyPI - Python Version GitHub issues Mentioned in Awesome CDK

examples/hello_orkestra.py

import random
from typing import *
from uuid import uuid4

from aws_lambda_powertools import Logger, Tracer
from pydantic import BaseModel

from orkestra import compose
from orkestra.interfaces import Duration


def dag():
    """Wire the example's composed functions into a single pipeline.

    The ``>>`` chain below is an expression statement: its value is
    discarded and this function returns ``None``, so whatever wiring takes
    place happens as a side effect of the ``>>`` operators.

    Every name used here is defined further down in the module, so this
    function must only be called after those definitions have executed.

    NOTE(review): the tuple ``(do_nothing, assert_false)`` and the list
    ``[random_int, random_float]`` presumably declare parallel branches with
    different failure handling -- confirm against the Orkestra docs.
    """
    (
        generate_item
        >> add_price
        >> copy_item
        >> double_price
        >> (do_nothing, assert_false)
        >> say_hello
        >> [random_int, random_float]
        >> say_goodbye
    )


class Item(BaseModel):
    """Data model for the item passed between the example's functions."""

    # Unique identifier; ``random()`` fills it with a UUID4 string.
    id: str
    # Human-readable item name.
    name: str
    # Unset until a later step assigns a price.
    price: Optional[float] = None

    @classmethod
    def random(cls):
        """Build an item with a fresh UUID4 id and a randomly chosen name.

        The price is left at its default of ``None``.
        """
        fresh_id = str(uuid4())
        candidate_names = ["potato", "moon rock", "hat"]
        chosen_name = random.choice(candidate_names)
        return cls(id=fresh_id, name=chosen_name)


# Module-wide logger shared by the functions in this example.
logger = Logger()

# Module-wide tracer; generate_item records metadata through it.
tracer = Tracer()


# Keyword arguments forwarded to every ``@compose(...)`` call in this module.
default_args = {
    "enable_powertools": True,
    "timeout": Duration.seconds(6),
}


@compose(**default_args)
def generate_item(event, context):
    """Create a random ``Item`` and return it as a plain dict.

    ``event`` and ``context`` are accepted but not read.
    """
    logger.info("generating random item")
    new_item = Item.random()
    payload = new_item.dict()
    logger.info(payload)
    tracer.put_metadata("GenerateItem", "SUCCESS")
    return payload


@compose(model=Item, **default_args)
def add_price(item: Item, context):
    """Set a fixed price of 3.14 on ``item`` and return it as a dict.

    The item (before the price is assigned) and the price are attached to
    the log record via ``extra``.
    """
    fixed_price = 3.14
    log_fields = {
        "item": item.dict(),
        "price": fixed_price,
    }
    logger.info("adding price to item", extra=log_fields)
    item.price = fixed_price
    return item.dict()


@compose(model=Item, **default_args)
def copy_item(item: Item, context) -> list:
    """Return ten dict copies of ``item``.

    Args:
        item: The item to duplicate.
        context: Accepted but not read.

    Returns:
        A list of ten equal, mutually independent dicts.
    """
    logger.info(item.dict())
    # Build a fresh dict per element. ``[d] * 10`` would repeat a single
    # dict object ten times, so mutating one entry would change all of them.
    return [item.dict() for _ in range(10)]


@compose(model=Item, is_map_job=True, **default_args)
def double_price(item: Item, context):
    """Double ``item.price`` in place and return the item as a dict.

    NOTE(review): ``price`` is optional on ``Item``; this assumes an earlier
    step has already set it -- confirm against the pipeline wiring.
    """
    doubled = item.price * 2
    item.price = doubled
    return item.dict()


@compose(**default_args)
def assert_false(event, context):
    """Always fail by raising ``AssertionError``.

    ``event`` and ``context`` are accepted but not read.

    Raises:
        AssertionError: Unconditionally.
    """
    # Raise explicitly instead of ``assert False``: assert statements are
    # removed when Python runs with -O, which would make this step succeed.
    raise AssertionError


@compose(**default_args)
def do_nothing(event, context):
    """Log a marker message and return ``None``; the inputs are not read."""
    marker = {"doing": "nothing"}
    logger.info(marker)


@compose(**default_args)
def say_hello(event, context):
    """Return a fixed greeting; ``event`` and ``context`` are not read."""
    greeting = "hello, world"
    return greeting


@compose(**default_args)
def say_goodbye(event, context):
    """Return a fixed farewell; ``event`` and ``context`` are not read."""
    farewell = "goodbye"
    return farewell


@compose(**default_args)
def random_int(event, context):
    """Return a random integer in ``range(100)``; the inputs are not read."""
    exclusive_upper_bound = 100
    return random.randrange(exclusive_upper_bound)


@compose(**default_args)
def random_float(event, context):
    """Return the result of ``random_int`` converted to ``float``.

    ``event`` and ``context`` are passed through to ``random_int`` unchanged.
    """
    whole_number = random_int(event, context)
    return float(whole_number)


# Run the wiring at import time, after every function it references has been
# defined above.
dag()

app.py

#!/usr/bin/env python3
from aws_cdk import core as cdk

from examples.hello_orkestra import generate_item


class HelloOrkestra(cdk.Stack):
    """CDK stack that schedules the example pipeline via ``generate_item``."""

    def __init__(self, scope, id, **kwargs):
        """Initialise the stack and register the schedule on it.

        ``scope``, ``id`` and ``kwargs`` are forwarded unchanged to
        ``cdk.Stack.__init__``.
        """
        super().__init__(scope, id, **kwargs)

        schedule_options = dict(
            expression="rate(5 minutes)",
            state_machine_name="hello_orkestra",
        )
        generate_item.schedule(self, **schedule_options)


# Root of the CDK construct tree for this example.
app = cdk.App()

# Instantiate the stack under the app. Defining the class alone adds nothing
# to the app, so without this line synth() would run with no stack attached.
HelloOrkestra(app, "helloOrkestra")

# Synthesize the app, which now contains the stack.
app.synth()

state machine

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

orkestra-0.9.4.tar.gz (15.6 kB view details)

Uploaded Source

Built Distribution

orkestra-0.9.4-py3-none-any.whl (11.6 kB view details)

Uploaded Python 3

File details

Details for the file orkestra-0.9.4.tar.gz.

File metadata

  • Download URL: orkestra-0.9.4.tar.gz
  • Upload date:
  • Size: 15.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.4.1 importlib_metadata/4.4.0 pkginfo/1.7.0 requests/2.25.1 requests-toolbelt/0.9.1 tqdm/4.61.0 CPython/3.8.10

File hashes

Hashes for orkestra-0.9.4.tar.gz
Algorithm Hash digest
SHA256 b29d29fcf81acb4e176d0aeb8e5f38529ddb44da6e8f453d34a8e41b9c549bdf
MD5 3e3bad58aa2ebbfb602a4f624adad080
BLAKE2b-256 ea50e08ef9722c268590cee3ea0985b3bf2aeb14d7d3cda714130bfdc43cef21

See more details on using hashes here.

File details

Details for the file orkestra-0.9.4-py3-none-any.whl.

File metadata

  • Download URL: orkestra-0.9.4-py3-none-any.whl
  • Upload date:
  • Size: 11.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.4.1 importlib_metadata/4.4.0 pkginfo/1.7.0 requests/2.25.1 requests-toolbelt/0.9.1 tqdm/4.61.0 CPython/3.8.10

File hashes

Hashes for orkestra-0.9.4-py3-none-any.whl
Algorithm Hash digest
SHA256 60f29d427fc9cc679b093a738358443db01b9146576533cefbfe184706c846db
MD5 4a85c70920cbbe9b3053cb969a96de3c
BLAKE2b-256 7442fc42aadefaa96b3f5c8001425e02bceba561271ca51f02d8d99cf696417c

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page