Bytewax

Python Stateful Stream Processing Framework

Bytewax is a Python framework that simplifies event and stream processing. Because Bytewax couples the stream and event processing capabilities of Flink, Spark, and Kafka Streams with the friendly and familiar interface of Python, you can reuse the Python libraries you already know and love. Connect data sources, run stateful transformations, and write to a variety of downstream systems with built-in connectors or existing Python libraries.

How it all works

Bytewax is a Python framework and a distributed processing engine written in Rust. It uses a dataflow computational model to provide parallelizable stream processing and event processing capabilities similar to Flink, Spark, and Kafka Streams. You can use Bytewax for a variety of workloads, from moving data in the style of Kafka Connect all the way to advanced online machine learning. Bytewax is not limited to streaming applications; it excels anywhere that data can be distributed at the input and output.

Bytewax has an accompanying command line interface, waxctl, which supports deploying dataflows on cloud VMs or Kubernetes. waxctl is downloaded separately from the bytewax Python package.


Getting Started with Bytewax

pip install bytewax

Install waxctl

A Bytewax dataflow is Python code that represents an input, a series of processing steps, and an output. Inputs can range from a Kafka stream to a WebSocket, and outputs can range from a data lake to a key-value store.

from bytewax.dataflow import Dataflow
from bytewax.inputs import KafkaInputConfig
from bytewax.outputs import ManualOutputConfig

# Bytewax has input and output helpers for common data sources and sinks,
# but you can also build your own, e.g. a custom output with ManualOutputConfig.

At a high level, the dataflow compute model is one in which program execution is conceptualized as data flowing through a series of operator-based steps. Operators like map and filter are the processing primitives of Bytewax. Each one provides a “shape” of data transformation, and you customize it for your specific task by passing it a regular Python function. See the documentation for a list of the available operators.

import json


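def deserialize(key_bytes__payload_bytes):
    # Items from the Kafka input are (key bytes, payload bytes) pairs.
    key_bytes, payload_bytes = key_bytes__payload_bytes
    key = json.loads(key_bytes) if key_bytes else None  # decoded but unused below
    event_data = json.loads(payload_bytes) if payload_bytes else None
    # Key the stream by user_id so stateful steps group events per user.
    # Assumes every payload is present and has a "user_id" field.
    return event_data["user_id"], event_data


def anonymize_email(user_id__event_data):
    user_id, event_data = user_id__event_data
    # Mask the local part of the address and keep only the domain.
    event_data["email"] = "@".join(["******", event_data["email"].split("@")[-1]])
    return user_id, event_data


def remove_bytewax(user_id__event_data):
    user_id, event_data = user_id__event_data
    # filter keeps items for which this returns True, i.e. drops bytewax addresses.
    return "bytewax" not in event_data["email"]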


flow = Dataflow()
flow.input("inp", KafkaInputConfig(brokers=["localhost:9092"], topic="web_events"))
flow.map(deserialize)
flow.map(anonymize_email)
flow.filter(remove_bytewax)
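Because each step is an ordinary Python function, the transformation logic can be checked without Kafka or a running dataflow. The sketch below is a plain-Python stand-in for the map, map, filter chain above; it does not use Bytewax itself. It applies condensed copies of the three functions, in order, to two hypothetical Kafka records (the user IDs and email addresses are invented for illustration), and run_pipeline is a hypothetical helper.

```python
import json


def deserialize(key_bytes__payload_bytes):
    key_bytes, payload_bytes = key_bytes__payload_bytes
    event_data = json.loads(payload_bytes)
    return event_data["user_id"], event_data


def anonymize_email(user_id__event_data):
    user_id, event_data = user_id__event_data
    event_data["email"] = "@".join(["******", event_data["email"].split("@")[-1]])
    return user_id, event_data


def remove_bytewax(user_id__event_data):
    user_id, event_data = user_id__event_data
    return "bytewax" not in event_data["email"]


def run_pipeline(records):
    """Mimic map(deserialize) -> map(anonymize_email) -> filter(remove_bytewax)."""
    out = []
    for record in records:
        item = anonymize_email(deserialize(record))
        if remove_bytewax(item):  # filter keeps items whose predicate is True
            out.append(item)
    return out


# Hypothetical (key bytes, JSON payload bytes) records, shaped like Kafka messages.
records = [
    (b'"k1"', b'{"user_id": "a", "type": "click", "email": "ann@example.com"}'),
    (b'"k2"', b'{"user_id": "b", "type": "click", "email": "bob@bytewax.io"}'),
]
print(run_pipeline(records))
# [('a', {'user_id': 'a', 'type': 'click', 'email': '******@example.com'})]
```

The second record is dropped because its masked address still contains the bytewax domain.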

Bytewax is a stateful stream processing framework, which means that some operations remember information across multiple events. Windows and aggregations are also stateful, and can be reconstructed in the event of failure. Bytewax can be configured with different state recovery mechanisms to durably persist state in order to recover from failure.
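One way to picture "remembering information across events" is a per-key state table that a step reads and updates on every item. The plain-Python sketch below is a conceptual model only, not Bytewax's engine, and it does not model recovery. run_stateful is a hypothetical helper that takes a builder (creates the initial state for a newly seen key) and a mapper (returns the updated state and a value to emit), loosely modeled on the stateful_map operator named below.

```python
from typing import Any, Callable, Dict, Hashable, Iterable, List, Tuple


def run_stateful(
    items: Iterable[Tuple[Hashable, Any]],
    builder: Callable[[], Any],
    mapper: Callable[[Any, Any], Tuple[Any, Any]],
) -> List[Tuple[Hashable, Any]]:
    """Apply mapper to each (key, value) item, keeping one state object per key."""
    states: Dict[Hashable, Any] = {}
    out = []
    for key, value in items:
        # First time a key is seen: create its initial state with builder().
        state = states[key] if key in states else builder()
        state, emit = mapper(state, value)
        states[key] = state  # remembered for the next item with this key
        out.append((key, emit))
    return out


def count_so_far(count, _event):
    # Hypothetical mapper: increment the per-key counter and emit the running total.
    count += 1
    return count, count


events = [("alice", "click"), ("bob", "click"), ("alice", "view")]
print(run_stateful(events, builder=lambda: 0, mapper=count_so_far))
# [('alice', 1), ('bob', 1), ('alice', 2)]
```

Because the state lives outside any single event, losing it means losing the running totals; that is the state a recovery mechanism has to persist.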

There are multiple stateful operators available, such as reduce, stateful_map, and fold_window; the complete list can be found in the API documentation for all operators. Below we use the fold_window operator with a tumbling window based on system time to gather events and, for each user, count how many events of each type occurred in each 5-second window.

import datetime
from collections import defaultdict

from bytewax.window import TumblingWindow, SystemClockConfig

cc = SystemClockConfig()
wc = TumblingWindow(length=datetime.timedelta(seconds=5))


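def build():
    # Initial accumulator for each user's window: a count per event type.
    return defaultdict(lambda: 0)


def count_events(results, event):
    # Called for each event in the window; increments the count for its type.
    results[event["type"]] += 1
    return results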


flow.fold_window("session_state_recovery", cc, wc, build, count_events)
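To make the window semantics concrete, the sketch below approximates what the fold computes for a 5-second tumbling window: each event falls into exactly one non-overlapping window, and build/count_events are folded per (key, window). This is plain Python, not Bytewax's implementation. tumbling_fold is a hypothetical helper, and it assumes each event carries an explicit timestamp in seconds with windows aligned to t = 0, whereas the example above uses the system clock.

```python
from collections import defaultdict


def build():
    return defaultdict(int)  # equivalent to defaultdict(lambda: 0)


def count_events(results, event):
    results[event["type"]] += 1
    return results


def tumbling_fold(items, length_s, builder, folder):
    """Group (key, (timestamp, event)) items into tumbling windows and fold each group.

    Window n covers [n * length_s, (n + 1) * length_s); alignment at 0 is an assumption.
    """
    acc = {}
    for key, (ts, event) in items:
        window = int(ts // length_s)  # index of the single window this event falls in
        slot = (key, window)
        if slot not in acc:
            acc[slot] = builder()
        acc[slot] = folder(acc[slot], event)
    # Convert the defaultdict accumulators to plain dicts for readable output.
    return {slot: dict(state) for slot, state in acc.items()}


# Hypothetical (user_id, (seconds, event)) items.
items = [
    ("a", (0.5, {"type": "click"})),
    ("a", (3.0, {"type": "click"})),
    ("a", (6.2, {"type": "view"})),
    ("b", (4.9, {"type": "view"})),
]
print(tumbling_fold(items, 5, build, count_events))
# {('a', 0): {'click': 2}, ('a', 1): {'view': 1}, ('b', 0): {'view': 1}}
```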

Output mechanisms in Bytewax are managed by the capture operator. A number of helpers let you easily connect and write to other systems (see the output documentation). If there isn’t a helper for your system, it is easy to build a custom one, which we do below. Similar to the input, Bytewax output can be parallelized, and the client connection is created on each worker.

import json

import psycopg2


def output_builder(worker_index, worker_count):
    # create the connection at the worker level
    conn = psycopg2.connect("dbname=website user=bytewax")
    conn.set_session(autocommit=True)
    cur = conn.cursor()

    def write_to_postgres(user_id__user_data):
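        user_id, user_data = user_id__user_data
        # Upsert: insert a row for the user, or overwrite its data if one exists.
        # ON CONFLICT (user_id) requires a unique constraint on events.user_id.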
        query_string = """
                    INSERT INTO events (user_id, data)
                    VALUES (%s, %s)
                    ON CONFLICT (user_id)
                    DO
                        UPDATE SET data = %s;"""
        cur.execute(
            query_string, (user_id, json.dumps(user_data), json.dumps(user_data))
        )

    return write_to_postgres


flow.capture(ManualOutputConfig(output_builder))
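The builder pattern above can be tried without a Postgres server. The sketch below is an assumption made for local testing only: it swaps psycopg2 for Python's standard-library sqlite3 module, and make_output_builder is a hypothetical helper that closes over a database path. It keeps the same (worker_index, worker_count) builder shape and the per-worker connection, and uses SQLite's upsert syntax (ON CONFLICT ... DO UPDATE, available in SQLite 3.24 and later) in place of the Postgres query.

```python
import json
import os
import sqlite3
import tempfile


def make_output_builder(db_path):
    """Hypothetical helper: return a builder with the (worker_index, worker_count) shape.

    sqlite3 stands in for psycopg2 so the pattern runs without a database server.
    """

    def output_builder(worker_index, worker_count):
        # Each worker opens its own connection when it builds its writer.
        conn = sqlite3.connect(db_path, isolation_level=None)  # autocommit mode
        conn.execute(
            "CREATE TABLE IF NOT EXISTS events (user_id TEXT PRIMARY KEY, data TEXT)"
        )

        def write(user_id__user_data):
            user_id, user_data = user_id__user_data
            # Upsert: insert a new row, or replace data if user_id already exists.
            conn.execute(
                "INSERT INTO events (user_id, data) VALUES (?, ?) "
                "ON CONFLICT (user_id) DO UPDATE SET data = excluded.data",
                (user_id, json.dumps(user_data)),
            )

        return write

    return output_builder


# Usage: one worker writes the same user twice; only the latest data remains.
db_path = os.path.join(tempfile.mkdtemp(), "events.db")
write = make_output_builder(db_path)(0, 1)
write(("u1", {"login": 1}))
write(("u1", {"login": 2}))
rows = sqlite3.connect(db_path).execute("SELECT user_id, data FROM events").fetchall()
print(rows)  # [('u1', '{"login": 2}')]
```

Writing the same user twice leaves a single row holding the latest data, which is the behavior the ON CONFLICT clause gives the Postgres version.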

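Bytewax dataflows can be executed on a single host with multiple Python processes, or on multiple hosts. The example below starts process 0 of a cluster whose only address is localhost:2101, with two workers in that process; to run across multiple hosts, list one address per process in addresses and start each process with its own proc_id. When processing data in a distributed fashion, Bytewax ensures that all items with the same key are routed to the same worker, and therefore to the same host.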

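# The cluster_main import path varies between Bytewax versions; bytewax.execution
# matches the KafkaInputConfig/ManualOutputConfig-era API used in this example.
from bytewax.execution import cluster_main

if __name__ == "__main__":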
    addresses = ["localhost:2101"]

    cluster_main(flow, addresses=addresses, proc_id=0, worker_count_per_proc=2)
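Key-based routing can be pictured as hashing each key to a worker index, so every item with a given key lands on the same worker. The sketch below is illustrative only: route and partition are hypothetical helpers, and zlib.crc32 stands in for whatever hash Bytewax uses internally.

```python
import zlib


def route(key: str, worker_count: int) -> int:
    """Map a key to a worker index deterministically (stand-in hash, not Bytewax's)."""
    return zlib.crc32(key.encode("utf-8")) % worker_count


def partition(items, worker_count):
    """Split (key, value) items into one list per worker, routed by key."""
    buckets = [[] for _ in range(worker_count)]
    for key, value in items:
        buckets[route(key, worker_count)].append((key, value))
    return buckets


items = [("alice", 1), ("bob", 2), ("alice", 3), ("carol", 4)]
for worker, bucket in enumerate(partition(items, worker_count=2)):
    print(worker, bucket)
```

crc32 is used instead of the built-in hash() because Python randomizes str hashes per process (unless PYTHONHASHSEED is set), and every process in a cluster must agree on where a key goes.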

Deploying and Scaling

Bytewax can be run on a local machine or remote machine, just like a regular Python script.

python my_dataflow.py

It can also be run in a Docker container as described further in the documentation.

Kubernetes

The recommended way to run dataflows at scale is to leverage the Kubernetes ecosystem. To help manage deployment, we built waxctl, which lets you easily deploy dataflows that run at large scale across multiple compute nodes.

waxctl df deploy my_dataflow.py --name my-dataflow

Why Bytewax?

At a high level, Bytewax provides a few major benefits:

  • The operators in Bytewax are largely “data-parallel”, meaning they can operate on independent parts of the data concurrently.
  • Bytewax offers the ability to express higher-level control constructs, like iteration.
  • Bytewax allows you to develop and run your code locally, and then easily scale that code to multiple workers or processes without changes.
  • Bytewax can be used in both streaming and batch contexts.
  • Bytewax lets you leverage the Python ecosystem directly.

Community

Slack is the main forum for communication and discussion.

GitHub Issues is reserved for actual issues; please use the Slack community for discussions.

Code of Conduct

Usage

Install the latest release with pip:

pip install bytewax

Building From Source

To build a specific branch, you will need Maturin and a Rust toolchain installed on your machine. Once those are installed, run:

maturin develop -E dev

Important: if you are testing a version built from source with Maturin, use maturin build --release, since a build from maturin develop will be slower.

More Examples

For a more complete example, and documentation on the available operators, check out the User Guide and the Examples.

For an exhaustive list of examples, check out the /examples folder.

License

Bytewax is licensed under the Apache-2.0 license.

Contributing

Contributions are welcome! This community and project would not be what it is without the contributors. All contributions, from bug reports to new features, are welcome and encouraged. Please view the contribution guidelines before getting started.



With ❤️ Bytewax

Download files

Source Distributions

No source distribution files are available for this release.

Built Distributions

  • bytewax-0.16.0-cp310-cp310-manylinux_2_31_x86_64.whl (9.2 MB): CPython 3.10, manylinux: glibc 2.31+, x86-64
  • bytewax-0.16.0-cp310-cp310-manylinux_2_27_x86_64.whl (9.1 MB): CPython 3.10, manylinux: glibc 2.27+, x86-64
  • bytewax-0.16.0-cp310-cp310-macosx_11_0_arm64.whl (6.9 MB): CPython 3.10, macOS 11.0+, ARM64
  • bytewax-0.16.0-cp310-cp310-macosx_10_7_x86_64.whl (7.2 MB): CPython 3.10, macOS 10.7+, x86-64
  • bytewax-0.16.0-cp39-cp39-manylinux_2_31_x86_64.whl (9.2 MB): CPython 3.9, manylinux: glibc 2.31+, x86-64
  • bytewax-0.16.0-cp39-cp39-manylinux_2_27_x86_64.whl (9.1 MB): CPython 3.9, manylinux: glibc 2.27+, x86-64
  • bytewax-0.16.0-cp39-cp39-macosx_11_0_arm64.whl (6.9 MB): CPython 3.9, macOS 11.0+, ARM64
  • bytewax-0.16.0-cp39-cp39-macosx_10_7_x86_64.whl (7.2 MB): CPython 3.9, macOS 10.7+, x86-64
  • bytewax-0.16.0-cp38-cp38-manylinux_2_31_x86_64.whl (9.2 MB): CPython 3.8, manylinux: glibc 2.31+, x86-64
  • bytewax-0.16.0-cp38-cp38-manylinux_2_27_x86_64.whl (9.1 MB): CPython 3.8, manylinux: glibc 2.27+, x86-64
  • bytewax-0.16.0-cp38-cp38-macosx_11_0_arm64.whl (6.9 MB): CPython 3.8, macOS 11.0+, ARM64
  • bytewax-0.16.0-cp38-cp38-macosx_10_7_x86_64.whl (7.2 MB): CPython 3.8, macOS 10.7+, x86-64
  • bytewax-0.16.0-cp37-cp37m-manylinux_2_31_x86_64.whl (9.2 MB): CPython 3.7m, manylinux: glibc 2.31+, x86-64
  • bytewax-0.16.0-cp37-cp37m-manylinux_2_27_x86_64.whl (9.1 MB): CPython 3.7m, manylinux: glibc 2.27+, x86-64
  • bytewax-0.16.0-cp37-cp37m-macosx_11_0_arm64.whl (6.9 MB): CPython 3.7m, macOS 11.0+, ARM64
  • bytewax-0.16.0-cp37-cp37m-macosx_10_7_x86_64.whl (7.2 MB): CPython 3.7m, macOS 10.7+, x86-64

File hashes

Hashes for bytewax-0.16.0-cp310-cp310-manylinux_2_31_x86_64.whl
Algorithm Hash digest
SHA256 c59f2276b228dd1a6729538fa4e56f8c26d579473c92049b6524209c675c8893
MD5 ed1e69e5139c3afc4c02ed26630be139
BLAKE2b-256 9215d417baf670b4198b922aab51f8630e3262e106c797bcf40192bef52a137a

Hashes for bytewax-0.16.0-cp310-cp310-manylinux_2_27_x86_64.whl
Algorithm Hash digest
SHA256 80b6ee41ca47f1535511e3f7280589447505c19e90b9041cd66283803ffb2f2b
MD5 bede7b05d876edefc5676c335e9dd64f
BLAKE2b-256 1c81c5645a06a8a3c3d6f3c443cf95f19020060de93134eb66996820784865b4

Hashes for bytewax-0.16.0-cp310-cp310-macosx_11_0_arm64.whl
Algorithm Hash digest
SHA256 86ce6360e939a73b110a0d18f017909f66b0229db2e456f5bad56f571c1d4ae2
MD5 9d50a38907cc5b5b26ef1a9694763323
BLAKE2b-256 c3a3912c423ed9b4412a7f77e95ea5b355ac528d3318804bb6991281919c5e4b

Hashes for bytewax-0.16.0-cp310-cp310-macosx_10_7_x86_64.whl
Algorithm Hash digest
SHA256 171b3969c5b9e657824174efa6a32e74e7f882eda984dc5e0cd0e78ce75cdddc
MD5 a84c3ec115eb7d694f263477ad40551f
BLAKE2b-256 da875e2406938852678a74886db12621b7d0255236263eab219170edf077bfc3

Hashes for bytewax-0.16.0-cp39-cp39-manylinux_2_31_x86_64.whl
Algorithm Hash digest
SHA256 0b067f0213716cf183d401970e2c0e4950fed24765c024c243827968fe392949
MD5 53f9a3b9b6e04f3a8943950b07fdfbc4
BLAKE2b-256 6df91c50bc02a3f97e1964928e466e78e37fe2a844925796dd6a480e4e554f18

Hashes for bytewax-0.16.0-cp39-cp39-manylinux_2_27_x86_64.whl
Algorithm Hash digest
SHA256 f38284a00bb9ac62f59fec79642ea2c3101394705e64d25300ca414f34a791ea
MD5 4d4b95e709e51047cd0fdff636dac229
BLAKE2b-256 429f95e670115ea505b5065ec3c7510ca7501833c0f51454081304d002f4e2ac

Hashes for bytewax-0.16.0-cp39-cp39-macosx_11_0_arm64.whl
Algorithm Hash digest
SHA256 c6d8ae20195c25f2b895fdb31013535bf3c9155101d77af8386b9769e9283d29
MD5 f2de05059f4b1dc2a8ba8205f57143c7
BLAKE2b-256 254ecf8c94786893809cdffb7866ea88e9cba16fbf50a54fb9606422c3c0e1ee

Hashes for bytewax-0.16.0-cp39-cp39-macosx_10_7_x86_64.whl
Algorithm Hash digest
SHA256 cfdd117a46a56e12bfb87d6130885b29a865e85ed1d22c6ce9a418e06d778b11
MD5 f316bbcb1f5b9e86e441c1a0ec4f6534
BLAKE2b-256 c60db40da679ce75ef6defda2da9ff3d20083f4076417a4ca63d610dca0e61a8

Hashes for bytewax-0.16.0-cp38-cp38-manylinux_2_31_x86_64.whl
Algorithm Hash digest
SHA256 d7cc84777ee59b7b7de037084104c20dbc675c6d0b45c54c0c47ae4195b629cd
MD5 3b745e93e3819d121ca8a59fb50ba94e
BLAKE2b-256 267aabed46ec651e2f5d84f210e456a08f5baa21234dfbfda4957947097342b2

Hashes for bytewax-0.16.0-cp38-cp38-manylinux_2_27_x86_64.whl
Algorithm Hash digest
SHA256 8516c0555d6b021e1d33cb718e9d5d7f23f6910b6bab1a151e4516967829672e
MD5 12250dedcc5ce7557d435d516bc3bff7
BLAKE2b-256 c3b6615c0d86ab4eaa69facc0f94ad322134f3acbdbf38cfac6c2e78533409c8

Hashes for bytewax-0.16.0-cp38-cp38-macosx_11_0_arm64.whl
Algorithm Hash digest
SHA256 7fc0948cc3f07c22e9809411458d47aca387f0b186b4e7f2b9aee5d466aa74c3
MD5 c844656b1157e8a7f7731f663a503677
BLAKE2b-256 52db34037c89a2401ac6850500dbeff6d8f3a8852ed6bc3e277237c9c63ecd89

Hashes for bytewax-0.16.0-cp38-cp38-macosx_10_7_x86_64.whl
Algorithm Hash digest
SHA256 4a6297090a72a764557eb99e5b4fd104e68c4508e38df1ac1fd91c31a0772f3a
MD5 d573de0f0f409ab9a382c2e30afade77
BLAKE2b-256 2cb4bdd995c4acf04cbb576979cbaca051959f0b171238d09be21867a03a75e8

Hashes for bytewax-0.16.0-cp37-cp37m-manylinux_2_31_x86_64.whl
Algorithm Hash digest
SHA256 268d99dd512c5ca80fc420b3ee8ceb54627e54de40edde77b96a83cf23e8e740
MD5 61434836a6f18560109b6c91ea997109
BLAKE2b-256 41ad97999b5a9490f8a1572e43761230eab8aa5b4a203d8b09f519c82f09c29b

Hashes for bytewax-0.16.0-cp37-cp37m-manylinux_2_27_x86_64.whl
Algorithm Hash digest
SHA256 2a6e6a8fa5014737cc5f0b9d277b269a8d5e8cc9e235df2f0aa5ba69ece3693c
MD5 519e5844794e8add9ad4c4cc87b67b3d
BLAKE2b-256 45752e6e44777e0bbf1e6ebbd5e64c2a6393dd5f668133922625ef9ef7b0d509

Hashes for bytewax-0.16.0-cp37-cp37m-macosx_11_0_arm64.whl
Algorithm Hash digest
SHA256 8038e309f9e8d7fd41bb67ce8c40d34c4c596e29063c128821498356a090e7ff
MD5 60663d6db8459dbf17a4bcc92d4669cc
BLAKE2b-256 b0df83d31ef7e950b36877a200934f46f3be4fa9dbf429d1c9c5d049857047fa

Hashes for bytewax-0.16.0-cp37-cp37m-macosx_10_7_x86_64.whl
Algorithm Hash digest
SHA256 a0c0e1c5b5821fc21698ac55a9a1b20fdb08d019ca6a337a573e6df847fd3b84
MD5 5a56e3ae5ced47ee90c3aa68136110d7
BLAKE2b-256 be1956c6b638a2de9ebc0562c5b069f3070a372119ca9b91e86058fa261bbd35