SQL-style query optimization for Unix pipes with bidirectional flow control

dbbasic-pipe

SQL-style query optimization for Unix pipes

dbbasic-pipe brings bidirectional flow control to Unix pipelines. Downstream commands can signal upstream commands to optimize data production, much as SQL query optimizers push predicates down and apply limits early.

The Problem

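Traditional Unix pipes are one-way: data flows left to right. The only signal that travels back upstream is SIGPIPE, and it arrives only after a downstream command has already exited. A downstream command has no way to tell upstream in advance "I only need 100 items" or "stop after the first match."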

# Inefficient: cat and grep get no hint that only 10 lines are needed
cat huge.log | grep ERROR | head -10

The Solution

dbbasic-pipe adds an out-of-band coordination channel via environment variables and Unix sockets, letting commands communicate control messages while data still flows through normal pipes.

# Efficient: pcat stops reading once plimit has received enough matches
dbbasic-pipe bash -c 'pcat huge.log | pfilter ERROR | plimit 10'

Installation

pip install dbbasic-pipe

Quick Start

# Create test data
cat > data.json << EOF
{"name": "Alice", "age": 25}
{"name": "Bob", "age": 17}
{"name": "Charlie", "age": 30}
{"name": "David", "age": 15}
{"name": "Eve", "age": 22}
EOF

# Run with coordination (optimized)
dbbasic-pipe bash -c "pcat data.json | pfilter 'age > 18' | plimit 2"

Output:

{"name": "Alice", "age": 25}
{"name": "Charlie", "age": 30}

What happened:

  1. plimit sent a backpressure signal: "I only need 2 items"
  2. pfilter received it and forwarded it upstream
  3. pcat stopped reading once 2 matching records had been produced, instead of reading all 5
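The same effect can be imitated in a single Python process with generators: each stage pulls from the one before it, so demand flows upstream and the source stops as soon as the last stage is satisfied. This is only an analogy, not dbbasic-pipe's implementation, which runs separate processes and sends explicit backpressure messages over a socket. The function names source, keep_adults, and limit are hypothetical.

```python
import json

# The five records from data.json above.
DATA = [
    '{"name": "Alice", "age": 25}',
    '{"name": "Bob", "age": 17}',
    '{"name": "Charlie", "age": 30}',
    '{"name": "David", "age": 15}',
    '{"name": "Eve", "age": 22}',
]


def source(lines, stats):
    """Stand-in for pcat: hands out one line at a time and counts reads."""
    for line in lines:
        stats["read"] += 1
        yield line


def keep_adults(lines):
    """Stand-in for pfilter 'age > 18'."""
    for line in lines:
        if json.loads(line)["age"] > 18:
            yield line


def limit(lines, n):
    """Stand-in for plimit n: stops without pulling an (n+1)th item."""
    if n <= 0:
        return
    for i, line in enumerate(lines, start=1):
        yield line
        if i == n:
            return


stats = {"read": 0}
out = list(limit(keep_adults(source(DATA, stats)), 2))
print("\n".join(out))
print("lines read:", stats["read"])  # lines read: 3
```

Only three of the five lines are read: the source is never asked for Bob's successor's successor once Charlie satisfies the limit.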

Commands

dbbasic-pipe

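Wrapper that enables coordination for a pipeline. Pass the whole pipeline as one command, for example through bash -c. The shell splits dbbasic-pipe pcat data.json | plimit 10 at the pipe symbol, so the wrapper would launch only pcat, and only pcat would inherit PIPE_CONTROL_SOCKET.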

dbbasic-pipe <command>

Example:

dbbasic-pipe bash -c 'pcat data.json | pfilter "age > 18" | plimit 10'

pcat <file>

Smart cat - reads files and respects backpressure signals.

pcat data.json
pcat large.log | plimit 100
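A minimal sketch of how a source like pcat might honor backpressure, assuming control messages arrive as newline-delimited JSON on an already-connected socket. It polls the socket without blocking between lines and, once a backpressure message says "N more items", writes at most N further lines. That budget maps directly onto output lines only when pcat feeds plimit directly, as in pcat large.log | plimit 100; with a filter in between, more care is needed. The helpers poll_messages and copy_with_backpressure are hypothetical names, and a socketpair stands in for the coordinator.

```python
import json
import select
import socket


def poll_messages(sock, buf):
    """Return any complete control messages waiting on sock, without blocking."""
    while select.select([sock], [], [], 0)[0]:
        chunk = sock.recv(4096)
        if not chunk:  # peer closed the control channel
            break
        buf.extend(chunk)
    *complete, tail = bytes(buf).split(b"\n")
    buf[:] = tail  # keep any partial message for the next call
    return [json.loads(line) for line in complete if line.strip()]


def copy_with_backpressure(lines, write, sock):
    """Copy lines to write(), honoring 'I only need N more items' messages."""
    remaining = None  # None: no backpressure received yet, so no limit
    buf = bytearray()
    written = 0
    it = iter(lines)
    while True:
        for msg in poll_messages(sock, buf):
            if msg.get("type") == "backpressure":
                remaining = msg["count"]
        if remaining is not None and remaining <= 0:
            break  # budget used up: stop before reading another line
        line = next(it, None)
        if line is None:
            break  # input exhausted
        write(line)
        written += 1
        if remaining is not None:
            remaining -= 1
    return written


# Demo: a socketpair stands in for the coordinator connection.
pcat_end, limit_end = socket.socketpair()
limit_end.sendall(json.dumps({"type": "backpressure", "count": 3}).encode() + b"\n")
out = []
n = copy_with_backpressure((f"line {i}\n" for i in range(1_000_000)), out.append, pcat_end)
print(n, out)  # 3 ['line 0\n', 'line 1\n', 'line 2\n']
```

Checking the budget before calling next() means the input is never read past the last line that is actually needed.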

pfilter '<expression>'

Filter JSON lines with a Python expression; each record's keys can be used as variable names.

cat data.json | pfilter 'age > 18'
cat data.json | pfilter 'name == "Alice"'
cat data.json | pfilter 'status == "active" and score > 50'
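A sketch of how such an expression could be evaluated, assuming (from the examples above) that each JSON key is exposed as a variable name. The helper names make_predicate and pfilter_lines are hypothetical, and treating a missing field as a non-match is a choice of this sketch, not documented behavior. Withholding builtins keeps expressions simple but is not a sandbox; that is tolerable only because the expression comes from the user's own command line.

```python
import json


def make_predicate(expression):
    """Compile a pfilter-style expression once and return a record -> bool test.

    Assumption: each JSON key is exposed as a variable, so 'age > 18'
    reads record["age"]. Builtins are withheld to keep expressions simple.
    """
    code = compile(expression, "<pfilter>", "eval")

    def predicate(record):
        try:
            return bool(eval(code, {"__builtins__": {}}, dict(record)))
        except NameError:
            return False  # record lacks a referenced field: treat as no match

    return predicate


def pfilter_lines(lines, expression):
    """Yield the JSON lines whose record satisfies expression."""
    predicate = make_predicate(expression)
    for line in lines:
        if line.strip() and predicate(json.loads(line)):
            yield line


rows = [
    '{"name": "Alice", "age": 25}',
    '{"name": "Bob", "age": 17, "status": "active", "score": 80}',
    '{"name": "Eve", "age": 22, "status": "active", "score": 60}',
]
for expr in ["age > 18", 'name == "Alice"', 'status == "active" and score > 50']:
    print(expr, "->", [json.loads(r)["name"] for r in pfilter_lines(rows, expr)])
```

Compiling once and evaluating per record avoids re-parsing the expression for every line.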

plimit <count>

Output the first N lines and signal backpressure upstream.

cat data.json | plimit 10
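A hypothetical sketch of the plimit side of the protocol: announce the demand with a backpressure message, pass through at most count lines without pulling an extra one, then send a complete message. The field names follow the Message Format examples; the function names are hypothetical, and a socketpair and a list stand in for the coordinator and stdout.

```python
import json
import os
import socket


def send(sock, msg):
    """Write one control message as a newline-terminated JSON line."""
    sock.sendall(json.dumps(msg).encode() + b"\n")


def plimit(count, lines, write, ctrl_sock=None):
    """Announce the demand upstream, then pass through at most count lines."""
    if ctrl_sock is not None:
        send(ctrl_sock, {"type": "backpressure", "count": count})
    emitted = 0
    if count > 0:
        for line in lines:
            write(line)
            emitted += 1
            if emitted >= count:
                break  # stop without pulling another line from upstream
    if ctrl_sock is not None:
        send(ctrl_sock, {"type": "complete", "pid": os.getpid(),
                         "processed": emitted, "output": emitted})
    return emitted


# Demo: a socketpair stands in for the coordinator; a list stands in for stdout.
limit_end, coord_end = socket.socketpair()
out = []
n = plimit(2, iter(["a\n", "b\n", "c\n"]), out.append, limit_end)
limit_end.close()
received = [json.loads(line) for line in coord_end.makefile("rb")]
print(n, out)
print([m["type"] for m in received])  # ['backpressure', 'complete']
```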

How It Works

Architecture

Data flow:     pcat → pipe → pfilter → pipe → plimit
Control flow:  pcat ← socket ← pfilter ← socket ← plimit
                     ↓                       ↓
                     └──→ coordinator ←──────┘

Protocol

  1. dbbasic-pipe sets the PIPE_CONTROL_SOCKET environment variable
  2. It starts the pipe-coordinator service in the background
  3. Commands detect the env var and connect to the coordinator socket
  4. Commands exchange JSON control messages:
    • register: Announce presence
    • backpressure: Signal "I only need N more items"
    • complete: Notify when done
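Steps 1 to 3 can be sketched as a small wrapper that listens on a Unix socket, exports its path in PIPE_CONTROL_SOCKET, and runs the given command as a child; every process that child starts inherits the variable. This is a hypothetical sketch, not the dbbasic-pipe source: run_with_control_socket is an illustrative name, and step 4 (accepting connections and routing messages) is deliberately left out, so clients that connect just sit in the listen backlog.

```python
import os
import socket
import subprocess
import sys
import tempfile


def run_with_control_socket(argv):
    """Steps 1-3: listen on a Unix socket, export its path, run the command.

    Hypothetical sketch: message routing (step 4) is omitted.
    """
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "control.sock")
        server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            server.bind(path)
            server.listen()
            env = dict(os.environ, PIPE_CONTROL_SOCKET=path)
            return subprocess.run(argv, env=env, capture_output=True, text=True)
        finally:
            server.close()


# Demo: the child simply reports the socket path it inherited.
result = run_with_control_socket(
    [sys.executable, "-c", "import os; print(os.environ['PIPE_CONTROL_SOCKET'])"])
print(result.stdout.strip().endswith("control.sock"))  # True
```

With bash available, the wrapper would be called as run_with_control_socket(["bash", "-c", "pcat data.json | pfilter 'age > 18' | plimit 2"]), so pcat, pfilter, and plimit all see the same socket path.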

Message Format

{"type": "register", "pid": 12345, "command": "pfilter", "predicate": "age > 18"}
{"type": "backpressure", "count": 100}
{"type": "complete", "pid": 12345, "processed": 150, "output": 100}
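Because each message is one JSON object per line, a reader has to cope with socket reads that split a message or deliver several at once. This sketch encodes messages and reassembles them from arbitrary chunks; treating the fields shown above as required is an assumption, and encode and Decoder are hypothetical names.

```python
import json

# Fields seen in the example messages above. Treating them as required is an
# assumption of this sketch; extra fields such as "predicate" pass through.
REQUIRED_FIELDS = {
    "register": {"pid", "command"},
    "backpressure": {"count"},
    "complete": {"pid", "processed", "output"},
}


def encode(msg):
    """Serialize one message as a single newline-terminated JSON line."""
    required = REQUIRED_FIELDS.get(msg.get("type"))
    if required is None:
        raise ValueError(f"unknown message type: {msg.get('type')!r}")
    missing = required - set(msg)
    if missing:
        raise ValueError(f"{msg['type']} message is missing {sorted(missing)}")
    return json.dumps(msg).encode() + b"\n"


class Decoder:
    """Reassemble messages from socket reads that may split or merge lines."""

    def __init__(self):
        self._pending = b""

    def feed(self, chunk):
        self._pending += chunk
        *lines, self._pending = self._pending.split(b"\n")
        return [json.loads(line) for line in lines if line.strip()]


wire = (encode({"type": "register", "pid": 12345, "command": "pfilter",
                "predicate": "age > 18"})
        + encode({"type": "backpressure", "count": 100}))
decoder = Decoder()
first = decoder.feed(wire[:20])  # no complete line yet
rest = decoder.feed(wire[20:])
print(first, [m["type"] for m in rest])  # [] ['register', 'backpressure']
```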

Standalone Usage

All commands work without coordination too:

# Works fine, just no optimization
pcat data.json | pfilter 'age > 18' | plimit 10

Use Cases

Large Log Files

# Stop after finding 10 errors instead of reading the rest of the file
dbbasic-pipe bash -c "pcat /var/log/huge.log | pfilter 'ERROR' | plimit 10"

API Data Processing

With a future pcurl command:

# pcurl could add ?limit=100 to the API request based on backpressure
dbbasic-pipe bash -c 'pcurl api.com/users | pfilter "age > 18" | plimit 100'
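pcurl does not exist yet, so this is only a sketch of the query-string half of the idea, using the standard library's urllib.parse. The parameter name limit comes from the example above; a real API may use a different name, and with_limit is a hypothetical helper. Keeping an existing smaller limit is a design choice of the sketch: pushing a limit down should never make the pipeline fetch more than the user asked for.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def with_limit(url, count, param="limit"):
    """Return url with param=count pushed into its query string.

    An existing, smaller limit is kept, so the pushed-down limit can only
    reduce what the API returns.
    """
    parts = urlsplit(url)
    pairs = parse_qsl(parts.query, keep_blank_values=True)
    existing = [int(v) for k, v in pairs if k == param and v.isdigit()]
    effective = min(existing + [count])
    pairs = [(k, v) for k, v in pairs if k != param] + [(param, str(effective))]
    return urlunsplit(parts._replace(query=urlencode(pairs)))


print(with_limit("https://api.example.com/users", 100))
# https://api.example.com/users?limit=100
print(with_limit("https://api.example.com/users?page=2&limit=500", 100))
# https://api.example.com/users?page=2&limit=100
```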

Database Queries

With a future psql wrapper (psql-smart):

# psql-smart could build an optimized query: WHERE age>18 LIMIT 100
dbbasic-pipe bash -c "psql-smart -c 'SELECT * FROM users' | pfilter 'age>18' | plimit 100"
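psql-smart is also only proposed, so here is a hedged sketch of the pushdown it describes: wrap the original query as a subquery and let the database apply the filter and the limit. SQLite stands in for PostgreSQL so the example runs without a server; push_down is a hypothetical name, and the predicate is assumed to already be valid SQL, which holds for simple comparisons like age>18 but not for arbitrary pfilter expressions.

```python
import sqlite3


def push_down(base_query, predicate=None, limit=None):
    """Wrap base_query so the database applies the filter and the limit.

    Assumption: predicate is already valid SQL (true for simple comparisons
    such as 'age>18'); translating arbitrary pfilter expressions would need
    a real parser. The limit is passed as a bound parameter.
    """
    sql = f"SELECT * FROM ({base_query.strip().rstrip(';')}) AS src"
    params = []
    if predicate:
        sql += f" WHERE {predicate}"
    if limit is not None:
        sql += " LIMIT ?"
        params.append(int(limit))
    return sql, params


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT, age INTEGER)")
conn.executemany("INSERT INTO users VALUES (?, ?)",
                 [("Alice", 25), ("Bob", 17), ("Charlie", 30),
                  ("David", 15), ("Eve", 22)])
sql, params = push_down("SELECT * FROM users", "age>18", 2)
print(sql)  # SELECT * FROM (SELECT * FROM users) AS src WHERE age>18 LIMIT ?
rows = conn.execute(sql, params).fetchall()
print(rows)
```

Without an ORDER BY, SQL does not specify which matching rows a LIMIT returns, just as plimit keeps whichever lines arrive first.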

Comparison to Traditional Pipes

Scenario                                     Traditional                   With dbbasic-pipe
------------------------------------------   ---------------------------   ------------------------
cat 10GB.log | grep ERROR | head -10         Keeps reading until SIGPIPE   Stops after ~10 matches
cat users.json | filter age>18 | head -100   Keeps reading until SIGPIPE   Stops at ~100 matches
API calls                                    Downloads all data            Could request ?limit=100

Real-World Inspiration

This approach is inspired by:

  • SQL query optimizers - Push predicates down, apply limits early
  • Reactive Streams - Explicit backpressure protocol
  • Apache Beam/Flink - Distributed query planning
  • Rust iterators - Lazy evaluation with take()
  • PRQL/Kusto - Pipe-style query languages

Development

# Clone repo
git clone https://github.com/askrobots/dbbasic-pipe.git
cd dbbasic-pipe

# Install in development mode
pip install -e .

# Run tests
python -m pytest

Extending

Create your own coordination-aware commands:

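#!/usr/bin/env python3
import json, os, select, socket, sys

sock = None
ctrl_socket = os.getenv('PIPE_CONTROL_SOCKET')
if ctrl_socket:
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    sock.connect(ctrl_socket)

    # Register: one JSON message per line
    sock.sendall(json.dumps({
        'type': 'register',
        'pid': os.getpid(),
        'command': 'my-tool'
    }).encode() + b'\n')

# Process data via stdin/stdout as normal, polling for backpressure between lines
remaining, pending = None, b''   # None = no limit requested yet
for line in sys.stdin:
    if sock and select.select([sock], [], [], 0)[0]:
        data = sock.recv(4096)
        if not data:              # coordinator closed; carry on uncoordinated
            sock = None
        *msgs, pending = (pending + data).split(b'\n')
        for raw in msgs:
            if raw.strip():
                msg = json.loads(raw)
                if msg.get('type') == 'backpressure':
                    remaining = msg['count']   # "I only need N more items"
    if remaining is not None and remaining <= 0:
        break
    sys.stdout.write(line)
    if remaining is not None:
        remaining -= 1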

Future Ideas

  • pcurl - HTTP client that adds query params based on backpressure
  • psql-smart - Builds optimal SQL queries from downstream filters
  • pgrep - Stops searching after limit reached
  • ptail - Stops tailing after condition met
  • Type system - Commands advertise schema
  • Distributed mode - Coordinator as network service
  • Query optimization - Merge adjacent filters, reorder operations

Contributing

Contributions welcome! Please open issues or PRs at: https://github.com/askrobots/dbbasic-pipe

License

MIT License - see LICENSE file

Author

Dan Quellhorst

Download files

Source Distribution

dbbasic_pipe-0.1.0.tar.gz (11.7 kB)

Uploaded Source

Built Distribution

dbbasic_pipe-0.1.0-py3-none-any.whl (11.9 kB)

Uploaded Python 3

File details

Details for the file dbbasic_pipe-0.1.0.tar.gz.

File metadata

  • Download URL: dbbasic_pipe-0.1.0.tar.gz
  • Upload date:
  • Size: 11.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.10.1

File hashes

Hashes for dbbasic_pipe-0.1.0.tar.gz
Algorithm Hash digest
SHA256 d780a908d5a0015b2bcf026865eee634f8b78b954ef6b6c448eacd58f8d9e756
MD5 0f38afce88996507f9b001ae1c1be1db
BLAKE2b-256 fafd65543bc30adfaa420b09df6ca10d24a71eaddbce0863f1be7c72b03bae98

File details

Details for the file dbbasic_pipe-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: dbbasic_pipe-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 11.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.10.1

File hashes

Hashes for dbbasic_pipe-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 786b706b85ccf452df4b5bc59a9dc5a849f85ae33534e6dd45889cc198a6c758
MD5 e4e956d6e1241025d0f5730a09332703
BLAKE2b-256 3ade1973993e333395a6cdbd446af1ac691e04c62b939c312e6556da0d596e3b
