A modular data preparation CLI for ingestion, profiling, classification, preprocessing, and export pipelines

🗂️ Data Pill

A modular, open-source data preparation CLI for developers, data analysts, data scientists, and data engineers.

datapill is a single command-line tool that ingests, profiles, classifies, preprocesses, and exports data. It reads from local files, PostgreSQL, MySQL, S3, REST APIs, and Kafka, writes to files, databases, or S3, and tracks every run as an artifact in a clean pipeline model. A hosted SaaS is coming soon.


💡 Why Data Pill?

Most data prep work happens in ad-hoc scripts, notebooks, or half-finished internal tools. datapill replaces that chaos with a reproducible, composable pipeline that works the same way whether you're exploring a CSV on your laptop or processing millions of rows from Kafka in production.

If you are… | datapill helps you…
🧑‍💻 Developer | Build data pipelines from config files, run them in CI, generate standalone Python scripts
📊 Data Analyst | Profile any dataset in seconds, detect nulls, distributions, and anomalies without writing code
🤖 Data Scientist | Classify columns by semantic type, preprocess features (scaling, imputation, encoding) with one command
⚙️ Data Engineer | Stream-ingest from Postgres, MySQL, S3, Kafka, REST APIs; write back with upsert support

📦 Installation

pip install datapill

Requires Python 3.11+.

This installs the core CLI (~200 MB) with rule-based classification, profiling, preprocessing, and export. No ML model is required.

With embedding support

To use --mode embedding or --mode hybrid in dp classify, install the ML extras (~3.5 GB, includes PyTorch and sentence-transformers):

pip install "datapill[ml]"

Note: Without [ml], hybrid mode still works. It runs rule-based classification for all columns and skips the embedding fallback, so only columns that the rules cannot resolve are affected: they are returned as unknown.

Verify the install:

dp --help

🚀 Quick Start

In the commands below, replace <run_id> with the run ID of the artifact you want to use as input. dp list shows the artifacts in the store.

1. Ingest a local CSV

dp ingest --source local_file --path data/sales.csv

2. Profile the dataset

dp profile --input <run_id>

3. Classify columns by semantic type

dp classify --input <run_id> --mode hybrid

4. Preprocess with a pipeline config

dp preprocess --input <run_id> --pipeline pipeline.json

5. Export to Parquet

dp export --input <run_id> --format parquet --out-path output/result.parquet

🛠️ Commands

dp ingest

Stream data from any supported source into the artifact store.

dp ingest --source postgresql --config pg.json --table orders
dp ingest --source s3 --config s3.json --url s3://my-bucket/data.parquet
dp ingest --source kafka --config kafka.json --topic events --max-records 10000
dp ingest --source rest --config api.json --endpoint /users
dp ingest --source local_file --path data/sales.csv --limit 50000

Supported sources: local_file · postgresql · mysql · s3 · rest · kafka

Supported formats (local / S3): csv · parquet · json · jsonl · excel


dp profile

Compute a full statistical profile of any ingested dataset.

dp profile --input <run_id>
dp profile --input <run_id> --mode summary
dp profile --input <run_id> --sample-strategy random --sample-size 100000
dp profile --input <run_id> --correlation spearman

What you get:

  • Per-column: null rate, distinct count, min/max/mean/median, std, skewness, kurtosis, percentiles, histogram
  • Top value frequencies with percentages
  • Pattern detection: email, URL, phone, UUID, ISO date
  • Correlation matrix (Pearson or Spearman) for all numeric columns
  • Warnings: HIGH_NULL_RATE, CONSTANT_COLUMN, SKEWED_DISTRIBUTION, HIGH_CARDINALITY, POTENTIAL_IDENTIFIER

dp classify

Classify every column in a dataset by its semantic type - automatically.

dp classify --input <run_id> --mode hybrid
dp classify --input <run_id> --mode rule_based --threshold 0.65
dp classify --input <run_id> --overrides '{"age": "numerical_continuous", "y": "target_label"}'

Modes:

Mode | How it works | Requires
rule_based | Regex patterns on column names + dtype heuristics. Fast, zero ML dependencies. | core
embedding | Semantic similarity via sentence-transformers (all-MiniLM-L6-v2) against anchor texts per type. | datapill[ml]
hybrid | Rule-based first; embedding kicks in only for ambiguous or unknown columns. | datapill[ml] for full accuracy

Without [ml]: hybrid mode runs entirely on rule-based logic. Columns that cannot be resolved by rules are returned as unknown instead of being sent to the embedding model.

Semantic types detected: identifier · numerical_continuous · numerical_discrete · categorical_nominal · categorical_ordinal · text_freeform · text_structured · datetime · boolean · geospatial · embedding · target_label
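
The hybrid flow described above (rules first, embedding only as a fallback, unknown when no embedding model is installed) can be sketched as follows. This is a simplified illustration under stated assumptions: NAME_RULES, classify_rule_based, and classify_hybrid are hypothetical names, the regex patterns are invented for the example, and the embed callable stands in for the sentence-transformers model.

```python
import re
from typing import Callable, Optional

# Hypothetical name patterns; datapill's real rule set is not shown in this README.
NAME_RULES = [
    (re.compile(r"(^|_)id$|uuid", re.I), "identifier"),
    (re.compile(r"(_at|_date|timestamp)$", re.I), "datetime"),
    (re.compile(r"^(is|has)_", re.I), "boolean"),
    (re.compile(r"^(lat|lon|latitude|longitude)$", re.I), "geospatial"),
]


def classify_rule_based(name: str, dtype: str) -> Optional[str]:
    """Return a semantic type from name patterns or dtype, or None if unresolved."""
    for pattern, semantic_type in NAME_RULES:
        if pattern.search(name):
            return semantic_type
    # Very rough dtype heuristics, for illustration only.
    if dtype == "float":
        return "numerical_continuous"
    if dtype == "bool":
        return "boolean"
    return None


def classify_hybrid(
    name: str,
    dtype: str,
    embed: Optional[Callable[[str], str]] = None,
) -> str:
    """Rules first; fall back to `embed` if provided, otherwise report 'unknown'."""
    result = classify_rule_based(name, dtype)
    if result is not None:
        return result
    if embed is not None:
        return embed(name)
    return "unknown"
```

Passing embed=None mirrors an install without [ml]: unresolved columns come back as unknown instead of reaching a model.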


dp preprocess

Apply a preprocessing pipeline defined in a JSON config file.

dp preprocess --input <run_id> --pipeline pipeline.json
dp preprocess --input <run_id> --pipeline pipeline.json --dry-run
dp preprocess --input <run_id> --pipeline pipeline.json --checkpoint

Pipeline config format:

{
  "steps": [
    { "type": "impute_mean",      "scope": { "columns": ["age", "income"] } },
    { "type": "clip_iqr",         "scope": { "columns": ["income"] } },
    { "type": "standard_scaler",  "scope": { "columns": ["age", "income"] } },
    { "type": "onehot",           "scope": { "columns": ["category"] } },
    { "type": "drop_missing",     "scope": { "columns": [] } }
  ]
}
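
A config in this shape can be sanity-checked before it is passed to dp preprocess. The sketch below is a hypothetical helper, not part of datapill: validate_pipeline and KNOWN_STEPS are names introduced for the example, and the step names are copied from the "Available steps" list in this README. datapill's own validation may check more (for example, params).

```python
# Step names copied from the "Available steps" list in this README.
KNOWN_STEPS = {
    "impute_mean", "impute_median", "impute_mode", "drop_missing",
    "clip_iqr", "clip_zscore",
    "standard_scaler", "minmax_scaler", "robust_scaler",
    "onehot", "ordinal",
    "select_columns", "drop_columns", "rename_columns", "cast_dtype", "deduplicate",
    "custom_python",
}


def validate_pipeline(config: dict) -> list[str]:
    """Return a list of human-readable problems; an empty list means the shape looks fine."""
    steps = config.get("steps")
    if not isinstance(steps, list) or not steps:
        return ["'steps' must be a non-empty list"]

    errors = []
    for i, step in enumerate(steps):
        step_type = step.get("type")
        if step_type not in KNOWN_STEPS:
            errors.append(f"step {i}: unknown type {step_type!r}")
        columns = step.get("scope", {}).get("columns")
        if not isinstance(columns, list):
            errors.append(f"step {i}: scope.columns must be a list")
    return errors
```

Load pipeline.json with json.load and pass the resulting dict to validate_pipeline; catching a misspelled step type this way is cheaper than discovering it mid-run.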

Available steps:

Category | Steps
Missing values | impute_mean · impute_median · impute_mode · drop_missing
Outliers | clip_iqr · clip_zscore
Scaling | standard_scaler · minmax_scaler · robust_scaler
Encoding | onehot · ordinal
Structure | select_columns · drop_columns · rename_columns · cast_dtype · deduplicate
Custom | custom_python (sandboxed via RestrictedPython)

--dry-run runs on the first 1,000 rows and prints a preview without saving any artifact.
--checkpoint saves the DataFrame after each step so you can resume from any point.


dp export

Export a processed dataset to a file or write it back to a database or S3.

# Export to file
dp export --input <run_id> --format parquet --out-path output/result.parquet

# Write back to PostgreSQL (upsert)
dp export --input <run_id> --format parquet \
  --connector pg.json --write-mode upsert --primary-keys id

# Write to S3
dp export --input <run_id> --format csv --connector s3.json

Write modes: replace · append · upsert
Output formats: csv · parquet · json · jsonl · excel · arrow
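
The three write modes differ in what happens to rows that already exist in the target. The sketch below illustrates the usual semantics with SQLite from the Python standard library. It is an analogy under stated assumptions, not datapill's PostgreSQL code: write_rows and the orders table are hypothetical, and whether replace truncates or recreates the table in datapill is not stated in this README.

```python
import sqlite3


def write_rows(conn: sqlite3.Connection, rows: list[tuple[int, str]], mode: str) -> None:
    """Write (id, status) rows into `orders` with replace, append, or upsert semantics."""
    if mode == "replace":
        # Assumed meaning: discard existing rows, then insert the new ones.
        conn.execute("DELETE FROM orders")
    if mode in ("replace", "append"):
        conn.executemany("INSERT INTO orders (id, status) VALUES (?, ?)", rows)
    elif mode == "upsert":
        # Insert new keys; update the row when the primary key already exists.
        conn.executemany(
            "INSERT INTO orders (id, status) VALUES (?, ?) "
            "ON CONFLICT(id) DO UPDATE SET status = excluded.status",
            rows,
        )
    else:
        raise ValueError(f"unknown write mode: {mode}")
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT)")
write_rows(conn, [(1, "new"), (2, "new")], "replace")
write_rows(conn, [(2, "shipped"), (3, "new")], "upsert")
rows = conn.execute("SELECT id, status FROM orders ORDER BY id").fetchall()
```

This is why upsert needs --primary-keys: without a key to match on, there is no way to tell an update from a new row.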


dp connector

Inspect and interact with any connector directly - without running a full pipeline.

dp connector test     --source postgresql --config pg.json
dp connector schema   --source postgresql --config pg.json --table orders
dp connector upload   --source s3 --config s3.json --src-path data.csv --dest-url s3://bucket/data.csv
dp connector download --source s3 --config s3.json --url s3://bucket/data.csv --out-path ./data.csv
dp connector list     --source s3 --config s3.json --prefix input/
dp connector exec     --source postgresql --config pg.json --sql "DELETE FROM orders WHERE status='cancelled'"
dp connector truncate --source postgresql --config pg.json --table orders
dp connector produce  --source kafka --config kafka.json --topic events --file records.json

dp list

List all artifacts in the store.

dp list
dp list --feature ingest
dp list --limit 50

dp run

Run a full ingest + profile pipeline from a single config file.

dp run pipeline.json

Config format:

{
  "source": "postgresql",
  "connector": { "host": "localhost", "database": "mydb", "user": "u", "password": "p" },
  "query": { "table": "orders" },
  "ingest": { "batch_size": 10000 },
  "profile": { "mode": "full", "correlation": "pearson" }
}
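
Because this config embeds a database password, one option is to generate it at run time instead of committing it. The sketch below builds the same structure in Python and reads the password from an environment variable. The helper names and the DATAPILL_PG_PASSWORD variable are hypothetical; only the config keys come from the example above.

```python
import json
import os
from pathlib import Path


def build_run_config(password_env: str = "DATAPILL_PG_PASSWORD") -> dict:
    """Build the dp run config shown above, taking the password from the environment."""
    return {
        "source": "postgresql",
        "connector": {
            "host": "localhost",
            "database": "mydb",
            "user": "u",
            # Raises KeyError if the variable is unset, so a missing secret fails loudly.
            "password": os.environ[password_env],
        },
        "query": {"table": "orders"},
        "ingest": {"batch_size": 10000},
        "profile": {"mode": "full", "correlation": "pearson"},
    }


def write_run_config(path: str, password_env: str = "DATAPILL_PG_PASSWORD") -> Path:
    """Write the generated config to `path` and return it as a Path."""
    target = Path(path)
    target.write_text(json.dumps(build_run_config(password_env), indent=2))
    return target
```

After write_run_config("pipeline.json"), the file can be passed to dp run pipeline.json as usual.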

🔌 Connector Configuration

All connectors are configured via JSON files passed with --config.

PostgreSQL / MySQL

{
  "host": "localhost",
  "port": 5432,
  "database": "mydb",
  "user": "myuser",
  "password": "mypassword"
}

S3

{
  "aws_access_key_id": "AKIA...",
  "aws_secret_access_key": "...",
  "region": "us-east-1",
  "bucket": "my-bucket"
}

Kafka

{
  "bootstrap_servers": ["localhost:9092"],
  "group_id": "datapill",
  "value_format": "json",
  "security_protocol": "PLAINTEXT"
}

SASL/SSL is supported - add sasl_mechanism, sasl_username, sasl_password, and ssl_cafile as needed.

REST API

{
  "base_url": "https://api.example.com",
  "headers": { "Authorization": "Bearer <token>" },
  "response_path": "data",
  "pagination": {
    "type": "offset",
    "limit": 100,
    "limit_param": "limit",
    "offset_param": "offset"
  }
}

Pagination modes: offset · cursor · link_header
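
To show how the offset settings above fit together, here is a minimal sketch of an offset pagination loop. It is not datapill's connector code: paginate_offset and the fetch_page callable are hypothetical, and response_path is treated as a single top-level key, which may be simpler than what the connector actually supports.

```python
from typing import Callable, Iterator


def paginate_offset(
    fetch_page: Callable[[dict], dict],
    pagination: dict,
    response_path: str,
) -> Iterator[dict]:
    """Yield records page by page until a page comes back shorter than `limit`."""
    limit = pagination["limit"]
    offset = 0
    while True:
        # Query parameter names come from the config, e.g. ?limit=100&offset=200.
        params = {
            pagination["limit_param"]: limit,
            pagination["offset_param"]: offset,
        }
        body = fetch_page(params)
        records = body.get(response_path, [])
        yield from records
        if len(records) < limit:
            break
        offset += limit
```

fetch_page is injected so the loop can be exercised without a network; in practice it would wrap an HTTP GET against base_url plus the endpoint.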


🗃️ Artifact Store

Every pipeline run produces artifacts - Parquet files and JSON metadata - stored locally and tracked in a registry.

src/datapill/artifacts/
├── registry.json
├── a1b2c3d4_ingest_output.parquet
├── a1b2c3d4_ingest_schema.json
├── e5f6a7b8_profile_detail.json
└── e5f6a7b8_profile_summary.json

You can reference any artifact by its run_id (short 8-char hex) or full artifact_id. datapill resolves ambiguity automatically using feature-aware priority rules.
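
The file names in the listing above appear to follow a <run_id>_<feature>_<name>.<ext> pattern. Assuming that layout holds (it is inferred from the listing, not documented), artifacts for a run can be located with a few lines of Python. parse_artifact_filename and find_by_run_id are hypothetical helpers; datapill itself resolves artifacts through registry.json, whose format is not shown here.

```python
from typing import NamedTuple


class ArtifactName(NamedTuple):
    run_id: str
    feature: str
    name: str
    extension: str


def parse_artifact_filename(filename: str) -> ArtifactName:
    """Split '<run_id>_<feature>_<name>.<ext>' into its parts (assumed layout)."""
    stem, _, extension = filename.rpartition(".")
    parts = stem.split("_", 2)
    if len(parts) != 3:
        raise ValueError(f"not an artifact file name: {filename}")
    return ArtifactName(parts[0], parts[1], parts[2], extension)


def find_by_run_id(filenames: list[str], run_id: str) -> list[str]:
    """Return the file names that belong to the given run, skipping non-artifacts."""
    matches = []
    for filename in filenames:
        try:
            parsed = parse_artifact_filename(filename)
        except ValueError:
            continue  # e.g. registry.json
        if parsed.run_id == run_id:
            matches.append(filename)
    return matches
```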

Change the artifact directory with --out:

dp ingest --source local_file --path data.csv --out /my/artifacts

🏗️ Architecture

datapill/
├── cli/            # Typer CLI - entry point for all commands
├── connectors/     # Source adapters (local, PG, MySQL, S3, REST, Kafka)
├── core/           # PipelineContext, ProgressEvent, FeaturePipeline interface
├── executor/       # Sandboxed code execution (RestrictedPython + Docker)
├── features/
│   ├── ingest/     # Stream ingestion → Parquet artifacts
│   ├── profile/    # Statistical profiling + correlation
│   ├── classify/   # Semantic type classification (rule-based + embedding)
│   ├── preprocess/ # Step-based transformation pipeline
│   └── export/     # File export + DB/S3 write-back + code generation
└── storage/        # ArtifactStore - registry, save/load, resolve

Every feature implements the same FeaturePipeline interface: validate → plan → execute. Pipelines emit async ProgressEvent streams so the CLI can render live progress bars.
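
The validate → plan → execute contract with an async progress stream could look roughly like the sketch below. Only the names FeaturePipeline and ProgressEvent come from this README; every signature, field, and the EchoPipeline class are assumptions made for illustration, and the real interface in datapill/core may differ.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import AsyncIterator


@dataclass
class ProgressEvent:
    # Hypothetical fields; the real event type is not documented here.
    stage: str
    percent: float


class FeaturePipeline(ABC):
    """Assumed shape of the shared interface: validate, then plan, then execute."""

    @abstractmethod
    def validate(self) -> None: ...

    @abstractmethod
    def plan(self) -> list[str]: ...

    @abstractmethod
    def execute(self, plan: list[str]) -> AsyncIterator[ProgressEvent]: ...


class EchoPipeline(FeaturePipeline):
    """Toy pipeline that does no work and only reports progress per planned stage."""

    def __init__(self, rows: list[int]) -> None:
        self.rows = rows

    def validate(self) -> None:
        if not self.rows:
            raise ValueError("no rows to process")

    def plan(self) -> list[str]:
        return ["read", "write"]

    async def execute(self, plan: list[str]) -> AsyncIterator[ProgressEvent]:
        for i, stage in enumerate(plan, start=1):
            await asyncio.sleep(0)  # yield control, as real I/O would
            yield ProgressEvent(stage, 100 * i / len(plan))


async def run(pipeline: FeaturePipeline) -> list[ProgressEvent]:
    """Drive the three phases in order and collect the emitted events."""
    pipeline.validate()
    steps = pipeline.plan()
    return [event async for event in pipeline.execute(steps)]


events = asyncio.run(run(EchoPipeline([1, 2, 3])))
```

A CLI front end would consume the same stream with async for and update a progress bar on each event instead of collecting them into a list.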


🐍 Custom Python Steps (Sandboxed)

You can write arbitrary Python transformation logic and run it safely inside the preprocess pipeline.

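# my_transform.py
# Assumption: the sandbox exposes Polars as `pl`; this snippet does not import it.
def transform(df):
    return df.with_columns(
        (pl.col("revenue") / pl.col("units")).alias("avg_price")
    )

Reference the function from a custom_python step in the pipeline config:

{
  "steps": [
    {
      "type": "custom_python",
      "scope": { "columns": [] },
      "params": { "code": "<contents of my_transform.py>", "func": "transform" }
    }
  ]
}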

Custom code is validated by an AST analyzer (banned imports, banned builtins, dunder access) before execution. Two sandbox backends are available:

  • RestrictedPython - in-process, low overhead, suitable for most use cases
  • Docker - full container isolation (--network none, read-only FS, memory + CPU limits), for untrusted code
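
The kind of pre-execution check described above (banned imports, banned builtins, dunder access) can be sketched with the standard ast module. This is an illustration of the technique, not datapill's analyzer: find_violations and both ban lists are hypothetical, and a static check like this is a first filter, not a substitute for the sandbox backends.

```python
import ast

# Hypothetical ban lists; datapill's actual lists are not given in this README.
BANNED_IMPORTS = {"os", "sys", "subprocess", "socket"}
BANNED_BUILTINS = {"eval", "exec", "open", "compile", "__import__"}


def find_violations(source: str) -> list[str]:
    """Walk the parsed source and report banned imports, builtins, and dunder access."""
    violations = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            for alias in node.names:
                if alias.name.split(".")[0] in BANNED_IMPORTS:
                    violations.append(f"banned import: {alias.name}")
        elif isinstance(node, ast.ImportFrom):
            if (node.module or "").split(".")[0] in BANNED_IMPORTS:
                violations.append(f"banned import: {node.module}")
        elif isinstance(node, ast.Name) and node.id in BANNED_BUILTINS:
            violations.append(f"banned builtin: {node.id}")
        elif (
            isinstance(node, ast.Attribute)
            and node.attr.startswith("__")
            and node.attr.endswith("__")
        ):
            violations.append(f"dunder access: {node.attr}")
    return violations
```

An empty result means the code passed this static filter; anything else would be rejected before it reaches RestrictedPython or Docker.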

⚡ Code Generation

Planned: export any preprocess config into a self-contained, runnable Python script with no datapill dependency at runtime. The command is not available yet.

# Coming soon via: dp pipeline export --config pipeline.json --out-dir generated/

The generated run_pipeline.py will include all step logic inline, a run_pipeline(dry_run=True) entry point, and an optional test_pipeline.py scaffold.


🧑‍💻 Development Setup

git clone https://github.com/your-org/datapill.git
cd datapill
python -m venv .venv
source .venv/bin/activate       # Windows: .venv\Scripts\activate
pip install -e ".[dev]"

For full embedding support in development:

pip install -e ".[ml,dev]"

Run the tests:

pytest
pytest -m "not integration"     # skip tests that require Docker services
pytest --cov=datapill

Lint:

ruff check src/
ruff format src/

🗺️ Roadmap

  • dp pipeline export - generate standalone scripts from any pipeline config
  • Web UI / dashboard for artifact browsing and profile visualization
  • Custom step registry - register and share reusable step plugins
  • datapill SaaS - hosted pipelines, scheduling, collaboration, and monitoring
  • dbt integration - use datapill as a pre-processing layer before dbt models
  • Great Expectations integration - attach data quality assertions to any pipeline step

🤝 Contributing

Contributions are welcome. Please open an issue before submitting a large pull request so we can discuss the approach.

  1. Fork the repository
  2. Create a feature branch: git checkout -b feature/my-feature
  3. Make your changes with tests
  4. Run ruff check and pytest before pushing
  5. Open a pull request against main

📄 License

MIT License. See LICENSE for details.

