This is a pre-production deployment of Warehouse, however changes made here WILL affect the production instance of PyPI.
Latest Version Dependencies status unknown Test status unknown Test coverage unknown
Project Description

This project is in active development, and the documentation is evolving as individual pieces.

This project encapsulates connection management, heartbeat management, and dispatching incoming messages (for consumers) to handlers.

Features

  • Fully featured:
    • Snappy compression
    • DEFLATE compression
    • TLS compression
    • Client (“mutual”) authentication via TLS
  • We rely on the consumer defining a “classification” function to determine the name of a handler for an incoming message. This allows for event-driven consumption. This means a little less boiler-plate for the end-user.
  • The complexities of RDY management is automatically managed by the library. These parameters can be reconfigured, but nsqs emphasized simplicity and intuitiveness so that you don’t have to be involved in mechanics if you don’t want to.
  • IDENTIFY parameters can be specified directly, but many are managed automatically based on parameters to the producer/consumer.
  • Messages are marked as “finished” with the server after being processed unless we’re configured not to.
  • For consumers, you can prescribe a list of topic and channel couplets, and connections will be made to every server and subscribed according to each. If lookup servers are used, servers are discovered and connected for each topic in the list (if no lookup servers, then we assume that all servers given support all topics).

Implementing a Consumer

Imports and boilerplate:

import logging
import json
import gevent

import nsq.consumer
import nsq.node_collection
import nsq.message_handler

_logger = logging.getLogger(__name__)

Create a message-handler:

class _MessageHandler(nsq.message_handler.MessageHandler):
    def __init__(self, *args, **kwargs):
        super(_MessageHandler, self).__init__(*args, **kwargs)
        self.__processed = 0

    def message_received(self, connection, message):
        super(_MessageHandler, self).message_received(connection, message)

        try:
            self.__decoded = json.loads(message.body)
        except:
            _logger.info("Couldn't decode message. Finished: [%s]",
                         message.body)
            return

    def classify_message(self, message):
        return (self.__decoded['type'], self.__decoded)

    def handle_dummy(self, connection, message, context):
        self.__processed += 1

        if self.__processed % 1000 == 0:
            _logger.info("Processed (%d) messages.", self.__processed)

    def default_message_handler(self, message_class, connection, message,
                                classify_context):
        _logger.warning("Squashing unhandled message: [%s] [%s]",
                        message_class, message)

Define the node-collection. We use nsqlookupd servers here, but we could just as easily use ServerNodes() with nsqd servers:

lookup_node_prefixes = [
    'http://127.0.0.1:4161',
]

nc = nsq.node_collection.LookupNodes(lookup_node_prefixes)

Create the consumer object:

_TOPIC = 'test_topic'
_CHANNEL = 'test_channel'
_MAX_IN_FLIGHT = 500

c = nsq.consumer.Consumer(
        [(_TOPIC, _CHANNEL)],
        nc,
        _MAX_IN_FLIGHT,
        message_handler_cls=_MessageHandler)

Start the consumer:

c.start()

Loop. As an example, we loop as long as we’re connected to at least one server:

while c.is_alive:
    gevent.sleep(1)

Implementing a Producer

Imports and boilerplate:

import logging
import json
import random

import nsq.producer
import nsq.node_collection
import nsq.message_handler

_logger = logging.getLogger(__name__)

Define the node-collection. This is a producer, so it only works with nsqd nodes:

server_nodes = [
    ('127.0.0.1', 4150),
]

nc = nsq.node_collection.ServerNodes(server_nodes)

Create the producer object:

_TOPIC = 'test_topic'

p = nsq.producer.Producer(_TOPIC, nc)

Start the producer:

p.start()

Emit the messages:

for i in range(0, 100000, 10):
    if i % 50 == 0:
        _logger.info("(%d) messages published.", i)

    data = { 'type': 'dummy', 'data': random.random(), 'index': i }
    message = json.dumps(data)
    p.mpublish((message,) * 10)

Stop the producer:

p.stop()

Callbacks

Both the consumer and producer can take a callbacks object.

To instantiate the callbacks for a producer:

import nsq.connection_callbacks
cc = nsq.connection_callbacks.ConnectionCallbacks()

To instantiate the callbacks for a consumer:

import nsq.consumer
cc = nsq.consumer.ConsumerCallbacks()

Then, pass the object into the producer or consumer object constructors as ccallbacks.

The following callback methods can be implemented for both a producer or consumer (while making sure to call the original implementation):

  • connect(connection)

    The connection has been established.

  • identify(connection)

    The identify response has been processed for this connection.

  • broken(connection)

    The connection has been broken.

  • message_received(connection, message)

    A message has been received.

The consumer has one additional callback:

  • rdy_replenish(connection, current_rdy, original_rdy)

    The RDY needs to be updated. By default, the original RDY will be reemitted. If this is not desired, override this callback, and don’t call the original.

Footnotes

  • Because we rely on gevent, and gevent isn’t Python3 compatible, nsqs isn’t Python3 compatible.
Release History

Release History

0.2.0

This version

History Node

TODO: Figure out how to actually get changelog content.

Changelog content for this version goes here.

Donec et mollis dolor. Praesent et diam eget libero egestas mattis sit amet vitae augue. Nam tincidunt congue enim, ut porta lorem lacinia consectetur. Donec ut libero sed arcu vehicula ultricies a non tortor. Lorem ipsum dolor sit amet, consectetur adipiscing elit.

Show More

Download Files

Download Files

TODO: Brief introduction on what you do with files - including link to relevant help section.

File Name & Checksum SHA256 Checksum Help Version File Type Upload Date
nsqs-0.2.0-py2-none-any.whl (33.6 kB) Copy SHA256 Checksum SHA256 2.7 Wheel Sep 19, 2014
nsqs-0.2.0.tar.gz (22.6 kB) Copy SHA256 Checksum SHA256 Source Sep 19, 2014

Supported By

WebFaction WebFaction Technical Writing Elastic Elastic Search Pingdom Pingdom Monitoring Dyn Dyn DNS HPE HPE Development Sentry Sentry Error Logging CloudAMQP CloudAMQP RabbitMQ Heroku Heroku PaaS Kabu Creative Kabu Creative UX & Design Fastly Fastly CDN DigiCert DigiCert EV Certificate Rackspace Rackspace Cloud Servers DreamHost DreamHost Log Hosting