
An experiment in typed pipelining of data between Python, the shell, and various data sources and sinks.


slonk

Typed data pipelines with operator-overloaded | syntax.

Slonk lets you build data pipelines by chaining stages with Python's | operator. Stages can be plain callables, shell commands, file paths, SQLAlchemy models, or custom handler objects.

Features

  • Pipe operator composition: chain stages naturally with |.
  • Automatic handler inference: strings become path or shell handlers; callables are wrapped based on their signature; SQLAlchemy models become database sources, transforms, or sinks depending on their position.
  • Parallel execution: stages run concurrently in threads connected by bounded queues with automatic backpressure (the default mode).
  • Free-threaded Python support: detected at runtime; parallel() uses threads instead of processes when the GIL is disabled.
  • Role-based protocols: Source, Transform, and Sink protocols define the contract for custom handlers.
  • Middleware system: observe pipeline lifecycle events (start, end, error) and custom events without modifying handler code.
  • Built-in middleware: TimingMiddleware, LoggingMiddleware, and StatsMiddleware ship out of the box.
  • Tee/fork: split data to side pipelines with tee().
  • Data-parallel processing: parallel() distributes work across a thread or process pool with cloudpickle serialisation.
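The threaded default mode described above can be pictured with the standard library alone. Each stage runs in its own thread, and stages are linked by `queue.Queue(maxsize=...)`, so a fast producer blocks on `put()` whenever its consumer falls behind; that blocking is the backpressure. This is a sketch under assumptions, not slonk's internals: the hypothetical `run_threaded` helper, the per-item stage functions, the `_DONE` sentinel, and the queue size are all illustrative.

```python
import queue
import threading
from collections.abc import Callable, Iterable
from typing import Any

_DONE = object()  # sentinel marking the end of the stream


def run_threaded(
    source: Iterable[Any],
    stages: list[Callable[[Any], Any]],
    maxsize: int = 8,
) -> list[Any]:
    """Hypothetical helper: run per-item stages in threads linked by bounded queues."""
    queues = [queue.Queue(maxsize=maxsize) for _ in range(len(stages) + 1)]

    def produce() -> None:
        for item in source:
            queues[0].put(item)  # blocks while the queue is full (backpressure)
        queues[0].put(_DONE)

    def work(fn: Callable[[Any], Any], inbox: queue.Queue, outbox: queue.Queue) -> None:
        while (item := inbox.get()) is not _DONE:
            outbox.put(fn(item))
        outbox.put(_DONE)  # propagate shutdown downstream

    threads = [threading.Thread(target=produce)]
    threads += [
        threading.Thread(target=work, args=(fn, queues[i], queues[i + 1]))
        for i, fn in enumerate(stages)
    ]
    for t in threads:
        t.start()

    # The calling thread drains the last queue so the final stage never stalls.
    results = []
    while (item := queues[-1].get()) is not _DONE:
        results.append(item)
    for t in threads:
        t.join()
    return results


print(run_threaded(range(5), [lambda x: x * 2, lambda x: x + 1]))  # [1, 3, 5, 7, 9]
```

If a stage raises, this sketch's worker thread dies without sending the sentinel and `run_threaded` blocks forever; a real implementation would also need to forward exceptions downstream.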

Requirements

  • Python >= 3.14

Installation

pip install slonk

Or with uv:

uv add slonk

Quick start

from slonk import Slonk

# Source -> Transform pipeline
result = (
    Slonk()
    | (lambda: ["hello", "world"])
    | (lambda data: [s.upper() for s in data])
).run()

print(list(result))  # ['HELLO', 'WORLD']

With seed data

result = (
    Slonk()
    | (lambda data: [s + "!" for s in data])
).run(["hi", "there"])

print(list(result))  # ['hi!', 'there!']

Shell commands

result = (
    Slonk()
    | (lambda: ["banana", "apple", "cherry"])
    | "sort"
).run()

print(list(result))  # ['apple', 'banana', 'cherry']
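A shell stage can be approximated with the standard library's `subprocess.run`: upstream items are written to the command's stdin as newline-terminated text, and its stdout is split back into lines. This sketch assumes a POSIX-like shell and line-oriented, text-mode I/O; `shell_stage` is a hypothetical name, and slonk's own shell handler may stream data rather than buffer it.

```python
import subprocess
from collections.abc import Iterable


def shell_stage(command: str, lines: Iterable[str]) -> list[str]:
    """Hypothetical helper: pipe lines through a shell command and return its output lines."""
    completed = subprocess.run(
        command,
        shell=True,  # the command string is interpreted by the system shell
        input="".join(f"{line}\n" for line in lines),
        capture_output=True,
        text=True,
        check=True,  # raise CalledProcessError on a non-zero exit status
    )
    return completed.stdout.splitlines()


print(shell_stage("sort", ["banana", "apple", "cherry"]))  # ['apple', 'banana', 'cherry']
```

Passing a string with `shell=True` means the command is parsed by the shell, so it should never be built from untrusted input.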

File I/O

# Write to a file and pass data through
pipeline = (
    Slonk()
    | (lambda: ["line 1", "line 2"])
    | "./output.txt"  # PathHandler: write + passthrough
)
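The write-and-passthrough behaviour noted in the comment can be sketched with `pathlib`: a generator writes each item as a line and then re-yields it, so later stages still see the data. `write_passthrough` is a hypothetical name used for illustration; slonk's PathHandler may buffer, encode, or handle remote paths differently.

```python
import tempfile
from collections.abc import Iterable, Iterator
from pathlib import Path


def write_passthrough(path: Path, items: Iterable[str]) -> Iterator[str]:
    """Hypothetical stage: write each item as a line to `path` and yield it unchanged."""
    with path.open("w", encoding="utf-8") as fh:
        for item in items:
            fh.write(item + "\n")
            yield item  # downstream stages still receive the data


with tempfile.TemporaryDirectory() as tmp:
    out = Path(tmp) / "output.txt"
    passed = list(write_passthrough(out, ["line 1", "line 2"]))
    print(passed)  # ['line 1', 'line 2']
    print(out.read_text(encoding="utf-8").splitlines())  # ['line 1', 'line 2']
```

Because the sketch is a generator, nothing is written until a downstream consumer iterates it, and the file is closed once the input is exhausted.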

Cloud and remote paths are supported via universal-pathlib:

# Read from S3, filter, write to GCS
pipeline = (
    Slonk()
    | "s3://input-bucket/events.csv"
    | (lambda rows: [r for r in rows if "ERROR" in r])
    | "gs://output-bucket/errors.csv"
)

SQLAlchemy integration

Pipe SQLAlchemy models directly; the handler adapts to its position in the pipeline, acting as a Source, Transform, or Sink:

from sqlalchemy import create_engine, Column, String
from sqlalchemy.orm import DeclarativeBase, sessionmaker

from slonk import Slonk

class Base(DeclarativeBase):
    pass

class Record(Base):
    __tablename__ = "records"
    id = Column(String, primary_key=True)
    data = Column(String)

engine = create_engine("sqlite:///data.db")
Base.metadata.create_all(engine)  # create the records table if it does not exist
Session = sessionmaker(bind=engine)

# Read -> transform -> write back
pipeline = (
    Slonk(session_factory=Session)
    | Record                     # Source: read all rows
    | (lambda rows: [r.upper() for r in rows])
    | Record                     # Sink: bulk-write transformed rows
)
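The position-dependent behaviour can be expressed as a small rule. In this sketch, which is an assumption rather than slonk's code, the hypothetical `role_for` function treats the first stage as a Source, the last as a Sink, and anything in between as a Transform. How slonk resolves edge cases, such as a single-stage pipeline or seed data passed to run(), is not covered here.

```python
from enum import Enum


class Role(Enum):
    SOURCE = "source"
    TRANSFORM = "transform"
    SINK = "sink"


def role_for(position: int, total: int) -> Role:
    """Hypothetical rule: choose a stage's role from where it sits in the pipeline."""
    if not 0 <= position < total:
        raise IndexError(f"position {position} outside a pipeline of {total} stages")
    if position == 0:
        return Role.SOURCE  # nothing upstream: produce rows
    if position == total - 1:
        return Role.SINK  # nothing downstream: consume rows
    return Role.TRANSFORM  # in the middle: receive data and pass it on


# In the example above, Record sits at positions 0 and 2 of a three-stage pipeline.
print(role_for(0, 3).value, role_for(2, 3).value)  # source sink
```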

Middleware

from slonk import Slonk, TimingMiddleware

tm = TimingMiddleware()
pipeline = Slonk()
pipeline.add_middleware(tm)

pipeline |= (lambda: ["a", "b", "c"])
pipeline |= (lambda data: [s.upper() for s in data])
pipeline.run()

print(f"Pipeline took {tm.pipeline_duration:.4f}s")
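Lifecycle events of this kind are commonly modelled as hook methods called around a run. The sketch below shows that general pattern only and is not slonk's middleware API: the `Middleware` base class, its `on_pipeline_start`/`on_pipeline_end`/`on_error` hook names, `Timing`, and `run_with_middleware` are all hypothetical. Only the `pipeline_duration` attribute mirrors the example above.

```python
import time
from collections.abc import Callable, Iterable
from typing import Any


class Middleware:
    """Hypothetical base class: every hook defaults to a no-op."""

    def on_pipeline_start(self) -> None: ...
    def on_pipeline_end(self) -> None: ...
    def on_error(self, exc: Exception) -> None: ...


class Timing(Middleware):
    def __init__(self) -> None:
        self.pipeline_duration: float | None = None
        self._start = 0.0

    def on_pipeline_start(self) -> None:
        self._start = time.perf_counter()

    def on_pipeline_end(self) -> None:
        self.pipeline_duration = time.perf_counter() - self._start


def run_with_middleware(
    stages: Iterable[Callable[[Any], Any]],
    data: Any,
    middleware: list[Middleware],
) -> Any:
    for mw in middleware:
        mw.on_pipeline_start()
    try:
        for stage in stages:
            data = stage(data)
    except Exception as exc:
        for mw in middleware:
            mw.on_error(exc)  # observers see the error, then it propagates
        raise
    for mw in middleware:
        mw.on_pipeline_end()  # only reached when every stage succeeded
    return data


tm = Timing()
out = run_with_middleware([lambda d: [s.upper() for s in d]], ["a", "b", "c"], [tm])
print(out)  # ['A', 'B', 'C']
print(f"Pipeline took {tm.pipeline_duration:.4f}s")
```

Handlers never reference the middleware, which is the property the README highlights: observers can be added or removed without touching stage code.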

Data-parallel execution

from slonk import Slonk, parallel

result = (
    Slonk()
    | (lambda: [str(i) for i in range(1000)])
    | parallel(lambda chunk: [s + "!" for s in chunk], workers=4, chunk_size=250)
).run()

Custom handlers

from slonk import SlonkBase

class MyTransform(SlonkBase):
    def process_transform(self, input_data):
        for item in input_data:
            self.emit("processing", {"item": item})
            yield item.upper()
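The Source, Transform, and Sink contracts from the feature list can be modelled with `typing.Protocol`, which checks structure (method names) rather than inheritance. In this sketch only `process_transform` comes from the example above; `process_source`, `process_sink`, their signatures, and the `Upper` class are hypothetical, and whether slonk accepts handlers that do not subclass SlonkBase is not stated here.

```python
from collections.abc import Iterable, Iterator
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class Source(Protocol):
    def process_source(self) -> Iterable[Any]: ...  # hypothetical method name


@runtime_checkable
class Transform(Protocol):
    def process_transform(self, input_data: Iterable[Any]) -> Iterable[Any]: ...


@runtime_checkable
class Sink(Protocol):
    def process_sink(self, input_data: Iterable[Any]) -> None: ...  # hypothetical method name


class Upper:
    """No base class needed: defining the method is enough to satisfy Transform."""

    def process_transform(self, input_data: Iterable[str]) -> Iterator[str]:
        for item in input_data:
            yield item.upper()


handler = Upper()
print(isinstance(handler, Transform), isinstance(handler, Sink))  # True False
print(list(handler.process_transform(["a", "b"])))  # ['A', 'B']
```

`runtime_checkable` isinstance checks only confirm that the method exists, not its signature; a static type checker is needed to verify the full contract.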

Sequential mode

By default, pipelines run in parallel mode, with each stage in its own thread. Pass parallel=False to run the stages sequentially:

result = pipeline.run(parallel=False)

Development

# Install dev dependencies
make install-dev

# Run tests (includes doctests)
make test

# Lint + type-check
make lint
make typecheck

See CONTRIBUTING.md for full details.

License

MIT



Download files


Source Distribution

slonk-0.2.0.tar.gz (92.1 kB)

Uploaded Source

Built Distribution


slonk-0.2.0-py3-none-any.whl (26.7 kB)

Uploaded Python 3

File details

Details for the file slonk-0.2.0.tar.gz.

File metadata

  • Download URL: slonk-0.2.0.tar.gz
  • Size: 92.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for slonk-0.2.0.tar.gz
Algorithm Hash digest
SHA256 6abdbbeffe35d983fdee8dc8b6db6915705102cd6e3a0e5bd3d7cea33c6b8ca6
MD5 bdc64e1905b3167e12819cd1e21447e1
BLAKE2b-256 7d387cfc30ac92df78bdbbfd4532c16711712e1a8bc7cc046b2b574f692addb5


Provenance

The following attestation bundles were made for slonk-0.2.0.tar.gz:

Publisher: release.yml on MattOates/slonk

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file slonk-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: slonk-0.2.0-py3-none-any.whl
  • Size: 26.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for slonk-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 39b8962cfadd16c7feee39ae021c15576a7bb8be78e08ad16e3ca2eea8aab4bb
MD5 e21308175b72fbdaf65c5d111f064ea7
BLAKE2b-256 b6a27ae3a506c183ac08cab3a3ab07379ff66bbdb1d971e0ebb80bb353f6530e


Provenance

The following attestation bundles were made for slonk-0.2.0-py3-none-any.whl:

Publisher: release.yml on MattOates/slonk

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.
