
Data ingestion for humans and AI agents — Singer taps to Parquet in cloud buckets, with MCP server and Python SDK


DataSpoc Pipe


Data ingestion for humans and AI agents. Singer taps to Parquet in cloud buckets.

Why DataSpoc Pipe?

Most data ingestion tools drown you in orchestration complexity. DataSpoc Pipe does one thing well: connect to any of the 400+ Singer taps (databases, APIs, SaaS), convert to Parquet, and land it in your cloud bucket -- cataloged and ready to query. Works from the terminal, from Python, or as an MCP server for AI agents. No DAGs, no servers, no infrastructure.

400+ data sources -- Streaming (handles data larger than memory) -- Zero infrastructure -- < 15 min setup

Installation

pip install dataspoc-pipe

Cloud storage extras:

pip install dataspoc-pipe[s3]      # AWS S3
pip install dataspoc-pipe[gcs]     # Google Cloud Storage
pip install dataspoc-pipe[azure]   # Azure Blob Storage

Singer taps are installed separately:

pip install tap-csv
pip install tap-postgres

Quick Start

1. Initialize

dataspoc-pipe init

Creates ~/.dataspoc-pipe/ with config.yaml, pipelines/, sources/, and transforms/.

2. Install a Singer tap and prepare data

pip install tap-csv

Create /tmp/sample/users.csv:

id,name,email
1,Alice,alice@example.com
2,Bob,bob@example.com
3,Carol,carol@example.com
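If you would rather script the sample file than create it by hand, a few lines of standard-library Python produce the same CSV. The helper name write_sample_csv is illustrative only and is not part of DataSpoc Pipe.

```python
import csv
from pathlib import Path

# The three sample rows from the Quick Start.
ROWS = [
    {"id": 1, "name": "Alice", "email": "alice@example.com"},
    {"id": 2, "name": "Bob", "email": "bob@example.com"},
    {"id": 3, "name": "Carol", "email": "carol@example.com"},
]


def write_sample_csv(path: str = "/tmp/sample/users.csv") -> Path:
    """Write the sample users.csv, creating parent directories as needed."""
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    # newline="" lets the csv module control line endings; "\n" matches the file above.
    with target.open("w", newline="") as fh:
        writer = csv.DictWriter(fh, fieldnames=["id", "name", "email"], lineterminator="\n")
        writer.writeheader()
        writer.writerows(ROWS)
    return target
```

Calling write_sample_csv() with no arguments writes /tmp/sample/users.csv, the path the pipeline below expects.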

3. Create a pipeline

dataspoc-pipe add my-first-pipeline

The interactive wizard prompts for tap name, destination bucket, compression, incremental mode, and schedule. Or create ~/.dataspoc-pipe/pipelines/my-first-pipeline.yaml manually:

source:
  tap: tap-csv
  config:
    files:
      - entity: users
        path: /tmp/sample/users.csv
        keys:
          - id

destination:
  bucket: file:///tmp/my-lake
  path: raw
  compression: zstd

incremental:
  enabled: false
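The manual route can also be scripted. This sketch writes the YAML shown above to ~/.dataspoc-pipe/pipelines/<name>.yaml using plain string formatting (no YAML library needed). The helper write_csv_pipeline and its parameters are hypothetical, not part of DataSpoc Pipe.

```python
from pathlib import Path
from typing import Optional

# Template mirroring the example pipeline file above; {placeholders} are filled in.
TEMPLATE = """\
source:
  tap: tap-csv
  config:
    files:
      - entity: {entity}
        path: {csv_path}
        keys:
          - {key}

destination:
  bucket: {bucket}
  path: raw
  compression: zstd

incremental:
  enabled: false
"""


def write_csv_pipeline(name: str, entity: str, csv_path: str, key: str,
                       bucket: str, home: Optional[str] = None) -> Path:
    """Write <home>/.dataspoc-pipe/pipelines/<name>.yaml and return its path."""
    base = Path(home) if home else Path.home()
    target = base / ".dataspoc-pipe" / "pipelines" / f"{name}.yaml"
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(TEMPLATE.format(entity=entity, csv_path=csv_path, key=key, bucket=bucket))
    return target
```

For the Quick Start: write_csv_pipeline("my-first-pipeline", "users", "/tmp/sample/users.csv", "id", "file:///tmp/my-lake"). Values are inserted verbatim, so a path containing YAML-significant characters (such as ": " or "#") would need quoting.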

4. Validate and run

dataspoc-pipe validate my-first-pipeline
dataspoc-pipe run my-first-pipeline

5. Check results

dataspoc-pipe status
dataspoc-pipe logs my-first-pipeline
dataspoc-pipe manifest file:///tmp/my-lake

Your data is now at /tmp/my-lake/raw/csv/users/dt=2026-03-20/users_0000.parquet (the dt= partition reflects the date of your own run).
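To confirm the run from Python instead of the CLI, you can walk a local (file://) bucket and list the Parquet files under raw/. This is a plain filesystem sketch using pathlib; for S3, GCS, or Azure buckets you would use the matching client library instead. The function name is illustrative.

```python
from pathlib import Path
from typing import List


def list_parquet_files(lake_root: str = "/tmp/my-lake", prefix: str = "raw") -> List[str]:
    """Return Parquet files under <lake_root>/<prefix>, relative to lake_root."""
    root = Path(lake_root)
    base = root / prefix
    if not base.is_dir():
        return []  # nothing has been written yet
    # rglob descends into every subdirectory, including the dt=YYYY-MM-DD partitions.
    return sorted(p.relative_to(root).as_posix() for p in base.rglob("*.parquet"))
```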

How It Works

┌─────────────┐    ┌──────────┐  stdout  ┌───────────────┐    ┌──────────────┐
│ Data Source │───>│ Singer   │─────────>│ DataSpoc Pipe │───>│ Cloud Bucket │
│ (DB, API, …)│    │ Tap      │          │ transform(df) │    │ (S3/GCS/Az)  │
└─────────────┘    └──────────┘          └───────┬───────┘    └──────────────┘
                                                 │
                                          manifest.json
                                           state.json
                                             logs/

  1. The Singer tap extracts data from the source and emits JSON messages on stdout
  2. Pipe reads the stream and buffers records in batches (~10K records)
  3. If ~/.dataspoc-pipe/transforms/<pipeline>.py exists, Pipe applies its transform(df) function to each batch
  4. Each batch is converted to Parquet (with the configured compression, zstd in the example above) and uploaded to the bucket
  5. Pipe updates the manifest catalog and saves the execution logs
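Steps 1 to 3 can be sketched with the standard library: read Singer messages line by line, group RECORD messages per stream, hand each full batch (and the final partial one) to an optional transform, and remember the latest STATE message as the bookmark. This illustrates the Singer message flow under stated assumptions and is not Pipe's actual code: batches here are lists of dicts rather than DataFrames, the on_batch callback stands in for the Parquet write and upload, and all names are hypothetical.

```python
import json
from typing import Callable, Dict, Iterable, List, Optional

Batch = List[dict]


def consume_singer(
    lines: Iterable[str],
    on_batch: Callable[[str, Batch], None],
    transform: Optional[Callable[[Batch], Batch]] = None,
    batch_size: int = 10_000,
) -> Optional[dict]:
    """Batch RECORD messages per stream; return the last STATE value seen."""
    buffers: Dict[str, Batch] = {}
    last_state = None

    def flush(stream: str) -> None:
        batch = buffers.pop(stream, [])
        if batch:
            # Optional per-batch transform, analogous to transforms/<pipeline>.py.
            on_batch(stream, transform(batch) if transform else batch)

    for line in lines:
        if not line.strip():
            continue
        msg = json.loads(line)
        kind = msg.get("type")
        if kind == "RECORD":
            stream = msg["stream"]
            buffers.setdefault(stream, []).append(msg["record"])
            if len(buffers[stream]) >= batch_size:
                flush(stream)
        elif kind == "STATE":
            last_state = msg["value"]
        # SCHEMA and other message types are ignored in this sketch.

    for stream in list(buffers):
        flush(stream)  # write whatever is left at the end of the stream
    return last_state
```

In practice lines would be the tap's stdout, for example subprocess.Popen([...], stdout=subprocess.PIPE, text=True).stdout. Returning the state only after the final flush means a bookmark is never saved ahead of the data it covers.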

AI Agent Integration

Pipe works as an MCP server for Claude Desktop, Claude Code, Cursor, and any MCP-compatible AI agent.

pip install dataspoc-pipe[mcp]
dataspoc-pipe mcp                     # Start MCP server (stdio)

Add to your Claude Desktop config (claude_desktop_config.json):

{
  "mcpServers": {
    "dataspoc-pipe": {
      "command": "dataspoc-pipe",
      "args": ["mcp"]
    }
  }
}

Your agent can now list pipelines, trigger runs, check status, and read logs.
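If claude_desktop_config.json already lists other servers, merging the entry is safer than pasting over the file. A minimal sketch with the standard json module; the file's location depends on your OS, so the path is passed in rather than guessed, and the function name is hypothetical.

```python
import json
from pathlib import Path


def add_dataspoc_mcp(config_path: str) -> dict:
    """Add the dataspoc-pipe MCP server entry, keeping any existing servers."""
    path = Path(config_path)
    config = json.loads(path.read_text()) if path.exists() else {}
    servers = config.setdefault("mcpServers", {})
    # Same entry as the JSON snippet above.
    servers["dataspoc-pipe"] = {"command": "dataspoc-pipe", "args": ["mcp"]}
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(config, indent=2) + "\n")
    return config
```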

Python SDK

from dataspoc_pipe import PipeClient

client = PipeClient()
pipelines = client.pipelines()       # configured pipelines
result = client.run("sales-data")    # run one pipeline
status = client.status()             # status of all pipelines
log = client.logs("sales-data")      # last execution log

JSON Output

All CLI commands support --output json for machine-readable output:

dataspoc-pipe status --output json
dataspoc-pipe manifest s3://my-bucket --output json
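Scripts that do not use the SDK can consume the JSON mode through subprocess. This sketch assumes only that the command prints a single JSON document on stdout; the shape of that document is not described here, so the code just parses it. The helper name and the injectable runner parameter (used for testing) are illustrative.

```python
import json
import subprocess
from typing import Any, Callable, List


def pipe_json(args: List[str],
              runner: Callable[..., subprocess.CompletedProcess] = subprocess.run) -> Any:
    """Run `dataspoc-pipe <args> --output json` and return the parsed JSON."""
    cmd = ["dataspoc-pipe", *args, "--output", "json"]
    # check=True raises CalledProcessError if the command exits non-zero.
    result = runner(cmd, capture_output=True, text=True, check=True)
    return json.loads(result.stdout)
```

For example, pipe_json(["status"]) or pipe_json(["manifest", "s3://my-bucket"]).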

Commands

dataspoc-pipe init                    # Initialize config structure
dataspoc-pipe add <name>              # Create pipeline (interactive wizard)
dataspoc-pipe run <name>              # Run a pipeline
dataspoc-pipe run <name> --full       # Force full extraction (ignore bookmarks)
dataspoc-pipe run _ --all             # Run all pipelines (_ stands in for <name>)
dataspoc-pipe status                  # Status table for all pipelines
dataspoc-pipe logs <name>             # Last execution log (JSON)
dataspoc-pipe validate [name]         # Test bucket and tap connectivity
dataspoc-pipe manifest <bucket>       # Show data catalog
dataspoc-pipe schedule install        # Install cron jobs
dataspoc-pipe schedule remove         # Remove cron jobs
dataspoc-pipe mcp                     # Start MCP server for AI agents
dataspoc-pipe --version               # Show version

Incremental Extraction

Enable in pipeline YAML:

incremental:
  enabled: true

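Pipe saves Singer bookmarks to <bucket>/.dataspoc/state/<pipeline>/state.json. The next run resumes from those bookmarks, so only new data is fetched (provided the tap supports incremental replication). Use dataspoc-pipe run <name> --full to ignore the bookmarks and re-extract everything.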
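Under the Singer specification, a tap receives saved bookmarks through a --state file passed alongside --config, and emits updated STATE messages as it runs. The sketch below shows how a runner could choose between an incremental and a full run using the state location above. It illustrates the Singer convention and the documented bucket path; how Pipe actually wires these together is an assumption, and the function names are hypothetical.

```python
from typing import List, Optional


def state_key(pipeline: str) -> str:
    """Bucket-relative location of a pipeline's bookmarks (see Bucket Convention)."""
    return f".dataspoc/state/{pipeline}/state.json"


def tap_command(tap: str, config_path: str, state_path: Optional[str],
                full: bool = False) -> List[str]:
    """Build a Singer tap command line; pass saved state unless a full run is requested."""
    cmd = [tap, "--config", config_path]
    if state_path and not full:
        cmd += ["--state", state_path]  # the tap resumes from these bookmarks
    return cmd
```

Many taps also expect a --catalog (or, in older taps, --properties) file selecting streams; it is omitted here to keep the focus on state.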

Bucket Convention

This is the public contract between Pipe, Lens, and ML. Do not change without versioning.

<bucket>/
  .dataspoc/
    manifest.json                          # Data catalog
    state/<pipeline>/state.json            # Incremental bookmarks
    logs/<pipeline>/<timestamp>.json       # Execution logs
  raw/<source>/<table>/
    dt=YYYY-MM-DD/                         # Hive-style partitioning
      <table>_0000.parquet                 # Data files
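Because the layout is a fixed contract, writers and readers can build and parse data file keys with simple string logic. This sketch follows the convention as documented above; the function names are hypothetical, and the _0000 suffix is assumed to be a zero-padded, four-digit file index.

```python
import re
from datetime import date
from typing import Dict

# Matches raw/<source>/<table>/dt=YYYY-MM-DD/<table>_NNNN.parquet (raw/ prefix as documented).
_DATA_KEY = re.compile(
    r"^raw/(?P<source>[^/]+)/(?P<table>[^/]+)/dt=(?P<dt>\d{4}-\d{2}-\d{2})/"
    r"(?P=table)_(?P<part>\d{4})\.parquet$"
)


def data_key(source: str, table: str, dt: date, part: int = 0) -> str:
    """Build a data file key relative to the bucket root."""
    return f"raw/{source}/{table}/dt={dt.isoformat()}/{table}_{part:04d}.parquet"


def parse_data_key(key: str) -> Dict[str, str]:
    """Split a data file key into source, table, dt and part; raise ValueError on mismatch."""
    m = _DATA_KEY.match(key)
    if m is None:
        raise ValueError(f"not a data file key: {key}")
    return m.groupdict()
```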

Built-in Taps

Tap                    Source                        Extra install
parquet                Parquet files (local/cloud)   None
google-sheets-public   Public Google Sheets          None

Any Singer-compatible tap works. Run dataspoc-pipe add to see available templates.
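A Singer tap is a program that writes one JSON message per line to stdout: a SCHEMA message describing a stream, RECORD messages carrying the rows, and optional STATE messages carrying bookmarks. The toy tap below emits the Quick Start's users table in that format. It is a hypothetical example for illustration only (no config, catalog, or discovery handling), not one of the built-in taps.

```python
import json
import sys
from typing import TextIO

USERS = [
    {"id": 1, "name": "Alice", "email": "alice@example.com"},
    {"id": 2, "name": "Bob", "email": "bob@example.com"},
    {"id": 3, "name": "Carol", "email": "carol@example.com"},
]


def emit(message: dict, out: TextIO) -> None:
    """Write one Singer message as a single JSON line."""
    out.write(json.dumps(message) + "\n")


def run_tap(out: TextIO = sys.stdout) -> None:
    emit({
        "type": "SCHEMA",
        "stream": "users",
        "key_properties": ["id"],
        "schema": {
            "type": "object",
            "properties": {
                "id": {"type": "integer"},
                "name": {"type": "string"},
                "email": {"type": "string"},
            },
        },
    }, out)
    for row in USERS:
        emit({"type": "RECORD", "stream": "users", "record": row}, out)
    # Bookmark: the highest id emitted, so a later run could resume after it.
    emit({"type": "STATE", "value": {"bookmarks": {"users": {"id": USERS[-1]["id"]}}}}, out)


if __name__ == "__main__":
    run_tap()
```

Run as a script, it produces the kind of stdout stream that Pipe reads in step 1 of How It Works.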

Part of the DataSpoc Platform

Product                 Role
DataSpoc Pipe (this)    Ingestion: Singer taps to Parquet in cloud buckets
DataSpoc Lens           Virtual warehouse: SQL + Jupyter + AI over your data lake
DataSpoc ML             AutoML: train and deploy models from your lake

The bucket is the contract. Pipe writes. Lens reads. ML learns.

Community

License

Apache 2.0 -- free to use, modify, and distribute.

Download files


Source Distribution

dataspoc_pipe-0.2.0.tar.gz (67.0 kB)

Uploaded Source

Built Distribution


dataspoc_pipe-0.2.0-py3-none-any.whl (32.0 kB)

Uploaded Python 3

File details

Details for the file dataspoc_pipe-0.2.0.tar.gz.

File metadata

  • Download URL: dataspoc_pipe-0.2.0.tar.gz
  • Upload date:
  • Size: 67.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for dataspoc_pipe-0.2.0.tar.gz
Algorithm Hash digest
SHA256 411be328a7060bcbd74fd89b62295a16cfcb0b5c00d8c2933ac1db9228d0ed98
MD5 3f6e7044f67a6a431fca8618602d9ee3
BLAKE2b-256 d6b12990148e75fad8ad24bf16783f6b323349750a135aa7a5da1984a976d714


File details

Details for the file dataspoc_pipe-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: dataspoc_pipe-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 32.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for dataspoc_pipe-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 81418c87dc0a9379c7b4bbd4c1bc349af078ece205ad8435158d1339d2630979
MD5 2099d954e47c2c89eb2ca693767b07b5
BLAKE2b-256 f7a71fa91eff6987167de6baad7217d98a48569395e0061a959fdd49d9fd68b6

