
Bytewax

Python Stateful Stream Processing Framework

Bytewax is a Python framework that simplifies event and stream processing. Because Bytewax couples the stream and event processing capabilities of Flink, Spark, and Kafka Streams with the friendly and familiar interface of Python, you can reuse the Python libraries you already know and love. Connect data sources, run stateful transformations, and write to a variety of downstream systems with built-in connectors or existing Python libraries.


How it all works

Bytewax is a Python framework and Rust distributed processing engine that uses a dataflow computational model to provide parallelizable stream processing and event processing capabilities similar to Flink, Spark, and Kafka Streams. You can use Bytewax for a variety of workloads, from moving data in the style of Kafka Connect all the way to advanced online machine learning. Bytewax is not limited to streaming applications; it excels anywhere that data can be distributed at the input and output.

Bytewax has an accompanying command line interface, waxctl, which supports the deployment of dataflows on cloud VMs or Kubernetes. You can download it here.


Getting Started with Bytewax

pip install bytewax

Install waxctl

A Bytewax dataflow is Python code that represents an input, a series of processing steps, and an output. The inputs could range from a Kafka stream to a WebSocket, and the outputs could vary from a data lake to a key-value store.

from bytewax.dataflow import Dataflow
from bytewax.inputs import KafkaInputConfig
from bytewax.outputs import ManualOutputConfig
# Bytewax has input and output helpers for common data sources and sinks,
# but you can also create your own with ManualInputConfig and ManualOutputConfig.

At a high level, the dataflow compute model is one in which program execution is conceptualized as data flowing through a series of operator-based steps. Operators like map and filter are the processing primitives of Bytewax. Each of them gives you a “shape” of data transformation, and you give them regular Python functions to customize them to the specific task you need. See the documentation for a list of the available operators.

import json

def deserialize(key_bytes__payload_bytes):
    # KafkaInputConfig emits (key, payload) tuples of raw bytes.
    key_bytes, payload_bytes = key_bytes__payload_bytes
    # Assumes every message carries a JSON payload with a "user_id" field.
    event_data = json.loads(payload_bytes)
    # Key the event by user so that later stateful steps group events per user.
    return event_data["user_id"], event_data

def anonymize_email(user_id__event_data):
    user_id, event_data = user_id__event_data
    # Replace the local part of the address, keeping only the domain.
    event_data["email"] = "@".join(["******", event_data["email"].split("@")[-1]])
    return user_id, event_data

def remove_bytewax(user_id__event_data):
    user_id, event_data = user_id__event_data
    # filter keeps items for which this returns True, so bytewax addresses are dropped.
    return "bytewax" not in event_data["email"]

flow = Dataflow()
flow.input("inp", KafkaInputConfig(brokers=["localhost:9092"], topic="web_events"))
flow.map(deserialize)
flow.map(anonymize_email)
flow.filter(remove_bytewax)
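To make the "shape plus plain function" idea concrete, here is a toy model of a map/filter chain in plain Python. `ToyFlow` is a hypothetical class written only for illustration; it is not the Bytewax API and does not reflect how Bytewax executes steps internally, but it shows how each operator applies a user-supplied function to every item in order.

```python
import json
from typing import Any, Callable, Iterable, List, Tuple


class ToyFlow:
    """Hypothetical toy model of a dataflow; NOT the Bytewax API."""

    def __init__(self) -> None:
        self.steps: List[Tuple[str, Callable[[Any], Any]]] = []

    def map(self, fn: Callable[[Any], Any]) -> None:
        # A "map" step transforms every item with fn.
        self.steps.append(("map", fn))

    def filter(self, fn: Callable[[Any], bool]) -> None:
        # A "filter" step keeps only items for which fn returns True.
        self.steps.append(("filter", fn))

    def run(self, items: Iterable[Any]) -> List[Any]:
        out = []
        for item in items:
            keep = True
            for kind, fn in self.steps:
                if kind == "map":
                    item = fn(item)
                elif not fn(item):
                    keep = False
                    break
            if keep:
                out.append(item)
        return out


toy = ToyFlow()
toy.map(lambda kv: json.loads(kv[1]))       # decode the payload bytes
toy.map(lambda e: (e["user_id"], e))        # key each event by user_id
toy.filter(lambda ue: "bytewax" not in ue[1]["email"])

messages = [
    (None, b'{"user_id": "a", "email": "a@example.com"}'),
    (None, b'{"user_id": "b", "email": "b@bytewax.io"}'),
]
result = toy.run(messages)
print(result)  # [('a', {'user_id': 'a', 'email': 'a@example.com'})]
```

Because every step is an ordinary function, the same functions can also be unit tested on sample tuples before they are wired into a real dataflow.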

Bytewax is a stateful stream processing framework, which means that some operations remember information across multiple events. Windows and aggregations are also stateful, and can be reconstructed in the event of failure. Bytewax can be configured with different state recovery mechanisms to durably persist state in order to recover from failure.
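The sketch below illustrates what "remembering information across events" and "recovering from failure" mean, using a hypothetical per-key counter with a snapshot/restore pair. It is an assumption-laden illustration only: Bytewax configures recovery separately and persists its own state format, none of which is modeled here.

```python
import copy
from typing import Dict


class KeyedCounter:
    """Hypothetical stateful step: counts events per key (not Bytewax code)."""

    def __init__(self) -> None:
        self.state: Dict[str, int] = {}

    def process(self, key: str) -> int:
        # State for this key persists across events.
        self.state[key] = self.state.get(key, 0) + 1
        return self.state[key]

    def snapshot(self) -> Dict[str, int]:
        # A durable system would write this copy to storage.
        return copy.deepcopy(self.state)

    @classmethod
    def restore(cls, snap: Dict[str, int]) -> "KeyedCounter":
        counter = cls()
        counter.state = copy.deepcopy(snap)
        return counter


counter = KeyedCounter()
for user in ["a", "b", "a"]:
    counter.process(user)
snap = counter.snapshot()  # {"a": 2, "b": 1}

# Simulate a crash: the in-memory object is lost and rebuilt from the snapshot.
recovered = KeyedCounter.restore(snap)
print(recovered.process("a"))  # 3
```

Without the snapshot, the restarted step would begin again from an empty dictionary and report 1 for user "a".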

There are multiple stateful operators available, such as reduce, stateful_map, and fold_window; the complete list can be found in the API documentation for all operators. Below we use the fold_window operator with a tumbling window based on system time to gather events and count, for each user, how many events of each type occurred.

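import datetime
from collections import defaultdict

from bytewax.window import SystemClockConfig, TumblingWindowConfig

# Assign events to windows using the system clock.
cc = SystemClockConfig()
# Non-overlapping windows, each 5 seconds long.
wc = TumblingWindowConfig(length=datetime.timedelta(seconds=5))

def build():
    # Creates the initial accumulator for a new window.
    return defaultdict(int)

def count_events(results, event):
    # Fold one event into the accumulator: count occurrences by event type.
    results[event["type"]] += 1
    return results

flow.fold_window("session_state_recovery", cc, wc, build, count_events)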
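As a plain-Python illustration of what a tumbling-window fold computes, the sketch below assigns each timestamp to exactly one non-overlapping 5-second window and folds events per (user, window) with the same build/count logic. The fixed `START` alignment point and the `fold_tumbling` helper are hypothetical; this is not Bytewax's implementation, and Bytewax's own window alignment may differ.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Tuple

LENGTH = timedelta(seconds=5)
START = datetime(2022, 1, 1)  # assumed alignment point for the windows


def window_index(ts: datetime) -> int:
    # timedelta // timedelta gives an int: the window this timestamp falls into.
    return (ts - START) // LENGTH


def fold_tumbling(
    events: List[Tuple[str, datetime, dict]]
) -> Dict[Tuple[str, int], Dict[str, int]]:
    acc: Dict[Tuple[str, int], Dict[str, int]] = {}
    for user_id, ts, event in events:
        key = (user_id, window_index(ts))
        if key not in acc:
            acc[key] = defaultdict(int)  # plays the role of build()
        acc[key][event["type"]] += 1  # plays the role of count_events()
    return {k: dict(v) for k, v in acc.items()}


events = [
    ("a", START + timedelta(seconds=1), {"type": "click"}),
    ("a", START + timedelta(seconds=3), {"type": "click"}),
    ("a", START + timedelta(seconds=6), {"type": "view"}),
    ("b", START + timedelta(seconds=2), {"type": "view"}),
]
counts = fold_tumbling(events)
# {("a", 0): {"click": 2}, ("a", 1): {"view": 1}, ("b", 0): {"view": 1}}
```

The event at 6 seconds starts a fresh accumulator for user "a" because it falls into the second window.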

Output in Bytewax is handled by the capture operator. There are a number of helpers that make it easy to connect and write to other systems (see the output documentation). If there isn't a helper for your system, you can build a custom one, which we do below. As with input, Bytewax output can be parallelized, and the client connection is created on each worker.

import json

import psycopg2

def output_builder(worker_index, worker_count):
    # Called once per worker, so each worker opens its own connection.
    conn = psycopg2.connect("dbname=website user=bytewax")
    conn.set_session(autocommit=True)
    cur = conn.cursor()

    def write_to_postgres(user_id__user_data):
        user_id, user_data = user_id__user_data
        # Upsert: insert a new row, or overwrite the data for an existing user_id.
        # ON CONFLICT (user_id) requires a unique constraint on that column.
        query_string = """
            INSERT INTO events (user_id, data)
            VALUES (%s, %s)
            ON CONFLICT (user_id)
            DO UPDATE SET data = EXCLUDED.data;"""
        cur.execute(query_string, (user_id, json.dumps(user_data)))

    return write_to_postgres

# Each item reaching capture is passed to the function returned by output_builder.
flow.capture(ManualOutputConfig(output_builder))
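The builder pattern can be exercised without a PostgreSQL server. The sketch below swaps in Python's built-in sqlite3 with a temporary database file (an assumption made only so the example is self-contained) and calls the builder directly, the way a single worker would, to show that a second write for the same user replaces the first. `INSERT OR REPLACE` is SQLite's shorthand for the primary-key upsert done above with `ON CONFLICT`.

```python
import json
import os
import sqlite3
import tempfile

# Hypothetical local database file standing in for PostgreSQL.
DB_PATH = os.path.join(tempfile.mkdtemp(), "events.db")


def output_builder(worker_index, worker_count):
    # One connection per worker, created when the builder is called.
    conn = sqlite3.connect(DB_PATH)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS events (user_id TEXT PRIMARY KEY, data TEXT)"
    )
    conn.commit()

    def write_to_sqlite(user_id__user_data):
        user_id, user_data = user_id__user_data
        # Insert, or replace the existing row with the same primary key.
        conn.execute(
            "INSERT OR REPLACE INTO events (user_id, data) VALUES (?, ?)",
            (user_id, json.dumps(user_data)),
        )
        conn.commit()

    return write_to_sqlite


write = output_builder(worker_index=0, worker_count=1)
write(("a", {"click": 2}))
write(("a", {"click": 3}))  # same user: replaces the previous row

reader = sqlite3.connect(DB_PATH)
rows = reader.execute("SELECT user_id, data FROM events").fetchall()
reader.close()
print(rows)  # [('a', '{"click": 3}')]
```

Opening the connection inside the builder, rather than at module import time, is what lets each worker hold its own client.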

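Bytewax dataflows can be executed on a single host with multiple Python processes, or on multiple hosts. Below is an example that uses cluster_main to run the dataflow as one process with two workers; to span several hosts, list every process's address in addresses and start one process per address, each with its own proc_id. When processing data in a distributed fashion, Bytewax ensures that all items with the same key are routed to the same host.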

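from bytewax.execution import cluster_main

if __name__ == "__main__":
    # One entry per process in the cluster; this example has a single process.
    addresses = [
        "localhost:2101",
    ]

    cluster_main(
        flow,
        addresses=addresses,
        proc_id=0,  # index of this process in addresses
        worker_count_per_proc=2,
    )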
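Key-based routing can be pictured as hashing each key to a worker index. The sketch below is a hypothetical illustration, not Bytewax's actual partitioning scheme: it uses `zlib.crc32` because, unlike Python's built-in `hash()` for strings (which is salted per process), it gives every process the same answer for the same key.

```python
import zlib
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def route(key: str, worker_count: int) -> int:
    # Stable across processes, so all workers agree on who owns a key.
    return zlib.crc32(key.encode("utf-8")) % worker_count


def partition(
    items: Iterable[Tuple[str, int]], worker_count: int
) -> Dict[int, List[Tuple[str, int]]]:
    buckets: Dict[int, List[Tuple[str, int]]] = defaultdict(list)
    for key, value in items:
        buckets[route(key, worker_count)].append((key, value))
    return buckets


items = [("a", 1), ("b", 2), ("a", 3), ("c", 4)]
buckets = partition(items, worker_count=2)
# Every ("a", ...) item lands in the same bucket, so one worker owns all of "a"'s state.
```

Co-locating a key's items this way is what allows a stateful operator, such as the per-user window fold, to keep each user's state on a single worker.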

Deploying and Scaling

Bytewax can be run on a local machine or remote machine, just like a regular Python script.

python my_dataflow.py

It can also be run in a Docker container as described further in the documentation.

Kubernetes

The recommended way to run dataflows at scale is to leverage the Kubernetes ecosystem. To help manage deployment, we built waxctl, which lets you easily deploy dataflows that run across multiple compute nodes.

waxctl df deploy my_dataflow.py --name my-dataflow

Why Bytewax?

At a high level, Bytewax provides a few major benefits:

  • The operators in Bytewax are largely “data-parallel”, meaning they can operate on independent parts of the data concurrently.
  • Bytewax offers the ability to express higher-level control constructs, like iteration.
  • Bytewax allows you to develop and run your code locally, and then easily scale that code to multiple workers or processes without changes.
  • Bytewax can be used in both streaming and batch contexts.
  • Bytewax lets you leverage the Python ecosystem directly.
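The "data-parallel" and "scale without changes" points can be illustrated with a small sketch: the same counting function runs on independent slices of the input, concurrently, and the merged result does not depend on how many workers were used. The round-robin split and thread pool here are hypothetical stand-ins for Bytewax's workers, chosen only to keep the example self-contained.

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor
from typing import List


def count_partition(keys: List[str]) -> Counter:
    # Each worker processes its slice independently of the others.
    return Counter(keys)


def run(keys: List[str], worker_count: int) -> Counter:
    # Round-robin split of the input into independent parts.
    parts = [keys[i::worker_count] for i in range(worker_count)]
    with ThreadPoolExecutor(max_workers=worker_count) as pool:
        results = list(pool.map(count_partition, parts))
    total: Counter = Counter()
    for partial in results:
        total.update(partial)  # merge the per-worker counts
    return total


keys = ["a", "b", "a", "c", "b", "a"]
single = run(keys, worker_count=1)
parallel = run(keys, worker_count=4)
# The counting logic is unchanged; only the number of workers differs.
```

This works because counting is associative: partial counts from any split add up to the same totals.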

Community

Slack is the main forum for communication and discussion.

GitHub Issues is reserved for actual issues. Please use the Slack community for discussions.

Code of Conduct

Usage

Install the latest release with pip:

pip install bytewax

Building From Source

To build a specific branch, you will need to use Maturin and have Rust installed on your machine. Once those have been installed, run:

maturin develop -E dev

Important: If you are testing performance with a version built from source, use maturin build --release, since maturin develop produces a slower, unoptimized build.

More Examples

For a more complete example, and documentation on the available operators, check out the User Guide and the Examples.

For an exhaustive list of examples, check out the /examples folder.

License

Bytewax is licensed under the Apache-2.0 license.

Contributing

Contributions are welcome! This community and project would not be what it is without the contributors. All contributions, from bug reports to new features, are welcome and encouraged. Please view the contribution guidelines before getting started.



With ❤️ Bytewax



Download files


Source Distributions

No source distribution files are available for this release.

Built Distributions


bytewax-0.13.0-cp310-cp310-manylinux_2_31_x86_64.whl (8.3 MB): CPython 3.10, manylinux glibc 2.31+, x86-64
bytewax-0.13.0-cp310-cp310-macosx_11_0_arm64.whl (6.0 MB): CPython 3.10, macOS 11.0+, ARM64
bytewax-0.13.0-cp310-cp310-macosx_10_7_x86_64.whl (6.5 MB): CPython 3.10, macOS 10.7+, x86-64
bytewax-0.13.0-cp39-cp39-manylinux_2_31_x86_64.whl (8.3 MB): CPython 3.9, manylinux glibc 2.31+, x86-64
bytewax-0.13.0-cp39-cp39-macosx_11_0_arm64.whl (6.0 MB): CPython 3.9, macOS 11.0+, ARM64
bytewax-0.13.0-cp39-cp39-macosx_10_7_x86_64.whl (6.5 MB): CPython 3.9, macOS 10.7+, x86-64
bytewax-0.13.0-cp38-cp38-manylinux_2_31_x86_64.whl (8.3 MB): CPython 3.8, manylinux glibc 2.31+, x86-64
bytewax-0.13.0-cp38-cp38-macosx_11_0_arm64.whl (6.0 MB): CPython 3.8, macOS 11.0+, ARM64
bytewax-0.13.0-cp38-cp38-macosx_10_7_x86_64.whl (6.5 MB): CPython 3.8, macOS 10.7+, x86-64
bytewax-0.13.0-cp37-cp37m-manylinux_2_31_x86_64.whl (8.3 MB): CPython 3.7m, manylinux glibc 2.31+, x86-64
bytewax-0.13.0-cp37-cp37m-manylinux_2_27_x86_64.whl (8.2 MB): CPython 3.7m, manylinux glibc 2.27+, x86-64
bytewax-0.13.0-cp37-cp37m-macosx_11_0_arm64.whl (6.0 MB): CPython 3.7m, macOS 11.0+, ARM64
bytewax-0.13.0-cp37-cp37m-macosx_10_7_x86_64.whl (6.5 MB): CPython 3.7m, macOS 10.7+, x86-64

