🗂️ Data Pill
A modular, open-source data preparation CLI for developers, data analysts, data scientists, and data engineers.
datapill gives you a single command-line tool to ingest, profile, classify, preprocess, and export data — from any source, to any destination — with a clean pipeline model and full artifact tracking. A hosted SaaS is coming soon.
💡 Why Data Pill?
Most data prep work happens in ad-hoc scripts, notebooks, or half-finished internal tools. datapill replaces that chaos with a reproducible, composable pipeline that works the same way whether you're exploring a CSV on your laptop or processing millions of rows from Kafka in production.
| | If you are… | datapill helps you… |
|---|---|---|
| 🧑💻 | Developer | Build data pipelines from config files, run them in CI, generate standalone Python scripts |
| 📊 | Data Analyst | Profile any dataset in seconds, detect nulls, distributions, and anomalies without writing code |
| 🤖 | Data Scientist | Classify columns by semantic type, preprocess features (scaling, imputation, encoding) with one command |
| ⚙️ | Data Engineer | Stream-ingest from Postgres, MySQL, S3, Kafka, REST APIs; write back with upsert support |
📦 Installation
pip install datapill
Requires Python 3.11+.
This installs the core CLI (~200 MB) with rule-based classification, profiling, preprocessing, and export. No ML model is required.
With embedding support
To use --mode embedding or --mode hybrid in dp classify, install the ML extras (~3.5 GB, includes PyTorch and sentence-transformers):
pip install "datapill[ml]"
Note: Without `[ml]`, `hybrid` mode still works — it runs rule-based classification for all columns and skips the embedding fallback. Only columns that would otherwise be sent to the embedding model are affected.
Verify the install:
dp --help
🚀 Quick Start
1. Ingest a local CSV
dp ingest --source local_file --path data/sales.csv
2. Profile the dataset
dp profile --input <run_id>
3. Classify columns by semantic type
dp classify --input <run_id> --mode hybrid
4. Preprocess with a pipeline config
dp preprocess --input <run_id> --pipeline pipeline.json
5. Export to Parquet
dp export --input <run_id> --format parquet --out-path output/result.parquet
🛠️ Commands
dp ingest
Stream data from any supported source into the artifact store.
dp ingest --source postgresql --config pg.json --table orders
dp ingest --source s3 --config s3.json --url s3://my-bucket/data.parquet
dp ingest --source kafka --config kafka.json --topic events --max-records 10000
dp ingest --source rest --config api.json --endpoint /users
dp ingest --source local_file --path data/sales.csv --limit 50000
Supported sources: local_file · postgresql · mysql · s3 · rest · kafka
Supported formats (local / S3): csv · parquet · json · jsonl · excel
Options:
| Option | Description |
|---|---|
| `--source`, `-s` | Connector type (required) |
| `--config`, `-c` | Path to JSON config file for the connector |
| `--path` | File path (local_file) |
| `--table` | Table name (postgresql \| mysql) |
| `--url` | S3 URL, e.g. s3://bucket/key.parquet |
| `--topic` | Kafka topic name |
| `--endpoint` | REST endpoint, e.g. /users |
| `--limit`, `-n` | Max rows to read |
| `--batch-size` | Rows per batch (default: 50,000) |
| `--max-records` | Max records to consume (Kafka only) |
| `--no-materialize` | Skip Parquet write; store connector ref only. Source must remain available for downstream commands. |
`--no-materialize` warning: Credentials are stored in plaintext inside the artifact store. For Kafka, each downstream command (profile, preprocess, export) will re-consume from the topic — offsets will advance and data may differ between runs.
dp profile
Compute a full statistical profile of any ingested dataset.
dp profile --input <run_id>
dp profile --input <run_id> --mode summary
dp profile --input <run_id> --sample-strategy random --sample-size 100000
dp profile --input <run_id> --correlation spearman
What you get:
- Per-column: null rate, distinct count, min/max/mean/median, std, skewness, kurtosis, percentiles, histogram
- Top value frequencies with percentages
- Pattern detection: email, URL, phone, UUID, ISO date
- Correlation matrix (Pearson or Spearman) for all numeric columns
- Warnings: `HIGH_NULL_RATE`, `CONSTANT_COLUMN`, `SKEWED_DISTRIBUTION`, `HIGH_CARDINALITY`, `POTENTIAL_IDENTIFIER`
Options:
| Option | Description |
|---|---|
| `--input`, `-i` | run_id or full artifact ID (required) |
| `--mode`, `-m` | full \| summary (default: full) |
| `--sample-strategy` | none \| random \| reservoir (default: none) |
| `--sample-size` | Number of rows to sample (default: 100,000) |
| `--correlation` | pearson \| spearman \| none (default: pearson) |
dp classify
Classify every column in a dataset by its semantic type — automatically.
dp classify --input <run_id> --mode hybrid
dp classify --input <run_id> --mode rule_based --threshold 0.65
dp classify --input <run_id> --overrides '{"age": "numerical_continuous", "y": "target_label"}'
Modes:
| Mode | How it works | Requires |
|---|---|---|
| `rule_based` | Regex patterns on column names + dtype heuristics. Fast, zero ML dependencies. | core |
| `embedding` | Semantic similarity via sentence-transformers (`all-MiniLM-L6-v2`) against anchor texts per type. | `datapill[ml]` |
| `hybrid` | Rule-based first; embedding kicks in only for ambiguous or unknown columns. | `datapill[ml]` for full accuracy |
Without `[ml]`: `hybrid` mode runs entirely on rule-based logic. Columns that cannot be resolved by rules are returned as `unknown` instead of being sent to the embedding model.
Semantic types detected: identifier · numerical_continuous · numerical_discrete · categorical_nominal · categorical_ordinal · text_freeform · text_structured · datetime · boolean · geospatial · embedding · target_label
Options:
| Option | Description |
|---|---|
| `--input`, `-i` | run_id or full artifact ID (required) |
| `--mode`, `-m` | rule_based \| embedding \| hybrid (default: hybrid) |
| `--threshold`, `-t` | Minimum confidence to accept a classification, 0.0–1.0 (default: 0.0) |
| `--overrides` | JSON string to force semantic type for specific columns, e.g. '{"col": "boolean"}' |
dp preprocess
Apply a preprocessing pipeline defined in a JSON config file.
dp preprocess --input <run_id> --pipeline pipeline.json
dp preprocess --input <run_id> --pipeline pipeline.json --dry-run
dp preprocess --input <run_id> --pipeline pipeline.json --checkpoint
Pipeline config format:
{
"steps": [
{ "type": "impute_mean", "scope": { "columns": ["age", "income"] } },
{ "type": "clip_iqr", "scope": { "columns": ["income"] } },
{ "type": "standard_scaler", "scope": { "columns": ["age", "income"] } },
{ "type": "onehot", "scope": { "columns": ["category"] } },
{ "type": "drop_missing", "scope": { "columns": [] } }
]
}
Available steps:
| Category | Steps |
|---|---|
| Missing values | impute_mean · impute_median · impute_mode · drop_missing |
| Outliers | clip_iqr · clip_zscore |
| Scaling | standard_scaler · minmax_scaler · robust_scaler |
| Encoding | onehot · ordinal |
| Structure | select_columns · drop_columns · rename_columns · cast_dtype · deduplicate |
| Custom | custom_python (sandboxed via RestrictedPython) |
Options:
| Option | Description |
|---|---|
| `--input`, `-i` | run_id or full artifact ID (required) |
| `--pipeline`, `-p` | Path to pipeline JSON config file (required) |
| `--dry-run` | Run on first 1,000 rows, no artifact saved |
| `--checkpoint` | Save a Parquet checkpoint after each step |
dp export
Export a processed dataset to a file or write it back to a database or S3.
# Export to file
dp export --input <run_id> --format parquet --out-path output/result.parquet
# Write back to PostgreSQL (upsert)
dp export --input <run_id> --format parquet \
--connector pg.json --write-mode upsert --primary-keys id
# Write to S3
dp export --input <run_id> --format csv --connector s3.json
Write modes: replace · append · upsert
Output formats: csv · parquet · json · jsonl · excel
Options:
| Option | Description |
|---|---|
| `--input`, `-i` | run_id or full artifact ID (required) |
| `--format`, `-f` | Output format (required) |
| `--out-path` | Output file path (required unless `--connector` is used) |
| `--write-mode` | replace \| append \| upsert (default: replace) |
| `--primary-keys` | Comma-separated key columns for upsert |
| `--connector`, `-c` | Connector config JSON for write-back to DB or S3 |
| `--dry-run` | Print first 10 rows, skip write |
| `--compression` | snappy \| zstd \| gzip (Parquet only) |
| `--out`, `-o` | Artifact store directory (default: src/datapill/artifacts) |
dp pipeline export
Generate a standalone Python script from a preprocess pipeline artifact — no datapill dependency required at runtime.
dp pipeline export -i <run_id> -s local_file --path data.csv
dp pipeline export -i <run_id> -s postgresql -c pg.json
dp pipeline export -i <run_id> -s local_file --path data.csv --with-tests
dp pipeline export -i <run_id> --out-dir ./generated
What you get:
- Preprocessing steps reconstructed from the saved config artifact
- Ingest configuration merged into a single self-contained script
- A `run_<name>.py` entry point with a `--dry-run` flag
- An optional `test_<name>.py` scaffold (with `--with-tests`)
Note: `dp pipeline export` requires a preprocess artifact saved without `--dry-run`. If only a dry-run artifact exists, re-run `dp preprocess` without that flag first.
Options:
| Option | Description |
|---|---|
| `--input`, `-i` | run_id or preprocess artifact ID (required) |
| `--source`, `-s` | Connector type: local_file · postgresql · mysql · s3 (default: local_file) |
| `--ingest-config`, `-c` | Connector JSON config (same as `dp ingest --config`) |
| `--path` | File path (local_file) |
| `--table` | Table name (postgresql \| mysql) |
| `--url` | S3 URL |
| `--format`, `-f` | Output format (default: parquet) |
| `--out-path` | Output path hard-coded into the generated script (default: output/result.parquet) |
| `--name`, `-n` | Base name for generated files, e.g. orders → run_orders.py |
| `--compression` | snappy \| zstd \| gzip (Parquet only) |
| `--with-tests` | Also generate test_<name>.py |
| `--out-dir`, `-o` | Directory to write generated files (default: generated/) |
| `--store` | Artifact store directory (default: src/datapill/artifacts) |
Generated files:
| File | Description |
|---|---|
| `run_<name>.py` | Main pipeline script — runs without datapill |
| `test_<name>.py` | pytest scaffold (only with `--with-tests`) |
Run the generated pipeline:
python generated/run_<name>.py --dry-run
python generated/run_<name>.py
Run the generated tests:
python -m pytest generated/test_<name>.py -v
dp connector
Inspect and interact with any connector directly — without running a full pipeline.
dp connector test --source postgresql --config pg.json
dp connector schema --source postgresql --config pg.json --table orders
dp connector upload --source s3 --config s3.json --src-path data.csv --dest-url s3://bucket/data.csv
dp connector download --source s3 --config s3.json --url s3://bucket/data.csv --out-path ./data.csv
dp connector list --source s3 --config s3.json --prefix input/
dp connector exec --source postgresql --config pg.json --sql "DELETE FROM orders WHERE status='cancelled'"
dp connector truncate --source postgresql --config pg.json --table orders
dp connector produce --source kafka --config kafka.json --topic events --file records.json
Actions:
| Action | Description | Supported sources |
|---|---|---|
| `test` | Check connectivity and measure latency | all |
| `schema` | Inspect column names, types, and nullable flags | all |
| `upload` | Upload a local file to a destination | s3, local_file |
| `download` | Download a remote file to local disk | s3 |
| `list` | List objects under a key prefix | s3 |
| `exec` | Run an arbitrary SQL statement | postgresql, mysql |
| `truncate` | Truncate a table | postgresql, mysql |
| `produce` | Publish records from a JSON or CSV file to a topic | kafka |
dp list
List all artifacts in the store.
dp list
dp list --feature ingest
dp list --limit 50
Options:
| Option | Description |
|---|---|
| `--feature`, `-f` | Filter by feature: ingest \| profile \| preprocess \| classify \| export |
| `--limit`, `-n` | Max number of artifacts to show (default: 20) |
dp run
Run a full ingest + profile pipeline from a single config file.
dp run pipeline.json
Config format:
{
"source": "postgresql",
"connector": { "host": "localhost", "database": "mydb", "user": "u", "password": "p" },
"query": { "table": "orders" },
"ingest": { "batch_size": 10000 },
"profile": { "mode": "full", "correlation": "pearson" }
}
🔌 Connector Configuration
All connectors are configured via JSON files passed with --config.
PostgreSQL / MySQL
{
"host": "localhost",
"port": 5432,
"database": "mydb",
"user": "myuser",
"password": "mypassword"
}
S3
{
"aws_access_key_id": "AKIA...",
"aws_secret_access_key": "...",
"region": "us-east-1",
"bucket": "my-bucket"
}
Kafka
{
"bootstrap_servers": ["localhost:9092"],
"group_id": "datapill",
"value_format": "json",
"security_protocol": "PLAINTEXT"
}
SASL/SSL is supported — add sasl_mechanism, sasl_username, sasl_password, and ssl_cafile as needed.
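For example, a SASL_SSL configuration could look like the following (the mechanism, broker address, and CA path are placeholder values for your environment):

{
  "bootstrap_servers": ["broker1.example.com:9093"],
  "group_id": "datapill",
  "value_format": "json",
  "security_protocol": "SASL_SSL",
  "sasl_mechanism": "SCRAM-SHA-256",
  "sasl_username": "myuser",
  "sasl_password": "mypassword",
  "ssl_cafile": "/etc/ssl/certs/ca.pem"
}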
REST API
{
"base_url": "https://api.example.com",
"headers": { "Authorization": "Bearer <token>" },
"response_path": "data",
"pagination": {
"type": "offset",
"limit": 100,
"limit_param": "limit",
"offset_param": "offset"
}
}
Pagination modes: offset · cursor · link_header
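For cursor or link_header pagination, set "type" accordingly. As a hedged sketch, assuming link_header mode follows standard Link response headers and needs no extra keys:

{
  "base_url": "https://api.example.com",
  "headers": { "Authorization": "Bearer <token>" },
  "response_path": "data",
  "pagination": { "type": "link_header" }
}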
🗃️ Artifact Store
Every pipeline run produces artifacts — Parquet files and JSON metadata — stored locally and tracked in a registry.
By default, artifacts are stored in .datapill/artifacts/ inside your current working directory:
.datapill/artifacts/
├── registry.json
├── a1b2c3d4_ingest_output.parquet
├── a1b2c3d4_ingest_schema.json
├── e5f6g7h8_profile_detail.json
└── e5f6g7h8_profile_summary.json
You can reference any artifact by its run_id (short 8-char hex) or full artifact_id. datapill resolves ambiguity automatically using feature-aware priority rules — for example, dp profile prefers ingest_output over preprocess_output when given only a run_id.
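A minimal sketch of how such priority-based resolution can work (illustrative only; the names below are assumptions, not datapill internals):

# Hypothetical sketch of feature-aware artifact resolution.
# PRIORITY and resolve() are illustrative names, not datapill's API.
PRIORITY = {
    "profile": ["ingest_output", "preprocess_output"],
    "export": ["preprocess_output", "ingest_output"],
}

def resolve(registry: list[dict], run_id: str, feature: str) -> str:
    # Match artifacts whose ID starts with the short 8-char run_id
    candidates = [a for a in registry if a["artifact_id"].startswith(run_id)]
    for kind in PRIORITY.get(feature, []):
        for artifact in candidates:
            if artifact["kind"] == kind:
                return artifact["artifact_id"]
    raise KeyError(f"no artifact found for run {run_id}")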
Overriding the artifact directory:
dp export supports --out-path to write the exported file to any location. For all commands, you can override the artifact store directory with the DATAPILL_ARTIFACT_DIR environment variable:
export DATAPILL_ARTIFACT_DIR=/my/artifacts
dp ingest --source local_file --path data.csv
dp profile --input <run_id>
🏗️ Architecture
datapill/
├── cli/ # Typer CLI — entry point for all commands
├── connectors/ # Source adapters (local, PG, MySQL, S3, REST, Kafka)
├── core/ # PipelineContext, ProgressEvent, FeaturePipeline interface
├── executor/ # Sandboxed code execution (RestrictedPython + Docker)
├── features/
│ ├── ingest/ # Stream ingestion → Parquet artifacts
│ ├── profile/ # Statistical profiling + correlation
│ ├── classify/ # Semantic type classification (rule-based + embedding)
│ ├── preprocess/ # Step-based transformation pipeline
│ └── export/ # File export + DB/S3 write-back + code generation
└── storage/ # ArtifactStore — registry, save/load, resolve
Every feature implements the same FeaturePipeline interface: validate → plan → execute. Pipelines emit async ProgressEvent streams so the CLI can render live progress bars.
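A minimal sketch of that contract (hypothetical class and signatures, not datapill's actual API):

import asyncio
from dataclasses import dataclass
from typing import AsyncIterator

@dataclass
class ProgressEvent:
    step: str
    done: int
    total: int

class EchoPipeline:
    """Toy pipeline illustrating the validate → plan → execute shape."""

    def validate(self, config: dict) -> None:
        if "rows" not in config:
            raise ValueError("config needs a 'rows' key")

    def plan(self, config: dict) -> list[str]:
        return ["read", "write"]

    async def execute(self, config: dict) -> AsyncIterator[ProgressEvent]:
        # Async generator: the CLI consumes these events for progress bars
        total = config["rows"]
        for step in self.plan(config):
            for done in range(0, total + 1, max(total // 4, 1)):
                yield ProgressEvent(step, min(done, total), total)

async def main() -> None:
    pipeline = EchoPipeline()
    pipeline.validate({"rows": 100})
    async for event in pipeline.execute({"rows": 100}):
        print(f"{event.step}: {event.done}/{event.total}")

asyncio.run(main())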
🐍 Custom Python Steps (Sandboxed)
You can write arbitrary Python transformation logic and run it safely inside the preprocess pipeline.
# my_transform.py
import polars as pl

def transform(df: pl.DataFrame) -> pl.DataFrame:
    # Derive average price per unit from two existing numeric columns
    return df.with_columns(
        (pl.col("revenue") / pl.col("units")).alias("avg_price")
    )
{
"steps": [
{
"type": "custom_python",
"scope": { "columns": [] },
"params": { "code": "<contents of my_transform.py>", "func": "transform" }
}
]
}
Custom code is validated by an AST analyzer (banned imports, banned builtins, dunder access) before execution. Two sandbox backends are available:
- RestrictedPython — in-process, low overhead, suitable for most use cases
- Docker — full container isolation (--network none, read-only FS, memory + CPU limits), for untrusted code
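To make the first gate concrete, here is a rough, self-contained sketch of AST validation; the deny-lists and function name are illustrative, not datapill's actual analyzer:

import ast

# Illustrative deny-lists; datapill's real analyzer has its own.
BANNED_IMPORTS = {"os", "sys", "subprocess", "socket"}
BANNED_BUILTINS = {"eval", "exec", "open", "__import__"}

def check(code: str) -> list[str]:
    """Return a list of violations found in the submitted code."""
    violations = []
    for node in ast.walk(ast.parse(code)):
        if isinstance(node, (ast.Import, ast.ImportFrom)):
            names = [alias.name for alias in node.names]
            if isinstance(node, ast.ImportFrom) and node.module:
                names.append(node.module)
            violations += [f"banned import: {n}" for n in names
                           if n.split(".")[0] in BANNED_IMPORTS]
        elif isinstance(node, ast.Call) and isinstance(node.func, ast.Name):
            if node.func.id in BANNED_BUILTINS:
                violations.append(f"banned builtin: {node.func.id}")
        elif isinstance(node, ast.Attribute) and node.attr.startswith("__"):
            violations.append(f"dunder access: {node.attr}")
    return violations

print(check("import os\nx.__class__"))
# -> ['banned import: os', 'dunder access: __class__']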
🧑💻 Development Setup
git clone https://github.com/your-org/datapill.git
cd datapill
python -m venv .venv
source .venv/bin/activate # Windows: .venv\Scripts\activate
pip install -e ".[dev]"
For full embedding support in development:
pip install -e ".[ml,dev]"
Run the tests:
pytest
pytest -m "not integration" # skip tests that require Docker services
pytest --cov=datapill
Lint:
ruff check src/
ruff format src/
🗺️ Roadmap
- Web UI / dashboard for artifact browsing and profile visualization
- Custom step registry — register and share reusable step plugins
- datapill SaaS — hosted pipelines, scheduling, collaboration, and monitoring
- dbt integration — use datapill as a pre-processing layer before dbt models
- Great Expectations integration — attach data quality assertions to any pipeline step
🤝 Contributing
Contributions are welcome. Please open an issue before submitting a large pull request so we can discuss the approach.
- Fork the repository
- Create a feature branch: `git checkout -b feature/my-feature`
- Make your changes with tests
- Run `ruff check` and `pytest` before pushing
- Open a pull request against `main`
📄 License
MIT License. See LICENSE for details.
datapill SaaS — hosted pipelines, scheduling, and collaboration — coming soon.