Skip to main content

No project description provided

Project description

AsyncGraphs

AsyncGraphs is a tiny ETL framework that leverages asyncio to make the execution more efficient.

Installation

pip install asyncgraphs

Basic usage

import asyncio
import datetime
from random import random

import pytz
from asyncgraphs import Graph, run


async def my_extract():
    while True:
        await asyncio.sleep(1)
        yield {"timestamp": datetime.datetime.now(tz=pytz.UTC), "value": random()}


def my_transform(in_doc):
    if in_doc["value"] < 0.5:
        in_doc["value"] = None
    return in_doc


class MyForwardFill:
    def __init__(self):
        self.last_value = None

    def __call__(self, in_doc):
        if in_doc["value"] is None:
            in_doc["value"] = self.last_value
        else:
            self.last_value = in_doc["value"]
        return in_doc


async def main():
    g = Graph()
    g | my_extract() | my_transform | MyForwardFill() | print
    await run(g)

if __name__ == '__main__':
    asyncio.run(main())

The example above shows some dummy extract/transform/load steps. In the example most are synchronous, but regular applications should use async libraries as often as possible.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

asyncgraphs-0.0.2.tar.gz (5.4 kB view hashes)

Uploaded Source

Built Distribution

asyncgraphs-0.0.2-py3-none-any.whl (4.4 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page