Skip to main content

No project description provided

Project description

StreamIOPipe

A library that lets you process streams by chaining simple user-defined functions in sequence, as if you were using pipes.

Installation

pip install streamiopipe

Examples

  1. Read text from a StringIO and add line numbers
from streamiopipe import *
import sys
import io

@stringiopipe
def add_lineno(outstream,instream):
    """Copy every input line to the output, prefixed with its line number.

    Numbering starts at 1; each number is zero-padded to two digits and
    followed by a colon.
    """
    for index, text in enumerate(instream, start=1):
        outstream.write(f"{index:02d}:{text}")


# Input text: four lines, the last one without a trailing newline.
text_in = io.StringIO("""January
February
March
April""")

# In-memory buffer handed to the pipe as its output file.
text_out = io.StringIO()
with StreamIOPipe(filein=text_in, fileout=text_out, text=True) as pipe:
    pipe.run(add_lineno)

print(text_out.getvalue())
  2. Read text from a StringIO and add line numbers and then remove them
from streamiopipe import *
import sys
import io

@stringiopipe
def add_lineno(outstream,instream):
    """Copy every input line to the output, prefixed with its line number.

    Numbering starts at 1; each number is zero-padded to two digits and
    followed by a colon.
    """
    for index, text in enumerate(instream, start=1):
        outstream.write(f"{index:02d}:{text}")

@stringiopipe
def remove_lineno(outstream,instream):
    """Strip a leading "<digits>:" line-number prefix from every line.

    The prefix is located by splitting on the first colon rather than by
    cutting a fixed three characters, so numbers wider than two digits
    (line 100 and beyond) are removed correctly.  A line whose text before
    the first colon is not purely digits, or that has no colon at all, is
    written through unchanged.
    """
    for line in instream:
        prefix, sep, rest = line.partition(":")
        # Only drop the prefix when it really is a number followed by ":".
        if sep and prefix.isdigit():
            outstream.write(rest)
        else:
            outstream.write(line)



# Input text: four lines, the last one without a trailing newline.
text_in = io.StringIO("""January
February
March
April""")

# In-memory buffer handed to the pipe as its output file.
text_out = io.StringIO()

# One tuple per stage: first add the line numbers, then remove them again.
stages = [(add_lineno,), (remove_lineno,)]
with StreamIOPipe(filein=text_in, fileout=text_out, text=True) as pipe:
    pipe.iterate(stages)

print(text_out.getvalue())
  3. XOR a binary file from stdin and write to stdout
from streamiopipe import *
import sys
import io
import struct

@byteiopipe
def xor(outstream,instream,value=0xFA):
    """XOR each input byte with ``value`` and write the result.

    The input is consumed one byte at a time until ``read`` returns ``b''``.
    """
    while True:
        chunk = instream.read(1)
        if chunk == b'':
            break
        outstream.write(struct.pack("=B", chunk[0] ^ value))

# No filein/fileout given: per this example the pipe reads stdin and writes
# stdout, in binary mode, XOR-ing every byte with 0xBC.
with StreamIOPipe(text=False) as pipe:
    pipe.run(xor, value=0xBC)
  4. XOR and Rotate Left a file from stdin and write to stdout
from streamiopipe import *
import sys
import io
import struct

@byteiopipe
def xor(outstream,instream,value=0xAA):
    """XOR each input byte with ``value`` and write the result.

    The input is consumed one byte at a time until ``read`` returns ``b''``.
    """
    while True:
        chunk = instream.read(1)
        if chunk == b'':
            break
        outstream.write(struct.pack("=B", chunk[0] ^ value))

@byteiopipe
def rol(outstream,instream,value=1):
    """Rotate every input byte left by ``value`` bit positions.

    Each byte is treated as an 8-bit quantity, so only ``value`` modulo 8
    matters; bits shifted out on the left re-enter on the right.  The input
    is read one byte at a time until ``read`` returns ``b''``.
    """
    shift = value % 8
    # When shift is 0 the right shift is by 8, which contributes nothing,
    # so the byte passes through unchanged.
    carry = 8 - shift
    for block in iter(lambda: instream.read(1), b''):
        byte = block[0]
        # Mask to 8 bits to discard what the left shift pushed past bit 7.
        out = 255 & ((byte << shift) | (byte >> carry))
        outstream.write(struct.pack("=B",out))

# Read from stdin, XOR every byte with 0xD0, rotate it left by 4 bits and
# write the result to stdout.
stages = [(xor, {'value': 0xD0}), (rol, {'value': 4})]
with StreamIOPipe(text=False) as pipe:
    pipe.iterate(stages)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

streamiopipe-0.1.1.tar.gz (2.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

streamiopipe-0.1.1-py3-none-any.whl (3.0 kB view details)

Uploaded Python 3

File details

Details for the file streamiopipe-0.1.1.tar.gz.

File metadata

  • Download URL: streamiopipe-0.1.1.tar.gz
  • Upload date:
  • Size: 2.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.0 CPython/3.11.7

File hashes

Hashes for streamiopipe-0.1.1.tar.gz
Algorithm Hash digest
SHA256 9c5ffc76cf22d0a4e6aced671e2bc91e47596612ac9872d9fd7295543aa77060
MD5 581cc9820da5aa03c52f50b9a601b6bb
BLAKE2b-256 c0b50d63a0cb079aafb2536b7285fea5e47891d22ccba4d9babceca7b410caed

See more details on using hashes here.

File details

Details for the file streamiopipe-0.1.1-py3-none-any.whl.

File metadata

  • Download URL: streamiopipe-0.1.1-py3-none-any.whl
  • Upload date:
  • Size: 3.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.0 CPython/3.11.7

File hashes

Hashes for streamiopipe-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 17fbbc6b3bd98f012e5dddddc3a6888fb749e5e6e63afa50af576fe32c56f98f
MD5 bf0f19fb0f29af6d5b89b4e0335fa065
BLAKE2b-256 d16f6df83a4b1e6aa3cc9f1eb7f60b8ea320e3508a75060918b100d829e70a74

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page