
Provides the interfaces for writing Python User Defined Functions and Sinks for Numaflow.


Python SDK for Numaflow


This SDK provides the interface for writing UDFs and UDSinks in Python.

Installation

Install the package using pip.

pip install pynumaflow

Build locally

This project uses Poetry for dependency management and packaging. To build the package locally, run the following command from the root of the project.

make setup

To run unit tests:

make test

To format the code with black and ruff:

make lint

Set up pre-commit hooks:

pre-commit install

Implement a User Defined Function (UDF)

Map

from pynumaflow.function import Messages, Message, Datum, Server


def my_handler(keys: list[str], datum: Datum) -> Messages:
    val = datum.value
    _ = datum.event_time
    _ = datum.watermark
    return Messages(Message(value=val, keys=keys))


if __name__ == "__main__":
    grpc_server = Server(map_handler=my_handler)
    grpc_server.start()
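Because a map handler is an ordinary function of `(keys, datum)`, its logic can be exercised without starting the gRPC server. The sketch below assumes hypothetical stand-in classes (`StubDatum`, `StubMessage`) in place of pynumaflow's `Datum` and `Message`, modelling only the attributes the example above touches. In this stub model an empty list means nothing is forwarded; how a drop is expressed in the real SDK may differ between versions.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone


# Hypothetical stand-ins for pynumaflow's Datum and Message, used only to
# test handler logic in isolation. They are NOT the SDK's classes.
@dataclass
class StubDatum:
    value: bytes
    event_time: datetime
    watermark: datetime


@dataclass
class StubMessage:
    value: bytes
    keys: list[str] = field(default_factory=list)


def upper_handler(keys: list[str], datum: StubDatum) -> list[StubMessage]:
    """Uppercase the payload; forward nothing for empty payloads (illustrative)."""
    if not datum.value:
        # In this stub model, an empty list means the input is filtered out.
        return []
    return [StubMessage(value=datum.value.upper(), keys=keys)]


now = datetime.now(timezone.utc)
out = upper_handler(["k1"], StubDatum(b"hello", now, now))
```

Once the logic behaves as expected, the same body can be moved into a handler that returns `Messages(Message(...))`.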

MapT - Map with event time assignment capability

In addition to the regular Map function, MapT supports assigning a new event time to the message. MapT is only supported at the source vertex, where it enables (a) early data filtering and (b) watermark assignment by extracting a new event time from the message payload.

from datetime import datetime
from pynumaflow.function import MessageTs, MessageT, Datum, Server


def mapt_handler(keys: list[str], datum: Datum) -> MessageTs:
    val = datum.value
    new_event_time = datetime.now()
    _ = datum.watermark
    message_t_s = MessageTs(MessageT(val, event_time=new_event_time, keys=keys))
    return message_t_s


if __name__ == "__main__":
    grpc_server = Server(mapt_handler=mapt_handler)
    grpc_server.start()
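The example above simply stamps messages with `datetime.now()`, whereas the paragraph describes extracting the event time from the payload. A minimal sketch of that extraction step follows, assuming a hypothetical JSON payload with an ISO-8601 `"ts"` field. Inside a MapT handler you might pass `datum.value` as the payload and a fallback such as `datum.event_time` (assuming MapT's `Datum` exposes it as in the Map example), then use the result as `MessageT(..., event_time=...)`.

```python
import json
from datetime import datetime, timezone


def extract_event_time(payload: bytes, fallback: datetime) -> datetime:
    """Return the event time carried in the payload, or `fallback`.

    Assumes (hypothetically) a JSON payload with an ISO-8601 "ts" field,
    e.g. b'{"ts": "2023-05-01T12:00:00+00:00", "value": 1}'.
    """
    try:
        record = json.loads(payload)
        ts = datetime.fromisoformat(record["ts"])
    except (ValueError, KeyError, TypeError):
        # Malformed JSON or bytes, missing field, or unparsable timestamp.
        return fallback
    # Treat naive timestamps as UTC so comparisons stay well-defined.
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=timezone.utc)
    return ts
```

Falling back instead of raising keeps one malformed record from failing the whole source; whether to drop such records instead (the "early data filtering" case) is a design choice for your pipeline.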

Reduce

import aiorun
from typing import AsyncIterable, List
from pynumaflow.function import Messages, Message, Datum, Metadata, AsyncServer


async def my_handler(
    keys: List[str], datums: AsyncIterable[Datum], md: Metadata
) -> Messages:
    interval_window = md.interval_window
    counter = 0
    async for _ in datums:
        counter += 1
    msg = (
        f"counter:{counter} interval_window_start:{interval_window.start} "
        f"interval_window_end:{interval_window.end}"
    )
    return Messages(Message(str.encode(msg), keys))


if __name__ == "__main__":
    grpc_server = AsyncServer(reduce_handler=my_handler)
    aiorun.run(grpc_server.start())
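The reduce handler above receives its datums as an asynchronous stream and consumes it with `async for`, accumulating state until the stream for the window ends. The sketch below isolates that pattern using only `asyncio`; `fake_datums` is a hypothetical stand-in for the stream Numaflow supplies, not part of the SDK.

```python
import asyncio
from typing import AsyncIterable, AsyncIterator


async def fake_datums(n: int) -> AsyncIterator[bytes]:
    """Hypothetical stand-in for the datum stream fed to a reduce handler."""
    for i in range(n):
        await asyncio.sleep(0)  # yield control, as a real network stream would
        yield f"msg-{i}".encode()


async def count_window(datums: AsyncIterable[bytes]) -> int:
    # Same pattern as the Reduce example: iterate with `async for` and
    # keep running state until the stream is exhausted.
    counter = 0
    async for _ in datums:
        counter += 1
    return counter


result = asyncio.run(count_window(fake_datums(5)))
print(result)  # 5
```

A plain `for` loop would fail here, since an async generator does not implement the synchronous iterator protocol.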

Sample Image

A sample UDF Dockerfile is provided under examples.

Implement a User Defined Sink (UDSink)

from typing import Iterator
from pynumaflow.sink import Datum, Responses, Response, Sink


def my_handler(datums: Iterator[Datum]) -> Responses:
    responses = Responses()
    for msg in datums:
        print("User Defined Sink", msg.value.decode("utf-8"))
        responses.append(Response.as_success(msg.id))
    return responses


if __name__ == "__main__":
    grpc_server = Sink(my_handler)
    grpc_server.start()
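The sink example acknowledges every datum by its ID with `Response.as_success(msg.id)`. The sketch below illustrates the one-response-per-datum pattern when some writes can fail, using hypothetical stand-in classes (`StubDatum`, `StubResponse`) rather than the SDK's types; check the `pynumaflow.sink` module for how failures are reported in your version.

```python
from dataclasses import dataclass


# Hypothetical stand-ins; the real SDK uses pynumaflow.sink.Datum / Response.
@dataclass
class StubDatum:
    id: str
    value: bytes


@dataclass
class StubResponse:
    id: str
    success: bool
    err: str = ""


def write_all(datums: list[StubDatum], store: list[str]) -> list[StubResponse]:
    """Write each payload to `store`, returning exactly one response per datum."""
    responses = []
    for d in datums:
        try:
            store.append(d.value.decode("utf-8"))
            responses.append(StubResponse(d.id, True))
        except UnicodeDecodeError as exc:
            # Record the failure for this datum instead of aborting the batch.
            responses.append(StubResponse(d.id, False, str(exc)))
    return responses
```

Keeping responses in input order and keyed by ID lets the caller tell exactly which messages were written.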

Sample Image

A sample UDSink Dockerfile is provided under examples.

Datum Metadata

The Datum object contains the message payload and metadata. Currently, the metadata has two fields: the message ID and the delivery count, which indicates how many times the message has been delivered. You can use this metadata to implement custom logic. For example:

...


def my_handler(keys: list[str], datum: Datum) -> Messages:
    num_delivered = datum.metadata.num_delivered
    # Take specific action once the message has been delivered more than 3 times.
    if num_delivered > 3:
        ...
    return Messages(Message(value=datum.value, keys=keys))
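One way to fill in the threshold branch is to stop forwarding a message that keeps being redelivered. The sketch below uses hypothetical stand-ins (`StubMetadata`, `StubDatum`) with a `num_delivered` field as described above, plus a hypothetical `MAX_DELIVERIES` constant mirroring the `> 3` check; it does not use the SDK's classes.

```python
from dataclasses import dataclass


# Hypothetical stand-ins for the Datum metadata described above.
@dataclass
class StubMetadata:
    id: str
    num_delivered: int


@dataclass
class StubDatum:
    value: bytes
    metadata: StubMetadata


MAX_DELIVERIES = 3  # hypothetical threshold, mirroring `> 3` in the example


def handle(datum: StubDatum) -> list[bytes]:
    """Forward the payload, or give up once it has been delivered too often."""
    if datum.metadata.num_delivered > MAX_DELIVERIES:
        # e.g. log datum.metadata.id and stop forwarding a poison message
        return []
    return [datum.value]
```

Note that the check is strictly greater-than, so a message on its third delivery is still forwarded.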

Download files

Source distribution: pynumaflow-0.4.2.tar.gz (27.7 kB)

Built distribution: pynumaflow-0.4.2-py3-none-any.whl (38.4 kB, Python 3)
