Skip to main content

Asynchronous AMQP-integration for Plone (and Zope2)

Project description

AMQP integration for Zope2 (and Plone)

collective.zamqp acts as a Zope Server by co-opting Zope’s asyncore mainloop (using asyncore-supporting AMQP-library pika), and injecting consumed messages as requests to be handled by ZPublisher (exactly like Zope ClockServer).

Therefore AMQP-messages are handled (by default) in a similar environment to regular HTTP-requests: ZCA-hooks, events and everything else behaving normally.

This package is an almost complete rewrite of affinitic.zamqp, but preserves its ideas on how to setup AMQP-messaging by configuring only producers and consumers.

  • rewrite documentation to reflect the new design

  • update affinitic.zamqp’s tests for the new design

While we are still documenting and testing collective.zamqp, you may want to take a look at collective.zamqpdemo for an example of use.


A common setup includes a producer and a worker instance which are both connected to the same rabbitmq server.


Both instances can be easily configured using buildout:

parts =


recipe = plone.recipe.zope2instance
http-address = 8080
eggs =

zope-conf-additional =
    %import collective.zamqp
        connection_id   superuser
        port            5672
        username        guest
        password        guest
        heartbeat       120
        keepalive       60

<= instance
http-address = 8081
zserver-threads = 1
environment-vars =
zope-conf-additional =
        connection_id   superuser
        site_id         Plone
        user_id         admin

Explanation of configuration parameters

  • amqp-broker-connection


    Connection id, which is the registered name of the created global BrokerConnection-utility.

    hostname (optional)

    Hostname or IP Address of RabbitMQ server to connect to, default to localhost.

    port (optional)

    TCP port of RabbitMQ server to connect to, defaults to 5672.

    virtual_host (optional)

    RabbitMQ virtual host to use, defaults to /.

    username (optional)

    Plain text username for RabbitMQ connection, defaults to guest.

    password (optional)

    Plain text password for RabbitMQ connection, defaults to guest.

    heartbeat (optional)

    AMQP heartbeat interval in seconds, defaults to 0 to disable heartbeat.

    keepalive (optional)

    Register producer, consumer, view and clock-server with the given integer timeout in seconds to keep the connection alive. Defaults 0.

    prefetch_count (optional)

    AMQP channel prefetch count limit, defaults to 0 for no limit.

  • amqp-consuming-server


    The name of a global utility providing configured IBrokerConnection. A consuming server will serve consumers registered for its connection id only.

    scheme (optional)

    Configure URL schema for virtual rewriting using VirtualHostMonster, default is http. Will be ignored if no hostname is given.

    hostname (optional)

    Hostname which will be passed into fake request. When calling object.absolute_url() in a consuming server you’ll receive this hostname. URL will be rewritten using VirtualHostMonster using following scheme:


    If a hostname is configured, VirtualHostMonster will be invoked, socket.gethostname() will be used else.

    port (optional)

    Configure port for virtual rewriting using VirtualHostMonster, default is 80. Will be ignored if no hostname is given.

    use_vhm (optional)

    Create VirtualHostMonster-wrapped method calls when hostname is set. VHM is used to tell portal the configured real public hostname and to hide portal’s id from path. Defaults to on.

    vhm_method_prefix (optional)

    Explicitly set the VHM method prefix for AMQP-based requests. Most typical options may look like:

    • /VirtualHostBase/https/

    • /VirtualHostBase/https/

    • /VirtualHostBase/https/

    Note: This overrides the default implicit VHM-support by setting scheme, hostname, port and use_vhm, but will still require use_vhm enabled to be active. Empty value fallbacks to the old default use_vhm-behavior.


    The id of a site, which should be the context when consuming the AMQP messages, which the consumers of a consuming server consume. If a hostname is given, this will be used for VirtualHostMonster rewrites.

    user_id (optional)

    Optional user id of the Plone user, whose privileges are used to consume the messages. By default, the messages are consumed as Anonymous User calling trusted filesystem code.

Configuring logging

You may want to in/decrease collective.zamqp loglevel which can easily be done by passing an environment variable into worker instance as seen in buildout example above:

environment-vars =

Valid parameters are:


  • INFO


0.16.2 (2018-02-27)

  • Fix issue where connection delay did not increment due to routing configuration error causing disconnection whiles still configuring exchanges, queues or bindings [datakurre]

  • Fix issue where a automatic reconnection caused extra connection to be made, sometimes resulting in wrong delivery tags on consumed messages, leading to precondition failed errors when acking those messages [datakurre]

0.16.1 (2017-11-29)

  • Fix issue where blocking connection was unable close channel and connection properly due to bug in Pika 0.9.5 [datakurre]

0.16.0 (2016-06-08)

  • Fix msgpack serializer to encode and decode using ‘utf-8’ encoding by default [Asko Soukka]

