
Twisted based pipeline framework for AMQP

Project Description

Twisted based pipeline framework for AMQP. It allows you to create fast asynchronous services that follow this pattern:

  • get a message from a queue
  • do something with the message
  • publish the result
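The three steps above can be illustrated without a broker. The following is a minimal in-memory sketch, assuming a `deque` stands in for the input queue and a plain list for the output exchange. `run_pipeline` is a hypothetical helper written for this illustration; it is not part of amqpipe.

```python
from collections import deque


def run_pipeline(queue, action, published):
    """Drain `queue`, apply `action` to each message, collect the results."""
    while queue:
        message = queue.popleft()   # 1. get a message from the queue
        result = action(message)    # 2. do something with the message
        published.append(result)    # 3. publish the result
    return published


# Stand-ins for a real queue and exchange (assumptions for illustration).
incoming = deque(["a", "b", "c"])
outgoing = run_pipeline(incoming, str.upper, [])
print(outgoing)
```

In the real framework the queue and exchange live in RabbitMQ and the loop is driven by Twisted; only the `action` callback is yours to write.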

Installation

Install via pip:

pip install amqpipe

Basic usage

The minimal module based on AMQPipe is:

from amqpipe import AMQPipe

pipe = AMQPipe()
pipe.run()

It simply gets all messages from one RabbitMQ queue and publishes them to another RabbitMQ exchange.

Now define an action to apply to each message:

import hashlib
from amqpipe import AMQPipe

def action(message):
    return hashlib.md5(message).hexdigest()

pipe = AMQPipe(action=action)
pipe.run()

It publishes the MD5 checksum of every message as the result.
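To see what such an action returns, it can be called directly on a sample payload. This is a sketch under one assumption: the source does not say whether amqpipe hands the action bytes or text, so the `isinstance` check below is an addition for Python 3, where `hashlib.md5` accepts only bytes.

```python
import hashlib


def action(message):
    # Assumption: the message may arrive as text; hashlib.md5 needs bytes
    # on Python 3, so encode it first.
    if isinstance(message, str):
        message = message.encode("utf-8")
    return hashlib.md5(message).hexdigest()


digest = action(b"hello")
print(digest)  # 5d41402abc4b2a76b9719d911017c592
```

The returned value is a 32-character hexadecimal string, which is what would be published for this message.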

If messages in the input queue have a predefined format, you can define a converter function that extracts the part the action needs:

import hashlib
from amqpipe import AMQPipe

def converter(message):
    return message['text']

def action(text):
    return hashlib.md5(text).hexdigest()

pipe = AMQPipe(converter=converter, action=action)
pipe.run()
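The order in which the two callbacks see the data can be checked by chaining them by hand. In this sketch `process` is a hypothetical stand-in for what the pipe does between receiving and publishing, and the sample message is assumed to be already decoded into a dict (the source only implies this through `message['text']`).

```python
import hashlib


def converter(message):
    # Receives the whole message and returns only the part of interest.
    return message['text']


def action(text):
    # Receives the converter's output, not the original message.
    return hashlib.md5(text.encode("utf-8")).hexdigest()


def process(message):
    # Hypothetical helper: converter first, then action.
    return action(converter(message))


result = process({'text': 'hello', 'id': 1})
print(result)
```

Because the converter strips everything except `text`, the `id` field has no effect on the published checksum.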

You can define service-specific arguments:

import hashlib
from amqpipe import AMQPipe

class Processor:
    def set_field(self, field):
        self.field = field

processor = Processor()

def init(args):
    processor.set_field(args.field)

def converter(message):
    return message.get(processor.field)

def action(text):
    return hashlib.md5(text).hexdigest()

pipe = AMQPipe(converter, action, init)
pipe.parser.add_argument('--field', default='text', help='Field name for retrieving message value')
pipe.run()

You can also connect to a database or perform any other initialization in the init function.
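How the `--field` argument reaches `init` can be tried in isolation. This sketch assumes `pipe.parser` behaves like a standard `argparse.ArgumentParser`; the `add_argument` call suggests that, but the source does not confirm it. The sample message is invented for illustration.

```python
import argparse

# Stand-in for pipe.parser (assumed to be an argparse.ArgumentParser).
parser = argparse.ArgumentParser()
parser.add_argument('--field', default='text',
                    help='Field name for retrieving message value')

default_args = parser.parse_args([])                   # no flag given
custom_args = parser.parse_args(['--field', 'body'])   # --field body

message = {'text': 'hello', 'body': 'world'}
print(message.get(default_args.field))
print(message.get(custom_args.field))
print(message.get('missing'))
```

Note that `dict.get` returns `None` for a missing key, and `hashlib.md5(None)` raises `TypeError`, so a converter written this way may need to handle messages that lack the configured field.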

If your action returns a Deferred, the result is published to RabbitMQ once that Deferred fires:

import logging
from twisted.internet import defer
from amqpipe import AMQPipe

logger = logging.getLogger(__name__)

class Processor:
    def set_field(self, field):
        self.field = field

processor = Processor()

def init(args):
    connect_to_db()
    ...

def converter(message):
    return message.get(processor.field)

@defer.inlineCallbacks
def action(text):
    result = yield db_query(text)
    logger.info('Get from db: %s', result)
    defer.returnValue(result)

pipe = AMQPipe(converter, action, init)
pipe.parser.add_argument('--field', default='text', help='Field name for retrieving message value')
pipe.run()

The init function may return a Deferred too.

Release History

1.2 (this version), 1.1, 1.0, 0.2.7, 0.2.6, 0.2.5, 0.2.4, 0.2.3, 0.2.2, 0.2.1, 0.2.0, 0.1.16, 0.1.15, 0.1.14, 0.1.13, 0.1.12, 0.1.11, 0.1.10, 0.1.8, 0.1.7, 0.1.6, 0.1.5, 0.1.3, 0.1.2, 0.1.1, 0.1

Download Files

amqpipe-1.2.tar.gz (6.4 kB), source distribution, uploaded Nov 7, 2017
