Pluggable message queueing for Python

Documentation: https://shipt.github.io/py-volley/

Volley makes building event stream applications easier and more accessible. Use Volley if you need to quickly develop an application that consumes messages, processes them (and does other things), then publishes results to one or many places. Volley was inspired by the ease of use and developer experience provided by the Flask and FastAPI projects, and aims to make working with queue-based and event-driven systems as accessible as REST APIs.

Volley handles a number of operations that need to happen before and after processing a message. Reading the data, deserialization, and data validation all need to happen before data reaches your application. If any of these fail, Volley can route the message to a dead-letter queue. After processing, Volley again handles data validation, serialization, and the writing/publishing of data to any number of output queues. Finally, upon successful delivery of that message to the target queue, Volley handles marking it as read or deleting it from the input queue.
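
The lifecycle described above can be sketched in plain Python. This is only an illustration of the ordering (read, deserialize, validate, process, publish, then acknowledge), not Volley's implementation: the in-memory `deque` queues, the `run_once` function, and the queue names are all hypothetical stand-ins.

```python
import json
from collections import deque
from typing import Callable, Deque, Dict, List, Tuple

# Hypothetical in-memory stand-ins for real queues; this is not Volley's API.
Queues = Dict[str, Deque[bytes]]


def run_once(
    queues: Queues,
    input_name: str,
    dlq_name: str,
    handler: Callable[[dict], List[Tuple[str, dict]]],
) -> bool:
    """Process one message from the input queue; return False when it is empty."""
    if not queues[input_name]:
        return False
    raw = queues[input_name][0]  # read the message without removing it yet
    try:
        data = json.loads(raw)  # deserialize
        if not isinstance(data.get("my_values"), list):  # validate
            raise ValueError("my_values must be a list")
    except (ValueError, AttributeError):
        queues[dlq_name].append(raw)  # route bad input to the dead-letter queue
        queues[input_name].popleft()
        return True
    for target, message in handler(data):  # run the user's function
        queues[target].append(json.dumps(message).encode())  # serialize and publish
    queues[input_name].popleft()  # acknowledge only after delivery succeeded
    return True


def find_max(data: dict) -> List[Tuple[str, dict]]:
    return [("out", {"the_max": max(data["my_values"])})]


queues: Queues = {
    "in": deque([b'{"my_values": [1, 2, 3]}', b"not json"]),
    "out": deque(),
    "dlq": deque(),
}
while run_once(queues, "in", "dlq", find_max):
    pass
```

After the loop, the valid message has produced one result in `out`, and the malformed message sits untouched in `dlq`.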

All of Volley's major operations (connectors, serializers, data validation/model handling) can be extended with plugins. Volley comes with built-in support for queue-like technologies such as Apache Kafka and RSMQ (Redis Simple Message Queue). There is also a plugin for a Postgres queue in our examples.

Installation

Requires Python >= 3.10

pip install py-volley[all]

You can also limit the dependencies by installing only the extras you need:

pip install py-volley[kafka]  # Kafka dependencies
pip install py-volley[rsmq]  # RSMQ dependencies
pip install py-volley[zmq]  # ZeroMQ dependencies

Features

  • Built-in support for Apache Kafka, RSMQ, ZeroMQ
  • Prometheus metrics for all operations, such as function processing time and consumption and production counts.
  • Serialization in JSON and MessagePack
  • Data validation via Pydantic
  • Optionally configured integration with dead-letter-queues
  • Extendible connectors (consumers/producers), serializers, and model handlers via plugins.

Getting started

Volley handles the process of consuming/producing by providing developers with extendible interfaces and handlers:

  • connectors - consumer and producer interfaces which define how the application should read messages, write messages, and what actions to take when a message succeeds or fails processing.
  • serializers - handlers and interfaces which describe the behavior for reading byte objects from connectors. For example, JSON or MessagePack serializers.
  • model_handler - handler and interface which works very closely with serializers. Typically used to turn deserialized data into a structured Python data model. Pydantic is Volley's most supported data_model and can handle serialization itself.
  • data_model - When your application receives data from a queue, what schema and object do you expect it in? The data_model is provided by the user, and the model_handler describes how to construct your data_model.
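
To make the division of labor between these pieces concrete, here is a minimal sketch using only the standard library. The class names `JsonSerializer` and `DataclassHandler` and their method names are hypothetical and chosen for illustration; they are not Volley's actual interfaces. A dataclass stands in for the data_model; unlike Pydantic, it does not validate field types.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict, List, Type, TypeVar

T = TypeVar("T")


class JsonSerializer:
    """Hypothetical serializer: converts between bytes and plain Python objects."""

    def deserialize(self, raw: bytes) -> Dict[str, Any]:
        return json.loads(raw)

    def serialize(self, data: Dict[str, Any]) -> bytes:
        return json.dumps(data).encode("utf-8")


class DataclassHandler:
    """Hypothetical model handler: builds a data model from deserialized data."""

    def construct(self, data: Dict[str, Any], model: Type[T]) -> T:
        return model(**data)

    def deconstruct(self, instance: Any) -> Dict[str, Any]:
        return dict(vars(instance))


@dataclass
class InputMessage:
    """The data_model: the schema the application expects to receive."""

    my_values: List[float]


serializer = JsonSerializer()
handler = DataclassHandler()

# Inbound: bytes from a connector -> dict -> data model
raw = b'{"my_values": [1, 2, 3]}'
msg = handler.construct(serializer.deserialize(raw), InputMessage)

# Outbound: data model -> dict -> bytes for a connector
round_trip = serializer.serialize(handler.deconstruct(msg))
```

Keeping the serializer and the model handler separate means either one can be swapped (for example, MessagePack instead of JSON) without touching the other.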

To demonstrate, let's create an application with two worker nodes. The first consumes from Kafka, finds the maximum value in a list, then publishes it to Redis. The second consumes that message from Redis. If the max value is greater than 10, it logs to the console; otherwise, it constructs a new list and publishes it to the same Kafka topic.

flowchart LR
A[(Kafka)] --> |consume| B[Worker 1]
B --> |publish| C[(Redis)]
C --> |consume| D[Worker 2]
D --> E{>10}
E --> | no | A
E --> | yes | F[Log to Console]

You can skip the details and just run make intro.start, which runs this example through ./example/intro/docker-compose.yml

  1. Start a Kafka instance and a Redis instance
docker run -d -p 6379:6379 redis:latest
docker run -d -p 9092:9092 bashj79/kafka-kraft
  1. Configure the queues and data models. Let's put this in ./my_config.py.
# ./my_config.py
from typing import List
from pydantic import BaseModel

from volley import QueueConfig

# define the schemas for the first and second worker nodes.
class InputMessage(BaseModel):
  my_values: List[float]

class OutputMessage(BaseModel):
  the_max: float

# define the configurations for the two queues, one in Kafka and the other in Redis.
queue_config = [
  QueueConfig(
    name="my-kafka-input",
    value="my.kafka.topic.name",
    profile="confluent",
    data_model=InputMessage,
    config={
      "group.id": "my.consumer.group",
      "bootstrap.servers": "localhost:9092",
    }
  ),
  QueueConfig(
    name="my-redis-output",
    value="my.redis.output.queue.name",
    profile="rsmq",
    data_model=OutputMessage,
    config={
      "host": "localhost",
      "port": 6379,
    }
  )
]
  1. Build the first worker node - consume from Kafka, find the max value, publish to Redis
# ./app_0.py
from typing import List, Tuple
from volley import Engine

from my_config import queue_config, InputMessage, OutputMessage
# the first node - reads from kafka and writes to redis
app_0 = Engine(
  app_name="app_0",
  input_queue="my-kafka-input",  # one input
  output_queues=["my-redis-output"],  # zero to many outputs
  queue_config=queue_config
)

@app_0.stream_app
def kafka_to_redis(msg: InputMessage) -> List[Tuple[str, OutputMessage]]:
  print(f"Received {msg.model_dump_json()}")
  max_val = max(msg.my_values)
  out = OutputMessage(the_max=max_val)
  print(out)
  return [("my-redis-output", out)]  # a list of one or many output targets and messages

if __name__ == "__main__":
  kafka_to_redis()
  1. Run the first application in a terminal
python app_0.py
  1. Build the second worker node - consume from Redis, determine if we log to console or recycle the message as a new list.
# ./app_1.py
from typing import List, Tuple, Union
from volley import Engine

from my_config import OutputMessage, queue_config, InputMessage

# the second node
app_1 = Engine(
  app_name="app_1",
  input_queue="my-redis-output",
  output_queues=["my-kafka-input"],
  queue_config=queue_config,
  metrics_port=None
)

@app_1.stream_app
def redis_to_kafka(msg: OutputMessage) -> Union[bool, List[Tuple[str, InputMessage]]]:
  print(f"The maximum: {msg.the_max}")
  if msg.the_max > 10:
    print("That's it, we are done!")
    return True
  else:
    out = InputMessage(my_values=[msg.the_max, msg.the_max+1, msg.the_max+2])
    return [("my-kafka-input", out)]  # a list of one or many output targets and messages

if __name__ == "__main__":
  redis_to_kafka()
  1. Run the second worker node in another terminal
python app_1.py
  1. Finally, let's manually publish a message to the input Kafka topic:
from confluent_kafka import Producer
import json
producer = Producer({"bootstrap.servers": "localhost:9092"})
producer.produce(topic="my.kafka.topic.name", value=json.dumps({"my_values":[1,2,3]}))
producer.flush(5)

You should see the following in your two terminals:

./app_0.py

Received {"my_values": [1.0, 2.0, 3.0]}
the_max=3.0
Received {"my_values": [3.0, 4.0, 5.0]}
the_max=5.0
Received {"my_values": [5.0, 6.0, 7.0]}
the_max=7.0
Received {"my_values": [7.0, 8.0, 9.0]}
the_max=9.0
Received {"my_values": [9.0, 10.0, 11.0]}
the_max=11.0

./app_1.py

The maximum: 3.0
The maximum: 5.0
The maximum: 7.0
The maximum: 9.0
The maximum: 11.0
That's it, we are done!
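
The sequence of maxima above follows directly from the logic of the two workers. As a sanity check, the loop can be simulated without Kafka or Redis. The sketch below reuses the worker function names from the example but as plain functions; `simulate` and the `threshold` parameter are hypothetical helpers added for illustration.

```python
from typing import List, Optional


def kafka_to_redis(my_values: List[float]) -> float:
    """Worker 1's logic: reduce the list to its maximum."""
    return max(my_values)


def redis_to_kafka(the_max: float, threshold: float = 10) -> Optional[List[float]]:
    """Worker 2's logic: stop above the threshold, otherwise build a new list."""
    if the_max > threshold:
        return None
    return [the_max, the_max + 1, the_max + 2]


def simulate(first: List[float], threshold: float = 10) -> List[float]:
    """Pass values between the two workers and collect each maximum seen."""
    maxima: List[float] = []
    values: Optional[List[float]] = first
    while values is not None:
        the_max = kafka_to_redis(values)
        maxima.append(the_max)
        values = redis_to_kafka(the_max, threshold)
    return maxima


maxima = simulate([1.0, 2.0, 3.0])
print(maxima)
```

Each round trip raises the maximum by 2, so starting from `[1, 2, 3]` the loop ends on the fifth message, when the maximum first exceeds 10.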

Complete example

Clone this repo and run make run.example to see a complete example of:

  • consuming a message from a Kafka topic
  • producing to RSMQ
  • consuming from RSMQ and publishing to Kafka and Postgres using a custom plugin for Postgres.

Contributing

See our contributing guide.

Thanks go to great projects and these incredible people.
