Skip to main content

a simple library that you can set up service based on different kinds of MQ

Project description

Pipeflow

Pipeflow is simple library that you can set up service based on different kinds of MQ.

Installation

$ python -m pip install pypipeflow

Pipeflow only supports Python 3.5+.

Usage example

Pipeflow offer different type of endpoint

# NSQ endpoint
from pipeflow.endpoints.nsq_endpoints import NsqInputEndpoint, NsqOutputEndpoint

in_endpoint = NsqInputEndpoint("topic1", "channel1", lookupd_http_addresses="127.0.0.1:4161")
output_endpoint = NsqOutputEndpoint(nsqd_tcp_addresses="127.0.0.1:4150")


# Rabbitmq endpoint
from pipeflow.endpoints.rabbitmq_endpoints import RabbitmqInputEndpoint, RabbitmqOutputEndpoint

RABBITMQ_CONF = {
    "host": "127.0.0.1", "port": 5672, "virtualhost": "/", "login": "login_name",
    "password": "xxxx"
}
in_endpoint = RabbitmqInputEndpoint("queue1", **RABBITMQ_CONF)
output_endpoint = RabbitmqOutputEndpoint("queue1", **RABBITMQ_CONF)


# Redis endpoint
from pipeflow.endpoints.redis_endpoints import RedisInputEndpoint, RedisOutputEndpoint

in_endpoint = RedisInputEndpoint("queue1", host="127.0.0.1", port='6379', password="xxx", db=0)
output_endpoint = RedisOutputEndpoint(host="127.0.0.1", port='6379', password="xxx", db=0)

Set up service

from pipeflow.server import Server, Task

async def worker_handle(group, task):
    data = task.get_data()
    data += b"--"
    res_task = Task(data)
    res_task.set_to('out_name')
    return res_task

server = Server()
group = server.add_group('group1', concurrency=2)
group.set_handle(worker_handle)
group.add_input_endpoint('in_name', in_endpoint)
# NSQ
group.add_output_endpoint('out_name', output_endpoint, topic='topic1', delay=3000)
# Rabbitmq
group.add_output_endpoint('out_name', output_endpoint, queue='queue1')
# Redis
group.add_output_endpoint('out_name', output_endpoint, queue='queue1')

server.run()

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pypipeflow-1.0.1.tar.gz (10.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pypipeflow-1.0.1-py2.py3-none-any.whl (13.0 kB view details)

Uploaded Python 2Python 3

File details

Details for the file pypipeflow-1.0.1.tar.gz.

File metadata

  • Download URL: pypipeflow-1.0.1.tar.gz
  • Upload date:
  • Size: 10.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.2.0 pkginfo/1.5.0.1 requests/2.24.0 setuptools/49.6.0 requests-toolbelt/0.9.1 tqdm/4.48.2 CPython/3.7.0

File hashes

Hashes for pypipeflow-1.0.1.tar.gz
Algorithm Hash digest
SHA256 95339e7bd6d85a49adcfccdc5dc2ba7742b25ab881cfcf613b71869eda0ee591
MD5 c137098f0a89da89eab9ab6f4ae14ebe
BLAKE2b-256 64149d11243efc0cc59281ecb7d6d3d489e45f30f32beb62272e2e38318cd4bb

See more details on using hashes here.

File details

Details for the file pypipeflow-1.0.1-py2.py3-none-any.whl.

File metadata

  • Download URL: pypipeflow-1.0.1-py2.py3-none-any.whl
  • Upload date:
  • Size: 13.0 kB
  • Tags: Python 2, Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.2.0 pkginfo/1.5.0.1 requests/2.24.0 setuptools/49.6.0 requests-toolbelt/0.9.1 tqdm/4.48.2 CPython/3.7.0

File hashes

Hashes for pypipeflow-1.0.1-py2.py3-none-any.whl
Algorithm Hash digest
SHA256 3720cf05e91d499af5351f0689aa74636fa7b05b75cb13b727069a2eb5e63eaf
MD5 081fab41c844c3376add24c5bf771a53
BLAKE2b-256 5ef5651f7ecb2b5dd5a7c2d767376220304a0b04f51def8738c43d3a98befaab

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page