Skip to main content

No project description provided

Project description

AsyncGraphs

AsyncGraphs is a tiny ETL framework that leverages asyncio to make the execution more efficient.

Installation

pip install asyncgraphs

Basic usage

import asyncio
import datetime
from random import random

import pytz
from asyncgraphs import Graph, run


async def my_extract():
    while True:
        await asyncio.sleep(1)
        yield {"timestamp": datetime.datetime.now(tz=pytz.UTC), "value": random()}


def my_transform(in_doc):
    if in_doc["value"] < 0.5:
        in_doc["value"] = None
    return in_doc


class MyForwardFill:
    def __init__(self):
        self.last_value = None

    def __call__(self, in_doc):
        if in_doc["value"] is None:
            in_doc["value"] = self.last_value
        else:
            self.last_value = in_doc["value"]
        return in_doc


async def main():
    g = Graph()
    g | my_extract() | my_transform | MyForwardFill() | print
    await run(g)

if __name__ == '__main__':
    asyncio.run(main())

The example above shows some dummy extract/transform/load steps. In the example most are synchronous, but regular applications should use async libraries as often as possible.

Features

Typed

This library is typed. Checking the types of chained operations is also supported.

In the following example, the source outputs strings. Adding a transformer that expects an integer is thus not supported

This is indicated when using a type checker.

# tests/test_examples/typed.py

import asyncio
import random
import string
from typing import AsyncGenerator

from asyncgraphs import Graph, run

graph = Graph()


async def random_strings(
    value_count: int, character_count: int
) -> AsyncGenerator[str, None]:
    for i in range(value_count):
        yield "".join(
            random.choice(string.ascii_lowercase) for _ in range(character_count)
        )
        await asyncio.sleep(0)


def add_one(value: int) -> int:
    return value + 1


def prefix_hello(value: str) -> str:
    return f"Hello {value}"


async def main() -> None:
    g = Graph()
    g | random_strings(20, 5) \
      | add_one \
      | print
    await run(g)

    g = Graph()
    g | random_strings(20, 5) \
      | prefix_hello \
      | print
    await run(g)


if __name__ == "__main__":
    asyncio.run(main())
$ mypy tests/test_examples/typed.py
tests/test_examples/typed.py:32: error: Unsupported operand types for | ("Source[str]" and "Callable[[int], int]")  [operator]
Found 1 error in 1 file (checked 1 source file)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

asyncgraphs-0.0.3.tar.gz (7.3 kB view details)

Uploaded Source

Built Distribution

asyncgraphs-0.0.3-py3-none-any.whl (5.1 kB view details)

Uploaded Python 3

File details

Details for the file asyncgraphs-0.0.3.tar.gz.

File metadata

  • Download URL: asyncgraphs-0.0.3.tar.gz
  • Upload date:
  • Size: 7.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.1 CPython/3.11.2

File hashes

Hashes for asyncgraphs-0.0.3.tar.gz
Algorithm Hash digest
SHA256 ed82eb09e08417c2a61a46b83c9ee055bca739edbe648473919f76c097f0909d
MD5 ad7f650a0b2d590f5064c290fa21cf32
BLAKE2b-256 adb15efd905e1cb4e53f3584003400fae5bcb0bd20cb633efe516805536615aa

See more details on using hashes here.

File details

Details for the file asyncgraphs-0.0.3-py3-none-any.whl.

File metadata

  • Download URL: asyncgraphs-0.0.3-py3-none-any.whl
  • Upload date:
  • Size: 5.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.1 CPython/3.11.2

File hashes

Hashes for asyncgraphs-0.0.3-py3-none-any.whl
Algorithm Hash digest
SHA256 f101ebc5d365f99a33f44ac115c74429c67a41da2504c5638b5e6cd802057187
MD5 be6ab657d70923df6757639874b08def
BLAKE2b-256 8b018bb08601220b8ad789a2ba16c89ed7488793ec0aee4e1a0e59a1bf1b27bf

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page