Channel implementations for Python
Project description
Frequenz channels
Introduction
Frequenz Channels is a channels implementation for Python.
According to Wikipedia:
A channel is a model for interprocess communication and synchronization via message passing. A message may be sent over a channel, and another process or thread is able to receive messages sent over a channel it has a reference to, as a stream. Different implementations of channels may be buffered or not, and either synchronous or asynchronous.
Frequenz Channels are mostly designed after Go channels but it also borrows ideas from Rust channels.
Supported Platforms
The following platforms are officially supported (tested):
- Python: 3.11
- Operating System: Ubuntu Linux 20.04
- Architectures: amd64, arm64
[!NOTE] Newer Python versions and other operating systems and architectures might work too, but they are not automatically tested, so we cannot guarantee it.
Quick Start
Installing
Assuming a supported working Python environment:
python3 -m pip install frequenz-channels
[!TIP] For more details please read the Installation Guide.
Examples
Hello World
import asyncio
from frequenz.channels import Anycast
async def main() -> None:
hello_channel = Anycast[str](name="hello-world-channel")
sender = hello_channel.new_sender()
receiver = hello_channel.new_receiver()
await sender.send("Hello World!")
message = await receiver.receive()
print(message)
asyncio.run(main())
Showcase
This is a comprehensive example that shows most of the main features of the library:
import asyncio
from dataclasses import dataclass
from datetime import timedelta
from enum import Enum, auto
from typing import assert_never
from frequenz.channels import (
Anycast,
Broadcast,
Receiver,
Sender,
merge,
select,
selected_from,
)
from frequenz.channels.timer import SkipMissedAndDrift, Timer, TriggerAllMissed
class Command(Enum):
PING = auto()
STOP_SENDER = auto()
class ReplyCommand(Enum):
PONG = auto()
@dataclass(frozen=True)
class Reply:
reply: ReplyCommand
source: str
async def send(
sender: Sender[str],
control_command: Receiver[Command],
control_reply: Sender[Reply],
) -> None:
"""Send a counter value every second, until a stop command is received."""
print(f"{sender}: Starting")
timer = Timer(timedelta(seconds=1.0), TriggerAllMissed())
counter = 0
async for selected in select(timer, control_command):
if selected_from(selected, timer):
print(f"{sender}: Sending {counter}")
await sender.send(f"{sender}: {counter}")
counter += 1
elif selected_from(selected, control_command):
print(f"{sender}: Received command: {selected.message.name}")
match selected.message:
case Command.STOP_SENDER:
print(f"{sender}: Stopping")
break
case Command.PING:
print(f"{sender}: Ping received, reply with pong")
await control_reply.send(Reply(ReplyCommand.PONG, str(sender)))
case _ as unknown:
assert_never(unknown)
print(f"{sender}: Finished")
async def receive(
receivers: list[Receiver[str]],
control_command: Receiver[Command],
control_reply: Sender[Reply],
) -> None:
"""Receive data from multiple channels, until no more data is received for 2 seconds."""
print("receive: Starting")
timer = Timer(timedelta(seconds=2.0), SkipMissedAndDrift())
print(f"{timer=}")
merged = merge(*receivers)
async for selected in select(merged, timer, control_command):
if selected_from(selected, merged):
message = selected.message
print(f"receive: Received {message=}")
timer.reset()
print(f"{timer=}")
elif selected_from(selected, control_command):
print(f"receive: received command: {selected.message.name}")
match selected.message:
case Command.PING:
print("receive: Ping received, reply with pong")
await control_reply.send(Reply(ReplyCommand.PONG, "receive"))
case Command.STOP_SENDER:
pass # Ignore
case _ as unknown:
assert_never(unknown)
elif selected_from(selected, timer):
drift = selected.message
print(
f"receive: No data received for {timer.interval + drift} seconds, "
"giving up"
)
break
print("receive: Finished")
async def main() -> None:
data_channel_1 = Anycast[str](name="data-channel-1")
data_channel_2 = Anycast[str](name="data-channel-2")
command_channel = Broadcast[Command](name="control-channel") # (1)!
reply_channel = Anycast[Reply](name="reply-channel")
async with asyncio.TaskGroup() as tasks:
tasks.create_task(
send(
data_channel_1.new_sender(),
command_channel.new_receiver(),
reply_channel.new_sender(),
),
name="send-channel-1",
)
tasks.create_task(
send(
data_channel_2.new_sender(),
command_channel.new_receiver(),
reply_channel.new_sender(),
),
name="send-channel-2",
)
tasks.create_task(
receive(
[data_channel_1.new_receiver(), data_channel_2.new_receiver()],
command_channel.new_receiver(),
reply_channel.new_sender(),
),
name="receive",
)
control_sender = command_channel.new_sender()
reply_receiver = reply_channel.new_receiver()
# Send a ping command to all tasks and wait for the replies
await control_sender.send(Command.PING)
print(f"main: {await reply_receiver.receive()}")
print(f"main: {await reply_receiver.receive()}")
print(f"main: {await reply_receiver.receive()}")
await asyncio.sleep(5.0)
# Stop senders, after 2 seconds not receiving any data,
# the receiver will stop too
await control_sender.send(Command.STOP_SENDER)
asyncio.run(main())
Documentation
For more information, please read the documentation website.
Contributing
If you want to know how to build this project and contribute to it, please check out the Contributing Guide.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file frequenz-channels-1.1.2.tar.gz
.
File metadata
- Download URL: frequenz-channels-1.1.2.tar.gz
- Upload date:
- Size: 39.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/5.1.1 CPython/3.12.6
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 2df8e42dcdd2e7c3aa0789bf477364d43e135da0e3005297a8a732b42dc3f303 |
|
MD5 | c7011b204e7bf20ec67d2e8332765299 |
|
BLAKE2b-256 | a4cde600aeac1cad39caf53ac9f57fceb008a33372463870d0ec03317a6ee56d |
File details
Details for the file frequenz_channels-1.1.2-py3-none-any.whl
.
File metadata
- Download URL: frequenz_channels-1.1.2-py3-none-any.whl
- Upload date:
- Size: 46.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/5.1.1 CPython/3.12.6
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 7e764d016eb33a8023af95e54a9687c80f39ca72a7543818dd354f14d8726607 |
|
MD5 | a5a181a46e18e2cf95dc72e46b301837 |
|
BLAKE2b-256 | 6b3afe1727ba5ab6392a3c96feda11112645074309a0e5629041746d3ee8397d |