Data Engine
Data Engine is a GUI orchestrator for Python-based dataframe transform pipelines.
It provides:
- a workspace-based runtime for authored flows
- a desktop GUI for operators
- a terminal UI for headless/local operation
- an experimental Rust-backed egui surface
- parquet-first inspect/debug tooling for dataframe outputs
Flows are plain Python modules that declare how source files, settings workbooks, schedules, and manual runs should move through Polars, DuckDB, and file outputs.
What It Is
Data Engine is not just a dataframe library and not just a scheduler. It is the operator/runtime layer around Python-authored flow modules.
The package handles:
- workspace discovery and selection
- daemon ownership and control handoff
- manual, poll, and schedule execution modes
- mirrored output routing
- persisted run/log/state history
- dataframe inspection inside the app
Step functions use normal Python libraries directly. In practice that usually means:
- Polars for dataframe transforms
- DuckDB for SQL-oriented work
- pathlib-style file output
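For example, a SQL-oriented step can hand the current Polars frame to DuckDB and return the query result as a new frame. This is a minimal sketch, not part of the package: it assumes context.current already holds a Polars DataFrame, as in the flow examples below, and that the result should become the next context.current.

import duckdb

def summarize_by_status(context):
    # Sketch only: DuckDB queries the in-memory Polars frame by its local
    # variable name, and .pl() converts the result back to Polars.
    claims = context.current
    return duckdb.sql(
        "SELECT status, COUNT(*) AS n FROM claims GROUP BY status"
    ).pl()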
Install
Installer scripts
Use the installer that matches your environment:
- macOS: INSTALL/INSTALL MAC.command
- Windows: INSTALL/INSTALL WINDOWS.bat
- Windows VM / CPU-safe Polars path: INSTALL/INSTALL WINDOWS_VM.bat
Manual install
Base install:
python -m pip install py-data-engine
Editable local install:
python -m pip install -e .
Notebook-authored flow modules (.ipynb) work with the base install. Install the optional notebook extra only when you want Jupyter authoring tools:
python -m pip install -e ".[notebook]"
For contributors:
python -m pip install -e ".[dev]"
Core requirements:
- Python >=3.14
- PySide6 for the desktop GUI
- Textual for the terminal UI
Start The App
Desktop GUI:
data-engine start gui
Experimental Rust egui surface:
data-engine start egui
Terminal UI:
data-engine start tui
You can also launch via the module form if needed:
python -m data_engine.ui.cli.app start gui
Headless CLI
data-engine list
data-engine show example_summary
data-engine run --once example_summary
data-engine run
data-engine run starts the automated engine headlessly for discovered automated flows and keeps running until stopped. Use --once to force a single pass instead.
Public API
from data_engine import Flow, FlowContext, discover_flows, load_flow, run
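The call signatures aren't spelled out here, so the snippet below is only a hedged sketch: it assumes the workspace is already resolvable (see Workspace Model below), that discover_flows() can be called without arguments, and that run() accepts a discovered flow. Check the package's docstrings for the actual parameters.

from data_engine import discover_flows, run

# Hypothetical usage sketch -- the argument shapes are assumptions,
# verify them against the installed package before relying on this.
flows = list(discover_flows())
for flow in flows:
    print(flow)      # list what the runtime discovered

if flows:
    run(flows[0])    # run one discovered flow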
Workspace Model
Data Engine discovers workspaces from a collection root resolved from:
- DATA_ENGINE_WORKSPACE_COLLECTION_ROOT, when explicitly set
- DATA_ENGINE_WORKSPACE_ROOT, when binding directly to one authored workspace
- otherwise, the machine-local app settings store
Each immediate child folder containing flow_modules/ is treated as a workspace, for example:
workspaces/example_workspace/flow_modules/
workspaces/claims2/flow_modules/
Shared workspace state lives inside each authored workspace:
workspaces/<workspace_id>/.workspace_state/
Machine-local runtime state lives under the app artifacts root:
artifacts/workspace_cache/<workspace_id>/
artifacts/runtime_state/<workspace_id>/
The app's selected workspace and collection-root preference are machine-local settings, not repo-local config.
Flow Shape
from data_engine import Flow
import polars as pl
def read_claims(context):
    # Load the watched workbook into a Polars frame.
    return pl.read_excel(context.source.path)

def keep_open(context):
    # context.current holds the frame returned by the previous step.
    return context.current.filter(pl.col("status") == "OPEN")

def write_parquet(context):
    # Mirror the source path, swapping the extension for .parquet.
    output = context.mirror.with_suffix(".parquet")
    context.current.write_parquet(output)
    return output

def build():
    return (
        Flow(group="Claims")
        .watch(
            mode="poll",
            source="../../../example_data/Input/claims_flat",
            interval="5s",
            extensions=[".xlsx", ".xls", ".xlsm"],
            settle=1,
        )
        .mirror(root="../../../example_data/Output/example_mirror")
        .step(read_claims, save_as="raw_df")
        .step(keep_open, use="raw_df", save_as="filtered_df")
        .step(write_parquet, use="filtered_df")
    )
Each flow module exports:
- an optional DESCRIPTION
- build() -> Flow
The module filename is the flow identity. Authored flow modules should set Flow(group=...) and let the loader inject the final flow name from the module filename.
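Put together, a minimal flow module looks like the sketch below. The filename claims_open.py is illustrative; whatever the file is called becomes the flow name.

# flow_modules/claims_open.py -- the filename supplies the flow name
from data_engine import Flow

DESCRIPTION = "Keep only open claims."  # optional

def build():
    # Author the group here; the loader injects the flow name
    # from the module filename.
    return Flow(group="Claims").watch(mode="manual")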
Runtime Modes
Flows can run as:
manual, poll, or schedule.
At a high level:
- manual: runs on operator request
- poll: watches source inputs for new or changed files
- schedule: runs on a time-based cadence
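The watch(...) signatures in the Flow API section below map onto these modes. As a quick illustration (the source paths and times here are placeholders):

from data_engine import Flow

# manual: runs only when an operator triggers it
manual_flow = Flow(group="Claims").watch(mode="manual")

# poll: re-runs when files under the source path appear or change
poll_flow = Flow(group="Claims").watch(
    mode="poll", source="../input/claims", interval="5s"
)

# schedule: runs at fixed times, here as a single batch pass
schedule_flow = Flow(group="Claims").watch(
    mode="schedule", time=["08:15", "14:45"], source="../input/claims", run_as="batch"
)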
Batch Helpers
For batch-oriented flows, use Flow.collect(...) plus either Flow.map(...) or Flow.step_each(...).
from data_engine import Flow
def validate_workbook(context, file_ref):
    return {
        "name": file_ref.name,
        "path": file_ref.path,
        "ok": file_ref.exists(),
    }

def build():
    return (
        Flow(group="Claims")
        .watch(mode="schedule", run_as="batch", interval="15m", source="../../../example_data/Input/claims_flat")
        .collect([".xlsx"])
        .map(validate_workbook)
    )
Flow.collect(...) returns a Batch of FileRef items. Flow.map(...) applies one callable to each item and returns a new Batch. Flow.step_each(...) is the equivalent readability-first alias.
Flow API
- Flow(group=...)
- .watch(mode="manual", source=None, run_as="individual")
- .watch(mode="poll", source=..., interval=..., extensions=None, settle=1, run_as="individual")
- .watch(mode="schedule", interval=..., source=None, run_as="individual" | "batch")
- .watch(mode="schedule", time="HH:MM", source=None, run_as="individual" | "batch")
- .watch(mode="schedule", time=["08:15", "14:45"], source=..., run_as="individual" | "batch")
- .mirror(root=...)
- .step(fn, use=None, save_as=None, label=None)
- .collect(extensions, root=None, recursive=False, use=None, save_as=None, label=None)
- .map(fn, use=None, save_as=None, label=None)
- .step_each(fn, use=None, save_as=None, label=None)
- .preview(use=None)
- .run_once()
- .run()
- .show()
step() callables always receive one FlowContext parameter and return the next value for context.current.
map() and step_each() callables accept either (item) or (context, item) and return a mapped Batch.
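For example, a mapper that only needs the file can omit the context parameter entirely (a sketch using the FileRef attributes shown in the batch example above):

def workbook_name(file_ref):
    # (item)-only form: receives each FileRef in the collected Batch.
    return file_ref.name

# .collect([".xlsx"]).map(workbook_name) would yield a Batch of names.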
FlowContext
FlowContext exposes the active run state:
- context.source
- context.mirror
- context.current
- context.objects
- context.metadata
- context.source_metadata()
- context.debug
Useful source helpers:
- context.source.path
- context.source.with_extension(".json")
- context.source.with_suffix(".json")
- context.source.file("notes.json")
- context.source.namespaced_file("notes.json")
- context.source.root_file("lookup.csv")
Useful mirror helpers:
- context.mirror.with_extension(".parquet")
- context.mirror.with_suffix(".parquet")
- context.mirror.file("open_claims.parquet")
- context.mirror.namespaced_file("open_claims.parquet")
use="name" loads context.objects["name"] into context.current before the step runs. save_as="name" stores the returned value into context.objects["name"].
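Taken together, a typical output step leans on a few of these helpers at once. A small sketch, assuming context.current is a Polars DataFrame and that the mirror helpers return writable paths (the summary filename is illustrative):

def write_outputs(context):
    # Main artifact: mirror the source path with a .parquet suffix.
    main_path = context.mirror.with_suffix(".parquet")
    context.current.write_parquet(main_path)

    # Companion artifact in the same mirror location.
    summary_path = context.mirror.file("summary.parquet")
    context.current.describe().write_parquet(summary_path)

    return main_path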
Dataframe Debugging
The app includes a dataframe-first debug pane for saved parquet artifacts.
From a flow step, save a debug dataframe with:
context.debug.save_frame(context.current, name="raw_claims")
That writes:
- a parquet artifact for the dataframe
- companion metadata used by the UI
The desktop GUI can then:
- list saved dataframe artifacts by flow/step/timestamp
- inspect parquet outputs in-app
- preview top N, bottom N, or sampled rows
- filter columns with Excel-style distinct-value popups
- copy one or more selected cells from the table
The inspect modal reuses the same dataframe rendering path.
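In practice, save_frame() usually sits inside an existing step so intermediate frames can be inspected later. A minimal sketch (the step body and artifact names are illustrative):

import polars as pl

def keep_open(context):
    # Snapshot the incoming frame before filtering, so both sides of
    # the filter show up in the debug pane.
    context.debug.save_frame(context.current, name="claims_before_filter")

    filtered = context.current.filter(pl.col("status") == "OPEN")
    context.debug.save_frame(filtered, name="claims_open_only")
    return filtered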
Notebook Preview
For notebook or REPL-style authoring outside a compiled flow module, preview() is usually the most useful helper:
build().preview(use="raw_df").head(10)
build().preview(use="filtered_df")
preview(use="name") runs the flow until that save_as="name" object exists, then returns the real object without running later steps. preview() is not available from inside compiled flow modules, so use it from an external notebook, REPL, or script while iterating on the flow.
Discovery And Compilation
Starter flow modules live in:
workspaces/<workspace_id>/flow_modules/
artifacts/workspace_cache/<workspace_id>/compiled_flow_modules/
Authored flow modules compile into:
artifacts/workspace_cache/<workspace_id>/compiled_flow_modules/*.py
The runtime loads discovered flows from those compiled modules.
Workspace Layout
| Path | Contents |
|---|---|
| src/data_engine/ | Runtime package, operator surfaces, and services |
| workspaces/<workspace_id>/flow_modules/ | Authored flow sources (.py or .ipynb) |
| workspaces/<workspace_id>/.workspace_state/ | Shared lease markers and checkpoint parquet snapshots |
| artifacts/workspace_cache/<workspace_id>/compiled_flow_modules/ | Generated importable flow modules |
| artifacts/runtime_state/<workspace_id>/ | Machine-local runtime and daemon state |
| src/data_engine/docs/ | Packaged documentation content |
Smoke Data
Generate local smoke data with:
python scripts/generate_smoke_data.py --root . --workspace-id example_workspace --workspace-id claims2
Generated local data and workspaces are intentionally git-ignored:
data/
data2/
workspaces/
Packaging
Distribution name:
py-data-engine
Version source of truth:
src/data_engine/platform/identity.py
Build checks:
python -m build
python -m twine check dist/*
Status
This project is pre-alpha. Internal architecture is still moving quickly, and backwards compatibility is not a current goal.
File details
Details for the file py_data_engine-0.3.2.tar.gz.
File metadata
- Download URL: py_data_engine-0.3.2.tar.gz
- Upload date:
- Size: 14.0 MB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | d00a2576e7985104bd1647e4cdc39bf9c2ac49364280b4266da123997f6e5467 |
| MD5 | 41e67c7d9f06e1542f597dc06b761f59 |
| BLAKE2b-256 | 341e4b88220fd75a90bcbf5709e447e0ae2ff6b775e28cb4e3513a2a8f4446ce |
Provenance
The following attestation bundles were made for py_data_engine-0.3.2.tar.gz:
Publisher: publish-pypi.yml on bj-data-eng/data-engine
Statement:
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: py_data_engine-0.3.2.tar.gz
- Subject digest: d00a2576e7985104bd1647e4cdc39bf9c2ac49364280b4266da123997f6e5467
- Sigstore transparency entry: 1342934577
- Sigstore integration time:
- Permalink: bj-data-eng/data-engine@5efc498827f04e44c50724c49b6d6980d2b3b01f
- Branch / Tag: refs/heads/main
- Owner: https://github.com/bj-data-eng
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish-pypi.yml@5efc498827f04e44c50724c49b6d6980d2b3b01f
- Trigger Event: workflow_dispatch
File details
Details for the file py_data_engine-0.3.2-cp314-abi3-win_amd64.whl.
File metadata
- Download URL: py_data_engine-0.3.2-cp314-abi3-win_amd64.whl
- Upload date:
- Size: 14.5 MB
- Tags: CPython 3.14+, Windows x86-64
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 8d2574425f317d01294951cf38e75c9f4116fffe695f895b73f72cc65edc7d67 |
| MD5 | 17835652f3a4e41e949c56afcfa58a11 |
| BLAKE2b-256 | 0a631ee7dabc28deec8bbb947e4234118d925e80f1128937383c8f97aef0f3dd |
|
Provenance
The following attestation bundles were made for py_data_engine-0.3.2-cp314-abi3-win_amd64.whl:
Publisher: publish-pypi.yml on bj-data-eng/data-engine
Statement:
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: py_data_engine-0.3.2-cp314-abi3-win_amd64.whl
- Subject digest: 8d2574425f317d01294951cf38e75c9f4116fffe695f895b73f72cc65edc7d67
- Sigstore transparency entry: 1342934587
- Sigstore integration time:
- Permalink: bj-data-eng/data-engine@5efc498827f04e44c50724c49b6d6980d2b3b01f
- Branch / Tag: refs/heads/main
- Owner: https://github.com/bj-data-eng
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish-pypi.yml@5efc498827f04e44c50724c49b6d6980d2b3b01f
- Trigger Event: workflow_dispatch