Skip to main content

An eventbus for highly concurrent system

Project description

An eventbus for concurrent programming

geeteventbus is a library that allows publish-subscribe-style communication. There is no need for the components to register to each-other. It is inspired by a Java library, Guava eventbus from Google. But it is not exactly same as the Guava eventbus library.

  • geeteventbus simplifies handling events from publishers and subscribers.

  • publisher and subscribers don’t need to create threads to concurrently process the events.

  • the eventbus can be synchronus, where the events are delivered from the same thread posting the events

  • events can be delivered to subscibers in the same order they are posted

  • subscribers may be declared as thread-safe, in that case same subscriber may be invoked concurrently for processing multiple events

  • events for which there are no subscribers are registered yet are simply discared by the eventbus.

  • the eventbus is not to be used for inter process communication. Publishers and subsribers must run on the same process

Basic working

  1. We create an eventbus

    from geeteventbus.eventbus import eventbus
    eb = eventbus()

    This will create an eventbus with the defaults. The default eventbus will have below characteristics:

    1. the maximum queued event limit is set to 10000

    2. number of executor thread is 8

    3. the subscribers will be called asynchronously

    4. subscibers are treated as thread-safe and hence same subscribers may be invoked simultaneously on different threads

  2. Create a subsclass of subscriber and override the process method. Create an object of this class and register it to the eventbus for receiving messages with certain topics:

    from geeteventbus.subscriber import subscriber
    from geeteventbus.eventbus import eventbus
    from geeteventbus.event import event
    
    class mysubscriber(subscriber):
        def process(self, eventobj):
            if not isinstance(eventobj, event):
                print('Invalid object type is passed.')
                return
            topic = eventobj.get_topic()
            data = eventobj.get_data()
            print('Processing event with TOPIC: %s, DATA: %s' % (topic, data))
    
    subscr = mysubscriber()
    eb.register_consumer(subscr, 'an_important_topic')
  3. Post some events to the eventbus with the topic “an_important_topic”.

    from geeteventbus.event import event
    
    eobj1 = ('an_important_topic', 'This is some data for the event 1')
    eobj2 = ('an_important_topic', 'This is some data for the event 2')
    eobj3 = ('an_important_topic', 'This is some data for the event 3')
    eobj3 = ('an_important_topic', 'This is some data for the event 4')
    
    eb.post(eobj1)
    eb.post(eobj2)
    eb.post(eobj3)
    eb.post(eobj4)
  4. We may gracefully shutdown the eventbus before exiting the process

    eb.shutdown()

The complete example is below:

from time import sleep
from geeteventbus.subscriber import subscriber
from geeteventbus.eventbus import eventbus
from geeteventbus.event import event

class mysubscriber(subscriber):
    def process(self, eventobj):
        if not isinstance(eventobj, event):
            print('Invalid object type is passed.')
            return
        topic = eventobj.get_topic()
        data = eventobj.get_data()
        print('Processing event with TOPIC: %s, DATA: %s' % (topic, data))


eb = eventbus()
subscr = mysubscriber()
eb.register_consumer(subscr, 'an_important_topic')


eobj1 = event('an_important_topic', 'This is some data for the event 1')
eobj2 = event('an_important_topic', 'This is some data for the event 2')
eobj3 = event('an_important_topic', 'This is some data for the event 3')
eobj4 = event('an_important_topic', 'This is some data for the event 4')

eb.post(eobj1)
eb.post(eobj2)
eb.post(eobj3)
eb.post(eobj4)

eb.shutdown()
sleep(2)

A more detailed example is given below. A subscriber (counter_aggregator) aggregates the values for a set of counters. It registers itself to an eventbus for receiving events for the counters(topics). A set of producers update the values for the counters and post events describing the counter to the eventbus:

from threading import Lock, Thread
from time import sleep, time
from geeteventbus.eventbus import eventbus
from geeteventbus.event import event
from geeteventbus.subscriber import subscriber
from random import randint


class counter_aggregator(subscriber, Thread):
    '''
    Aggregator for a set of counters. Multiple threads updates the counts which
    are aggregated by this class and output the aggregated value periodically.
    '''
    def __init__(self, counter_names):
        Thread.__init__(self)
        self.counter_names = counter_names
        self.locks = {}
        self.counts = {}
        self.keep_running = True
        self.collect_times = {}
        for counter in counter_names:
            self.locks[counter] = Lock()
            self.counts[counter] = 0
            self.collect_times[counter] = time()

    def process(self, eobj):
        '''
        Process method calls with the event object eobj. eobj has the counter name as the topic
        and an int count as the value for the counter.
        '''
        counter_name = eobj.get_topic()
        if counter_name not in self.counter_names:
            return
        count = eobj.get_data()
        with self.locks[counter_name]:
            self.counts[counter_name] += count

    def stop(self):
        self.keep_running = False

    def __call__(self):
        '''
        Keep outputing the aggregated counts every 2 seconds
        '''
        while self.keep_running:
            sleep(2)
            for counter_name in self.counter_names:
                with self.locks[counter_name]:
                    print('Change for counter %s = %d, in last %f secs' % (counter_name,
                          self.counts[counter_name], time() - self.collect_times[counter_name]))
                    self.counts[counter_name] = 0
                    self.collect_times[counter_name] = time()
        print('Aggregator exited')


class count_producer:
    '''
    Producer for counters. Every 0.02 seconds post the "updated" value for a
    counter randomly
    '''
    def __init__(self, counters, ebus):
        self.counters = counters
        self.ebus = ebus
        self.keep_running = True
        self.num_counter = len(counters)

    def stop(self):
        self.keep_running = False

    def __call__(self):
        while self.keep_running:
            ev = event(self.counters[randint(0, self.num_counter - 1)], randint(1, 100))
            ebus.post(ev)
            sleep(0.02)
        print('producer exited')

if __name__ == '__main__':
    ebus = eventbus()
    counters = ['c1', 'c2', 'c3', 'c4']
    subcr = counter_aggregator(counters)
    producer = count_producer(counters, ebus)
    for counter in counters:
        ebus.register_consumer(subcr, counter)
    threads = []
    i = 30
    while i > 0:
        threads.append(Thread(target=producer))
        i -= 1

    aggregator_thread = Thread(target=subcr)
    aggregator_thread.start()
    for thrd in threads:
        thrd.start()
    sleep(20)
    producer.stop()
    subcr.stop()
    sleep(2)
    ebus.shutdown()

Project details


Release history Release notifications | RSS feed

This version

1.0

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

geeteventbus-1.0.tar.gz (6.8 kB view details)

Uploaded Source

File details

Details for the file geeteventbus-1.0.tar.gz.

File metadata

  • Download URL: geeteventbus-1.0.tar.gz
  • Upload date:
  • Size: 6.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No

File hashes

Hashes for geeteventbus-1.0.tar.gz
Algorithm Hash digest
SHA256 f290b7cee13adc033fe3dcbf198678aa8413dd81fed4b8a7f767ca8fb156915c
MD5 5aabb993afe245038cad29eb0d875427
BLAKE2b-256 b6f9ed589395b7cad6f7113a4e9e3ccb99f28f318901d517ae54aa3da15c537e

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page