
Experiment in typed pipelining of data between Python, the shell, and various data sources and sinks.


slonk

Typed data pipelines with operator-overloaded | syntax.

Slonk lets you build data pipelines by chaining stages with Python's | operator. Stages can be plain callables, shell commands, file paths, SQLAlchemy models, or custom handler objects.
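To make the | mechanics concrete, here is a minimal, stdlib-only sketch of operator-overloaded composition. The class name Pipeline and its run() method are hypothetical and this is not slonk's implementation. Every stage here is treated as a one-argument transform, whereas Slonk also accepts zero-argument sources, shell strings, paths, and models.

```python
from __future__ import annotations

from collections.abc import Callable, Iterable
from typing import Any

Stage = Callable[[Iterable[Any]], Iterable[Any]]


class Pipeline:
    """Hypothetical pipe-composable pipeline (illustration only)."""

    def __init__(self, stages: tuple[Stage, ...] = ()) -> None:
        self.stages = stages

    def __or__(self, stage: Stage) -> Pipeline:
        # Return a new Pipeline so `a | f` never mutates `a`.
        return Pipeline((*self.stages, stage))

    def run(self, data: Iterable[Any] = ()) -> list[Any]:
        # Feed each stage's output into the next, then materialise the result.
        for stage in self.stages:
            data = stage(data)
        return list(data)


shout = Pipeline() | (lambda data: [s.upper() for s in data]) | sorted
print(shout.run(["world", "hello"]))  # ['HELLO', 'WORLD']
```

Returning a fresh object from __or__ keeps partially built pipelines reusable as templates.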

Features

  • Pipe operator composition: chain stages naturally with |.
  • Automatic handler inference: strings become path or shell handlers, callables are wrapped based on their signature, and SQLAlchemy models become database sources, transforms, or sinks depending on their position in the pipeline.
  • Parallel execution: stages run concurrently in threads connected by bounded queues with automatic backpressure (the default mode).
  • Free-threaded Python support: detected at runtime; parallel() uses threads instead of processes when the GIL is disabled.
  • Role-based protocols: Source, Transform, and Sink protocols define the contract for custom handlers.
  • Middleware system: observe pipeline lifecycle events (start, end, error) and custom events without modifying handler code.
  • Built-in middleware: TimingMiddleware, LoggingMiddleware, and StatsMiddleware ship out of the box.
  • Tee/fork: split data to side pipelines with tee().
  • Fan-in: combine multiple sources with merge() (interleaved, concurrent) or cat() (ordered, sequential).
  • Data-parallel processing: parallel() distributes work across a thread or process pool with cloudpickle serialisation.
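The handler inference bullet does not spell out the exact rules, so the following is a hedged sketch of one plausible approach: count a callable's required positional parameters with inspect.signature (zero means source, one means transform) and treat strings containing a URL scheme or a path-like prefix as paths, everything else as shell commands. Both helper names and the string heuristic are assumptions, not slonk's documented behaviour.

```python
import inspect
from collections.abc import Callable
from typing import Any

# Parameter kinds that can be filled positionally
_POSITIONAL = (
    inspect.Parameter.POSITIONAL_ONLY,
    inspect.Parameter.POSITIONAL_OR_KEYWORD,
)


def infer_callable_role(fn: Callable[..., Any]) -> str:
    """Hypothetical: classify a callable by its required positional parameters."""
    required = [
        p
        for p in inspect.signature(fn).parameters.values()
        if p.kind in _POSITIONAL and p.default is inspect.Parameter.empty
    ]
    if not required:
        return "source"  # e.g. lambda: ["hello", "world"]
    if len(required) == 1:
        return "transform"  # e.g. lambda data: [s.upper() for s in data]
    raise TypeError(f"cannot infer a pipeline role for {fn!r}")


def infer_string_kind(spec: str) -> str:
    """Hypothetical heuristic: URLs and path-like strings are paths, the rest is shell."""
    if "://" in spec or spec.startswith(("/", "./", "../", "~")):
        return "path"
    return "shell"


print(infer_callable_role(lambda: ["a"]))        # source
print(infer_callable_role(lambda data: data))    # transform
print(infer_string_kind("./output.txt"))         # path
print(infer_string_kind("s3://bucket/key.csv"))  # path
print(infer_string_kind("sort"))                 # shell
```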

Requirements

  • Python >= 3.14

Installation

pip install slonk

Or with uv:

uv add slonk

Quick start

from slonk import Slonk

# Source -> Transform pipeline
result = (
    Slonk()
    | (lambda: ["hello", "world"])
    | (lambda data: [s.upper() for s in data])
).run()

print(list(result))  # ['HELLO', 'WORLD']

With seed data

result = (
    Slonk()
    | (lambda data: [s + "!" for s in data])
).run(["hi", "there"])

print(list(result))  # ['hi!', 'there!']

Shell commands

result = (
    Slonk()
    | (lambda: ["banana", "apple", "cherry"])
    | "sort"
).run()

print(list(result))  # ['apple', 'banana', 'cherry']
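A shell stage presumably writes items to the command's stdin and reads its stdout back as lines. The sketch below shows that idea with subprocess.run on a POSIX system with sort on the PATH; the function name shell_stage and the one-item-per-line framing are assumptions rather than slonk's documented behaviour. It splits the command with shlex instead of invoking a shell, which avoids injection surprises but means pipes and redirects inside the string are not supported.

```python
import shlex
import subprocess
from collections.abc import Iterable


def shell_stage(command: str, items: Iterable[str]) -> list[str]:
    """Hypothetical shell stage: items in on stdin, one per line; lines out."""
    stdin_text = "".join(f"{item}\n" for item in items)
    completed = subprocess.run(
        shlex.split(command),  # no shell: "sort" runs the sort binary directly
        input=stdin_text,
        capture_output=True,
        text=True,
        check=True,  # raise CalledProcessError on a non-zero exit status
    )
    return completed.stdout.splitlines()


print(shell_stage("sort", ["banana", "apple", "cherry"]))  # ['apple', 'banana', 'cherry']
```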

File I/O

# Write to a file and pass data through
pipeline = (
    Slonk()
    | (lambda: ["line 1", "line 2"])
    | "./output.txt"  # PathHandler: write + passthrough
)

Cloud and remote paths are supported via universal-pathlib:

# Read from S3, filter, write to GCS
pipeline = (
    Slonk()
    | "s3://input-bucket/events.csv"
    | (lambda rows: [r for r in rows if "ERROR" in r])
    | "gs://output-bucket/errors.csv"
)

SQLAlchemy integration

Pipe SQLAlchemy models in directly; the handler adapts to its position in the pipeline, acting as a Source, Transform, or Sink:

from sqlalchemy import create_engine, Column, String
from sqlalchemy.orm import DeclarativeBase, sessionmaker

class Base(DeclarativeBase):
    pass

class Record(Base):
    __tablename__ = "records"
    id = Column(String, primary_key=True)
    data = Column(String)

engine = create_engine("sqlite:///data.db")
Session = sessionmaker(bind=engine)
Base.metadata.create_all(engine)  # create the "records" table if it does not already exist

# Read -> transform -> write back
pipeline = (
    Slonk(session_factory=Session)
    | Record                     # Source: read all rows
    | (lambda rows: [r.upper() for r in rows])
    | Record                     # Sink: bulk-write transformed rows
)
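The position-dependent role can be illustrated with a tiny rule: a model with nothing upstream is read from, a model with nothing downstream is written to, and anything in between transforms. The Role enum, role_for_position, and the seeded flag (for runs given seed data) are hypothetical; the README does not describe what a model does in the middle of a pipeline, so treat this as an assumption.

```python
from enum import Enum


class Role(Enum):
    SOURCE = "source"
    TRANSFORM = "transform"
    SINK = "sink"


def role_for_position(index: int, n_stages: int, *, seeded: bool = False) -> Role:
    """Hypothetical rule: first stage reads, last stage writes, anything between transforms."""
    if not 0 <= index < n_stages:
        raise IndexError(f"stage {index} out of range for {n_stages} stages")
    if index == 0 and not seeded:
        return Role.SOURCE  # nothing upstream, so rows are read from the model
    if index == n_stages - 1:
        return Role.SINK  # nothing downstream, so rows are written
    return Role.TRANSFORM


# The Record -> lambda -> Record pipeline above has three stages.
print([role_for_position(i, 3).value for i in range(3)])  # ['source', 'transform', 'sink']
```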

Middleware

from slonk import Slonk, TimingMiddleware

tm = TimingMiddleware()
pipeline = Slonk()
pipeline.add_middleware(tm)

pipeline |= (lambda: ["a", "b", "c"])
pipeline |= (lambda data: [s.upper() for s in data])
pipeline.run()

print(f"Pipeline took {tm.pipeline_duration:.4f}s")
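Lifecycle middleware of this kind is an observer pattern: hooks are called when a run starts, ends, or fails, and the handlers never know about them. The sketch below uses hypothetical hook names (on_start, on_end, on_error) and a hypothetical runner; it only illustrates the idea behind TimingMiddleware and is not slonk's middleware interface.

```python
import time
from collections.abc import Callable, Iterable, Sequence
from typing import Any, Protocol


class Middleware(Protocol):
    def on_start(self) -> None: ...
    def on_end(self) -> None: ...
    def on_error(self, exc: BaseException) -> None: ...


class Timing:
    """Hypothetical timing observer: records the wall-clock duration of a run."""

    def __init__(self) -> None:
        self._t0 = 0.0
        self.pipeline_duration: float | None = None
        self.error: BaseException | None = None

    def on_start(self) -> None:
        self._t0 = time.perf_counter()

    def on_end(self) -> None:
        self.pipeline_duration = time.perf_counter() - self._t0

    def on_error(self, exc: BaseException) -> None:
        self.error = exc
        self.pipeline_duration = time.perf_counter() - self._t0


def run_observed(
    stages: Sequence[Callable[[Any], Any]],
    data: Iterable[Any],
    middleware: Sequence[Middleware],
) -> list[Any]:
    for mw in middleware:
        mw.on_start()
    try:
        for stage in stages:
            data = stage(data)
        result = list(data)
    except Exception as exc:
        for mw in middleware:
            mw.on_error(exc)  # observers see the failure, then it propagates
        raise
    for mw in middleware:
        mw.on_end()
    return result


tm = Timing()
out = run_observed([lambda data: [s.upper() for s in data]], ["a", "b", "c"], [tm])
print(out, f"{tm.pipeline_duration:.4f}s")
```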

Data-parallel execution

from slonk import Slonk, parallel

result = (
    Slonk()
    | (lambda: [str(i) for i in range(1000)])
    | parallel(lambda chunk: [s + "!" for s in chunk], workers=4, chunk_size=250)
).run()
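As a rough mental model of chunked data parallelism, the sketch below splits the input into chunk_size slices, maps the function over them with a thread pool, and concatenates the results in input order. It also shows how a free-threaded build can be detected at runtime with sys._is_gil_enabled(), a private CPython function added in 3.13. The helper names are hypothetical, and unlike slonk's parallel() this sketch always uses threads; a process pool would additionally need a serialiser such as cloudpickle to ship lambdas to workers.

```python
import sys
from collections.abc import Callable, Sequence
from concurrent.futures import ThreadPoolExecutor
from itertools import chain
from typing import TypeVar

T = TypeVar("T")
R = TypeVar("R")


def gil_enabled() -> bool:
    """True unless running on a free-threaded CPython build with the GIL off."""
    check = getattr(sys, "_is_gil_enabled", None)  # CPython 3.13+ only
    return True if check is None else bool(check())


def parallel_chunks(
    fn: Callable[[list[T]], list[R]],
    items: Sequence[T],
    workers: int = 4,
    chunk_size: int = 250,
) -> list[R]:
    """Hypothetical helper: apply fn to fixed-size chunks concurrently, keep order."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be positive")
    chunks = [list(items[i : i + chunk_size]) for i in range(0, len(items), chunk_size)]
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # Executor.map yields results in input order, regardless of completion order.
        return list(chain.from_iterable(pool.map(fn, chunks)))


data = [str(i) for i in range(1000)]
out = parallel_chunks(lambda chunk: [s + "!" for s in chunk], data, workers=4, chunk_size=250)
print(len(out), out[:3], "GIL enabled:", gil_enabled())
```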

Custom handlers

from slonk import SlonkBase

class MyTransform(SlonkBase):
    def process_transform(self, input_data):
        for item in input_data:
            self.emit("processing", {"item": item})
            yield item.upper()
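The Source, Transform, and Sink contracts mentioned under Features can be modelled with typing.Protocol, which lets any class that provides the right method satisfy the contract structurally, without inheriting from a base class. Only process_transform appears in this README; process_source and process_sink are hypothetical names, and SourceLike, TransformLike, and SinkLike are stand-ins rather than slonk's own protocol definitions.

```python
from collections.abc import Iterable, Iterator
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class SourceLike(Protocol):
    def process_source(self) -> Iterable[Any]: ...


@runtime_checkable
class TransformLike(Protocol):
    def process_transform(self, input_data: Iterable[Any]) -> Iterable[Any]: ...


@runtime_checkable
class SinkLike(Protocol):
    def process_sink(self, input_data: Iterable[Any]) -> None: ...


class Upper:
    """Satisfies TransformLike structurally; no base class needed."""

    def process_transform(self, input_data: Iterable[str]) -> Iterator[str]:
        for item in input_data:
            yield item.upper()


handler = Upper()
print(isinstance(handler, TransformLike), isinstance(handler, SourceLike))  # True False
print(list(handler.process_transform(["a", "b"])))  # ['A', 'B']
```

Note that a runtime_checkable isinstance check only verifies that the method exists, not its signature; a static type checker is needed for the full contract.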

Fan-in: merge and cat

Combine data from multiple pipelines into one stream:

from slonk import Slonk, merge, cat

api  = Slonk() | (lambda: ["api_1", "api_2"])
logs = Slonk() | (lambda: ["log_1", "log_2"])

# merge: interleaved (non-deterministic order, concurrent)
result = (
    Slonk()
    | (lambda: ["upstream"])
    | merge(api, logs)
).run()

sorted(result)  # ['api_1', 'api_2', 'log_1', 'log_2', 'upstream']

# cat: ordered (deterministic, sequential)
result = (
    Slonk()
    | (lambda: ["upstream"])
    | cat(api, logs)
).run()

list(result)  # ['upstream', 'api_1', 'api_2', 'log_1', 'log_2']
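The difference between the two fan-in modes can be sketched with the standard library: ordered concatenation is itertools.chain, while interleaving runs one producer thread per source feeding a shared bounded queue. The names cat_sources and merge_sources are hypothetical stand-ins, not slonk's merge() and cat(). In this simplified version an exception raised inside a source thread is reported by the threading machinery but not re-raised in the consumer.

```python
import queue
import threading
from collections.abc import Iterable, Iterator
from itertools import chain
from typing import Any

_DONE = object()  # sentinel marking one exhausted source


def cat_sources(*sources: Iterable[Any]) -> Iterator[Any]:
    """Ordered fan-in: drain each source completely before starting the next."""
    return chain(*sources)


def merge_sources(*sources: Iterable[Any], maxsize: int = 64) -> Iterator[Any]:
    """Interleaved fan-in: one producer thread per source feeding a bounded queue."""
    q: queue.Queue[Any] = queue.Queue(maxsize=maxsize)

    def pump(source: Iterable[Any]) -> None:
        try:
            for item in source:
                q.put(item)  # blocks while the queue is full (backpressure)
        finally:
            q.put(_DONE)  # always signal completion, even if the source raised

    for source in sources:
        threading.Thread(target=pump, args=(source,), daemon=True).start()

    remaining = len(sources)
    while remaining:
        item = q.get()
        if item is _DONE:
            remaining -= 1
        else:
            yield item


upstream, api, logs = ["upstream"], ["api_1", "api_2"], ["log_1", "log_2"]
print(sorted(merge_sources(upstream, api, logs)))  # ['api_1', 'api_2', 'log_1', 'log_2', 'upstream']
print(list(cat_sources(upstream, api, logs)))      # ['upstream', 'api_1', 'api_2', 'log_1', 'log_2']
```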

Sequential mode

By default pipelines run with parallel execution (each stage in its own thread). Pass parallel=False for sequential execution:

result = pipeline.run(parallel=False)
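The two execution modes can be contrasted with a stdlib sketch: sequential mode chains the stages in the calling thread, while the threaded mode runs each stage in its own thread and links them with bounded queue.Queue objects, so a fast producer blocks on put() until a slower consumer catches up (backpressure). The names run_sequential and run_threaded and the maxsize default are hypothetical; this is not slonk's scheduler, and it has no cancellation, so a failing stage can leave upstream threads blocked on a full queue.

```python
import queue
import threading
from collections.abc import Callable, Iterable, Iterator
from typing import Any

Stage = Callable[[Iterable[Any]], Iterable[Any]]
_DONE = object()  # end-of-stream sentinel


def run_sequential(stages: list[Stage], data: Iterable[Any]) -> list[Any]:
    """Each stage consumes the previous stage's output in the calling thread."""
    for stage in stages:
        data = stage(data)
    return list(data)


def _drain(q: queue.Queue) -> Iterator[Any]:
    # Yield items until the upstream stage signals that it is finished
    while (item := q.get()) is not _DONE:
        yield item


def run_threaded(stages: list[Stage], data: Iterable[Any], maxsize: int = 8) -> list[Any]:
    """One thread per stage, linked by bounded queues; put() blocks when a queue is full."""
    queues: list[queue.Queue] = [queue.Queue(maxsize=maxsize) for _ in range(len(stages) + 1)]
    errors: list[BaseException] = []

    def feed() -> None:
        try:
            for item in data:
                queues[0].put(item)
        except Exception as exc:
            errors.append(exc)
        finally:
            queues[0].put(_DONE)

    def work(i: int, stage: Stage) -> None:
        try:
            for item in stage(_drain(queues[i])):
                queues[i + 1].put(item)
        except Exception as exc:
            errors.append(exc)
        finally:
            queues[i + 1].put(_DONE)  # let downstream stages finish

    threading.Thread(target=feed, daemon=True).start()
    for i, stage in enumerate(stages):
        threading.Thread(target=work, args=(i, stage), daemon=True).start()

    result = list(_drain(queues[-1]))
    if errors:
        raise errors[0]  # surface the first failure in the caller's thread
    return result


stages = [
    lambda data: (s.upper() for s in data),
    lambda data: [s + "!" for s in data],
]
print(run_threaded(stages, ["hi", "there"]))    # ['HI!', 'THERE!']
print(run_sequential(stages, ["hi", "there"]))  # ['HI!', 'THERE!']
```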

Development

# Install dev dependencies
make install-dev

# Run tests (includes doctests)
make test

# Lint + type-check
make lint
make typecheck

See CONTRIBUTING.md for full details.

License

MIT

Download files


Source Distribution

slonk-0.4.1.tar.gz (103.9 kB)

Uploaded Source

Built Distribution


slonk-0.4.1-py3-none-any.whl (31.3 kB)

Uploaded Python 3

File details

Details for the file slonk-0.4.1.tar.gz.

File metadata

  • Download URL: slonk-0.4.1.tar.gz
  • Size: 103.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for slonk-0.4.1.tar.gz
Algorithm Hash digest
SHA256 d156a5e7086a32dd73b51ae7c3c921a56e297aee02455d636b52fc348ff8d8a2
MD5 0cb9d8ce066dad745a751a0f78927bd0
BLAKE2b-256 14a37e9856e36aa33706e01eedee6d38db8dfaeed858021d5816f317f214c352


Provenance

The following attestation bundles were made for slonk-0.4.1.tar.gz:

Publisher: release.yml on MattOates/slonk


File details

Details for the file slonk-0.4.1-py3-none-any.whl.

File metadata

  • Download URL: slonk-0.4.1-py3-none-any.whl
  • Size: 31.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for slonk-0.4.1-py3-none-any.whl
Algorithm Hash digest
SHA256 a70887cf8290d74940cbfe3498526c5fd3b347f0d60c1b10e1016c090895b5f0
MD5 5ce390710241c0d628cdaf7d00a86ab5
BLAKE2b-256 125e2d5c4a81797b1880fe2acd4207bb226cdb46c5abb49c8e7e919839976b7a


Provenance

The following attestation bundles were made for slonk-0.4.1-py3-none-any.whl:

Publisher: release.yml on MattOates/slonk

