A small example package
Pipeflow
Pipeflow is a simple library for setting up services on top of different kinds of message queues (MQ).
Installation
$ python -m pip install pypipeflow
Pipeflow only supports Python 3.5+.
Usage example
Pipeflow offers several types of endpoints:
# NSQ endpoint
from pipeflow.endpoints.nsq_endpoints import NsqInputEndpoint, NsqOutputEndpoint
in_endpoint = NsqInputEndpoint("topic1", "channel1", lookupd_http_addresses="127.0.0.1:4161")
output_endpoint = NsqOutputEndpoint(nsqd_tcp_addresses="127.0.0.1:4150")
# RabbitMQ endpoint
from pipeflow.endpoints.rabbitmq_endpoints import RabbitmqInputEndpoint, RabbitmqOutputEndpoint
RABBITMQ_CONF = {
    "host": "127.0.0.1", "port": 5672, "virtualhost": "/", "login": "login_name",
    "password": "xxxx"
}
in_endpoint = RabbitmqInputEndpoint("queue1", **RABBITMQ_CONF)
output_endpoint = RabbitmqOutputEndpoint("queue1", **RABBITMQ_CONF)
# Redis endpoint
from pipeflow.endpoints.redis_endpoints import RedisInputEndpoint, RedisOutputEndpoint
in_endpoint = RedisInputEndpoint("queue1", host="127.0.0.1", port='6379', password="xxx", db=0)
output_endpoint = RedisOutputEndpoint(host="127.0.0.1", port='6379', password="xxx", db=0)
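Hard-coded credentials as in the snippets above are fine for a local test; for a deployment, the same keyword arguments could be assembled from environment variables. The following is a minimal sketch and not part of Pipeflow: the helper name `rabbitmq_conf_from_env` and the `RABBITMQ_*` variable names are assumptions chosen for illustration, while the dictionary keys are the ones used in `RABBITMQ_CONF` above.

```python
import os


def rabbitmq_conf_from_env(env=None):
    """Build the keyword arguments used for the RabbitMQ endpoints.

    Hypothetical helper: the RABBITMQ_* variable names are assumptions;
    only the returned keys come from the example above.
    """
    env = os.environ if env is None else env
    return {
        "host": env.get("RABBITMQ_HOST", "127.0.0.1"),
        # Environment values are strings, so the port is converted to int.
        "port": int(env.get("RABBITMQ_PORT", "5672")),
        "virtualhost": env.get("RABBITMQ_VHOST", "/"),
        "login": env.get("RABBITMQ_LOGIN", "login_name"),
        "password": env.get("RABBITMQ_PASSWORD", ""),
    }


RABBITMQ_CONF = rabbitmq_conf_from_env()
```

The resulting dictionary can be passed as `**RABBITMQ_CONF` to `RabbitmqInputEndpoint` and `RabbitmqOutputEndpoint` in the same way as the literal dictionary shown earlier.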
Set up service
from pipeflow.server import Server, Task
async def worker_handle(group, task):
    data = task.get_data()
    data += b"--"
    res_task = Task(data)
    res_task.set_to('out_name')
    return res_task
server = Server()
group = server.add_group('group1', concurrency=2)
group.set_handle(worker_handle)
group.add_input_endpoint('in_name', in_endpoint)
# Register the output endpoint with the call that matches your backend (use only one).
# NSQ
group.add_output_endpoint('out_name', output_endpoint, topic='topic1', delay=3000)
# RabbitMQ
group.add_output_endpoint('out_name', output_endpoint, queue='queue1')
# Redis
group.add_output_endpoint('out_name', output_endpoint, queue='queue1')
server.run()
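Because `server.run()` needs a reachable broker, it can help to exercise the handler logic on its own first. The sketch below is a stand-in built on assumptions rather than Pipeflow code: `FakeTask` is a hypothetical class that mimics only the `get_data()` and `set_to()` calls used above, plus a `get_to()` accessor invented for this check and not taken from Pipeflow. The handler is the same as above except that it builds a `FakeTask` instead of a `pipeflow.server.Task`.

```python
import asyncio


class FakeTask:
    """Hypothetical stand-in for pipeflow.server.Task (not the real class)."""

    def __init__(self, data):
        self._data = data
        self._to = None

    def get_data(self):
        return self._data

    def set_to(self, name):
        # Remember which named output endpoint the result should go to.
        self._to = name

    def get_to(self):
        # Assumed accessor, used only by this check.
        return self._to


async def worker_handle(group, task):
    # Same logic as the handler above, using FakeTask for the result.
    data = task.get_data()
    data += b"--"
    res_task = FakeTask(data)
    res_task.set_to('out_name')
    return res_task


def run_handler_once(payload):
    """Run the handler on one payload without any message queue."""
    loop = asyncio.new_event_loop()
    try:
        # The group argument is unused by this handler, so None is passed.
        return loop.run_until_complete(worker_handle(None, FakeTask(payload)))
    finally:
        loop.close()


result = run_handler_once(b"hello")
print(result.get_data(), result.get_to())  # b'hello--' out_name
```

The explicit event loop is used instead of `asyncio.run` so that the sketch also runs on Python 3.5, the minimum version Pipeflow states it supports.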