asyncio pydipatch (Signal Manager)
Project description
aio_pydispatch
Asyncio pydispatch (Signal Manager)
This is based on pyDispatcher reference scrapy SignalManager implementation on Asyncio
Usage
Like the situation often encountered on the web
Init some signals and a signal manager
import asyncio
import logging
from aio_pydispatch import SignalManager
logging.basicConfig(level=logging.INFO)
server_start = object()
server_stop = object()
signal_manager = SignalManager()
Do something when signal is trigger. Another expression: the signal subscribe the event
def start(data: str) -> None:
# Do something when server is start.
logging.info(f'started. {data}')
def stop(data: str) -> None:
# Do something when server is stop.
logging.info(f'stoped. {data}')
signal_manager.connect(start, server_start)
signal_manager.connect(start, server_stop)
Let's run fake server.
async def run():
await signal_manager.send(server_start, data=f'xxx')
await asyncio.sleep(5)
await signal_manager.send(server_stop, data=f'xxx')
if __name__ == '__main__':
asyncio.run(run())
There is all code:
import asyncio
import logging
from aio_pydispatch import SignalManager
logging.basicConfig(level=logging.INFO)
def start(data: str) -> None:
logging.info(f'started. {data}')
def stop(data: str) -> None:
logging.info(f'stopped. {data}')
server_start = object()
server_stop = object()
signal_manager = SignalManager()
signal_manager.connect(start, server_start)
signal_manager.connect(start, server_stop)
async def run():
await signal_manager.send(server_start, data=f'xxx')
await asyncio.sleep(5)
await signal_manager.send(server_stop, data=f'xxx')
if __name__ == '__main__':
asyncio.run(run())
Similar design
sync
Others
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distributions
No source distribution files available for this release.See tutorial on generating distribution archives.
Built Distribution
File details
Details for the file aio_pydispatch-0.0.1.dev0-py3-none-any.whl
.
File metadata
- Download URL: aio_pydispatch-0.0.1.dev0-py3-none-any.whl
- Upload date:
- Size: 3.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.22.0 setuptools/40.6.2 requests-toolbelt/0.9.1 tqdm/4.46.1 CPython/3.6.8
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 2a14527c4dac00b3bed70279328642f159f4a6dab4f4c41dc660294acf1515b0 |
|
MD5 | b31b0edd08e1d2cbd5bd2f6b9cb22f66 |
|
BLAKE2b-256 | 527e9c43a56b14d7790fd1298ec292876903723459e20a9534c90d4b76dcfbc5 |