Pipeflow
Pipeflow is a simple library for setting up services on top of different kinds of message queues (MQ).
Installation
$ python -m pip install pypipeflow
Pipeflow only supports Python 3.5+.
Usage example
Pipeflow offers different types of endpoints:
# NSQ endpoint
from pipeflow.endpoints.nsq_endpoints import NsqInputEndpoint, NsqOutputEndpoint
in_endpoint = NsqInputEndpoint("topic1", "channel1", lookupd_http_addresses="127.0.0.1:4161")
output_endpoint = NsqOutputEndpoint(nsqd_tcp_addresses="127.0.0.1:4150")
# Rabbitmq endpoint
from pipeflow.endpoints.rabbitmq_endpoints import RabbitmqInputEndpoint, RabbitmqOutputEndpoint
RABBITMQ_CONF = {
    "host": "127.0.0.1", "port": 5672, "virtualhost": "/", "login": "login_name",
    "password": "xxxx"
}
in_endpoint = RabbitmqInputEndpoint("queue1", **RABBITMQ_CONF)
output_endpoint = RabbitmqOutputEndpoint("queue1", **RABBITMQ_CONF)
# Redis endpoint
from pipeflow.endpoints.redis_endpoints import RedisInputEndpoint, RedisOutputEndpoint
in_endpoint = RedisInputEndpoint("queue1", host="127.0.0.1", port=6379, password="xxx", db=0)
output_endpoint = RedisOutputEndpoint(host="127.0.0.1", port=6379, password="xxx", db=0)
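The connection settings above are hard-coded. As a minimal sketch, the RabbitMQ settings could instead be read from environment variables. The `RABBITMQ_*` variable names and the `load_rabbitmq_conf` helper are hypothetical and not part of Pipeflow; only the dictionary keys are taken from `RABBITMQ_CONF` above.

```python
import os


def load_rabbitmq_conf(env=None):
    """Build a dict shaped like RABBITMQ_CONF from environment variables.

    The RABBITMQ_* variable names are an assumption of this sketch,
    not something Pipeflow defines.
    """
    env = os.environ if env is None else env
    return {
        "host": env.get("RABBITMQ_HOST", "127.0.0.1"),
        # Environment values are strings, so convert the port to an integer.
        "port": int(env.get("RABBITMQ_PORT", "5672")),
        "virtualhost": env.get("RABBITMQ_VHOST", "/"),
        "login": env.get("RABBITMQ_LOGIN", "login_name"),
        "password": env.get("RABBITMQ_PASSWORD", ""),
    }


RABBITMQ_CONF = load_rabbitmq_conf()
```

The resulting dictionary can be passed the same way as before, for example `RabbitmqInputEndpoint("queue1", **RABBITMQ_CONF)`.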
Set up service
from pipeflow.server import Server, Task
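async def worker_handle(group, task):
    # Read the payload (bytes) of the incoming task
    data = task.get_data()
    data += b"--"
    # Wrap the result in a new task and send it to the output named 'out_name'
    res_task = Task(data)
    res_task.set_to('out_name')
    return res_task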
server = Server()
group = server.add_group('group1', concurrency=2)
group.set_handle(worker_handle)
group.add_input_endpoint('in_name', in_endpoint)
# Register the output endpoint that matches your MQ (use one of the following)
# NSQ
group.add_output_endpoint('out_name', output_endpoint, topic='topic1', delay=3000)
# Rabbitmq
group.add_output_endpoint('out_name', output_endpoint, queue='queue1')
# Redis
group.add_output_endpoint('out_name', output_endpoint, queue='queue1')
server.run()
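Because the handler is a plain coroutine, its logic can be exercised without a running MQ. The following is a minimal sketch only: `FakeTask` is a hypothetical stand-in that mimics just the `get_data()` and `set_to()` calls used above (it is not Pipeflow's `Task`), the `task_cls` parameter is added here purely so the stand-in can be injected, and `asyncio.run` requires Python 3.7+.

```python
import asyncio


class FakeTask:
    """Hypothetical test double; NOT pipeflow.server.Task."""

    def __init__(self, data):
        self._data = data
        self.to = None  # name of the output this task is routed to

    def get_data(self):
        return self._data

    def set_to(self, name):
        self.to = name


async def worker_handle(group, task, task_cls=FakeTask):
    # Same logic as the handler above: append b"--" and route to 'out_name'.
    data = task.get_data() + b"--"
    res_task = task_cls(data)
    res_task.set_to("out_name")
    return res_task


# The group argument is unused by this handler, so None is passed here.
result = asyncio.run(worker_handle(None, FakeTask(b"hello")))
print(result.get_data(), result.to)
```

Keeping the handler free of MQ-specific calls, as in the example above, is what makes this kind of isolated check possible.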