Skip to main content

A package to allow flexible and quick programs on a message bus/kafka.

Project description


Metrobus - Elastic

Proof of concept and example for smart routing on a dumb bus.

This is a small project focused on my blog posts around routing on a message bus that's dumb. Like Kafka.

Stateless and dumb:

Fast Cache:

I use some of the cacheing ideas in here for a few of my data lookups.

The concept for the 'test' example application is fairly simple. The 'pusher' generates records, as if from a client or application. These records are simple JSON structures. For our example, we get cool things like an account number (FAKE!). The 'pusher' sends the message to the 'Source' topic on Kafka. Consider this your public entry point for upstream clients.

Next, the 'busdriver' takes over. The 'busdriver' then pulls data from the Source, and formats it to something that downstream can use. In our case, we add a 'header' object. This 'header' contains a 'route' (and an initial route, and a historical route for tracking purposes). Once this is created based on some intelligence (if the message is in a certain state, or of a certain type, etc), then the 'busdriver' configures it to be so. This is where the 'metrobus' framework comes into play. Once the callback to the 'busdriver' returns the 'wrapped' object, the framework pulls (pops) the "next" stop (Kafka Topic) from the routes stack, and sends the message downstream.

In this case, our first stop after the 'busdriver' is the microservice that uses external data to get the email address associated with the account.. This is our 'contactpoint' micro service. You know, the point at which we should contact the customer? Email address? Come on. Generic! The 'contactpoint' microservice pulls from an external cache, and loads the email address. If the cache doesn't contain the email address, the callback function returns None, indicating that the message is 'dropped' appropriately. I may change this to a specific 'Exception'. Soon?

Next is the 'whitelist' service. Long story short, same pattern. If the email address is in the whitelist cache, we can send to it. If not, the message gets dropped.

Next is the 'LogStop' service. If we were really building an email service, this is where we would send the message out through an SMTP server or something cool like mailgun or mailchimp. IN this case, we just drop it.. cause you know... example code.

Questions? hit me up: chris [ @ ] [at] (obfuscation)

OH, The entire reason I wrote this framework is to make each of my stops 'dumb', and focus on only one thing... processing the message. Doesn't care where it came from. Doesn't care where it's going. Just do your job. KISS.

import time
import json
from metrobus import metrobus
import sys
import random

# To consume latest messages and auto-commit offsets
WHITE_LIST = set()

def callback(message):
    print("Received in CB: ", message)
    real_message = message
    if real_message['email'] in WHITE_LIST:
        real_message['whitelist'] = True
        return real_message
    print("DROPPING due to missing white list.")

if __name__ == "__main__":
    print("Trying to start app.")
    print("Loading whitelist")
    counter = 0
    with open('./data/whitelist.dat', 'r') as input_file:
        for line in input_file:
            line = line.strip()
            if counter % 100000 == 0:
                print('added another 100k, up to ', counter)

    topic_in = "WhiteList"
    metrostop = metrobus.MetroStop(callback, in_topic=topic_in)

The unique bit of code for this 'stop' is the "topic_in" (configuration file, seriously folks), loading of the cache (whitelist.dat), and then processing each message. Seriously, it's like 12 lines of code.. This is what I mean, when I say "are you kidding? That's like 10 lines of code!"


Once you download the clone/download this repo, let's setup some stuff in the test path:

  • git clone
  • cd metrobus
  • cd test
  • pip install --upgrade pipenv
  • pipenv shell
  • mkdir data
  • python > data/whitelist.dat
  • python > data/emails.dat
  • ./

You'll then see a lot of docker compose stuff happening. The environment variables are in metrobus.env

They're set to:

  • Kafka: kafka:9092
  • Redis: redis:6379

Those host names are set in the docker-compose file.


Web Status

Web application, in order to see some super basic stats from redis.


Flask app in


Cool name, eh? Pusher?

This is the example 'source' of data

Raw JSON messages going into Kafka...

Bus Driver

This is the first major step of the process.

Bus driver is my smart router dude. In a real

application, bus driver could/would inspect

the state of the data coming in. THen, it would

dynamically build a route

Contact Point - Add Email (Stage one)

This application uses the fake account ID

generated in the pusher. Adds an email

address based on the cps.dat file you

previously generated. Drops message if can't find

Whitelist (stage two)

Example of a whitelist. I blogged about black list.

Same dealio? If record (email in this case) isn't

in the list, then we can drop the messsage (return)

Log Stop (stage three)

This is an example 'sink' stop on the metrobus

All this bad boy does is logs the message..

in the real world, this could shove into elasticsearch

or something less cool than ES.


This is a personal project and has nothing to do with my employer or any contracts/clients I have.

I will build this project out to be MIT licensed though, for anyone to use and abuse.

Project details

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Filename, size & hash SHA256 hash help File type Python version Upload date
metrobus-0.0.23-py3-none-any.whl (7.6 kB) Copy SHA256 hash SHA256 Wheel py3
metrobus-0.0.23.tar.gz (7.5 kB) Copy SHA256 hash SHA256 Source None

Supported by

Elastic Elastic Search Pingdom Pingdom Monitoring Google Google BigQuery Sentry Sentry Error logging AWS AWS Cloud computing DataDog DataDog Monitoring Fastly Fastly CDN SignalFx SignalFx Supporter DigiCert DigiCert EV certificate StatusPage StatusPage Status page