A library to simplify processing streams of data

Arroyo Stream Processing Toolset

Processing event or streaming data presents several technological challenges. A variety of technologies are often used by scientific user facilities. ZMQ is used to stream data and messages in a peer-to-peer fashion. Message brokers like Kafka, Redis and RabbitMQ are often employed to route and pass messages from instruments to processing workflows. Arroyo provides an API and structure to flexibly integrate with these tools and incorporate arbitrarily complex processing workflows, letting the hooks to the workflow code be independent of the connection code and hence reusable at a variety of instruments.

An arroyo implementation is built from groups of three kinds of classes:

  • Operator - receives Messages from a Listener and can optionally send Messages to one or more Publisher instances
  • Listener - receives messages from the external world, parses them into arroyo Messages and sends them to an Operator
  • Publisher - receives Messages from an Operator and publishes them to the outside world
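
The three roles above can be sketched in plain Python. This is a minimal illustration, not arroyo's actual code: the method names (start, stop, process, publish, add_publisher, remove_publisher) are taken from the class diagram in this README, while the simplified Message class and the concrete classes ListPublisher, UppercaseOperator and IterableListener are hypothetical names invented for the example. The real library's signatures may differ (for instance, they may be coroutines).

```python
from abc import ABC, abstractmethod
from typing import Iterable, List


class Message:
    """Hypothetical, simplified stand-in for an arroyo Message."""

    def __init__(self, payload: bytes) -> None:
        self.payload = payload


class Publisher(ABC):
    @abstractmethod
    def publish(self, message: Message) -> None:
        """Send a message to the outside world."""


class Operator(ABC):
    def __init__(self) -> None:
        self.publishers: List[Publisher] = []

    def add_publisher(self, publisher: Publisher) -> None:
        self.publishers.append(publisher)

    def remove_publisher(self, publisher: Publisher) -> None:
        self.publishers.remove(publisher)

    @abstractmethod
    def process(self, message: Message) -> None:
        """Handle one message, optionally publishing a result."""


class Listener(ABC):
    def __init__(self, operator: Operator) -> None:
        self.operator = operator

    @abstractmethod
    def start(self) -> None:
        """Begin receiving data and forwarding it to the operator."""

    @abstractmethod
    def stop(self) -> None:
        """Stop receiving data."""


# Hypothetical concrete classes, all running in a single process.
class ListPublisher(Publisher):
    def __init__(self) -> None:
        self.sent: List[bytes] = []

    def publish(self, message: Message) -> None:
        self.sent.append(message.payload)


class UppercaseOperator(Operator):
    def process(self, message: Message) -> None:
        result = Message(message.payload.upper())
        for publisher in self.publishers:
            publisher.publish(result)


class IterableListener(Listener):
    def __init__(self, operator: Operator, source: Iterable[bytes]) -> None:
        super().__init__(operator)
        self.source = source
        self._running = False

    def start(self) -> None:
        self._running = True
        for raw in self.source:
            if not self._running:
                break
            # Parse the raw bytes into a Message before handing it on.
            self.operator.process(Message(raw))

    def stop(self) -> None:
        self._running = False


def run_demo() -> List[bytes]:
    publisher = ListPublisher()
    operator = UppercaseOperator()
    operator.add_publisher(publisher)
    IterableListener(operator, [b"a", b"b"]).start()
    return publisher.sent
```

Because the operator only talks to the Listener and Publisher interfaces, the same UppercaseOperator could be reused unchanged with a listener or publisher backed by ZMQ or Redis.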

Arroyo is unopinionated about deployment decisions. It is intended to support listener-operator-publisher groups in:

  • Single process
  • Chain of processes where listening, processing and publishing can be linked together through a protocol like ZMQ. One process's publisher can communicate with another process's listener, etc.

This library is intended to provide classes, and will also include more specific common subclasses, like those that communicate over ZMQ or Redis.

---
title: Some sweet classes

note: I guess we use "None" instead of "void"
---

classDiagram
    namespace listener{

        class Listener{
            operator: Operator

            *start(): None
            *stop(): None
        }


    }

    namespace operator{
        class Operator{
            publisher: List[Publisher]
            *process(Message): None
            add_publisher(Publisher): None
            remove_publisher(Publisher): None

        }
    }

    namespace publisher{
        class Publisher{
            *publish(Message): None
        }

    }

    namespace message{

        class Message{

        }

        class Start{
            data: Dict
        }

        class Stop{
            data: Dict
        }

        class Event{
            metadata: Dict
            payload: bytes
        }
    }

    namespace zmq{
        class ZMQListener{
            operator: Operator
            socket: zmq.Socket
        }

        class ZMQPublisher{
            host: str
            port: int
        }

    }

    namespace redis{

        class RedisListener{
            operator: Redis.client
            pubsub: Redis.pubsub
        }

        class RedisPublisher{
            pubsub: Redis.pubsub
        }

    }



    Listener <|-- ZMQListener
    ZMQListener <|-- ZMQPubSubListener
    Listener o-- Operator

    Publisher <|-- ZMQPublisher
    ZMQPublisher <|-- ZMQPubSubPublisher

    Publisher <|-- RedisPublisher
    Listener <|-- RedisListener
    Operator o-- Publisher
    Message <|-- Start
    Message <|-- Stop
    Message <|-- Event
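
The message hierarchy in the diagram can be sketched with standard-library dataclasses. This is an illustration under assumptions: the class and field names come from the diagram, but the default values, the use of dataclasses and the describe helper are choices made for this example, and the library may define these types differently.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class Message:
    """Base type; the diagram gives it no fields."""


@dataclass
class Start(Message):
    data: Dict[str, Any] = field(default_factory=dict)


@dataclass
class Stop(Message):
    data: Dict[str, Any] = field(default_factory=dict)


@dataclass
class Event(Message):
    metadata: Dict[str, Any] = field(default_factory=dict)
    payload: bytes = b""


def describe(message: Message) -> str:
    """Hypothetical helper showing how an operator might dispatch on type."""
    if isinstance(message, Start):
        return "start"
    if isinstance(message, Stop):
        return "stop"
    if isinstance(message, Event):
        return f"event:{len(message.payload)} bytes"
    return "unknown"
```

An operator's process method could branch on these types, for example to open a result file on Start, accumulate on Event and flush on Stop.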

In-process, listening for ZMQ

Note that this leaves Concrete classes undefined as placeholders

TODO: parent class labels

sequenceDiagram
    autonumber
    ExternalPublisher ->> ZMQPubSubListener: publish(bytes)
    loop receiving thread
        activate ZMQPubSubListener
            ZMQPubSubListener ->> ConcreteMessageParser: parse(bytes)
            ZMQPubSubListener ->> MessageQueue: put(bytes)
        deactivate ZMQPubSubListener


        ZMQPubSubListener ->> MessageQueue: message(Message)
    end
    activate ConcreteOperator
        loop polling thread
            ConcreteOperator ->> MessageQueue: get(bytes)
        end
        loop processing thread
            ConcreteOperator ->> ConcreteOperator: calculate()

            ConcreteOperator ->> ConcretePublisher: publish()
        end
    deactivate ConcreteOperator
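
The flow in the sequence diagram can be approximated with the standard library's queue and threading modules. This sketch makes several assumptions: a list of byte strings stands in for the ZMQ socket, the parse function stands in for ConcreteMessageParser, reversing the payload stands in for calculate(), and the diagram's separate polling and processing threads are merged into a single worker thread for brevity. None of these names come from arroyo itself.

```python
import queue
import threading
from typing import List


class Message:
    """Hypothetical, simplified message carrying raw bytes."""

    def __init__(self, payload: bytes) -> None:
        self.payload = payload


def parse(raw: bytes) -> Message:
    # Stands in for ConcreteMessageParser.parse(bytes).
    return Message(raw)


def run_pipeline(raw_frames: List[bytes]) -> List[bytes]:
    message_queue: queue.Queue = queue.Queue()
    published: List[bytes] = []

    def listen() -> None:
        # Receiving thread: parse each frame and enqueue the Message.
        for raw in raw_frames:
            message_queue.put(parse(raw))
        message_queue.put(None)  # sentinel: no more messages

    def operate() -> None:
        # Worker thread: poll the queue, compute, then publish.
        while True:
            message = message_queue.get()
            if message is None:
                break
            result = message.payload[::-1]  # stands in for calculate()
            published.append(result)  # stands in for publish()

    threads = [threading.Thread(target=listen), threading.Thread(target=operate)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return published
```

The queue decouples the receiving rate from the processing rate: the listener can keep accepting frames while the operator is busy, at the cost of memory if the operator falls behind.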

Developer installation

Conda environment

We use pixi, as a forward-looking choice, to help with CI. We like it because it makes it easy to test that dependencies resolve for a variety of architectures.

However, at the time of writing we have not found a way to make it a good developer experience, so we create a conda environment instead. Note that we currently use Python 3.11 because of numpy and wheel availability:

conda create -n arroyo python=3.11
conda activate arroyo
pip install -e '.[dev]'

pre-commit

We use pre-commit in CI, so you will want to run it before committing. To check that your branch's changes pass, type:

pre-commit run --all-files

Since our configuration of pre-commit uses black, it's possible that it will change files. If you like the changes, you can add them to your git commit with

git add .

Then you can run pre-commit run --all-files again.

pixi

We use pixi for CI in GitHub Actions. It's great for that, but we can't get our favorite developer tools to use the Python environments that pixi creates in the .pixi folder. If you want to play with pixi, here are some tips:

To set up a development environment:

  • Git clone this repo and cd into the directory

  • Install pixi

  • Install dependencies with pixi install

  • Run pre-commit on the files with pixi r pre-commit

  • Run pytest with pixi r test

Copyright

Arroyo Stream Processing Toolset (arroyopy) Copyright (c) 2025, The Regents of the University of California, through Lawrence Berkeley National Laboratory (subject to receipt of any required approvals from the U.S. Dept. of Energy). All rights reserved.

If you have questions about your rights to use or distribute this software, please contact Berkeley Lab's Intellectual Property Office at IPO@lbl.gov.

NOTICE. This Software was developed under funding from the U.S. Department of Energy and the U.S. Government consequently retains certain rights. As such, the U.S. Government has been granted for itself and others acting on its behalf a paid-up, nonexclusive, irrevocable, worldwide license in the Software to reproduce, distribute copies to the public, prepare derivative works, and perform publicly and display publicly, and to permit others to do so.
