Skip to main content

A microservices nanoframework.

Project description

PyPI version Python Versions https://travis-ci.org/barrachri/rampante.svg?branch=master https://codecov.io/gh/barrachri/rampante/branch/master/graph/badge.svg

🐎 Rampante

A fancy and opinionated nanoframework for microservices.

Installation

pip install rampante

How to use subscribe_on

from rampante import subscribe_on

# The function should accept 3 params
# queue_name, for example could be "user.subscribed"
# data is a dictionary, it's a msgpacked message sent to Kafka
# producer is an instance of AIOKafkaProducer, if you want to send new events

@subscribe_on("user.subscribed")
async def send_a_message(queue_name, data, producer):
    log.info("Event received!")

@subscribe_on("user.subscribed", "user.created")
async def send_another_message(queue_name, data, producer):
    log.info("Event received!")

Example

import asyncio
import logging

import msgpack
from aiokafka import AIOKafkaProducer
from rampante import scheduler, subscribe_on

log = logging.getLogger()
handler = logging.StreamHandler()
formatter = logging.Formatter(
    '[%(asctime)s %(name)s %(levelname)s] %(message)s')
handler.setFormatter(formatter)
log.addHandler(handler)
log.setLevel(logging.INFO)

KAFKA_URI = 'localhost:9092'

@subscribe_on("user.subscribed")
async def send_a_message(queue_name, data, producer):
    log.info("Event received!")

async def stop_task_manager(app):
    """Cancel task manager."""
    if 'task_manager' in app:
        app['task_manager'].cancel()
        await app['task_manager']

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(scheduler(kafka_uri=KAFKA_URI, loop=loop, queue_size=10))
    except KeyboardInterrupt:
        log.warning("Shutting down!")

Example with aiohttp

import asyncio
import logging

import msgpack
from aiohttp import web
from aiokafka import AIOKafkaProducer
from rampante import scheduler, subscribe_on

log = logging.getLogger()
handler = logging.StreamHandler()
formatter = logging.Formatter(
    '[%(asctime)s %(name)s %(levelname)s] %(message)s')
handler.setFormatter(formatter)
log.addHandler(handler)
log.setLevel(logging.INFO)

KAFKA_URI = 'localhost:9092'


@subscribe_on("user.subscribed")
async def send_a_message(queue_name, data, producer):
    log.info("Event received!")


async def handle(request):
    name = request.match_info.get('name', "Anonymous")
    text = "Hello, " + name
    body = msgpack.packb({"message": "Hello", "priority": 3})
    await request.app['events_queue'].send_and_wait("user.subscribed", body)
    return web.Response(text=text)


async def start_event_connection(app):
    """Connect to Kafka."""
    connection = AIOKafkaProducer(loop=app.loop, bootstrap_servers=KAFKA_URI)
    await connection.start()
    app['events_queue'] = connection


async def stop_event_connection(app):
    """Close connection with Kafka."""
    if 'events_queue' in app:
        await app['events_queue'].stop()


async def start_task_manager(app):
    """Load task manager."""
    app['task_manager'] = asyncio.ensure_future(
        scheduler(kafka_uri=KAFKA_URI, loop=app.loop, queue_size=10))


async def stop_task_manager(app):
    """Cancel task manager."""
    if 'task_manager' in app:
        app['task_manager'].cancel()
        await app['task_manager']

if __name__ == '__main__':
    app = web.Application()
    app.router.add_get('/{name}', handle)
    # On-startup tasks
    app.on_startup.append(start_event_connection)
    app.on_startup.append(start_task_manager)
    # Clean-up tasks
    app.on_cleanup.append(stop_task_manager)
    app.on_cleanup.append(stop_event_connection)
    web.run_app(app)

The name

Rampante means “rampant” in Italian.

Why Kafka?

I like aiokafka, but I plan to switch to Redis as soon as Stream will be officially available.

To Do

  • add circuit breaker
  • add retry
  • add logic when tasks fail
  • add consumer position

Pull requests are encouraged!

License

Apache 2.0

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Files for rampante, version 0.0.2
Filename, size File type Python version Upload date Hashes
Filename, size rampante-0.0.2-py2.py3-none-any.whl (5.1 kB) File type Wheel Python version py2.py3 Upload date Hashes View
Filename, size rampante-0.0.2.tar.gz (6.0 kB) File type Source Python version None Upload date Hashes View

Supported by

AWS AWS Cloud computing Datadog Datadog Monitoring Facebook / Instagram Facebook / Instagram PSF Sponsor Fastly Fastly CDN Google Google Object Storage and Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Salesforce Salesforce PSF Sponsor Sentry Sentry Error logging StatusPage StatusPage Status page