0.15.1 (2016-01-28)

  • Fix to require collective.monkeypatcher >= 1.1.1 [datakurre]

0.15.0 (2016-01-27)

  • Note: This release changes producers and messages to join the current transaction using transaction’s join instead of deprecated register. So far, this change has seem to been fully backwards compatible.

  • Add minimal savepoint support (NoRollbackSavepoint) [datakurre]

  • Fix to join transaction using join instead of register to support savepoints [datakurre]

  • Fix issue where pika blocking connection raised error because of a missing log import in its 0.9.5 release [datakurre]

  • Fix testing integration related issue where ZAMQP fixture left dangling registrations to slow down and cause side-effects for tests [datakurre]

0.14.3 (2014-10-23)

  • Log connection and channel creation etc. [sunew]

0.14.2 (2014-10-23)

  • The ZAMQP_LOGLEVEL environment variable was not yet accessible at import time. Moved Logger to utils module to delay the import. [sunew]

0.14.1 (2014-10-10)

  • Fix issue where logging reject did not get enough arguments [datakurre]

0.14.0 (2014-09-18)

  • Fix issue where language is not correctly set for AMQP requests [datakurre]

  • Remove hard dependency to z3c.unconfigure 1.0.1 [saily]

0.13.2 (2013-11-08)

  • Remove duplicate default producer from test fixture. [datakurre]

0.13.1 (2013-11-08)

  • Fix packaging.

0.13.0 (2013-11-08)

  • Add vhm_method_prefix-option to replace old vhm-options [datakurre]

0.12.0 (2013-08-08)

  • Fix to store durable publishable messages during reconnection period in memory on the base of message’s delivery_mode-proprety, not producer’s durable-property [datakurre]

  • Combine loggers and reduce logging [saily, datakurre]

0.11.0 (2013-06-05)

  • Set use_vhm=False for ZAMQP Test Layer [saily]

  • Add option use_vhm to make VHM-methods optional (to allow use of hostname=localhost:8080/Plone in development) [datakurre]

  • Added a configureable scheme, hostname and port keyword to be passed into fake request object in consumers. We well automatically rewrite those URLs using VirtualHostMonster. [datakurre, saily]

  • Reduce amount of logging, fixes #2. [saily]

  • Documentation updates [saily]

  • Add prefetch_count-option for connections [datakurre]

  • Add fixture with RabbitMQ trace firehose consumer to ease debugging [datakurre]

  • Fix to requeue messages also when subclass of ConflictError is raised during commit (e.g. ReadConflictError) [datakurre]

0.10.2 (2013-03-14)

  • Fix to process Pika timeouts on every asyncore poll to fix heartbeat issues

0.10.1 (2013-03-13)

  • Add configuration options for ‘exchange_auto_declare’ and ‘queue_auto_declare’

  • Fix ‘queue_exclusive = True’ to imply also ‘queue_durable = False’ and ‘queue_auto_delete = True’

0.10.0 (2013-03-12)

  • Update keepalive-ping-queue to use 1 hour message ttl

  • Add option (defaulting to true) for monkey patching Zope2 lifetime_loop for shorter asyncore.poll-timeouts on consuming instances.

0.9.8 (2013-03-09)

  • Fix regression: allow to bind queue with empty routing key

0.9.7 (2013-03-05)

  • Fix to not try to requeue auto-acknowledged messages on conflict errors, because it’s not possible

  • Fix consumer configuration to support list of routing keys (to ease the use of metronome like exchanges)

0.9.6 (2013-02-21)

  • Fix Rabbit-fixture to be silent when Rabbit has been killed before teardown

0.9.5 (2013-02-20)

  • Fix to include convenient default producers in the default test layer

0.9.4 (2012-11-27)

  • Fixed bugs in registering connections and consuming servers in ZAMQP layer.

  • Added ‘text/csv’ serializer (for serializing iterable container of dictionaries into RFC4180 CSV data and deserializing such messages into tuple of dictionaries)

0.9.3 (2012-11-27)

  • Enhanced ‘runAsyncTest’ to accept ‘loop_timeout’ and ‘loop_count’ parameters.

  • Fixed optional json-serializer to try to import ‘json’ at first and only then ‘simplejson’.

0.9.2 (2012-09-18)

  • Pinned z3c.unconfigure==1.0.1.

  • Added test fixture to be used with ZSERVER-fixtures to support Selenium-testing.

  • Fixed consuming server to default to ‘Anonymous User’ instead of None.

  • Added runAsyncTest-helper for running tests depending on the asyncore loop. Added consuming server for ZAMQP-layer.

0.9.1 (2012-09-18)

  • Fixed to not set correlation_id-property for message if the given correlation_id is None.

