Framework for building pipelines for data processing

pipedata

Chained operations in Python, applied to data processing.

Installation

To install with all optional dependencies:

pip install "pipedata[ops]"

To install only the core functionality (building pipelines), without the data processing operations:

pip install pipedata

Examples

Chaining Data Operations

pipedata.ops provides some operations for streaming data through memory.

import json
import zipfile

import pyarrow.parquet as pq

from pipedata.core import Stream
from pipedata.ops import json_records, parquet_writer, zipped_files


data1 = [
    {"col1": 1, "col2": "Hello"},
    {"col1": 2, "col2": "world"},
]
data2 = [
    {"col1": 3, "col2": "!"},
]

with zipfile.ZipFile("test_input.json.zip", "w") as zipped:
    zipped.writestr("file1.json", json.dumps(data1))
    zipped.writestr("file2.json", json.dumps(data2))

result = (
    Stream(["test_input.json.zip"])
    .then(zipped_files)
    .then(json_records())
    .then(parquet_writer("test_output.parquet"))
    .to_list()
)

table = pq.read_table("test_output.parquet")
print(table.to_pydict())
#> {'col1': [1, 2, 3], 'col2': ['Hello', 'world', '!']}

Alternatively, you can construct the pipeline as a chain:

import pyarrow.parquet as pq

from pipedata.core import Chain, Stream
from pipedata.ops import json_records, parquet_writer, zipped_files

# Run this after the input file has been created by the example above.
chain = (
    Chain()
    .then(zipped_files)
    .then(json_records())
    .then(parquet_writer("test_output_2.parquet"))
)
result = Stream(["test_input.json.zip"]).then(chain).to_list()
table = pq.read_table("test_output_2.parquet")
print(table.to_pydict())
#> {'col1': [1, 2, 3], 'col2': ['Hello', 'world', '!']}
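
To make the data flow in the examples above concrete, here is a minimal standard-library sketch of the first two stages (unzipping, then yielding JSON records one at a time). It is not how pipedata implements `zipped_files` or `json_records`; the function name `iter_zipped_json_records` is hypothetical, and it assumes each archive member holds a JSON array of records, as in the example input.

```python
import io
import json
import zipfile
from typing import Any, Dict, Iterable, Iterator


def iter_zipped_json_records(zip_sources: Iterable[Any]) -> Iterator[Dict[str, Any]]:
    """Yield every record from every JSON array file in each zip archive.

    Hypothetical helper for illustration only; not part of pipedata.
    """
    for source in zip_sources:
        with zipfile.ZipFile(source) as archive:
            for name in archive.namelist():
                with archive.open(name) as member:
                    # json.load reads one whole member into memory at a time;
                    # an incremental parser would be needed for very large files.
                    yield from json.load(member)


# Build the same input as the example above, but in memory.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as zipped:
    zipped.writestr(
        "file1.json",
        json.dumps([{"col1": 1, "col2": "Hello"}, {"col1": 2, "col2": "world"}]),
    )
    zipped.writestr("file2.json", json.dumps([{"col1": 3, "col2": "!"}]))
buffer.seek(0)

records = list(iter_zipped_json_records([buffer]))
print(records)
#> [{'col1': 1, 'col2': 'Hello'}, {'col1': 2, 'col2': 'world'}, {'col1': 3, 'col2': '!'}]
```

Because the function is a generator, records from the second file are not parsed until the consumer has pulled every record from the first.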

Core Framework

The core framework provides the building blocks for chaining operations.

Running a stream:

from pipedata.core import Stream, ops


result = (
    Stream(range(10))
    .then(ops.filtering(lambda x: x % 2 == 0))
    .then(ops.mapping(lambda x: x ^ 2))  # ^ is bitwise XOR, not exponentiation
    .then(ops.batched(lambda x: x, 2))
    .to_list()
)
print(result)
#> [(2, 0), (6, 4), (10,)]
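
For comparison, the same filter, map, and batch steps can be written with plain generators. This is a sketch of the technique only, not pipedata's internals; `batches_of` is a hypothetical helper. It also shows why the output above starts with `(2, 0)`: in Python, `^` is bitwise XOR, so the even numbers 0, 2, 4, 6, 8 become 2, 0, 6, 4, 10.

```python
from itertools import islice
from typing import Iterable, Iterator, Tuple


def batches_of(items: Iterable[int], size: int) -> Iterator[Tuple[int, ...]]:
    """Group items into tuples of at most `size` elements (hypothetical helper)."""
    iterator = iter(items)
    while True:
        batch = tuple(islice(iterator, size))
        if not batch:
            return
        yield batch


evens = (x for x in range(10) if x % 2 == 0)  # 0, 2, 4, 6, 8
xored = (x ^ 2 for x in evens)                # 2, 0, 6, 4, 10
plain_result = list(batches_of(xored, 2))
print(plain_result)
#> [(2, 0), (6, 4), (10,)]

# Squaring with ** instead of ^ gives a different result.
squared_result = list(batches_of((x ** 2 for x in range(10) if x % 2 == 0), 2))
print(squared_result)
#> [(0, 4), (16, 36), (64,)]
```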

Creating a chain and then using it, this time using the pipe notation:

import json
from pipedata.core import Chain, Stream, ops


chain = (
    Chain()
    | ops.filtering(lambda x: x % 2 == 0)
    | ops.mapping(lambda x: x ^ 2)  # ^ is bitwise XOR, not exponentiation
    | ops.batched(lambda x: sum(x), 2)
)
print(Stream(range(10)).then(chain).to_list())
#> [2, 10, 10]
print(json.dumps(chain.get_counts(), indent=4))
#> [
#>     {
#>         "name": "_identity",
#>         "inputs": 10,
#>         "outputs": 10
#>     },
#>     {
#>         "name": "<lambda>",
#>         "inputs": 10,
#>         "outputs": 5
#>     },
#>     {
#>         "name": "<lambda>",
#>         "inputs": 5,
#>         "outputs": 5
#>     },
#>     {
#>         "name": "<lambda>",
#>         "inputs": 5,
#>         "outputs": 3
#>     }
#> ]
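
One way to gather per-step counts like these is to wrap each step so that it tallies the items it pulls in and the items it emits. The sketch below illustrates that idea under stated assumptions; `CountedStep` and the step functions are hypothetical names, and this is not a description of how `get_counts` is implemented (for instance, it has no `_identity` step).

```python
from typing import Callable, Iterable, Iterator, List


class CountedStep:
    """Wrap a generator function and count items flowing in and out."""

    def __init__(self, name: str, func: Callable[[Iterable[int]], Iterable[int]]):
        self.name = name
        self.func = func
        self.inputs = 0
        self.outputs = 0

    def _count_inputs(self, items: Iterable[int]) -> Iterator[int]:
        for item in items:
            self.inputs += 1
            yield item

    def __call__(self, items: Iterable[int]) -> Iterator[int]:
        for item in self.func(self._count_inputs(items)):
            self.outputs += 1
            yield item


def keep_even(items: Iterable[int]) -> Iterator[int]:
    return (x for x in items if x % 2 == 0)


def xor_two(items: Iterable[int]) -> Iterator[int]:
    return (x ^ 2 for x in items)  # bitwise XOR, as in the example above


def pair_sums(items: Iterable[int]) -> Iterator[int]:
    iterator = iter(items)
    for first in iterator:
        # Add the next item if there is one; a trailing item stands alone.
        yield first + next(iterator, 0)


steps: List[CountedStep] = [
    CountedStep("keep_even", keep_even),
    CountedStep("xor_two", xor_two),
    CountedStep("pair_sums", pair_sums),
]
stream: Iterable[int] = range(10)
for step in steps:
    stream = step(stream)

totals = list(stream)  # counts are only complete once the stream is consumed
counts = [{"name": s.name, "inputs": s.inputs, "outputs": s.outputs} for s in steps]
print(totals)
#> [2, 10, 10]
print(counts)
#> [{'name': 'keep_even', 'inputs': 10, 'outputs': 5}, {'name': 'xor_two', 'inputs': 5, 'outputs': 5}, {'name': 'pair_sums', 'inputs': 5, 'outputs': 3}]
```

Since every step is lazy, the counters stay at zero until something consumes the final stream.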
