A package for writing quick, flexible programs on a message bus such as Kafka.

metrobus

Metrobus - Elastic

Proof of concept and example for smart routing on a dumb bus.

This is a small project built around my blog posts on smart routing over a dumb message bus, like Kafka.

Stateless and dumb: https://medium.com/capital-one-developers/stateless-and-dumb-microservices-on-a-message-bus-be78bca93ccb

Fast Cache: https://medium.com/capital-one-developers/blazing-fast-data-lookup-in-a-microservices-world-dd3ae548ca45

I use some of the caching ideas in here for a few of my data lookups.

The concept for the 'test' example application is fairly simple. The 'pusher' generates records, as if from a client or application. These records are simple JSON structures. For our example, we get cool things like an account number (FAKE!). The 'pusher' sends the message to the 'Source' topic on Kafka. Consider this your public entry point for upstream clients.
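To make the 'pusher' step concrete, here is a minimal sketch of building and serializing one record. The field names (account_number, created_at) and the make_record/encode helpers are assumptions for illustration, not the actual pusher.py, and the Kafka send is left out so the snippet runs on its own.

```python
import json
import random
import time


def make_record(rng=random):
    """Build one fake client record (hypothetical field names)."""
    return {
        # Fake 10-digit account number, kept as a zero-padded string.
        "account_number": f"{rng.randrange(10**10):010d}",
        "created_at": time.time(),
    }


def encode(record):
    """Serialize a record to the UTF-8 JSON bytes a producer would send."""
    return json.dumps(record).encode("utf-8")


if __name__ == "__main__":
    print(encode(make_record()))
```

In a real pusher, the bytes returned by encode would be handed to a Kafka producer targeting the 'Source' topic.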

Next, the 'busdriver' takes over. It pulls data from the 'Source' topic and formats it into something the downstream stops can use. In our case, it adds a 'header' object containing a 'route' (plus an initial route and a historical route, for tracking purposes). The 'busdriver' builds the route with some intelligence (for example, based on whether the message is in a certain state or of a certain type). This is where the 'metrobus' framework comes into play: once the 'busdriver' callback returns the 'wrapped' object, the framework pops the "next" stop (a Kafka topic) from the route stack and sends the message downstream.
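The wrap-then-pop idea can be sketched in plain Python. This is only an illustration under assumptions: the key names (header, route, initial_route, history, body), the wrap/next_stop helpers, and the 'ContactPoint' and 'LogStop' topic names are hypothetical, and the real metrobus message layout may differ.

```python
def wrap(raw_message, route):
    """Wrap a raw message with a routing header (hypothetical layout)."""
    return {
        "header": {
            "route": list(route),          # stops still to visit, in order
            "initial_route": list(route),  # the route as first planned
            "history": [],                 # stops already visited
        },
        "body": raw_message,
    }


def next_stop(wrapped):
    """Pop the next topic off the route, or return None when it is empty."""
    header = wrapped["header"]
    if not header["route"]:
        return None
    topic = header["route"].pop(0)
    header["history"].append(topic)
    return topic


if __name__ == "__main__":
    msg = wrap({"account_number": "0000000042"},
               ["ContactPoint", "WhiteList", "LogStop"])
    topic = next_stop(msg)
    while topic is not None:
        print("would send to", topic)
        topic = next_stop(msg)
```

Because the route travels inside the message, each stop only needs to know how to process the body; the framework handles where it goes next.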

In this case, our first stop after the 'busdriver' is the microservice that uses external data to get the email address associated with the account. This is our 'contactpoint' microservice. You know, the point at which we should contact the customer? Email address? Come on. Generic! The 'contactpoint' microservice pulls from an external cache and loads the email address. If the cache doesn't contain the email address, the callback function returns None, which tells the framework to drop the message. I may change this to a specific 'Exception'. Soon?
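A 'contactpoint'-style callback might look roughly like this. It is a sketch, not the project's code: a plain dict stands in for the external cache so the snippet runs by itself, and the account_number field name is an assumption (the email field matches what the whitelist stop reads).

```python
# Stand-in for the external cache; a dict keeps this sketch self-contained.
EMAIL_CACHE = {"0000000042": "someone@example.com"}


def callback(message):
    """Attach the email for the message's account, or drop the message."""
    email = EMAIL_CACHE.get(message.get("account_number"))
    if email is None:
        # Returning None signals that the message should be dropped.
        return None
    message["email"] = email
    return message
```

Note that the callback never mentions a topic: it enriches the message or drops it, and nothing else.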

Next is the 'whitelist' service. Long story short, same pattern. If the email address is in the whitelist cache, we can send to it. If not, the message gets dropped.

Next is the 'LogStop' service. If we were really building an email service, this is where we would send the message out through an SMTP server or something cool like Mailgun or Mailchimp. In this case, we just drop it, because, you know, example code.

Questions? hit me up: chris [ @ ] [at] (obfuscation) fauie.com

Oh, and the entire reason I wrote this framework is to make each of my stops 'dumb' and focused on only one thing: processing the message. A stop doesn't care where the message came from. It doesn't care where it's going. Just do your job. KISS.

from metrobus import metrobus


# In-memory cache of whitelisted email addresses, loaded once at startup.
WHITE_LIST = set()


def callback(message):
    """Pass the message on if its email is whitelisted; otherwise drop it."""
    print("Received in CB: ", message)
    if message['email'] in WHITE_LIST:
        message['whitelist'] = True
        return message
    print("DROPPING due to missing white list.")
    # Returning None tells metrobus to drop the message.
    return None


if __name__ == "__main__":
    print("Trying to start app.")
    print("Loading whitelist")
    counter = 0
    # One email address per line.
    with open('./data/whitelist.dat', 'r') as input_file:
        for line in input_file:
            WHITE_LIST.add(line.strip())
            counter += 1
            if counter % 100000 == 0:
                print('added another 100k, up to ', counter)

    # Topic this stop consumes from; this belongs in a configuration file.
    topic_in = "WhiteList"
    metrostop = metrobus.MetroStop(callback, in_topic=topic_in)
    metrostop.start()

The only code unique to this 'stop' is the "topic_in" value (which belongs in a configuration file, seriously folks), the loading of the cache (whitelist.dat), and the processing of each message. Seriously, it's about 12 lines of code. This is what I mean when I say, "Are you kidding? That's like 10 lines of code!"

Setup

Once you clone or download this repo, let's set up a few things in the test path:

  • git clone git@github.com:chrisfauerbach/metrobus.git
  • cd metrobus
  • cd test
  • pip install --upgrade pipenv
  • pipenv shell
  • mkdir data
  • python make_whitelist.py > data/whitelist.dat
  • python make_account_email.py > data/emails.dat
  • ./run.sh

You'll then see a lot of docker compose stuff happening. The environment variables are in metrobus.env

They're set to:

  • Kafka: kafka:9092
  • Redis: redis:6379

Those host names are set in the docker-compose file.

docker-compose.yml
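A stop could read those settings along these lines. This is a sketch under a loud assumption: KAFKA_HOST and REDIS_HOST are hypothetical variable names, so check metrobus.env for the names the project actually defines. The defaults mirror the values listed above.

```python
import os


def parse_host_port(value):
    """Split 'host:port' into (host, int(port))."""
    host, _, port = value.rpartition(":")
    return host, int(port)


def load_settings(env=os.environ):
    """Read connection settings, falling back to the docker-compose hosts."""
    # KAFKA_HOST and REDIS_HOST are hypothetical names for illustration.
    return {
        "kafka": parse_host_port(env.get("KAFKA_HOST", "kafka:9092")),
        "redis": parse_host_port(env.get("REDIS_HOST", "redis:6379")),
    }


if __name__ == "__main__":
    print(load_settings())
```

Keeping the defaults equal to the docker-compose host names means the same code runs inside the compose network without any extra configuration.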

Web Status

A web application for viewing some super basic stats from Redis.

http://localhost:5000/

Flask app in web.py

Pusher

Cool name, eh? Pusher?

This is the example 'source' of data

Raw JSON messages going into Kafka...

pusher.py

Bus Driver

This is the first major step of the process.

Bus driver is my smart router dude. In a real application, bus driver could/would inspect the state of the data coming in. Then, it would dynamically build a route.

https://medium.com/capital-one-developers/stateless-and-dumb-microservices-on-a-message-bus-be78bca93ccb
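Dynamic route building could be as simple as the following sketch. The rule shown (skip the email lookup when the message already carries an email) and the 'ContactPoint' and 'LogStop' topic names are made up for illustration; they are not taken from the project's bus driver.

```python
def build_route(message):
    """Pick the downstream stops for a message (hypothetical rules)."""
    route = []
    # Only look up the email if the message does not already have one.
    if "email" not in message:
        route.append("ContactPoint")
    route.append("WhiteList")
    route.append("LogStop")
    return route


if __name__ == "__main__":
    print(build_route({"account_number": "0000000042"}))
    print(build_route({"email": "someone@example.com"}))
```

All the routing smarts live in this one function, which is what lets every other stop stay dumb.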

Contact Point - Add Email (Stage one)

This application uses the fake account ID generated in the pusher. It adds an email address based on the cps.dat file you previously generated, and drops the message if it can't find one.

Whitelist (stage two)

Example of a whitelist. I blogged about a blacklist. Same dealio: if the record (an email in this case) isn't in the list, we drop the message (return).

Log Stop (stage three)

This is an example 'sink' stop on the metrobus. All this bad boy does is log the message. In the real world, this could shove the message into Elasticsearch, or something less cool than ES.
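A sink callback in this spirit might be sketched as below. The logger name and log format are assumptions; the only behavior taken from the description is that the stop logs the message and sends nothing further downstream.

```python
import json
import logging

logger = logging.getLogger("logstop")


def callback(message):
    """Log the message and end its journey."""
    logger.info("Delivered: %s", json.dumps(message, sort_keys=True))
    # A sink has nowhere to forward to, so return None.
    return None


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    callback({"email": "someone@example.com", "whitelist": True})
```

Swapping the logger call for an Elasticsearch or SMTP client call is the only change a real sink would need.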

NOTE

This is a personal project and has nothing to do with my employer or any contracts/clients I have.

I will build this project out to be MIT licensed though, for anyone to use and abuse.
