A data pipeline library: YAML/JSON-configured DAGs of typed dict transformations.

braided

A typed-dict pipeline library for Python. Define transformation DAGs in code or YAML/JSON, get automatic type checking, lazy evaluation, and optional caching.

Install

pip install braided

Optional extras:

| Extra | What it unlocks |
| --- | --- |
| braided[datasets] | HuggingFace Datasets map/cache backend (braided.integrations.hf_datasets) |
| braided[torch] | Tensor-safe pickle serialization in Cache |
| braided[jaxtyping] | Array dtype/shape awareness in the type checker |
| braided[dev] | pytest, pyright, and build tools |

Quickstart

from typing import Iterator, TypedDict

from braided import Node, NodeSpec, SequenceInput, execute_pipeline, strand

class Record(TypedDict):
    x: int

@strand
def double(item: Record) -> Record:
    return Record(x=item["x"] * 2)

@strand.one_to_many
def up_to(item: Record) -> Iterator[Record]:
    for i in range(item["x"]):
        yield Record(x=i)

nodes: NodeSpec[Record] = {
    "out": Node(function=up_to, args=["doubled"]),
    "doubled": Node(function=double, args=["seed"]),
}
result = execute_pipeline(nodes, {"seed": SequenceInput[Record]([Record(x=3)])})
print(list(result))
# [{"x": 0}, {"x": 1}, {"x": 2}, {"x": 3}, {"x": 4}, {"x": 5}]
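
To make the shape of the node table concrete, here is a minimal plain-Python sketch of how a name-keyed DAG like the one above could be resolved. It does not use braided and is not its implementation: `NODES`, `resolve`, and the row-stream versions of `double` and `up_to` are hypothetical stand-ins, and type checking and caching are left out.

```python
from typing import Callable, Iterable, Iterator

Row = dict[str, int]


def double(rows: Iterable[Row]) -> Iterator[Row]:
    # One-to-one: one output row per input row.
    for row in rows:
        yield {"x": row["x"] * 2}


def up_to(rows: Iterable[Row]) -> Iterator[Row]:
    # One-to-many: each input row expands into row["x"] output rows.
    for row in rows:
        for i in range(row["x"]):
            yield {"x": i}


# Hypothetical node table: name -> (function, names of upstream nodes or inputs).
NODES: dict[str, tuple[Callable[..., Iterator[Row]], list[str]]] = {
    "out": (up_to, ["doubled"]),
    "doubled": (double, ["seed"]),
}


def resolve(name: str, inputs: dict[str, list[Row]]) -> Iterable[Row]:
    """Return the rows for `name`, recursing into upstream nodes by name."""
    if name in inputs:
        return inputs[name]
    function, args = NODES[name]
    return function(*(resolve(arg, inputs) for arg in args))


# Generators keep this lazy: no row is computed until list() consumes "out".
result = list(resolve("out", {"seed": [{"x": 3}]}))
print(result)
# [{'x': 0}, {'x': 1}, {'x': 2}, {'x': 3}, {'x': 4}, {'x': 5}]
```

The order of entries in the table does not matter in this sketch, because each node looks up its upstream dependencies by name.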

Strand kinds

| Decorator | Input → Output | Use for |
| --- | --- | --- |
| @strand | T → T' | one-to-one row transforms |
| @strand.one_to_many | T → Iterator[T'] | splitting or expanding rows |
| @strand.many_to_many | Sequence[T] → Iterator[T'] | aggregations, joins, reordering |
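
The three signatures correspond roughly to map, flat-map, and whole-sequence application. The sketch below illustrates that correspondence in plain Python. The `apply_*` helpers are hypothetical names chosen for illustration and are not part of braided.

```python
from collections.abc import Callable, Iterable, Iterator, Sequence
from typing import TypeVar

T = TypeVar("T")
U = TypeVar("U")


def apply_one_to_one(fn: Callable[[T], U], rows: Iterable[T]) -> Iterator[U]:
    # T -> T': exactly one output per input row.
    for row in rows:
        yield fn(row)


def apply_one_to_many(fn: Callable[[T], Iterable[U]], rows: Iterable[T]) -> Iterator[U]:
    # T -> Iterator[T']: zero or more outputs per input row, flattened.
    for row in rows:
        yield from fn(row)


def apply_many_to_many(fn: Callable[[Sequence[T]], Iterable[U]], rows: Sequence[T]) -> Iterator[U]:
    # Sequence[T] -> Iterator[T']: the function sees the whole sequence at once.
    yield from fn(rows)


rows = [3, 1, 2]
squared = list(apply_one_to_one(lambda n: n * n, rows))       # [9, 1, 4]
expanded = list(apply_one_to_many(lambda n: range(n), rows))  # [0, 1, 2, 0, 0, 1]
ordered = list(apply_many_to_many(sorted, rows))              # [1, 2, 3]
```

Only the many-to-many form can reorder or aggregate, since it is the only one that receives every row before producing output.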

A strand can take multiple input sequences by declaring multiple parameters. For @strand and @strand.one_to_many, inputs are aligned by position (zipped); for @strand.many_to_many, they are passed as separate sequences:

class Pair(TypedDict):
    a: int
    b: int

@strand
def zip_add(left: Record, right: Record) -> Pair:
    return Pair(a=left["x"], b=right["x"])

nodes: NodeSpec[Pair] = {
    "out": Node(function=zip_add, args=["left", "right"]),
}
result = execute_pipeline(nodes, {
    "left": SequenceInput[Record]([Record(x=1), Record(x=2)]),
    "right": SequenceInput[Record]([Record(x=10), Record(x=20)]),
})
print(list(result))  # [{"a": 1, "b": 10}, {"a": 2, "b": 20}]
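
The difference between the two calling conventions can be sketched without braided. In the zipped case the function sees one row from each input per call. In the many-to-many case it receives both sequences whole and decides how to combine them. `cross` below is a hypothetical example of the latter. How braided handles inputs of unequal length is not stated here, so the sketch uses equal-length lists.

```python
from collections.abc import Iterator, Sequence

Row = dict[str, int]

left: list[Row] = [{"x": 1}, {"x": 2}]
right: list[Row] = [{"x": 10}, {"x": 20}]


def zip_add(l: Row, r: Row) -> Row:
    # Called once per aligned pair of rows.
    return {"a": l["x"], "b": r["x"]}


def cross(lefts: Sequence[Row], rights: Sequence[Row]) -> Iterator[Row]:
    # Called once with both full sequences; free to pair rows any way it likes.
    for l in lefts:
        for r in rights:
            yield {"a": l["x"], "b": r["x"]}


zipped = [zip_add(l, r) for l, r in zip(left, right)]
crossed = list(cross(left, right))

print(zipped)        # [{'a': 1, 'b': 10}, {'a': 2, 'b': 20}]
print(len(crossed))  # 4
```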

Class-based strands inherit from Strand[T].OneToOne(), Strand[T].OneToMany(), or Strand[T].ManyToMany() and implement __call__. They can take constructor parameters:

from braided import Strand

class Scale(Strand[Record].OneToOne()):
    def __init__(self, factor: int) -> None:
        self.factor = factor

    def __call__(self, item: Record) -> Record:
        return Record(x=item["x"] * self.factor)

Custom inputs

Pipelines receive data through PipelineInput subclasses. SequenceInput wraps an in-memory list. For other data sources — files, databases, streaming APIs — subclass PipelineInput directly:

from collections.abc import Iterator, Sequence
from typing import overload

from braided import PipelineInput

class CSVInput(PipelineInput[Record]):
    def __init__(self, path: str) -> None:
        import csv
        with open(path) as f:
            self._rows = [Record(x=int(r["x"])) for r in csv.DictReader(f)]

    def __len__(self) -> int:
        return len(self._rows)

    def __iter__(self) -> Iterator[Record]:
        return iter(self._rows)

    @overload
    def __getitem__(self, index: int) -> Record: ...
    @overload
    def __getitem__(self, index: slice) -> Sequence[Record]: ...
    def __getitem__(self, index: int | slice) -> Record | Sequence[Record]:
        return self._rows[index]

Pass it like any other input:

result = execute_pipeline(nodes, {"seed": CSVInput("data.csv")})

Custom inputs can also be instantiated from YAML config (see YAML / JSON config) as long as their constructor arguments use concrete types that jsonargparse can resolve.

YAML / JSON config

Pipelines can be defined in YAML or JSON and loaded at runtime. The function field accepts either a dotted import path (for function strands) or a class_path + init_args dict (for class-based strands).

# pipeline.yaml
nodes:
  out:
    function:
      class_path: mypackage.Scale
      init_args:
        factor: 10
    args: [doubled]
  doubled:
    function: mypackage.double
    args: [seed]

inputs:
  seed:
    class_path: mypackage.CSVInput
    init_args:
      path: data.csv

from braided import execute_pipeline_from_config

result = list(execute_pipeline_from_config("pipeline.yaml"))
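
For readers unfamiliar with the class_path + init_args convention, the following is a rough sketch of how such specs can be turned into Python objects using only the standard library. It is not how braided or jsonargparse implement it (jsonargparse also validates argument types). `import_dotted` and `build` are hypothetical helpers, and stdlib targets stand in for `mypackage`.

```python
import importlib
from typing import Any, Union


def import_dotted(path: str) -> Any:
    """Import `pkg.module.attr` and return `attr`."""
    module_name, _, attribute = path.rpartition(".")
    module = importlib.import_module(module_name)
    return getattr(module, attribute)


def build(spec: Union[str, dict[str, Any]]) -> Any:
    # A bare string is treated as a dotted path to an existing callable.
    if isinstance(spec, str):
        return import_dotted(spec)
    # A dict names a class and the keyword arguments for its constructor.
    cls = import_dotted(spec["class_path"])
    return cls(**spec.get("init_args", {}))


sqrt = build("math.sqrt")
quarter = build({
    "class_path": "fractions.Fraction",
    "init_args": {"numerator": 1, "denominator": 4},
})

print(sqrt(16.0))  # 4.0
print(quarter)     # 1/4
```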

Built-in strands

Cache

Cache is a pass-through strand that persists its input sequence to disk on the first run and reloads it on subsequent runs, skipping upstream computation:

from braided import Cache, Node, NodeSpec, SequenceInput, execute_pipeline

nodes: NodeSpec[Record] = {
    "out": Node(function=Cache[Record]("/tmp/my_cache"), args=["source"]),
    "source": Node(function=double, args=["seed"]),
}
# First run: computes and saves to disk.
list(execute_pipeline(nodes, {"seed": SequenceInput[Record]([Record(x=1), Record(x=2)])}))
# Second run: loads from disk; "source" is never evaluated.
result = list(execute_pipeline(nodes, {"seed": SequenceInput[Record]([])}))
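
The load-or-compute pattern behind a pass-through cache can be sketched as follows. This is an illustration only: the single pickle file, the `cached` helper, and the `expensive_source` function are assumptions made for the example and say nothing about Cache's actual on-disk layout.

```python
import pickle
import tempfile
from pathlib import Path
from typing import Callable

Row = dict[str, int]
calls = 0


def expensive_source() -> list[Row]:
    # Stand-in for the upstream computation; counts how often it runs.
    global calls
    calls += 1
    return [{"x": 2}, {"x": 4}]


def cached(path: Path, compute: Callable[[], list[Row]]) -> list[Row]:
    # Cache hit: reload from disk and never call `compute`.
    if path.exists():
        with path.open("rb") as f:
            return pickle.load(f)
    # Cache miss: compute once, persist, and pass the rows through.
    rows = compute()
    with path.open("wb") as f:
        pickle.dump(rows, f)
    return rows


with tempfile.TemporaryDirectory() as tmp:
    cache_file = Path(tmp) / "rows.pkl"
    first = cached(cache_file, expensive_source)
    second = cached(cache_file, expensive_source)

print(first == second, calls)  # True 1
```

Passing the upstream work as a callable is what lets the second call skip it entirely. A cache of this kind is keyed only by its path, so stale files have to be removed by hand when upstream logic changes.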

join / Join

Inner join on a shared key column:

from braided import Join, Node, NodeSpec, SequenceInput, execute_pipeline

nodes: NodeSpec[dict] = {
    "out": Node(function=Join[dict]("id"), args=["left", "right"]),
}
result = execute_pipeline(nodes, {
    "left": SequenceInput[dict]([{"id": 1, "val": "a"}]),
    "right": SequenceInput[dict]([{"id": 1, "score": 42}]),
})
print(list(result))  # [{"id": 1, "val": "a", "score": 42}]
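
As a reference for the semantics, an inner join on one key can be written in a few lines of plain Python. This sketch is not braided's implementation. `inner_join` is a hypothetical helper, and letting the right-hand row win when non-key columns collide is an assumption of the sketch.

```python
from collections import defaultdict
from collections.abc import Iterable, Iterator
from typing import Any

Row = dict[str, Any]


def inner_join(left: Iterable[Row], right: Iterable[Row], key: str) -> Iterator[Row]:
    # Index the right side by key so each left row is matched in O(1).
    index: defaultdict[Any, list[Row]] = defaultdict(list)
    for row in right:
        index[row[key]].append(row)
    # Emit one merged row per matching (left, right) pair; unmatched rows are dropped.
    for l in left:
        for r in index.get(l[key], []):
            yield {**l, **r}


left_rows = [{"id": 1, "val": "a"}, {"id": 2, "val": "b"}]
right_rows = [{"id": 1, "score": 42}, {"id": 3, "score": 7}]

joined = list(inner_join(left_rows, right_rows, "id"))
print(joined)  # [{'id': 1, 'val': 'a', 'score': 42}]
```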

Custom execution backends

Pass custom map, flat_map, or many_to_many callables to execute_pipeline to control how sequences are materialized — for example, using the HuggingFace Datasets backend:

from braided.integrations.hf_datasets import hf_map_funcs

result = execute_pipeline(nodes, inputs, **hf_map_funcs())
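
The idea of a swappable map callable can be illustrated independently of braided. In the sketch below, `run_one_to_one`, `eager_map`, and `threaded_map` are hypothetical names. The signatures braided expects for its map, flat_map, and many_to_many arguments are not given here and may differ.

```python
from collections.abc import Callable, Iterable
from concurrent.futures import ThreadPoolExecutor
from typing import TypeVar

T = TypeVar("T")
U = TypeVar("U")

MapFn = Callable[[Callable[[T], U], Iterable[T]], list[U]]


def eager_map(fn: Callable[[T], U], rows: Iterable[T]) -> list[U]:
    # Default backend: apply the function row by row in the current thread.
    return [fn(row) for row in rows]


def threaded_map(fn: Callable[[T], U], rows: Iterable[T]) -> list[U]:
    # Alternative backend: same contract, work spread across a thread pool.
    with ThreadPoolExecutor(max_workers=4) as pool:
        return list(pool.map(fn, rows))  # pool.map preserves input order


def run_one_to_one(fn: Callable[[T], U], rows: Iterable[T], map_fn: MapFn = eager_map) -> list[U]:
    # The executor only decides *what* to apply; map_fn decides *how*.
    return map_fn(fn, rows)


rows = [1, 2, 3]
default_result = run_one_to_one(lambda n: n * 2, rows)
threaded_result = run_one_to_one(lambda n: n * 2, rows, map_fn=threaded_map)
print(default_result == threaded_result)  # True
```

Because both backends honor the same contract, the transformation code stays unchanged when the backend is swapped.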
