
A microframework to build source -> filter -> action workflows.



| |docs| |changelog| |travis| |coveralls| |landscape| |scrutinizer|
| |version| |downloads| |wheel| |supported-versions| |supported-implementations|

.. |docs| image::
    :alt: Documentation Status

.. |changelog| image::
    :alt: Release Notes

.. |travis| image::
    :alt: Travis-CI Build Status

.. |coveralls| image::
    :alt: Coverage Status

.. |landscape| image::
    :alt: Code Quality Status

.. |version| image::
    :alt: PyPI Package latest release

.. |downloads| image::
    :alt: PyPI Package monthly downloads

.. |wheel| image::
    :alt: PyPI Wheel

.. |supported-versions| image::
    :alt: Supported versions

.. |supported-implementations| image::
    :alt: Supported implementations

.. |scrutinizer| image::
    :alt: Scrutinizer Status

Simple rules

Python processor is a tool for creating chained pipelines for data processing.
It has very few key concepts:

Data object
    Any python dict with two required fields: ``source`` and ``type``.
Source
    An iterable sequence of ``data objects`` or a function which returns ``data objects``.
    See `full list of sources`_ in the docs.
Output
    A function which accepts a ``data object`` as input and could return another
    (or the same) ``data object`` as result. See `full list of outputs`_ in the docs.

A pipeline consists of sources and outputs, but a ``predicate`` decides which
``data object`` should be processed by which ``output``.
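These rules can be sketched in a few lines of plain Python. The names below are illustrative only, not the library's real API:

```python
def source():
    # A source is any iterable of data objects: plain dicts with
    # the two required fields, ``source`` and ``type``.
    yield {'source': 'demo', 'type': 'message', 'text': 'hello'}
    yield {'source': 'demo', 'type': 'noise', 'text': 'skip me'}

def predicate(obj):
    # The predicate decides which data objects reach which output.
    return obj['type'] == 'message'

def output(obj):
    # An output accepts a data object and may return another (or the same) one.
    return dict(obj, text=obj['text'].upper())

# The pipeline: feed every data object matching the predicate to the output.
processed = [output(obj) for obj in source() if predicate(obj)]
```
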

Quick example

Here is an example of a pipeline which reads an IMAP folder and sends all emails to a Slack chat:

.. code:: python

    run_pipeline(
        sources.imap('imap.example.com',        # the server, login and password
                     'username', '***',         # here are placeholders
                     'INBOX'),
        [prepare_email_for_slack, outputs.slack(SLACK_URL)])

Here you construct a pipeline which uses ``sources.imap`` for reading the
"INBOX" folder of a mailbox. In more complex cases ``outputs.fanout``
can be used for routing data objects to different processors, and ``sources.mix`` can
be used to merge items from two or more sources into one stream.

Functions ``prepare_email_for_slack`` and ``outputs.slack(SLACK_URL)`` are processors. The first one
is a simple function which accepts a data object, returned by the imap source, and transforms
it into a data object which can be used by the slack output. We need that because Slack
requires a different set of fields. The call to ``outputs.slack(SLACK_URL)`` returns a
function which gets an object and sends it to the specified Slack endpoint.
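For illustration, a hypothetical ``prepare_email_for_slack`` might look like this. The field names of the incoming email data object (``from``, ``subject``) are assumptions, not the library's actual schema:

```python
def prepare_email_for_slack(email):
    # Transform an email data object into one the slack output understands.
    # The incoming field names here are assumed for illustration.
    return {'source': email['source'],
            'type': 'slack.message',
            'text': '{0}: {1}'.format(email['from'], email['subject'])}

message = prepare_email_for_slack({'source': 'imap', 'type': 'email',
                                   'from': 'alice@example.com',
                                   'subject': 'Hi there'})
```
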

This is just an example; for working snippets, continue reading this documentation ;-)

.. Note:: By the way, did you know there is a Lisp dialect which runs on the Python
    virtual machine? Its name is HyLang, and processor is written in this language.


Create a virtual environment with python3::

    virtualenv --python=python3 env
    source env/bin/activate

Install the required version of hylang (this step is necessary because Hy syntax is not
final yet and is frequently changed by the language maintainers)::

    pip install -U 'git+git://'

If you are on OS X, then install lxml separately::

    STATIC_DEPS=true pip install lxml

If you want to access IMAP over SSL on OS X, then you need to install
``openssl`` via homebrew, and then install ``pyopenssl`` like this::

    brew install openssl
    env LDFLAGS="-L$(brew --prefix openssl)/lib" \
        CFLAGS="-I$(brew --prefix openssl)/include" \
        pip install -U --force-reinstall pyopenssl

Then install the ``processor``::

    pip install processor


Now create an executable python script, where you'll place your pipeline's configuration.
For example, this simple code creates a pipeline which searches for new results in Twitter
and outputs them to the console. Of course, you can output them not only to the console, but also
post them by email, to a Slack chat or anywhere else if there is an output for it:

.. code:: python

    import os
    from processor import run_pipeline, sources, outputs
    from twiggy_goodies.setup import setup_logging

    setup_logging('twitter.log')  # the log file name is just an example

    for_any_message = lambda msg: True

    def prepare(tweet):
        return {'text': tweet['text'],
                'from': tweet['user']['screen_name']}

    run_pipeline(
        sources.twitter.search(
            'My Company',
            consumer_key='***', consumer_secret='***',
            access_token='***', access_secret='***'),
        rules=[(for_any_message, [prepare, outputs.debug()])])

Running this code will fetch new results for the search query ``My Company``
and output them on the screen. Of course, you could use any other ``output``
supported by the ``processor``. Browse the online documentation to find out
which sources and outputs are supported and how to configure them.
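Each rule pairs a ``predicate`` with a list of processors, so a predicate stricter than ``for_any_message`` is just another function from data object to boolean, for example:

```python
def mentions_support(tweet):
    # Pass only tweets whose text mentions the word 'support'.
    # The 'text' field follows the tweet dict used in the example above.
    return 'support' in tweet.get('text', '').lower()
```
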

.. _full list of sources: sources.html
.. _full list of outputs: outputs.html

Ideas for Sources and Outputs

* ``web-hook`` endpoint `(in progress)`.
* ``tail`` source which reads a file and outputs lines that appeared in it between
  invocations, or is able to emulate ``tail -f`` behaviour. The Python module
  `tailer <>`_ could be used here.
* ``grep`` output -- a filter to grep some fields using patterns. With ``tail`` and ``grep``
  you could build a pipeline which watches a log and sends errors by email or to the chat.
* ``xmpp`` output.
* ``irc`` output.
* ``rss/atom feed reader``.
* ``weather`` source which tracks tomorrow's weather forecast and outputs a message if it
  changes significantly, for example from "sunny" to "rainy".
* ``github`` -- some integrations with the GitHub API?
* ``jira`` or other task tracker of your choice?
* `suggest your ideas!`
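As a rough illustration of the ``tail`` and ``grep`` ideas above, here is a plain-Python sketch; the function names and data-object fields are assumptions, not part of the library:

```python
import re

def tail(path, state):
    # Sketch of the proposed ``tail`` source: yield lines appended to the
    # file since the last invocation; ``state`` remembers the file offset.
    with open(path) as f:
        f.seek(state.get('offset', 0))
        for line in f:
            yield {'source': 'tail', 'type': 'line',
                   'text': line.rstrip('\n')}
        state['offset'] = f.tell()

def grep(pattern):
    # Sketch of the proposed ``grep`` filter: pass only matching objects.
    regex = re.compile(pattern)
    def filter_obj(obj):
        return obj if regex.search(obj['text']) else None
    return filter_obj

only_errors = grep('ERROR')
```
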



To run all the tests run::



* Alexander Artemenko -


0.10.0 (2016-01-04)

* IMAP source was fixed to work with the new IMAPClient API and
  support ``IMAPClient > 1.0.0``.
* Datastorage was fixed to get ``filename`` from the ``PROCESSOR_DB``
  environment variable even in case it was set using
  ``os.environ['PROCESSOR_DB'] = 'some.db'`` after the imports.
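The fixed behaviour can be sketched like this; the function name is illustrative, not the library's real API:

```python
import os

def get_db_filename(default='processor.db'):
    # Read PROCESSOR_DB when the storage is opened, not at import time,
    # so an assignment made after the imports is still picked up.
    return os.environ.get('PROCESSOR_DB', default)
```
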

0.9.0 (2015-12-06)

Code was fixed to work with HyLang from the ``a3bd90390cb37b46ae33ce3a73ee84a0feacce7d``
commit. Please use this pinned version of HyLang and `subscribe`_ to future
release notes to know when this requirement changes.

.. _subscribe:

0.8.0 (2015-11-16)

* Code was fixed to work with the latest Hy from GitHub.
* Added a ``twitter.mentions`` source, to read the stream of mentions from Twitter.
* Fixed the way the number of messages from an IMAP folder is limited. Previously
  the limit was applied even when we already knew the ID of the last seen message;
  now the limit is ignored in this case and only applied when visiting the
  folder for the first time.
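The fixed limiting logic can be sketched as follows; this is illustrative code, not the actual implementation:

```python
def select_new_ids(all_ids, last_seen_id=None, limit=None):
    # The limit applies only on the first visit to a folder; once the
    # last seen message ID is known, every newer message is returned
    # regardless of the limit.
    if last_seen_id is None:
        return all_ids[-limit:] if limit else list(all_ids)
    return [msg_id for msg_id in all_ids if msg_id > last_seen_id]
```
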

0.7.0 (2015-05-05)

A new output – XMPP – was added, and now processor is able
to notify Jabber users.

0.6.0 (2015-05-01)

The biggest change in this release is a new source – ``github.releases``.
It is able to read all new releases in a given repository and send them into
the processing pipeline. This works for public as well as private
repositories. `Read the docs`_ for further details.

.. _Read the docs:

Other changes are:

* Storage backend now saves the JSON database nicely pretty-printed, so that you can read and edit it in your favorite editor. That's Emacs, right?
* The Twitter source now saves state after each tweet is processed. This way processor shouldn't lose tweets if there is an exception somewhere in the processing pipeline.
* IMAP source was fixed and now is able to fetch emails from really big folders.
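The pretty-printing change amounts to something like this sketch, using the standard ``json`` module; the function name is illustrative:

```python
import json

def dump_db(data):
    # Pretty-print the JSON database so it stays readable and
    # hand-editable in a text editor.
    return json.dumps(data, indent=4, sort_keys=True)
```
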

0.5.0 (2015-04-15)

Good news, everyone! A new output was added – ``email``.
Now Processor is able to notify you via email about any event.

0.4.0 (2015-04-06)

* Function ``run_pipeline`` was simplified and now accepts only one source and one output.
  To implement more complex pipelines, use ``sources.mix`` and ``outputs.fanout`` helpers.

0.3.0 (2015-04-01)

* Added a `web.hook`_ source.
* Now a `source` can be not only an iterable object, but also any function which returns values.
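Supporting both kinds of sources can be sketched like this; the code is illustrative, not the actual implementation:

```python
def iterate_source(source):
    # A source may be an iterable of data objects...
    if callable(source):
        # ...or a function which returns the next data object
        # (None meaning "no more data for now").
        def generate():
            while True:
                obj = source()
                if obj is None:
                    return
                yield obj
        return generate()
    return iter(source)
```
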

.. _web.hook:

0.2.1 (2015-03-30)

Fixed an error in the ``import-or-error`` macro, which prevented the use of third-party libraries.

0.2.0 (2015-03-30)

Most third-party libraries are optional now. If you want to use
some extension which requires an external library, it will issue
an error and call ``sys.exit(1)`` until you satisfy this dependency.

This should make life easier for those who do not want
to use the ``rss`` output, which requires ``feedgen``, which requires
``lxml``, which is hard to build because it is a C extension.
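The idea behind ``import-or-error`` can be sketched in plain Python; the real version is a Hy macro, so this is only an approximation:

```python
import sys

def import_or_error(module_name, extension):
    # The optional dependency is only required when the extension that
    # needs it is actually used; otherwise report and exit.
    try:
        return __import__(module_name)
    except ImportError:
        sys.stderr.write('Please install {0} to use the {1} extension.\n'
                         .format(module_name, extension))
        sys.exit(1)
```
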

0.1.0 (2015-03-18)

* First release on PyPI.
WebFaction WebFaction Technical Writing Elastic Elastic Search Pingdom Pingdom Monitoring Dyn Dyn DNS Sentry Sentry Error Logging CloudAMQP CloudAMQP RabbitMQ Heroku Heroku PaaS Kabu Creative Kabu Creative UX & Design Fastly Fastly CDN DigiCert DigiCert EV Certificate Rackspace Rackspace Cloud Servers DreamHost DreamHost Log Hosting