Easily send data and events between processes, or within a single process
Project description
What is this?
A simple event manager that works between processes
How do I use this?
pip install elegant_events
file1.py
from elegant_events import Server
# will spinup a server in the background if one isn't already running
server1 = Server("localhost", 7070)
#
# listener examples:
#
# attach a listener that triggers every time
@server1.whenever("found_an_event")
def when_event_found(timestamp, data):
print(f'''data = {data}''')
# attach a listener that will only trigger on the next occurance
@server1.once("found_an_event")
def when_event_found(timestamp, data):
print(f'''data = {data}''')
#
# send data example
#
server1.tell(path="found_an_event", data="Hello from same process")
#
# receive events examples:
#
# manually check (e.g. the cooperative multitasking approach)
sleep(10) # (run the file2.py while this file is sleeping)
server1.check()
# because `.tell()` was called twice, when_event_found() will be triggered twice right here
# auto check (very efficiently wait for events from other processes)
# e.g. DONT do:
# while True: server1.check()
server1.listen_forever() #
file2.py
from elegant_events import Server
# will spinup a server in the background if one isn't already running
server1 = Server("localhost", 7070)
# "data" can be anything that works with json.dumps()
server1.tell(path="found_an_event", data={"Hello from different process!": True})
API
from elegant_events import Server
# will spinup a server in the background if one isn't already running
server1 = Server("localhost", 7070)
# temp stop listening for an event
server1.stop_keeping_track_of("found_an_event")
# resume listening for an event
server1.keep_track_of("found_an_event")
# attach a listener that triggers every time
@server1.whenever("found_an_event", catch_and_print_errors=False)
def when_event_found(timestamp, data):
pass
# attach a listener that will only trigger on the next occurance
@server1.once("found_an_event", catch_and_print_errors=False)
def when_event_found(timestamp, data):
print(f'''data = {data}''')
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
elegant_events-0.0.1.tar.gz
(4.2 kB
view hashes)
Built Distribution
Close
Hashes for elegant_events-0.0.1-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | e7bd4103723efa77dbe89a7dc0c7aa102d90eaebed12b6a794e5f0224033aa9a |
|
MD5 | 033ef87ee13b840939903b9f1bbf132c |
|
BLAKE2b-256 | aefee8810c5a8fb20f38b6b7ff694ce936c92a7d77cdc27dae756b91254c3b34 |