A data pipeline library: YAML/JSON-configured DAGs of typed dict transformations.
braided
A typed-dict pipeline library for Python. Define transformation DAGs in code or YAML/JSON, get automatic type checking, lazy evaluation, and optional caching.
Contents
- Install
- Quickstart
- Strand kinds
- Custom inputs
- YAML / JSON config
- Built-in strands
- Custom execution backends
Install
```shell
pip install braided
```
Optional extras:
| Extra | What it unlocks |
|---|---|
| braided[datasets] | HuggingFace Datasets map/cache backend (braided.integrations.hf_datasets) |
| braided[torch] | Tensor-safe pickle serialization in Cache |
| braided[jaxtyping] | Array dtype/shape awareness in the type checker |
| braided[dev] | pytest, pyright, and build tools |
Quickstart
```python
from typing import Iterator, TypedDict

from braided import Node, NodeSpec, SequenceInput, execute_pipeline, strand


class Record(TypedDict):
    x: int


@strand
def double(item: Record) -> Record:
    return Record(x=item["x"] * 2)


@strand.one_to_many
def up_to(item: Record) -> Iterator[Record]:
    for i in range(item["x"]):
        yield Record(x=i)


# Nodes refer to upstream nodes or inputs by name; "seed" is supplied as an input below.
nodes: NodeSpec[Record] = {
    "out": Node(function=up_to, args=["doubled"]),
    "doubled": Node(function=double, args=["seed"]),
}

result = execute_pipeline(nodes, {"seed": SequenceInput[Record]([Record(x=3)])})
print(list(result))
# [{'x': 0}, {'x': 1}, {'x': 2}, {'x': 3}, {'x': 4}, {'x': 5}]
```
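As the Quickstart shows, nodes refer to each other by name, so "out" can be declared before the "doubled" node it reads from. One way to picture that resolution is the stdlib-only toy below: it starts from a target node and recursively evaluates whatever its argument name points to, either another node or an entry in the input dict. Everything in it (toy_execute, the (kind, function, upstream) tuples) is hypothetical and much simpler than braided; it supports only single-input one-to-one and one-to-many steps, and "lazy" here just means that nodes unreachable from the target are never run.

```python
from collections.abc import Callable, Iterator

# Hypothetical toy node: (kind, function, upstream name). Not braided's Node type.
ToyNode = tuple[str, Callable, str]


def toy_execute(nodes: dict[str, ToyNode], inputs: dict[str, list], target: str = "out") -> list:
    """Resolve `target` by recursively evaluating its upstream node or input."""
    memo: dict[str, list] = {}

    def resolve(name: str) -> list:
        if name in inputs:
            return list(inputs[name])
        if name in memo:  # evaluate each node at most once
            return memo[name]
        kind, fn, upstream = nodes[name]
        rows = resolve(upstream)
        if kind == "one_to_one":
            out = [fn(row) for row in rows]
        elif kind == "one_to_many":
            out = [new for row in rows for new in fn(row)]
        else:
            raise ValueError(f"unsupported kind: {kind}")
        memo[name] = out
        return out

    return resolve(target)


def double(item: dict) -> dict:
    return {"x": item["x"] * 2}


def up_to(item: dict) -> Iterator[dict]:
    for i in range(item["x"]):
        yield {"x": i}


# Same shape as the Quickstart: "out" is declared before its dependency.
nodes = {
    "out": ("one_to_many", up_to, "doubled"),
    "doubled": ("one_to_one", double, "seed"),
}
print(toy_execute(nodes, {"seed": [{"x": 3}]}))
# [{'x': 0}, {'x': 1}, {'x': 2}, {'x': 3}, {'x': 4}, {'x': 5}]
```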
Strand kinds
| Decorator | Input → Output | Use for |
|---|---|---|
| @strand | T → T' | one-to-one row transforms |
| @strand.one_to_many | T → Iterator[T'] | splitting or expanding rows |
| @strand.many_to_many | Sequence[T] → Iterator[T'] | aggregations, joins, reordering |
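As a rough mental model, the three kinds correspond to familiar iteration patterns: a one-to-one strand behaves like map, a one-to-many strand like a flat map whose outputs are concatenated, and a many-to-many strand sees the whole sequence at once. The helpers below (apply_one_to_one, apply_one_to_many, apply_many_to_many, sort_by_x) are hypothetical plain-Python illustrations of those shapes, not part of braided.

```python
from collections.abc import Callable, Iterator, Sequence
from itertools import chain
from typing import TypeVar

T = TypeVar("T")
U = TypeVar("U")


def apply_one_to_one(fn: Callable[[T], U], rows: Sequence[T]) -> list[U]:
    # T -> T': exactly one output row per input row.
    return [fn(row) for row in rows]


def apply_one_to_many(fn: Callable[[T], Iterator[U]], rows: Sequence[T]) -> list[U]:
    # T -> Iterator[T']: each row yields zero or more rows; results are concatenated.
    return list(chain.from_iterable(fn(row) for row in rows))


def apply_many_to_many(fn: Callable[[Sequence[T]], Iterator[U]], rows: Sequence[T]) -> list[U]:
    # Sequence[T] -> Iterator[T']: the function sees every row at once.
    return list(fn(rows))


def sort_by_x(rows: Sequence[dict]) -> Iterator[dict]:
    # Reordering needs the whole sequence, so it is a many-to-many step.
    yield from sorted(rows, key=lambda r: r["x"])


rows = [{"x": 2}, {"x": 0}, {"x": 1}]
print(apply_one_to_one(lambda r: {"x": r["x"] * 10}, rows))   # [{'x': 20}, {'x': 0}, {'x': 10}]
print(apply_one_to_many(lambda r: iter([r] * r["x"]), rows))  # [{'x': 2}, {'x': 2}, {'x': 1}]
print(apply_many_to_many(sort_by_x, rows))                    # [{'x': 0}, {'x': 1}, {'x': 2}]
```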
A strand can take multiple input sequences by declaring multiple parameters. For @strand and @strand.one_to_many, inputs are aligned by position (zipped); for @strand.many_to_many, they are passed as separate sequences:
```python
# Reuses Record and the imports from the Quickstart.
class Pair(TypedDict):
    a: int
    b: int


@strand
def zip_add(left: Record, right: Record) -> Pair:
    return Pair(a=left["x"], b=right["x"])


nodes: NodeSpec[Pair] = {
    "out": Node(function=zip_add, args=["left", "right"]),
}

result = execute_pipeline(nodes, {
    "left": SequenceInput[Record]([Record(x=1), Record(x=2)]),
    "right": SequenceInput[Record]([Record(x=10), Record(x=20)]),
})
print(list(result))  # [{'a': 1, 'b': 10}, {'a': 2, 'b': 20}]
```
Class-based strands inherit from Strand[T].OneToOne(), .OneToMany(), or .ManyToMany(). They can take constructor parameters:
```python
from braided import Strand


class Scale(Strand[Record].OneToOne()):
    def __init__(self, factor: int) -> None:
        self.factor = factor

    def __call__(self, item: Record) -> Record:
        return Record(x=item["x"] * self.factor)
```
Custom inputs
Pipelines receive data through PipelineInput subclasses. SequenceInput wraps an in-memory list. For other data sources — files, databases, streaming APIs — subclass PipelineInput directly:
```python
import csv
from collections.abc import Iterator, Sequence
from typing import overload

from braided import PipelineInput


# Record is the TypedDict from the Quickstart.
class CSVInput(PipelineInput[Record]):
    def __init__(self, path: str) -> None:
        # Rows are read eagerly; newline="" is what the csv module expects for file objects.
        with open(path, newline="") as f:
            self._rows = [Record(x=int(r["x"])) for r in csv.DictReader(f)]

    def __len__(self) -> int:
        return len(self._rows)

    def __iter__(self) -> Iterator[Record]:
        return iter(self._rows)

    @overload
    def __getitem__(self, index: int) -> Record: ...
    @overload
    def __getitem__(self, index: slice) -> Sequence[Record]: ...
    def __getitem__(self, index: int | slice) -> Record | Sequence[Record]:
        return self._rows[index]
```
Pass it like any other input:
```python
result = execute_pipeline(nodes, {"seed": CSVInput("data.csv")})
```
Custom inputs can also be instantiated from YAML config (see YAML / JSON config) as long as their constructor arguments use concrete types that jsonargparse can resolve.
YAML / JSON config
Pipelines can be defined in YAML or JSON and loaded at runtime. The function field accepts either a dotted import path (for function strands) or a class_path + init_args mapping (for class-based strands); inputs use the same class_path + init_args form.
```yaml
# pipeline.yaml
nodes:
  out:
    function:
      class_path: mypackage.Scale
      init_args:
        factor: 10
    args: [doubled]
  doubled:
    function: mypackage.double
    args: [seed]
inputs:
  seed:
    class_path: mypackage.CSVInput
    init_args:
      path: data.csv
```
```python
from braided import execute_pipeline_from_config

result = list(execute_pipeline_from_config("pipeline.yaml"))
```
Built-in strands
Cache
Cache is a pass-through strand that persists its input sequence to disk on the first run and reloads it on subsequent runs, skipping upstream computation:
```python
from braided import Cache, Node, NodeSpec, SequenceInput, execute_pipeline

# Record and double come from the Quickstart.
nodes: NodeSpec[Record] = {
    "out": Node(function=Cache[Record]("/tmp/my_cache"), args=["source"]),
    "source": Node(function=double, args=["seed"]),
}

# First run: computes "source" and saves the result to disk.
# list() consumes the result so the pipeline runs even if evaluation is lazy.
list(execute_pipeline(nodes, {"seed": SequenceInput[Record]([Record(x=1), Record(x=2)])}))

# Second run: loads from disk; "source" is never evaluated.
result = list(execute_pipeline(nodes, {"seed": SequenceInput[Record]([])}))
```
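The behaviour described above is a "load if present, otherwise compute and save" pattern. The stdlib sketch below shows it with pickle and a call counter, so you can see that the upstream computation runs only once. DiskCache, its rows.pkl file name, and the choice to receive upstream as a zero-argument callable are assumptions made for illustration; braided's Cache may store data differently (the extras table only says the torch extra adds tensor-safe pickle serialization).

```python
import os
import pickle
import tempfile
from collections.abc import Callable, Sequence


class DiskCache:
    """Hypothetical pass-through cache: persist on first use, reload afterwards."""

    def __init__(self, directory: str) -> None:
        self.path = os.path.join(directory, "rows.pkl")

    def __call__(self, compute_upstream: Callable[[], Sequence[dict]]) -> list[dict]:
        if os.path.exists(self.path):
            # Warm cache: the upstream callable is never invoked.
            with open(self.path, "rb") as f:
                return pickle.load(f)
        rows = list(compute_upstream())
        os.makedirs(os.path.dirname(self.path), exist_ok=True)
        with open(self.path, "wb") as f:
            pickle.dump(rows, f)
        return rows


calls = 0


def upstream() -> list[dict]:
    global calls
    calls += 1
    return [{"x": 2}, {"x": 4}]


with tempfile.TemporaryDirectory() as tmp:
    cache = DiskCache(os.path.join(tmp, "my_cache"))
    first = cache(upstream)   # computes and writes rows.pkl
    second = cache(upstream)  # loads from disk
    print(first == second, calls)  # True 1
```

Passing upstream as a callable is what makes skipping possible: a warm cache never calls it. As the second run in the Cache example also shows, a warm cache returns stored rows regardless of the current inputs, so stale data presumably has to be cleared by deleting the cache directory.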
join / Join
Inner join on a shared key column:
```python
from braided import Join, Node, NodeSpec, SequenceInput, execute_pipeline

nodes: NodeSpec[dict] = {
    "out": Node(function=Join[dict]("id"), args=["left", "right"]),
}

result = execute_pipeline(nodes, {
    "left": SequenceInput[dict]([{"id": 1, "val": "a"}]),
    "right": SequenceInput[dict]([{"id": 1, "score": 42}]),
})
print(list(result))  # [{'id': 1, 'val': 'a', 'score': 42}]
```
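A common way to implement an inner join is a hash join: index one side by the key, then probe it with each row from the other side. The stdlib sketch below (inner_join is a hypothetical name) shows that technique. The README does not say how braided's Join treats duplicate keys or non-key columns present on both sides, so the choices here (emit every matching pair; right-hand values win on name clashes) are assumptions.

```python
from collections import defaultdict
from collections.abc import Iterator, Sequence


def inner_join(key: str, left: Sequence[dict], right: Sequence[dict]) -> Iterator[dict]:
    # Build phase: group right-hand rows by their key value.
    index: defaultdict[object, list[dict]] = defaultdict(list)
    for row in right:
        index[row[key]].append(row)
    # Probe phase: for each left row, emit one merged row per matching right row.
    for lrow in left:
        for rrow in index.get(lrow[key], []):
            yield {**lrow, **rrow}  # right-hand values win on column-name clashes


left = [{"id": 1, "val": "a"}, {"id": 2, "val": "b"}]
right = [{"id": 1, "score": 42}, {"id": 3, "score": 7}]
print(list(inner_join("id", left, right)))  # [{'id': 1, 'val': 'a', 'score': 42}]
```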
Custom execution backends
Pass custom map, flat_map, or many_to_many callables to execute_pipeline to control how sequences are materialized — for example, using the HuggingFace Datasets backend:
```python
from braided.integrations.hf_datasets import hf_map_funcs

# nodes and inputs are defined as in the earlier examples.
result = execute_pipeline(nodes, inputs, **hf_map_funcs())
```
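The README does not spell out the signatures execute_pipeline expects for map, flat_map and many_to_many, so the sketch below assumes a simple (function, rows) -> list shape purely for illustration. It shows the point of swappable backends: the same row function materialized eagerly in a list or in parallel on a thread pool gives the same result. list_map, list_flat_map and threaded_map are hypothetical names, not braided functions.

```python
from collections.abc import Callable, Iterable, Sequence
from concurrent.futures import ThreadPoolExecutor
from itertools import chain
from typing import TypeVar

T = TypeVar("T")
U = TypeVar("U")

# Assumed backend shape: (fn, rows) -> materialized rows. Not braided's documented API.


def list_map(fn: Callable[[T], U], rows: Sequence[T]) -> list[U]:
    return [fn(row) for row in rows]


def list_flat_map(fn: Callable[[T], Iterable[U]], rows: Sequence[T]) -> list[U]:
    return list(chain.from_iterable(fn(row) for row in rows))


def threaded_map(fn: Callable[[T], U], rows: Sequence[T], workers: int = 4) -> list[U]:
    # Executor.map yields results in input order, so output lines up with `rows`.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(fn, rows))


def double_row(row: dict) -> dict:
    return {"x": row["x"] * 2}


rows = [{"x": i} for i in range(5)]
print(list_map(double_row, rows) == threaded_map(double_row, rows))  # True
```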
Download files
- Source Distribution: braided-0.0.1.tar.gz
- Built Distribution: braided-0.0.1-py3-none-any.whl
File details
Details for the file braided-0.0.1.tar.gz.
File metadata
- Download URL: braided-0.0.1.tar.gz
- Upload date:
- Size: 13.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.11
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 0b60bc7de6330acd85a961046c06f0277249905ee46b713343887641ac325b5b |
| MD5 | fada42da82bf31da4055f31916f9a40c |
| BLAKE2b-256 | d75c7a0cb6a2460452e24f5b3a45b01764ecb6c0142341653f3f3857ba055220 |
File details
Details for the file braided-0.0.1-py3-none-any.whl.
File metadata
- Download URL: braided-0.0.1-py3-none-any.whl
- Upload date:
- Size: 13.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.11
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 0c002a1101b3d1e73fb15e75bee7cfc71cfdc37f02827e2a25e4bf831e51b3d4 |
| MD5 | 334df1754cfdf408ac0764f233d16dd1 |
| BLAKE2b-256 | 924552d5350b5a59f1c55dfd60966d7b6385bb6bfa571848e65c3b716886222f |