Skip to main content

Library used for chaining together synchronous and asynchronous functions for data transformation

Project description

aChain

Library used for chaining together synchronous and asynchronous functions for data transformation

What does this do?

The achain.Chain class allows for the construction of chained synchronous and asynchronous functions

Example:

import typing
import json
from datetime import datetime, timedelta

from achain import Chain

SERVICE_URL = "https://www.service.com/api"
"""The address of some service containing data of interest"""

async def get_data(url, **kwargs) -> str:
    """
    Gets raw text from the given URL with the given query parameters
    """
    ...

def normalize_data(
    data: typing.Dict[str, typing.Any]
) -> typing.Dict[str, typing.Dict[str, typing.Any]]:
    """
    Transforms the passed data into one that is easier to deserialize
    """
    ...

class ExampleClass:
    """
    An example of a class that may be constructed from the generated data
    """
    def __init__(self, **kwargs):
        """Constructor"""
        ...

async def main():
    """
    Create lists of remote data
    """
    # Declare your chain
    chain: Chain[typing.List[ExampleClass]] = Chain(
        get_data,
        url=SERVICE_URL
    ).then(
        json.loads
    ).then(
        normalize_data
    ).then(
        lambda data: [ExampleClass(**values) for values in data.values()]
    )
    
    # Call Asynchronously
    yesterdays_data: typing.List[ExampleClass] = await chain(
        start=datetime.now() - timedelta(hours=48),
        end=datetime.now() - timedelta(hours=24)
    )

    # Call Synchronously
    todays_data: typing.List[ExampleClass] = chain.execute_synchronously(
        start=datetime.now() - timedelta(hours=24),
        end=datetime.now
    )

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

achain-0.1.1.tar.gz (15.9 kB view hashes)

Uploaded Source

Built Distribution

achain-0.1.1-py3-none-any.whl (15.9 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page