Async flows
Project description
Storey
Storey is an asynchronous streaming library for real-time event processing and feature extraction.
API Walkthrough
A Storey flow consists of steps linked together by the build_flow function, each doing its designated work.
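As a rough mental model only (not Storey's implementation), a flow behaves like a chain of asynchronous steps in which each step's output becomes the next step's input, and a step may drop an event. The minimal sketch below uses plain asyncio; the names `run_flow`, `map_step` and `filter_step` are hypothetical and are not part of the Storey API.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Optional

# Hypothetical step type: takes an event, returns the new event or None to drop it.
Step = Callable[[Any], Awaitable[Optional[Any]]]


def map_step(fn: Callable[[Any], Any]) -> Step:
    """Hypothetical Map-like step: transform every event."""
    async def step(event):
        return fn(event)
    return step


def filter_step(pred: Callable[[Any], bool]) -> Step:
    """Hypothetical Filter-like step: drop events that fail the predicate."""
    async def step(event):
        return event if pred(event) else None
    return step


async def run_flow(steps: List[Step], events: List[Any]) -> List[Any]:
    """Push each event through the steps in order and collect what reaches the end."""
    results = []
    for event in events:
        for step in steps:
            event = await step(event)
            if event is None:  # a step filtered the event out
                break
        else:  # no step dropped the event
            results.append(event)
    return results


output = asyncio.run(run_flow([filter_step(lambda x: x % 2 == 0), map_step(lambda x: x * 10)],
                              [1, 2, 3, 4]))
print(output)  # [20, 40]
```

In Storey itself the steps are objects such as Filter and Map passed to build_flow, and events flow through them concurrently; the sketch only captures the ordering and dropping behaviour.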
Supported Steps
Input Steps
- Source
- AsyncSource
- ReadCSV
Processing Steps
- Filter
- Map
- FlatMap
- MapWithState
- Batch(max_events, timeout): batches events. This step emits a batch every max_events events, or when timeout seconds have passed since the first event in the batch was received.
- Choice
- JoinWithV3IOTable
- SendToHttp
- AggregateByKey(aggregations, cache, key=None, emit_policy=EmitEveryEvent(), augmentation_fn=None): aggregates the data into the provided cache object for later persistence, and outputs an event enriched with the requested aggregation features.
- QueryByKey(features, cache, key=None, augmentation_fn=None, aliases=None): similar to AggregateByKey, but this step is for serving only and does not aggregate the event.
- WriteToTable(table): persists the data in table to its associated storage by key.
Output Steps
- Complete
- Reduce
- WriteToV3IOStream
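The rule used by the Batch step above (emit after max_events events, or once timeout seconds have elapsed since the first event of the current batch) can be illustrated with a small synchronous sketch. This is not Storey's implementation: the hypothetical `Batcher` class takes explicit timestamps instead of running a real timer, so a timeout is only noticed when the next event arrives or when the stream is closed.

```python
from typing import Any, List, Optional


class Batcher:
    """Hypothetical sketch of Batch(max_events, timeout) semantics.

    Timestamps (in seconds) are passed in explicitly rather than read from a
    clock, so an expired batch is detected only when the next event arrives.
    """

    def __init__(self, max_events: int, timeout: float):
        self.max_events = max_events
        self.timeout = timeout
        self.batch: List[Any] = []
        self.first_event_time: Optional[float] = None

    def _flush(self) -> List[Any]:
        batch, self.batch, self.first_event_time = self.batch, [], None
        return batch

    def add(self, event: Any, now: float) -> List[List[Any]]:
        """Add one event; return the batches that became ready (possibly none)."""
        emitted = []
        # The current batch timed out before this event arrived: emit it first.
        if self.batch and now - self.first_event_time >= self.timeout:
            emitted.append(self._flush())
        if not self.batch:
            self.first_event_time = now  # this event starts a new batch
        self.batch.append(event)
        # The batch is full: emit it immediately.
        if len(self.batch) >= self.max_events:
            emitted.append(self._flush())
        return emitted

    def close(self) -> List[List[Any]]:
        """Emit whatever is left when the stream ends."""
        return [self._flush()] if self.batch else []


b = Batcher(max_events=3, timeout=5.0)
out = []
for event, t in [('a', 0), ('b', 1), ('c', 2), ('d', 3), ('e', 10)]:
    out.extend(b.add(event, t))
out.extend(b.close())
print(out)  # [['a', 'b', 'c'], ['d'], ['e']]
```

Here the first batch is emitted because it reached max_events, the second because 'e' arrived 7 seconds after 'd' (past the 5-second timeout), and the last one when the stream closes.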
Usage Examples
Using Aggregates
The following example reads user data, creates features using Storey's aggregates, persists the data to V3IO and emits events containing the features to a V3IO Stream for further processing.
from storey import (build_flow, Source, Table, V3ioDriver, MapWithState, AggregateByKey,
                    FieldAggregator, WriteToTable, WriteToV3IOStream)
from storey.dtypes import SlidingWindows

v3io_web_api = 'https://webapi.change-me.com'
v3io_access_key = '1284ne83-i262-46m6-9a23-810n41f169ea'
# V3IO-backed table: holds the per-key state and the aggregation cache
table_object = Table('/bigdata/my_features', V3ioDriver(v3io_web_api, v3io_access_key))


def enrich(event, state):
    # Remember when this key was first seen
    if 'first_activity' not in state:
        state['first_activity'] = event.time
    # total_seconds() includes whole days; timedelta.seconds alone wraps every 24h
    event.body['time_since_activity'] = int((event.time - state['first_activity']).total_seconds())
    state['last_event'] = event.time
    # Running count of activities per key, stored in the state and copied into the event
    event.body['total_activities'] = state['total_activities'] = state.get('total_activities', 0) + 1
    return event, state


controller = build_flow([
    Source(),
    MapWithState(table_object, enrich, group_by_key=True, full_event=True),
    AggregateByKey([FieldAggregator("number_of_clicks", "click", ["count"],
                                    SlidingWindows(['1h', '2h', '24h'], '10m')),
                    FieldAggregator("purchases", "purchase_amount", ["avg", "min", "max"],
                                    SlidingWindows(['1h', '2h', '24h'], '10m')),
                    FieldAggregator("failed_activities", "activity", ["count"],
                                    SlidingWindows(['1h'], '10m'),
                                    aggr_filter=lambda element: element['activity_status'] == 'fail')],
                   table_object),
    WriteToTable(table_object),
    WriteToV3IOStream(V3ioDriver(v3io_web_api, v3io_access_key), 'features_stream')
]).run()
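To make the windowing in this example more concrete, here is a simplified in-memory sketch of a count aggregation over sliding windows. It assumes, as the arguments suggest, that '10m' is the bucket period by which the 1h, 2h and 24h windows advance. It is not Storey's AggregateByKey: the `SlidingCounter` class, its per-key buckets, and the optional filter (mirroring `aggr_filter`) are illustrative only.

```python
from collections import defaultdict
from typing import Callable, Dict, Optional

PERIOD = 10 * 60  # bucket width in seconds (assumed meaning of '10m')
WINDOWS = {'1h': 3600, '2h': 7200, '24h': 86400}


class SlidingCounter:
    """Hypothetical per-key count over sliding windows built from 10-minute buckets."""

    def __init__(self, aggr_filter: Optional[Callable[[dict], bool]] = None):
        self.aggr_filter = aggr_filter
        # key -> {bucket start time in seconds -> number of events in that bucket}
        self.buckets: Dict[str, Dict[int, int]] = defaultdict(lambda: defaultdict(int))

    def add(self, key: str, timestamp: int, element: dict) -> None:
        if self.aggr_filter is not None and not self.aggr_filter(element):
            return  # like aggr_filter: only matching elements are counted
        bucket = timestamp - timestamp % PERIOD
        self.buckets[key][bucket] += 1

    def query(self, key: str, now: int) -> Dict[str, int]:
        """For each window, sum the buckets from (window - period) back up to now's bucket."""
        current = now - now % PERIOD
        result = {}
        for name, length in WINDOWS.items():
            oldest = current - length + PERIOD  # earliest bucket still inside the window
            result[name] = sum(count for start, count in self.buckets[key].items()
                               if oldest <= start <= current)
        return result


failed = SlidingCounter(aggr_filter=lambda element: element['activity_status'] == 'fail')
for ts, status in [(0, 'fail'), (300, 'ok'), (4000, 'fail'), (7000, 'fail')]:
    failed.add('user1', ts, {'activity_status': status})
print(failed.query('user1', now=7200))  # {'1h': 1, '2h': 2, '24h': 3}
```

Keeping fixed-size buckets means each window is just a sum over a bounded number of buckets, so the windows can share the same stored data instead of keeping every raw event.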
We can also create a serving function, whose sole purpose is to read data from the feature store and emit it further down the flow:
# Serving flow: reads the aggregation features stored in table_object and emits them
controller = build_flow([
    Source(),
    QueryAggregationByKey([FieldAggregator("number_of_clicks", "click", ["count"],
                                           SlidingWindows(['1h', '2h', '24h'], '10m')),
                           FieldAggregator("purchases", "purchase_amount", ["avg", "min", "max"],
                                           SlidingWindows(['1h', '2h', '24h'], '10m')),
                           FieldAggregator("failed_activities", "activity", ["count"],
                                           SlidingWindows(['1h'], '10m'),
                                           aggr_filter=lambda element: element['activity_status'] == 'fail')],
                          table_object)
]).run()
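Conceptually, a serving step only looks up features that were already computed for the event's key and attaches them to the event; nothing is aggregated or written back. The sketch below illustrates that idea with a plain dict standing in for the table. The `query_by_key` function, the feature names, and the assumption that `aliases` maps a stored feature name to an output name are all hypothetical and are not taken from Storey's API.

```python
from typing import Dict, List, Optional


def query_by_key(event: dict, store: Dict[str, dict], key: str, features: List[str],
                 aliases: Optional[Dict[str, str]] = None) -> dict:
    """Hypothetical serving lookup: enrich `event` with precomputed features.

    The store is only read; no aggregation state is updated.
    """
    aliases = aliases or {}
    stored = store.get(event[key], {})  # unknown keys have no stored features
    enriched = dict(event)  # leave the caller's event untouched
    for name in features:
        # Assumption: features missing from the store are reported as None
        enriched[aliases.get(name, name)] = stored.get(name)
    return enriched


# In-memory stand-in for a feature table filled by an aggregation flow
store = {'user1': {'number_of_clicks_count_1h': 4, 'purchases_avg_2h': 12.5}}
event = {'user_id': 'user1', 'page': 'checkout'}
result = query_by_key(event, store, 'user_id',
                      ['number_of_clicks_count_1h', 'purchases_avg_2h'],
                      aliases={'purchases_avg_2h': 'avg_purchase'})
print(result)
# {'user_id': 'user1', 'page': 'checkout', 'number_of_clicks_count_1h': 4, 'avg_purchase': 12.5}
```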
Project details
Download files
Source Distribution
storey-0.3.1.tar.gz (68.8 kB)
Built Distribution
storey-0.3.1-py3-none-any.whl (78.3 kB)
File details
Details for the file storey-0.3.1.tar.gz.
File metadata
- Download URL: storey-0.3.1.tar.gz
- Upload date:
- Size: 68.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.3.0 pkginfo/1.6.1 requests/2.25.1 setuptools/51.1.0 requests-toolbelt/0.9.1 tqdm/4.55.0 CPython/3.8.7
File hashes
Algorithm | Hash digest
---|---
SHA256 | a1cf2de63377f8067e0778a7b44d61729fe3e5e51589a87ba813e6d0162514c6
MD5 | 64a642aced27f82fbd11e112da6b2597
BLAKE2b-256 | 4bb5aec3ac2b8a2b300cd91d26a4404286657ad11db221280528fdf0cbbfc6e4
File details
Details for the file storey-0.3.1-py3-none-any.whl.
File metadata
- Download URL: storey-0.3.1-py3-none-any.whl
- Upload date:
- Size: 78.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.3.0 pkginfo/1.6.1 requests/2.25.1 setuptools/51.1.0 requests-toolbelt/0.9.1 tqdm/4.55.0 CPython/3.8.7
File hashes
Algorithm | Hash digest
---|---
SHA256 | 4030d2252b2492040a93ce7dab4428fcecfe6a945a456855a2c71f5ee435614a
MD5 | a3233e2ae0a9db4ec4248cb3a3137c30
BLAKE2b-256 | fe935f10451d2002faed41705bde94af6013ca0faf2e64bc633539515b653deb