Config-addressed, journaled data-experiment management for Python
datablocks
Overview
datablocks (imported as `dbx`) is a framework for building reproducible data pipelines in which every computation step is:
- Config-addressed: identified by a deterministic SHA-256 hash of its class name, configuration, and version.
- Journaled: every build writes a Parquet journal entry recording the full provenance (timestamp, git revision, config, hash).
- Idempotent: calling `build()` on an already-valid block is a no-op.
- Storage-agnostic: paths are resolved via fsspec, so local, S3, GCS, ADLS, and any other fsspec-supported backend work.
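To make "config-addressed" concrete, here is a minimal sketch of how such a hash could be derived. It assumes the class name, config fields, and version are serialized as canonical JSON (sorted keys, compact separators) before hashing; `ExampleConfig`, `config_hash`, and the serialization format are hypothetical, and dbx's actual hashing scheme may differ.

```python
import hashlib
import json
from dataclasses import asdict, dataclass


@dataclass
class ExampleConfig:
    # Hypothetical config schema standing in for a Datablock CONFIG.
    n_rows: int = 10


def config_hash(cls_name: str, cfg, version: str) -> str:
    """Deterministic SHA-256 over class name, config, and version.

    Canonical JSON (sorted keys, no extra whitespace) ensures that equal
    configs always serialize, and therefore hash, identically.
    """
    payload = {"class": cls_name, "config": asdict(cfg), "version": version}
    blob = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(blob.encode("utf-8")).hexdigest()


h1 = config_hash("mymod.SampleBlock", ExampleConfig(n_rows=5), "0.1")
h2 = config_hash("mymod.SampleBlock", ExampleConfig(n_rows=5), "0.1")
h3 = config_hash("mymod.SampleBlock", ExampleConfig(n_rows=6), "0.1")
print(h1 == h2, h1 == h3)  # True False
```

Sorting keys matters: without it, two configs with the same fields in a different insertion order could hash differently.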
Key Features
| Feature | Description |
|---|---|
| `Datablock` | Config-addressed unit of computation with topic-based output paths |
| `Datastack` | Orchestrates parallel builds of child Datablocks (shards) |
| `CONFIG` | Dataclass-based configuration schema with lazy evaluation and specline support |
| Journaling | Automatic Parquet-based provenance tracking for every build |
| `env()` | Relocatable environment-variable references that stay symbolic in handles |
| Callable Executors | Pluggable parallelism: inline, threading, multiprocessing, Ray |
| Torch Executors | Device-aware executors that automatically move callables between CPU and GPU |
| Remote Execution | Ray-based remote dbx interpreter with transparent proxying |
| Slurm Integration | Launch Ray clusters on Slurm and execute pipelines remotely |
| `@tagged` | Decorator that auto-generates human-readable pipeline tags |
Installation
```bash
pip install datablocks
```
With Ray support (for remote execution and `RayCallableExecutor`):
```bash
pip install "datablocks[ray]"
```
For development:
```bash
git clone https://github.com/dmitry-karpeyev/datablocks.git
cd datablocks
pip install -e ".[dev]"
```
Quickstart
```python
import os
import tempfile
from dataclasses import dataclass

import pandas as pd
from dbx import Datablock, write_frame, read_frame


class SampleBlock(Datablock):
    """A minimal Datablock that generates a DataFrame."""

    TOPICFILES = {'output': 'data.parquet'}

    @dataclass
    class CONFIG(Datablock.CONFIG):
        n_rows: int = 10

    def __build__(self):
        df = pd.DataFrame({'x': range(self.cfg.n_rows)})
        write_frame(df, self.path('output', ensure_dirpath=True))
        return self

    def __read__(self, topic='output'):
        return read_frame(self.path(topic))


# Build and read
root = tempfile.mkdtemp()
os.environ['DBXGITREPO'] = ''  # skip git check for this example
block = SampleBlock(url=root, spec=dict(n_rows=5))
block.build()
df = block.read('output')
print(df)
#    x
# 0  0
# 1  1
# 2  2
# 3  3
# 4  4
```
Core Concepts
Datablock
A Datablock is the fundamental unit. Subclass it and implement:
- `__build__(self)`: produce your outputs (write to `self.path(topic)`).
- `__read__(self, topic)`: load and return the output for a topic.
- `TOPICFILES`: dict mapping topic names to filenames.
- `CONFIG`: dataclass defining your configuration schema.
```python
from dataclasses import dataclass

from dbx import Datablock


class MyBlock(Datablock):
    TOPICFILES = {'features': 'features.parquet', 'model': 'model.pt'}

    @dataclass
    class CONFIG(Datablock.CONFIG):
        input_path: str = None
        n_components: int = 64

    def __build__(self):
        # self.cfg.input_path and self.cfg.n_components are available here.
        # Write outputs to self.path('features') and self.path('model').
        ...
```
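The contract above can be illustrated with a toy, self-contained stand-in that writes plain text instead of Parquet. `ToyBlock`, its `valid()` rule (a block counts as valid once every topic file exists), and `Squares` are hypothetical; the real `Datablock` decides validity and resolves paths its own way.

```python
import os
import tempfile


class ToyBlock:
    """Hypothetical stand-in for the Datablock contract; not dbx code."""

    TOPICFILES = {"output": "data.txt"}

    def __init__(self, root: str, key: str):
        self.root, self.key = root, key
        self.n_builds = 0  # counts how many times __build__ actually ran

    def path(self, topic: str) -> str:
        return os.path.join(self.root, self.key, topic, self.TOPICFILES[topic])

    def valid(self) -> bool:
        # Assumption: valid once every topic file exists on disk.
        return all(os.path.exists(self.path(t)) for t in self.TOPICFILES)

    def build(self):
        if not self.valid():  # idempotence: skip work if outputs exist
            self.__build__()
        return self

    def read(self, topic: str = "output"):
        return self.__read__(topic)


class Squares(ToyBlock):
    def __build__(self):
        self.n_builds += 1
        p = self.path("output")
        os.makedirs(os.path.dirname(p), exist_ok=True)
        with open(p, "w") as f:
            f.write(",".join(str(i * i) for i in range(5)))

    def __read__(self, topic):
        with open(self.path(topic)) as f:
            return f.read()


blk = Squares(tempfile.mkdtemp(), key="abc123")
blk.build().build()  # the second call is a no-op
print(blk.read(), blk.n_builds)  # 0,1,4,9,16 1
```

Because `__build__` and `__read__` end in double underscores, Python does not name-mangle them, so the base class can call the subclass hooks directly.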
Path Hierarchy
Every Datablock maps to a deterministic directory tree:
```
{root}/{anchor}/{key}/{topic}/{TOPICFILE}
```
- root: storage root (local path or fsspec URL)
- anchor: defaults to the fully qualified class name
- key: derived from the content hash (configurable via `keyby`)
- topic: named output channel
- TOPICFILE: the filename within the topic directory
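As a rough sketch of how the five components combine, the helper below joins them with forward slashes. `block_path` is hypothetical, and so is the key rule used here (first 12 hex digits of a SHA-256 digest); the real key comes from the block's content hash and `keyby`.

```python
import hashlib
import posixpath


def block_path(root: str, anchor: str, key: str, topic: str, topicfile: str) -> str:
    """Join {root}/{anchor}/{key}/{topic}/{TOPICFILE} with '/' separators.

    posixpath is used so the result also suits fsspec-style URLs such as
    's3://bucket/prefix', which always use forward slashes.
    """
    return posixpath.join(root, anchor, key, topic, topicfile)


# Hypothetical key: first 12 hex digits of a content hash.
key = hashlib.sha256(b"example-config").hexdigest()[:12]
p = block_path("s3://bucket/dbx", "mymod.MyBlock", key, "features", "features.parquet")
print(p)
```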
Datastack
A Datastack manages a collection of child Datablocks (shards) and builds them in parallel:
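```python
import math
from dataclasses import dataclass

from dbx import Datablock, Datastack


class MyStack(Datastack):
    @dataclass
    class CONFIG(Datablock.CONFIG):
        n_items: int = 1000
        shard_size: int = 100

    @property
    def n_shards(self):
        # One shard per shard_size items, rounding up.
        return math.ceil(self.cfg.n_items / self.cfg.shard_size)

    def __shard__(self, idx):
        # MyShard is your own Datablock subclass; further shard config (...) goes in spec.
        return MyShard(url=self.root, spec=dict(idx=idx))


stack = MyStack(url='/data', spec=dict(n_items=1000),
                parallelization='multithreading', n_workers=4)
stack.build()
```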
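The shard arithmetic and parallel dispatch can be approximated with the standard library alone. This sketch uses `concurrent.futures.ThreadPoolExecutor` as a loose analogue of `parallelization='multithreading', n_workers=4`; `shard_ranges` and `build_shard` are hypothetical helpers, and Datastack's real scheduling may differ.

```python
import math
from concurrent.futures import ThreadPoolExecutor


def shard_ranges(n_items: int, shard_size: int):
    """Split [0, n_items) into ceil(n_items / shard_size) contiguous ranges."""
    n_shards = math.ceil(n_items / shard_size)
    return [
        range(i * shard_size, min((i + 1) * shard_size, n_items))
        for i in range(n_shards)
    ]


def build_shard(items: range) -> int:
    # Hypothetical stand-in for MyShard(...).build(): just sums its items.
    return sum(items)


shards = shard_ranges(n_items=1050, shard_size=100)
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(build_shard, shards))  # results keep shard order

print(len(shards), sum(results))  # 11 550725
```

With 1050 items and a shard size of 100, rounding up yields 11 shards, the last holding only 50 items.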
Configuration & env()
Use `env()` to reference environment variables symbolically, so the handle stays portable across machines:
```python
from dbx import env

block = MyBlock(url=env('DATA_ROOT'), spec=dict(input_path=env('INPUT_DIR')))
# The handle contains "$dbx.getenv('DATA_ROOT')" instead of the resolved path.
```
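One way to picture a symbolic reference is an object whose `repr` records the variable name while the value is looked up only on demand. `EnvRef` is a hypothetical sketch of that idea, not the object `dbx.env()` returns.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class EnvRef:
    """Hypothetical symbolic env-var reference; not dbx's env() object."""

    name: str

    def __repr__(self) -> str:
        # What a handle would record: the symbol, not the machine-specific value.
        return f"$dbx.getenv('{self.name}')"

    def resolve(self) -> str:
        # The environment is consulted only when the value is actually needed.
        return os.environ[self.name]


ref = EnvRef("DATA_ROOT")
os.environ["DATA_ROOT"] = "/mnt/data"
print(repr(ref), ref.resolve())  # $dbx.getenv('DATA_ROOT') /mnt/data
```

Since the class defines `__repr__` itself, `@dataclass` leaves it in place rather than generating its own.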
Callable Executors
Pluggable parallelism for any sequence of callables:
```python
from dbx import callable_executor

executor = callable_executor('multithreading', n_workers=8, tag='my-job')
results = executor.execute(list_of_callables)  # list_of_callables: your sequence of callables
# Available strategies: 'inline', 'multithreading', 'multiprocessing',
# 'ray', 'torch_multithreading', 'torch_multiprocessing'
```
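The strategy-name dispatch can be sketched with `concurrent.futures`. This assumes zero-argument callables and results returned in input order; `make_executor`, `InlineExecutor`, and `PoolExecutor` are hypothetical and are not how `dbx.callable_executor` is implemented. Ray and torch strategies are omitted.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from typing import Callable, List, Sequence


class InlineExecutor:
    def execute(self, callables: Sequence[Callable[[], object]]) -> List[object]:
        return [c() for c in callables]  # run sequentially in this thread


class PoolExecutor:
    def __init__(self, pool_cls, n_workers: int):
        self.pool_cls, self.n_workers = pool_cls, n_workers

    def execute(self, callables: Sequence[Callable[[], object]]) -> List[object]:
        with self.pool_cls(max_workers=self.n_workers) as pool:
            futures = [pool.submit(c) for c in callables]
            return [f.result() for f in futures]  # preserves input order


def make_executor(strategy: str, n_workers: int = 1):
    """Hypothetical strategy-name dispatch; not dbx.callable_executor."""
    if strategy == "inline":
        return InlineExecutor()
    if strategy == "multithreading":
        return PoolExecutor(ThreadPoolExecutor, n_workers)
    if strategy == "multiprocessing":
        # Callables must be picklable (e.g. no lambdas) for process pools.
        return PoolExecutor(ProcessPoolExecutor, n_workers)
    raise ValueError(f"unknown strategy: {strategy!r}")


jobs = [lambda i=i: i * i for i in range(6)]
print(make_executor("multithreading", n_workers=3).execute(jobs))
# [0, 1, 4, 9, 16, 25]
```

Keeping every backend behind the same `execute()` method is what lets the strategy be swapped with a single string argument.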
Journaling
Every build() writes journal entries (Parquet) recording the timestamp, git revision, config, and hash. Query them later:
```python
j = block.journal()   # JournalFrame (DataFrame subclass)
entry = j.get(0)      # JournalEntry (Series subclass)
print(entry.hash, entry.anchor, entry.revision)
```
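A journal entry is essentially one flat provenance record per build. The stdlib-only sketch below builds such records in memory; `make_journal_entry`, its field layout, and the revision string are hypothetical. Persisting the list as Parquet, as dbx does, could be done with `pandas.DataFrame(journal).to_parquet(...)`, which needs pyarrow or fastparquet installed.

```python
import hashlib
import json
from datetime import datetime, timezone


def make_journal_entry(anchor: str, config: dict, revision: str) -> dict:
    """Hypothetical provenance record with the fields named above."""
    blob = json.dumps({"anchor": anchor, "config": config}, sort_keys=True)
    return {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "revision": revision,  # e.g. the output of `git rev-parse HEAD`
        "anchor": anchor,
        "config": blob,
        "hash": hashlib.sha256(blob.encode("utf-8")).hexdigest(),
    }


journal = []  # in-memory list; a real journal would be written to Parquet
journal.append(make_journal_entry("mymod.SampleBlock", {"n_rows": 5}, "abc1234"))
journal.append(make_journal_entry("mymod.SampleBlock", {"n_rows": 5}, "abc1234"))

# Identical configs hash identically, so repeated builds share a hash.
print(len(journal), journal[0]["hash"] == journal[1]["hash"])  # 2 True
```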
CLI
The package installs several console entry points:
```bash
dbx 'module.function(arg1, arg2)'          # evaluate a dbx expression
dbx.exec 'module.function(arg1, arg2)'     # same as dbx
dbx.pprint 'module.function(arg1, arg2)'   # evaluate and pretty-print
dbx.slurm.exec 'module.function()'         # execute on a Slurm Ray cluster
```
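To illustrate what evaluating a `'module.function(arg1, arg2)'` string involves, here is a hypothetical sketch that parses the expression, imports the named module, and evaluates the call. `eval_expression` is not the dbx CLI's implementation; it handles only module-level functions and uses `eval()`, so it should only ever see trusted input.

```python
import ast
import importlib


def _dotted_name(node: ast.AST) -> str:
    # Rebuild "a.b.c" from nested ast.Attribute / ast.Name nodes.
    if isinstance(node, ast.Name):
        return node.id
    if isinstance(node, ast.Attribute):
        return f"{_dotted_name(node.value)}.{node.attr}"
    raise ValueError("expected a dotted name like module.function")


def eval_expression(expr: str):
    """Evaluate 'module.function(args)' after importing its module (hypothetical)."""
    tree = ast.parse(expr, mode="eval")
    if not isinstance(tree.body, ast.Call):
        raise ValueError("expected a call like module.function(...)")
    parts = _dotted_name(tree.body.func).split(".")
    if len(parts) < 2:
        raise ValueError("expected a module-qualified function name")
    # Import the full module path so submodules (pkg.sub) are loaded,
    # then bind only the top-level package name for eval().
    importlib.import_module(".".join(parts[:-1]))
    namespace = {parts[0]: importlib.import_module(parts[0])}
    return eval(compile(tree, "<dbx-expr>", "eval"), namespace)


print(eval_expression("math.pow(2, 3)"))  # 8.0
```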
Running Tests
```bash
python -m pytest tests/ -x -q
```
Prerequisites for Remote Tests
Remote tests rely on Ray (install it with `pip install "datablocks[ray]"`). If you run the tests inside a git repository, they will fail if the repository has uncommitted changes (unless DBXGITREPO is unset).
Environment Variables
| Variable | Description |
|---|---|
| `DBX_ROOT` | Default storage root when `root` is not specified |
| `DBX_GIT_REPO` | Path to the git repository used for revision tracking |
| `DBX_DIRTY_REPO_OK` | Set to skip the dirty-repo check |
| `DBX_LOG_INFO`, `DBX_LOG_DEBUG`, etc. | Control per-level log output |
| `DBX_LOG_SELECTION` | Comma-separated list of fully qualified names (FQNs) for `selected()` log filtering |
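The sketch below shows one plausible reading of `DBX_ROOT` and `DBX_LOG_SELECTION`. The helpers `default_root`, `log_selection`, and `is_selected` are hypothetical, and the prefix-matching rule (a name is selected if it or any parent package is listed) is an assumption, not documented behavior of dbx's `selected()`.

```python
import os


def default_root(root=None):
    # Fall back to DBX_ROOT when no root is given explicitly.
    return root if root is not None else os.environ.get("DBX_ROOT")


def log_selection() -> set:
    # DBX_LOG_SELECTION is a comma-separated FQN list; blank items are dropped.
    raw = os.environ.get("DBX_LOG_SELECTION", "")
    return {name.strip() for name in raw.split(",") if name.strip()}


def is_selected(fqn: str) -> bool:
    """Assumed rule: True if fqn or any of its parent names is listed."""
    sel = log_selection()
    parts = fqn.split(".")
    return any(".".join(parts[:i]) in sel for i in range(1, len(parts) + 1))


os.environ["DBX_ROOT"] = "/tmp/dbx"
os.environ["DBX_LOG_SELECTION"] = "mypkg.stages, other.mod"
print(default_root(), is_selected("mypkg.stages.Featurize"), is_selected("mypkg.io"))
# /tmp/dbx True False
```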
License
Download files
File details
Details for the file datablocks-0.0.2.tar.gz.
File metadata
- Download URL: datablocks-0.0.2.tar.gz
- Upload date:
- Size: 78.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 921f13e804029dde879841eaef1cf2b82956e85872658ce531cb72ff64b053b6 |
| MD5 | b63b581c599c0f19025f8fc8af91d850 |
| BLAKE2b-256 | ed69c966d44f16516379a8e579f1dfa98615ff47e4f5aa6e17090d57e46f063f |
File details
Details for the file datablocks-0.0.2-py3-none-any.whl.
File metadata
- Download URL: datablocks-0.0.2-py3-none-any.whl
- Upload date:
- Size: 46.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | b16333308ac8e92229c0f39cf880f2bf4db22b95e23832ad5a0421496aad7657 |
| MD5 | 9db45e217130ea7817290ba8a1499ca8 |
| BLAKE2b-256 | 0e6da03eeb10e71114b231d196d10046ff4accdb80855dc89dd0514154816fe0 |