Manage producer and consumer workers
Project description
warehut
Installation
pip install warehut
Example
import time
import datetime
import random
from warehut import Warehut
from warehut import Producer
from warehut import Consumer, listen
# There also exists a `Hybrid` worker,
# that can both listen to events and
# when responding, can forward to other consumers.
# Use this with caution and consideration.
from warehut.worker import Worker
class MyWarehut(Warehut):
def handle_error(self, worker_type: type[Worker], exception: Exception):
# Handle exception raised within a worker process
# You could launch a window, write the error to a log file, etc.
# Stopping all other workers
self.stop()
# And printing the error
print(
f'\nWorker of type {worker_type.__name__} '
f'encountered an error.\n{exception!r}\n')
class MyProducer(Producer):
async def __aenter__(self):
print(f'Preparing environment for {self}')
async def __aexit__(self, exc_type, exc_value, trace):
print(f'Gracefully closing environment for {self}')
async def update(self):
# Put whatever you want your producer to do, here.
# It can read from any source and `forward` data to consumer queues.
# This method is run on repeat in-between status checks of the worker.
# Forward the current timestamp to 'ping'
self.forward('ping', time.time())
print('Pinged!')
# Sleep a random amount of time to create obvious offset
time.sleep(random.random() * 2)
# Forward a randomly generated number to 'random'
self.forward('random', random.randint(0, 100))
print('Randomed!')
# Sleep again ... same reason.
time.sleep(random.random() * 2)
class MyConsumer(Consumer):
async def __aenter__(self):
print(f'Preparing environment for {self}')
async def __aexit__(self, exc_type, exc_value, trace):
print(f'Gracefully closing environment for {self}')
# `listen` defines a function to be called with data
# that is labeled with the specified event name.
@listen('ping')
async def on_ping(self, timestamp):
"""Print the time at which a ping was sent"""
timestamp = datetime.datetime.fromtimestamp(int(timestamp))
print(f'Ping at {timestamp.strftime("%Y-%m-%d %H:%M:%S")}')
@listen('random')
async def on_random(self, number):
"""Print generated random numbers"""
if number > 80:
raise RuntimeError('An error to show off the `Warehut` error handler')
print(f'Random number generated: {number}')
if __name__ == '__main__':
with MyWarehut([MyProducer, MyConsumer]):
# `Warehut.start` is called upon entering the context
input('\nPress Enter to exit\n\n')
# `Warehut.stop` is called upon exiting the context
# They can be called on their own with the same effect
#
# Ex.
# warehut = MyWarehut([MyProducer, MyConsumer])
# warehut.start()
# input('\nPress Enter to exit\n\n')
# warehut.stop()
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
warehut-3.3.0.tar.gz
(17.7 kB
view details)
Built Distribution
warehut-3.3.0-py3-none-any.whl
(18.7 kB
view details)
File details
Details for the file warehut-3.3.0.tar.gz
.
File metadata
- Download URL: warehut-3.3.0.tar.gz
- Upload date:
- Size: 17.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.1.11 CPython/3.9.7 Linux/5.10.79-1-MANJARO
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | c6891cb25fc6549059a93c24f552ac68ae2cad0d9b20cd49c56862cea2eefc74 |
|
MD5 | dd7f0c9f43e240423aef0721c7bd7b68 |
|
BLAKE2b-256 | df8b79a07a2dd87fb673a0cd19a54be3eb4037348b685fec530ef41e9a0fe2e6 |
File details
Details for the file warehut-3.3.0-py3-none-any.whl
.
File metadata
- Download URL: warehut-3.3.0-py3-none-any.whl
- Upload date:
- Size: 18.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.1.11 CPython/3.9.7 Linux/5.10.79-1-MANJARO
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | bac6b4de40a686ca91f72e0c5b8025b901b136f11b89bc31ff2477936e2f9108 |
|
MD5 | 121a68587c34514e3fd0715a826e8767 |
|
BLAKE2b-256 | 21560edced14c9c0e950b239c52aa5f1b7fd2c5d3703696e73dba188ea70a377 |