A Python concurrency library inspired by Go's goroutines and channels
Project description
PyChannel
PyChannel é uma biblioteca de concorrência inspirada nas goroutines e nos channels da linguagem Go. Sua API é composta por três módulos principais: Task, Channel e Buffer.
A PyChannel utiliza processos gerenciados pelo
loky. Ela é inspirada nos conceitos do Go, mas não implementa goroutines reais.
Fluxo de execução
flowchart LR
A["Função decorada"] --> B["Task"]
B --> C["Worker do loky"]
C --> D["Future"]
D --> E{"Buffer configurado?"}
E -- "Não" --> F["Channel recebe o resultado"]
E -- "Sim, acumulando" --> G["Channel recebe False"]
E -- "Sim, lote pronto" --> H["Channel recebe a lista"]
A Task envia a função para um worker. O resultado retorna por um Future e, quando configurado, também é encaminhado ao Channel. Se a Task possuir um Buffer, os resultados são acumulados antes da liberação do lote.
Task
A classe Task transforma uma função comum em uma tarefa executada por processos.
Task(
max_workers,
name=None,
channel=None,
buffer=None,
)
| Parâmetro | Função |
|---|---|
max_workers |
Define a quantidade máxima de processos disponíveis no pool. |
name |
Define o nome da tarefa exibido nos logs. |
channel |
Recebe os resultados e as exceções das execuções. |
buffer |
Ativa o agrupamento automático dos resultados. |
Decorando uma função
O método run() retorna o decorator responsável por enviar cada chamada ao executor:
from PyChannel import Channel, Task
channel = Channel(name="Resultados")
task = Task(max_workers=4, name="Cálculos", channel=channel)
@task.run()
def quadrado(numero):
return numero * numero
future = quadrado(10)
A chamada não devolve o resultado diretamente. Ela devolve um Future:
resultado = future.result() # 100
Como existe um Channel, o mesmo resultado também pode ser consumido por ele:
resultado = channel.get() # 100
Comportamento da Task
| Configuração | Comportamento |
|---|---|
Sem Channel e sem Buffer |
O resultado fica disponível somente no Future. |
Com Channel |
Cada resultado é enviado ao Channel. |
Com Channel e Buffer |
O Channel recebe False durante o acúmulo e uma lista quando o lote fica pronto. |
Com Buffer, sem Channel |
O Buffer é atualizado e o resultado individual continua disponível no Future. |
Se a função gerar uma exceção, ela permanece armazenada no Future. Quando existe um Channel, o objeto da exceção também é enviado para ele.
Channel
O Channel transporta valores entre produtores e consumidores por meio de uma multiprocessing.Queue.
Channel(name=None, limit=None)
| Parâmetro | Função |
|---|---|
name |
Define o nome utilizado nos logs. |
limit |
Define a quantidade máxima de valores pendentes. None cria uma fila sem limite explícito. |
Enviando e recebendo valores
from PyChannel import Channel
channel = Channel(name="Eventos")
channel.send("processado")
resultado = channel.get()
print(resultado) # processado
send() bloqueia se um Channel limitado estiver cheio. get() bloqueia enquanto não houver nenhum valor disponível.
Ignorando resultados falsos
O método get(loop=True) continua consumindo a fila até encontrar um valor verdadeiro:
resultado = channel.get(loop=True)
Esse modo é útil com o Buffer automático, pois ignora os valores False enviados durante o acúmulo e retorna diretamente o lote:
somar()
somar()
lote = channel.get(loop=True)
print(lote) # [2, 2]
loop=Trueignora qualquer valor considerado falso pelo Python, incluindoFalse,None,0,""e coleções vazias.
Buffer
O Buffer acumula valores temporariamente até atingir sua capacidade ou seu limite de tempo.
Buffer(capacity, timeout, name=None)
| Parâmetro | Função |
|---|---|
capacity |
Quantidade máxima de valores acumulados. |
timeout |
Tempo máximo, em segundos, contado a partir do primeiro valor. |
name |
Define o nome utilizado nos logs. |
O acesso interno é protegido por um RLock, permitindo chamadas concorrentes aos seus métodos.
Uso manual
from PyChannel import Buffer
buffer = Buffer(capacity=3, timeout=10, name="Lote")
buffer.sender("A")
buffer.sender("B")
print(buffer.status()) # False
buffer.sender("C")
if buffer.status():
lote = buffer.get().copy()
buffer.reset()
print(lote) # ["A", "B", "C"]
Métodos
| Método | Comportamento |
|---|---|
sender(data) |
Adiciona um valor se ainda houver capacidade. O contador de tempo começa no primeiro valor. |
get() |
Retorna a lista atualmente armazenada. |
status() |
Retorna True quando a capacidade ou o timeout é atingido. |
reset() |
Limpa a lista e reinicia o controle de tempo. |
O timeout não cria uma tarefa em segundo plano. Ele é avaliado quando status() é chamado ou quando a Task processa um novo resultado.
Integração dos módulos
from PyChannel import Buffer, Channel, Task
buffer = Buffer(capacity=2, timeout=10, name="Resultados")
channel = Channel(name="Saída")
task = Task(
max_workers=2,
name="Soma",
channel=channel,
buffer=buffer,
)
@task.run()
def somar(a, b):
return a + b
somar(1, 1)
somar(2, 2)
lote = channel.get(loop=True)
print(lote) # A ordem pode ser [2, 4] ou [4, 2]
Os workers executam de forma independente. Por isso, os resultados chegam ao Buffer e ao Channel na ordem de conclusão, não necessariamente na ordem de envio.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file pychannel-0.1.3.tar.gz.
File metadata
- Download URL: pychannel-0.1.3.tar.gz
- Upload date:
- Size: 5.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.14
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
46abd0e53109f00be0cbeefa46bec5115bf5e30a7c476aa2f9da61349f9ce9f4
|
|
| MD5 |
d31c89d5add27e0ea49e4874644bb205
|
|
| BLAKE2b-256 |
2b9f05850871af3fc26a6b50e91b2b3927f79b2fad1441171701413974076db2
|
File details
Details for the file pychannel-0.1.3-py3-none-any.whl.
File metadata
- Download URL: pychannel-0.1.3-py3-none-any.whl
- Upload date:
- Size: 6.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.14
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
655202ca7a437d7b3233059022822faf486f6795952e6e3c6f916e9ec1e6ea03
|
|
| MD5 |
9f52db9efbce9c288b0bc303b418c800
|
|
| BLAKE2b-256 |
926ea637cc99e2d7ab20711652893d47b1b0247306cdb97d7f7984e71ec5e28a
|