Experiment in typed pipelining of data between python, shell and various data sources and sinks.
slonk
Typed data pipelines with operator-overloaded | syntax.
Slonk lets you build data pipelines by chaining stages with Python's |
operator. Stages can be plain callables, shell commands, file paths,
SQLAlchemy models, or custom handler objects.
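The chaining idea can be illustrated without Slonk itself. The following is a minimal sketch, assuming a hypothetical `MiniPipe` class (not part of Slonk) that overloads `__or__` and treats zero-argument callables as sources and everything else as transforms. Slonk's real inference rules may well differ.

```python
from __future__ import annotations

import inspect
from typing import Any, Callable, Iterable


class MiniPipe:
    """Toy pipeline (hypothetical): collects stages via `|`, runs them in order."""

    def __init__(self, stages: tuple[Callable[..., Any], ...] = ()) -> None:
        self.stages = stages

    def __or__(self, stage: Callable[..., Any]) -> MiniPipe:
        # Return a new pipeline so the left-hand operand is not mutated.
        return MiniPipe(self.stages + (stage,))

    def run(self, seed: Iterable[Any] | None = None) -> Any:
        data: Any = seed
        for stage in self.stages:
            # Assumption: a zero-argument callable acts as a source; any
            # other callable receives the previous stage's output.
            if len(inspect.signature(stage).parameters) == 0:
                data = stage()
            else:
                data = stage(data)
        return data


result = (
    MiniPipe()
    | (lambda: ["hello", "world"])
    | (lambda data: [s.upper() for s in data])
).run()
print(result)  # ['HELLO', 'WORLD']
```

Returning a fresh object from `__or__` keeps partially built pipelines reusable, which is one plausible reason to prefer it over mutating in place.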
Features
- Pipe operator composition: chain stages naturally with |.
- Automatic handler inference: strings become path or shell handlers, callables are wrapped based on their signature, and SQLAlchemy models become database sources, transforms, or sinks depending on position.
- Parallel execution: in the default mode, stages run concurrently in threads connected by bounded queues with automatic backpressure.
- Free-threaded Python support: detected at runtime; parallel() uses threads instead of processes when the GIL is disabled.
- Role-based protocols: the Source, Transform, and Sink protocols define the contract for custom handlers.
- Middleware system: observe pipeline lifecycle events (start, end, error) and custom events without modifying handler code.
- Built-in middleware: TimingMiddleware, LoggingMiddleware, and StatsMiddleware ship out of the box.
- Tee/fork: split data to side pipelines with tee().
- Fan-in: combine multiple sources with merge() (interleaved, concurrent) or cat() (ordered, sequential).
- Data-parallel processing: parallel() distributes work across a thread or process pool with cloudpickle serialisation.
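To make the "threads connected by bounded queues" idea concrete, here is a standard-library sketch. It is not Slonk's implementation: `run_threaded`, `_DONE`, and the `maxsize` parameter are hypothetical names chosen for illustration, and error handling is omitted.

```python
import queue
import threading
from typing import Callable, Iterable

_DONE = object()  # sentinel marking the end of the stream


def run_threaded(
    source: Iterable[str],
    transforms: list[Callable[[str], str]],
    maxsize: int = 2,
) -> list[str]:
    """Run each transform in its own thread, linked by bounded queues."""
    queues = [queue.Queue(maxsize=maxsize) for _ in range(len(transforms) + 1)]

    def feed() -> None:
        for item in source:
            # put() blocks while the queue is full: this is the backpressure.
            queues[0].put(item)
        queues[0].put(_DONE)

    def work(fn: Callable[[str], str], inbox: queue.Queue, outbox: queue.Queue) -> None:
        while (item := inbox.get()) is not _DONE:
            outbox.put(fn(item))
        outbox.put(_DONE)  # propagate end-of-stream downstream

    threads = [threading.Thread(target=feed)]
    threads += [
        threading.Thread(target=work, args=(fn, queues[i], queues[i + 1]))
        for i, fn in enumerate(transforms)
    ]
    for t in threads:
        t.start()

    # The calling thread drains the last queue, so producers never stall forever.
    results = []
    while (item := queues[-1].get()) is not _DONE:
        results.append(item)
    for t in threads:
        t.join()
    return results


print(run_threaded(["a", "b", "c"], [str.upper, lambda s: s + "!"]))
# ['A!', 'B!', 'C!']
```

Because every stage is a single thread reading a FIFO queue, item order is preserved even though the stages overlap in time.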
Requirements
- Python >= 3.14
Installation
pip install slonk
Or with uv:
uv add slonk
Quick start
from slonk import Slonk
# Source -> Transform pipeline
result = (
    Slonk()
    | (lambda: ["hello", "world"])
    | (lambda data: [s.upper() for s in data])
).run()
print(list(result))  # ['HELLO', 'WORLD']
With seed data
result = (
    Slonk()
    | (lambda data: [s + "!" for s in data])
).run(["hi", "there"])
print(list(result))  # ['hi!', 'there!']
Shell commands
result = (
    Slonk()
    | (lambda: ["banana", "apple", "cherry"])
    | "sort"
).run()
print(list(result))  # ['apple', 'banana', 'cherry']
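Conceptually, a shell stage feeds the incoming lines to a command's standard input and yields its standard output line by line. The sketch below shows that idea with `subprocess`; the helper name `shell_stage` is hypothetical, it assumes a `sort` executable is on the PATH, and it says nothing about how Slonk actually spawns or streams to the process.

```python
import subprocess


def shell_stage(command: list[str], lines: list[str]) -> list[str]:
    """Send lines to a command's stdin and return its stdout as lines."""
    completed = subprocess.run(
        command,
        input="".join(line + "\n" for line in lines),
        capture_output=True,
        text=True,
        check=True,  # raise CalledProcessError on a non-zero exit status
    )
    return completed.stdout.splitlines()


print(shell_stage(["sort"], ["banana", "apple", "cherry"]))
# ['apple', 'banana', 'cherry']
```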
File I/O
# Write to a file and pass data through
pipeline = (
    Slonk()
    | (lambda: ["line 1", "line 2"])
    | "./output.txt"  # PathHandler: write + passthrough
)
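"Write + passthrough" means the stage persists each line and also hands it on to the next stage. A minimal sketch of that behaviour, assuming a hypothetical generator `write_and_pass` (Slonk's PathHandler may buffer or encode differently):

```python
import tempfile
from pathlib import Path
from typing import Iterable, Iterator


def write_and_pass(path: Path, lines: Iterable[str]) -> Iterator[str]:
    """Write each line to `path` and yield it unchanged to the next stage."""
    with path.open("w", encoding="utf-8") as fh:
        for line in lines:
            fh.write(line + "\n")
            yield line


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "output.txt"
    # Consuming the generator drives the writes and closes the file.
    passed = list(write_and_pass(target, ["line 1", "line 2"]))
    written = target.read_text(encoding="utf-8").splitlines()

print(passed)   # ['line 1', 'line 2']
print(written)  # ['line 1', 'line 2']
```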
Cloud and remote paths are supported via universal-pathlib:
# Read from S3, filter, write to GCS
pipeline = (
    Slonk()
    | "s3://input-bucket/events.csv"
    | (lambda rows: [r for r in rows if "ERROR" in r])
    | "gs://output-bucket/errors.csv"
)
SQLAlchemy integration
Pipe SQLAlchemy models directly -- the handler adapts to its position as Source, Transform, or Sink:
from sqlalchemy import create_engine, Column, String
from sqlalchemy.orm import DeclarativeBase, sessionmaker

class Base(DeclarativeBase):
    pass

class Record(Base):
    __tablename__ = "records"
    id = Column(String, primary_key=True)
    data = Column(String)

engine = create_engine("sqlite:///data.db")
Session = sessionmaker(bind=engine)
# Read -> transform -> write back
pipeline = (
    Slonk(session_factory=Session)
    | Record  # Source: read all rows
    | (lambda rows: [r.upper() for r in rows])
    | Record  # Sink: bulk-write transformed rows
)
Middleware
from slonk import Slonk, TimingMiddleware
tm = TimingMiddleware()
pipeline = Slonk()
pipeline.add_middleware(tm)
pipeline |= (lambda: ["a", "b", "c"])
pipeline |= (lambda data: [s.upper() for s in data])
pipeline.run()
print(f"Pipeline took {tm.pipeline_duration:.4f}s")
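The middleware pattern is essentially an observer around the pipeline run. The sketch below shows one way a timing observer could hook start, end, and error events. All names here (`TimingObserver`, `on_start`, `on_end`, `on_error`, `run_with_observers`) are hypothetical and are not Slonk's middleware API; only the `pipeline_duration` attribute name is borrowed from the example above.

```python
import time
from typing import Any, Callable


class TimingObserver:
    """Records wall-clock duration of one pipeline run (illustrative only)."""

    def __init__(self) -> None:
        self.pipeline_duration = 0.0
        self.errors: list[BaseException] = []
        self._started = 0.0

    def on_start(self) -> None:
        self._started = time.perf_counter()

    def on_end(self) -> None:
        self.pipeline_duration = time.perf_counter() - self._started

    def on_error(self, exc: BaseException) -> None:
        self.errors.append(exc)


def run_with_observers(stages: list[Callable[..., Any]], observers: list[TimingObserver]) -> Any:
    """Run stages in order, notifying observers without touching stage code."""
    for obs in observers:
        obs.on_start()
    data: Any = None
    try:
        for index, stage in enumerate(stages):
            # Assumption: the first stage is a source and takes no input.
            data = stage() if index == 0 else stage(data)
    except Exception as exc:
        for obs in observers:
            obs.on_error(exc)
        raise
    finally:
        # The end event fires on both success and failure.
        for obs in observers:
            obs.on_end()
    return data


timer = TimingObserver()
output = run_with_observers(
    [lambda: ["a", "b", "c"], lambda data: [s.upper() for s in data]],
    [timer],
)
print(output)  # ['A', 'B', 'C']
print(f"Pipeline took {timer.pipeline_duration:.4f}s")
```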
Data-parallel execution
from slonk import Slonk, parallel
result = (
    Slonk()
    | (lambda: [str(i) for i in range(1000)])
    | parallel(lambda chunk: [s + "!" for s in chunk], workers=4, chunk_size=250)
).run()
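The chunk-and-distribute idea behind the `workers` and `chunk_size` arguments can be sketched with `concurrent.futures`. This version is an assumption-laden illustration: `chunked` and `parallel_map` are hypothetical helpers, only a thread pool is used, and the cloudpickle-based process pool mentioned in the feature list is not reproduced.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import islice
from typing import Callable, Iterable, Iterator


def chunked(items: Iterable[str], size: int) -> Iterator[list[str]]:
    """Yield successive lists of at most `size` items."""
    iterator = iter(items)
    while chunk := list(islice(iterator, size)):
        yield chunk


def parallel_map(
    fn: Callable[[list[str]], list[str]],
    items: Iterable[str],
    workers: int = 4,
    chunk_size: int = 250,
) -> list[str]:
    """Apply `fn` to each chunk in a thread pool and flatten the results."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # Executor.map returns results in submission order, so the
        # flattened output keeps the input order.
        return [
            item
            for chunk_result in pool.map(fn, chunked(items, chunk_size))
            for item in chunk_result
        ]


result = parallel_map(
    lambda chunk: [s + "!" for s in chunk],
    [str(i) for i in range(1000)],
    workers=4,
    chunk_size=250,
)
print(len(result), result[0], result[-1])  # 1000 0! 999!
```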
Custom handlers
from slonk import SlonkBase
class MyTransform(SlonkBase):
    def process_transform(self, input_data):
        for item in input_data:
            # Emit a custom event that middleware can observe
            self.emit("processing", {"item": item})
            yield item.upper()
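Role-based protocols of this kind can be expressed with `typing.Protocol`. The sketch below is a guess at the shape, not Slonk's definitions: only the `process_transform` method name appears in the example above, while `process_source`, `process_sink`, `Upper`, and `role_of` are hypothetical names introduced for illustration.

```python
from typing import Any, Iterable, Iterator, Protocol, runtime_checkable


@runtime_checkable
class Source(Protocol):
    def process_source(self) -> Iterable[str]: ...  # hypothetical method name


@runtime_checkable
class Transform(Protocol):
    def process_transform(self, input_data: Iterable[str]) -> Iterable[str]: ...


@runtime_checkable
class Sink(Protocol):
    def process_sink(self, input_data: Iterable[str]) -> None: ...  # hypothetical


class Upper:
    """Satisfies Transform structurally; no base class is required."""

    def process_transform(self, input_data: Iterable[str]) -> Iterator[str]:
        for item in input_data:
            yield item.upper()


def role_of(handler: Any) -> str:
    """Report the first role whose methods the handler provides."""
    for name, protocol in (("source", Source), ("transform", Transform), ("sink", Sink)):
        # For runtime-checkable protocols, isinstance only checks that the
        # methods exist, not their signatures.
        if isinstance(handler, protocol):
            return name
    raise TypeError(f"{type(handler).__name__} implements no known role")


print(role_of(Upper()))  # transform
```

Structural typing lets a pipeline accept third-party objects as handlers as long as they expose the right methods.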
Fan-in: merge and cat
Combine data from multiple pipelines into one stream:
from slonk import Slonk, merge, cat
api = Slonk() | (lambda: ["api_1", "api_2"])
logs = Slonk() | (lambda: ["log_1", "log_2"])
# merge — interleaved (non-deterministic order, concurrent)
result = (
    Slonk()
    | (lambda: ["upstream"])
    | merge(api, logs)
).run()
sorted(result)  # ['api_1', 'api_2', 'log_1', 'log_2', 'upstream']
# cat — ordered (deterministic, sequential)
result = (
    Slonk()
    | (lambda: ["upstream"])
    | cat(api, logs)
).run()
list(result)  # ['upstream', 'api_1', 'api_2', 'log_1', 'log_2']
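The difference between the two fan-in modes can be shown with plain iterables. This is a standard-library sketch under the assumption that "ordered" means sequential concatenation and "interleaved" means draining sources concurrently into one queue. `cat_streams` and `merge_streams` are hypothetical names, not Slonk functions.

```python
import itertools
import queue
import threading
from typing import Iterable, Iterator


def cat_streams(*sources: Iterable[str]) -> Iterator[str]:
    """Ordered fan-in: exhaust each source in turn."""
    return itertools.chain.from_iterable(sources)


def merge_streams(*sources: Iterable[str]) -> Iterator[str]:
    """Interleaved fan-in: one thread per source, arrival order not guaranteed."""
    shared: queue.Queue = queue.Queue()
    done = object()  # per-source end-of-stream marker

    def drain(source: Iterable[str]) -> None:
        for item in source:
            shared.put(item)
        shared.put(done)

    threads = [threading.Thread(target=drain, args=(s,)) for s in sources]
    for t in threads:
        t.start()

    remaining = len(threads)
    while remaining:
        item = shared.get()
        if item is done:
            remaining -= 1
        else:
            yield item
    for t in threads:
        t.join()


api = ["api_1", "api_2"]
logs = ["log_1", "log_2"]

print(list(cat_streams(["upstream"], api, logs)))
# ['upstream', 'api_1', 'api_2', 'log_1', 'log_2']
print(sorted(merge_streams(["upstream"], api, logs)))
# ['api_1', 'api_2', 'log_1', 'log_2', 'upstream']
```

Sorting the merged output, as the example above also does, is the simplest way to get a stable result from a non-deterministic interleaving.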
Sequential mode
By default pipelines run with parallel execution (each stage in its own
thread). Pass parallel=False for sequential execution:
result = pipeline.run(parallel=False)
Development
# Install dev dependencies
make install-dev
# Run tests (includes doctests)
make test
# Lint + type-check
make lint
make typecheck
See CONTRIBUTING.md for full details.
License