datablocks

Config-addressed, journaled data-experiment management for Python.

License: MIT. Requires Python 3.12+.


Overview

datablocks (imported as dbx) is a framework for building reproducible data pipelines where every computation step is:

  • Config-addressed — identified by a deterministic SHA-256 hash of its class name, configuration, and version.
  • Journaled — every build writes a Parquet journal entry recording the full provenance (timestamp, git revision, config, hash).
  • Idempotent — calling build() on an already-valid block is a no-op.
  • Storage-agnostic — paths are resolved via fsspec, supporting local, S3, GCS, ADLS, and any other backend.
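To make config-addressing concrete, here is a minimal sketch of how a deterministic hash over class name, configuration, and version could be computed. The helper name config_hash, the JSON canonicalization, and the payload layout are assumptions for illustration only; datablocks itself may serialize and hash differently.

```python
import hashlib
import json


def config_hash(class_name: str, config: dict, version: str) -> str:
    """Return a SHA-256 hex digest of (class name, config, version).

    Hypothetical helper: the real datablocks hashing scheme may differ.
    """
    payload = {"class": class_name, "config": config, "version": version}
    # sort_keys plus fixed separators make the JSON text independent of dict order.
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


a = config_hash("SampleBlock", {"n_rows": 5, "seed": 0}, "0.0.3")
b = config_hash("SampleBlock", {"seed": 0, "n_rows": 5}, "0.0.3")
c = config_hash("SampleBlock", {"n_rows": 6, "seed": 0}, "0.0.3")
print(a == b)  # True: same config, different key order
print(a == c)  # False: different config
```

Because the digest depends only on the inputs, two machines that construct the same block would arrive at the same address without coordinating.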

Key Features

Feature             Description
------------------  ------------------------------------------------------------------------
Datablock           Config-addressed unit of computation with topic-based output paths
Datastack           Orchestrates parallel builds of child Datablocks (shards)
CONFIG              Dataclass-based configuration schema with lazy evaluation and specline support
Journaling          Automatic Parquet-based provenance tracking for every build
env()               Relocatable environment variable references that stay symbolic in handles
Callable Executors  Pluggable parallelism: inline, threading, multiprocessing, Ray
Torch Executors     Device-aware executors that automatically move callables between CPU/GPU
Remote Execution    Ray-based remote dbx interpreter with transparent proxying
Slurm Integration   Launch Ray clusters on Slurm and execute pipelines remotely
@tagged             Decorator that auto-generates human-readable pipeline tags

Installation

pip install datablocks

With Ray support (for remote execution and RayCallableExecutor):

pip install "datablocks[ray]"

For development:

git clone https://github.com/dmitry-karpeyev/datablocks.git
cd datablocks
pip install -e ".[dev]"

Quickstart

import os, tempfile
from dataclasses import dataclass
import pandas as pd
from dbx import Datablock, write_frame, read_frame

class SampleBlock(Datablock):
    """A minimal Datablock that generates a DataFrame."""

    TOPICFILES = {'output': 'data.parquet'}

    @dataclass
    class CONFIG(Datablock.CONFIG):
        n_rows: int = 10

    def __build__(self):
        df = pd.DataFrame({'x': range(self.cfg.n_rows)})
        write_frame(df, self.path('output', ensure_dirpath=True))
        return self

    def __read__(self, topic='output'):
        return read_frame(self.path(topic))

# Build and read
root = tempfile.mkdtemp()
os.environ['DBXGITREPO'] = ''  # skip git check for this example

block = SampleBlock(url=root, spec=dict(n_rows=5))
block.build()
df = block.read('output')
print(df)
#    x
# 0  0
# 1  1
# 2  2
# 3  3
# 4  4

Core Concepts

Datablock

A Datablock is the fundamental unit. Subclass it and implement:

  • __build__(self) — produce your outputs (write to self.path(topic)).
  • __read__(self, topic) — load and return the output for a topic.
  • TOPICFILES — dict mapping topic names to filenames.
  • CONFIG — dataclass defining your configuration schema.

class MyBlock(Datablock):
    # Two topics, each stored under its own filename
    TOPICFILES = {'features': 'features.parquet', 'model': 'model.pt'}

    @dataclass
    class CONFIG(Datablock.CONFIG):
        input_path: str | None = None  # optional until supplied via spec
        n_components: int = 64

    def __build__(self):
        # self.cfg.input_path, self.cfg.n_components are available
        # Write outputs to self.path('features'), self.path('model')
        ...

Path Hierarchy

Every Datablock maps to a deterministic directory tree:

{root}/{anchor}/{key}/{topic}/{TOPICFILE}

where:
  • root — storage root (local path or fsspec URL)
  • anchor — defaults to the fully-qualified class name
  • key — derived from the content hash (configurable via keyby)
  • topic — named output channel
  • TOPICFILE — the filename within the topic directory
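As an illustration of the layout above, the following sketch joins the five components into a single path. The function topic_path, the bucket name, and the key value are hypothetical; the library resolves paths itself through self.path(topic).

```python
import posixpath


def topic_path(root: str, anchor: str, key: str, topic: str, topicfile: str) -> str:
    """Join the five path components with '/' (works for local paths and URLs)."""
    return posixpath.join(root, anchor, key, topic, topicfile)


path = topic_path(
    root="s3://my-bucket/experiments",   # hypothetical fsspec URL
    anchor="mypkg.blocks.SampleBlock",   # fully-qualified class name
    key="3f9a1c",                        # hypothetical hash-derived key
    topic="output",
    topicfile="data.parquet",
)
print(path)
# s3://my-bucket/experiments/mypkg.blocks.SampleBlock/3f9a1c/output/data.parquet
```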

Datastack

A Datastack manages a collection of child Datablocks (shards) and builds them in parallel:

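import math
from dataclasses import dataclass
from dbx import Datablock, Datastack  # assumes Datastack is exported like Datablock

class MyStack(Datastack):
    @dataclass
    class CONFIG(Datablock.CONFIG):
        n_items: int = 1000
        shard_size: int = 100

    @property
    def n_shards(self):
        # Round up so that a final partial shard is still built
        return math.ceil(self.cfg.n_items / self.cfg.shard_size)

    def __shard__(self, idx):
        # MyShard is a Datablock subclass defined elsewhere;
        # add further shard spec fields as needed
        return MyShard(url=self.root, spec=dict(idx=idx))

stack = MyStack(url='/data', spec=dict(n_items=1000),
                parallelization='multithreading', n_workers=4)
stack.build()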
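The sharding arithmetic can be tried without the library. The sketch below splits a range of items into shards and processes them on a thread pool; shard_bounds and build_shard are hypothetical stand-ins, not part of dbx, and the half-open index ranges are an assumption about how a shard might use its idx.

```python
import math
from concurrent.futures import ThreadPoolExecutor


def shard_bounds(n_items: int, shard_size: int) -> list[tuple[int, int]]:
    """Split range(n_items) into half-open [start, stop) shards."""
    n_shards = math.ceil(n_items / shard_size)
    return [
        (i * shard_size, min((i + 1) * shard_size, n_items))
        for i in range(n_shards)
    ]


def build_shard(bounds: tuple[int, int]) -> int:
    """Stand-in for a shard's build step: here it just counts its items."""
    start, stop = bounds
    return stop - start


bounds = shard_bounds(n_items=1050, shard_size=100)
with ThreadPoolExecutor(max_workers=4) as pool:
    # map() yields results in shard order, even if shards finish out of order.
    sizes = list(pool.map(build_shard, bounds))

print(len(bounds), bounds[-1], sum(sizes))
# 11 (1000, 1050) 1050
```

With 1050 items and a shard size of 100, rounding up gives 11 shards, the last of which holds the remaining 50 items.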

Configuration & env()

Use env() to reference environment variables symbolically. The handle stores the reference rather than the resolved value, so it stays portable across machines:

from dbx import env

block = MyBlock(url=env('DATA_ROOT'), spec=dict(input_path=env('INPUT_DIR')))
# The handle contains "$dbx.getenv('DATA_ROOT')" instead of the resolved path
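The idea of a symbolic reference can be sketched in a few lines. The EnvRef class and its printed form are hypothetical and only mimic the behavior described above; they are not the dbx implementation of env().

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True, repr=False)
class EnvRef:
    """Hypothetical symbolic reference to an environment variable."""
    name: str

    def __repr__(self) -> str:
        # The symbolic form is what would be stored in a handle.
        return f"$getenv({self.name!r})"

    def resolve(self) -> str:
        # Look the value up only when it is actually needed.
        return os.environ[self.name]


os.environ["DATA_ROOT"] = "/mnt/machine-a/data"
ref = EnvRef("DATA_ROOT")
print(repr(ref))       # symbolic form, identical on every machine
print(ref.resolve())   # machine-specific value

os.environ["DATA_ROOT"] = "/scratch/machine-b/data"
print(ref.resolve())   # same reference, new value
```

Deferring the lookup is what keeps the stored form relocatable: only resolve() depends on the machine it runs on.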

Callable Executors

Pluggable parallelism for any sequence of callables:

from dbx import callable_executor

executor = callable_executor('multithreading', n_workers=8, tag='my-job')
results = executor.execute(list_of_callables)

# Available strategies: 'inline', 'multithreading', 'multiprocessing',
#   'ray', 'torch_multithreading', 'torch_multiprocessing'
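For intuition about what such an executor does, here is a standard-library sketch of the inline and multithreaded strategies. The functions run_inline and run_multithreaded are hypothetical and assume zero-argument callables; the dbx executors may accept other forms and handle errors, tags, and logging differently.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial


def run_inline(callables):
    """Run zero-argument callables one by one in the calling thread."""
    return [fn() for fn in callables]


def run_multithreaded(callables, n_workers=8):
    """Run zero-argument callables on a thread pool, keeping input order."""
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        futures = [pool.submit(fn) for fn in callables]
        return [f.result() for f in futures]


def square(x):
    return x * x


# partial() binds the argument so each item is a zero-argument callable.
list_of_callables = [partial(square, i) for i in range(5)]
inline_results = run_inline(list_of_callables)
threaded_results = run_multithreaded(list_of_callables, n_workers=2)
print(inline_results)                       # [0, 1, 4, 9, 16]
print(threaded_results == inline_results)   # True
```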

Journaling

Every build() writes journal entries (Parquet) recording the timestamp, git revision, config, and hash. Query them later:

j = block.journal()            # JournalFrame (DataFrame subclass)
entry = j.get(0)               # JournalEntry (Series subclass)
print(entry.hash, entry.anchor, entry.revision)
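The interplay between idempotent builds and journaling can be sketched with an in-memory toy. ToyBlock is hypothetical: a list stands in for the Parquet journal, and the sketch assumes that a no-op build adds no new entry, which the library may handle differently.

```python
import datetime as dt


class ToyBlock:
    """Hypothetical in-memory stand-in for a journaled, idempotent block."""

    def __init__(self, config: dict):
        self.config = config
        self.output = None   # stands in for files on storage
        self.journal = []    # stands in for the Parquet journal

    def valid(self) -> bool:
        return self.output is not None

    def build(self) -> "ToyBlock":
        if self.valid():
            return self      # already built: no work, no new entry
        self.output = list(range(self.config["n_rows"]))
        self.journal.append({
            "timestamp": dt.datetime.now(dt.timezone.utc).isoformat(),
            "revision": "unknown",   # a real entry would hold the git revision
            "config": dict(self.config),
        })
        return self


block = ToyBlock({"n_rows": 3})
block.build()
block.build()                # second call is a no-op
print(len(block.journal), block.output)
# 1 [0, 1, 2]
```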

CLI

The package installs several console entry points:

dbx 'module.function(arg1, arg2)'          # evaluate a dbx expression
dbx.exec 'module.function(arg1, arg2)'     # same as dbx
dbx.pprint 'module.function(arg1, arg2)'   # evaluate and pretty-print
dbx.slurm.exec 'module.function()'         # execute on a Slurm Ray cluster

Running Tests

python -m pytest tests/ -x -q

Prerequisites for Remote Tests

Remote tests require Ray (install the ray extra). When the tests run inside a git repository, they fail if the repository has uncommitted changes, unless DBXGITREPO is unset.

Environment Variables

Variable                           Description
---------------------------------  -----------------------------------------------------
DBX_ROOT                           Default storage root when root is not specified
DBX_GIT_REPO                       Path to the git repository for revision tracking
DBX_DIRTY_REPO_OK                  Set to skip the dirty-repo check
DBX_LOG_INFO, DBX_LOG_DEBUG, etc.  Control per-level log output
DBX_LOG_SELECTION                  Comma-separated FQN list for selected() log filtering
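As a rough illustration of how these variables could be consumed, the sketch below gathers them into a dictionary. The helper read_settings is hypothetical, and treating the mere presence of DBX_DIRTY_REPO_OK as enabling the flag is an assumption; the library may interpret the values differently.

```python
import os


def read_settings(environ=os.environ) -> dict:
    """Collect the documented variables into a plain dict (hypothetical helper)."""
    return {
        "root": environ.get("DBX_ROOT"),          # None if not set
        "git_repo": environ.get("DBX_GIT_REPO"),  # None if not set
        # Assumption: the variable's mere presence enables the flag.
        "dirty_repo_ok": "DBX_DIRTY_REPO_OK" in environ,
        # Split the comma-separated list and drop empty items.
        "log_selection": [
            name
            for name in environ.get("DBX_LOG_SELECTION", "").split(",")
            if name
        ],
    }


settings = read_settings({
    "DBX_ROOT": "/data",
    "DBX_LOG_SELECTION": "mypkg.blocks.SampleBlock,mypkg.stacks.MyStack",
})
print(settings)
```

Passing a plain dict instead of os.environ, as done here, keeps the example independent of the machine it runs on.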

License

MIT