0.9.0 (2012-09-17)

  • Added alias ‘Producer.register’ for VTM._register.

  • Renamed ‘connected’-property to ‘is_connect’.

0.8.1 (2012-09-06)

  • Fixed queue length helpers to use BlockingChannel-helper properly.

0.8.0 (2012-09-06)

  • Fixed consumers without marker interface not to start consuming.

  • Enhanced undolog for transactions by ‘zamqp-consumer’-view.

  • Fixed consuming view to annotate transaction with the user configured for the current consuming service.

  • Added separate auto_delete-setting for exchanges and queues. Previously auto_delete was set as negation of durability, which remains the default.

  • Added connection configuration to default with implicit default producer registration (= producer with the same name/id as the connection, but no any special routing).

  • Added support for custom ‘x-cookie-auth’ header. Its value will be set to value of ‘__ac’ cookie for AMQP request to allow PAS-authentication for logged in user (e.g. to support authenticated asyncronous tasks).

  • Added __len__ for consumer and producer for getting the queue length (if the related queue is defined) using blocking channel.

  • Added BlockingChannel wrapper to be used with ‘with’ statement to create separate blocking connections for quick raw AMQP-operations.

  • Fixed to never declare queue starting with ‘amq.’, which is reserved prefix in RabbitMQ. Allow empty queue names to support automatic (broker-generated) queue-names.

  • Fixed to never declare RabbitMQ default exchanges (declarig of any ‘amq.’-starting exchange will be skipped).

  • Added json-serializer (when either json or simplejson can be imported).

0.7.18 (2012-08-10)

  • Bug fixes. [malthe]

  • Added plone.testing layer for RabbitMQ + Zope. Added a dummy test for the layer. Enabled RabbitMQ-parts in test buildout.

  • Fixed consumers and producers to use the default exchange by default to allow the easiest possible configuration for the use of the default exchange only.

0.7.17 (2012-05-21)

  • Added transaction aware reject to message.

  • Added site_id-substitution support for consumer name to make consuming service and site specific consumers available for lookup.

  • Fixed to not crash if connection if not defined. Just warn.

  • Fixed to work more similarly in consumer (name is taken as queue) as in producer (name is taken as routing_key).

  • Refactored ping to use simple dynamic properties for routing_key and queue.

  • Refactored producer, consumer and connection init to allow configuration using simple dynamic properties.

  • Refactored producer, consumer and connection init to allow configuration using simple dynamic properties.

  • Dropped ‘magical’ buildout name from keepalive’s ping queues.

  • Removed ‘magical’ proxying of message body’s properties before we rely on it.

0.7.16 (2012-05-04)

  • Forced correlation_id to be str.

  • Changed default serializer from ‘text/plain’ to ‘pickle’.

  • Fixed added dependencies to work on Plone 4.0.x.

0.7.14 (2012-05-02)

  • Fixed to requeue message when transaction of successful handling is aborted (e.g. due to ZODB conflict error).

0.7.12 (2012-04-25)

  • Added support for sauna.reload.

0.7.11 (2012-04-18)

  • Changed ping to be logged on debug-level instead of info-level.

0.7.10 (2012-04-18)

  • Fixed Pika-adapter to process timeouts to support AMQP-heartbeat.

0.7.9 (2012-04-16)

  • Modified keepalive-setting to accept an integer instead of boolean to allow configuration of keepalive-ping-interval in detail.

0.7.8 (2012-04-16)

  • Fixed issue where a typo in message de-serialization hide de-serialized body.

0.7.7 (2012-04-04)

  • Fixed issue with attribute not found in threadlocals.

0.7.5 (2012-02-26)

  • Minor fixes for being more sauna.reload-friendly.

0.7.4 (2012-03-12)

  • Simplified Ping-consumer to ack messages and log ping directly withing asyncore loop without creating a fake HTTP-request.

0.7.3 (2012-03-09)

  • Added a helper function collective.zamqp.utils.getBuildoutName to be used in configuration re-usable packages using buildout-depending AMQP-queues (e.g. for replies).

0.7.2 (2012-03-08)

  • Added keepalive option for AMQP Broker Connection -configuration in zope.conf to auto-register all needed utilities, views and clock-servers for keeping the connection alive with regular ping message.

0.7.1 (2012-03-06)

  • Allowed new named AMQP Broker Connections to be defined in zope.conf (or in ‘zope-conf-additional’ in instance buildout recipe).

0.7.0 (2012-02-05)

  • Internal development release.

Project details

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

collective.zamqp-0.16.2.tar.gz (64.2 kB view hashes)

Uploaded Source

Built Distribution

collective.zamqp-0.16.2-py2-none-any.whl (73.4 kB view hashes)

Uploaded Python 2

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page