Library for working with file-like objects as piped streams
fPipe
fpipe is a simple framework for creating and running data manipulation pipelines.
Caching files on disk between processing steps becomes a bottleneck when performance matters. Unix pipes avoid this and are well suited to many problems, but they become insufficient once a pipeline grows too complex.
An example is unpacking a tar file from a remote source (e.g. S3/FTP/HTTP) and storing its contents in another remote store.
One possible solution using fPipe:
```python
import boto3

from fpipe.workflow import WorkFlow
from fpipe.gen import S3, Tar
from fpipe.file import S3File
from fpipe.meta import Path

client = boto3.client('s3')
resource = boto3.resource('s3')
bucket = 'bucket'
key = 'source.tar'

WorkFlow(
    S3(client, resource),
    Tar(),
    S3(
        client,
        resource,
        # Rename each unpacked file to MyPrefix/<original path>
        process_meta=(
            lambda x: Path(f"MyPrefix/{x[Path]}"),
        ),
    ),
).compose(S3File(bucket, key)).flush()
```
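For comparison, the streaming idea behind this example can be sketched with only the standard library. This is not how fPipe implements `Tar()`; it is a minimal sketch assuming the source is any readable binary file-like (an in-memory `io.BytesIO` stands in for the S3 body) and using `tarfile`'s forward-only `"r|*"` mode so the archive is never cached on disk. `stream_tar_members` and `MemorySink` are hypothetical names introduced for illustration, with `MemorySink` standing in for a remote object store.

```python
import io
import tarfile


class MemorySink:
    """Hypothetical stand-in for an upload to a remote object store."""

    def __init__(self, store, name):
        self._store, self._name, self._buf = store, name, io.BytesIO()

    def write(self, chunk):
        return self._buf.write(chunk)

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        # "Upload" the object only once the writer is closed.
        self._store[self._name] = self._buf.getvalue()
        return False


def stream_tar_members(source, sink_factory, prefix="MyPrefix/", chunk_size=64 * 1024):
    """Unpack a tar archive read sequentially from `source`.

    Mode "r|*" reads `source` as a forward-only stream, so it may be a
    non-seekable network body. Each regular file is copied into
    `sink_factory(prefixed_name)`; returns the list of names written.
    """
    written = []
    with tarfile.open(fileobj=source, mode="r|*") as archive:
        for member in archive:
            if not member.isfile():
                continue  # skip directories, links, devices
            name = prefix + member.name
            data = archive.extractfile(member)
            with sink_factory(name) as sink:
                # Copy in chunks; in stream mode a member must be read
                # before the loop advances to the next one.
                for chunk in iter(lambda: data.read(chunk_size), b""):
                    sink.write(chunk)
            written.append(name)
    return written


if __name__ == "__main__":
    # Build a small archive in memory to play the role of source.tar.
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w") as tar:
        for name, payload in [("a.txt", b"alpha"), ("b.txt", b"beta")]:
            info = tarfile.TarInfo(name)
            info.size = len(payload)
            tar.addfile(info, io.BytesIO(payload))
    buf.seek(0)

    store = {}
    print(stream_tar_members(buf, lambda n: MemorySink(store, n)))
```

Because `"r|*"` reads the archive strictly front to back, each member has to be copied out before moving on; that constraint is exactly what allows the source to be a stream rather than a file on disk.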
Installing
The framework is tested with Python 3.6 and above.
```shell
brew install python3  # apt, yum, apk...
pip3 install fpipe

# For AWS S3 support you will need boto3
pip3 install boto3
```
Simple example
Calculates the size and MD5 of a stream while writing the stream to disk.
```python
from fpipe.file import ByteFile
from fpipe.gen import Local, Meta
from fpipe.meta import Path, Size, MD5, Stream
from fpipe.workflow import WorkFlow

workflow = WorkFlow(
    Local(pass_through=True),
    Meta(Size, MD5)
)

sources = [
    ByteFile(b'x' * 10, Path('x.dat')),
    ByteFile(b'y' * 20, Path('y.dat'))
]

for file in workflow.compose(sources):
    print(f'\n{"-"*46}\n')
    print("Path name:", file[Path])
    print("Stream content: ", file[Stream].read().decode('utf-8'))
    with open(file[Path]) as f:
        print("File content:", f.read())
    print("Stream md5:", file[MD5])
    print("Stream size:", file[Size])
```
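The single-pass idea in this example, persisting a stream while measuring it, can be sketched in plain Python. This is an illustrative stand-in, not fPipe's implementation: `copy_with_meta` is a hypothetical helper that writes each chunk to disk and feeds the same chunk to `hashlib.md5` and a byte counter, so the data is read only once.

```python
import hashlib
import io
import os
import tempfile


def copy_with_meta(source, path, chunk_size=64 * 1024):
    """Write `source` (a readable binary file-like) to `path`.

    Returns (md5_hexdigest, size_in_bytes), computed in the same pass
    that writes the file, so the stream is never read twice.
    """
    md5 = hashlib.md5()
    size = 0
    with open(path, "wb") as out:
        for chunk in iter(lambda: source.read(chunk_size), b""):
            out.write(chunk)    # persist the chunk to disk
            md5.update(chunk)   # update the running digest
            size += len(chunk)  # and the running size
    return md5.hexdigest(), size


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        for name, data in [("x.dat", b"x" * 10), ("y.dat", b"y" * 20)]:
            path = os.path.join(tmp, name)
            digest, size = copy_with_meta(io.BytesIO(data), path)
            print(name, digest, size)
```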
Subprocess script example
Stores the original stream and calculates its MD5, encrypts it with the gpg CLI, stores the encrypted file and calculates its MD5, then decrypts it and stores the decrypted file along with its MD5.
```python
from fpipe.file import ByteFile
from fpipe.gen import Local, Meta, Program
from fpipe.meta import Path, MD5
from fpipe.workflow import WorkFlow

workflow = WorkFlow(
    Meta(MD5),
    Local(pass_through=True),
    Program("gpg --batch --symmetric --passphrase 'secret'"),
    Meta(MD5),
    Local(pass_through=True, process_meta=lambda x: Path(f'{x[Path]}.gpg')),
    Program("gpg --batch --decrypt --passphrase 'secret'"),
    Meta(MD5),
    Local(pass_through=True, process_meta=lambda x: Path(f'{x[Path]}.decrypted'))
)

sources = (
    ByteFile(b'x' * 10, Path('x.orig')),
    ByteFile(b'y' * 20, Path('y.orig'))
)

for f in workflow.compose(sources).flush_iter():
    print(f'\n{"-"*46}\n')
    print("Original path:", f[Path, 2])
    print("Original md5:", f[MD5, 2], end='\n\n')
    print("Encrypted path:", f[Path, 1])
    print("Encrypted md5:", f[MD5, 1], end='\n\n')
    print("Decrypted path:", f[Path])
    print("Decrypted md5:", f[MD5])
```
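The `Program` steps above run the stream through a CLI command. A minimal standard-library sketch of that pattern follows; it illustrates the general technique and is not fPipe's code. `pipe_through_program` and the `UPPER` command (a child Python process that upper-cases its input, used instead of gpg so the sketch runs without GnuPG installed) are hypothetical. The key detail is feeding stdin from a separate thread while reading stdout, because writing all input before reading any output can deadlock once the OS pipe buffers fill.

```python
import hashlib
import io
import subprocess
import sys
import threading


def pipe_through_program(source, args, sink, chunk_size=64 * 1024):
    """Stream `source` through the external command `args` into `sink`.

    Returns the MD5 hex digest of the program's output. A feeder thread
    writes the child's stdin while this thread drains its stdout, so a
    full pipe buffer on either side cannot deadlock the two processes.
    """
    proc = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE)

    def feed():
        try:
            for chunk in iter(lambda: source.read(chunk_size), b""):
                proc.stdin.write(chunk)
        finally:
            proc.stdin.close()  # EOF tells the child the input is done

    feeder = threading.Thread(target=feed)
    feeder.start()

    md5 = hashlib.md5()
    for chunk in iter(lambda: proc.stdout.read(chunk_size), b""):
        sink.write(chunk)
        md5.update(chunk)
    proc.stdout.close()
    feeder.join()
    if proc.wait() != 0:
        raise subprocess.CalledProcessError(proc.returncode, args)
    return md5.hexdigest()


# Hypothetical stand-in for the gpg step: upper-case everything on stdin.
UPPER = [sys.executable, "-c",
         "import sys; sys.stdout.buffer.write(sys.stdin.buffer.read().upper())"]

if __name__ == "__main__":
    out = io.BytesIO()
    digest = pipe_through_program(io.BytesIO(b"x" * 10), UPPER, out)
    print(out.getvalue(), digest)
```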
See the unit tests for more examples.
Run tests and verify PyPI compatibility
To run the tests, install tox and twine with pip, go to the project root and run tox.
```shell
# Create virtualenv
python3 -m venv .venv
# Activate virtualenv
source .venv/bin/activate
# Install test and packaging tools
pip install tox twine
# Run tests
tox -e py37
# Build distribution
python setup.py sdist bdist_wheel
# Validate distribution
twine check dist/*
```
Built With
Contributing
The framework is functional but still at an early stage, so any feedback on alternatives, usefulness, API design, etc. would be appreciated.
See CONTRIBUTING.md
Versioning
License
This project is licensed under the MIT License; see the LICENSE.txt file for details.