
Python Stream processing.

Reason this release was yanked:

bad usage of setuptools-scm


faust

Python Stream Processing Fork


Installation

pip install faust-streaming

Documentation

Why the fork

We decided to fork the original Faust project because its process for releasing new versions is in a critical state, which causes uncertainty in the community. Everybody is welcome to contribute to this fork, and you can be added as a maintainer.

We want to:

  • Ensure continuous releases
  • Maintain code quality
  • Use the latest versions of Kafka drivers (for now only aiokafka)
  • Support Kafka transactions
  • Update the documentation

and more...

Usage

# Python Streams
# Forever scalable event processing & in-memory durable K/V store;
# as a library w/ asyncio & static typing.
import faust

Faust is a stream processing library, porting the ideas from Kafka Streams to Python.

It is used at Robinhood to build high performance distributed systems and real-time data pipelines that process billions of events every day.

Faust provides both stream processing and event processing, sharing similarity with tools such as Kafka Streams, Apache Spark, Storm, Samza, and Flink.

It does not use a DSL; it's just Python! This means you can use all your favorite Python libraries when stream processing: NumPy, PyTorch, Pandas, NLTK, Django, Flask, SQLAlchemy, and more.

Faust requires Python 3.7 or later, since it relies on the async/await syntax and variable type annotations.

Here's an example processing a stream of incoming orders:

app = faust.App('myapp', broker='kafka://localhost')

# Models describe how messages are serialized:
# {"account_id": "3fae-...", "amount": 3}
class Order(faust.Record):
    account_id: str
    amount: int

@app.agent(value_type=Order)
async def order(orders):
    async for order in orders:
        # process infinite stream of orders.
        print(f'Order for {order.account_id}: {order.amount}')

The Agent decorator defines a "stream processor" that essentially consumes from a Kafka topic and does something for every event it receives.

The agent is an async def function, so it can also perform other operations asynchronously, such as web requests.
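As a rough, stdlib-only illustration of why an async def agent can interleave other asynchronous work, the sketch below replaces Faust with a plain async generator. The names fake_stream, fetch_account and process are hypothetical, and asyncio.sleep stands in for a real web request.

```python
import asyncio


async def fake_stream(events):
    # Stand-in for a Kafka-backed stream: yields events one at a time.
    for event in events:
        yield event


async def fetch_account(account_id):
    # Simulated web request; a real agent might await an HTTP client here.
    await asyncio.sleep(0)
    return {"account_id": account_id, "status": "active"}


async def process(stream):
    results = []
    async for event in stream:
        # While this await is pending, the event loop can run other tasks.
        account = await fetch_account(event["account_id"])
        results.append((account["account_id"], event["amount"]))
    return results


results = asyncio.run(process(fake_stream([
    {"account_id": "a1", "amount": 3},
    {"account_id": "b2", "amount": 5},
])))
```

In a real Faust application the worker drives the loop for you; asyncio.run is only needed here because the sketch runs on its own.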

This system can persist state, acting like a database. Tables are named distributed key/value stores you can use as regular Python dictionaries.

Tables are stored locally on each machine using a super fast embedded database written in C++, called RocksDB.

Tables can also store aggregate counts that are optionally "windowed", so you can keep track of, for example, "number of clicks from the last day" or "number of clicks in the last hour". Like Kafka Streams, we support tumbling, hopping and sliding windows of time, and old windows can be expired to stop data from filling up.
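To make the window terminology concrete, here is a small stdlib-only sketch of how timestamps map to tumbling and hopping windows and how old windows could be expired. It is not Faust's implementation; the function names and the integer-second timestamps are assumptions made for illustration, and sliding windows are not shown.

```python
from collections import defaultdict


def tumbling_window(ts: int, size: int) -> int:
    """Start of the single fixed-size window [start, start + size) containing ts."""
    return ts - (ts % size)


def hopping_windows(ts: int, size: int, step: int) -> list:
    """Starts of every overlapping window [start, start + size) containing ts."""
    starts = []
    start = ts - (ts % step)  # latest window start at or before ts
    while start > ts - size:
        starts.append(start)
        start -= step
    return sorted(starts)


def expire(counts: dict, now: int, size: int, keep: int) -> None:
    """Drop windows that ended at least `keep` seconds before `now`."""
    for start in [s for s in counts if s + size + keep <= now]:
        del counts[start]


# Count clicks per 10-second tumbling window.
counts = defaultdict(int)
for ts in [1, 4, 12, 14, 25]:
    counts[tumbling_window(ts, 10)] += 1

before_expiry = dict(counts)
expire(counts, now=30, size=10, keep=15)
after_expiry = dict(counts)
```

A tumbling window is the special case of a hopping window where the step equals the size, so each event falls into exactly one window.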

For reliability, we use a Kafka topic as a "write-ahead log". Whenever a key is changed we publish to the changelog. Standby nodes consume from this changelog to keep an exact replica of the data, which enables instant recovery should any of the nodes fail.

To the user a table is just a dictionary, but data is persisted between restarts and replicated across nodes so on failover other nodes can take over automatically.
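The changelog idea can be sketched without Kafka: every write is appended to a log before the local dictionary is updated, and a standby rebuilds the same state by replaying that log. ChangelogTable and replay are hypothetical names, and a plain Python list stands in for the changelog topic.

```python
class ChangelogTable:
    """Dictionary-like table that records every write in a changelog."""

    def __init__(self, changelog):
        self.data = {}
        self.changelog = changelog  # append-only list, stands in for a Kafka topic

    def __setitem__(self, key, value):
        # Publish the change first, then apply it locally.
        self.changelog.append((key, value))
        self.data[key] = value

    def __getitem__(self, key):
        return self.data[key]


def replay(changelog):
    """Rebuild table state by applying changelog entries in order."""
    state = {}
    for key, value in changelog:
        state[key] = value
    return state


log = []
table = ChangelogTable(log)
table["a"] = 1
table["b"] = 2
table["a"] = 3

# A standby node consuming the same log ends up with an identical copy.
standby = replay(log)
```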

You can count page views by URL:

# data sent to 'clicks' topic sharded by URL key.
# e.g. key="http://example.com" value="1"
click_topic = app.topic('clicks', key_type=str, value_type=int)

# default value for missing URL will be 0 with `default=int`
counts = app.Table('click_counts', default=int)

@app.agent(click_topic)
async def count_click(clicks):
    async for url, count in clicks.items():
        counts[url] += count

The data sent to the Kafka topic is partitioned, which means the clicks will be sharded by URL in such a way that every count for the same URL will be delivered to the same Faust worker instance.
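The sketch below illustrates why keyed partitioning sends every count for one URL to the same place. It is only an approximation: zlib.crc32 is used as a stand-in hash (Kafka clients use their own partitioner), and partition_for and route are hypothetical helpers.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    # Deterministic hash of the key, reduced to a partition number.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def route(events, num_partitions):
    # Group (url, count) events by the partition their key hashes to.
    per_partition = {p: [] for p in range(num_partitions)}
    for url, count in events:
        per_partition[partition_for(url, num_partitions)].append((url, count))
    return per_partition


events = [
    ("http://example.com", 1),
    ("http://example.org", 1),
    ("http://example.com", 2),
]
routed = route(events, num_partitions=4)

# Partitions that received at least one event for example.com.
holders = [
    p for p, evs in routed.items()
    if any(url == "http://example.com" for url, _ in evs)
]
```

Because the mapping depends only on the key, a worker that owns a partition can keep the running count for its URLs locally.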

Faust supports any type of stream data: bytes, Unicode and serialized structures, but also comes with "Models" that use modern Python syntax to describe how keys and values in streams are serialized:

# Order is a json serialized dictionary,
# having these fields:

class Order(faust.Record):
    account_id: str
    product_id: str
    price: float
    quantity: float = 1.0

orders_topic = app.topic('orders', key_type=str, value_type=Order)

@app.agent(orders_topic)
async def process_order(orders):
    async for order in orders:
        # process each order using regular Python
        total_price = order.price * order.quantity
        await send_order_received_email(order.account_id, order)
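For readers who want to see what "a JSON-serialized dictionary with these fields" means in plain Python, here is a sketch using a stdlib dataclass. OrderSketch is a hypothetical stand-in, not faust.Record, which does considerably more than this; the point is only the round trip between bytes and typed fields, including the default for quantity.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class OrderSketch:
    account_id: str
    product_id: str
    price: float
    quantity: float = 1.0  # used when the message omits the field

    def dumps(self) -> bytes:
        # Serialize the fields as a JSON object.
        return json.dumps(asdict(self)).encode("utf-8")

    @classmethod
    def loads(cls, raw: bytes) -> "OrderSketch":
        # Deserialize a JSON object into a typed instance.
        return cls(**json.loads(raw))


raw = b'{"account_id": "3fae", "product_id": "p1", "price": 2.5}'
order = OrderSketch.loads(raw)
total_price = order.price * order.quantity
round_tripped = OrderSketch.loads(order.dumps())
```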

Faust is statically typed, using the mypy type checker, so you can take advantage of static types when writing applications.

The Faust source code is small, well organized, and serves as a good resource for learning the implementation of Kafka Streams.

Learn more about Faust on the introduction page, which covers system requirements, installation instructions, community resources, and more.

Or go directly to the quickstart tutorial to see Faust in action by programming a streaming application.

Then explore the User Guide for in-depth information organized by topic.

Local development

  1. Clone the project
  2. Create a virtualenv: python3.7 -m venv venv && source venv/bin/activate
  3. Install the requirements: ./scripts/install
  4. Run lint: ./scripts/lint
  5. Run tests: ./scripts/tests

Faust key points

Simple

Faust is extremely easy to use. Other stream processing solutions require complicated hello-world projects and extra infrastructure just to get started. Faust only requires Kafka; the rest is just Python, so if you know Python you can already use Faust to do stream processing, and it can integrate with just about anything.

Here's one of the easier applications you can make:

import faust

class Greeting(faust.Record):
    from_name: str
    to_name: str

app = faust.App('hello-app', broker='kafka://localhost')
topic = app.topic('hello-topic', value_type=Greeting)

@app.agent(topic)
async def hello(greetings):
    async for greeting in greetings:
        print(f'Hello from {greeting.from_name} to {greeting.to_name}')

@app.timer(interval=1.0)
async def example_sender(app):
    await hello.send(
        value=Greeting(from_name='Faust', to_name='you'),
    )

if __name__ == '__main__':
    app.main()

You're probably a bit intimidated by the async and await keywords, but you don't have to know how asyncio works to use Faust: just mimic the examples, and you'll be fine.

The example application starts two tasks: one is processing a stream, the other is a background task sending events to that stream. In a real-life application, your system will publish events to Kafka topics that your processors can consume from, and the background task is only needed to feed data into our example.
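The two-task structure described above can be mimicked with plain asyncio, which may help if the async and await keywords are new to you. This is a sketch under simplifying assumptions: an asyncio.Queue stands in for the Kafka topic, the sender stops after three messages, and a None sentinel (not something Faust uses) ends the demo.

```python
import asyncio


async def sender(queue, n, interval):
    # Plays the role of the timer: publish a greeting every `interval` seconds.
    for i in range(n):
        await queue.put({"from_name": "Faust", "to_name": f"you-{i}"})
        await asyncio.sleep(interval)
    await queue.put(None)  # sentinel so the demo terminates


async def hello(queue, seen):
    # Plays the role of the agent: handle each greeting as it arrives.
    while True:
        greeting = await queue.get()
        if greeting is None:
            break
        seen.append(
            f"Hello from {greeting['from_name']} to {greeting['to_name']}"
        )


async def main():
    queue = asyncio.Queue()
    seen = []
    # Both coroutines run concurrently on the same event loop.
    await asyncio.gather(sender(queue, 3, 0.01), hello(queue, seen))
    return seen


seen = asyncio.run(main())
```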

Highly Available

Faust is highly available and can survive network problems and server crashes. In the case of node failure, it can automatically recover, and tables have standby nodes that will take over.

Distributed

Start more instances of your application as needed.

Fast

A single-core Faust worker instance can already process tens of thousands of events every second, and we are reasonably confident that throughput will increase once we can support a more optimized Kafka client.

Flexible

Faust is just Python, and a stream is an infinite asynchronous iterator. If you know how to use Python, you already know how to use Faust, and it works with your favorite Python libraries like Django, Flask, SQLAlchemy, NLTK, NumPy, SciPy, TensorFlow, etc.

Bundles

Faust also defines a group of setuptools extensions that can be used to install Faust and the dependencies for a given feature.

You can specify these in your requirements or on the pip command-line by using brackets. Separate multiple bundles using commas:

pip install "faust-streaming[rocksdb]"

pip install "faust-streaming[rocksdb,uvloop,fast,redis,aerospike]"

The following bundles are available:

Faust with extras

Stores

pip install faust-streaming[rocksdb] for using RocksDB for storing Faust table state. Recommended in production.

pip install faust-streaming[aerospike] for using Aerospike for storing Faust table state. Recommended if supported.

Aerospike Configuration

Aerospike can be enabled as the state store by specifying store="aerospike://"

By default, all tables backed by Aerospike use use_partitioner=True and generate changelog topic events similar to a state store backed by RocksDB. The following configuration options should be passed in as keys to the options parameter in Table:

namespace: the Aerospike namespace

ttl: TTL for all key/value pairs in the table

username: username to connect to the Aerospike cluster

password: password to connect to the Aerospike cluster

hosts: the hosts parameter as specified in the Aerospike client

policies: the different policies for read/write/scan operations

client: a dict of the hosts and policies defined above
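As an illustration of how these keys might fit together, the fragment below builds an options dictionary. Every value is hypothetical, the nesting of hosts and policies under client is one reading of the list above, and the TTL is assumed to be in seconds; check the store's source before relying on it.

```python
# Hypothetical Aerospike options for a Faust table; all values are placeholders.
aerospike_options = {
    "namespace": "faust_tables",
    "ttl": 3600,  # assumed to be seconds
    "username": "faust",
    "password": "change-me",
    "client": {
        # (address, port) pairs, as the Aerospike Python client expects.
        "hosts": [("127.0.0.1", 3000)],
        # Example read policy; adjust or omit for your cluster.
        "policies": {"read": {"total_timeout": 1000}},
    },
}

# Intended usage, left commented out because it needs faust-streaming,
# a Kafka broker and an Aerospike cluster:
# app = faust.App("myapp", broker="kafka://localhost", store="aerospike://")
# counts = app.Table("click_counts", default=int, options=aerospike_options)
```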

Caching

faust-streaming[redis] for using Redis as a simple caching backend (Memcached-style).

Codecs

faust-streaming[yaml] for using YAML and the PyYAML library in streams.

Optimization

faust-streaming[fast] for installing all the available C speedup extensions to Faust core.

Sensors

faust-streaming[datadog] for using the Datadog Faust monitor.

faust-streaming[statsd] for using the Statsd Faust monitor.

faust-streaming[prometheus] for using the Prometheus Faust monitor.

Event Loops

faust-streaming[uvloop] for using Faust with uvloop.

faust-streaming[eventlet] for using Faust with eventlet.

Debugging

faust-streaming[debug] for using aiomonitor to connect and debug a running Faust worker.

faust-streaming[setproctitle]: when the setproctitle module is installed, the Faust worker will use it to set a nicer process name in ps/top listings. Also installed with the fast and debug bundles.

Downloading and installing from source

Download the latest version of Faust from https://pypi.org/project/faust-streaming/

You can install it by doing:

$ tar xvfz faust-streaming-0.0.0.tar.gz
$ cd faust-streaming-0.0.0
$ python setup.py build
# python setup.py install

The last command must be executed as a privileged user if you are not currently using a virtualenv.

Using the development version

With pip

You can install the latest snapshot of Faust using the following pip command:

pip install https://github.com/faust-streaming/faust/zipball/master#egg=faust

FAQ

Can I use Faust with Django/Flask/etc.?

Yes! Use eventlet as a bridge to integrate with asyncio.

Using eventlet

This approach works with any blocking Python library that can work with eventlet.

Using eventlet requires you to install the faust-aioeventlet module, and you can install this as a bundle along with Faust:

pip install -U faust-streaming[eventlet]

Then, to actually use eventlet as the event loop, either pass the -L (--loop) argument to the faust program:

faust -L eventlet -A myproj worker -l info

or add import mode.loop.eventlet at the top of your entry point script:

#!/usr/bin/env python3
import mode.loop.eventlet  # noqa

It's very important this is at the very top of the module, and that it executes before you import libraries.

Can I use Faust with Tornado?

Yes! Use the tornado.platform.asyncio bridge.

Can I use Faust with Twisted?

Yes! Use the asyncio reactor implementation: https://twistedmatrix.com/documents/current/api/twisted.internet.asyncioreactor.html

Will you support Python 2.7 or Python 3.5?

No. Faust requires Python 3.7 or later, since it heavily uses features that were introduced in Python 3.6 (async, await, variable type annotations).

I get a "maximum number of open files exceeded" error from RocksDB when running a Faust app locally. How can I fix this?

You may need to increase the limit for the maximum number of open files. On macOS and Linux you can use:

ulimit -n max_open_files to increase the open files limit to max_open_files.

On docker, you can use the --ulimit flag:

docker run --ulimit nofile=50000:100000 <image-tag> where 50000 is the soft limit, and 100000 is the hard limit.
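If you would rather inspect or raise the soft limit from inside the Python process, the standard library's resource module (Unix only) exposes the same setting. The helper below is a sketch, not part of Faust: raise_open_file_limit is a hypothetical name, it never lowers an existing limit, and it caps the request at the hard limit.

```python
import resource


def raise_open_file_limit(target: int) -> tuple:
    """Raise the soft open-files limit to `target` if it is currently lower.

    Returns the (soft, hard) limits in effect afterwards.
    """
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    unlimited = resource.RLIM_INFINITY
    if soft == unlimited or soft >= target:
        # Already high enough; leave the limits untouched.
        return soft, hard
    # An unprivileged process may not raise the soft limit past the hard limit.
    new_soft = target if hard == unlimited else min(target, hard)
    resource.setrlimit(resource.RLIMIT_NOFILE, (new_soft, hard))
    return resource.getrlimit(resource.RLIMIT_NOFILE)


current_soft, current_hard = resource.getrlimit(resource.RLIMIT_NOFILE)
```

Calling raise_open_file_limit(50000) before starting the worker would correspond to the soft limit in the docker example; some platforms impose further caps, in which case setrlimit raises an error.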

Which Kafka versions does Faust support?

Faust supports Kafka versions >= 0.10.

Getting Help

Slack

For discussions about the usage, development, and future of Faust, please join the fauststream Slack.

Resources

Bug tracker

If you have any suggestions, bug reports, or annoyances please report them to our issue tracker at https://github.com/faust-streaming/faust/issues/

License

This software is licensed under the New BSD License. See the LICENSE file in the top distribution directory for the full license text.

Contributing

Development of Faust happens on GitHub.

You're highly encouraged to participate in the development of Faust.

Code of Conduct

Everyone interacting in the project's code bases, issue trackers, chat rooms, and mailing lists is expected to follow the Faust Code of Conduct.

As contributors and maintainers of these projects, and in the interest of fostering an open and welcoming community, we pledge to respect all people who contribute through reporting issues, posting feature requests, updating documentation, submitting pull requests or patches, and other activities.

We are committed to making participation in these projects a harassment-free experience for everyone, regardless of level of experience, gender, gender identity and expression, sexual orientation, disability, personal appearance, body size, race, ethnicity, age, religion, or nationality.

Examples of unacceptable behavior by participants include:

  • The use of sexualized language or imagery
  • Personal attacks
  • Trolling or insulting/derogatory comments
  • Public or private harassment
  • Publishing others' private information, such as physical or electronic addresses, without explicit permission
  • Other unethical or unprofessional conduct.

Project maintainers have the right and responsibility to remove, edit, or reject comments, commits, code, wiki edits, issues, and other contributions that are not aligned to this Code of Conduct. By adopting this Code of Conduct, project maintainers commit themselves to fairly and consistently applying these principles to every aspect of managing this project. Project maintainers who do not follow or enforce the Code of Conduct may be permanently removed from the project team.

This code of conduct applies both within project spaces and in public spaces when an individual is representing the project or its community.

Instances of abusive, harassing, or otherwise unacceptable behavior may be reported by opening an issue or contacting one or more of the project maintainers.
