Python Stateful Stream Processing Framework
Bytewax is a Python framework that simplifies event and stream processing. Because Bytewax couples the stream and event processing capabilities of Flink, Spark, and Kafka Streams with the friendly and familiar interface of Python, you can reuse the Python libraries you already know and love. Connect data sources, run stateful transformations, and write to a variety of downstream systems with built-in connectors or existing Python libraries.
How it all works
Bytewax is a Python framework and Rust distributed processing engine that uses a dataflow computational model to provide parallelizable stream processing and event processing capabilities similar to Flink, Spark, and Kafka Streams. You can use Bytewax for a variety of workloads from moving data à la Kafka Connect style all the way to advanced online machine learning workloads. Bytewax is not limited to streaming applications but excels anywhere that data can be distributed at the input and output.
Bytewax has an accompanying command line interface, waxctl, which supports the deployment of dataflows on cloud servers or Kubernetes. It is available for download from the Bytewax website.
Getting Started with Bytewax
```bash
pip install bytewax
```
Dataflow, Input and Operators
A Bytewax dataflow is Python code that represents an input, a series of processing steps, and an output. Inputs can range from a Kafka stream to a WebSocket, and outputs can vary from a data lake to a key-value store.
```python
import json

from bytewax import operators as op
from bytewax.connectors.kafka import KafkaSinkMessage, operators as kop
from bytewax.dataflow import Dataflow
```
Bytewax has input and output helpers for common data sources, but you can also create your own with the Sink and Source API.
At a high level, the dataflow compute model conceptualizes a program's execution as data flowing through a series of operator-based steps. Operators like `map` and `filter` are the processing primitives of Bytewax. Each of them gives you a "shape" of data transformation, and you give them regular Python functions to customize them to the specific task you need. See the documentation for a list of the available operators.
```python
BROKERS = ["localhost:19092"]
IN_TOPICS = ["in_topic"]
OUT_TOPIC = "out_topic"
ERR_TOPIC = "errors"


def deserialize(kafka_message):
    return json.loads(kafka_message.value)


def anonymize_email(event_data):
    event_data["email"] = "@".join(["******", event_data["email"].split("@")[-1]])
    return event_data


def remove_bytewax(event_data):
    return "bytewax" not in event_data["email"]


flow = Dataflow("kafka_in_out")
stream = kop.input("inp", flow, brokers=BROKERS, topics=IN_TOPICS)
# Inspect the stream coming from the Kafka topic, printing items
# to stdout for debugging.
op.inspect("inspect-oks", stream.oks)
# Kafka errors arrive as a separate stream; raise an exception
# when one is encountered.
errs = op.inspect("errors", stream.errs).then(op.raises, "crash-on-err")
deser_msgs = op.map("deserialize", stream.oks, deserialize)
anon_msgs = op.map("anon", deser_msgs, anonymize_email)
filtered_msgs = op.filter("filter_employees", anon_msgs, remove_bytewax)
processed = op.map("map", filtered_msgs, lambda m: KafkaSinkMessage(None, json.dumps(m)))
# Finally, output the cleaned data to a new topic.
kop.output("out1", processed, brokers=BROKERS, topic=OUT_TOPIC)
```
Windowing, Reducing and Aggregating
Bytewax is a stateful stream processing framework, which means that some operations remember information across multiple events. Windows and aggregations are also stateful and can be reconstructed in the event of a failure. Bytewax can be configured with different state recovery mechanisms to durably persist state in order to recover from failure.
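As a sketch of how recovery is wired up from the command line (the exact flags here are assumed from the 0.19 CLI and may differ between versions; additional snapshot-interval options may also be required): you create a set of recovery partitions once, then point the dataflow at them when running.

```shell
# One-time setup: create 4 SQLite recovery partitions in db_dir/.
python -m bytewax.recovery db_dir/ 4

# Run the dataflow with recovery enabled; state is periodically
# snapshotted into db_dir/ and restored from it on restart.
python -m bytewax.run my_dataflow:flow -r db_dir/
```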
There are multiple stateful operators available, like `reduce`, `stateful_map`, and `fold_window`. The complete list can be found in the API documentation for all operators. Below we use the `collect_window` operator with a tumbling window based on event time to gather events into buckets, then compute the average value per user for each bucket.
```python
from datetime import datetime, timedelta, timezone

from bytewax import operators as op
import bytewax.operators.window as win
from bytewax.dataflow import Dataflow
from bytewax.operators.window import EventClockConfig, TumblingWindow
from bytewax.testing import TestingSource

flow = Dataflow("window_eg")

src = [
    {"user_id": "123", "value": 5, "time": "2023-01-01T00:00:00+00:00"},
    {"user_id": "123", "value": 7, "time": "2023-01-01T00:00:01+00:00"},
    {"user_id": "123", "value": 2, "time": "2023-01-01T00:00:07+00:00"},
]
inp = op.input("inp", flow, TestingSource(src))
keyed_inp = op.key_on("keyed_inp", inp, lambda x: x["user_id"])


# This function instructs the event clock on how to retrieve the
# event's datetime from the input. Note that the datetime MUST be
# timezone-aware UTC; if the datetime uses a different representation,
# we would have to convert it here.
def get_event_time(event):
    return datetime.fromisoformat(event["time"])


# Configure the window operator to use the event time.
clock = EventClockConfig(get_event_time, wait_for_system_duration=timedelta(seconds=10))

# And a 5-second tumbling window.
align_to = datetime(2023, 1, 1, tzinfo=timezone.utc)
windower = TumblingWindow(align_to=align_to, length=timedelta(seconds=5))

five_sec_buckets = win.collect_window("five_sec_buckets", keyed_inp, clock, windower)


def calc_avg(bucket):
    values = [event["value"] for event in bucket]
    if len(values) > 0:
        return sum(values) / len(values)
    else:
        return None


five_sec_avgs = op.map_value("avg_in_bucket", five_sec_buckets, calc_avg)
```
Merges and Joins
Merging or joining multiple input streams is a common task in stream processing. Bytewax enables different types of joins to facilitate different patterns.
Merging Streams
Merging streams is like concatenating them: no logic is applied, and the resulting stream can contain heterogeneous records.
```python
from bytewax import operators as op
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource

flow = Dataflow("merge")

src_1 = [
    {"user_id": "123", "name": "Bumble"},
]
inp1 = op.input("inp1", flow, TestingSource(src_1))

src_2 = [
    {"user_id": "123", "email": "bee@bytewax.com"},
    {"user_id": "456", "email": "hive@bytewax.com"},
]
inp2 = op.input("inp2", flow, TestingSource(src_2))

merged_stream = op.merge("merge", inp1, inp2)
op.inspect("debug", merged_stream)
```
Joining Streams
Joining streams is different than merging because it uses logic to match records across streams by key. Joins in Bytewax can be running or complete. A complete join in streaming is most closely related to an inner join in SQL, in that the dataflow will only emit data downstream from a join once all sides of the join have matched on the key.
```python
from bytewax import operators as op
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource

flow = Dataflow("join")

src_1 = [
    {"user_id": "123", "name": "Bumble"},
]
inp1 = op.input("inp1", flow, TestingSource(src_1))
keyed_inp_1 = op.key_on("key_stream_1", inp1, lambda x: x["user_id"])

src_2 = [
    {"user_id": "123", "email": "bee@bytewax.com"},
    {"user_id": "456", "email": "hive@bytewax.com"},
]
inp2 = op.input("inp2", flow, TestingSource(src_2))
keyed_inp_2 = op.key_on("key_stream_2", inp2, lambda x: x["user_id"])

merged_stream = op.join("join", keyed_inp_1, keyed_inp_2)
op.inspect("debug", merged_stream)
```
Output
Output in Bytewax is described as a sink, and sinks are grouped into connectors. There are a number of basic connectors in the bytewax repo to help you during development. In addition to the built-in connectors, it is possible to use the input and output API to build a custom sink and source. There is also a hub for connectors built by the community, partners, and Bytewax. Below is an example of a custom connector for Postgres using the psycopg2 library.
```python
import psycopg2
from psycopg2 import extras

from bytewax.outputs import FixedPartitionedSink, StatefulSinkPartition


class PsqlSink(StatefulSinkPartition):
    def __init__(self):
        self.conn = psycopg2.connect("dbname=website user=bytewax")
        self.conn.set_session(autocommit=True)
        self.cur = self.conn.cursor()

    def write_batch(self, values):
        query_string = """
        INSERT INTO events (user_id, data)
        VALUES %s
        ON CONFLICT (user_id)
        DO UPDATE SET data = EXCLUDED.data;
        """
        extras.execute_values(self.cur, query_string, values)

    def snapshot(self):
        pass

    def close(self):
        self.conn.close()


class PsqlOutput(FixedPartitionedSink):
    def list_parts(self):
        return ["single"]

    def build_part(self, step_id, for_part, resume_state):
        return PsqlSink()
```
Execution
Bytewax dataflows can be executed in a single Python process, or on multiple processes on multiple hosts with multiple worker threads. When processing data in a distributed fashion, Bytewax uses routing keys to ensure your state is updated in a correct way automatically.
```bash
# Run a single worker locally.
python -m bytewax.run my_dataflow:flow

# Start two worker threads in a single process.
python -m bytewax.run my_dataflow -w 2

# Start a process on two separate machines to form a Bytewax cluster.
# Start the first process with two worker threads on `machine_one`.
machine_one$ python -m bytewax.run my_dataflow -w 2 -i0 -a "machine_one:2101;machine_two:2101"

# Then start the second process with three worker threads on `machine_two`.
machine_two$ python -m bytewax.run my_dataflow -w 3 -i1 -a "machine_one:2101;machine_two:2101"
```
It can also be run in a Docker container as described further in the documentation.
Kubernetes
The recommended way to run dataflows at scale is to leverage the Kubernetes ecosystem. To help manage deployment, we built waxctl, which allows you to easily deploy dataflows that run across multiple compute nodes.
```bash
waxctl df deploy my_dataflow.py --name my-dataflow
```
Why Bytewax?
At a high level, Bytewax provides a few major benefits:
- The operators in Bytewax are largely “data-parallel”, meaning they can operate on independent parts of the data concurrently.
- Bytewax offers the ability to express higher-level control constructs, like iteration.
- Bytewax allows you to develop and run your code locally, and then easily scale that code to multiple workers or processes without changes.
- Bytewax can be used in both a streaming and batch context.
- Bytewax lets you leverage the Python ecosystem directly.
Community
Slack is the main forum for communication and discussion.
GitHub Issues is reserved for actual issues. Please use the Slack community for discussions.
Usage
Install the latest release with pip:
```bash
pip install bytewax
```
Building From Source
To build a specific branch, you will need Maturin and a Rust toolchain installed on your machine. Once those have been installed, run:

```bash
maturin develop -E dev
```

Important: if you are testing with a version built from source, use `maturin build --release`, since `maturin develop` builds are slower.
More Examples
For a more complete example, and documentation on the available operators, check out the User Guide and the /examples folder.
License
Bytewax is licensed under the Apache-2.0 license.
Contributing
Contributions are welcome! This community and project would not be what it is without the contributors. All contributions, from bug reports to new features, are welcome and encouraged. Please view the contribution guidelines before getting started.
With ❤️ Bytewax