
An experiment in typed data pipelining between Python, the shell, and various data sources and sinks.

Project description

slonk

Typed data pipelines with operator-overloaded | syntax.

Slonk lets you build data pipelines by chaining stages with Python's | operator. Stages can be plain callables, shell commands, file paths, SQLAlchemy models, or custom handler objects.
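
To make the `|` mechanics concrete, here is a minimal sketch of how operator overloading can build such a chain. `MiniPipe` is a hypothetical toy class, not slonk's implementation: `__or__` returns a new pipeline with the stage appended, and `run()` uses `inspect.signature` to treat zero-argument callables as sources and one-argument callables as transforms. That rule is only an assumption about how signature-based wrapping might work.

```python
from __future__ import annotations

import inspect
from collections.abc import Callable, Iterable
from typing import Any


class MiniPipe:
    """Hypothetical toy pipeline: `|` appends a stage, run() chains the stages."""

    def __init__(self, stages: tuple[Callable[..., Any], ...] = ()) -> None:
        self.stages = stages

    def __or__(self, stage: Callable[..., Any]) -> MiniPipe:
        # Return a new pipeline instead of mutating this one.
        return MiniPipe(self.stages + (stage,))

    def run(self, seed: Iterable[Any] | None = None) -> list[Any]:
        data: Any = seed
        for stage in self.stages:
            if len(inspect.signature(stage).parameters) == 0:
                data = stage()  # zero-argument callable: acts as a source
            else:
                data = stage(data)  # one-argument callable: transforms upstream data
        return [] if data is None else list(data)


result = (
    MiniPipe()
    | (lambda: ["hello", "world"])
    | (lambda data: [s.upper() for s in data])
).run()
print(result)  # ['HELLO', 'WORLD']
```

Returning a new object from `__or__` keeps a partially built pipeline reusable as the base for several variants.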

Features

  • Pipe operator composition: chain stages naturally with |.
  • Automatic handler inference: strings become path or shell handlers; callables are wrapped according to their signature; SQLAlchemy models become database sources, transforms, or sinks depending on their position.
  • Parallel execution: stages run concurrently in threads connected by bounded queues with automatic backpressure (the default mode).
  • Free-threaded Python support: detected at runtime; when the GIL is disabled, parallel() uses threads instead of processes.
  • Role-based protocols: Source, Transform, and Sink protocols define the contract for custom handlers.
  • Middleware system: observe pipeline lifecycle events (start, end, error) and custom events without modifying handler code.
  • Built-in middleware: TimingMiddleware, LoggingMiddleware, and StatsMiddleware ship out of the box.
  • Tee/fork: split data to side pipelines with tee().
  • Fan-in: combine multiple sources with merge() (interleaved, concurrent) or cat() (ordered, sequential).
  • Data-parallel processing: parallel() distributes work across a thread or process pool, using cloudpickle for serialisation.
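
The default parallel mode listed above can be pictured with a standard-library sketch: each stage runs in its own thread, and stages are linked by `queue.Queue(maxsize=...)`, so a fast producer blocks on `put()` whenever its downstream queue is full. This illustrates bounded-queue backpressure in general and is not slonk's code. `run_threaded` is a hypothetical name, its transforms work item by item (slonk's stages receive the whole upstream iterable), and error propagation between threads is omitted for brevity.

```python
import queue
import threading
from collections.abc import Callable, Iterable
from typing import Any

_DONE = object()  # sentinel marking the end of the stream


def run_threaded(source: Iterable[Any], *transforms: Callable[[Any], Any], maxsize: int = 8) -> list[Any]:
    """Run a source and per-item transforms in threads joined by bounded queues."""
    queues: list[queue.Queue] = [queue.Queue(maxsize=maxsize) for _ in range(len(transforms) + 1)]

    def produce() -> None:
        for item in source:
            queues[0].put(item)  # blocks while the first queue is full (backpressure)
        queues[0].put(_DONE)

    def stage(fn: Callable[[Any], Any], inbox: queue.Queue, outbox: queue.Queue) -> None:
        while (item := inbox.get()) is not _DONE:
            outbox.put(fn(item))  # blocks while the downstream queue is full
        outbox.put(_DONE)  # forward end-of-stream to the next stage

    threads = [threading.Thread(target=produce)]
    for i, fn in enumerate(transforms):
        threads.append(threading.Thread(target=stage, args=(fn, queues[i], queues[i + 1])))
    for t in threads:
        t.start()

    results = []
    while (item := queues[-1].get()) is not _DONE:  # the caller drains the last queue
        results.append(item)
    for t in threads:
        t.join()
    return results


print(run_threaded(range(5), lambda x: x * 2, lambda x: x + 1, maxsize=2))  # [1, 3, 5, 7, 9]
```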

Requirements

  • Python >= 3.14

Installation

pip install slonk

Or with uv:

uv add slonk

Quick start

from slonk import Slonk

# Source -> Transform pipeline
result = (
    Slonk()
    | (lambda: ["hello", "world"])
    | (lambda data: [s.upper() for s in data])
).run()

print(list(result))  # ['HELLO', 'WORLD']

With seed data

result = (
    Slonk()
    | (lambda data: [s + "!" for s in data])
).run(["hi", "there"])

print(list(result))  # ['hi!', 'there!']

Shell commands

result = (
    Slonk()
    | (lambda: ["banana", "apple", "cherry"])
    | "sort"
).run()

print(list(result))  # ['apple', 'banana', 'cherry']
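
One plausible way a shell stage can work, sketched with the standard library: write the items to the command's stdin one per line, run the command through the shell, and split stdout back into lines. `run_shell_stage` is a hypothetical helper; slonk's own shell handler may differ (for example, it may stream rather than buffer everything in memory).

```python
import subprocess
from collections.abc import Iterable


def run_shell_stage(command: str, lines: Iterable[str]) -> list[str]:
    """Pipe items through a shell command and return its output lines."""
    completed = subprocess.run(
        command,
        shell=True,  # let the shell parse the command string
        input="".join(f"{line}\n" for line in lines),  # one item per line on stdin
        capture_output=True,
        text=True,
        check=True,  # raise CalledProcessError on a non-zero exit status
    )
    return completed.stdout.splitlines()


print(run_shell_stage("sort", ["banana", "apple", "cherry"]))  # ['apple', 'banana', 'cherry']
```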

File I/O

# Write to a file and pass data through
pipeline = (
    Slonk()
    | (lambda: ["line 1", "line 2"])
    | "./output.txt"  # PathHandler: write + passthrough
)

Cloud and remote paths are supported via universal-pathlib:

# Read from S3, filter, write to GCS
pipeline = (
    Slonk()
    | "s3://input-bucket/events.csv"
    | (lambda rows: [r for r in rows if "ERROR" in r])
    | "gs://output-bucket/errors.csv"
)

SQLAlchemy integration

Pipe SQLAlchemy models directly; the handler adapts its role to its position in the pipeline, acting as a Source, Transform, or Sink:

from sqlalchemy import create_engine, Column, String
from sqlalchemy.orm import DeclarativeBase, sessionmaker

from slonk import Slonk

class Base(DeclarativeBase):
    pass

class Record(Base):
    __tablename__ = "records"
    id = Column(String, primary_key=True)
    data = Column(String)

engine = create_engine("sqlite:///data.db")
Base.metadata.create_all(engine)  # create the "records" table if it does not exist
Session = sessionmaker(bind=engine)

# Read -> transform -> write back
pipeline = (
    Slonk(session_factory=Session)
    | Record                     # Source: read all rows
    | (lambda rows: [r.upper() for r in rows])
    | Record                     # Sink: bulk-write transformed rows
)
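
The position-dependent behaviour can be summarised as a small decision rule. The sketch below is a guess at that rule, not slonk's code: a model with nothing upstream reads rows (Source), a model in last position writes rows (Sink), and anything in between acts as a Transform. `Role` and `infer_role` are hypothetical names.

```python
from enum import Enum


class Role(Enum):
    SOURCE = "source"
    TRANSFORM = "transform"
    SINK = "sink"


def infer_role(index: int, n_stages: int, has_seed: bool = False) -> Role:
    """Hypothetical rule: choose a model stage's role from its pipeline position."""
    if index == 0 and not has_seed:
        return Role.SOURCE  # nothing upstream: read rows from the table
    if index == n_stages - 1:
        return Role.SINK  # nothing downstream: write incoming rows to the table
    return Role.TRANSFORM  # in the middle: receive rows and pass results on


# The read -> transform -> write back example has three stages.
print([infer_role(i, 3).value for i in (0, 2)])  # ['source', 'sink']
```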

Middleware

from slonk import Slonk, TimingMiddleware

tm = TimingMiddleware()
pipeline = Slonk()
pipeline.add_middleware(tm)

pipeline |= (lambda: ["a", "b", "c"])
pipeline |= (lambda data: [s.upper() for s in data])
pipeline.run()

print(f"Pipeline took {tm.pipeline_duration:.4f}s")
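
Middleware of this kind follows the observer pattern: the runner notifies each registered object at lifecycle points, and the object keeps its own measurements. The sketch below shows that pattern with hypothetical hook names (`on_start`, `on_end`, `on_error`) and a hypothetical `run_with_middleware` runner; slonk's actual middleware hooks and signatures may differ, so treat it as an illustration only.

```python
import time
from collections.abc import Callable, Sequence
from typing import Any


class TimingObserver:
    """Hypothetical middleware: records wall-clock time between start and end."""

    def __init__(self) -> None:
        self.pipeline_duration: float | None = None
        self._start = 0.0

    def on_start(self) -> None:
        self._start = time.perf_counter()

    def on_end(self) -> None:
        self.pipeline_duration = time.perf_counter() - self._start

    def on_error(self, exc: BaseException) -> None:
        self.pipeline_duration = None  # no meaningful duration for a failed run


def run_with_middleware(stages: Sequence[Callable[..., Any]], middleware: Sequence[TimingObserver]) -> Any:
    """Run stages in order, notifying every middleware object of lifecycle events."""
    for mw in middleware:
        mw.on_start()
    try:
        data = stages[0]()  # first stage acts as the source
        for stage in stages[1:]:
            data = stage(data)  # later stages transform the upstream output
    except Exception as exc:
        for mw in middleware:
            mw.on_error(exc)
        raise
    for mw in middleware:
        mw.on_end()
    return data


tm = TimingObserver()
out = run_with_middleware([lambda: ["a", "b", "c"], lambda data: [s.upper() for s in data]], [tm])
print(out)  # ['A', 'B', 'C']
print(f"Pipeline took {tm.pipeline_duration:.4f}s")
```

Because the handlers never reference the observer, timing, logging, or statistics can be added or removed without touching stage code.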

Data-parallel execution

from slonk import Slonk, parallel

result = (
    Slonk()
    | (lambda: [str(i) for i in range(1000)])
    | parallel(lambda chunk: [s + "!" for s in chunk], workers=4, chunk_size=250)
).run()
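
A rough standard-library picture of chunked data parallelism: split the input into `chunk_size` pieces, map the function over the chunks with a pool of `workers`, and flatten the results in their original order. This sketch uses only `ThreadPoolExecutor` and does not reproduce slonk's process-pool path or its cloudpickle serialisation (the standard `pickle` module cannot serialise lambdas, which is presumably why cloudpickle is used). `gil_enabled` shows one way to detect at runtime whether the GIL is active, via `sys._is_gil_enabled()`, which CPython added in 3.13. Both function names are hypothetical.

```python
import sys
from collections.abc import Callable, Sequence
from concurrent.futures import ThreadPoolExecutor
from itertools import chain
from typing import Any


def gil_enabled() -> bool:
    """Return False only on a free-threaded build running with the GIL off."""
    check = getattr(sys, "_is_gil_enabled", None)  # present on CPython 3.13+
    return True if check is None else bool(check())


def parallel_map_chunks(
    fn: Callable[[list[Any]], list[Any]],
    data: Sequence[Any],
    workers: int = 4,
    chunk_size: int = 250,
) -> list[Any]:
    """Apply `fn` to fixed-size chunks of `data` in a thread pool, preserving order."""
    chunks = [list(data[i : i + chunk_size]) for i in range(0, len(data), chunk_size)]
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # Executor.map yields results in submission order, not completion order.
        return list(chain.from_iterable(pool.map(fn, chunks)))


items = [str(i) for i in range(1000)]
result = parallel_map_chunks(lambda chunk: [s + "!" for s in chunk], items, workers=4, chunk_size=250)
print(len(result), result[:3])  # 1000 ['0!', '1!', '2!']
print("GIL enabled:", gil_enabled())
```

With the GIL enabled, threads only help when the work releases the GIL (I/O, many C extensions); CPU-bound pure-Python chunks need processes, which is the trade-off the free-threading check decides.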

Custom handlers

from slonk import SlonkBase

class MyTransform(SlonkBase):
    def process_transform(self, input_data):
        for item in input_data:
            self.emit("processing", {"item": item})
            yield item.upper()
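
The Source, Transform, and Sink roles from the feature list map naturally onto structural typing with `typing.Protocol`. In the sketch below only `process_transform` comes from the example above; the protocol class names and the `process_source`/`process_sink` methods are hypothetical, chosen for illustration, and may not match slonk's actual protocols.

```python
from collections.abc import Iterable, Iterator
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class SourceLike(Protocol):
    def process_source(self) -> Iterable[Any]: ...  # hypothetical method name


@runtime_checkable
class TransformLike(Protocol):
    def process_transform(self, input_data: Iterable[Any]) -> Iterable[Any]: ...


@runtime_checkable
class SinkLike(Protocol):
    def process_sink(self, input_data: Iterable[Any]) -> None: ...  # hypothetical method name


class Upper:
    """Satisfies TransformLike structurally; no base class is required."""

    def process_transform(self, input_data: Iterable[str]) -> Iterator[str]:
        for item in input_data:
            yield item.upper()


handler = Upper()
print(isinstance(handler, TransformLike), isinstance(handler, SinkLike))  # True False
print(list(handler.process_transform(["a", "b"])))  # ['A', 'B']
```

`runtime_checkable` isinstance checks only confirm that the methods exist, not their signatures; a static type checker enforces the full contract.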

Fan-in: merge and cat

Combine data from multiple pipelines, together with the output of the upstream stage, into one stream:

from slonk import Slonk, merge, cat

api  = Slonk() | (lambda: ["api_1", "api_2"])
logs = Slonk() | (lambda: ["log_1", "log_2"])

# merge: interleaved (non-deterministic order, concurrent)
result = (
    Slonk()
    | (lambda: ["upstream"])
    | merge(api, logs)
).run()

sorted(result)  # ['api_1', 'api_2', 'log_1', 'log_2', 'upstream']

# cat: ordered (deterministic, sequential)
result = (
    Slonk()
    | (lambda: ["upstream"])
    | cat(api, logs)
).run()

list(result)  # ['upstream', 'api_1', 'api_2', 'log_1', 'log_2']
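
The difference between the two fan-in styles can be sketched with the standard library: ordered concatenation is `itertools.chain`, while interleaving drains every source in its own thread into a shared queue and yields items as they arrive. `cat_iters` and `merge_iters` are hypothetical names operating on plain iterables, not slonk's `cat()` and `merge()`; here the upstream output is simply treated as one more input.

```python
import queue
import threading
from collections.abc import Iterable, Iterator
from itertools import chain
from typing import Any

_DONE = object()  # sentinel: one source is exhausted


def cat_iters(*sources: Iterable[Any]) -> Iterator[Any]:
    """Ordered fan-in: exhaust each source completely before starting the next."""
    return chain(*sources)


def merge_iters(*sources: Iterable[Any]) -> Iterator[Any]:
    """Interleaved fan-in: drain all sources concurrently; arrival order wins."""
    q: queue.Queue = queue.Queue()

    def drain(src: Iterable[Any]) -> None:
        for item in src:
            q.put(item)
        q.put(_DONE)  # signal that this source has finished

    for src in sources:
        threading.Thread(target=drain, args=(src,), daemon=True).start()

    remaining = len(sources)
    while remaining:
        item = q.get()
        if item is _DONE:
            remaining -= 1
        else:
            yield item


upstream, api, logs = ["upstream"], ["api_1", "api_2"], ["log_1", "log_2"]
print(list(cat_iters(upstream, api, logs)))  # ['upstream', 'api_1', 'api_2', 'log_1', 'log_2']
print(sorted(merge_iters(upstream, api, logs)))  # ['api_1', 'api_2', 'log_1', 'log_2', 'upstream']
```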

Sequential mode

By default, pipelines run in parallel mode, with each stage in its own thread. Pass parallel=False to run() to execute the stages sequentially instead (this flag is separate from the parallel() stage wrapper):

result = pipeline.run(parallel=False)

Development

# Install dev dependencies
make install-dev

# Run tests (includes doctests)
make test

# Lint + type-check
make lint
make typecheck

See CONTRIBUTING.md for full details.

License

MIT


Download files


Source Distribution

slonk-0.3.0.tar.gz (96.4 kB)

Uploaded Source

Built Distribution


slonk-0.3.0-py3-none-any.whl (28.0 kB)

Uploaded Python 3

File details

Details for the file slonk-0.3.0.tar.gz.

File metadata

  • Download URL: slonk-0.3.0.tar.gz
  • Upload date:
  • Size: 96.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for slonk-0.3.0.tar.gz
Algorithm Hash digest
SHA256 9b7577e2531d21980a50cf58a1bbf41758b68e531c339dceafc96b049e63a325
MD5 5459a5b7932c881609ac0904af91d6a9
BLAKE2b-256 c205de66060e252e76731adfb481f825a91e0d5478f2dd9615119bf82ea9f464


Provenance

The following attestation bundles were made for slonk-0.3.0.tar.gz:

Publisher: release.yml on MattOates/slonk

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file slonk-0.3.0-py3-none-any.whl.

File metadata

  • Download URL: slonk-0.3.0-py3-none-any.whl
  • Upload date:
  • Size: 28.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for slonk-0.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 342834231ea0f47c9a149bf696645fc4d71bbf111440e6eddaa5fbdd230fbbe7
MD5 68bc14431ac1b7d8063a676c98ed6c77
BLAKE2b-256 c5addac83b1263147985a9873998e50dbc9bf40455e4856bc2d00419a2f322d8


Provenance

The following attestation bundles were made for slonk-0.3.0-py3-none-any.whl:

Publisher: release.yml on MattOates/slonk

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.
