
A complete, gevent-based, non-Tornado NSQ client.

Project description

This project is in active development, and the documentation is evolving piece by piece.

This project encapsulates connection management, heartbeat management, and, for consumers, the dispatching of incoming messages to handlers.

Features

  • Fully featured:

    • Snappy compression

    • DEFLATE compression

    • TLS encryption

    • Client (“mutual”) authentication via TLS

  • The consumer defines a “classification” function that determines the name of the handler for an incoming message. This allows for event-driven consumption and means a little less boilerplate for the end user.

  • RDY management is handled automatically by the library. The relevant parameters can be reconfigured, but nsqs emphasizes simplicity and intuitiveness so that you don’t have to be involved in the mechanics if you don’t want to be.

  • IDENTIFY parameters can be specified directly, but many are managed automatically based on parameters to the producer/consumer.

  • Messages are marked as “finished” with the server after being processed, unless configured otherwise.

  • For consumers, you can prescribe a list of topic/channel couplets; connections will be made to every server, and a subscription will be created for each couplet. If lookup servers are used, servers are discovered and connected for each topic in the list (if no lookup servers are given, we assume that all of the given servers support all of the topics).

Implementing a Consumer

Imports and boilerplate:

import logging
import json
import gevent

import nsq.consumer
import nsq.node_collection
import nsq.message_handler

_logger = logging.getLogger(__name__)
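
The examples below report progress through _logger. As a minimal sketch (standard library only, not part of nsqs), you can route that output to the console:

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(name)s %(levelname)s %(message)s')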

Create a message-handler:

class _MessageHandler(nsq.message_handler.MessageHandler):
    def __init__(self, *args, **kwargs):
        super(_MessageHandler, self).__init__(*args, **kwargs)
        self.__processed = 0

    def message_received(self, connection, message):
        super(_MessageHandler, self).message_received(connection, message)

        try:
            self.__decoded = json.loads(message.body)
        except ValueError:
            _logger.info("Couldn't decode message. Finished: [%s]",
                         message.body)
            return

    def classify_message(self, message):
        # Return a (class-name, context) couplet; a 'dummy' message is routed
        # to handle_dummy() below, with the decoded body as the context.
        return (self.__decoded['type'], self.__decoded)

    def handle_dummy(self, connection, message, context):
        self.__processed += 1

        if self.__processed % 1000 == 0:
            _logger.info("Processed (%d) messages.", self.__processed)

    def default_message_handler(self, message_class, connection, message,
                                classify_context):
        _logger.warning("Squashing unhandled message: [%s] [%s]",
                        message_class, message)

Define the node-collection. We use nsqlookupd servers here, but we could just as easily use ServerNodes() with nsqd servers:

lookup_node_prefixes = [
    'http://127.0.0.1:4161',
]

nc = nsq.node_collection.LookupNodes(lookup_node_prefixes)
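
If you are not running nsqlookupd, the same consumer can be pointed directly at nsqd nodes instead (a sketch that mirrors the producer example further below; the host and port are placeholders):

server_nodes = [
    ('127.0.0.1', 4150),
]

nc = nsq.node_collection.ServerNodes(server_nodes)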

Create the consumer object:

_TOPIC = 'test_topic'
_CHANNEL = 'test_channel'
_MAX_IN_FLIGHT = 500

c = nsq.consumer.Consumer(
        [(_TOPIC, _CHANNEL)],
        nc,
        _MAX_IN_FLIGHT,
        message_handler_cls=_MessageHandler)

Start the consumer:

c.start()

Loop. As an example, we loop as long as we’re connected to at least one server:

while c.is_alive:
    gevent.sleep(1)

Implementing a Producer

Imports and boilerplate:

import logging
import json
import random

import nsq.producer
import nsq.node_collection
import nsq.message_handler

_logger = logging.getLogger(__name__)

Define the node-collection. This is a producer, so it only works with nsqd nodes:

server_nodes = [
    ('127.0.0.1', 4150),
]

nc = nsq.node_collection.ServerNodes(server_nodes)

Create the producer object:

_TOPIC = 'test_topic'

p = nsq.producer.Producer(_TOPIC, nc)

Start the producer:

p.start()

Emit the messages. Each mpublish call publishes a batch of ten copies of the serialized message, which is why the loop counter advances by ten:

for i in range(0, 100000, 10):
    if i % 50 == 0:
        _logger.info("(%d) messages published.", i)

    data = { 'type': 'dummy', 'data': random.random(), 'index': i }
    message = json.dumps(data)
    p.mpublish((message,) * 10)

Stop the producer:

p.stop()

Callbacks

Both the consumer and producer can take a callbacks object.

To instantiate the callbacks for a producer:

import nsq.connection_callbacks
cc = nsq.connection_callbacks.ConnectionCallbacks()

To instantiate the callbacks for a consumer:

import nsq.consumer
cc = nsq.consumer.ConsumerCallbacks()

Then, pass the object into the producer or consumer object constructors as ccallbacks.
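
For example, to wire the consumer callbacks into the consumer constructed earlier (a sketch; ccallbacks is the keyword named above):

cc = nsq.consumer.ConsumerCallbacks()

c = nsq.consumer.Consumer(
        [(_TOPIC, _CHANNEL)],
        nc,
        _MAX_IN_FLIGHT,
        message_handler_cls=_MessageHandler,
        ccallbacks=cc)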

The following callback methods can be implemented for both producers and consumers (make sure to call the original implementation from your override):

  • connect(connection)

    The connection has been established.

  • identify(connection)

    The identify response has been processed for this connection.

  • broken(connection)

    The connection has been broken.

  • message_received(connection, message)

    A message has been received.

The consumer has one additional callback:

  • rdy_replenish(connection, current_rdy, original_rdy)

    The RDY count needs to be updated. By default, the original RDY will be re-emitted. If this is not desired, override this callback and don’t call the original.
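
As a sketch, a consumer callbacks subclass that logs connection events while preserving the default behavior (note the calls to the original implementations, as required above):

class _Callbacks(nsq.consumer.ConsumerCallbacks):
    def connect(self, connection):
        super(_Callbacks, self).connect(connection)
        _logger.info("Connected: [%s]", connection)

    def broken(self, connection):
        super(_Callbacks, self).broken(connection)
        _logger.warning("Connection broken: [%s]", connection)

cc = _Callbacks()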

Footnotes

  • Because we rely on gevent, and gevent isn’t Python 3 compatible, nsqs isn’t Python 3 compatible.
