Data Pipeline

ETL-style data processing framework with composable flow nodes — sources, transformers, reducers, and combiners.

Installation

pip install "vcti-data-pipeline>=2.0.0"

In pyproject.toml dependencies

dependencies = [
    "vcti-data-pipeline>=2.0.0",
]

Quick Start

import numpy as np
from vcti.pipeline import DataSource, DataTransformer, DataRecord, data_record

# 1. Define a source
class CsvSource(DataSource):
    def __init__(self, path: str, *, name: str | None = None) -> None:
        super().__init__(name=name)
        self.path = path

    def load(self) -> DataRecord:
        arr = np.genfromtxt(self.path, dtype=None, delimiter=",", names=True)
        return data_record(arr)

# 2. Define a transformer
class ScaleTransformer(DataTransformer):
    def __init__(self, factor: float, *, name: str | None = None) -> None:
        super().__init__(name=name)
        self.factor = factor

    def transform(self, record: DataRecord) -> DataRecord:
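        # Work on a copy so the upstream record's array is not mutated
        scaled = record.load().copy()
        # In-place scaling; assumes "value" is a float column
        scaled["value"] *= self.factor
        return data_record(scaled, record.attributes)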

# 3. Compose the flow (right-to-left)
source = CsvSource("data.csv", name="raw-csv")
scaler = ScaleTransformer(factor=2.0, name="2x-scaler")
flow = scaler.connect(source)

# 4. Execute
result = flow.execute()
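
To make the right-to-left composition concrete, here is a minimal pure-Python sketch of the same pull-style pattern. ToySource and ToyTransformer are hypothetical stand-ins, not the vcti.pipeline classes, and having connect() return the node itself is an assumption made only for this illustration.

```python
from __future__ import annotations

from typing import Callable


class ToySource:
    """Hypothetical leaf node: produces data with no upstream input."""

    def __init__(self, values: list[float]) -> None:
        self.values = values

    def execute(self) -> list[float]:
        return list(self.values)


class ToyTransformer:
    """Hypothetical one-input node: applies a function to upstream data."""

    def __init__(self, fn: Callable[[list[float]], list[float]]) -> None:
        self.fn = fn
        self.upstream: ToySource | ToyTransformer | None = None

    def connect(self, upstream: ToySource | ToyTransformer) -> ToyTransformer:
        # Assumption: attach the upstream node and return self for chaining.
        self.upstream = upstream
        return self

    def execute(self) -> list[float]:
        if self.upstream is None:
            raise RuntimeError("transformer has no upstream node")
        # Pull from upstream first, then apply this node's function.
        return self.fn(self.upstream.execute())


source = ToySource([1.0, 2.0, 3.0])
scaler = ToyTransformer(lambda xs: [2.0 * x for x in xs])
flow = scaler.connect(source)  # downstream.connect(upstream)
result = flow.execute()
print(result)  # [2.0, 4.0, 6.0]
```

Nothing runs until execute() is called on the downstream end, which then pulls data through each upstream node in turn.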

Combining multiple sources

from vcti.pipeline import DataCombiner

# IdSource and CoordSource stand for user-defined DataSource subclasses (not shown)
ids = IdSource()
coords = CoordSource()

combiner = DataCombiner()
flow = combiner.connect(ids)
flow = flow.connect(coords)

combined = flow.execute()
# combined.load() has fields from both sources
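
To show what "fields from both sources" can look like at the array level, the sketch below merges two NumPy structured arrays field-wise with numpy.lib.recfunctions.merge_arrays. This illustrates the general technique only; whether DataCombiner uses this function internally is not stated here, and the field names are made up for the example.

```python
import numpy as np
from numpy.lib import recfunctions as rfn

# Two structured arrays of equal length, standing in for the payloads
# of two sources (the field names are illustrative assumptions).
ids = np.array([(1,), (2,), (3,)], dtype=[("ID", "i8")])
coords = np.array(
    [(0.0, 0.5), (1.0, 1.5), (2.0, 2.5)],
    dtype=[("x", "f8"), ("y", "f8")],
)

# flatten=True yields one flat record dtype instead of nested fields.
merged = rfn.merge_arrays((ids, coords), flatten=True)

print(merged.dtype.names)  # ('ID', 'x', 'y')
```

Each row of the merged array carries the fields of both inputs, so merged["ID"] and merged["x"] remain aligned row by row.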

Iterating over cases

from vcti.pipeline import ForEachFlow

# CasesSource, CaseDataSource, and reader are user-defined (not shown)
flow = ForEachFlow(
    keys=CasesSource(reader),
    factory=lambda case_id: CaseDataSource(reader, case_id),
    key_field="ID",
)

for case_id, case_flow in flow:
    record = case_flow.execute()
    # process each case...
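
The keyed iteration pattern can be sketched in plain Python as a generator that reads a key from each row and builds one flow per key. The for_each function and ToyCaseFlow class below are hypothetical stand-ins written for this illustration; they are not the ForEachFlow implementation.

```python
from typing import Any, Callable, Iterable, Iterator, Tuple


class ToyCaseFlow:
    """Hypothetical per-case flow with an execute() method."""

    def __init__(self, case_id: Any) -> None:
        self.case_id = case_id

    def execute(self) -> dict:
        return {"case": self.case_id}


def for_each(
    keys: Iterable[dict],
    factory: Callable[[Any], ToyCaseFlow],
    key_field: str,
) -> Iterator[Tuple[Any, ToyCaseFlow]]:
    # Read the key from each row and lazily build one flow per key.
    for row in keys:
        key = row[key_field]
        yield key, factory(key)


# Rows standing in for the record produced by a keys source.
case_rows = [{"ID": 10}, {"ID": 20}]

results = [
    (case_id, case_flow.execute())
    for case_id, case_flow in for_each(case_rows, ToyCaseFlow, key_field="ID")
]
```

Because the generator builds each flow only when the loop reaches it, no case data is loaded for keys that are never visited.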

Flow Node Types

Type             Inputs    Purpose
DataSource       0 (leaf)  Load data from external source
DataTransformer  1         Transform one record to another
DataReducer      1+        Combine multiple records into one
DataCombiner     1+        Merge arrays field-wise (extends DataReducer)
DataTarget       1         Persist record, pass through
ForEachFlow                Iterate over keyed cases

Other types

Type        Purpose
DataRecord  Alias for DataNode; the data payload flowing between nodes
Column      Frozen dataclass coupling column name with numpy dtype
FlowNode    Abstract base for all flow nodes
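
As an illustration of the idea behind Column, the sketch below pairs a column name with a NumPy dtype in a frozen dataclass and builds a structured dtype from a list of such pairs. ToyColumn is a hypothetical stand-in; the actual fields and methods of Column may differ.

```python
from dataclasses import FrozenInstanceError, dataclass

import numpy as np


@dataclass(frozen=True)
class ToyColumn:
    """Hypothetical stand-in: an immutable (name, dtype) pair."""

    name: str
    dtype: np.dtype


columns = [
    ToyColumn("ID", np.dtype("i8")),
    ToyColumn("value", np.dtype("f8")),
]

# Build a structured dtype from the column descriptions.
record_dtype = np.dtype([(c.name, c.dtype) for c in columns])
empty = np.zeros(2, dtype=record_dtype)

# Frozen dataclasses reject attribute assignment after construction.
try:
    columns[0].name = "other"
    mutated = True
except FrozenInstanceError:
    mutated = False
```

Freezing the dataclass also makes instances hashable, so column descriptions can serve as dictionary keys or set members.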

Dependencies

  • numpy (>=1.24)
  • vcti-datanode (>=2.0.0) — DataNode (aliased as DataRecord), plus EagerDataSource / LazyDataSource
