convtools lets you define and reuse conversions for processing collections and CSV tables, including complex aggregations and joins.

convtools is a Python library for declaratively defining data transforms:

  • convtools.conversion - pipelines for processing collections, doing complex aggregations and joins.

  • convtools.contrib.tables - stream processing of table-like data (e.g. CSV)

Docs

Why would you need this?

  • you prefer a declarative approach

  • you love functional programming

  • you believe that Python is high-level enough not to make you write aggregations and joins by hand

  • you need to serialize/validate objects

  • you need to dynamically define transforms (including at runtime)

  • you like the idea of having something write ad hoc code for you :)

Installation:

pip install convtools

Conversions - data transforms, aggregations, joins

# pip install convtools

from convtools import conversion as c

input_data = [{"StoreID": " 123", "Quantity": "123"}]

# define a conversion (sometimes you may want to do this dynamically)
#  takes an iterable and returns an iterable of dicts, stopping before the
#  first one with quantity >= 1000, starting a new chunk whenever the id
#  changes or the current chunk reaches 1000 items
conversion = (
    c.iter(
        {
            "id": c.item("StoreID").call_method("strip"),
            "quantity": c.item("Quantity").as_type(int),
        }
    )
    .take_while(c.item("quantity") < 1000)
    .pipe(
        c.chunk_by(c.item("id"), size=1000)
    )
    .as_type(list)
)

# compile the conversion into an ad hoc function and run it
converter = conversion.gen_converter(debug=True)
converter(input_data)  # -> [[{"id": "123", "quantity": 123}]]

# OR in case of a one-shot use
conversion.execute(input_data)
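
The example below is a runnable test taken from the docs, so the asserts show the expected outputs; it covers group by, aggregations and joins:
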
from convtools import conversion as c


def test_doc__index_intro():

    # ======== #
    # GROUP BY #
    # ======== #
    input_data = [
        {"a": 5, "b": "foo"},
        {"a": 10, "b": "foo"},
        {"a": 10, "b": "bar"},
        {"a": 10, "b": "bar"},
        {"a": 20, "b": "bar"},
    ]

    conv = (
        c.group_by(c.item("b"))
        .aggregate(
            {
                "b": c.item("b"),
                "a_first": c.ReduceFuncs.First(c.item("a")),
                "a_max": c.ReduceFuncs.Max(c.item("a")),
            }
        )
        .gen_converter(debug=True)
    )

    assert conv(input_data) == [
        {"b": "foo", "a_first": 5, "a_max": 10},
        {"b": "bar", "a_first": 10, "a_max": 20},
    ]

    # ========= #
    # AGGREGATE #
    # ========= #
    conv = c.aggregate(
        {
            # list of "a" values where "b" equals to "bar"
            "a": c.ReduceFuncs.Array(c.item("a"), where=c.item("b") == "bar"),
            # "b" value of a row where "a" has Max value
            "b": c.ReduceFuncs.MaxRow(
                c.item("a"),
            ).item("b", default=None),
        }
    ).gen_converter(debug=True)

    assert conv(input_data) == {"a": [10, 10, 20], "b": "bar"}

    # ==== #
    # JOIN #
    # ==== #
    collection_1 = [
        {"id": 1, "name": "Nick"},
        {"id": 2, "name": "Joash"},
        {"id": 3, "name": "Bob"},
    ]
    collection_2 = [
        {"ID": "3", "age": 17, "country": "GB"},
        {"ID": "2", "age": 21, "country": "US"},
        {"ID": "1", "age": 18, "country": "CA"},
    ]
    input_data = (collection_1, collection_2)

    conv = (
        c.join(
            c.item(0),
            c.item(1),
            c.and_(
                c.LEFT.item("id") == c.RIGHT.item("ID").as_type(int),
                c.RIGHT.item("age") >= 18,
            ),
            how="left",
        )
        .pipe(
            c.list_comp(
                {
                    "id": c.item(0, "id"),
                    "name": c.item(0, "name"),
                    "age": c.item(1, "age", default=None),
                    "country": c.item(1, "country", default=None),
                }
            )
        )
        .gen_converter(debug=True)
    )

    assert conv(input_data) == [
        {"id": 1, "name": "Nick", "age": 18, "country": "CA"},
        {"id": 2, "name": "Joash", "age": 21, "country": "US"},
        {"id": 3, "name": "Bob", "age": None, "country": None},
    ]
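
In the join condition, c.LEFT and c.RIGHT refer to rows of the left and right collections, and each joined row comes out as a (left, right) pair, hence the c.item(0, ...) and c.item(1, ...) lookups above.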

What reducers are supported by aggregations?

Built-in ones, exposed like c.ReduceFuncs.Sum:
  • Sum

  • SumOrNone

  • Max

  • MaxRow

  • Min

  • MinRow

  • Count

  • CountDistinct

  • First

  • Last

  • Average

  • Median

  • Percentile - c.ReduceFuncs.Percentile(95.0, c.item("x"))

  • Mode

  • TopK - c.ReduceFuncs.TopK(3, c.item("x"))

  • Array

  • ArrayDistinct

  • ArraySorted - c.ReduceFuncs.ArraySorted(c.item("x"), key=lambda v: v, reverse=True)

  • Dict - c.ReduceFuncs.Dict(c.item("key"), c.item("x"))

  • DictArray

  • DictSum

  • DictSumOrNone

  • DictMax

  • DictMin

  • DictCount

  • DictCountDistinct

  • DictFirst

  • DictLast

and any reduce function of two arguments you pass to c.reduce.
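
For instance, here is a minimal sketch of a custom reducer; the lambda and the initial seed value follow the c.reduce usage shown in the docs, but double-check the exact signature there:

from convtools import conversion as c

# sum "x" values with a plain two-argument function instead of a built-in reducer
converter = c.aggregate(
    c.reduce(lambda total, x: total + x, c.item("x"), initial=0)
).gen_converter()

assert converter([{"x": 1}, {"x": 2}, {"x": 3}]) == 6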

Contrib / Table - stream processing of table-like data

The Table helper lets you massage CSVs and table-like data:
  • join / zip / chain tables

  • take / drop / rename columns

  • filter rows

  • update / update_all values

from convtools.contrib.tables import Table
from convtools import conversion as c

# reads an iterable of rows
list(
    Table.from_rows([(0, -1), (1, 2)], header=["a", "b"]).join(
        Table
        # reads a tab-separated CSV file
        .from_csv(
            "tests/csvs/ac.csv",
            header=True,
            dialect=Table.csv_dialect(delimiter="\t"),
        )
        # transforms column values
        .update(
            a=c.col("a").as_type(float),
            c=c.col("c").as_type(int),
        )
        # filters rows by condition
        .filter(c.col("c") >= 0),
        # joins on column "a" values
        on=["a"],
        how="inner",
    )
    # rearranges columns ("..." stands for the rest of the columns)
    .take(..., "a")
    # into_iter_rows returns a generator of rows (tuple and list work too),
    # so the whole expression is wrapped in list() above to consume it
    .into_iter_rows(dict)
)
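
The remaining helpers from the list above (rename / drop, etc.) chain the same way. A minimal sketch, assuming rename accepts a dict mapping old column names to new ones:

from convtools.contrib.tables import Table

rows = list(
    Table.from_rows([(1, 2)], header=["a", "b"])
    .rename({"b": "B"})  # rename column "b" to "B" (assumed dict signature)
    .drop("a")  # drop column "a"
    .into_iter_rows(dict)
)
assert rows == [{"B": 2}]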

Is it any different from tools like Pandas / Polars?

  • convtools doesn’t wrap data in any container, it just writes and runs the code which performs the conversion you defined

  • convtools is a lightweight library with no dependencies (though installing the optional black is highly recommended for pretty-printing generated code when debugging)

  • convtools is about defining and reusing conversions (a declarative approach), while wrapping data in high-performance containers is a more imperative one

  • convtools supports nested aggregations

Is this thing debuggable?

Despite being compiled at runtime, it is (by both pdb and pydevd).
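
To see what you are stepping through, pass debug=True to gen_converter, which prints the generated ad hoc code; a minimal sketch:

from convtools import conversion as c

# debug=True prints the generated code before the converter is returned
converter = c.item("a").as_type(int).gen_converter(debug=True)
assert converter({"a": "1"}) == 1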
