AsyncGraphs
AsyncGraphs is a tiny ETL (extract, transform, load) framework that uses asyncio to make pipeline execution more efficient.
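To illustrate the general idea behind an asyncio-based ETL pipeline, here is a minimal plain-asyncio sketch. It is not AsyncGraphs' actual internals, which this README does not describe. Extract, transform, and load stages run as concurrent tasks connected by bounded `asyncio.Queue` objects, so while one stage awaits I/O the others can make progress. The stage functions and the `DONE` sentinel are hypothetical names.

```python
import asyncio

# Hypothetical sentinel marking the end of the stream.
DONE = object()


async def extract(out_q: asyncio.Queue) -> None:
    for i in range(5):
        await asyncio.sleep(0.01)  # stands in for an awaited I/O call
        await out_q.put({"value": i})
    await out_q.put(DONE)


async def transform(in_q: asyncio.Queue, out_q: asyncio.Queue) -> None:
    # Consume documents until the sentinel arrives, then forward it.
    while (doc := await in_q.get()) is not DONE:
        doc["value"] *= 10
        await out_q.put(doc)
    await out_q.put(DONE)


async def load(in_q: asyncio.Queue, sink: list) -> None:
    while (doc := await in_q.get()) is not DONE:
        sink.append(doc)


async def pipeline() -> list:
    # Small maxsize gives back-pressure: a fast producer waits for consumers.
    q1: asyncio.Queue = asyncio.Queue(maxsize=2)
    q2: asyncio.Queue = asyncio.Queue(maxsize=2)
    sink: list = []
    # All three stages run as concurrent tasks on one event loop.
    await asyncio.gather(extract(q1), transform(q1, q2), load(q2, sink))
    return sink


results = asyncio.run(pipeline())
print(results)
```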
Installation
```shell
pip install asyncgraphs
```
Basic usage
```python
import asyncio
import datetime
from random import random

import pytz

from asyncgraphs import Graph, run


async def my_extract():
    # Source: emit one document per second, forever.
    while True:
        await asyncio.sleep(1)
        yield {"timestamp": datetime.datetime.now(tz=pytz.UTC), "value": random()}


def my_transform(in_doc):
    # Transform: treat values below 0.5 as missing.
    if in_doc["value"] < 0.5:
        in_doc["value"] = None
    return in_doc


class MyForwardFill:
    # Stateful transform: replace a missing value with the last value seen.
    def __init__(self):
        self.last_value = None

    def __call__(self, in_doc):
        if in_doc["value"] is None:
            in_doc["value"] = self.last_value
        else:
            self.last_value = in_doc["value"]
        return in_doc


async def main():
    g = Graph()
    # Chain source -> transforms -> load (print) with the | operator.
    g | my_extract() | my_transform | MyForwardFill() | print
    await run(g)


if __name__ == '__main__':
    asyncio.run(main())
```
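The example above chains dummy extract, transform, and load steps: an async generator as the source, two transform callables (a plain function and a stateful callable object), and `print` as the load step. Apart from the source, these steps are synchronous, but real applications should use async libraries wherever possible.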
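When a step has to call a library that only offers a blocking API, one general asyncio technique is to push the call onto a worker thread with `asyncio.to_thread` (Python 3.9+). Whether AsyncGraphs accepts async callables as transform steps is not stated in this README, so the sketch below uses plain asyncio only; `blocking_lookup` and `enrich` are hypothetical names.

```python
import asyncio
import time


def blocking_lookup(doc: dict) -> dict:
    # Hypothetical stand-in for a synchronous client call (e.g. a non-async DB driver).
    time.sleep(0.1)
    return {**doc, "enriched": True}


async def enrich(doc: dict) -> dict:
    # Run the blocking call in a worker thread so the event loop stays free.
    return await asyncio.to_thread(blocking_lookup, doc)


async def demo() -> list:
    docs = [{"id": i} for i in range(5)]
    start = time.perf_counter()
    # The five lookups overlap in the default thread pool instead of running back to back.
    results = await asyncio.gather(*(enrich(d) for d in docs))
    elapsed = time.perf_counter() - start
    print(f"{len(results)} docs in {elapsed:.2f}s")
    return list(results)


results = asyncio.run(demo())
```

`asyncio.gather` returns results in the order the awaitables were passed, so document order is preserved even though the lookups overlap.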
Features
Typed
This library is typed, and type checking also covers chained operations.
In the following example, the source yields strings, so chaining a transformer that expects an integer is not supported.
A type checker such as mypy reports this mismatch.
```python
# tests/test_examples/typed.py
import asyncio
import random
import string
from typing import AsyncGenerator

from asyncgraphs import Graph, run

graph = Graph()


async def random_strings(
    value_count: int, character_count: int
) -> AsyncGenerator[str, None]:
    for i in range(value_count):
        yield "".join(
            random.choice(string.ascii_lowercase) for _ in range(character_count)
        )
        await asyncio.sleep(0)


def add_one(value: int) -> int:
    return value + 1


def prefix_hello(value: str) -> str:
    return f"Hello {value}"


async def main() -> None:
    g = Graph()
    g | random_strings(20, 5) \
        | add_one \
        | print
    await run(g)

    g = Graph()
    g | random_strings(20, 5) \
        | prefix_hello \
        | print
    await run(g)


if __name__ == "__main__":
    asyncio.run(main())
```
```shell
$ mypy tests/test_examples/typed.py
tests/test_examples/typed.py:32: error: Unsupported operand types for | ("Source[str]" and "Callable[[int], int]")  [operator]
Found 1 error in 1 file (checked 1 source file)
```
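A check like this can be expressed with generics by annotating the `|` operator so that the next step must accept the current element type. The sketch below illustrates that general technique under stated assumptions; it is not AsyncGraphs' actual implementation, and `Source` here is a hypothetical, synchronous stand-in that simply applies each step to a list.

```python
from __future__ import annotations

from typing import Callable, Generic, List, TypeVar

T = TypeVar("T")
U = TypeVar("U")


class Source(Generic[T]):
    """Hypothetical stand-in for a typed pipeline node producing values of type T."""

    def __init__(self, values: List[T]) -> None:
        self.values = values

    def __or__(self, step: Callable[[T], U]) -> Source[U]:
        # The annotation ties the step's input type to T, so a type checker
        # rejects chaining an int -> int step onto a Source[str].
        return Source([step(v) for v in self.values])


def prefix_hello(value: str) -> str:
    return f"Hello {value}"


def add_one(value: int) -> int:
    return value + 1


# str -> str, then len (Sized -> int), then int -> int: every link type-checks.
ok = Source(["a", "b"]) | prefix_hello | len | add_one
print(ok.values)

# bad = Source(["a"]) | add_one  # a type checker should flag this line
```

Inserting an adapter step such as `len` between the string-producing and integer-consuming steps is what makes the chain type-check.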
File details
Details for the file asyncgraphs-0.0.3.tar.gz.
File metadata
- Download URL: asyncgraphs-0.0.3.tar.gz
- Upload date:
- Size: 7.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.1 CPython/3.11.2
File hashes

Algorithm | Hash digest
---|---
SHA256 | ed82eb09e08417c2a61a46b83c9ee055bca739edbe648473919f76c097f0909d
MD5 | ad7f650a0b2d590f5064c290fa21cf32
BLAKE2b-256 | adb15efd905e1cb4e53f3584003400fae5bcb0bd20cb633efe516805536615aa

File details
Details for the file asyncgraphs-0.0.3-py3-none-any.whl.
File metadata
- Download URL: asyncgraphs-0.0.3-py3-none-any.whl
- Upload date:
- Size: 5.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.1 CPython/3.11.2
File hashes

Algorithm | Hash digest
---|---
SHA256 | f101ebc5d365f99a33f44ac115c74429c67a41da2504c5638b5e6cd802057187
MD5 | be6ab657d70923df6757639874b08def
BLAKE2b-256 | 8b018bb08601220b8ad789a2ba16c89ed7488793ec0aee4e1a0e59a1bf1b27bf
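To check a downloaded distribution against the SHA256 digests listed above, Python's standard `hashlib` module is enough. The helper names `sha256_of` and `verify` below are hypothetical; the file is read in chunks so large downloads do not have to fit in memory.

```python
import hashlib
from pathlib import Path


def sha256_of(path: Path, chunk_size: int = 1 << 16) -> str:
    """Return the hex SHA256 digest of a file, reading it in chunks."""
    digest = hashlib.sha256()
    with path.open("rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def verify(path: Path, expected_hex: str) -> bool:
    # Hex digests are case-insensitive, so normalize before comparing.
    return sha256_of(path) == expected_hex.lower()


# Usage (assumes the wheel was downloaded to the current directory):
# verify(Path("asyncgraphs-0.0.3-py3-none-any.whl"),
#        "f101ebc5d365f99a33f44ac115c74429c67a41da2504c5638b5e6cd802057187")
```

pip can also enforce this itself in hash-checking mode, using a requirements line of the form `asyncgraphs==0.0.3 --hash=sha256:<digest>`.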