Skip to main content

A Python concurrency library inspired by Go's goroutines and channels

Project description

PyChannel

Python Versão Status Concorrência

PyChannel é uma biblioteca de concorrência inspirada nas goroutines e nos channels da linguagem Go. Sua API é composta por três módulos principais: Task, Channel e Buffer.

A PyChannel utiliza processos gerenciados pelo loky. Ela é inspirada nos conceitos do Go, mas não implementa goroutines reais.

Fluxo de execução

flowchart LR
    A["Função decorada"] --> B["Task"]
    B --> C["Worker do loky"]
    C --> D["Future"]
    D --> E{"Buffer configurado?"}
    E -- "Não" --> F["Channel recebe o resultado"]
    E -- "Sim, acumulando" --> G["Channel recebe False"]
    E -- "Sim, lote pronto" --> H["Channel recebe a lista"]

A Task envia a função para um worker. O resultado retorna por um Future e, quando configurado, também é encaminhado ao Channel. Se a Task possuir um Buffer, os resultados são acumulados antes da liberação do lote.

Task

A classe Task transforma uma função comum em uma tarefa executada por processos.

Task(
    max_workers,
    name=None,
    channel=None,
    buffer=None,
)
Parâmetro Função
max_workers Define a quantidade máxima de processos disponíveis no pool.
name Define o nome da tarefa exibido nos logs.
channel Recebe os resultados e as exceções das execuções.
buffer Ativa o agrupamento automático dos resultados.

Decorando uma função

O método run() retorna o decorator responsável por enviar cada chamada ao executor:

from PyChannel import Channel, Task

channel = Channel(name="Resultados")
task = Task(max_workers=4, name="Cálculos", channel=channel)


@task.run()
def quadrado(numero):
    return numero * numero


future = quadrado(10)

A chamada não devolve o resultado diretamente. Ela devolve um Future:

resultado = future.result()  # 100

Como existe um Channel, o mesmo resultado também pode ser consumido por ele:

resultado = channel.get()  # 100

Comportamento da Task

Configuração Comportamento
Sem Channel e sem Buffer O resultado fica disponível somente no Future.
Com Channel Cada resultado é enviado ao Channel.
Com Channel e Buffer O Channel recebe False durante o acúmulo e uma lista quando o lote fica pronto.
Com Buffer, sem Channel O Buffer é atualizado e o resultado individual continua disponível no Future.

Se a função gerar uma exceção, ela permanece armazenada no Future. Quando existe um Channel, o objeto da exceção também é enviado para ele.

Os executores registrados pela Task são finalizados automaticamente quando o programa termina. Quando for necessário encerrar o pool antes do fim do programa, use task.shutdown().

Channel

O Channel transporta valores entre produtores e consumidores por meio de uma multiprocessing.Queue.

Channel(name=None, limit=None)
Parâmetro Função
name Define o nome utilizado nos logs.
limit Define a quantidade máxima de valores pendentes. None cria uma fila sem limite explícito.

Enviando e recebendo valores

from PyChannel import Channel

channel = Channel(name="Eventos")

channel.send("processado")
resultado = channel.get()

print(resultado)  # processado

send() bloqueia se um Channel limitado estiver cheio. get() bloqueia enquanto não houver nenhum valor disponível.

Ignorando resultados falsos

O método get(loop=True) continua consumindo a fila até encontrar um valor verdadeiro:

resultado = channel.get(loop=True)

Esse modo é útil com o Buffer automático, pois ignora os valores False enviados durante o acúmulo e retorna diretamente o lote:

somar()
somar()

lote = channel.get(loop=True)
print(lote)  # [2, 2]

loop=True ignora qualquer valor considerado falso pelo Python, incluindo False, None, 0, "" e coleções vazias.

Buffer

O Buffer acumula valores temporariamente até atingir sua capacidade ou seu limite de tempo.

Buffer(capacity, timeout, name=None)
Parâmetro Função
capacity Quantidade máxima de valores acumulados.
timeout Tempo máximo, em segundos, contado a partir do primeiro valor.
name Define o nome utilizado nos logs.

O acesso interno é protegido por um RLock, permitindo chamadas concorrentes aos seus métodos.

Uso manual

from PyChannel import Buffer

buffer = Buffer(capacity=3, timeout=10, name="Lote")

buffer.sender("A")
buffer.sender("B")

print(buffer.status())  # False

buffer.sender("C")

if buffer.status():
    lote = buffer.get().copy()
    buffer.reset()
    print(lote)  # ["A", "B", "C"]

Métodos

Método Comportamento
sender(data) Adiciona um valor se ainda houver capacidade. O contador de tempo começa no primeiro valor.
get() Retorna a lista atualmente armazenada.
status() Retorna True quando a capacidade ou o timeout é atingido.
reset() Limpa a lista e reinicia o controle de tempo.

O timeout não cria uma tarefa em segundo plano. Ele é avaliado quando status() é chamado ou quando a Task processa um novo resultado.

Integração dos módulos

from PyChannel import Buffer, Channel, Task

buffer = Buffer(capacity=2, timeout=10, name="Resultados")
channel = Channel(name="Saída")
task = Task(
    max_workers=2,
    name="Soma",
    channel=channel,
    buffer=buffer,
)


@task.run()
def somar(a, b):
    return a + b


somar(1, 1)
somar(2, 2)

lote = channel.get(loop=True)
print(lote)  # A ordem pode ser [2, 4] ou [4, 2]

Os workers executam de forma independente. Por isso, os resultados chegam ao Buffer e ao Channel na ordem de conclusão, não necessariamente na ordem de envio.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pychannel-1.5.tar.gz (6.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pychannel-1.5-py3-none-any.whl (7.2 kB view details)

Uploaded Python 3

File details

Details for the file pychannel-1.5.tar.gz.

File metadata

  • Download URL: pychannel-1.5.tar.gz
  • Upload date:
  • Size: 6.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.14

File hashes

Hashes for pychannel-1.5.tar.gz
Algorithm Hash digest
SHA256 cf6e5dfd0952c46e15c283ddcfa2836db5187fb791504accc1870d99019f9104
MD5 50f6628d243e9e831578a178b24c52af
BLAKE2b-256 78cf9239f6c126f64475f329e007e0f8a589536b092dc8861eb6393515f0f284

See more details on using hashes here.

File details

Details for the file pychannel-1.5-py3-none-any.whl.

File metadata

  • Download URL: pychannel-1.5-py3-none-any.whl
  • Upload date:
  • Size: 7.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.14

File hashes

Hashes for pychannel-1.5-py3-none-any.whl
Algorithm Hash digest
SHA256 fb0c1ecb74f2d0708d135327f687fc7c12f0259111a131f3d27af82b233af187
MD5 d12322c792063898918554fcd9c45d11
BLAKE2b-256 5dce0b191371f95b97570fe3fa683bd69d71ae68ca6e6faedcd14076cb206860

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page