aio_pydispatch

Asyncio pydispatch (Signal Manager)

A signal manager for asyncio, based on PyDispatcher and modeled on Scrapy's SignalManager.

Usage

A typical case, as often encountered in web applications, is reacting to a server starting and stopping.

Create some signals and a signal manager:

import asyncio
import logging
from aio_pydispatch import SignalManager

logging.basicConfig(level=logging.INFO)

server_start = object()
server_stop = object()

signal_manager = SignalManager()
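
The signals above are plain `object()` instances. As a small illustration of why that works, the sketch below uses a hypothetical `registry` dict and `handlers_for` helper (neither is part of aio_pydispatch) to show that each `object()` is a distinct, hashable sentinel, so two signals can never be confused with each other the way two equal strings could.

```python
# Each call to object() returns a new, unique instance.
server_start = object()
server_stop = object()

# Hypothetical registry keyed by signal; illustration only,
# not how aio_pydispatch stores its receivers.
registry = {
    server_start: ['start handler'],
    server_stop: ['stop handler'],
}


def handlers_for(signal):
    """Return the handlers registered for a signal, or an empty list."""
    return registry.get(signal, [])


if __name__ == '__main__':
    print(server_start is server_stop)
    print(handlers_for(server_start))
    print(handlers_for(object()))
```

A fresh `object()` never matches an existing key, so a signal has to be shared by reference between the code that connects to it and the code that sends it.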

Run a function when a signal is triggered. In other words, the receiver subscribes to the signal:

def start(data: str) -> None:
    # Do something when the server starts.
    logging.info(f'started. {data}')

def stop(data: str) -> None:
    # Do something when the server stops.
    logging.info(f'stopped. {data}')

# Connect each receiver to the signal it should react to.
signal_manager.connect(start, server_start)
signal_manager.connect(stop, server_stop)

Now run a fake server:

async def run():
    # Announce the start, pretend to serve for a while, then announce the stop.
    await signal_manager.send(server_start, data='xxx')
    await asyncio.sleep(5)
    await signal_manager.send(server_stop, data='xxx')


if __name__ == '__main__':
    asyncio.run(run())

Here is the complete code:

import asyncio
import logging
from aio_pydispatch import SignalManager

logging.basicConfig(level=logging.INFO)


def start(data: str) -> None:
    logging.info(f'started. {data}')


def stop(data: str) -> None:
    logging.info(f'stopped. {data}')


server_start = object()
server_stop = object()

signal_manager = SignalManager()

signal_manager.connect(start, server_start)
signal_manager.connect(stop, server_stop)


async def run():
    await signal_manager.send(server_start, data='xxx')
    await asyncio.sleep(5)
    await signal_manager.send(server_stop, data='xxx')


if __name__ == '__main__':
    asyncio.run(run())
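
For intuition about what `connect` and `send` are doing, here is a minimal, self-contained sketch of a signal manager on asyncio. It is not the aio_pydispatch implementation: `MiniSignalManager`, its internals, the support for coroutine receivers, and the list of `(receiver, result)` pairs returned by `send` are all assumptions made for illustration.

```python
import asyncio
import inspect
from collections import defaultdict


class MiniSignalManager:
    """Hypothetical stand-in for illustration; not the aio_pydispatch code."""

    def __init__(self):
        # Maps each signal to the receivers connected to it, in order.
        self._receivers = defaultdict(list)

    def connect(self, receiver, signal):
        # Register a receiver once per signal.
        if receiver not in self._receivers[signal]:
            self._receivers[signal].append(receiver)

    async def send(self, signal, **kwargs):
        # Call every receiver connected to the signal, one after another.
        results = []
        for receiver in list(self._receivers.get(signal, [])):
            result = receiver(**kwargs)
            # Await the result if the receiver was a coroutine function.
            if inspect.isawaitable(result):
                result = await result
            results.append((receiver, result))
        return results


async def demo():
    server_start = object()
    log = []

    def on_start(data):
        log.append(f'sync receiver got {data}')
        return 'sync-done'

    async def on_start_async(data):
        await asyncio.sleep(0)
        log.append(f'async receiver got {data}')
        return 'async-done'

    manager = MiniSignalManager()
    manager.connect(on_start, server_start)
    manager.connect(on_start_async, server_start)
    results = await manager.send(server_start, data='xxx')
    return log, [value for _, value in results]


if __name__ == '__main__':
    print(asyncio.run(demo()))
```

The sketch calls receivers sequentially in connection order, which keeps the behavior easy to reason about. A real implementation might instead run them concurrently or isolate exceptions raised by individual receivers.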

Similar design

sync

Others

Event system in Python

