Skip to main content

Channel implementations for Python

Project description

Frequenz channels

Build Status PyPI Package Docs

Introduction

Frequenz Channels is a channels implementation for Python.

According to Wikipedia:

A channel is a model for interprocess communication and synchronization via message passing. A message may be sent over a channel, and another process or thread is able to receive messages sent over a channel it has a reference to, as a stream. Different implementations of channels may be buffered or not, and either synchronous or asynchronous.

Frequenz Channels are mostly designed after Go channels but it also borrows ideas from Rust channels.

Supported Platforms

The following platforms are officially supported (tested):

  • Python: 3.11
  • Operating System: Ubuntu Linux 20.04
  • Architectures: amd64, arm64

[!NOTE] Newer Python versions and other operating systems and architectures might work too, but they are not automatically tested, so we cannot guarantee it.

Quick Start

Installing

Assuming a supported working Python environment:

python3 -m pip install frequenz-channels

[!TIP] For more details please read the Installation Guide.

Examples

Hello World

import asyncio

from frequenz.channels import Anycast


async def main() -> None:
    hello_channel = Anycast[str](name="hello-world-channel")
    sender = hello_channel.new_sender()
    receiver = hello_channel.new_receiver()

    await sender.send("Hello World!")
    message = await receiver.receive()
    print(message)


asyncio.run(main())

Showcase

This is a comprehensive example that shows most of the main features of the library:

import asyncio
from dataclasses import dataclass
from datetime import timedelta
from enum import Enum, auto
from typing import assert_never

from frequenz.channels import (
    Anycast,
    Broadcast,
    Receiver,
    Sender,
    merge,
    select,
    selected_from,
)
from frequenz.channels.timer import SkipMissedAndDrift, Timer, TriggerAllMissed


class Command(Enum):
    PING = auto()
    STOP_SENDER = auto()


class ReplyCommand(Enum):
    PONG = auto()


@dataclass(frozen=True)
class Reply:
    reply: ReplyCommand
    source: str


async def send(
    sender: Sender[str],
    control_command: Receiver[Command],
    control_reply: Sender[Reply],
) -> None:
    """Send a counter value every second, until a stop command is received."""
    print(f"{sender}: Starting")
    timer = Timer(timedelta(seconds=1.0), TriggerAllMissed())
    counter = 0
    async for selected in select(timer, control_command):
        if selected_from(selected, timer):
            print(f"{sender}: Sending {counter}")
            await sender.send(f"{sender}: {counter}")
            counter += 1
        elif selected_from(selected, control_command):
            print(f"{sender}: Received command: {selected.message.name}")
            match selected.message:
                case Command.STOP_SENDER:
                    print(f"{sender}: Stopping")
                    break
                case Command.PING:
                    print(f"{sender}: Ping received, reply with pong")
                    await control_reply.send(Reply(ReplyCommand.PONG, str(sender)))
                case _ as unknown:
                    assert_never(unknown)
    print(f"{sender}: Finished")


async def receive(
    receivers: list[Receiver[str]],
    control_command: Receiver[Command],
    control_reply: Sender[Reply],
) -> None:
    """Receive data from multiple channels, until no more data is received for 2 seconds."""
    print("receive: Starting")
    timer = Timer(timedelta(seconds=2.0), SkipMissedAndDrift())
    print(f"{timer=}")
    merged = merge(*receivers)
    async for selected in select(merged, timer, control_command):
        if selected_from(selected, merged):
            message = selected.message
            print(f"receive: Received {message=}")
            timer.reset()
            print(f"{timer=}")
        elif selected_from(selected, control_command):
            print(f"receive: received command: {selected.message.name}")
            match selected.message:
                case Command.PING:
                    print("receive: Ping received, reply with pong")
                    await control_reply.send(Reply(ReplyCommand.PONG, "receive"))
                case Command.STOP_SENDER:
                    pass  # Ignore
                case _ as unknown:
                    assert_never(unknown)
        elif selected_from(selected, timer):
            drift = selected.message
            print(
                f"receive: No data received for {timer.interval + drift} seconds, "
                "giving up"
            )
            break
    print("receive: Finished")


async def main() -> None:
    data_channel_1 = Anycast[str](name="data-channel-1")
    data_channel_2 = Anycast[str](name="data-channel-2")
    command_channel = Broadcast[Command](name="control-channel")  # (1)!
    reply_channel = Anycast[Reply](name="reply-channel")

    async with asyncio.TaskGroup() as tasks:
        tasks.create_task(
            send(
                data_channel_1.new_sender(),
                command_channel.new_receiver(),
                reply_channel.new_sender(),
            ),
            name="send-channel-1",
        )
        tasks.create_task(
            send(
                data_channel_2.new_sender(),
                command_channel.new_receiver(),
                reply_channel.new_sender(),
            ),
            name="send-channel-2",
        )
        tasks.create_task(
            receive(
                [data_channel_1.new_receiver(), data_channel_2.new_receiver()],
                command_channel.new_receiver(),
                reply_channel.new_sender(),
            ),
            name="receive",
        )

        control_sender = command_channel.new_sender()
        reply_receiver = reply_channel.new_receiver()

        # Send a ping command to all tasks and wait for the replies
        await control_sender.send(Command.PING)
        print(f"main: {await reply_receiver.receive()}")
        print(f"main: {await reply_receiver.receive()}")
        print(f"main: {await reply_receiver.receive()}")

        await asyncio.sleep(5.0)

        # Stop senders, after 2 seconds not receiving any data,
        # the receiver will stop too
        await control_sender.send(Command.STOP_SENDER)


asyncio.run(main())

Documentation

For more information, please read the documentation website.

Contributing

If you want to know how to build this project and contribute to it, please check out the Contributing Guide.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

frequenz_channels-1.9.0.tar.gz (44.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

frequenz_channels-1.9.0-py3-none-any.whl (53.0 kB view details)

Uploaded Python 3

File details

Details for the file frequenz_channels-1.9.0.tar.gz.

File metadata

  • Download URL: frequenz_channels-1.9.0.tar.gz
  • Upload date:
  • Size: 44.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for frequenz_channels-1.9.0.tar.gz
Algorithm Hash digest
SHA256 de5bb3473e6c75ec8fdedf46afd23c2ce662e18063580595f45b266141f1e580
MD5 549a84c34d001cb6d7be65934313aaa7
BLAKE2b-256 d206835e7e86233e6b19cfee2ec9293a76d6c77f0f5f633130f993deb700e14e

See more details on using hashes here.

Provenance

The following attestation bundles were made for frequenz_channels-1.9.0.tar.gz:

Publisher: ci.yaml on frequenz-floss/frequenz-channels-python

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file frequenz_channels-1.9.0-py3-none-any.whl.

File metadata

File hashes

Hashes for frequenz_channels-1.9.0-py3-none-any.whl
Algorithm Hash digest
SHA256 bac3ad78c5a7f52153963fceb9fb10c8e6755eb2563a9b66f1fc88a8e49746bd
MD5 3ba7205f7fd6d3d0ea807f51f9a15d92
BLAKE2b-256 28cf9a5124aad7f5d2d8e9afdd50a447d5f6be12d1b59cb7df2e1cdf611d149a

See more details on using hashes here.

Provenance

The following attestation bundles were made for frequenz_channels-1.9.0-py3-none-any.whl:

Publisher: ci.yaml on frequenz-floss/frequenz-channels-python

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page