Async flows
Storey
Storey is an asynchronous streaming library, for real time event processing and feature extraction.
In This Document
- API Walkthrough
- Usage Examples
▶ For more information, see the Storey Python package documentation.
API Walkthrough
A Storey flow consists of steps linked together by the build_flow function, each doing its designated work.
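To make the idea of linked steps concrete, here is a minimal plain-Python model. It assumes synchronous, in-memory processing (Storey itself is asynchronous) and is not Storey's implementation: each step maps one event to a list of zero, one, or many downstream events, roughly the way Filter, Map, and FlatMap behave. The helpers map_step, filter_step, flat_map_step, and run_flow are hypothetical, not Storey APIs.

```python
from typing import Any, Callable, Iterable, List

# A step takes one event and returns the list of events it passes downstream.
Step = Callable[[Any], List[Any]]


def map_step(fn: Callable[[Any], Any]) -> Step:
    # One event in, one transformed event out (like Map).
    return lambda event: [fn(event)]


def filter_step(predicate: Callable[[Any], bool]) -> Step:
    # Drops events for which the predicate is false (like Filter).
    return lambda event: [event] if predicate(event) else []


def flat_map_step(fn: Callable[[Any], Iterable[Any]]) -> Step:
    # One event in, any number of events out (like FlatMap).
    return lambda event: list(fn(event))


def run_flow(steps: List[Step], events: Iterable[Any]) -> List[Any]:
    # Push each source event through the steps in order and collect what reaches the end.
    outputs: List[Any] = []
    for event in events:
        current = [event]
        for step in steps:
            current = [out for item in current for out in step(item)]
        outputs.extend(current)
    return outputs


result = run_flow(
    [map_step(lambda x: x + 1), filter_step(lambda x: x < 3), flat_map_step(lambda x: [x, x * 10])],
    range(10),
)
print(result)  # [1, 10, 2, 20]
```

In Storey, the equivalent chain of steps is passed as a list to build_flow, preceded by a source step such as SyncEmitSource.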
Supported Steps
Input Steps
- SyncEmitSource
- AsyncEmitSource
- CSVSource
- ParquetSource
- DataframeSource
Processing Steps
- Filter
- Map
- FlatMap
- MapWithState
- Batch(max_events, timeout) - Batches events. This step emits a batch every max_events events, or when timeout seconds have passed since the first event in the batch was received.
- Choice
- JoinWithV3IOTable
- SendToHttp
- AggregateByKey(aggregations, cache, key=None, emit_policy=EmitEveryEvent(), augmentation_fn=None) - This step aggregates the data into the cache object provided for later persistence, and outputs an event enriched with the requested aggregation features.
- QueryByKey(features, cache, key=None, augmentation_fn=None, aliases=None) - Similar to AggregateByKey, but this step is for serving only and does not aggregate the event.
- NoSqlTarget(table) - Persists the data in table to its associated storage by key.
- Extend
- JoinWithTable
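The Batch behavior described above (emit on max_events, or on timeout measured from the first event in the batch) can be modeled in plain Python. This is a sketch under stated assumptions, not Storey's code: Batcher, add, and tick are hypothetical names, time is passed in explicitly as now (in seconds) to keep the example deterministic, and the timeout is only checked when tick is called, standing in for a timer.

```python
from typing import Any, List, Optional


class Batcher:
    """Plain-Python model of Batch(max_events, timeout): a batch is emitted once it holds
    max_events events, or once timeout seconds have passed since its first event."""

    def __init__(self, max_events: int, timeout: float):
        self.max_events = max_events
        self.timeout = timeout
        self.events: List[Any] = []
        self.first_event_time: Optional[float] = None

    def add(self, event: Any, now: float) -> Optional[List[Any]]:
        # The timeout is measured from the first event of the current batch.
        if not self.events:
            self.first_event_time = now
        self.events.append(event)
        if len(self.events) >= self.max_events:
            return self._emit()
        return None

    def tick(self, now: float) -> Optional[List[Any]]:
        # Called periodically (like a timer); flushes a non-empty batch whose timeout expired.
        if self.events and now - self.first_event_time >= self.timeout:
            return self._emit()
        return None

    def _emit(self) -> List[Any]:
        # Hand back the current batch and start a new, empty one.
        batch, self.events, self.first_event_time = self.events, [], None
        return batch


batcher = Batcher(max_events=3, timeout=5.0)
print([batcher.add(e, now=t) for e, t in [("a", 0.0), ("b", 1.0), ("c", 2.0)]])
# [None, None, ['a', 'b', 'c']]
print(batcher.add("d", now=3.0), batcher.tick(now=7.0), batcher.tick(now=8.0))
# None None ['d']
```

The first batch is emitted because it reaches max_events; the second is emitted by the timeout, 5 seconds after "d" opened it.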
Output Steps
- Complete
- Reduce
- StreamTarget
- CSVTarget
- ReduceToDataFrame
- TSDBTarget
- ParquetTarget
Usage Examples
Using Aggregates
The following example reads user data, creates features using Storey's aggregates, persists the data to V3IO and emits events containing the features to a V3IO Stream for further processing.
```python
from storey import (build_flow, SyncEmitSource, MapWithState, AggregateByKey, FieldAggregator,
                    NoSqlTarget, StreamTarget, Table, V3ioDriver)
from storey.dtypes import SlidingWindows

# Placeholder endpoint and credentials: replace with your own V3IO web API URL and access key.
v3io_web_api = "https://webapi.change-me.com"
v3io_access_key = "1284ne83-i262-46m6-9a23-810n41f169ea"
table_object = Table("/projects/my_features", V3ioDriver(v3io_web_api, v3io_access_key))


def enrich(event, state):
    # With group_by_key=True, `state` is the state kept for this event's key.
    if "first_activity" not in state:
        state["first_activity"] = event.time
    event.body["time_since_activity"] = (event.body["time"] - state["first_activity"]).seconds
    state["last_event"] = event.time
    event.body["total_activities"] = state["total_activities"] = state.get("total_activities", 0) + 1
    return event, state


controller = build_flow([
    SyncEmitSource(),
    # full_event=True passes the whole Event object to enrich rather than only its body.
    MapWithState(table_object, enrich, group_by_key=True, full_event=True),
    AggregateByKey([FieldAggregator("number_of_clicks", "click", ["count"],
                                    SlidingWindows(["1h", "2h", "24h"], "10m")),
                    FieldAggregator("purchases", "purchase_amount", ["avg", "min", "max"],
                                    SlidingWindows(["1h", "2h", "24h"], "10m")),
                    FieldAggregator("failed_activities", "activity", ["count"],
                                    SlidingWindows(["1h"], "10m"),
                                    aggr_filter=lambda element: element["activity_status"] == "fail")],
                   table_object,
                   time_field="time"),
    NoSqlTarget(table_object),
    StreamTarget(V3ioDriver(v3io_web_api, v3io_access_key), "features_stream")
]).run()
```
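The SlidingWindows(["1h", "2h", "24h"], "10m") arguments above combine window lengths with a 10-minute period. The following is a simplified plain-Python model of that idea, assuming bucket-based sliding windows: each event lands in a period-sized bucket, and a window's value sums the most recent buckets, so values move in period-sized steps. It is an illustration, not Storey's implementation; SlidingCount and its methods are hypothetical, timestamps are integer seconds, only the count aggregation is modeled, and the "2h" window on failed activities is added for illustration.

```python
from typing import Callable, Dict, Optional


class SlidingCount:
    """Plain-Python model of a per-key sliding-window count with an optional filter."""

    def __init__(self, windows: Dict[str, int], period: int,
                 aggr_filter: Optional[Callable[[dict], bool]] = None):
        self.windows = windows          # window name -> window length in seconds
        self.period = period            # bucket size in seconds (the "10m" above)
        self.aggr_filter = aggr_filter  # events failing the filter are not counted
        self.buckets: Dict[str, Dict[int, int]] = {}  # key -> bucket index -> count

    def add(self, key: str, event: dict) -> None:
        if self.aggr_filter is None or self.aggr_filter(event):
            key_buckets = self.buckets.setdefault(key, {})
            index = event["time"] // self.period
            key_buckets[index] = key_buckets.get(index, 0) + 1

    def features(self, key: str, now: int) -> Dict[str, int]:
        # A window of length L covers the L // period most recent buckets, ending at now's bucket.
        current = now // self.period
        counts = self.buckets.get(key, {})
        return {name: sum(c for b, c in counts.items()
                          if current - length // self.period < b <= current)
                for name, length in self.windows.items()}


failed_activities = SlidingCount({"1h": 3600, "2h": 7200}, period=600,
                                 aggr_filter=lambda e: e["activity_status"] == "fail")
for t, status in [(0, "fail"), (700, "ok"), (1300, "fail"), (4000, "fail")]:
    failed_activities.add("user1", {"time": t, "activity_status": status})
print(failed_activities.features("user1", now=4000))  # {'1h': 2, '2h': 3}
```

The "ok" event is skipped by the filter, and at t=4000 the failure at t=0 has already slid out of the 1-hour window while remaining inside the 2-hour one.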
We can also create a serving function whose sole purpose is to read data from the feature store and emit it further:
```python
from storey import build_flow, SyncEmitSource, QueryByKey

# Serving flow: reads the aggregates persisted by the previous example (table_object)
# without updating them. Feature names are assumed to follow the
# <name>_<aggregation>_<window> pattern produced by AggregateByKey.
controller = build_flow([
    SyncEmitSource(),
    QueryByKey(["number_of_clicks_count_1h", "number_of_clicks_count_24h",
                "purchases_avg_1h", "purchases_min_1h", "purchases_max_1h",
                "failed_activities_count_1h"],
               table_object)
]).run()
```
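Serving differs from the aggregation flow in that it only reads. A minimal plain-Python sketch of that contract, in which a dict stands in for the feature table and query_features is a hypothetical helper: it looks up the requested features for a key and renames them, assuming aliases maps stored feature names to output names, as the aliases parameter of QueryByKey suggests. Unlike Storey, it ignores time entirely, returning stored values as-is rather than evaluating windows at query time.

```python
from typing import Dict, List, Optional


def query_features(store: Dict[str, Dict[str, float]], key: str, features: List[str],
                   aliases: Optional[Dict[str, str]] = None) -> Dict[str, Optional[float]]:
    # Read-only: look up the stored row for this key without modifying the store.
    aliases = aliases or {}
    row = store.get(key, {})
    # Missing keys or features come back as None; aliased features are renamed.
    return {aliases.get(name, name): row.get(name) for name in features}


# Hypothetical stored aggregates for one user.
store = {"user1": {"number_of_clicks_count_1h": 4, "purchases_avg_1h": 12.5}}
print(query_features(store, "user1", ["number_of_clicks_count_1h", "purchases_avg_1h"],
                     aliases={"purchases_avg_1h": "avg_purchase_1h"}))
# {'number_of_clicks_count_1h': 4, 'avg_purchase_1h': 12.5}
```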
Download files
File details
Details for the file storey-1.3.17.tar.gz.
File metadata
- Download URL: storey-1.3.17.tar.gz
- Upload date:
- Size: 132.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.1 CPython/3.11.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 552e93c1c5e397a319f7b8d02a6a5efcaef0ae1a3ce3788e54c7ecf29e7352c2 |
| MD5 | 1e9637972f5a060f4ec55c282a68403f |
| BLAKE2b-256 | b071a374f6bb8af9375e38c2a148eeff69c03e6037cdaadee64d9dc937d913e4 |
File details
Details for the file storey-1.3.17-py3-none-any.whl.
File metadata
- Download URL: storey-1.3.17-py3-none-any.whl
- Upload date:
- Size: 157.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.1 CPython/3.11.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 437816d72de87b0121e3efa017974b3d849bcdc638d459132d543529624a5fb3 |
| MD5 | 38e7114ff34ef06b27da9c1ec495b9f5 |
| BLAKE2b-256 | 46dcefc22f9eeddf57b8504b3a0961d15521413ea9605bac9067330c859fbe99 |