
Content-addressed, journaled data-experiment management for Python


datablocks


License: MIT. Requires Python 3.12+.


Overview

datablocks (imported as dbx) is a framework for building reproducible data pipelines where every computation step is:

  • Content-addressed — identified by a deterministic SHA-256 hash of its class name, configuration, and version.
  • Journaled — every build writes a Parquet journal entry recording the full provenance (timestamp, git revision, config, hash).
  • Idempotent — calling build() on an already-valid block is a no-op.
  • Storage-agnostic — paths are resolved via fsspec, supporting local, S3, GCS, ADLS, and any other backend.
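
To make content addressing concrete, here is a minimal sketch of hashing a block's identity. It assumes the configuration can be serialized to canonical JSON (sorted keys, fixed separators). The function name block_hash and this exact serialization are hypothetical; they illustrate the idea, not the scheme dbx actually uses.

```python
import hashlib
import json
from typing import Any


def block_hash(class_name: str, config: dict[str, Any], version: str) -> str:
    """Hypothetical content hash over (class name, config, version).

    Canonical JSON (sorted keys, fixed separators) makes the digest
    independent of dict insertion order, so equal configs hash equally.
    """
    payload = json.dumps(
        {"class": class_name, "config": config, "version": version},
        sort_keys=True,
        separators=(",", ":"),
        default=str,  # assumption: non-JSON values are hashed via str()
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


h1 = block_hash("pkg.SampleBlock", {"n_rows": 5, "seed": 0}, "0.0.1")
h2 = block_hash("pkg.SampleBlock", {"seed": 0, "n_rows": 5}, "0.0.1")
h3 = block_hash("pkg.SampleBlock", {"n_rows": 6, "seed": 0}, "0.0.1")
print(h1 == h2, h1 == h3)  # True False
```

Reordering config keys leaves the digest unchanged, while changing any value, the class name, or the version produces a different one.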

Key Features

Feature             Description
Datablock           Content-addressed unit of computation with topic-based output paths
Datastack           Orchestrates parallel builds of child Datablocks (shards)
CONFIG              Dataclass-based configuration schema with lazy evaluation and specline support
Journaling          Automatic Parquet-based provenance tracking for every build
env()               Relocatable environment variable references that stay symbolic in handles
Callable Executors  Pluggable parallelism: inline, threading, multiprocessing, Ray
Torch Executors     Device-aware executors that automatically move callables between CPU/GPU
Remote Execution    Ray-based remote dbx interpreter with transparent proxying
Slurm Integration   Launch Ray clusters on Slurm and execute pipelines remotely
@tagged             Decorator that auto-generates human-readable pipeline tags

Installation

pip install datablocks

With Ray support (for remote execution and RayCallableExecutor):

pip install "datablocks[ray]"

For development:

git clone https://github.com/dmitry-karpeyev/datablocks.git
cd datablocks
pip install -e ".[dev]"

Quickstart

import os, tempfile
from dataclasses import dataclass
import pandas as pd
from dbx import Datablock, write_frame, read_frame

class SampleBlock(Datablock):
    """A minimal Datablock that generates a DataFrame."""

    TOPICFILES = {'output': 'data.parquet'}

    @dataclass
    class CONFIG(Datablock.CONFIG):
        n_rows: int = 10

    def __build__(self):
        df = pd.DataFrame({'x': range(self.cfg.n_rows)})
        write_frame(df, self.path('output', ensure_dirpath=True))
        return self

    def __read__(self, topic='output'):
        return read_frame(self.path(topic))

# Build and read
root = tempfile.mkdtemp()
os.environ['DBXGITREPO'] = ''  # skip git check for this example

block = SampleBlock(url=root, spec=dict(n_rows=5))
block.build()
df = block.read('output')
print(df)
#    x
# 0  0
# 1  1
# 2  2
# 3  3
# 4  4

Core Concepts

Datablock

A Datablock is the fundamental unit. Subclass it and implement:

  • __build__(self) — produce your outputs (write to self.path(topic)).
  • __read__(self, topic) — load and return the output for a topic.
  • TOPICFILES — dict mapping topic names to filenames.
  • CONFIG — dataclass defining your configuration schema.

from dataclasses import dataclass
from dbx import Datablock

class MyBlock(Datablock):
    TOPICFILES = {'features': 'features.parquet', 'model': 'model.pt'}

    @dataclass
    class CONFIG(Datablock.CONFIG):
        input_path: str | None = None
        n_components: int = 64

    def __build__(self):
        # self.cfg.input_path and self.cfg.n_components are available here.
        # Write outputs to self.path('features') and self.path('model').
        ...
        return self

Path Hierarchy

Every Datablock maps to a deterministic directory tree:

{root}/{anchor}/{key}/{topic}/{TOPICFILE}

  • root — storage root (local path or fsspec URL)
  • anchor — defaults to the fully-qualified class name
  • key — derived from the content hash (configurable via keyby)
  • topic — named output channel
  • TOPICFILE — the filename within the topic directory
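
The layout can be sketched with posixpath, which also joins fsspec-style URLs such as s3://bucket/data. The helpers default_anchor and block_path are hypothetical, and the sketch assumes (for illustration only) that the key is a 16-character prefix of the content hash; dbx derives the key in its own way, configurable via keyby.

```python
import posixpath


def default_anchor(cls: type) -> str:
    # Fully-qualified class name, e.g. "mypkg.blocks.SampleBlock"
    return f"{cls.__module__}.{cls.__qualname__}"


def block_path(root: str, cls: type, content_hash: str, topic: str,
               topicfile: str, key_len: int = 16) -> str:
    """Hypothetical: compose {root}/{anchor}/{key}/{topic}/{TOPICFILE}.

    Assumption: the key is a hash prefix of length key_len.
    """
    key = content_hash[:key_len]
    return posixpath.join(root, default_anchor(cls), key, topic, topicfile)


class SampleBlock:  # stand-in class; only its name matters here
    pass


# When run as a script this prints
# s3://bucket/data/__main__.SampleBlock/abababababababab/output/data.parquet
print(block_path("s3://bucket/data", SampleBlock, "ab" * 32,
                 "output", "data.parquet"))
```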

Datastack

A Datastack manages a collection of child Datablocks (shards) and builds them in parallel:

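import math
from dataclasses import dataclass
from dbx import Datablock, Datastack

class MyStack(Datastack):
    @dataclass
    class CONFIG(Datablock.CONFIG):
        n_items: int = 1000
        shard_size: int = 100

    @property
    def n_shards(self):
        # One shard per shard_size items, rounding up
        return math.ceil(self.cfg.n_items / self.cfg.shard_size)

    def __shard__(self, idx):
        # Return the child Datablock for shard idx; MyShard is your own
        # Datablock subclass for a single shard.
        return MyShard(url=self.root, spec=dict(idx=idx, ...))

# Build the shards in parallel on 4 worker threads
stack = MyStack(url='/data', spec=dict(n_items=1000),
                parallelization='multithreading', n_workers=4)
stack.build()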
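
The n_shards property above is a ceiling division. The hypothetical helper below shows which item range each shard idx would cover, assuming shards are contiguous, equal-size slices with a shorter final shard; how a real shard uses idx is up to your MyShard class.

```python
def shard_ranges(n_items: int, shard_size: int) -> list[tuple[int, int]]:
    """Hypothetical: half-open [start, stop) item range for each shard idx."""
    # Integer ceiling division: same result as math.ceil(n_items / shard_size)
    # but without float rounding for very large counts.
    n_shards = -(-n_items // shard_size)
    return [
        (idx * shard_size, min((idx + 1) * shard_size, n_items))
        for idx in range(n_shards)
    ]


print(len(shard_ranges(1000, 100)))  # 10
print(shard_ranges(1050, 100)[-1])   # (1000, 1050)
```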

Configuration & env()

Use env() to reference environment variables symbolically — the handle stays portable across machines:

from dbx import env

block = MyBlock(url=env('DATA_ROOT'), spec=dict(input_path=env('INPUT_DIR')))
# The handle contains "$dbx.getenv('DATA_ROOT')" instead of the resolved path
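
The idea behind a symbolic reference is deferred lookup. In the minimal sketch below, a hypothetical EnvRef object stores only the variable name, renders as the symbolic string shown above, and reads os.environ only when resolved. It illustrates the concept and is not dbx's env() implementation.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class EnvRef:
    """Hypothetical stand-in for dbx.env(): a symbolic env-var reference."""
    name: str

    def __str__(self) -> str:
        # Symbolic form, as it would appear in a portable handle
        return f"$dbx.getenv('{self.name}')"

    def resolve(self) -> str:
        # Looked up at use time, so the same handle works on any machine
        return os.environ[self.name]


ref = EnvRef("DATA_ROOT")
os.environ["DATA_ROOT"] = "/mnt/data"
print(str(ref))       # $dbx.getenv('DATA_ROOT')
print(ref.resolve())  # /mnt/data
```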

Callable Executors

Pluggable parallelism for any sequence of callables:

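from functools import partial
from dbx import callable_executor

# Callables to run; here, partials computing 2**0 .. 2**9
list_of_callables = [partial(pow, 2, i) for i in range(10)]

executor = callable_executor('multithreading', n_workers=8, tag='my-job')
results = executor.execute(list_of_callables)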

# Available strategies: 'inline', 'multithreading', 'multiprocessing',
#   'ray', 'torch_multithreading', 'torch_multiprocessing'
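
To illustrate what the first three strategy names mean, here is a stdlib-only sketch of a strategy-dispatching executor built on concurrent.futures. The class SimpleExecutor is hypothetical, covers only 'inline', 'multithreading', and 'multiprocessing', and is not dbx's callable_executor. It returns results in input order.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from functools import partial
from typing import Any, Callable, Sequence


class SimpleExecutor:
    """Hypothetical strategy-based executor for zero-argument callables."""

    def __init__(self, strategy: str = "inline", n_workers: int = 4):
        if strategy not in ("inline", "multithreading", "multiprocessing"):
            raise ValueError(f"unknown strategy: {strategy!r}")
        self.strategy = strategy
        self.n_workers = n_workers

    def execute(self, callables: Sequence[Callable[[], Any]]) -> list[Any]:
        if self.strategy == "inline":
            # Run sequentially in the calling thread
            return [fn() for fn in callables]
        pool_cls = (ThreadPoolExecutor if self.strategy == "multithreading"
                    else ProcessPoolExecutor)  # processes need picklable callables
        with pool_cls(max_workers=self.n_workers) as pool:
            futures = [pool.submit(fn) for fn in callables]
            # Collect in submission order; re-raises the exception of the
            # first failed callable in that order
            return [f.result() for f in futures]


jobs = [partial(pow, 2, i) for i in range(5)]
print(SimpleExecutor("multithreading", n_workers=2).execute(jobs))  # [1, 2, 4, 8, 16]
```

Keeping the strategy behind one execute() method means calling code does not change when switching from inline debugging to a thread or process pool.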

Journaling

Every build() writes journal entries (Parquet) recording the timestamp, git revision, config, and hash. Query them later:

j = block.journal()            # JournalFrame (DataFrame subclass)
entry = j.get(0)               # JournalEntry (Series subclass)
print(entry.hash, entry.anchor, entry.revision)
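
The journal, keyed by content hash, is what lets build() be a no-op for an already-built block. A minimal sketch under stated assumptions: the journal is an in-memory list of dicts instead of Parquet, the git revision is passed in rather than read from the repository, and the names build_once and journal are hypothetical.

```python
from datetime import datetime, timezone
from typing import Any, Callable


def build_once(journal: list[dict[str, Any]], block_hash: str,
               config: dict[str, Any], revision: str,
               build: Callable[[], None]) -> bool:
    """Hypothetical: run build() only if no journal entry has this hash.

    Returns True if a build happened, False if it was a no-op.
    """
    if any(entry["hash"] == block_hash for entry in journal):
        return False  # already built: nothing to do
    build()
    journal.append({
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "revision": revision,
        "config": dict(config),
        "hash": block_hash,
    })
    return True


calls: list[str] = []
journal: list[dict[str, Any]] = []
for _ in range(2):
    build_once(journal, "abc123", {"n_rows": 5}, "deadbeef",
               lambda: calls.append("built"))
print(len(calls), len(journal))  # 1 1
```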

CLI

The package installs several console entry points:

dbx 'module.function(arg1, arg2)'          # evaluate a dbx expression
dbx.exec 'module.function(arg1, arg2)'     # same as dbx
dbx.pprint 'module.function(arg1, arg2)'   # evaluate and pretty-print
dbx.slurm.exec 'module.function()'         # execute on a Slurm Ray cluster
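
The dbx entry point takes a 'module.function(args)' string. As a minimal sketch of one way such a string can be evaluated, the hypothetical evaluate function below imports the longest importable module prefix of the dotted name and calls the target with literal arguments only. It does not reproduce dbx's actual expression language.

```python
import ast
import importlib
from typing import Any


def resolve_dotted(name: str) -> Any:
    """Import the longest importable module prefix, then walk attributes."""
    parts = name.split(".")
    for i in range(len(parts), 0, -1):
        try:
            obj = importlib.import_module(".".join(parts[:i]))
        except ModuleNotFoundError:
            continue
        for attr in parts[i:]:
            obj = getattr(obj, attr)
        return obj
    raise ImportError(f"cannot resolve {name!r}")


def evaluate(expr: str) -> Any:
    """Hypothetical: evaluate 'pkg.mod.func(literal, ..., key=literal)'."""
    node = ast.parse(expr, mode="eval").body
    if not isinstance(node, ast.Call):
        raise ValueError("expected a call like 'module.function(...)'")
    func = resolve_dotted(ast.unparse(node.func))
    # Only Python literals are accepted as arguments (no arbitrary code)
    args = [ast.literal_eval(a) for a in node.args]
    kwargs = {kw.arg: ast.literal_eval(kw.value) for kw in node.keywords}
    return func(*args, **kwargs)


print(evaluate("math.sqrt(16)"))       # 4.0
print(evaluate("operator.add(2, 3)"))  # 5
```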

Running Tests

python -m pytest tests/ -x -q

Pre-requisites for Remote Tests

Remote tests require Ray, which the datablocks[ray] extra installs. When the tests run inside a git repository, they fail if the repository has uncommitted changes, unless DBXGITREPO is unset.

Environment Variables

Variable                             Description
DBX_ROOT                             Default storage root when root is not specified
DBX_GIT_REPO                         Path to the git repository used for revision tracking
DBX_DIRTY_REPO_OK                    Set to skip the dirty-repo check
DBX_LOG_INFO, DBX_LOG_DEBUG, etc.    Control per-level log output
DBX_LOG_SELECTION                    Comma-separated list of fully-qualified names (FQNs) for selected() log filtering

License

MIT



Download files


Source Distribution

datablocks-0.0.1.tar.gz (93.2 kB)


Built Distribution


datablocks-0.0.1-py3-none-any.whl (49.3 kB)


File details

Details for the file datablocks-0.0.1.tar.gz.

File metadata

  • Download URL: datablocks-0.0.1.tar.gz
  • Size: 93.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.12

File hashes

Hashes for datablocks-0.0.1.tar.gz
Algorithm Hash digest
SHA256 58829171134e19b5c8aa2f48b4091c1c360e43ec5ba3a62b2013264a711ed373
MD5 0a914fc5d1c925ab36af0b4477bbd767
BLAKE2b-256 b0963f9d4f54a87aa9138b3f44dbd86af6fb7d3ea43b27373a10e69d7b94e05c


File details

Details for the file datablocks-0.0.1-py3-none-any.whl.

File metadata

  • Download URL: datablocks-0.0.1-py3-none-any.whl
  • Size: 49.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.12

File hashes

Hashes for datablocks-0.0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 1728e4c6053e28574568acaa1247e9cde369ecbe9df6dc0a7f339437b12cb04c
MD5 954dac8b3f88b2995c10db9a8e36b86d
BLAKE2b-256 f970c20ace5955c90a0b483f4622945fb69d6995b981a74df9fee88049e562e9

