Skip to main content

A Python concurrency library inspired by Go's goroutines and channels

Project description

PyChannel

Python Versão Status Concorrência

PyChannel é uma biblioteca de concorrência inspirada nas goroutines e nos channels da linguagem Go. Sua API é composta por três módulos principais: Task, Channel e Buffer.

A PyChannel utiliza processos gerenciados pelo loky. Ela é inspirada nos conceitos do Go, mas não implementa goroutines reais.

Fluxo de execução

flowchart LR
    A["Função decorada"] --> B["Task"]
    B --> C["Worker do loky"]
    C --> D["Future"]
    D --> E{"Buffer configurado?"}
    E -- "Não" --> F["Channel recebe o resultado"]
    E -- "Sim, acumulando" --> G["Channel recebe False"]
    E -- "Sim, lote pronto" --> H["Channel recebe a lista"]

A Task envia a função para um worker. O resultado retorna por um Future e, quando configurado, também é encaminhado ao Channel. Se a Task possuir um Buffer, os resultados são acumulados antes da liberação do lote.

Task

A classe Task transforma uma função comum em uma tarefa executada por processos.

Task(
    max_workers,
    name=None,
    channel=None,
    buffer=None,
)
Parâmetro Função
max_workers Define a quantidade máxima de processos disponíveis no pool.
name Define o nome da tarefa exibido nos logs.
channel Recebe os resultados e as exceções das execuções.
buffer Ativa o agrupamento automático dos resultados.

Decorando uma função

O método run() retorna o decorator responsável por enviar cada chamada ao executor:

from PyChannel import Channel, Task

channel = Channel(name="Resultados")
task = Task(max_workers=4, name="Cálculos", channel=channel)


@task.run()
def quadrado(numero):
    return numero * numero


future = quadrado(10)

A chamada não devolve o resultado diretamente. Ela devolve um Future:

resultado = future.result()  # 100

Como existe um Channel, o mesmo resultado também pode ser consumido por ele:

resultado = channel.get()  # 100

Comportamento da Task

Configuração Comportamento
Sem Channel e sem Buffer O resultado fica disponível somente no Future.
Com Channel Cada resultado é enviado ao Channel.
Com Channel e Buffer O Channel recebe False durante o acúmulo e uma lista quando o lote fica pronto.
Com Buffer, sem Channel O Buffer é atualizado e o resultado individual continua disponível no Future.

Se a função gerar uma exceção, ela permanece armazenada no Future. Quando existe um Channel, o objeto da exceção também é enviado para ele.

Channel

O Channel transporta valores entre produtores e consumidores por meio de uma multiprocessing.Queue.

Channel(name=None, limit=None)
Parâmetro Função
name Define o nome utilizado nos logs.
limit Define a quantidade máxima de valores pendentes. None cria uma fila sem limite explícito.

Enviando e recebendo valores

from PyChannel import Channel

channel = Channel(name="Eventos")

channel.send("processado")
resultado = channel.get()

print(resultado)  # processado

send() bloqueia se um Channel limitado estiver cheio. get() bloqueia enquanto não houver nenhum valor disponível.

Ignorando resultados falsos

O método get(loop=True) continua consumindo a fila até encontrar um valor verdadeiro:

resultado = channel.get(loop=True)

Esse modo é útil com o Buffer automático, pois ignora os valores False enviados durante o acúmulo e retorna diretamente o lote:

somar()
somar()

lote = channel.get(loop=True)
print(lote)  # [2, 2]

loop=True ignora qualquer valor considerado falso pelo Python, incluindo False, None, 0, "" e coleções vazias.

Buffer

O Buffer acumula valores temporariamente até atingir sua capacidade ou seu limite de tempo.

Buffer(capacity, timeout, name=None)
Parâmetro Função
capacity Quantidade máxima de valores acumulados.
timeout Tempo máximo, em segundos, contado a partir do primeiro valor.
name Define o nome utilizado nos logs.

O acesso interno é protegido por um RLock, permitindo chamadas concorrentes aos seus métodos.

Uso manual

from PyChannel import Buffer

buffer = Buffer(capacity=3, timeout=10, name="Lote")

buffer.sender("A")
buffer.sender("B")

print(buffer.status())  # False

buffer.sender("C")

if buffer.status():
    lote = buffer.get().copy()
    buffer.reset()
    print(lote)  # ["A", "B", "C"]

Métodos

Método Comportamento
sender(data) Adiciona um valor se ainda houver capacidade. O contador de tempo começa no primeiro valor.
get() Retorna a lista atualmente armazenada.
status() Retorna True quando a capacidade ou o timeout é atingido.
reset() Limpa a lista e reinicia o controle de tempo.

O timeout não cria uma tarefa em segundo plano. Ele é avaliado quando status() é chamado ou quando a Task processa um novo resultado.

Integração dos módulos

from PyChannel import Buffer, Channel, Task

buffer = Buffer(capacity=2, timeout=10, name="Resultados")
channel = Channel(name="Saída")
task = Task(
    max_workers=2,
    name="Soma",
    channel=channel,
    buffer=buffer,
)


@task.run()
def somar(a, b):
    return a + b


somar(1, 1)
somar(2, 2)

lote = channel.get(loop=True)
print(lote)  # A ordem pode ser [2, 4] ou [4, 2]

Os workers executam de forma independente. Por isso, os resultados chegam ao Buffer e ao Channel na ordem de conclusão, não necessariamente na ordem de envio.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pychannel-0.1.0.tar.gz (5.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pychannel-0.1.0-py3-none-any.whl (6.7 kB view details)

Uploaded Python 3

File details

Details for the file pychannel-0.1.0.tar.gz.

File metadata

  • Download URL: pychannel-0.1.0.tar.gz
  • Upload date:
  • Size: 5.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.14

File hashes

Hashes for pychannel-0.1.0.tar.gz
Algorithm Hash digest
SHA256 de8c79c0fe7f5d21e9ee75d5d15fce0339c7df45c20357c05ea8884cac6c97b2
MD5 cb09fba2b0721a6e463b4f4b5f339213
BLAKE2b-256 b3b0257a3b734ab5bfeadc05f6b10735866898fd73cca34c2417a7a721084a94

See more details on using hashes here.

File details

Details for the file pychannel-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: pychannel-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 6.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.14

File hashes

Hashes for pychannel-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 32c697cc278a485b7c9efd42aff020d4c1856a2be3dc32413a69737b8bb48c60
MD5 6f808740c1666d698ea25aa7de9f7e79
BLAKE2b-256 feb745e750142824b4ceb9b31f7b5edef057740343549481805f5e510cbc7e93

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page