Provides the interfaces for writing Python User Defined Functions and Sinks for Numaflow.
Python SDK for Numaflow
This SDK provides the interface for writing UDFs and UDSinks in Python.
Implement a User Defined Function (UDF)
Map
from pynumaflow.function import Messages, Message, Datum, Server
from typing import List
def my_handler(keys: List[str], datum: Datum) -> Messages:
    val = datum.value
    # The event time and watermark are also available on the datum.
    _ = datum.event_time
    _ = datum.watermark
    messages = Messages(Message(value=val, keys=keys))
    return messages
if __name__ == "__main__":
    grpc_server = Server(map_handler=my_handler)
    grpc_server.start()
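Because a map handler is an ordinary function, its transformation logic can be checked without starting the gRPC server. The sketch below is a minimal illustration under stated assumptions: `FakeDatum` and `FakeMessage` are hypothetical stand-ins for pynumaflow's `Datum` and `Message` (they are not part of the library), and the uppercase transformation is chosen only as an example.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import List


@dataclass
class FakeDatum:
    """Hypothetical stand-in for pynumaflow's Datum (only the fields a handler reads)."""
    value: bytes
    event_time: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    watermark: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


@dataclass
class FakeMessage:
    """Hypothetical stand-in for pynumaflow's Message."""
    value: bytes
    keys: List[str]


def upper_handler(keys: List[str], datum: FakeDatum) -> List[FakeMessage]:
    # Same shape as my_handler: read the payload, emit one message that keeps the keys.
    return [FakeMessage(value=datum.value.upper(), keys=keys)]


if __name__ == "__main__":
    out = upper_handler(["k1"], FakeDatum(value=b"hello"))
    print(out[0].value, out[0].keys)  # b'HELLO' ['k1']
```

Keeping the payload logic in a plain function like this lets it be unit-tested, while the real handler only wraps its result in `Messages` and `Message`.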
MapT - Map with event time assignment capability
In addition to the regular Map function, MapT can assign a new event time to the message. MapT is supported only at the source vertex, where it enables (a) early data filtering and (b) watermark assignment by extracting a new event time from the message payload.
import datetime
from pynumaflow.function import MessageTs, MessageT, Datum, Server
from typing import List
def mapt_handler(keys: List[str], datum: Datum) -> MessageTs:
    val = datum.value
    # Placeholder: in practice, extract the new event time from the payload.
    new_event_time = datetime.datetime.now(datetime.timezone.utc)
    _ = datum.watermark
    message_t_s = MessageTs(MessageT(val, event_time=new_event_time, keys=keys))
    return message_t_s
if __name__ == "__main__":
    grpc_server = Server(mapt_handler=mapt_handler)
    grpc_server.start()
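The two MapT use cases, extracting a new event time from the payload and dropping unwanted data early, can be isolated in plain Python. This is a sketch under assumptions: the JSON payload shape with a `ts` field, the 2022 cutoff, and the names `extract_event_time` and `assign_or_drop` are all hypothetical. Inside a real `mapt_handler`, a kept message's time would be passed as the `MessageT` event time; how a dropped message is expressed through pynumaflow is not shown here.

```python
import json
from datetime import datetime, timezone
from typing import Optional, Tuple

# Hypothetical cutoff: events stamped before this instant are dropped at the source.
CUTOFF = datetime(2022, 1, 1, tzinfo=timezone.utc)


def extract_event_time(payload: bytes) -> Optional[datetime]:
    """Read a hypothetical {"ts": "<ISO-8601>"} field; None if missing or malformed."""
    try:
        parsed = datetime.fromisoformat(json.loads(payload)["ts"])
    except (ValueError, KeyError, TypeError):
        return None
    # Treat timestamps without an offset as UTC so comparisons never mix naive and aware.
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def assign_or_drop(payload: bytes) -> Optional[Tuple[datetime, bytes]]:
    """Return (new_event_time, payload) to keep a message, or None to filter it out."""
    event_time = extract_event_time(payload)
    if event_time is None or event_time < CUTOFF:
        return None  # (a) early data filtering
    return event_time, payload  # (b) the extracted time becomes the new event time


if __name__ == "__main__":
    print(assign_or_drop(b'{"ts": "2023-05-01T12:00:00+00:00"}'))
    print(assign_or_drop(b'{"ts": "2021-12-31T23:59:59+00:00"}'))  # None: before cutoff
```

Note that `datetime.fromisoformat` in Python versions before 3.11 does not accept a trailing `Z`, so the sample timestamps use `+00:00`.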
Reduce
import aiorun
from typing import AsyncIterable, List
from pynumaflow.function import Messages, Message, Datum, Metadata, AsyncServer
async def my_handler(keys: List[str], datums: AsyncIterable[Datum], md: Metadata) -> Messages:
    interval_window = md.interval_window
    counter = 0
    # datums is an asynchronous stream, so it is consumed with `async for`.
    async for _ in datums:
        counter += 1
    msg = (
        f"counter:{counter} interval_window_start:{interval_window.start} "
        f"interval_window_end:{interval_window.end}"
    )
    return Messages(Message(str.encode(msg), keys=keys))
if __name__ == "__main__":
    grpc_server = AsyncServer(reduce_handler=my_handler)
    aiorun.run(grpc_server.start())
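The reduce handler receives its datums as an asynchronous stream, so it must drain them with `async for` before emitting a result for the window. The sketch below runs the same counting logic under plain `asyncio`, assuming a hypothetical async generator `fake_datums` in place of the real stream and a hypothetical `FakeWindow` in place of `md.interval_window`; neither is part of pynumaflow, and no `AsyncServer` or `aiorun` is involved.

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import AsyncIterable, AsyncIterator


@dataclass
class FakeWindow:
    """Hypothetical stand-in for md.interval_window."""
    start: datetime
    end: datetime


async def fake_datums(n: int) -> AsyncIterator[bytes]:
    """Hypothetical async source standing in for the datums stream."""
    for i in range(n):
        await asyncio.sleep(0)  # yield control to the event loop, as a real stream would
        yield str(i).encode()


async def count_in_window(datums: AsyncIterable[bytes], window: FakeWindow) -> bytes:
    # Same logic as my_handler: consume the whole stream, then report the count.
    counter = 0
    async for _ in datums:
        counter += 1
    return (
        f"counter:{counter} interval_window_start:{window.start} "
        f"interval_window_end:{window.end}"
    ).encode()


if __name__ == "__main__":
    start = datetime(2023, 1, 1, tzinfo=timezone.utc)
    window = FakeWindow(start, start + timedelta(seconds=60))
    print(asyncio.run(count_in_window(fake_datums(5), window)).decode())
```

A plain `for` loop over an async generator raises `TypeError`, which is why the handler's parameter is typed as an async iterable rather than a synchronous `Iterator`.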
Sample Image
A sample UDF Dockerfile is provided under examples.
Implement a User Defined Sink (UDSink)
from typing import Iterator
from pynumaflow.sink import Datum, Responses, Response, Sink
def my_handler(datums: Iterator[Datum]) -> Responses:
    responses = Responses()
    for msg in datums:
        print("User Defined Sink", msg.value.decode("utf-8"))
        # Acknowledge each message by its ID.
        responses.append(Response.as_success(msg.id))
    return responses
if __name__ == "__main__":
    grpc_server = Sink(my_handler)
    grpc_server.start()
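The sink handler above returns one response per input message, keyed by the message ID. The sketch below shows the same per-message bookkeeping with one extension: a payload that is not valid UTF-8 is recorded as a failure instead of raising and aborting the whole batch. `FakeDatum` and `SinkResult` are hypothetical stand-ins for `Datum` and `Response`; how a failure is reported through the real `Response` type is not shown here and should be checked against the library.

```python
from dataclasses import dataclass
from typing import Iterable, List, Optional


@dataclass
class FakeDatum:
    """Hypothetical stand-in for pynumaflow.sink.Datum."""
    id: str
    value: bytes


@dataclass
class SinkResult:
    """Hypothetical stand-in for a per-message sink response."""
    id: str
    success: bool
    err: Optional[str] = None


def print_sink(datums: Iterable[FakeDatum]) -> List[SinkResult]:
    results = []
    for msg in datums:
        try:
            print("User Defined Sink", msg.value.decode("utf-8"))
            results.append(SinkResult(msg.id, True))
        except UnicodeDecodeError as exc:
            # Record the bad message and keep processing the rest of the batch.
            results.append(SinkResult(msg.id, False, str(exc)))
    return results


if __name__ == "__main__":
    for r in print_sink([FakeDatum("a", b"ok"), FakeDatum("b", b"\xff")]):
        print(r.id, r.success)
```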
Sample Image
A sample UDSink Dockerfile is provided under examples.
Datum Metadata
The Datum object contains the message payload and metadata. Currently, the metadata has two fields: the message ID and the delivery count, which indicates how many times the message has been delivered. You can use this metadata to implement custom logic. For example:
...
def my_handler(keys: List[str], datum: Datum) -> Messages:
    num_delivered = datum.metadata.num_delivered
    # Take a specific action once the delivery count exceeds a threshold.
    if num_delivered > 3:
        ...
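The threshold check above can be factored into a small decision function so that it is easy to test. This is a minimal sketch: `FakeMetadata`, `choose_action`, the `MAX_DELIVERIES` constant, and the `"process"`/`"divert"` labels are hypothetical; only the `num_delivered > 3` comparison comes from the example above, and what "divert" means (logging, skipping, routing elsewhere) is left to the application.

```python
from dataclasses import dataclass

# Hypothetical threshold, matching the `num_delivered > 3` comparison above.
MAX_DELIVERIES = 3


@dataclass
class FakeMetadata:
    """Hypothetical stand-in for datum.metadata."""
    id: str
    num_delivered: int


def choose_action(md: FakeMetadata, max_deliveries: int = MAX_DELIVERIES) -> str:
    """Return 'process' normally, or 'divert' once the delivery count exceeds the threshold."""
    if md.num_delivered > max_deliveries:
        # A message redelivered this often is likely to keep failing.
        return "divert"
    return "process"


if __name__ == "__main__":
    for n in (1, 3, 4):
        print(n, choose_action(FakeMetadata("m1", n)))
```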
Hashes for pynumaflow-0.4.1-py3-none-any.whl
Algorithm | Hash digest
---|---
SHA256 | 7156174297933be5704bedbfc7303dcab158139d90959c88d084a856faec39a0
MD5 | 8c628117e05ffb8c6c0d3eb0a657278b
BLAKE2b-256 | ed96d4738b8cb218f3f287ba61466ecd475caefc214722512a3bdf0ea80ec9d8