Async flows
Project description
Storey
Storey is an asynchronous streaming library for real-time event processing and feature extraction.
For more information, see the Storey Python package documentation.
API Walkthrough
A Storey flow consists of steps linked together by the build_flow function, each doing its designated work.
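To illustrate what "steps linked together" means, here is a minimal pure-Python sketch. It is not Storey's implementation: run_flow, filter_step, map_step and flat_map_step are hypothetical names, and the sketch is synchronous, whereas Storey runs its steps asynchronously. Each step takes one event and returns a list of zero or more events, which is enough to mimic the behavior suggested by the names Filter, Map and FlatMap.

```python
from typing import Any, Callable, Iterable, List

# Hypothetical model: a step maps one event to zero or more output events.
Step = Callable[[Any], List[Any]]


def filter_step(predicate: Callable[[Any], bool]) -> Step:
    # Keeps the event only if the predicate holds (Filter-like).
    return lambda event: [event] if predicate(event) else []


def map_step(fn: Callable[[Any], Any]) -> Step:
    # Emits exactly one transformed event (Map-like).
    return lambda event: [fn(event)]


def flat_map_step(fn: Callable[[Any], Iterable[Any]]) -> Step:
    # Emits any number of events derived from one input (FlatMap-like).
    return lambda event: list(fn(event))


def run_flow(steps: List[Step], events: Iterable[Any]) -> List[Any]:
    # Pushes each event through the steps in order and collects what comes out.
    outputs: List[Any] = []
    for event in events:
        current = [event]
        for step in steps:
            current = [out for e in current for out in step(e)]
        outputs.extend(current)
    return outputs


result = run_flow(
    [
        filter_step(lambda x: x % 2 == 0),    # keep even numbers
        map_step(lambda x: x * 10),           # scale them
        flat_map_step(lambda x: [x, x + 1]),  # emit two events per input
    ],
    range(5),
)
print(result)  # [0, 1, 20, 21, 40, 41]
```

In Storey, the equivalent list of steps is passed to build_flow, as in the usage examples in this document.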
Supported Steps
Input Steps
Source
AsyncSource
ReadCSV
ReadParquet
DataframeSource
Processing Steps
Filter
Map
FlatMap
MapWithState
Batch(max_events, timeout)
- Batches events. This step emits a batch every max_events events, or when timeout seconds have passed since the first event in the batch was received.
Choice
JoinWithV3IOTable
SendToHttp
AggregateByKey(aggregations, cache, key=None, emit_policy=EmitEveryEvent(), augmentation_fn=None)
- This step aggregates the data into the cache object provided for later persistence, and outputs an event enriched with the requested aggregation features.
QueryByKey(features, cache, key=None, augmentation_fn=None, aliases=None)
- Similar to AggregateByKey, but this step is for serving only and does not aggregate the event.
WriteToTable(table)
- Persists the data in table to its associated storage by key.
Extend
JoinWithTable
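The Batch(max_events, timeout) behavior described above can be modeled with a small pure-Python sketch. This is not Storey's code: batch_events is a hypothetical helper, events arrive as (timestamp_in_seconds, payload) pairs, and the timeout is only checked when a new event arrives, whereas an asynchronous step would also flush on a timer when no further events come in.

```python
from typing import Any, List, Optional, Tuple


def batch_events(
    events: List[Tuple[float, Any]], max_events: int, timeout: Optional[float]
) -> List[List[Any]]:
    """Groups (timestamp, payload) pairs into batches (hypothetical helper).

    A batch is emitted when it holds max_events payloads, or when an event
    arrives at least `timeout` seconds after the first event of the open batch.
    """
    batches: List[List[Any]] = []
    current: List[Any] = []
    first_time: float = 0.0
    for timestamp, payload in events:
        # Flush the open batch if its timeout expired before this event arrived.
        if current and timeout is not None and timestamp - first_time >= timeout:
            batches.append(current)
            current = []
        if not current:
            first_time = timestamp  # the timeout is measured from the first event
        current.append(payload)
        if len(current) == max_events:
            batches.append(current)
            current = []
    if current:
        # Flush whatever is left when the stream ends.
        batches.append(current)
    return batches


stream = [(0, "a"), (1, "b"), (2, "c"), (3, "d"), (20, "e"), (21, "f")]
print(batch_events(stream, max_events=3, timeout=10))
# [['a', 'b', 'c'], ['d'], ['e', 'f']]
```

Here "d" is emitted alone because "e" arrives 17 seconds after it, exceeding the 10-second timeout, while "a", "b" and "c" fill a batch by count.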
Output Steps
Complete
Reduce
WriteToV3IOStream
WriteToCSV
ReduceToDataFrame
WriteToTSDB
WriteToParquet
Usage Examples
Using Aggregates
The following example reads user data, creates features using Storey's aggregates, persists the data to V3IO and emits events containing the features to a V3IO Stream for further processing.
from storey import (build_flow, Source, Table, V3ioDriver, MapWithState, AggregateByKey,
                    FieldAggregator, WriteToTable, WriteToV3IOStream)
from storey.dtypes import SlidingWindows

# Replace with your own V3IO web API URL and access key.
v3io_web_api = 'https://webapi.change-me.com'
v3io_access_key = '1284ne83-i262-46m6-9a23-810n41f169ea'
table_object = Table('/projects/my_features', V3ioDriver(v3io_web_api, v3io_access_key))


def enrich(event, state):
    # state is kept per key (group_by_key=True) in table_object.
    if 'first_activity' not in state:
        state['first_activity'] = event.time
    # total_seconds() does not wrap for gaps longer than a day, unlike timedelta.seconds.
    event.body['time_since_activity'] = int((event.time - state['first_activity']).total_seconds())
    state['last_event'] = event.time
    event.body['total_activities'] = state['total_activities'] = state.get('total_activities', 0) + 1
    return event, state


controller = build_flow([
    Source(),
    MapWithState(table_object, enrich, group_by_key=True, full_event=True),
    AggregateByKey([FieldAggregator("number_of_clicks", "click", ["count"],
                                    SlidingWindows(['1h', '2h', '24h'], '10m')),
                    FieldAggregator("purchases", "purchase_amount", ["avg", "min", "max"],
                                    SlidingWindows(['1h', '2h', '24h'], '10m')),
                    # Counts only events whose activity_status is 'fail'.
                    FieldAggregator("failed_activities", "activity", ["count"],
                                    SlidingWindows(['1h'], '10m'),
                                    aggr_filter=lambda element: element['activity_status'] == 'fail')],
                   table_object),
    WriteToTable(table_object),
    WriteToV3IOStream(V3ioDriver(v3io_web_api, v3io_access_key), 'features_stream')
]).run()
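To see what MapWithState(..., group_by_key=True, full_event=True) does with an enrich function like the one above, the sketch below drives the same logic by hand. It is a simplified model, not Storey's implementation: FakeEvent is a hypothetical stand-in for a Storey event (only key, time and body are modeled), map_with_state is a hypothetical helper, and the per-key state lives in a plain dict instead of table_object.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Any, Dict


@dataclass
class FakeEvent:
    # Hypothetical stand-in for a Storey event: only the fields enrich() uses.
    key: str
    time: datetime
    body: Dict[str, Any] = field(default_factory=dict)


def enrich(event, state):
    # Mirrors the enrich() function above, using total_seconds() for the gap.
    if 'first_activity' not in state:
        state['first_activity'] = event.time
    event.body['time_since_activity'] = int((event.time - state['first_activity']).total_seconds())
    state['last_event'] = event.time
    event.body['total_activities'] = state['total_activities'] = state.get('total_activities', 0) + 1
    return event, state


def map_with_state(events, fn):
    # One state dict per event key, which models group_by_key=True.
    states: Dict[str, Dict[str, Any]] = {}
    for event in events:
        key = event.key
        event, states[key] = fn(event, states.get(key, {}))
        yield event


start = datetime(2021, 1, 1, 12, 0)
events = [
    FakeEvent('user1', start),
    FakeEvent('user2', start + timedelta(minutes=5)),
    FakeEvent('user1', start + timedelta(minutes=30)),
]
enriched = list(map_with_state(events, enrich))
for e in enriched:
    print(e.key, e.body)
# user1 {'time_since_activity': 0, 'total_activities': 1}
# user2 {'time_since_activity': 0, 'total_activities': 1}
# user1 {'time_since_activity': 1800, 'total_activities': 2}
```

Because state is keyed, user2's first event starts its own counter instead of continuing user1's.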
We can also create a serving function, whose sole purpose is to read data from the feature store and emit it further downstream:
# Reuses build_flow, Source, table_object, FieldAggregator and SlidingWindows from the previous example.
controller = build_flow([
    Source(),
    # Serving only: reads the stored aggregations without updating them.
    QueryAggregationByKey([FieldAggregator("number_of_clicks", "click", ["count"],
                                           SlidingWindows(['1h', '2h', '24h'], '10m')),
                           FieldAggregator("purchases", "purchase_amount", ["avg", "min", "max"],
                                           SlidingWindows(['1h', '2h', '24h'], '10m')),
                           FieldAggregator("failed_activities", "activity", ["count"],
                                           SlidingWindows(['1h'], '10m'),
                                           aggr_filter=lambda element: element['activity_status'] == 'fail')],
                          table_object)
]).run()
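The difference between the two flows (the aggregation step records each event and updates the stored features, while the serving step only reads them) can be sketched in plain Python. This is a toy model, not Storey's storage format: ToyFeatureStore is hypothetical, it counts a single field, and it assumes that a window such as '1h' is the sum of the 10-minute buckets covering the last hour. That is one plausible reading of SlidingWindows(['1h', '2h', '24h'], '10m'), not a description of Storey's internals.

```python
from typing import Dict

PERIOD = 10 * 60  # bucket width in seconds, standing in for the '10m' period
WINDOWS = {'1h': 3600, '2h': 7200, '24h': 86400}  # window lengths in seconds


class ToyFeatureStore:
    """Hypothetical per-key store of event counts kept in fixed-width time buckets."""

    def __init__(self) -> None:
        # key -> bucket index -> number of events recorded in that bucket
        self.buckets: Dict[str, Dict[int, int]] = {}

    def _features(self, key: str, now: int) -> Dict[str, int]:
        # For each window, sums the buckets from (length / PERIOD) buckets back up to now.
        current = now // PERIOD
        counts = self.buckets.get(key, {})
        features = {}
        for name, length in WINDOWS.items():
            first = current - length // PERIOD + 1
            features[name] = sum(c for b, c in counts.items() if first <= b <= current)
        return features

    def aggregate(self, key: str, now: int) -> Dict[str, int]:
        # Aggregation-like: records the event, then returns the updated features.
        counts = self.buckets.setdefault(key, {})
        bucket = now // PERIOD
        counts[bucket] = counts.get(bucket, 0) + 1
        return self._features(key, now)

    def query(self, key: str, now: int) -> Dict[str, int]:
        # Serving-like: reads the features without recording anything.
        return self._features(key, now)


store = ToyFeatureStore()
store.aggregate('user1', now=0)        # first click
store.aggregate('user1', now=3000)     # 50 minutes later
print(store.query('user1', now=4200))  # 70 minutes after the first click
# {'1h': 1, '2h': 2, '24h': 2}
print(store.query('user2', now=4200))  # unknown key: nothing recorded
# {'1h': 0, '2h': 0, '24h': 0}
```

Querying repeatedly returns the same values, because only aggregate() mutates the buckets; that is what makes a query-only step safe to run in a serving flow.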
Download files
Source Distribution
Built Distribution
File details
Details for the file storey-0.3.8.tar.gz.
File metadata
- Download URL: storey-0.3.8.tar.gz
- Upload date:
- Size: 83.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.4.1 importlib_metadata/3.7.3 pkginfo/1.7.0 requests/2.25.1 requests-toolbelt/0.9.1 tqdm/4.59.0 CPython/3.9.2
File hashes
Algorithm | Hash digest
---|---
SHA256 | d83f62b32d1ef4fa3d228e681f4351cfeb713a4dcbecfa55aabfe58d87797db7
MD5 | 42f5c31c5aa871e8324a4d03e8220acc
BLAKE2b-256 | d5919b2c66983fe5914070187c2a2e20c914074e6814175fb0e8a5ffa3fb66b5
File details
Details for the file storey-0.3.8-py3-none-any.whl.
File metadata
- Download URL: storey-0.3.8-py3-none-any.whl
- Upload date:
- Size: 92.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.4.1 importlib_metadata/3.7.3 pkginfo/1.7.0 requests/2.25.1 requests-toolbelt/0.9.1 tqdm/4.59.0 CPython/3.9.2
File hashes
Algorithm | Hash digest
---|---
SHA256 | 3cfd0c35f2f35cbf19947347026899867e652825afd370ffb68281c1b21a4708
MD5 | 2f70e1c33c0d68dd4d455367bdcc4229
BLAKE2b-256 | 0a03b756d30db69657004ac64bb3673dbe95e1c59014375aea63d83c9c267be0