Async flows
Storey
Storey is an asynchronous streaming library for real-time event processing and feature extraction.
API Walkthrough
A Storey flow consists of steps linked together by the build_flow function, each doing its designated work.
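As a rough mental model of steps linked in sequence, the plain-Python sketch below chains a few callables so that each event passes through them in order. It does not use Storey: run_steps, keep_even and double are hypothetical names invented for illustration, and real Storey steps run asynchronously rather than as simple generators.

```python
# Conceptual sketch only: plain Python, not Storey's implementation.
# run_steps, keep_even and double are hypothetical names.

def keep_even(events):
    """Filter-like step: pass through only even numbers."""
    for event in events:
        if event % 2 == 0:
            yield event


def double(events):
    """Map-like step: transform each event."""
    for event in events:
        yield event * 2


def run_steps(steps, events):
    """Link steps so that each one consumes the previous step's output."""
    stream = iter(events)
    for step in steps:
        stream = step(stream)
    return list(stream)


result = run_steps([keep_even, double], [1, 2, 3, 4])
print(result)  # [4, 8]
```

The order of the list matters: each step only sees what the step before it emitted.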
Supported Steps
Input Steps
- Source
- AsyncSource
- ReadCSV
Processing Steps
- Filter
- Map
- FlatMap
- MapWithState
- Batch(max_events, timeout) - Batches events. This step emits a batch every max_events events, or when timeout seconds have passed since the first event in the batch was received.
- Choice
- JoinWithV3IOTable
- SendToHttp
- AggregateByKey(aggregations, cache, key=None, emit_policy=EmitEveryEvent(), augmentation_fn=None) - This step aggregates the data into the cache object provided for later persistence, and outputs an event enriched with the requested aggregation features.
- QueryByKey(features, cache, key=None, augmentation_fn=None, aliases=None) - Similar to AggregateByKey, but this step is for serving only and does not aggregate the event.
- WriteToTable(table) - Persists the data in table to its associated storage by key.
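The batching rule described for Batch (close a batch after max_events events, or once timeout seconds have passed since the batch's first event) can be simulated offline in plain Python. This is a sketch under assumptions, not Storey's code: batch_events is a hypothetical helper, events carry explicit timestamps in seconds, and a timeout is only noticed when the next event arrives, whereas a live step would presumably fire on a timer.

```python
# Offline simulation of the batching rule; not Storey's implementation.
# batch_events is a hypothetical helper name.

def batch_events(timed_events, max_events, timeout):
    """Group (timestamp_seconds, event) pairs into batches.

    A batch is closed when it holds max_events events, or when an event
    arrives at least `timeout` seconds after the batch's first event.
    """
    batches = []
    current = []
    first_time = None
    for timestamp, event in timed_events:
        # Timeout elapsed since the first event of the open batch: close it.
        if current and timestamp - first_time >= timeout:
            batches.append(current)
            current = []
        if not current:
            first_time = timestamp
        current.append(event)
        # Size limit reached: close the batch immediately.
        if len(current) == max_events:
            batches.append(current)
            current = []
    if current:
        batches.append(current)  # flush whatever is left at end of input
    return batches


events = [(0, 'a'), (1, 'b'), (2, 'c'), (3, 'd'), (20, 'e'), (21, 'f')]
batches = batch_events(events, max_events=3, timeout=10)
print(batches)  # [['a', 'b', 'c'], ['d'], ['e', 'f']]
```

Here the first batch closes on size, the second on timeout (17 seconds after 'd' arrived), and the last is flushed at end of input.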
Output Steps
- Complete
- Reduce
- WriteToV3IOStream
Usage Examples
Using Aggregates
The following example reads user data, creates features using Storey's aggregates, persists the data to V3IO and emits events containing the features to a V3IO Stream for further processing.
# MapWithState and WriteToV3IOStream are used below, so they are imported here as well.
from storey import (build_flow, Source, Table, V3ioDriver, MapWithState, AggregateByKey,
                    FieldAggregator, WriteToTable, WriteToV3IOStream)
from storey.dtypes import SlidingWindows

v3io_web_api = 'https://webapi.change-me.com'
v3io_access_key = '1284ne83-i262-46m6-9a23-810n41f169ea'
table_object = Table('/bigdata/my_features', V3ioDriver(v3io_web_api, v3io_access_key))


def enrich(event, state):
    # Remember when this key was first seen.
    if 'first_activity' not in state:
        state['first_activity'] = event.time
    event.body['time_since_activity'] = (event.time - state['first_activity']).seconds
    state['last_event'] = event.time
    # Keep a running count of activities in both the state and the event body.
    event.body['total_activities'] = state['total_activities'] = state.get('total_activities', 0) + 1
    return event, state


controller = build_flow([
    Source(),
    MapWithState(table_object, enrich, group_by_key=True, full_event=True),
    AggregateByKey([FieldAggregator("number_of_clicks", "click", ["count"],
                                    SlidingWindows(['1h', '2h', '24h'], '10m')),
                    FieldAggregator("purchases", "purchase_amount", ["avg", "min", "max"],
                                    SlidingWindows(['1h', '2h', '24h'], '10m')),
                    FieldAggregator("failed_activities", "activity", ["count"],
                                    SlidingWindows(['1h'], '10m'),
                                    aggr_filter=lambda element: element['activity_status'] == 'fail')],
                   table_object),
    WriteToTable(table_object),
    WriteToV3IOStream(V3ioDriver(v3io_web_api, v3io_access_key), 'features_stream')
]).run()
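The enrich function from the example above can be exercised on its own, without a running flow or a V3IO connection. The sketch below copies the function and feeds it stand-in events; the SimpleNamespace objects are a hypothetical substitute for Storey events and model only the time and body attributes that enrich touches, and a plain dict stands in for the per-key state.

```python
from datetime import datetime, timedelta
from types import SimpleNamespace


def enrich(event, state):
    if 'first_activity' not in state:
        state['first_activity'] = event.time
    event.body['time_since_activity'] = (event.time - state['first_activity']).seconds
    state['last_event'] = event.time
    event.body['total_activities'] = state['total_activities'] = state.get('total_activities', 0) + 1
    return event, state


# Hypothetical stand-ins for events: only .time and .body are modeled.
start = datetime(2021, 1, 1, 12, 0, 0)
first = SimpleNamespace(time=start, body={})
second = SimpleNamespace(time=start + timedelta(seconds=30), body={})

state = {}  # stand-in for the state kept for a single key
first, state = enrich(first, state)
second, state = enrich(second, state)

print(first.body)   # {'time_since_activity': 0, 'total_activities': 1}
print(second.body)  # {'time_since_activity': 30, 'total_activities': 2}
```

Note that timedelta.seconds excludes whole days, so gaps longer than 24 hours would need total_seconds() to be reported in full.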
We can also create a serving function whose sole purpose is to read data from the feature store and emit it further:
controller = build_flow([
    Source(),
    QueryAggregationByKey([FieldAggregator("number_of_clicks", "click", ["count"],
                                           SlidingWindows(['1h', '2h', '24h'], '10m')),
                           FieldAggregator("purchases", "purchase_amount", ["avg", "min", "max"],
                                           SlidingWindows(['1h', '2h', '24h'], '10m')),
                           FieldAggregator("failed_activities", "activity", ["count"],
                                           SlidingWindows(['1h'], '10m'),
                                           aggr_filter=lambda element: element['activity_status'] == 'fail')],
                          table_object)
]).run()
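To give a feel for what a "count" aggregate over the '1h', '2h' and '24h' windows represents, the following plain-Python sketch counts events per window relative to a reference time. It is an illustration under assumptions, not Storey's algorithm: count_in_windows and WINDOWS are hypothetical names, the result keys are not Storey's feature names, and the '10m' period from the examples is ignored here (counts are computed exactly rather than at a fixed granularity).

```python
from datetime import datetime, timedelta

# Hypothetical window table mirroring the window names used in the examples.
WINDOWS = {
    '1h': timedelta(hours=1),
    '2h': timedelta(hours=2),
    '24h': timedelta(hours=24),
}


def count_in_windows(event_times, now):
    """Count events whose timestamp falls in (now - window, now] for each window."""
    return {
        name: sum(1 for t in event_times if now - length < t <= now)
        for name, length in WINDOWS.items()
    }


now = datetime(2021, 1, 2, 12, 0)
clicks = [
    now - timedelta(minutes=5),
    now - timedelta(minutes=50),
    now - timedelta(minutes=90),
    now - timedelta(hours=5),
    now - timedelta(hours=30),  # older than every window
]
counts = count_in_windows(clicks, now)
print(counts)  # {'1h': 2, '2h': 3, '24h': 4}
```

Each longer window includes everything counted by the shorter ones, which is why the counts never decrease from '1h' to '24h'.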