gocept.amqprun helps you write and run AMQP consumers and send AMQP messages. It currently supports only AMQP 0-9-1 and integrates with the Zope Tool Kit (ZTK), so you can use adapters, utilities and all the buzz.

Basic concepts and terms

  • A message handler is a function which is bound with a routing key to exactly one queue. It is called for each message on that queue, and may return a list of messages as a result.

  • The result messages of one handled message are sent in one transaction together with the ACK of the handled message.

  • When an exception is raised during message processing, the transaction is aborted. (The received message would be NACKed if RabbitMQ supported it.)

  • A message handler handles exactly one message at a time. Multiple messages can be processed at the same time using multiple processes. (Each process is single-threaded.)
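
The transaction behaviour described above can be pictured with a toy model. This is only a sketch under stated assumptions: ToyChannel and process are hypothetical names, and nothing here talks to a broker or reflects how gocept.amqprun implements its transactions.

```python
class ToyChannel:
    """In-memory stand-in for a broker channel (hypothetical, for illustration)."""

    def __init__(self):
        self.published = []  # result messages that have been sent
        self.acked = []      # ids of incoming messages that were ACKed


def process(channel, handler, message_id, body):
    """Run one handler call; publish results and ACK together, or do neither."""
    try:
        results = handler(body) or []
    except Exception:
        # Abort: no result is published and the message is not acknowledged.
        return False
    # Commit: the results and the ACK are applied in the same step.
    channel.published.extend(results)
    channel.acked.append(message_id)
    return True
```

A handler that raises leaves both lists untouched, which mirrors the "aborted transaction" case.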

Things you don’t need to take care of

  • Socket handling and locking for communicating with the AMQP broker

  • Transaction handling

  • Message ids

    • Each outgoing message gets an email-like message id.

    • The correlation id of each outgoing message is set to the message id of the incoming message.

    • Each outgoing message gets a custom references header which is set to the incoming message’s reference header plus the incoming message’s message id.
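
The id rules listed above can be sketched in a few lines. The helper name derive_ids, the use of email.utils.make_msgid, the example.invalid domain and the space-separated references format are all assumptions made for illustration; the library's actual implementation may differ.

```python
from email.utils import make_msgid


def derive_ids(incoming_message_id, incoming_references=None):
    """Return (message_id, correlation_id, references) for an outgoing message.

    Hypothetical helper that only mirrors the rules listed above.
    """
    # Each outgoing message gets its own email-like message id.
    message_id = make_msgid(domain='example.invalid')
    # The correlation id points back at the message being answered.
    correlation_id = incoming_message_id
    # References accumulate: the parent's references plus the parent's id.
    parts = [incoming_references, incoming_message_id]
    references = ' '.join(part for part in parts if part)
    return message_id, correlation_id, references
```

When the incoming message has no references header, the outgoing references consist of the incoming message id alone.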

Getting started: receiving messages

To get started, define a function that does the work. In this example, we log the message body and send a message in response. The declare decorator takes two arguments: the queue name and the routing key (pass a list to bind the function to multiple routing keys). It also accepts an optional arguments dictionary, which is passed to the AMQP queue_declare call, e.g. to support mirrored queues on RabbitMQ. The optional principal argument wraps the handler call in a zope.security interaction using the given principal id (this integration requires the [security] setup.py extra).

import logging
import gocept.amqprun.handler
import gocept.amqprun.message

log = logging.getLogger(__name__)

@gocept.amqprun.handler.declare('test.queue', 'test.routing')
def log_message_body(message):
    log.info(message.body)
    msg = gocept.amqprun.message.Message(
        header=dict(content_type='text/plain'),
        body=u'Thank you for your message.',
        routing_key='test.thank.messages')
    return [msg]

The handler function needs to be registered as a named utility. With ZCML this looks like this:

<configure xmlns="http://namespaces.zope.org/zope">
  <utility component="your.package.log_message_body" name="basic" />
</configure>

To set up a server, it’s recommended to create a buildout. The following buildout creates a config file for gocept.amqprun as well as a ZCML file for the component configuration and uses ZDaemon to daemonize the process:

[buildout]
parts =
        config
        zcml
        app
        server

[deployment]
name = queue
recipe = gocept.recipe.deploymentsandbox
root = ${buildout:directory}

[config]
recipe = lovely.recipe:mkfile
path = ${deployment:etc-directory}/queue.conf

amqp-hostname = localhost
amqp-username = guest
amqp-password = guest
amqp-virtualhost = /

eventlog =
    <eventlog>
      level DEBUG
      <logfile>
        formatter zope.exceptions.log.Formatter
        path STDOUT
      </logfile>
    </eventlog>
amqp-server =
    <amqp-server>
      hostname ${:amqp-hostname}
      username ${:amqp-username}
      password ${:amqp-password}
      virtual_host ${:amqp-virtualhost}
    </amqp-server>

content =
    ${:eventlog}
    ${:amqp-server}
    <worker>
      component-configuration ${zcml:path}
    </worker>
    <settings>
      your.custom.settings here
    </settings>

[zcml]
recipe = lovely.recipe:mkfile
path = ${deployment:etc-directory}/queue.zcml
content =
    <configure xmlns="http://namespaces.zope.org/zope">
      <include package="your.package" />
    </configure>

[app]
recipe = zc.recipe.egg:script
eggs =
   gocept.amqprun
   your.package
   zope.exceptions
arguments = '${config:path}'
scripts = server=app

[server]
recipe = zc.zdaemonrecipe
deployment = deployment
program = ${buildout:bin-directory}/app

Sending messages

If all you want to do is send messages, you don’t have to register any handlers, but can use gocept.amqprun.server.Server.send() directly. While the handlers usually run in their own process, started by the server entrypoint (as described above), if you’re just sending messages, you can also skip the extra process and run the gocept.amqprun.server.Server in your original process. Here is some example code to do that:

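import gocept.amqprun.server


def start_server(**kw):
    # setup_handlers=False: this server only sends, no handlers are set up.
    server = gocept.amqprun.server.Server(kw, setup_handlers=False)
    server.connect()
    return server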

(When you’re using the ZCA, you’ll probably want to register the Server as a utility at that point, too, so clients can access it to send messages easily.)

Settings

For application-specific settings gocept.amqprun makes the <settings> section from the configuration available via an ISettings utility:

import gocept.amqprun.interfaces
import zope.component

settings = zope.component.getUtility(
    gocept.amqprun.interfaces.ISettings)
settings.get('your.settings.key')

Limitations

  • Currently all messages are sent and received through the amq.topic exchange. Other exchanges are not supported at the moment.
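
Because everything goes through a topic exchange, routing keys are matched against binding patterns word by word: in AMQP 0-9-1, words are separated by dots, * stands for exactly one word and # for zero or more words. The following is a minimal sketch of that matching rule; topic_matches is a hypothetical helper written for illustration and is not part of gocept.amqprun or of any broker.

```python
def topic_matches(pattern, routing_key):
    """Return True if routing_key matches the topic binding pattern."""
    p = pattern.split('.')
    k = routing_key.split('.')

    def match(pi, ki):
        if pi == len(p):
            # Pattern exhausted: match only if the key is exhausted too.
            return ki == len(k)
        if p[pi] == '#':
            # '#' may swallow zero or more of the remaining words.
            return any(match(pi + 1, j) for j in range(ki, len(k) + 1))
        if ki == len(k):
            return False
        if p[pi] == '*' or p[pi] == k[ki]:
            # '*' matches exactly one word; literals must be equal.
            return match(pi + 1, ki + 1)
        return False

    return match(0, 0)
```

With this rule, a handler bound with test.* would receive test.routing but not test.thank.messages, while test.# would receive both.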

Interfacing with the file system

Reading

There is a send_files entry point in setup.py. It takes three arguments: the path of the ZConfig file, a watch path and a routing key. It reads new files from the directory named new in the watch path and, for each file, sends a message to the given route with the file's content as the body and the filename as an X-Filename header. Sent files are moved to the directory called cur in the watch path.
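
The new/cur workflow described above can be sketched as follows. This is an illustration only, not the code behind send_files: send_new_files is a hypothetical function, and send is an assumed callable taking (body, route, headers) that stands in for the actual AMQP sending.

```python
import shutil
from pathlib import Path


def send_new_files(watch_path, route, send):
    """Send every file in <watch_path>/new and move it to <watch_path>/cur.

    `send` is a hypothetical callable taking (body, route, headers).
    """
    new_dir = Path(watch_path) / 'new'
    cur_dir = Path(watch_path) / 'cur'
    cur_dir.mkdir(parents=True, exist_ok=True)
    sent = []
    for path in sorted(new_dir.iterdir()):
        if not path.is_file():
            continue
        # The file content becomes the body, the name travels as X-Filename.
        send(path.read_bytes(), route, {'X-Filename': path.name})
        # The file is moved out of the way only after it has been sent.
        shutil.move(str(path), str(cur_dir / path.name))
        sent.append(path.name)
    return sent
```

Moving the file only after sending means a failed send leaves the file in new, so it is picked up again on the next pass.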

Development

You can set the AMQP server parameters for running the tests via environment variables:

AMQP_HOSTNAME:
    default: localhost

AMQP_USERNAME:
    default: guest

AMQP_PASSWORD:
    default: guest

AMQP_VIRTUALHOST:
    default: None, so a vhost with a temporary name is created and deleted automatically (using AMQP_RABBITMQCTL command)

AMQP_RABBITMQCTL:
    default: ‘sudo rabbitmqctl’
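
As a rough illustration of how these variables and their defaults fit together, the sketch below collects them into a dictionary. The function amqp_test_settings and the dictionary keys are hypothetical and are not taken from the package's test infrastructure; only the variable names and defaults come from the list above.

```python
import os


def amqp_test_settings(environ=None):
    """Collect the AMQP test parameters, applying the documented defaults."""
    environ = os.environ if environ is None else environ
    return {
        'hostname': environ.get('AMQP_HOSTNAME', 'localhost'),
        'username': environ.get('AMQP_USERNAME', 'guest'),
        'password': environ.get('AMQP_PASSWORD', 'guest'),
        # None means: a temporary vhost is created and deleted per test run.
        'virtualhost': environ.get('AMQP_VIRTUALHOST'),
        'rabbitmqctl': environ.get('AMQP_RABBITMQCTL', 'sudo rabbitmqctl'),
    }
```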

The source code is available in the Git repository at https://github.com/gocept/gocept.amqprun

Please report any bugs you find at https://github.com/gocept/gocept.amqprun/issues

bin/test_sender and bin/test_server

test_sender and test_server are scripts that provide basic sender and handler capabilities to smoke test the behaviour of the current implementation. When started, test_sender emits 10 messages routed to test.routing. test_server declares a queue named test.queue, to which all messages from test.routing are sent, and a handler that logs every incoming message from test.queue.

bin/test_send_files

test_send_files starts a server that watches the ./testdir/new directory and sends each file copied into it as an AMQP message to test.routing. Its entry point is gocept.amqprun.readfiles:main.

CHANGES

2.1 (2020-09-18)

  • Let the Server exit in case of a socket.error. We lost the re-connection feature in 2.0 already.

2.1a1 (2020-04-17)

  • Add support for Python 3.7 and 3.8.

2.0 (2020-04-15)

  • Add support for zope.interface >= 5. (#13)

  • Change CI pipeline from Travis to GitHub Actions.

2.0a1 (2020-03-20)

Breaking changes

  • Use kombu/py-amqp instead of pika for basic abstraction over amqp methods.

  • Remove multithreading – servers and workers run in the current process.

  • Remove gocept.amqprun.main.main_server as main loop.

  • Remove configure.zcml and meta.zcml.

  • Remove writing messages in files and the amqp:writefile directive.

  • Remove writefiles extra in setup.py.

  • Remove amqp:readfile zcml directive and add send_files entrypoint for readfiles process. It takes three parameters: a configuration file path, a watch directory path and a route to send the file to. (see bin/test_send_files, details are in the README.rst).

  • Remove .channel.Channel and use amqp.channel.Channel instead.

  • Remove .interfaces.IChannelManager.

  • Remove .interfaces.ProcessStarted.

  • Remove .interfaces.ProcessStopping.

  • Remove .connection.Connection and use kombu.Connection instead.

  • Rename conf key heartbeat_interval to heartbeat in zconfig configuration.

  • Remove amount from worker zconfig.

  • Enforce content_encoding header for Message bodies that are unicode.

  • Rename .message.Message.header.headers to .message.Message.header.application_headers.

  • Rename .testing.ZCML_LAYER to .testing.ZCA_LAYER.

  • Remove .testing.LoopTestCase, use unittest.TestCase instead.

  • Remove .testing.MainTestCase.wait_for_response, use wait_for_message instead.

  • Remove .testing.SettingsLayer as it is not used here.

  • Move .connection.Parameters to .server, it now returns a dict.

Features

  • Add bin/test_sender and bin/test_server scripts to smoke test sending and receiving behaviour. See README.rst.

  • Add representation for .handler.Handler class.

1.8 (2019-09-11)

  • Update to current bootstrap.py

  • Improve forward compatibility with Python 3.

  • Fix deprecation warning.

  • Prevent using pytest >= 5 to keep Python 2 compatibility.

  • Migrate repository to github.

1.7.1 (2016-08-22)

  • Fix bug when a string is passed as a port number to gocept.amqprun.connection.Parameters.

1.7 (2016-05-31)

  • Fix possible unresponsiveness of message handlers which occurred after an exception in the worker. The exception caused a counter under-run which prevents switching channels. The counter is no longer bound to the transaction but to the message handling.

  • Add a safeguard which kills the whole process if the reference counter on the channel falls below zero.

  • Acquire the channel before putting a task to the worker queue to prevent a possible race condition.

  • Release the commit lock if tpc_abort() fails. So the next tpc_begin() does not wait forever while trying to acquire the lock and thus blocking the whole process.

1.6 (2016-04-04)

  • Allow arbitrary keywords in .testing.QueueTestCase.send_message() to be passed to generated message.

1.5 (2016-02-01)

  • Add a testing extra which can be used to reuse the test infrastructure of this package to develop tests on top of it.

1.4 (2015-11-20)

1.3 (2015-09-11)

  • Update the test infrastructure to be able to run with newer RabbitMQ versions than 3.1.5.

1.2 (2015-09-11)

  • Add py.test as test runner.

  • Add settings attribute to IHandler to ease accessing ISettings.

1.1 (2015-09-01)

  • .connection.Parameters no longer accepts arbitrary keyword parameters.

1.0 (2015-04-10)

  • Notify IConfigFinished event after gocept.amqprun is fully configured but before the workers are started. This event can be used to open database connections for instance.

  • Bump version to 1.0 as this package has been used in production for years.

0.17.0 (2015-04-09)

0.16.0 (2015-03-25)

  • Raise RuntimeError if connection is not alive after connect. (This can happen when the credentials are wrong.)

0.15.2 (2015-02-24)

  • Fix error reporting in .testing.QueueLayer.rabbitmqctl().

0.15.1 (2015-01-21)

  • Moved code and bug tracker to Bitbucket.

0.15.0 (2014-10-08)

  • Pretend to support savepoints so we can work together with other systems that support (and require) them.

0.14.0 (2014-09-09)

  • Introduce gocept.amqprun.handler.ErrorHandlingHandler base class.

  • Run integration tests (MainTestCase) in a separate ZCA, isolated from any registrations that might have been done by other tests.

  • Create temporary virtualhost on the rabbitmq server per test run.

  • Improved test-case method send_message to take headers as a parameter.

0.13.0 (2014-04-16)

  • Introduce IResponse for transaction-integrated exception handling.

  • Fix timing bug: when the remote side closes the socket and at the same time a channel switch is triggered, this used to break down (since the socket is closed). We now ignore the channel switch attempt and simply wait for the reconnect, which opens a new channel anyway.

0.12.2 (2014-02-20)

  • Add xfilename variable to the <amqp:writefiles pattern=""> setting.

0.12.1 (2014-02-20)

  • Add safeguard that two handlers cannot bind to the same queue.

0.12 (2014-02-13)

  • Include <amqp:writefiles> into the transaction handling, i.e. write to file only on commit. Previously, when an error occurred e.g. inside the MessageStored event, duplicate files would be written (each time the message was processed again on retry after the error).

0.11 (2014-01-21)

  • Crash the process on filesystem errors for <amqp:readfiles>. Previously only the file reading thread crashed, but the process kept running, unaware that it now was not doing its job anymore, since the thread had died.

0.10 (2013-05-28)

  • Support more than one Server in a single process.

  • Introduce setup_handlers parameter for Server so clients can disable setting up handlers.

  • Fix a bug in the <amqp:readfiles> transaction implementation that caused crash when aborting a transaction (#12437).

  • Allow test server to take longer time for startup on slow computers.

0.9.5 (2013-04-16)

  • Refactor Session / DataManager responsibilities (#9988).

0.9.4 (2012-09-07)

  • Fix IDataManager implementation: abort() may be called multiple times, regardless of transaction outcome. Only release the channel refcount once.

0.9.3 (2012-09-07)

  • Improve logging of IDataManager.

0.9.2 (2012-09-07)

  • Improve logging of IChannelManager.acquire/release.

0.9.1 (2012-09-06)

  • Fix IDataManager implementation: tpc_abort() may also be called without a prior tpc_begin() (happens for errors in savepoints, for example).

  • Fix method signature of Connection.close().

0.9 (2012-08-31)

  • Introduce optional integration with zope.security: handlers can declare a principal id with which an interaction will be created.

  • Use a separate channel for sending messages that are not a response to a received message.

  • Introduce SETTINGS_LAYER for tests relying on ISettings.

0.8 (2012-04-04)

  • Fix race condition that caused messages to be acknowledged on a different channel than they were received on (#10635).

  • Fix race condition that caused attempts at sending messages before the server was started properly (#10620).

0.7 (2012-03-22)

  • Fix race condition between getting the current channel in the DataManager and switching the current channel in the Server (#10521).

  • Make AMQP server configurable for tests (#9232).

0.6.1 (2012-02-23)

  • Fixed bug in creating references header when parent message has no references (#10478).

0.6 (2012-02-22)

Features

  • Changed FileStoreReader from its own process to a thread that uses gocept.amqprun for sending (previously it used amqplib). Introduced amqp:readfiles ZCML directive. (#10177)

  • Changed filestore extra to readfiles extra.

  • Transmit filename as X-Filename header from amqp:readfiles.

  • Introduced ISender utility.

Bugs

  • Fixed bug with acknowledging messages that was introduced in 0.5 (#10030).

Internal

  • Changed API for MainTestCase from create_reader to start_server.

0.5.1 (2012-01-09)

  • Bugfix to support unicode arguments for queue declaration as pika only supports bytestrings here.

  • Bugfix to make arguments parameter of amqp:writefiles work (#10115).

0.5 (2011-12-08)

General

  • Added writefiles extra to make ZCML directive amqp:writefiles optional.

  • Added filestore extra to make gocept.amqprun.filestore optional.

  • Moved declaration of amqp:writefiles from configure.zcml to meta.zcml.

Features

  • Renamed gocept.amqprun.server.MessageReader into gocept.amqprun.server.Server and added a send method so it can initiate sending of messages.

  • Add support for arguments for queue_declare, e.g. to support x-ha-policy headers for RabbitMQ mirrored queue deployments (#10036).

Internal

  • Internal API change in server.AMQPDataManager.__init__: the message parameter is now optional, so it was moved to the end of the list of arguments.

  • Use plone.testing for layer infrastructure.

0.4.2 (2011-08-23)

  • Add helper methods for dealing with header files to FileWriter (for #9443).

0.4.1 (2011-08-22)

  • Log Message-ID.

0.4 (2011-07-25)

  • The message id of outgoing messages is set.

  • The correlation id of outgoing messages is set to the incoming message’s message id (if set).

  • A custom header references is set to the incoming message’s reference header + the incoming message’s message id (like References in RFC5322).

  • Fixed broken tests.

  • Allow upper case in settings keys.

  • Extend AMQP server configuration for FileStoreReader to include credentials and virtual host.

  • Allow specifying multiple routing keys (#9326).

  • Allow specifying a filename/path pattern (#9327).

  • The FileWriter stores the headers in addition to the body (#9328).

  • FileWriter sends IMessageStored event (#9335).

0.3 (2011-02-05)

  • Renamed decorator from handle to declare.

  • Added helper method wait_for_response to MainTestCase.

  • Added an IProcessStarting event which is sent during startup.

  • Added the <amqp:writefiles/> directive that sets up a handler that writes incoming messages into files.

  • Added handling of <logger> directives

0.2 (2010-09-14)

  • Added a decorator gocept.amqprun.handler.handle(queue_name, routing_key).

0.1 (2010-08-13)

  • first release.
