Bytewax

Python Stateful Stream Processing Framework

Bytewax is a Python framework that simplifies event and stream processing. Because Bytewax couples the stream and event processing capabilities of Flink, Spark, and Kafka Streams with the friendly and familiar interface of Python, you can reuse the Python libraries you already know and love. Connect data sources, run stateful transformations, and write to a variety of downstream systems with built-in connectors or existing Python libraries.

How it all works

Bytewax is a Python framework and Rust distributed processing engine that uses a dataflow computational model to provide parallelizable stream processing and event processing capabilities similar to Flink, Spark, and Kafka Streams. You can use Bytewax for a variety of workloads, from moving data à la Kafka Connect all the way to advanced online machine learning workloads. Bytewax is not limited to streaming applications but excels anywhere that data can be distributed at the input and output.

Bytewax has an accompanying command line interface, waxctl, which supports the deployment of dataflows on cloud VMs or Kubernetes. You can download it here.


Getting Started with Bytewax

pip install bytewax

Install waxctl

A Bytewax dataflow is Python code that represents an input, a series of processing steps, and an output. The inputs could range from a Kafka stream to a WebSocket, and the outputs could vary from a data lake to a key-value store.

from bytewax.dataflow import Dataflow
from bytewax.inputs import KafkaInputConfig
from bytewax.outputs import ManualOutputConfig

# Bytewax has input and output helpers for common input and output data sources
# but you can also create your own with the ManualOutputConfig.

At a high level, the dataflow compute model is one in which program execution is conceptualized as data flowing through a series of operator-based steps. Operators like map and filter are the processing primitives of Bytewax. Each of them gives you a “shape” of data transformation, and you give them regular Python functions to customize them to the specific task you need. See the documentation for a list of the available operators.

import json


def deserialize(key_bytes__payload_bytes):
    # Kafka items arrive as (key, payload) pairs of raw bytes; the key is not used here.
    _key_bytes, payload_bytes = key_bytes__payload_bytes
    event_data = json.loads(payload_bytes) if payload_bytes else None
    # Re-key each event by user so that later stateful steps group per user.
    return event_data["user_id"], event_data


def anonymize_email(user_id__event_data):
    user_id, event_data = user_id__event_data
    event_data["email"] = "@".join(["******", event_data["email"].split("@")[-1]])
    return user_id, event_data


def remove_bytewax(user_id__event_data):
    user_id, event_data = user_id__event_data
    return "bytewax" not in event_data["email"]


flow = Dataflow()
flow.input("inp", KafkaInputConfig(brokers=["localhost:9092"], topic="web_events"))
flow.map(deserialize)
flow.map(anonymize_email)
flow.filter(remove_bytewax)
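
The step functions above are ordinary Python, so they can be exercised without a Kafka broker. The following is a minimal sketch, not part of the Bytewax API: it re-declares the three functions and applies them to hand-made (key, payload) byte pairs using the built-in map and filter as stand-ins for flow.map and flow.filter. The sample records and the names sample and results are assumptions made up for illustration.

```python
import json


def deserialize(key_bytes__payload_bytes):
    _key_bytes, payload_bytes = key_bytes__payload_bytes
    event_data = json.loads(payload_bytes) if payload_bytes else None
    return event_data["user_id"], event_data


def anonymize_email(user_id__event_data):
    user_id, event_data = user_id__event_data
    event_data["email"] = "@".join(["******", event_data["email"].split("@")[-1]])
    return user_id, event_data


def remove_bytewax(user_id__event_data):
    user_id, event_data = user_id__event_data
    return "bytewax" not in event_data["email"]


# Hypothetical records shaped like the (key, payload) byte pairs used above.
sample = [
    (b'"a"', json.dumps({"user_id": "a", "type": "login", "email": "ann@example.com"}).encode()),
    (b'"b"', json.dumps({"user_id": "b", "type": "login", "email": "bob@bytewax.io"}).encode()),
]

# Built-in map/filter stand in for the dataflow steps in this sketch.
results = list(filter(remove_bytewax, map(anonymize_email, map(deserialize, sample))))
print(results)
```

Only the first record survives: its email is masked to keep just the domain, while the second record is dropped because its domain contains "bytewax".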

Bytewax is a stateful stream processing framework, which means that some operations remember information across multiple events. Windows and aggregations are also stateful, and can be reconstructed in the event of failure. Bytewax can be configured with different state recovery mechanisms to durably persist state in order to recover from failure.

There are multiple stateful operators available like reduce, stateful_map and fold_window. The complete list can be found in the API documentation for all operators. Below we use the fold_window operator with a tumbling window based on system time to gather events and calculate the number of times events have occurred on a per-user basis.

import datetime
from collections import defaultdict

from bytewax.window import TumblingWindowConfig, SystemClockConfig

cc = SystemClockConfig()
wc = TumblingWindowConfig(length=datetime.timedelta(seconds=5))


def build():
    return defaultdict(lambda: 0)


def count_events(results, event):
    results[event["type"]] += 1
    return results


flow.fold_window("session_state_recovery", cc, wc, build, count_events)
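
To make the windowing semantics concrete, here is a plain-Python sketch of a tumbling-window fold. It is not how Bytewax implements fold_window; it only mimics the idea on a finite list of events that carry explicit timestamps in seconds (an assumption, since the dataflow above uses the system clock). The helper name fold_tumbling and the sample events are hypothetical.

```python
from collections import defaultdict


def build():
    return defaultdict(int)


def count_events(results, event):
    results[event["type"]] += 1
    return results


def fold_tumbling(keyed_events, length, builder, folder):
    """Fold (key, timestamp, event) triples into per-key, per-window accumulators."""
    windows = {}
    for key, ts, event in keyed_events:
        # Tumbling windows are fixed-size and non-overlapping, so each
        # timestamp belongs to exactly one window index.
        window_index = int(ts // length)
        slot = (key, window_index)
        if slot not in windows:
            windows[slot] = builder()
        windows[slot] = folder(windows[slot], event)
    return windows


events = [
    ("a", 0.5, {"type": "click"}),
    ("a", 1.0, {"type": "click"}),
    ("a", 4.9, {"type": "view"}),
    ("b", 2.0, {"type": "click"}),
    ("a", 5.1, {"type": "click"}),  # falls into the next 5-second window
]

counts = fold_tumbling(events, 5, build, count_events)
for (key, window_index), acc in sorted(counts.items()):
    print(key, window_index, dict(acc))
```

Each (key, window) pair gets its own accumulator from the builder, which is why the last event for user "a" starts a fresh count instead of adding to the first window.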

Output mechanisms in Bytewax are managed in the capture operator. There are a number of helpers that allow you to easily connect and write to other systems (output docs). If there isn’t a helper for your system, it is easy to build a custom one, which we will do below. As with input, Bytewax output can be parallelized, and the client connection is created on the worker.

import json

import psycopg2


def output_builder(worker_index, worker_count):
    # create the connection at the worker level
    conn = psycopg2.connect("dbname=website user=bytewax")
    conn.set_session(autocommit=True)
    cur = conn.cursor()

    def write_to_postgres(user_id__user_data):
        user_id, user_data = user_id__user_data
        query_string = """
                    INSERT INTO events (user_id, data)
                    VALUES (%s, %s)
                    ON CONFLICT (user_id)
                    DO
                        UPDATE SET data = %s;"""
        cur.execute(
            query_string, (user_id, json.dumps(user_data), json.dumps(user_data))
        )

    return write_to_postgres


flow.capture(ManualOutputConfig(output_builder))
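
The builder pattern above can be tried without a PostgreSQL server. The sketch below swaps psycopg2 for the standard library's sqlite3 module with an in-memory database (a substitution made purely for illustration) and calls the builder by hand instead of passing it to ManualOutputConfig. It assumes SQLite 3.24 or newer for the ON CONFLICT clause, and the table layout is hypothetical.

```python
import json
import sqlite3


def output_builder(worker_index, worker_count):
    # One connection per worker, created when the builder runs.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE events (user_id TEXT PRIMARY KEY, data TEXT)")

    def write_to_sqlite(user_id__user_data):
        user_id, user_data = user_id__user_data
        # Upsert: insert a new row, or overwrite data for an existing user_id.
        conn.execute(
            """
            INSERT INTO events (user_id, data)
            VALUES (?, ?)
            ON CONFLICT (user_id)
            DO UPDATE SET data = excluded.data
            """,
            (user_id, json.dumps(user_data)),
        )
        conn.commit()

    # Returning the connection too is only so this sketch can read rows back;
    # the builder in the example above returns just the write function.
    return write_to_sqlite, conn


write, conn = output_builder(worker_index=0, worker_count=1)
write(("a", {"click": 1}))
write(("a", {"click": 2}))  # same key: the row is updated, not duplicated
write(("b", {"view": 1}))

rows = conn.execute("SELECT user_id, data FROM events ORDER BY user_id").fetchall()
print(rows)
```

The closure keeps the connection alive for as long as the write function is in use, which is the same reason the PostgreSQL example creates its cursor inside the builder.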

Bytewax dataflows can be executed on a single host with multiple Python processes, or on multiple hosts. Below is an example of running Bytewax as a cluster, with one address listed per process. When processing data in a distributed fashion, Bytewax will ensure that all items with the same key are routed to the same host.

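from bytewax.execution import cluster_main

if __name__ == "__main__":
    # One address per process in the cluster; a single entry runs one process.
    addresses = ["localhost:2101"]

    cluster_main(flow, addresses=addresses, proc_id=0, worker_count_per_proc=2)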
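
The routing guarantee can be pictured as hashing each key to a worker index. The following is an illustrative sketch only: route_key is a hypothetical helper, and zlib.crc32 is an assumption chosen because it is deterministic across processes. It does not describe the hashing Bytewax uses internally.

```python
import zlib


def route_key(key, worker_count):
    """Map a string key to a worker index in [0, worker_count)."""
    # crc32 is stable across processes, unlike the built-in hash() for str,
    # which is randomized per interpreter run by default.
    return zlib.crc32(key.encode("utf-8")) % worker_count


worker_count = 2
items = [("a", 1), ("b", 2), ("a", 3), ("c", 4), ("b", 5)]

# Group items by the worker their key maps to.
assignments = {}
for key, value in items:
    assignments.setdefault(route_key(key, worker_count), []).append((key, value))

for worker, batch in sorted(assignments.items()):
    print(worker, batch)
```

Because the worker index depends only on the key, every item for a given key lands in the same batch, which is what lets a stateful step keep per-key state in one place.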

Deploying and Scaling

Bytewax can be run on a local machine or remote machine, just like a regular Python script.

python my_dataflow.py

It can also be run in a Docker container as described further in the documentation.

Kubernetes

The recommended way to run dataflows at scale is to leverage the Kubernetes ecosystem. To help manage deployment, we built waxctl, which allows you to easily deploy dataflows that run across multiple compute nodes.

waxctl df deploy my_dataflow.py --name my-dataflow

Why Bytewax?

At a high level, Bytewax provides a few major benefits:

  • The operators in Bytewax are largely “data-parallel”, meaning they can operate on independent parts of the data concurrently.
  • Bytewax offers the ability to express higher-level control constructs, like iteration.
  • Bytewax allows you to develop and run your code locally, and then easily scale that code to multiple workers or processes without changes.
  • Bytewax can be used in both streaming and batch contexts.
  • Bytewax lets you leverage the Python ecosystem directly.

Community

Slack is the main forum for communication and discussion.

GitHub Issues is reserved for actual issues. Please use the Slack community for discussions.

Code of Conduct

Usage

Install the latest release with pip:

pip install bytewax

Building From Source

To build a specific branch, you will need to use Maturin and have Rust installed on your machine. Once those have been installed, run:

maturin develop -E dev

Important: If you are testing with a version built from source with Maturin, you should use maturin build --release, since a build produced by maturin develop will be slower.

More Examples

For a more complete example, and documentation on the available operators, check out the User Guide and the Examples.

For an exhaustive list of examples, check out the /examples folder.

License

Bytewax is licensed under the Apache-2.0 license.

Contributing

Contributions are welcome! This community and project would not be what it is without the contributors. All contributions, from bug reports to new features, are welcome and encouraged. Please view the contribution guidelines before getting started.



With ❤️ Bytewax

Built Distributions

File                                                  Size    Python        Platform
bytewax-0.15.0-cp310-cp310-manylinux_2_31_x86_64.whl  8.9 MB  CPython 3.10  manylinux: glibc 2.31+ x86-64
bytewax-0.15.0-cp310-cp310-manylinux_2_27_x86_64.whl  8.8 MB  CPython 3.10  manylinux: glibc 2.27+ x86-64
bytewax-0.15.0-cp310-cp310-macosx_11_0_arm64.whl      6.5 MB  CPython 3.10  macOS 11.0+ ARM64
bytewax-0.15.0-cp310-cp310-macosx_10_7_x86_64.whl     7.0 MB  CPython 3.10  macOS 10.7+ x86-64
bytewax-0.15.0-cp39-cp39-manylinux_2_31_x86_64.whl    8.9 MB  CPython 3.9   manylinux: glibc 2.31+ x86-64
bytewax-0.15.0-cp39-cp39-manylinux_2_27_x86_64.whl    8.8 MB  CPython 3.9   manylinux: glibc 2.27+ x86-64
bytewax-0.15.0-cp39-cp39-macosx_11_0_arm64.whl        6.5 MB  CPython 3.9   macOS 11.0+ ARM64
bytewax-0.15.0-cp39-cp39-macosx_10_7_x86_64.whl       7.0 MB  CPython 3.9   macOS 10.7+ x86-64
bytewax-0.15.0-cp38-cp38-manylinux_2_31_x86_64.whl    8.9 MB  CPython 3.8   manylinux: glibc 2.31+ x86-64
bytewax-0.15.0-cp38-cp38-manylinux_2_27_x86_64.whl    8.8 MB  CPython 3.8   manylinux: glibc 2.27+ x86-64
bytewax-0.15.0-cp38-cp38-macosx_11_0_arm64.whl        6.5 MB  CPython 3.8   macOS 11.0+ ARM64
bytewax-0.15.0-cp38-cp38-macosx_10_7_x86_64.whl       7.0 MB  CPython 3.8   macOS 10.7+ x86-64
bytewax-0.15.0-cp37-cp37m-manylinux_2_31_x86_64.whl   8.9 MB  CPython 3.7m  manylinux: glibc 2.31+ x86-64
bytewax-0.15.0-cp37-cp37m-manylinux_2_27_x86_64.whl   8.8 MB  CPython 3.7m  manylinux: glibc 2.27+ x86-64
bytewax-0.15.0-cp37-cp37m-macosx_11_0_arm64.whl       6.5 MB  CPython 3.7m  macOS 11.0+ ARM64
bytewax-0.15.0-cp37-cp37m-macosx_10_7_x86_64.whl      7.0 MB  CPython 3.7m  macOS 10.7+ x86-64