Skip to main content

Async flows

Project description

Storey

CI

Storey is an asynchronous streaming library, for real time event processing and feature extraction.

In This Document

▶ For more information, see the Storey Python package documentation.

API Walkthrough

A Storey flow consist of steps linked together by the build_flow function, each doing it's designated work.

Supported Steps

Input Steps

  • SyncEmitSource
  • AsyncEmitSource
  • CSVSource
  • ParquetSource
  • DataframeSource

Processing Steps

  • Filter
  • Map
  • FlatMap
  • MapWithState
  • Batch(max_events, timeout) - Batches events. This step emits a batch every max_events events, or when timeout seconds have passed since the first event in the batch was received.
  • Choice
  • JoinWithV3IOTable
  • SendToHttp
  • AggregateByKey(aggregations,cache, key=None, emit_policy=EmitEveryEvent(), augmentation_fn=None) - This step aggregates the data into the cache object provided for later persistence, and outputs an event enriched with the requested aggregation features.
  • QueryByKey(features, cache, key=None, augmentation_fn=None, aliases=None) - Similar to to AggregateByKey, but this step is for serving only and does not aggregate the event.
  • NoSqlTarget(table) - Persists the data in table to its associated storage by key.
  • Extend
  • JoinWithTable

Output Steps

  • Complete
  • Reduce
  • StreamTarget
  • CSVTarget
  • ReduceToDataFrame
  • TSDBTarget
  • ParquetTarget

Usage Examples

Using Aggregates

The following example reads user data, creates features using Storey's aggregates, persists the data to V3IO and emits events containing the features to a V3IO Stream for further processing.

from storey import build_flow, SyncEmitSource, Table, V3ioDriver, AggregateByKey, FieldAggregator, NoSqlTarget
from storey.dtypes import SlidingWindows

v3io_web_api = 'https://webapi.change-me.com'
v3io_acceess_key = '1284ne83-i262-46m6-9a23-810n41f169ea'
table_object = Table('/projects/my_features', V3ioDriver(v3io_web_api, v3io_acceess_key))

def enrich(event, state):
    if 'first_activity' not in state:
        state['first_activity'] = event.time
    event.body['time_since_activity'] = (event.time - state['first_activity']).seconds
    state['last_event'] = event.time
    event.body['total_activities'] = state['total_activities'] = state.get('total_activities', 0) + 1
    return event, state

controller = build_flow([
    SyncEmitSource(),
    MapWithState(table_object, enrich, group_by_key=True, full_event=True),
    AggregateByKey([FieldAggregator("number_of_clicks", "click", ["count"],
                                    SlidingWindows(['1h','2h', '24h'], '10m')),
                    FieldAggregator("purchases", "purchase_amount", ["avg", "min", "max"],
                                    SlidingWindows(['1h','2h', '24h'], '10m')),
                    FieldAggregator("failed_activities", "activity", ["count"],
                                    SlidingWindows(['1h'], '10m'),
                                    aggr_filter=lambda element: element['activity_status'] == 'fail'))],
                   table_object),
    NoSqlTarget(table_object),
    StreamTarget(V3ioDriver(v3io_web_api, v3io_acceess_key), 'features_stream')
]).run()

We can also create a serving function, which sole purpose is to read data from the feature store and emit it further

controller = build_flow([
    SyncEmitSource(),
    QueryAggregationByKey([FieldAggregator("number_of_clicks", "click", ["count"],
                                           SlidingWindows(['1h','2h', '24h'], '10m')),
                           FieldAggregator("purchases", "purchase_amount", ["avg", "min", "max"],
                                           SlidingWindows(['1h','2h', '24h'], '10m')),
                           FieldAggregator("failed_activities", "activity", ["count"],
                                           SlidingWindows(['1h'], '10m'),
                                           aggr_filter=lambda element: element['activity_status'] == 'fail'))],
                           table_object)
]).run()

Project details


Release history Release notifications | RSS feed

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

storey-0.8.9.tar.gz (103.5 kB view details)

Uploaded Source

Built Distribution

storey-0.8.9-py3-none-any.whl (113.2 kB view details)

Uploaded Python 3

File details

Details for the file storey-0.8.9.tar.gz.

File metadata

  • Download URL: storey-0.8.9.tar.gz
  • Upload date:
  • Size: 103.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.6.0 importlib_metadata/4.8.2 pkginfo/1.7.1 requests/2.26.0 requests-toolbelt/0.9.1 tqdm/4.62.3 CPython/3.9.9

File hashes

Hashes for storey-0.8.9.tar.gz
Algorithm Hash digest
SHA256 05dba92b3703cb995cbb9f8d5e76c2836de70fd6e855ad35de0f3b2211b191c5
MD5 f12f29b68a391861989236892c42beb3
BLAKE2b-256 1f2d3dfa57e3b9e93afabcfbd959b5349037f7a3ddd1fdb6f4f70caa0fbec946

See more details on using hashes here.

File details

Details for the file storey-0.8.9-py3-none-any.whl.

File metadata

  • Download URL: storey-0.8.9-py3-none-any.whl
  • Upload date:
  • Size: 113.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.6.0 importlib_metadata/4.8.2 pkginfo/1.7.1 requests/2.26.0 requests-toolbelt/0.9.1 tqdm/4.62.3 CPython/3.9.9

File hashes

Hashes for storey-0.8.9-py3-none-any.whl
Algorithm Hash digest
SHA256 dc3bc2a56bb9dc3711899a05cb8ef9cbf813668e597ffe73e9dd8dc2747760b1
MD5 87e7a4017f36e2c793e96c38a2cad796
BLAKE2b-256 28583b4fb67b97f6d82c9810f2401b15a4849a3be88734f3c297f20a9524a98f

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page