Library for working with file-likes as piped streams

fPipe

fPipe is a simple framework for creating and running data manipulation pipelines.

Caching files on disk between processing steps becomes a problem when performance matters. Unix pipes handle simple cases well, but become insufficient as pipelines grow more complex.

One example is unpacking a tar file from a remote source (e.g. S3, FTP, or HTTP) and storing its contents in another remote store.

One possible solution using fPipe:

import boto3
from fpipe.workflow import WorkFlow
from fpipe.gen import S3, Tar
from fpipe.file import S3File
from fpipe.meta import Path

client = boto3.client('s3')
resource = boto3.resource('s3')
bucket = 'bucket'
key = 'source.tar'

WorkFlow(
    S3(client, resource),
    Tar(),
    S3(
        client,
        resource,
        process_meta=(
            lambda x: Path(f"MyPrefix/{x[Path]}"),
        ),
    ),
).compose(S3File(bucket, key)).flush()
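
To illustrate the underlying idea without any remote services, here is a minimal standard-library sketch of unpacking a tar archive as a sequential stream and renaming each member with a prefix. It does not use fPipe and says nothing about how fPipe is implemented; `build_tar` and `iter_tar_members` are hypothetical helper names, and an in-memory buffer stands in for the remote source.

```python
import io
import tarfile


def build_tar(files):
    """Build an in-memory tar archive from a {name: bytes} mapping."""
    buffer = io.BytesIO()
    with tarfile.open(fileobj=buffer, mode="w") as archive:
        for name, payload in files.items():
            info = tarfile.TarInfo(name=name)
            info.size = len(payload)
            archive.addfile(info, io.BytesIO(payload))
    buffer.seek(0)
    return buffer


def iter_tar_members(stream, prefix=""):
    """Yield (path, content) for each regular file, reading the archive sequentially."""
    # Mode "r|" reads the source as a forward-only stream of tar blocks,
    # so the source never has to be seekable or cached on disk.
    with tarfile.open(fileobj=stream, mode="r|") as archive:
        for member in archive:
            if member.isfile():
                # In stream mode a member must be read before moving to the next one.
                yield prefix + member.name, archive.extractfile(member).read()


# Stand-in for a remote tar file (hypothetical sample data).
source = build_tar({"a.txt": b"alpha", "b.txt": b"bravo!"})
unpacked = dict(iter_tar_members(source, prefix="MyPrefix/"))
```

Each member is handed on as soon as it is read, which is what allows a downstream step to start uploading before the whole archive has arrived.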

Installing

The framework is tested with Python 3.6 and above.

brew install python3
# apt, yum, apk...
pip3 install fpipe
# AWS S3 support additionally requires boto3
pip3 install boto3

Simple example

Calculates the size and MD5 of each stream while writing it to disk.

from fpipe.file import ByteFile
from fpipe.gen import Local, Meta
from fpipe.meta import Path, Size, MD5, Stream
from fpipe.workflow import WorkFlow

workflow = WorkFlow(
    Local(pass_through=True),
    Meta(Size, MD5)
)

sources = [
    ByteFile(b'x' * 10, Path('x.dat')),
    ByteFile(b'y' * 20, Path('y.dat'))
]

for file in workflow.compose(sources):
    print(f'\n{"-"*46}\n')
    print("Path name:", file[Path])
    print("Stream content: ", file[Stream].read().decode('utf-8'))
    with open(file[Path]) as f:
        print("File content:", f.read())
    print("Stream md5:", file[MD5])
    print("Stream size:", file[Size])
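
For readers new to the pattern, the same idea can be sketched with the standard library only: chunks flow through a chain of generators, one of which writes to disk as a side effect, while the final step accumulates the size and MD5. This is an illustration of the concept, not fPipe's implementation; `read_chunks`, `tee_to_disk`, and `measure` are hypothetical names.

```python
import hashlib
import io
import os
import tempfile


def read_chunks(stream, chunk_size=4):
    """Yield successive chunks from a file-like object until it is exhausted."""
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            return
        yield chunk


def tee_to_disk(chunks, path):
    """Write every chunk to `path` and pass it on unchanged."""
    with open(path, "wb") as out:
        for chunk in chunks:
            out.write(chunk)
            yield chunk


def measure(chunks):
    """Consume the chunks and return (size, MD5 hex digest)."""
    digest = hashlib.md5()
    size = 0
    for chunk in chunks:
        digest.update(chunk)
        size += len(chunk)
    return size, digest.hexdigest()


data = b"x" * 10
with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "x.dat")
    # The source is read once; writing and hashing happen in the same pass.
    size, md5 = measure(tee_to_disk(read_chunks(io.BytesIO(data)), target))
    with open(target, "rb") as f:
        on_disk = f.read()
```

Because every step is lazy, the data is only held in memory one chunk at a time, regardless of the total stream size.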

Subprocess script example

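Calculates the MD5 of the original stream and stores it, encrypts it with the gpg command-line tool, calculates the MD5 of the encrypted stream and stores it, then decrypts it, calculates the MD5 again, and stores the decrypted file.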

from fpipe.file import ByteFile
from fpipe.gen import Local, Meta, Program
from fpipe.meta import Path, MD5
from fpipe.workflow import WorkFlow

workflow = WorkFlow(
    Meta(MD5),
    Local(pass_through=True),

    Program("gpg --batch --symmetric --passphrase 'secret'"),
    Meta(MD5),
    Local(pass_through=True, process_meta=lambda x: Path(f'{x[Path]}.gpg')),

    Program("gpg --batch --decrypt --passphrase 'secret'"),
    Meta(MD5),
    Local(pass_through=True, process_meta=lambda x: Path(f'{x[Path]}.decrypted'))
)

sources = (
    ByteFile(b'x' * 10, Path('x.orig')),
    ByteFile(b'y' * 20, Path('y.orig'))
)

for f in workflow.compose(sources).flush_iter():
    print(f'\n{"-"*46}\n')
    print("Original path:", f[Path, 2])
    print("Original md5:", f[MD5, 2], end='\n\n')
    print("Encrypted path:", f[Path, 1])
    print("Encrypted md5:", f[MD5, 1], end='\n\n')
    print("Decrypted path:", f[Path])
    print("Decrypted md5:", f[MD5])
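
As a rough picture of what piping a stream through an external program involves, the following standard-library sketch feeds chunks to a subprocess and yields its output as it arrives. It is not how fPipe's Program step is implemented; `pipe_through` is a hypothetical helper, and a small Python one-liner that uppercases its input stands in for gpg so the example runs anywhere.

```python
import subprocess
import sys
import threading

# Stand-in for an external filter such as gpg: uppercases stdin to stdout.
FILTER = [
    sys.executable,
    "-c",
    "import sys; sys.stdout.buffer.write(sys.stdin.buffer.read().upper())",
]


def pipe_through(command, chunks):
    """Feed chunks to the command's stdin and yield chunks from its stdout."""
    proc = subprocess.Popen(command, stdin=subprocess.PIPE, stdout=subprocess.PIPE)

    def feed():
        # Write from a separate thread so that reading stdout below
        # cannot deadlock when the OS pipe buffers fill up.
        try:
            for chunk in chunks:
                proc.stdin.write(chunk)
        finally:
            proc.stdin.close()

    writer = threading.Thread(target=feed)
    writer.start()
    try:
        for chunk in iter(lambda: proc.stdout.read(8192), b""):
            yield chunk
    finally:
        proc.stdout.close()
        writer.join()
        if proc.wait() != 0:
            raise subprocess.CalledProcessError(proc.returncode, command)


result = b"".join(pipe_through(FILTER, [b"hello ", b"pipes"]))
```

Since `pipe_through` both accepts and returns an iterable of chunks, several such steps can be chained, which mirrors the encrypt-then-decrypt sequence in the workflow above.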

See the unit tests for more examples.

Run tests and verify PyPI compatibility

To run the tests, install tox and twine with pip, go to the project root, and run tox.

# Create virtualenv
python3 -m venv .venv
# Activate virtualenv
source .venv/bin/activate
# Install test and packaging tools
pip install tox twine
# Run tests
tox -e py37
# Build distribution
python setup.py sdist bdist_wheel
# Validate distribution
twine check dist/*

Contributing

The framework is functional but still at an early stage, so feedback on alternatives, usefulness, API design, etc. is appreciated.

See CONTRIBUTING.md

License

This project is licensed under the MIT License; see the LICENSE.txt file for details.

