Bytewax

Python Stateful Stream Processing Framework

Bytewax is a Python framework that simplifies event and stream processing. Because Bytewax couples the stream and event processing capabilities of Flink, Spark, and Kafka Streams with the friendly and familiar interface of Python, you can reuse the Python libraries you already know and love. Connect data sources, run stateful transformations, and write to a variety of downstream systems with built-in connectors or existing Python libraries.

How it all works

Bytewax is a Python framework and Rust distributed processing engine that uses a dataflow computational model to provide parallelizable stream processing and event processing capabilities similar to Flink, Spark, and Kafka Streams. You can use Bytewax for a variety of workloads, from moving data à la Kafka Connect all the way to advanced online machine learning workloads. Bytewax is not limited to streaming applications; it excels anywhere that data can be distributed at the input and output.

Bytewax has an accompanying command-line interface, waxctl, which supports deploying dataflows on cloud VMs or Kubernetes. You can download it here.


Getting Started with Bytewax

pip install bytewax

Install waxctl

A Bytewax dataflow is Python code that represents an input, a series of processing steps, and an output. The inputs could range from a Kafka stream to a WebSocket, and the outputs could vary from a data lake to a key-value store.

from bytewax.dataflow import Dataflow
from bytewax.inputs import KafkaInputConfig
from bytewax.outputs import ManualOutputConfig
# Bytewax has helpers for common input and output data sources,
# but you can also write your own output with ManualOutputConfig.

At a high level, the dataflow compute model is one in which a program's execution is conceptualized as data flowing through a series of operator-based steps. Operators like map and filter are the processing primitives of Bytewax. Each of them gives you a "shape" of data transformation, and you give them regular Python functions to customize them for the specific task you need. See the documentation for a list of the available operators.

import json

def deserialize(key_bytes__payload_bytes):
    # Each Kafka item is a (key, payload) pair of bytes; only the payload is used.
    # An empty payload is not handled here: the lookup below would fail on None.
    _key_bytes, payload_bytes = key_bytes__payload_bytes
    event_data = json.loads(payload_bytes) if payload_bytes else None
    # Re-key the item by user so later stateful steps group per user.
    return event_data["user_id"], event_data

def anonymize_email(user_id__event_data):
    user_id, event_data = user_id__event_data
    # Mask the local part of the address and keep only the domain.
    event_data["email"] = "@".join(["******", event_data["email"].split("@")[-1]])
    return user_id, event_data

def remove_bytewax(user_id__event_data):
    # Filter predicate: keep only events whose email does not mention "bytewax".
    user_id, event_data = user_id__event_data
    return "bytewax" not in event_data["email"]

flow = Dataflow()
flow.input("inp", KafkaInputConfig(brokers=["localhost:9092"], topic="web_events"))
flow.map(deserialize)
flow.map(anonymize_email)
flow.filter(remove_bytewax)
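Because the step functions are regular Python functions, they can be checked without a Kafka broker. The following is a minimal sketch, not part of Bytewax: it restates the three functions compactly so the block runs on its own, and chains them with Python's built-in map and filter in the same order as the flow. The sample records are hypothetical stand-ins for the web_events topic.

```python
import json

def deserialize(key_bytes__payload_bytes):
    # Items are (key, payload) byte pairs; the key is ignored in this sketch.
    _key_bytes, payload_bytes = key_bytes__payload_bytes
    event_data = json.loads(payload_bytes)
    return event_data["user_id"], event_data

def anonymize_email(user_id__event_data):
    user_id, event_data = user_id__event_data
    domain = event_data["email"].split("@")[-1]
    event_data["email"] = "******@" + domain
    return user_id, event_data

def remove_bytewax(user_id__event_data):
    _user_id, event_data = user_id__event_data
    return "bytewax" not in event_data["email"]

# Hypothetical sample records; real items would come from Kafka.
raw_items = [
    (b'"k1"', b'{"user_id": "a", "email": "ann@example.com", "type": "click"}'),
    (b'"k2"', b'{"user_id": "b", "email": "bob@bytewax.io", "type": "view"}'),
]

# Same functions, same order as flow.map / flow.map / flow.filter.
results = list(
    filter(remove_bytewax, map(anonymize_email, map(deserialize, raw_items)))
)
print(results)
```

Only the first record survives the filter, with its email masked to the domain.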

Bytewax is a stateful stream processing framework, which means that some operations remember information across multiple events. Windows and aggregations are also stateful, and can be reconstructed in the event of failure. Bytewax can be configured with different state recovery mechanisms to durably persist state in order to recover from failure.
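The idea of persisting state so that a computation can resume after a failure can be illustrated in plain Python. This is a conceptual sketch only and does not use or reproduce Bytewax's recovery mechanism: the process function, the JSON snapshot format, and the stop_after parameter that simulates a crash are all assumptions made for illustration.

```python
import json

def process(events, snapshot=None, stop_after=None):
    """Count events per user, resuming from a JSON snapshot if one is given."""
    if snapshot is None:
        offset, counts = 0, {}
    else:
        saved = json.loads(snapshot)
        offset, counts = saved["offset"], saved["counts"]
    for position in range(offset, len(events)):
        if stop_after is not None and position >= stop_after:
            break  # simulated failure: nothing past this point was processed
        user_id = events[position]
        counts[user_id] = counts.get(user_id, 0) + 1
        # Persist the state together with the next input position to read.
        snapshot = json.dumps({"offset": position + 1, "counts": counts})
    return counts, snapshot

events = ["a", "b", "a", "c", "a"]

_, saved = process(events, stop_after=3)        # fails after three events
recovered, _ = process(events, snapshot=saved)  # resumes from the snapshot
uninterrupted, _ = process(events)              # reference run with no failure

print(recovered == uninterrupted, recovered)
```

Storing the input position alongside the state is what lets the resumed run skip already-counted events instead of counting them twice.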

There are multiple stateful operators available, such as reduce, stateful_map, and fold_window. The complete list can be found in the API documentation for all operators. Below we use the fold_window operator with a tumbling window based on system time to gather events and count, for each user, how many events of each type occurred in the window.

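import datetime
from collections import defaultdict

from bytewax.window import TumblingWindowConfig, SystemClockConfig

# Assign items to windows by the system time at which they are processed.
cc = SystemClockConfig()
# Fixed-size, non-overlapping windows of five seconds.
wc = TumblingWindowConfig(length=datetime.timedelta(seconds=5))

def build():
    # Fresh accumulator for each key and window: event type -> count.
    return defaultdict(lambda: 0)

def count_events(results, event):
    # Called once per event in the window; returns the updated accumulator.
    results[event["type"]] += 1
    return results

flow.fold_window("session_state_recovery", cc, wc, build, count_events)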
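To make the windowing semantics concrete, here is a plain-Python sketch of what a fold over tumbling windows computes. It is an illustration under stated assumptions, not Bytewax's implementation: the fold_tumbling helper is hypothetical, and it takes explicit timestamps per event instead of reading the system clock so that the result is reproducible.

```python
import datetime
from collections import defaultdict

WINDOW = datetime.timedelta(seconds=5)
START = datetime.datetime(2022, 1, 1, tzinfo=datetime.timezone.utc)

def build():
    return defaultdict(int)

def count_events(results, event):
    results[event["type"]] += 1
    return results

def fold_tumbling(keyed_events, start, length, builder, folder):
    """Fold (key, timestamp, event) triples into one accumulator per (key, window)."""
    accumulators = {}
    for key, ts, event in keyed_events:
        # timedelta // timedelta yields the integer index of the window.
        window_index = (ts - start) // length
        slot = (key, window_index)
        if slot not in accumulators:
            accumulators[slot] = builder()
        accumulators[slot] = folder(accumulators[slot], event)
    return accumulators

def at(seconds):
    return START + datetime.timedelta(seconds=seconds)

events = [
    ("a", at(0), {"type": "click"}),
    ("a", at(3), {"type": "click"}),
    ("b", at(4), {"type": "view"}),
    ("a", at(6), {"type": "view"}),  # falls into the second 5-second window
]

windows = fold_tumbling(events, START, WINDOW, build, count_events)
for (key, index), counts in sorted(windows.items()):
    print(key, index, dict(counts))
```

Each (key, window) pair gets its own accumulator from the builder, which is why user "a" ends up with separate counts for the first and second windows.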

Output mechanisms in Bytewax are managed in the capture operator. There are a number of helpers that allow you to easily connect and write to other systems (output docs). If there isn't a helper for your system, it is easy to build a custom one, which we will do below. Similar to the input, Bytewax output can be parallelized, and the client connection is created on each worker.

import json

import psycopg2

def output_builder(worker_index, worker_count):
    # create the connection at the worker level
    conn = psycopg2.connect("dbname=website user=bytewax")
    conn.set_session(autocommit=True)
    cur = conn.cursor()

    def write_to_postgres(user_id__user_data):
        user_id, user_data = user_id__user_data
        query_string = '''
                    INSERT INTO events (user_id, data)
                    VALUES (%s, %s)
                    ON CONFLICT (user_id)
                    DO
                        UPDATE SET data = %s;'''
        cur.execute(query_string, (user_id, json.dumps(user_data), json.dumps(user_data)))
    return write_to_postgres

flow.capture(ManualOutputConfig(output_builder))
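The builder pattern above (open the connection once per worker, return a function that writes one item) can be tried locally without Postgres. The sketch below swaps in SQLite from the Python standard library and uses INSERT OR REPLACE instead of the Postgres ON CONFLICT clause, so it is an analogue rather than a drop-in replacement. The make_output_builder wrapper and the temporary database path are assumptions added so the block can run on its own.

```python
import json
import os
import sqlite3
import tempfile

def make_output_builder(db_path):
    """Return a builder with the (worker_index, worker_count) shape used above."""
    def output_builder(worker_index, worker_count):
        # One connection per worker; isolation_level=None means autocommit.
        conn = sqlite3.connect(db_path, isolation_level=None)
        conn.execute(
            "CREATE TABLE IF NOT EXISTS events (user_id TEXT PRIMARY KEY, data TEXT)"
        )

        def write_to_sqlite(user_id__user_data):
            user_id, user_data = user_id__user_data
            # Replaces the existing row when user_id is already present.
            conn.execute(
                "INSERT OR REPLACE INTO events (user_id, data) VALUES (?, ?)",
                (user_id, json.dumps(user_data)),
            )

        return write_to_sqlite

    return output_builder

db_path = os.path.join(tempfile.mkdtemp(), "events.db")
write = make_output_builder(db_path)(worker_index=0, worker_count=1)

write(("a", {"click": 1}))
write(("a", {"click": 2}))  # same key: overwrites the earlier row
write(("b", {"view": 1}))

rows = (
    sqlite3.connect(db_path)
    .execute("SELECT user_id, data FROM events ORDER BY user_id")
    .fetchall()
)
print(rows)
```

Calling the builder by hand like this is a quick way to confirm that a custom writer upserts correctly before wiring it into flow.capture.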

Bytewax dataflows can be executed on a single host with multiple Python processes, or on multiple hosts. Below is an example that starts one process of a cluster with cluster_main; to run across multiple hosts, list the address of every process and start each one with its own proc_id. When processing data in a distributed fashion, Bytewax ensures that all items with the same key are routed to the same worker.

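from bytewax.execution import cluster_main

if __name__ == "__main__":
    # One "host:port" entry per process in the cluster, including this one.
    addresses = [
        "localhost:2101",
    ]

    cluster_main(
        flow,
        addresses=addresses,
        proc_id=0,  # index of this process in addresses
        worker_count_per_proc=2,
    )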
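Key-based routing can be pictured as hashing each key to a worker index. The following sketch is illustrative only: Bytewax's actual routing is internal to the engine, and the route and partition helpers here are hypothetical. It uses zlib.crc32 because Python's built-in hash() for strings is salted per process and would not agree across workers.

```python
import zlib
from collections import defaultdict

def route(key, worker_count):
    # A stable hash so that every process maps a key to the same worker.
    return zlib.crc32(key.encode("utf-8")) % worker_count

def partition(items, worker_count):
    """Group (key, value) items by the worker that would own each key."""
    buckets = defaultdict(list)
    for key, value in items:
        buckets[route(key, worker_count)].append((key, value))
    return buckets

items = [("a", 1), ("b", 2), ("a", 3), ("c", 4), ("b", 5)]
buckets = partition(items, worker_count=2)

for worker, assigned in sorted(buckets.items()):
    print(worker, assigned)
```

Whatever the hash values turn out to be, every item with a given key lands in the same bucket, which is the property stateful operators rely on.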

Deploying and Scaling

Bytewax can be run on a local machine or remote machine, just like a regular Python script.

python my_dataflow.py

It can also be run in a Docker container as described further in the documentation.

Kubernetes

The recommended way to run dataflows at scale is to leverage the Kubernetes ecosystem. To help manage deployment, we built waxctl, which lets you deploy dataflows that run across multiple compute nodes.

waxctl df deploy my_dataflow.py --name my-dataflow

Why Bytewax?

At a high level, Bytewax provides a few major benefits:

  • The operators in Bytewax are largely "data-parallel", meaning they can operate on independent parts of the data concurrently.
  • Bytewax offers the ability to express higher-level control constructs, like iteration.
  • Bytewax allows you to develop and run your code locally, and then easily scale that code to multiple workers or processes without changes.
  • Bytewax can be used in both streaming and batch contexts.
  • Bytewax lets you leverage the Python ecosystem directly.

Community

Slack is the main forum for communication and discussion.

GitHub Issues is reserved for actual issues. Please use the Slack community for discussions.

Code of Conduct

Usage

Install the latest release with pip:

pip install bytewax

Building From Source

To build a specific branch, you will need to use Maturin and have Rust installed on your machine. Once those have been installed, run:

maturin develop -E dev

Important: If you are testing with a Maturin-built version from source, you should use maturin build --release, since a build made with maturin develop will be slower.

More Examples

For a more complete example, and documentation on the available operators, check out the User Guide and the Examples.

For an exhaustive list of examples, check out the /examples folder.

License

Bytewax is licensed under the Apache-2.0 license.

Contributing

Contributions are welcome! This community and project would not be what it is without the contributors. All contributions, from bug reports to new features, are welcome and encouraged. Please view the contribution guidelines before getting started.



With ❤️ Bytewax

Download files

Source Distributions

No source distribution files are available for this release.

Built Distributions

bytewax-0.13.1-cp310-cp310-manylinux_2_31_x86_64.whl (8.3 MB): CPython 3.10, manylinux (glibc 2.31+), x86-64
bytewax-0.13.1-cp310-cp310-macosx_11_0_arm64.whl (6.0 MB): CPython 3.10, macOS 11.0+, ARM64
bytewax-0.13.1-cp310-cp310-macosx_10_7_x86_64.whl (6.5 MB): CPython 3.10, macOS 10.7+, x86-64
bytewax-0.13.1-cp39-cp39-manylinux_2_31_x86_64.whl (8.3 MB): CPython 3.9, manylinux (glibc 2.31+), x86-64
bytewax-0.13.1-cp39-cp39-macosx_11_0_arm64.whl (6.0 MB): CPython 3.9, macOS 11.0+, ARM64
bytewax-0.13.1-cp39-cp39-macosx_10_7_x86_64.whl (6.5 MB): CPython 3.9, macOS 10.7+, x86-64
bytewax-0.13.1-cp38-cp38-manylinux_2_31_x86_64.whl (8.3 MB): CPython 3.8, manylinux (glibc 2.31+), x86-64
bytewax-0.13.1-cp38-cp38-manylinux_2_27_x86_64.whl (8.7 MB): CPython 3.8, manylinux (glibc 2.27+), x86-64
bytewax-0.13.1-cp38-cp38-macosx_11_0_arm64.whl (6.0 MB): CPython 3.8, macOS 11.0+, ARM64
bytewax-0.13.1-cp38-cp38-macosx_10_7_x86_64.whl (6.5 MB): CPython 3.8, macOS 10.7+, x86-64
bytewax-0.13.1-cp37-cp37m-manylinux_2_31_x86_64.whl (8.3 MB): CPython 3.7m, manylinux (glibc 2.31+), x86-64
bytewax-0.13.1-cp37-cp37m-manylinux_2_27_x86_64.whl (8.2 MB): CPython 3.7m, manylinux (glibc 2.27+), x86-64
bytewax-0.13.1-cp37-cp37m-macosx_11_0_arm64.whl (6.0 MB): CPython 3.7m, macOS 11.0+, ARM64
bytewax-0.13.1-cp37-cp37m-macosx_10_7_x86_64.whl (6.5 MB): CPython 3.7m, macOS 10.7+, x86-64
