Python I/O pipe utilities
Project description
Tubing is a Python I/O library. What makes tubing so freakin' cool is the gross abuse of the bitwise OR operator (|). Have you ever been writing Python code and thought to yourself, "Man, this is great, but I really wish it were a little more like bash"? Welp, we've made Python a little more like bash. If you are a super lame nerd-kid, you can replace any of the bitwise ORs with the pipe() function and pray we don't overload any other operators in future versions. If you do avoid the bitwise OR, we don't know if we want to hang out with you.
Tubing is pretty bare-bones at the moment. We've tried to make it easy to add your own functionality, and hopefully you'll find doing so not all that unpleasant. There are three sections below for adding sources, pipes, and sinks. If you do make some additions, think about committing them back upstream. We'd love to have a full suite of tools.
Now, witness the full power of this fully operational I/O library.
from tubing import sources, pipes, sinks

objs = [
    dict(
        name="Bob Corsaro",
        birthdate="08/03/1977",
        alignment="evil",
    ),
    dict(
        name="Tom Brady",
        birthdate="08/03/1977",
        alignment="good",
    ),
]

sources.Objects(objs) \
    | pipes.JSONSerializer() \
    | pipes.Joined(by=b"\n") \
    | pipes.Gzip() \
    | sinks.File("output.gz", "wb")
Then in our old friend bash.
$ zcat output.gz
{"alignment": "evil", "birthdate": "08/03/1977", "name": "Bob Corsaro"}
{"alignment": "good", "birthdate": "08/03/1977", "name": "Tom Brady"}
$
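The chaining above works because each tubing stage overloads the bitwise OR. Here is a minimal self-contained sketch of the trick (not tubing's actual implementation; `Stage` is a hypothetical name): each stage's `__or__` hooks the right-hand operand downstream and returns it, so chains read left to right like a shell pipeline.

```python
# Sketch of the operator-overloading trick behind `a | b | c`:
# __or__ links the downstream stage and returns it for further chaining.
class Stage:
    def __init__(self, fn):
        self.fn = fn
        self.upstream = None

    def __or__(self, downstream):
        downstream.upstream = self
        return downstream

    def run(self, data):
        # Pull data through the upstream chain, then apply this stage.
        if self.upstream is not None:
            data = self.upstream.run(data)
        return self.fn(data)

pipeline = Stage(lambda xs: xs) | Stage(lambda xs: [x * 2 for x in xs]) | Stage(sum)
print(pipeline.run([1, 2, 3]))  # 12
```

Because `a | b` evaluates to `b`, assigning the whole chain to a variable gives you the final stage, which is why the examples below capture the sink.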
We seriously need to think about renaming pipes to tubes... man, what was I thinking?
Catalog
Sources

| Name | Description |
| --- | --- |
| Objects | Takes a list of Python objects. |
| File | Creates a stream from a file. |
| Bytes | Creates a stream from a byte string. |

Pipes

| Name | Description |
| --- | --- |
| Gunzip | Unzips a binary stream. |
| Gzip | Zips a binary stream. |
| JSONParser | Parses a byte string stream of raw JSON objects. |
| JSONSerializer | Serializes an object stream using json.dumps. |
| Split | Splits a stream that supports the split method. |
| Joined | Joins a stream of the same type as the by argument. |
| Debugger | Proxies the stream, writing each chunk to the tubing.pipes debugger with level DEBUG. |

Sinks

| Name | Description |
| --- | --- |
| Bytes | Saves each chunk to self.results. |
| File | Writes each chunk to a file. |
| Debugger | Writes each chunk to the tubing.pipes debugger with level DEBUG. |

Extensions

| Name | Description |
| --- | --- |
| s3.S3Source | Creates a stream from an S3 object. |
| s3.S3Sink | Streams data to an S3 object. |
| elasticsearch.BulkSink | Streams elasticsearch.DocUpdate objects to the elasticsearch _bulk endpoint. |
Sources
To make your own source, create a Reader class with the following interface.
class MyReader(object):
    """
    MyReader returns count instances of data.
    """
    def __init__(self, data="hello world\n", count=10):
        self.data = data
        self.count = count

    def read(self, amt):
        """
        read(amt) returns $amt of data and a boolean indicating EOF.
        """
        if not amt:
            amt = self.count
        r = self.data * min(amt, self.count)
        self.count -= amt
        return r, self.count <= 0
The important thing to remember is that your read function should return an iterable of units of data, not a single piece of data. Then wrap your reader in the loving embrace of MakeSource.
from tubing import sources
MySource = sources.MakeSource(MyReader)
Now it can be used in a pipeline!
from __future__ import print_function
from tubing import pipes, sinks

sink = MySource(data="goodbye cruel world!", count=1) \
    | pipes.Joined(by=b"\n") \
    | sinks.Bytes()

print(sink.result)
# Output: goodbye cruel world!
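The heart of the contract is that read(amt) returns a chunk of data plus a boolean EOF flag, and the pipeline keeps calling read until that flag comes back True. A self-contained sketch of such a driver loop (the `CountReader` and `drain` names here are hypothetical, not part of tubing):

```python
# Hypothetical driver loop exercising the read(amt) contract:
# call read() repeatedly until the EOF flag (second return value) is True.
class CountReader:
    """Returns up to `count` copies of `data` across read() calls."""
    def __init__(self, data="hello", count=3):
        self.data = data
        self.count = count

    def read(self, amt):
        amt = amt or self.count
        n = min(amt, self.count)
        self.count -= n
        return [self.data] * n, self.count <= 0

def drain(reader, amt=2):
    chunks = []
    eof = False
    while not eof:
        chunk, eof = reader.read(amt)
        chunks.extend(chunk)
    return chunks

print(drain(CountReader(count=5)))
# ['hello', 'hello', 'hello', 'hello', 'hello']
```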
Pipes
Making your own pipe is a lot more fun, trust me. First make a Transformer.
class OptimusPrime(object):
    def transform(self, chunk):
        return list(reversed(chunk))
chunk is a sized iterable (it supports len()) of whatever type of data the stream is working with. In Transformers, you don't need to worry about buffer sizes, closing, or exceptions; just transform one iterable into another. There are lots of examples in pipes.py.
Next give Optimus Prime a hug.
from tubing import pipes
AllMixedUp = pipes.MakePipe(OptimusPrime)
Ready to mix up some data?
from __future__ import print_function
import json
from tubing import sources, sinks

objs = [{"number": i} for i in range(0, 10)]

sink = sources.Objects(objs) \
    | AllMixedUp(chunk_size=2) \
    | sinks.Objects()

print(json.dumps(sink))
# Output: [{"number": 1}, {"number": 0}, {"number": 3}, {"number": 2}, {"number": 5}, {"number": 4}, {"number": 7}, {"number": 6}, {"number": 9}, {"number": 8}]
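The pairwise swaps in that output come from chunk_size=2: the stream is sliced into two-element chunks and each chunk is reversed independently. A self-contained sketch of that chunking behavior (not tubing's actual MakePipe machinery; `apply_in_chunks` is a hypothetical helper):

```python
# Sketch of chunked transformation: slice the stream into chunk_size
# pieces, run transform() on each piece, and re-concatenate the results.
class OptimusPrime:
    def transform(self, chunk):
        return list(reversed(chunk))

def apply_in_chunks(transformer, items, chunk_size):
    out = []
    for i in range(0, len(items), chunk_size):
        out.extend(transformer.transform(items[i:i + chunk_size]))
    return out

objs = [{"number": i} for i in range(4)]
print(apply_in_chunks(OptimusPrime(), objs, chunk_size=2))
# [{'number': 1}, {'number': 0}, {'number': 3}, {'number': 2}]
```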
Sinks
Really getting tired of making documentation… Maybe I’ll finish later. I have real work to do.
Well.. I’m this far, let’s just push through.
from __future__ import print_function
from tubing import sources, pipes, sinks

class StdoutWriter(object):
    def write(self, chunk):
        for part in chunk:
            print(part)

    def close(self):
        # this function is optional
        print("That's all folks!")

    def abort(self):
        # this is also optional
        print("Something terrible has occurred.")

Debugger = sinks.MakeSink(StdoutWriter)

objs = [{"number": i} for i in range(0, 10)]

sink = sources.Objects(objs) \
    | AllMixedUp(chunk_size=2) \
    | pipes.JSONSerializer() \
    | pipes.Joined(by=b"\n") \
    | Debugger()
# Output:
#{"number": 1}
#{"number": 0}
#{"number": 3}
#{"number": 2}
#{"number": 5}
#{"number": 4}
#{"number": 7}
#{"number": 6}
#{"number": 9}
#{"number": 8}
#That's all folks!
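The optional close() and abort() hooks suggest a driver shaped roughly like this (a sketch under assumptions, not tubing's actual sink machinery; `ListWriter` and `drive` are hypothetical names): close() runs after the stream drains cleanly, abort() runs if anything upstream raises.

```python
# Sketch of how a sink driver might honor the optional close()/abort()
# hooks: close() on clean end-of-stream, abort() if the stream raises.
class ListWriter:
    def __init__(self):
        self.parts, self.closed, self.aborted = [], False, False

    def write(self, chunk):
        self.parts.extend(chunk)

    def close(self):
        self.closed = True

    def abort(self):
        self.aborted = True

def drive(chunks, writer):
    try:
        for chunk in chunks:
            writer.write(chunk)
    except Exception:
        # Give the writer a chance to clean up, then re-raise.
        if hasattr(writer, "abort"):
            writer.abort()
        raise
    else:
        if hasattr(writer, "close"):
            writer.close()

w = ListWriter()
drive([[1, 2], [3]], w)
print(w.parts, w.closed)  # [1, 2, 3] True
```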