Skip to main content

A Python concurrency library inspired by Go's goroutines and channels

Project description

PyChannel

Python Versão Status Concorrência

PyChannel é uma biblioteca de concorrência inspirada nas goroutines e nos channels da linguagem Go. Sua API é composta por três módulos principais: Task, Channel e Buffer.

A PyChannel utiliza processos gerenciados pelo loky. Ela é inspirada nos conceitos do Go, mas não implementa goroutines reais.

Fluxo de execução

flowchart LR
    A["Função decorada"] --> B["Task"]
    B --> C["Worker do loky"]
    C --> D["Future"]
    D --> E{"Buffer configurado?"}
    E -- "Não" --> F["Channel recebe o resultado"]
    E -- "Sim, acumulando" --> G["Channel recebe False"]
    E -- "Sim, lote pronto" --> H["Channel recebe a lista"]

A Task envia a função para um worker. O resultado retorna por um Future e, quando configurado, também é encaminhado ao Channel. Se a Task possuir um Buffer, os resultados são acumulados antes da liberação do lote.

Task

A classe Task transforma uma função comum em uma tarefa executada por processos.

Task(
    max_workers,
    name=None,
    channel=None,
    buffer=None,
)
Parâmetro Função
max_workers Define a quantidade máxima de processos disponíveis no pool.
name Define o nome da tarefa exibido nos logs.
channel Recebe os resultados e as exceções das execuções.
buffer Ativa o agrupamento automático dos resultados.

Decorando uma função

O método run() retorna o decorator responsável por enviar cada chamada ao executor:

from PyChannel import Channel, Task

channel = Channel(name="Resultados")
task = Task(max_workers=4, name="Cálculos", channel=channel)


@task.run()
def quadrado(numero):
    return numero * numero


future = quadrado(10)

A chamada não devolve o resultado diretamente. Ela devolve um Future:

resultado = future.result()  # 100

Como existe um Channel, o mesmo resultado também pode ser consumido por ele:

resultado = channel.get()  # 100

Comportamento da Task

Configuração Comportamento
Sem Channel e sem Buffer O resultado fica disponível somente no Future.
Com Channel Cada resultado é enviado ao Channel.
Com Channel e Buffer O Channel recebe False durante o acúmulo e uma lista quando o lote fica pronto.
Com Buffer, sem Channel O Buffer é atualizado e o resultado individual continua disponível no Future.

Se a função gerar uma exceção, ela permanece armazenada no Future. Quando existe um Channel, o objeto da exceção também é enviado para ele.

Channel

O Channel transporta valores entre produtores e consumidores por meio de uma multiprocessing.Queue.

Channel(name=None, limit=None)
Parâmetro Função
name Define o nome utilizado nos logs.
limit Define a quantidade máxima de valores pendentes. None cria uma fila sem limite explícito.

Enviando e recebendo valores

from PyChannel import Channel

channel = Channel(name="Eventos")

channel.send("processado")
resultado = channel.get()

print(resultado)  # processado

send() bloqueia se um Channel limitado estiver cheio. get() bloqueia enquanto não houver nenhum valor disponível.

Ignorando resultados falsos

O método get(loop=True) continua consumindo a fila até encontrar um valor verdadeiro:

resultado = channel.get(loop=True)

Esse modo é útil com o Buffer automático, pois ignora os valores False enviados durante o acúmulo e retorna diretamente o lote:

somar()
somar()

lote = channel.get(loop=True)
print(lote)  # [2, 2]

loop=True ignora qualquer valor considerado falso pelo Python, incluindo False, None, 0, "" e coleções vazias.

Buffer

O Buffer acumula valores temporariamente até atingir sua capacidade ou seu limite de tempo.

Buffer(capacity, timeout, name=None)
Parâmetro Função
capacity Quantidade máxima de valores acumulados.
timeout Tempo máximo, em segundos, contado a partir do primeiro valor.
name Define o nome utilizado nos logs.

O acesso interno é protegido por um RLock, permitindo chamadas concorrentes aos seus métodos.

Uso manual

from PyChannel import Buffer

buffer = Buffer(capacity=3, timeout=10, name="Lote")

buffer.sender("A")
buffer.sender("B")

print(buffer.status())  # False

buffer.sender("C")

if buffer.status():
    lote = buffer.get().copy()
    buffer.reset()
    print(lote)  # ["A", "B", "C"]

Métodos

Método Comportamento
sender(data) Adiciona um valor se ainda houver capacidade. O contador de tempo começa no primeiro valor.
get() Retorna a lista atualmente armazenada.
status() Retorna True quando a capacidade ou o timeout é atingido.
reset() Limpa a lista e reinicia o controle de tempo.

O timeout não cria uma tarefa em segundo plano. Ele é avaliado quando status() é chamado ou quando a Task processa um novo resultado.

Integração dos módulos

from PyChannel import Buffer, Channel, Task

buffer = Buffer(capacity=2, timeout=10, name="Resultados")
channel = Channel(name="Saída")
task = Task(
    max_workers=2,
    name="Soma",
    channel=channel,
    buffer=buffer,
)


@task.run()
def somar(a, b):
    return a + b


somar(1, 1)
somar(2, 2)

lote = channel.get(loop=True)
print(lote)  # A ordem pode ser [2, 4] ou [4, 2]

Os workers executam de forma independente. Por isso, os resultados chegam ao Buffer e ao Channel na ordem de conclusão, não necessariamente na ordem de envio.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pychannel-0.1.3.tar.gz (5.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pychannel-0.1.3-py3-none-any.whl (6.7 kB view details)

Uploaded Python 3

File details

Details for the file pychannel-0.1.3.tar.gz.

File metadata

  • Download URL: pychannel-0.1.3.tar.gz
  • Upload date:
  • Size: 5.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.14

File hashes

Hashes for pychannel-0.1.3.tar.gz
Algorithm Hash digest
SHA256 46abd0e53109f00be0cbeefa46bec5115bf5e30a7c476aa2f9da61349f9ce9f4
MD5 d31c89d5add27e0ea49e4874644bb205
BLAKE2b-256 2b9f05850871af3fc26a6b50e91b2b3927f79b2fad1441171701413974076db2

See more details on using hashes here.

File details

Details for the file pychannel-0.1.3-py3-none-any.whl.

File metadata

  • Download URL: pychannel-0.1.3-py3-none-any.whl
  • Upload date:
  • Size: 6.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.14

File hashes

Hashes for pychannel-0.1.3-py3-none-any.whl
Algorithm Hash digest
SHA256 655202ca7a437d7b3233059022822faf486f6795952e6e3c6f916e9ec1e6ea03
MD5 9f52db9efbce9c288b0bc303b418c800
BLAKE2b-256 926ea637cc99e2d7ab20711652893d47b1b0247306cdb97d7f7984e71ec5e28a

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page