No project description provided
Project description
StreamIOPipe
A library that lets you process streams by chaining simple user-defined functions in sequence, as if you were using pipes.
Installation
pip install streamiopipe
Examples
- Read text from a StringIO and add line numbers
from streamiopipe import *
import sys
import io
@stringiopipe
def add_lineno(outstream, instream):
    """Prefix every input line with an auto-incrementing line number.

    Each line read from ``instream`` is written to ``outstream`` as
    ``NN:<line>``, where ``NN`` is the 1-based line number zero-padded to at
    least two digits.
    """
    for lineno, line in enumerate(instream, start=1):
        outstream.write(f"{lineno:02d}:{line}")
# In-memory input stream: four month names, one per line.
text_in = io.StringIO("""January
February
March
April""")
# In-memory buffer passed as the pipe's output file.
text_out = io.StringIO()
# NOTE(review): indentation was lost in this listing; sp.run(...) must be
# indented under the `with` statement for the example to run.
with StreamIOPipe(filein=text_in,fileout=text_out,text=True) as sp:
sp.run(add_lineno)
# Show whatever ended up in the output buffer.
print(text_out.getvalue())
- Read text from a StringIO and add line numbers and then remove them
from streamiopipe import *
import sys
import io
@stringiopipe
def add_lineno(outstream, instream):
    """Prefix every input line with an auto-incrementing line number.

    Each line read from ``instream`` is written to ``outstream`` as
    ``NN:<line>``, where ``NN`` is the 1-based line number zero-padded to at
    least two digits.
    """
    for lineno, line in enumerate(instream, start=1):
        outstream.write(f"{lineno:02d}:{line}")
@stringiopipe
def remove_lineno(outstream, instream):
    """Strip a leading ``<digits>:`` line-number prefix from every line.

    The prefix is located by its colon rather than by a fixed width, so
    numbers with more than two digits (line 100 and beyond) are removed
    correctly.  Lines that do not start with a numeric prefix are written
    through unchanged.
    """
    for line in instream:
        prefix, sep, rest = line.partition(":")
        # Only drop the prefix when it really is a line number.
        outstream.write(rest if sep and prefix.isdigit() else line)
# In-memory input stream: four month names, one per line.
text_in = io.StringIO("""January
February
March
April""")
# In-memory buffer passed as the pipe's output file.
text_out = io.StringIO()
# NOTE(review): indentation was lost in this listing; sp.iterate(...) must be
# indented under the `with` statement for the example to run.
with StreamIOPipe(filein=text_in,fileout=text_out,text=True) as sp:
# Stages are given as a list of tuples: add_lineno first, then remove_lineno.
sp.iterate([(add_lineno,),(remove_lineno,)])
# Show whatever ended up in the output buffer.
print(text_out.getvalue())
- XOR a binary file from stdin and write to stdout
from streamiopipe import *
import sys
import io
import struct
@byteiopipe
def xor(outstream, instream, value=0xFA):
    """XOR every byte of ``instream`` with ``value`` and write it to ``outstream``.

    ``value`` is expected to be a single-byte mask (0..255); the default is
    ``0xFA``.  Input is consumed in chunks instead of one byte at a time,
    which yields the same output with far fewer read and write calls.
    """
    chunk_size = 64 * 1024
    # iter() with a b'' sentinel stops at end of stream.
    for chunk in iter(lambda: instream.read(chunk_size), b''):
        outstream.write(bytes(byte ^ value for byte in chunk))
# Read bytes from stdin, XOR each one with 0xBC and write the result to stdout.
with StreamIOPipe(text=False) as sp:
    sp.run(xor, value=0xBC)
- XOR and Rotate Left a file from stdin and write to stdout
from streamiopipe import *
import sys
import io
import struct
@byteiopipe
def xor(outstream, instream, value=0xAA):
    """XOR every byte of ``instream`` with ``value`` and write it to ``outstream``.

    ``value`` is expected to be a single-byte mask (0..255); the default is
    ``0xAA``.  Input is consumed in chunks instead of one byte at a time,
    which yields the same output with far fewer read and write calls.
    """
    chunk_size = 64 * 1024
    # iter() with a b'' sentinel stops at end of stream.
    for chunk in iter(lambda: instream.read(chunk_size), b''):
        outstream.write(bytes(byte ^ value for byte in chunk))
@byteiopipe
def rol(outstream, instream, value=1):
    """Rotate every byte of ``instream`` left by ``value`` bits.

    Only ``value % 8`` matters, since rotating an 8-bit value by a multiple
    of eight is a no-op.  The rotated bytes are written to ``outstream``.
    """
    shift = value % 8
    # Precompute the rotated form of all 256 byte values once, so each chunk
    # is converted with a single table lookup pass.
    table = bytes(255 & (b << shift | b >> (8 - shift)) for b in range(256))
    chunk_size = 64 * 1024
    # iter() with a b'' sentinel stops at end of stream.
    for chunk in iter(lambda: instream.read(chunk_size), b''):
        outstream.write(chunk.translate(table))
# Read bytes from stdin, XOR with 0xD0, rotate left by 4 bits, write to stdout.
with StreamIOPipe(text=False) as sp:
    sp.iterate([(xor, {'value': 0xD0}), (rol, {'value': 4})])
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
streamiopipe-0.1.1.tar.gz
(2.6 kB
view details)
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file streamiopipe-0.1.1.tar.gz.
File metadata
- Download URL: streamiopipe-0.1.1.tar.gz
- Upload date:
- Size: 2.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.0 CPython/3.11.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
9c5ffc76cf22d0a4e6aced671e2bc91e47596612ac9872d9fd7295543aa77060
|
|
| MD5 |
581cc9820da5aa03c52f50b9a601b6bb
|
|
| BLAKE2b-256 |
c0b50d63a0cb079aafb2536b7285fea5e47891d22ccba4d9babceca7b410caed
|
File details
Details for the file streamiopipe-0.1.1-py3-none-any.whl.
File metadata
- Download URL: streamiopipe-0.1.1-py3-none-any.whl
- Upload date:
- Size: 3.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.0 CPython/3.11.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
17fbbc6b3bd98f012e5dddddc3a6888fb749e5e6e63afa50af576fe32c56f98f
|
|
| MD5 |
bf0f19fb0f29af6d5b89b4e0335fa065
|
|
| BLAKE2b-256 |
d16f6df83a4b1e6aa3cc9f1eb7f60b8ea320e3508a75060918b100d829e70a74
|