Async flows
Storey
Storey is an asynchronous streaming library for real-time event processing and feature extraction.
API Walkthrough
A Storey flow consists of steps linked together by the build_flow function, each doing its designated work.
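Storey's own flows run on asyncio. As a rough, synchronous illustration of the idea of linking steps so that each step's output feeds the next, the sketch below chains plain Python generators. The names toy_build_flow, map_step and filter_step are hypothetical and are not part of Storey's API.

```python
from typing import Callable, Iterable, Iterator, List

# Hypothetical step type: takes a stream of events and returns a new stream.
Step = Callable[[Iterator], Iterator]


def map_step(fn: Callable) -> Step:
    """Hypothetical Map-like step: apply fn to every event."""
    def step(events: Iterator) -> Iterator:
        for event in events:
            yield fn(event)
    return step


def filter_step(pred: Callable) -> Step:
    """Hypothetical Filter-like step: keep only events for which pred is true."""
    def step(events: Iterator) -> Iterator:
        for event in events:
            if pred(event):
                yield event
    return step


def toy_build_flow(steps: List[Step]) -> Callable[[Iterable], Iterator]:
    """Link steps so that each step consumes the previous step's output."""
    def flow(events: Iterable) -> Iterator:
        stream = iter(events)
        for step in steps:
            stream = step(stream)
        return stream
    return flow


flow = toy_build_flow([map_step(lambda x: x * 10), filter_step(lambda x: x > 15)])
print(list(flow([1, 2, 3])))  # [20, 30]
```

Because toy_build_flow only wires steps together, the same flow object can be run again on a new input; a real Storey flow instead stays running and receives events through its controller.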
Supported Steps
Input Steps
- Source
- AsyncSource
- ReadCSV
Processing Steps
Filter
-Map
-FlatMap
-MapWithState
-Batch(max_events, timeout)
- Batches events. This step emits a batch every max_events events, or when timeout seconds have passed since the first event in the batch was received.Choice
-JoinWithV3IOTable
-SendToHttp
-AggregateByKey(aggregations,cache, key=None, emit_policy=EmitEveryEvent(), augmentation_fn=None)
- This step aggregates the data into the cache object provided for later persistence, and outputs an event enriched with the requested aggregation features.QueryByKey(features, cache, key=None, augmentation_fn=None, aliases=None)
- Similar to toAggregateByKey
, but this step is for serving only and does not aggregate the event.WriteToTable(table)
- Persists the data intable
to its associated storage by key.
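The Batch(max_events, timeout) rule above can be illustrated with a small, synchronous sketch. It is not Storey's implementation: it assumes each event arrives with an explicit timestamp in seconds, and it only checks the timeout when the next event arrives (plus a final flush at the end of input), whereas a streaming step would normally use a timer so a batch is emitted even when no further events come in. The function name batch_events is hypothetical.

```python
from typing import Any, Iterable, List, Tuple


def batch_events(timed_events: Iterable[Tuple[float, Any]],
                 max_events: int, timeout: float) -> List[List[Any]]:
    """Group (timestamp_seconds, event) pairs into batches.

    A batch is closed when it holds max_events events, or when an event
    arrives at least `timeout` seconds after the batch's first event.
    Leftover events are flushed at the end of the input.
    """
    batches: List[List[Any]] = []
    current: List[Any] = []
    first_time = 0.0
    for timestamp, event in timed_events:
        # Timeout check: close the open batch before adding the new event.
        if current and timestamp - first_time >= timeout:
            batches.append(current)
            current = []
        # Remember when the (new) batch started.
        if not current:
            first_time = timestamp
        current.append(event)
        # Size check: close the batch as soon as it is full.
        if len(current) == max_events:
            batches.append(current)
            current = []
    if current:
        batches.append(current)
    return batches


events = [(0.0, "a"), (1.0, "b"), (2.0, "c"), (10.0, "d"), (11.0, "e")]
print(batch_events(events, max_events=2, timeout=5.0))
# [['a', 'b'], ['c'], ['d', 'e']]
```

Here "c" ends up alone because the next event arrives 8 seconds after it, which exceeds the 5-second timeout.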
Output Steps
- Complete
- Reduce
- WriteToV3IOStream
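A Reduce-style output step folds every event that reaches it into a single accumulated value. The sketch below shows that idea with functools.reduce; the helper toy_reduce is hypothetical, and the assumption that Storey's Reduce is configured with an initial value and a two-argument function is not confirmed by this document.

```python
from functools import reduce
from typing import Any, Callable, Iterable


def toy_reduce(initial_value: Any, fn: Callable[[Any, Any], Any], events: Iterable) -> Any:
    """Hypothetical Reduce-like terminal step: fold every event into one value."""
    return reduce(fn, events, initial_value)


purchases = [{"purchase_amount": 12.5}, {"purchase_amount": 7.5}, {"purchase_amount": 30.0}]
# Sum the purchase amounts of all events that reached the end of the flow.
total = toy_reduce(0.0, lambda acc, event: acc + event["purchase_amount"], purchases)
print(total)  # 50.0
```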
Usage Examples
Using Aggregates
The following example reads user data, creates features using Storey's aggregates, persists the data to V3IO, and emits events containing the features to a V3IO stream for further processing.
```python
from storey import (build_flow, Source, Table, V3ioDriver, MapWithState, AggregateByKey,
                    FieldAggregator, WriteToTable, WriteToV3IOStream)
from storey.dtypes import SlidingWindows

# Replace with your own V3IO web API endpoint and access key.
v3io_web_api = 'https://webapi.change-me.com'
v3io_access_key = '1284ne83-i262-46m6-9a23-810n41f169ea'
table_object = Table('/bigdata/my_features', V3ioDriver(v3io_web_api, v3io_access_key))


def enrich(event, state):
    # `state` is the per-key state kept in table_object (group_by_key=True).
    if 'first_activity' not in state:
        state['first_activity'] = event.time
    # Total seconds since this key's first activity (.seconds would drop whole days).
    event.body['time_since_activity'] = (event.time - state['first_activity']).total_seconds()
    state['last_event'] = event.time
    # Count activities per key and copy the running total into the event body.
    event.body['total_activities'] = state['total_activities'] = state.get('total_activities', 0) + 1
    return event, state


controller = build_flow([
    Source(),
    MapWithState(table_object, enrich, group_by_key=True, full_event=True),
    AggregateByKey([FieldAggregator("number_of_clicks", "click", ["count"],
                                    SlidingWindows(['1h', '2h', '24h'], '10m')),
                    FieldAggregator("purchases", "purchase_amount", ["avg", "min", "max"],
                                    SlidingWindows(['1h', '2h', '24h'], '10m')),
                    # Only count activities whose status is 'fail'.
                    FieldAggregator("failed_activities", "activity", ["count"],
                                    SlidingWindows(['1h'], '10m'),
                                    aggr_filter=lambda element: element['activity_status'] == 'fail')],
                   table_object),
    WriteToTable(table_object),
    WriteToV3IOStream(V3ioDriver(v3io_web_api, v3io_access_key), 'features_stream')
]).run()
```
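To make SlidingWindows(['1h', '2h', '24h'], '10m') more concrete, here is a pure-Python sketch of a windowed count. It assumes that '10m' is the bucket granularity: events are counted in fixed 10-minute buckets, and each window sums the buckets that fall inside it, including the current, partially filled bucket. This is an illustration of the general technique, not Storey's internal algorithm; it ignores keys and aggr_filter, works on naive datetimes, and the names bucket_start and sliding_counts are hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, Iterable

BUCKET = timedelta(minutes=10)  # assumption: '10m' is the bucket granularity
EPOCH = datetime(1970, 1, 1)


def bucket_start(t: datetime) -> datetime:
    """Round a naive timestamp down to the start of its 10-minute bucket."""
    return EPOCH + ((t - EPOCH) // BUCKET) * BUCKET


def sliding_counts(event_times: Iterable[datetime], now: datetime,
                   windows: Dict[str, timedelta]) -> Dict[str, int]:
    """Count events per window by summing the buckets each window covers."""
    buckets: Dict[datetime, int] = defaultdict(int)
    for t in event_times:
        buckets[bucket_start(t)] += 1
    current = bucket_start(now)
    result = {}
    for name, length in windows.items():
        n_buckets = length // BUCKET  # e.g. 1h -> 6 buckets of 10 minutes
        oldest = current - (n_buckets - 1) * BUCKET
        result[name] = sum(count for start, count in buckets.items()
                           if oldest <= start <= current)
    return result


now = datetime(2021, 1, 1, 12, 5)
clicks = [datetime(2021, 1, 1, 11, 58), datetime(2021, 1, 1, 11, 30),
          datetime(2021, 1, 1, 10, 20), datetime(2020, 12, 31, 13, 0),
          datetime(2020, 12, 31, 12, 0)]
windows = {"1h": timedelta(hours=1), "2h": timedelta(hours=2), "24h": timedelta(hours=24)}
print(sliding_counts(clicks, now, windows))
# {'1h': 2, '2h': 3, '24h': 4}
```

Keeping per-bucket counts means every window length can be answered from the same buckets, which is why several windows can share one '10m' setting.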
We can also create a serving function whose sole purpose is to read data from the feature store and emit it downstream:
```python
# Serving flow: reuses build_flow, Source, FieldAggregator, SlidingWindows and
# table_object from the previous example.
from storey import QueryAggregationByKey

controller = build_flow([
    Source(),
    QueryAggregationByKey([FieldAggregator("number_of_clicks", "click", ["count"],
                                           SlidingWindows(['1h', '2h', '24h'], '10m')),
                           FieldAggregator("purchases", "purchase_amount", ["avg", "min", "max"],
                                           SlidingWindows(['1h', '2h', '24h'], '10m')),
                           FieldAggregator("failed_activities", "activity", ["count"],
                                           SlidingWindows(['1h'], '10m'),
                                           aggr_filter=lambda element: element['activity_status'] == 'fail')],
                          table_object)
]).run()
```
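The difference between the aggregating flow and the serving flow can be shown with a toy in-memory store: the aggregating path updates per-key state and enriches the event, while the serving path only reads the stored features. ToyFeatureStore and the feature name number_of_clicks_count are hypothetical stand-ins for the table object and Storey's generated feature names; windows are omitted for brevity.

```python
from typing import Any, Dict


class ToyFeatureStore:
    """Hypothetical in-memory stand-in for the table used as a cache."""

    def __init__(self) -> None:
        self._counts: Dict[str, int] = {}

    def aggregate(self, key: str, event: Dict[str, Any]) -> Dict[str, Any]:
        """Aggregating path: update the key's state, then enrich the event."""
        self._counts[key] = self._counts.get(key, 0) + 1
        return {**event, "number_of_clicks_count": self._counts[key]}

    def query(self, key: str, event: Dict[str, Any]) -> Dict[str, Any]:
        """Serving path: enrich the event from stored state without updating it."""
        return {**event, "number_of_clicks_count": self._counts.get(key, 0)}


store = ToyFeatureStore()
store.aggregate("user1", {"click": 1})
store.aggregate("user1", {"click": 1})
print(store.query("user1", {}))  # {'number_of_clicks_count': 2}
print(store.query("user1", {}))  # still 2: querying does not aggregate
```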