Skip to main content

Wrapper for ZMQ comunication.

Project description

PyPI version

ZMQ Tubes

ZMQ Tubes is a managing system for ZMQ communication. It can manage many ZMQ sockets by one interface. The whole system is hierarchical, based on topics (look at MQTT topics).

Classes

  • TubeMessage - This class represents a request/response message. Some types of tubes require a response in this format.
  • Tube - This class wraps a ZMQ socket. It represents a connection between client and server.
  • TubeNode - This represents an application interface for communication via tubes.

Asyncion / Threading

The library support bot method.

from zmq_tubes import TubeNode, Tube            # Asyncio classes
from zmq_tubes.threads import TubeNode, Tube    # Threads classes

Usage:

Node definitions in yml file

We can define all tubes for one TubeNode by yml file.

# test.yml
tubes:
  - name: Client REQ
    addr:  ipc:///tmp/req.pipe      
    tube_type: REQ
    topics:
      - foo/#
      - +/bar
  
  - name: Client PUB
    addr:  ipc:///tmp/pub.pipe      
    tube_type: PUB
    topics:
      - foo/pub/#

  - name: Server ROUTER
    addr:  ipc:///tmp/router.pipe      
    tube_type: ROUTER
    server: yes
    sockopts:
      LINGER: 0
    topics:
      - server/#
import asyncio
import yaml
from zmq_tubes import TubeNode, TubeMessage


async def handler(request: TubeMessage):
  print(request.payload)
  return request.create_response('response')


with open('test.yml', 'r+') as fd:    
    schema = yaml.safe_load(fd)

node = TubeNode(schema=schema)
node.register_handler('server/#', handler)
asyncio.current_task(node.start(), name="Server")

node.publish('foo/pub/test', 'message 1')
print(await node.request('foo/xxx', 'message 2'))

Request / Response

This is a simple scenario, the server processes the requests serially.

Server:

from zmq_tubes import Tube, TubeNode, TubeMessage


async def handler(request: TubeMessage):
  print(request.payload)
  return 'answer'
  # or return request.create_response('response')


tube = Tube(
  name='Server',
  addr='ipc:///tmp/req_resp.pipe',
  server=True,
  tube_type='REP'
)

node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.start()

# output: 'question'

Client:

from zmq_tubes import Tube, TubeNode

tube = Tube(
  name='Client',
  addr='ipc:///tmp/req_resp.pipe',
  tube_type='REQ'
)

node = TubeNode()
node.register_tube(tube, 'test/#')
response = await node.request('test/xxx', 'question')
print(response.payload)
# output: 'answer'

Subscribe / Publisher

Server:

from zmq_tubes import Tube, TubeNode, TubeMessage


async def handler(request: TubeMessage):
  print(request.payload)


tube = Tube(
  name='Server',
  addr='ipc:///tmp/sub_pub.pipe',
  server=True,
  tube_type='SUB'
)

node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.start()
# output: 'message'

Client:

from zmq_tubes import Tube, TubeNode

tube = Tube(
  name='Client',
  addr='ipc:///tmp/sub_pub.pipe',
  tube_type='PUB'
)
# In the case of publishing, the first message is very often
# lost. The workaround is to connect the tube manually as soon as possible.
tube.connect()

node = TubeNode()
node.register_tube(tube, 'test/#')
node.publish('test/xxx', 'message')        

Request / Router

The server is asynchronous. It means it is able to process more requests at the same time.

Server:

import asyncio
from zmq_tubes import Tube, TubeNode, TubeMessage


async def handler(request: TubeMessage):
  print(request.payload)
  if request.payload == 'wait':
    await asyncio.sleep(10)
  return request.create_response(request.payload)


tube = Tube(
  name='Server',
  addr='ipc:///tmp/req_router.pipe',
  server=True,
  tube_type='ROUTER'
)

node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.start()
# output: 'wait'
# output: 'message'

Client:

import asyncio
from zmq_tubes import Tube, TubeNode

tube = Tube(
  name='Client',
  addr='ipc:///tmp/req_router.pipe',
  tube_type='REQ'
)


async def task(node, text):
  print(await node.request('test/xxx', text))


node = TubeNode()
node.register_tube(tube, 'test/#')
asyncio.create_task(task(node, 'wait'))
asyncio.create_task(task(node, 'message'))
# output: 'message'
# output: 'wait'

Dealer / Response

The client is asynchronous. It means it is able to send more requests at the same time.

Server:

from zmq_tubes import Tube, TubeNode, TubeMessage


async def handler(request: TubeMessage):
  print(request.payload)
  return 'response'
  # or return requset.create_response('response')


tube = Tube(
  name='Server',
  addr='ipc:///tmp/dealer_resp.pipe',
  server=True,
  tube_type='REP'
)

node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.start()
# output: 'message'

Client:

from zmq_tubes import Tube, TubeNode, TubeMessage

tube = Tube(
  name='Client',
  addr='ipc:///tmp/dealer_resp.pipe',
  tube_type='DEALER'
)


async def handler(response: TubeMessage):
  print(response.payload)


node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)

node.send('test/xxx', 'message')

# output: 'response'

Dealer / Router

The client and server are asynchronous. It means it is able to send and process more requests/responses at the same time.

Server:

import asyncio
from zmq_tubes import Tube, TubeNode, TubeMessage


async def handler(request: TubeMessage):
  print(request.payload)
  if request.payload == 'wait':
    await asyncio.sleep(10)
  return request.create_response(request.payload)


tube = Tube(
  name='Server',
  addr='ipc:///tmp/dealer_router.pipe',
  server=True,
  tube_type='ROUTER'
)

node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.start()
# output: 'wait'
# output: 'message'

Client:

from zmq_tubes import Tube, TubeNode, TubeMessage

tube = Tube(
  name='Client',
  addr='ipc:///tmp/dealer_router.pipe',
  tube_type='DEALER'
)


async def handler(response: TubeMessage):
  print(response.payload)


node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)

node.send('test/xxx', 'wait')
node.send('test/xxx', 'message')

# output: 'message'
# output: 'wait'

Dealer / Dealer

The client and server are asynchronous. It means it is able to send and process more requests/responses at the same time.

Server:

from zmq_tubes import Tube, TubeNode, TubeMessage

tube = Tube(
  name='Server',
  addr='ipc:///tmp/dealer_dealer.pipe',
  server=True,
  tube_type='DEALER'
)


async def handler(response: TubeMessage):
  print(response.payload)


node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)

node.send('test/xxx', 'message from server')
# output: 'message from client'

Client:

from zmq_tubes import Tube, TubeNode, TubeMessage

tube = Tube(
  name='Client',
  addr='ipc:///tmp/dealer_dealer.pipe',
  tube_type='DEALER'
)


async def handler(response: TubeMessage):
  print(response.payload)


node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)

node.send('test/xxx', 'message from client')
# output: 'message from server'

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

zmq_tubes-1.4.0.tar.gz (11.0 kB view hashes)

Uploaded Source

Built Distribution

zmq_tubes-1.4.0-py3-none-any.whl (10.6 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page