Twisted based pipeline framework for AMQP
Twisted-based pipeline framework for AMQP. It allows you to create fast asynchronous services that follow this pattern:
get a message from a queue
do something with the message
publish the result
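The three steps above can be sketched without any AMQP broker. The following is only an in-memory illustration of the pattern, not AMQPipe's implementation: `run_pipeline`, `inbox` and `published` are hypothetical names, and the loop is synchronous, whereas AMQPipe runs on Twisted.

```python
import queue


def run_pipeline(inbox, outbox, action):
    """Drain inbox, apply action to each message, append results to outbox."""
    while True:
        try:
            message = inbox.get_nowait()  # 1. get a message from the queue
        except queue.Empty:
            break
        result = action(message)          # 2. do something with the message
        outbox.append(result)             # 3. publish the result


inbox = queue.Queue()
for body in ("a", "b", "c"):
    inbox.put(body)

published = []  # stand-in for the output exchange
run_pipeline(inbox, published, action=str.upper)
print(published)  # ['A', 'B', 'C']
```

In a real service the queue and the exchange live in RabbitMQ, and only the action is your code.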
Installation
Install via pip:
pip install amqpipe
Basic usage
The minimal module based on AMQPipe is:
from amqpipe import AMQPipe
pipe = AMQPipe()
pipe.run()
It simply takes every message from one RabbitMQ queue and publishes it to another RabbitMQ exchange.
Now let's define an action to apply to each message:
import hashlib
from amqpipe import AMQPipe

def action(message):
    return hashlib.md5(message).hexdigest()

pipe = AMQPipe(action=action)
pipe.run()
It will publish the MD5 checksum of every message as the result.
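One detail worth checking in such an action: on Python 3, `hashlib.md5` accepts only bytes. The sketch below adds a guard for text input. Whether AMQPipe hands the action bytes or text is not stated here, so the `isinstance` check is an assumption, not part of the library.

```python
import hashlib


def action(message):
    # Hypothetical guard: encode text so hashlib.md5 always receives bytes.
    if isinstance(message, str):
        message = message.encode("utf-8")
    return hashlib.md5(message).hexdigest()


print(action(b"hello"))                     # 5d41402abc4b2a76b9719d911017c592
print(action("hello") == action(b"hello"))  # True
```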
If messages in the input queue have a predefined format, you can define a converter function:
import hashlib
from amqpipe import AMQPipe

def converter(message):
    return message['text']

def action(text):
    return hashlib.md5(text).hexdigest()

pipe = AMQPipe(converter=converter, action=action)
pipe.run()
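To see what this pair does to a single message, the two functions can be chained by hand. This sketch assumes the converter runs first and the action receives its return value; `process` is a hypothetical helper and the sample message is made up for illustration.

```python
import hashlib


def converter(message):
    return message['text']


def action(text):
    return hashlib.md5(text).hexdigest()


def process(message):
    # Assumed ordering: converter first, then action on the converter's output.
    return action(converter(message))


# Made-up message; only the 'text' field reaches the action.
digest = process({'text': b'hello', 'id': 1})
print(digest == hashlib.md5(b'hello').hexdigest())  # True
```

Keeping format handling in the converter lets the action stay independent of the message layout.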
You can define service-specific arguments:
import hashlib
from amqpipe import AMQPipe

class Processor:
    def set_field(self, field):
        self.field = field

processor = Processor()

def init(args):
    processor.set_field(args.field)

def converter(message):
    return message.get(processor.field)

def action(text):
    return hashlib.md5(text).hexdigest()

pipe = AMQPipe(converter, action, init)
pipe.parser.add_argument('--field', default='text', help='Field name for retrieving message value')
pipe.run()
You can connect to a database in the init function or perform any other initialization there.
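The flow from command line to `init` can be tried in isolation with the standard library. This sketch assumes `pipe.parser` behaves like an `argparse.ArgumentParser` (its `add_argument` call has the same shape); the standalone `parser` below is a stand-in, not the AMQPipe object.

```python
import argparse


class Processor:
    def set_field(self, field):
        self.field = field


processor = Processor()


def init(args):
    processor.set_field(args.field)


# Stand-in for pipe.parser; assumed to behave like argparse.ArgumentParser.
parser = argparse.ArgumentParser()
parser.add_argument('--field', default='text', help='Field name for retrieving message value')

init(parser.parse_args(['--field', 'body']))
print(processor.field)  # body

init(parser.parse_args([]))
print(processor.field)  # text
```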
If your action returns a Deferred, the result is published to RabbitMQ once that Deferred fires:
import logging
from twisted.internet import defer
from amqpipe import AMQPipe

logger = logging.getLogger(__name__)

class Processor:
    def set_field(self, field):
        self.field = field

processor = Processor()

def init(args):
    connect_to_db()
    ...

def converter(message):
    return message.get(processor.field)

@defer.inlineCallbacks
def action(text):
    result = yield db_query(text)
    logger.info('Get from db: %s', result)
    defer.returnValue(result)

pipe = AMQPipe(converter, action, init)
pipe.parser.add_argument('--field', default='text', help='Field name for retrieving message value')
pipe.run()
The init function may return a Deferred too.