featureflow

featureflow is a Python library that lets users build feature-extraction pipelines declaratively, and control how and where the resulting features are persisted.

Usage

The following example computes word frequencies in individual text documents, and then across the entire corpus of documents. featureflow isn’t limited to text data, though: it’s designed to work well with sequential or streaming data (e.g. audio or video) that is often processed iteratively, in small chunks.

You can see all the code in this example in one place here.
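
As a rough illustration of what processing data "iteratively, in small chunks" means, here is a minimal sketch in plain Python that does not use featureflow at all. The helper names `iter_chunks` and `streaming_sha256` are hypothetical, and the 128-byte chunk size is an arbitrary choice; the point is only that an incremental computation over chunks can give the same result as processing the whole input at once.

```python
import hashlib
import io


def iter_chunks(stream, chunksize=128):
    """Yield successive chunks of at most `chunksize` bytes from a binary stream."""
    while True:
        chunk = stream.read(chunksize)
        if not chunk:
            break
        yield chunk


def streaming_sha256(stream, chunksize=128):
    """Compute a SHA-256 hex digest without holding the whole stream in memory."""
    digest = hashlib.sha256()
    for chunk in iter_chunks(stream, chunksize):
        digest.update(chunk)  # hashlib digests can be updated incrementally
    return digest.hexdigest()


data = b'the quick brown fox ' * 50  # 1000 bytes of sample input
streamed = streaming_sha256(io.BytesIO(data))

# The chunked digest matches the digest of the whole input
print(streamed == hashlib.sha256(data).hexdigest())
```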

We can define a graph of processing nodes like this:

import featureflow as ff


@ff.simple_in_memory_settings
class Document(ff.BaseModel):
    """
    Define the processing graph needed to extract document-level features,
    and specify whether and how those features should be persisted.
    """
    raw = ff.ByteStreamFeature(
        ff.ByteStream,
        chunksize=128,
        store=True)

    checksum = ff.JSONFeature(
        CheckSum,
        needs=raw,
        store=True)

    tokens = ff.Feature(
        Tokenizer,
        needs=raw,
        store=False)

    counts = ff.JSONFeature(
        WordCount,
        needs=tokens,
        store=True)
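
The `needs` arguments above describe a dependency graph between features. The following sketch is not featureflow code and makes no claim about how featureflow resolves dependencies internally; it mirrors the same declarations in a hypothetical plain dictionary (`GRAPH`) and uses the standard library's `graphlib` (Python 3.9+) to show one evaluation order in which every feature comes after the features it needs.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical plain-dict mirror of the Document graph:
# feature name -> (names it needs, whether it is stored)
GRAPH = {
    'raw': ((), True),
    'checksum': (('raw',), True),
    'tokens': (('raw',), False),
    'counts': (('tokens',), True),
}


def evaluation_order(graph):
    """Return feature names ordered so each one follows everything it needs."""
    deps = {name: set(needs) for name, (needs, _) in graph.items()}
    return list(TopologicalSorter(deps).static_order())


def stored_features(graph):
    """Return the names of features declared with store=True, sorted by name."""
    return sorted(name for name, (_, store) in graph.items() if store)


order = evaluation_order(GRAPH)
print(order)                   # 'raw' always comes first, 'counts' after 'tokens'
print(stored_features(GRAPH))  # 'tokens' is absent because it is not stored
```

In this picture, `tokens` is an intermediate step: `counts` depends on it, but only `raw`, `checksum` and `counts` are marked for storage.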

We can define the individual processing “nodes” referenced in the graph above like this:

import featureflow as ff
from collections import Counter
import re
import hashlib

class Tokenizer(ff.Node):
    """
    Tokenize a stream of text into individual, normalized (lowercase)
    words/tokens
    """
    def __init__(self, needs=None):
        super(Tokenizer, self).__init__(needs=needs)
        self._cache = ''
        # raw string, so that \W reaches the regex engine unmodified
        self._pattern = re.compile(r'(?P<word>[a-zA-Z]+)\W+')

    def _enqueue(self, data, pusher):
        self._cache += data

    def _dequeue(self):
        matches = list(self._pattern.finditer(self._cache))
        if not matches:
            raise ff.NotEnoughData()
        last_boundary = matches[-1].end()
        self._cache = self._cache[last_boundary:]
        return matches

    def _process(self, data):
        yield map(lambda x: x.groupdict()['word'].lower(), data)


class WordCount(ff.Aggregator, ff.Node):
    """
    Keep track of token frequency
    """
    def __init__(self, needs=None):
        super(WordCount, self).__init__(needs=needs)
        self._cache = Counter()

    def _enqueue(self, data, pusher):
        self._cache.update(data)


class CheckSum(ff.Aggregator, ff.Node):
    """
    Compute the checksum of a text stream
    """
    def __init__(self, needs=None):
        super(CheckSum, self).__init__(needs=needs)
        self._cache = hashlib.sha256()

    def _enqueue(self, data, pusher):
        self._cache.update(data)

    def _process(self, data):
        yield data.hexdigest()
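
The buffering logic in `Tokenizer` can be tried out on its own. The sketch below is a plain-Python approximation that does not use featureflow: the generator `tokenize_chunks` is a hypothetical stand-in that folds the `_enqueue`, `_dequeue` and `_process` steps into one loop, and it simply waits for more input where the node above raises `ff.NotEnoughData`. It shows how a word split across two chunks is held back until the non-word character that ends it has arrived.

```python
import re

# Same pattern as the Tokenizer node: a run of letters followed by non-word characters
WORD = re.compile(r'(?P<word>[a-zA-Z]+)\W+')


def tokenize_chunks(chunks):
    """Yield lowercase words from an iterable of text chunks."""
    cache = ''
    for chunk in chunks:
        cache += chunk
        matches = list(WORD.finditer(cache))
        if not matches:
            continue  # no complete word yet, wait for the next chunk
        # Keep only the text after the last complete word for the next round
        cache = cache[matches[-1].end():]
        for match in matches:
            yield match.group('word').lower()


# 'quick' and 'fox' are each split across a chunk boundary
tokens = list(tokenize_chunks(['The qu', 'ick brown f', 'ox. ']))
print(tokens)
```

One consequence of this pattern is that a final word with no trailing non-word character stays in the buffer, so in this sketch it is never emitted.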

We can also define a graph that will process an entire corpus of stored document features:

import featureflow as ff

@ff.simple_in_memory_settings
class Corpus(ff.BaseModel):
    """
    Define the processing graph needed to extract corpus-level features,
    and specify whether and how those features should be persisted.
    """
    docs = ff.Feature(
        lambda doc_cls: (doc.counts for doc in doc_cls),
        store=False)

    total_counts = ff.JSONFeature(
        WordCount,
        needs=docs,
        store=True)
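
Reusing `WordCount` at the corpus level works because `collections.Counter.update` adds counts when it is given a mapping, and counts occurrences when it is given an iterable of items. The sketch below shows that behaviour with plain Python only; the two per-document dictionaries are made-up sample data standing in for stored `doc.counts` values.

```python
from collections import Counter

# Made-up per-document word counts, standing in for stored doc.counts values
doc_counts = [
    {'the': 3, 'cat': 1},
    {'the': 2, 'dog': 4},
]

total = Counter()
for counts in doc_counts:
    total.update(counts)  # given a mapping, update() adds to existing counts

print(total['the'])  # combined count across both documents

# Given an iterable of tokens instead, update() counts each occurrence
per_doc = Counter()
per_doc.update(['the', 'cat', 'the'])
print(per_doc['the'])
```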

Finally, we can execute these processing graphs and access the stored features like this:

from __future__ import print_function
import argparse

def process_urls(urls):
    for url in urls:
        Document.process(raw=url)


def summarize_document(doc):
    return 'doc {_id} with checksum {cs} contains "the" {n} times'.format(
            _id=doc._id,
            cs=doc.checksum,
            n=doc.counts.get('the', 0))


def process_corpus(document_cls):
    corpus_id = Corpus.process(docs=document_cls)
    return Corpus(corpus_id)


def summarize_corpus(corpus):
    return 'The entire text corpus contains "the" {n} times'.format(
        n=corpus.total_counts.get("the", 0))


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--url',
        help='specify one or more urls of text files to ingest',
        required=True,
        action='append')
    args = parser.parse_args()

    process_urls(args.url)

    for doc in Document:
        print(summarize_document(doc))

    corpus = process_corpus(Document)
    print(summarize_corpus(corpus))

To see this in action, we can run:

python wordcount.py \
    --url http://textfiles.com/food/1st_aid.txt \
    --url http://textfiles.com/food/antibiot.txt \
    ...
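
The `--url` flag can be repeated because the parser declares it with `action='append'`, which collects every occurrence into a list. The short sketch below rebuilds just that parser (the `build_parser` helper is a hypothetical name) and parses an explicit argument list instead of the real command line, so it can be run without downloading anything.

```python
import argparse


def build_parser():
    """Build a parser with the same --url option as the example script."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--url',
        help='specify one or more urls of text files to ingest',
        required=True,
        action='append')  # each --url occurrence is appended to a list
    return parser


args = build_parser().parse_args([
    '--url', 'http://textfiles.com/food/1st_aid.txt',
    '--url', 'http://textfiles.com/food/antibiot.txt',
])
print(args.url)  # a list with both urls, in the order given
```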

Installation

Python headers are required. On Debian or Ubuntu, you can install them by running:

apt-get install python-dev

NumPy is optional. If you’d like to use it, the Anaconda distribution is highly recommended.

Finally, install featureflow itself:

pip install featureflow

Latest release: featureflow-2.9.0.tar.gz (33.3 kB), source distribution, uploaded Jun 26, 2018