Functional-style Streams library for processing collections. Supports querying files (json, toml, yaml, xml, csv, tsv), as well as creating and updating them. Provides easy integration with itertools.

Pyrio


Functional-style Streams API library

Facilitates processing of collections and iterables using fluent APIs.
Gives access to files of various types (json, toml, yaml, xml, csv and tsv) for reading and executing complex queries
Provides easy integration with itertools
(NB: Commonly used itertools 'recipes' are included as part of the main APIs)

How to use

Creating streams

  • stream from iterable
Stream([1, 2, 3])
  • from variadic arguments
Stream.of(1, 2, 3)
  • empty stream
Stream.empty()
  • infinite ordered stream
Stream.iterate(0, lambda x: x + 1)
  • infinite unordered stream
import random

Stream.generate(lambda: random.random())
  • infinite stream with given value
Stream.constant(42)
  • concat
    (concatenate new streams/iterables with the current one)
Stream.of(1, 2, 3).concat(Stream.of(4, 5)).to_list()
Stream([1, 2, 3]).concat([5, 6]).to_list()
  • prepend
    (prepend new stream/iterable to the current one)
Stream([2, 3, 4]).prepend(0, 1).to_list()
Stream.of(3, 4, 5).prepend(Stream.of([0, 1], 2)).to_list()
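
For readers who think in plain generators, the infinite constructors above behave much like the standard-library sketch below. The helpers iterate and generate are illustrative stand-ins written for this example, not Pyrio's implementation; itertools.islice plays the role that limit plays on a stream.

```python
import itertools
import random


def iterate(seed, operation):
    """Yield seed, operation(seed), operation(operation(seed)), ... forever."""
    value = seed
    while True:
        yield value
        value = operation(value)


def generate(supplier):
    """Yield supplier() forever; consecutive values are unrelated."""
    while True:
        yield supplier()


# An infinite sequence must be bounded before it is collected.
first_five = list(itertools.islice(iterate(0, lambda x: x + 1), 5))
samples = list(itertools.islice(generate(random.random), 3))
constants = list(itertools.islice(itertools.repeat(42), 3))

print(first_five)  # [0, 1, 2, 3, 4]
print(constants)   # [42, 42, 42]
```

Collecting any of these without a bound (for example with to_list()) would never finish, which is why the skip example further down pairs the infinite stream with limit.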

Intermediate operations

  • filter
Stream([1, 2, 3]).filter(lambda x: x % 2 == 0)
  • map
Stream([1, 2, 3]).map(str).to_list()
Stream([1, 2, 3]).map(lambda x: x + 5).to_list()
  • filter_map
    (filters out all None values, or all falsy values if falsy=True, and applies the mapper function to the remaining elements of the stream)
Stream.of(None, "foo", "", "bar", 0, []).filter_map(str.upper, falsy=True).to_list()
["FOO", "BAR"]
  • flat_map
    (maps each element of the stream and yields the elements of the produced iterators)
Stream([[1, 2], [3, 4], [5]]).flat_map(lambda x: Stream(x)).to_list()
[1, 2, 3, 4, 5]
  • flatten
Stream([[1, 2], [3, 4], [5]]).flatten().to_list()
[1, 2, 3, 4, 5]
  • reduce
    (returns Optional)
Stream([1, 2, 3]).reduce(lambda acc, val: acc + val, identity=3).get()
  • peek
    (performs the provided operation on each element of the stream without consuming it)
(Stream([1, 2, 3, 4])
    .filter(lambda x: x > 2)
    .peek(lambda x: print(f"{x} ", end=""))
    .map(lambda x: x * 20)
    .to_list())
  • view
    (provides access to a selected part of the stream)
Stream([1, 2, 3, 4, 5, 6, 7, 8, 9]).view(start=1, stop=-3, step=2).to_list()
[2, 4, 6]
  • distinct
    (returns a stream with the distinct elements of the current one)
Stream([1, 1, 2, 2, 2, 3]).distinct().to_list()
  • skip
    (discards the first n elements of the stream and returns a new stream with the remaining ones)
Stream.iterate(0, lambda x: x + 1).skip(5).limit(5).to_list()
  • limit / head
    (returns a stream with the first n elements, or fewer if the underlying iterator ends sooner)
Stream([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).limit(3).to_tuple()
Stream([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).head(3).to_tuple()
  • tail
    (returns a stream with the last n elements, or fewer if the underlying iterator ends sooner)
Stream([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).tail(3).to_tuple()
  • take_while
    (returns a stream that yields elements as long as the predicate holds and stops at the first element that fails it)
Stream.of(1, 2, 3, 4, 5, 6, 7, 2, 3).take_while(lambda x: x < 5).to_list()
[1, 2, 3, 4]
  • drop_while
    (returns a stream that skips leading elements while the predicate holds and yields the remaining ones)
Stream.of(1, 2, 3, 5, 6, 7, 2).drop_while(lambda x: x < 5).to_list()
[5, 6, 7, 2]
  • sorted
    (sorts the elements of the current stream according to natural order or based on the given comparator;
    if 'reverse' flag is True, the elements are sorted in descending order)
(Stream.of((3, 30), (2, 30), (2, 20), (1, 20), (1, 10))
    .sorted(lambda x: (x[0], x[1]), reverse=True)
    .to_list())
[(3, 30), (2, 30), (2, 20), (1, 20), (1, 10)]
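
As a cross-check on the outputs listed above, here is the same behavior written with the standard library only. This is a sketch for comparison, not Pyrio's code; in particular, treating view as an ordinary slice is an assumption based on the documented result.

```python
from itertools import dropwhile, takewhile

# take_while: stop at the first element that fails the predicate,
# even if later elements would pass again.
taken = list(takewhile(lambda x: x < 5, [1, 2, 3, 4, 5, 6, 7, 2, 3]))

# drop_while: skip the leading run that satisfies the predicate, keep the rest.
dropped = list(dropwhile(lambda x: x < 5, [1, 2, 3, 5, 6, 7, 2]))

# view(start=1, stop=-3, step=2), assumed here to follow slice semantics.
viewed = [1, 2, 3, 4, 5, 6, 7, 8, 9][1:-3:2]

# sorted with a key function and reverse=True gives descending order.
pairs = [(3, 30), (2, 30), (2, 20), (1, 20), (1, 10)]
ordered = sorted(pairs, key=lambda x: (x[0], x[1]), reverse=True)

print(taken)    # [1, 2, 3, 4]
print(dropped)  # [5, 6, 7, 2]
print(viewed)   # [2, 4, 6]
```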

Terminal operations

Collectors

  • collecting result into list, tuple, set
Stream([1, 2, 3]).to_list()
Stream([1, 2, 3]).to_tuple()
Stream([1, 2, 3]).to_set()
  • into dict
class Foo:
    def __init__(self, name, num):
        self.name = name
        self.num = num
        
Stream([Foo("fizz", 1), Foo("buzz", 2)]).to_dict(lambda x: (x.name, x.num))
{"fizz": 1, "buzz": 2}

In the case of a collision (duplicate keys) the 'merger' function indicates which entry should be kept

collection = [Foo("fizz", 1), Foo("fizz", 2), Foo("buzz", 2)]
Stream(collection).to_dict(collector=lambda x: (x.name, x.num), merger=lambda old, new: old)
{"fizz": 1, "buzz": 2}
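
To make the role of the merger concrete, here is a minimal plain-Python sketch of collision handling. The function to_dict_sketch is hypothetical and written for this example; raising on an unresolved duplicate is a choice made for the sketch, not documented Pyrio behavior.

```python
def to_dict_sketch(pairs, merger=None):
    """Build a dict from (key, value) pairs, resolving duplicates with merger(old, new)."""
    result = {}
    for key, value in pairs:
        if key in result:
            if merger is None:
                # Assumption for this sketch only: fail loudly on duplicates.
                raise ValueError(f"duplicate key: {key!r}")
            value = merger(result[key], value)
        result[key] = value
    return result


entries = [("fizz", 1), ("fizz", 2), ("buzz", 2)]

keep_old = to_dict_sketch(entries, merger=lambda old, new: old)
keep_new = to_dict_sketch(entries, merger=lambda old, new: new)
summed = to_dict_sketch(entries, merger=lambda old, new: old + new)

print(keep_old)  # {'fizz': 1, 'buzz': 2}
print(keep_new)  # {'fizz': 2, 'buzz': 2}
print(summed)    # {'fizz': 3, 'buzz': 2}
```

Because the merger receives both values, it can also combine them rather than just pick one, as the last call shows.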

The to_dict method also supports creating dictionaries from dict Item objects

from pyrio import Item

first_dict = {"x": 1, "y": 2}
second_dict = {"p": 33, "q": 44, "r": None}
Stream(first_dict).concat(Stream(second_dict)).to_dict(lambda x: Item(x.key, x.value or 0))
{"x": 1, "y": 2, "p": 33, "q": 44, "r": 0}

e.g. you could combine streams of dicts by writing:

Stream(first_dict).concat(Stream(second_dict)).to_dict(lambda x: x) 
  • alternative for working with collectors is using the collect method
Stream([1, 2, 3]).collect(tuple)
Stream.of(1, 2, 3).collect(list)
Stream.of(1, 1, 2, 2, 2, 3).collect(set)
Stream.of(1, 2, 3, 4).collect(dict, lambda x: (str(x), x * 10))
  • grouping
Stream("AAAABBBCCD").group_by(collector=lambda key, grouper: (key, len(grouper)))
{"A": 4, "B": 3, "C": 2, "D": 1}
coll = [Foo("fizz", 1), Foo("fizz", 2), Foo("fizz", 3), Foo("buzz", 2), Foo("buzz", 3), Foo("buzz", 4), Foo("buzz", 5)]
Stream(coll).group_by(
    classifier=lambda obj: obj.name,
    collector=lambda key, grouper: (key, [(obj.name, obj.num) for obj in list(grouper)]))
{
  "fizz": [("fizz", 1), ("fizz", 2), ("fizz", 3)],
  "buzz": [("buzz", 2), ("buzz", 3), ("buzz", 4), ("buzz", 5)],
}
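
The grouping results above can be reproduced with a small dictionary-based sketch. The helper group_by_sketch is hypothetical and only mirrors the classifier idea; how Pyrio groups internally is not described here, and plain tuples stand in for the Foo objects.

```python
def group_by_sketch(iterable, classifier=lambda element: element):
    """Collect elements into lists keyed by classifier(element), in first-seen order."""
    groups = {}
    for element in iterable:
        groups.setdefault(classifier(element), []).append(element)
    return groups


# Counting group sizes, as in the "AAAABBBCCD" example.
counts = {key: len(members) for key, members in group_by_sketch("AAAABBBCCD").items()}

# Grouping records by name; (name, num) tuples replace Foo instances.
records = [("fizz", 1), ("fizz", 2), ("fizz", 3), ("buzz", 2), ("buzz", 3)]
by_name = group_by_sketch(records, classifier=lambda record: record[0])

print(counts)   # {'A': 4, 'B': 3, 'C': 2, 'D': 1}
print(by_name)  # {'fizz': [('fizz', 1), ('fizz', 2), ('fizz', 3)], 'buzz': [('buzz', 2), ('buzz', 3)]}
```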

Other terminal operations

  • for_each
Stream([1, 2, 3, 4]).for_each(lambda x: print(f"{'#' * x} ", end=""))
  • count
    (returns the count of elements in the stream)
Stream([1, 2, 3, 4]).filter(lambda x: x % 2 == 0).count()
  • sum
Stream.of(1, 2, 3, 4).sum() 
  • find_first
    (searches for an element of the stream that satisfies a predicate; returns an Optional with the first found value, if any, or None)
Stream.of(1, 2, 3, 4).filter(lambda x: x % 2 == 0).find_first().get()
  • find_any
    (searches for an element of the stream that satisfies a predicate; returns an Optional with any one of the found values, if any, or None)
Stream.of(1, 2, 3, 4).filter(lambda x: x % 2 == 0).find_any().get()
  • any_match
    (returns whether any elements of the stream match the given predicate)
Stream.of(1, 2, 3, 4).any_match(lambda x: x > 2)
  • all_match
    (returns whether all elements of the stream match the given predicate)
Stream.of(1, 2, 3, 4).all_match(lambda x: x > 2)
  • none_match
    (returns whether no elements of the stream match the given predicate)
Stream.of(1, 2, 3, 4).none_match(lambda x: x < 0)
  • min
    (returns Optional with the minimum element of the stream)
Stream.of(2, 1, 3, 4).min().get()
  • max
    (returns Optional with the maximum element of the stream)
Stream.of(2, 1, 3, 4).max().get()
  • compare_with
    (compares linearly the contents of two streams based on a given comparator)
fizz = Foo("fizz", 1)
buzz = Foo("buzz", 2)
Stream([buzz, fizz]).compare_with(Stream([fizz, buzz]), lambda x, y: x.num == y.num)
  • quantify
    (counts how many of the elements are truthy or evaluate to True based on a given predicate)
Stream([2, 3, 4, 5, 6]).quantify(predicate=lambda x: x % 2 == 0)
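
The matching and counting operations above correspond closely to Python built-ins. The sketch below shows what each call would be expected to return for the same inputs; quantify_sketch is a hypothetical helper modeled on the itertools 'quantify' recipe, not Pyrio's method.

```python
numbers = [1, 2, 3, 4]

any_gt_two = any(x > 2 for x in numbers)         # at least one element matches
all_gt_two = all(x > 2 for x in numbers)         # every element must match
none_negative = not any(x < 0 for x in numbers)  # no element matches


def quantify_sketch(iterable, predicate=bool):
    """Count the elements for which predicate(element) is truthy."""
    return sum(1 for element in iterable if predicate(element))


even_count = quantify_sketch([2, 3, 4, 5, 6], lambda x: x % 2 == 0)
truthy_count = quantify_sketch([0, 1, "", "a", None])

print(any_gt_two, all_gt_two, none_negative)  # True False True
print(even_count, truthy_count)               # 3 2
```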

Itertools integration

Invoke the use method by passing the itertools function and its arguments as **kwargs

import itertools
import operator

Stream([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).use(itertools.islice, start=3, stop=8)
Stream.of(1, 2, 3, 4, 5).use(itertools.accumulate, func=operator.mul).to_list()
Stream(range(3)).use(itertools.permutations, r=3).to_list()
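
For reference, these are the same three itertools functions called directly, without a stream. Note that itertools.islice itself accepts positional arguments only, so it is called positionally here; how use maps start and stop onto it is not covered in this document.

```python
import itertools
import operator

# islice(iterable, start, stop): elements at indices 3..7.
sliced = list(itertools.islice([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 3, 8))

# accumulate with operator.mul yields running products.
products = list(itertools.accumulate([1, 2, 3, 4, 5], func=operator.mul))

# permutations of length 3 over range(3): 3! = 6 tuples.
perms = list(itertools.permutations(range(3), r=3))

print(sliced)      # [4, 5, 6, 7, 8]
print(products)    # [1, 2, 6, 24, 120]
print(len(perms))  # 6
```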

Itertools 'recipes'

Invoke the 'recipes' described in the itertools documentation as stream methods and pass the required keyword arguments

Stream([1, 2, 3]).ncycles(count=2).to_list()
Stream.of(2, 3, 4).take_nth(10, default=66).get()
Stream(["ABC", "D", "EF"]).round_robin().to_list()
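
For context, the three recipes used above can be written as standalone functions along the lines of the itertools documentation. This is a sketch of the underlying ideas, not Pyrio's code: the function names follow the stream methods, take_nth returns the value directly rather than an Optional, and round_robin takes the iterables as arguments instead of reading them from a stream.

```python
from itertools import chain, islice, repeat


def ncycles(iterable, count):
    """Repeat the whole sequence count times."""
    return chain.from_iterable(repeat(tuple(iterable), count))


def take_nth(iterable, n, default=None):
    """Return the element at index n, or default if the iterable is too short."""
    return next(islice(iterable, n, None), default)


def round_robin(*iterables):
    """Take one element from each iterable in turn until all are exhausted."""
    iterators = [iter(it) for it in iterables]
    while iterators:
        alive = []
        for iterator in iterators:
            try:
                value = next(iterator)
            except StopIteration:
                continue  # drop exhausted iterators from the next round
            alive.append(iterator)
            yield value
        iterators = alive


print(list(ncycles([1, 2, 3], 2)))           # [1, 2, 3, 1, 2, 3]
print(take_nth([2, 3, 4], 10, default=66))   # 66
print(list(round_robin("ABC", "D", "EF")))   # ['A', 'D', 'E', 'B', 'F', 'C']
```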

FileStreams

Querying files

  • working with json, toml, yaml, xml files
    NB: FileStream reads data as a series of Item objects from the underlying dict_items view
FileStream("path/to/file").map(lambda x: f"{x.key}=>{x.value}").to_tuple()
(
  "abc=>xyz", 
  "qwerty=>42",
)
from operator import attrgetter
from pyrio import Item

(FileStream("path/to/file")
    .filter(lambda x: "a" in x.key)
    .map(lambda x: Item(x.key, sum(x.value) * 10))
    .sorted(attrgetter("value"), reverse=True)
    .map(lambda x: f"{str(x.value)}::{x.key}")
    .to_list()) 
["230::xza", "110::abba", "30::a"]
  • querying csv and tsv files
    (each row is read as a dict with keys taken from the header)
FileStream("path/to/file").map(lambda x: f"fizz: {x['fizz']}, buzz: {x['buzz']}").to_tuple() 
(
  "fizz: 42, buzz: 45",
  "fizz: aaa, buzz: bbb",
)
from operator import itemgetter

FileStream("path/to/file").map(itemgetter('fizz', 'buzz')).to_tuple()
(('42', '45'), ('aaa', 'bbb'))
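
The row-as-dict model described above is the same one csv.DictReader uses in the standard library. The sketch below assumes a small hypothetical file whose content matches the documented output; it does not show how FileStream reads files internally.

```python
import csv
import io
from operator import itemgetter

# Hypothetical file content: a header row followed by two records.
raw = "fizz,buzz\n42,45\naaa,bbb\n"

# Each row becomes a dict keyed by the header; all values are strings.
rows = list(csv.DictReader(io.StringIO(raw)))

formatted = tuple(f"fizz: {row['fizz']}, buzz: {row['buzz']}" for row in rows)
pairs = tuple(map(itemgetter("fizz", "buzz"), rows))

# A tsv file differs only in the delimiter.
tsv_rows = list(csv.DictReader(io.StringIO("fizz\tbuzz\n42\t45\n"), delimiter="\t"))

print(formatted)  # ('fizz: 42, buzz: 45', 'fizz: aaa, buzz: bbb')
print(pairs)      # (('42', '45'), ('aaa', 'bbb'))
```

Note that numeric-looking cells such as 42 arrive as strings, which is why the documented itemgetter output shows '42' rather than 42.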

You could query the nested dicts by creating streams out of them

(FileStream("path/to/file")
    .map(lambda x: (Stream(x).to_dict(lambda y: Item(y.key, y.value or "Unknown"))))
    .save())
  • reading a file with process() method
    • use extra f_open_options (for the underlying open file function)
    • f_read_options (to be passed to the corresponding library function that is loading the file content e.g. tomllib, json)
from decimal import Decimal

(FileStream.process(
    file_path="path/to/file.json", 
    f_open_options={"encoding": "utf-8"}, 
    f_read_options={"parse_float": Decimal})
 .map(lambda x: x.value).to_list())
['foo', True, Decimal('1.22'), Decimal('5.456367654369698986')]
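
The effect of the parse_float read option can be seen with the json module alone. The sketch below uses a hypothetical JSON document (the keys a to d are made up) chosen to reproduce the values listed above; it shows what the option does in json.loads, on the assumption that f_read_options are forwarded to the loader as described.

```python
import json
from decimal import Decimal

# Hypothetical file content; only the values mirror the output above.
raw = '{"a": "foo", "b": true, "c": 1.22, "d": 5.456367654369698986}'

# Default parsing converts JSON numbers with a fraction to float,
# which cannot hold all the digits of the last value.
as_float = list(json.loads(raw).values())

# With parse_float=Decimal the digits are kept exactly as written.
as_decimal = list(json.loads(raw, parse_float=Decimal).values())

print(as_decimal)
# ['foo', True, Decimal('1.22'), Decimal('5.456367654369698986')]
```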

To include the root tag when loading an .xml file, pass 'include_root=True'

FileStream.process("path/to/custom_root.xml", include_root=True).map(
    lambda x: f"root={x.key}: inner_records={str(x.value)}"
).to_list()
["root=custom-root: inner_records={'abc': 'xyz', 'qwerty': '42'}"]

Saving to a file

  • write the contents of a FileStream by passing a file_path to the save() method
in_memory_dict = Stream(json_dict).filter(lambda x: len(x.key) < 6).to_tuple()
FileStream("path/to/file.json").prepend(in_memory_dict).save("./tests/resources/updated.json")

If no path is given, the source file for the FileStream will be updated

FileStream("path/to/file.json").concat(in_memory_dict).save()

NB: if something goes wrong while updating the file, the original content will be restored/preserved

  • handle null values
    (pass null_handler function to replace null values)
FileStream("path/to/test.toml").save(null_handler=lambda x: Item(x.key, x.value or "N/A"))

NB: useful for writing .toml files which don't allow None values

  • passing advanced file open and write options
    similarly to the process method, you could provide
    • f_open_options (for the underlying open function)
    • f_write_options (passed to the corresponding library that will 'dump' the contents of the stream e.g. tomli-w, pyyaml)
FileStream("path/to/file.json").concat(in_memory_dict).save(
    file_path="merged.xml",
    f_open_options={"encoding": "utf-8"},
    f_write_options={"indent": 4},
)

To add a custom root tag when saving an .xml file, pass 'xml_root="my-custom-root"'

FileStream("path/to/file.json").concat(in_memory_dict).save(
    file_path="path/to/custom.xml",
    f_open_options={"encoding": "utf-8"},
    f_write_options={"indent": 4},
    xml_root="my-custom-root",
)

  • how far can we actually push it?
(
    FileStream("path/to/file.csv")
    .concat(
        FileStream("path/to/other/file.json")
        .filter(
            lambda x: (
                Stream(x.value)
                .find_first(lambda y: y.key == "name" and y.value != "Snake")
                .or_else_get(lambda: None)
            )
            is not None
        )
        .map(lambda x: x.value)
    )
    .map(lambda x: (Stream(x).to_dict(lambda y: Item(y.key, y.value or "N/A"))))
    .save("path/to/third/file.tsv")
)

or how hideous can it get?

