Storage Queue Input-Output Manager for Azure Storage Queue

AZ Queue Manager

AZ Queue Manager is a tool to manage messages in Azure Storage Queue.

It is a command line tool that can be used to send and receive messages from a queue. It can also be used to create and delete queues.

Getting Started

Install AZQueueManager and Extensions

# Install the base package
pip install azqueuemanager

# Install the csv-input extension
pip install azqueuemanager-csv # or some other extension

Set up your script

Suppose this is your input file, data.json:

# data.json
[
    {"id": "msg_1", "text": "hello world"},
    {"id": "msg_2", "text": "hello universe"}
]
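
Before loading the file into a queue, it can help to confirm that it parses as JSON and that every record has the fields you expect. The following is a minimal sketch using only the standard library. The `load_records` helper and the `REQUIRED_KEYS` set are assumptions made for illustration and are not part of azqueuemanager. The JSON is inlined so the example is self-contained.

```python
import json

# Assumption: these are the fields used in the sample data.json.
REQUIRED_KEYS = {"id", "text"}


def load_records(raw: str) -> list[dict]:
    """Parse a JSON array of records and check each one has the required keys."""
    records = json.loads(raw)  # raises json.JSONDecodeError on malformed input
    if not isinstance(records, list):
        raise ValueError("expected a JSON array of records")
    for index, record in enumerate(records):
        missing = REQUIRED_KEYS - record.keys()
        if missing:
            raise ValueError(f"record {index} is missing keys: {sorted(missing)}")
    return records


SAMPLE = """
[
    {"id": "msg_1", "text": "hello world"},
    {"id": "msg_2", "text": "hello universe"}
]
"""

records = load_records(SAMPLE)
print(len(records), records[0]["id"])
```

A malformed record such as `{"id": "msg_2": "text": "hello universe"}` fails at `json.loads`, so the problem surfaces before anything is sent to the queue.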

Create your script.

# test.py
from azqueuemanager import QueueManager, StorageQueue
from azqueuemanager_json import JSONTransformer  # Python module names use underscores, not hyphens

We'll come back to the extension. For now, let's define our queue. This is the storage queue itself.

# test.py
# You'll need your own connection string
CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=MYACCOUNT;AccountKey=MYKEY;EndpointSuffix=core.windows.net"
QUEUE_NAME = "MYQUEUENAME"

queue = StorageQueue(
    CONNECTION_STRING,
    QUEUE_NAME,
)
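
Because the connection string contains your account key, you may prefer to read it from the environment instead of hard-coding it in test.py. The sketch below is one way to do that under stated assumptions. The environment variable name `AZURE_STORAGE_CONNECTION_STRING` is a common convention rather than something azqueuemanager requires, and `parse_connection_string` is a hypothetical helper used only to inspect the value.

```python
import os


def parse_connection_string(connection_string: str) -> dict[str, str]:
    """Split 'Key=Value;Key=Value' pairs into a dict.

    Each pair is split on the first '=' only, because account keys are
    base64 encoded and often end in '=' padding.
    """
    settings = {}
    for item in connection_string.split(";"):
        key, separator, value = item.partition("=")
        if separator:  # skip empty or malformed segments
            settings[key] = value
    return settings


# Assumption: the variable name is a convention, not an azqueuemanager setting.
# The fallback is the placeholder connection string from this README.
connection_string = os.environ.get(
    "AZURE_STORAGE_CONNECTION_STRING",
    "DefaultEndpointsProtocol=https;AccountName=MYACCOUNT;"
    "AccountKey=MYKEY;EndpointSuffix=core.windows.net",
)

settings = parse_connection_string(connection_string)
print(settings.get("AccountName"))
```

The resulting `connection_string` value can then be passed wherever the hard-coded string was used.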

Next we'll need to set up our extension. Every extension has a transformer. That transformer has methods for passing data into and out of the queue. The input transformer takes data and creates one or more messages. The output transformer takes a message and creates output for each record.

The inputs and outputs vary by extension, so check the documentation for each extension to see what its input and output look like.

In the JSON extension, the transformer takes json_in_file (or json_data) and, optionally, a json_out_file. transform_in adds messages from the JSON content. transform_out takes the messages and creates a JSON file with all the records returned.
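
To make the two directions concrete, here is a rough, self-contained sketch of what a transformer of this shape could look like. The class `SketchJSONTransformer` is hypothetical and written only for illustration. It borrows the names json_data, json_out_file, transform_in, and transform_out from the description above, but the real JSONTransformer's signatures and behavior may differ, so check the extension's documentation.

```python
import json
from pathlib import Path
from typing import Optional


class SketchJSONTransformer:
    """Hypothetical stand-in, NOT the real azqueuemanager JSONTransformer."""

    def __init__(self, json_data: Optional[str] = None,
                 json_out_file: Optional[str] = None):
        self.json_data = json_data
        self.json_out_file = json_out_file

    def transform_in(self) -> list[str]:
        """Turn a JSON array into one message string per record."""
        records = json.loads(self.json_data or "[]")
        return [json.dumps(record) for record in records]

    def transform_out(self, messages: list[str]) -> list[dict]:
        """Turn message strings back into records, optionally writing a file."""
        records = [json.loads(message) for message in messages]
        if self.json_out_file:
            Path(self.json_out_file).write_text(json.dumps(records, indent=2))
        return records


transformer = SketchJSONTransformer(
    json_data='[{"id": "msg_1", "text": "hello world"}]',
)
messages = transformer.transform_in()           # one string per record
records = transformer.transform_out(messages)   # back to dictionaries
print(messages, records)
```

Using one object for both directions is what lets the same instance be passed as both input_transformer and output_transformer.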

Let's create an instance of the JSONTransformer and pass it to the QueueManager along with our queue.

# test.py

json_transform = JSONTransformer(
    json_in_file='data.json',
)

queue_manager = QueueManager(
    queue=queue,
    input_transformer=json_transform,
    output_transformer=json_transform,
)

Now we can add data to, or pull data from, our queue.

Loading Data

We pass data in using the queue_messages method. This will use the data processed from the input_transformer and add it to the storage queue.

If there is no input_transformer then the queue_messages method will take a list of messages (as messages) and add them to the queue.

You can look at all the messages loaded into the queue using the list_messages method.

# test.py
# load data into the queue
queue_manager.queue_messages() 


# list the messages in the queue without removing them from the queue
queue_manager.list_messages() 

# >>> [
#    {id: 1234567890, content:"{'id': 'msg_1', 'text': 'hello world'}"},
# ...
# ]

Previewing Data

You can also preview the next record. The preview_message method returns the next message in the queue, transformed, without removing it from the queue.

This is designed to help you confirm that your data is correct before pushing it to another service.

# test.py 

queue_manager.preview_message()

# >>> PREVIEW: message={...} -> transformed_message={'id': 'msg_1', 'text': 'hello world'}

Retrieving The Data

Use next_messages(count=n) to retrieve n messages from the queue and process them with the output_transformer. By default, messages are not deleted from the queue, but you can delete them by setting delete_after=True in the method.

If you don't pass a count, it returns all the messages in the queue.

You can also process only the next message by using next_message(delete_after=True). This may be helpful if you want to process messages one at a time, for example when messages have different keys and values that make the data more complicated.

# test.py

# get messages from the queue
queue_manager.next_messages()

# >>>
#	[
#		{"id": "msg_1", "text": "hello world"},
#		{"id": "msg_2", "text": "hello universe"}
#	]
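
The difference between previewing, retrieving, and retrieving with delete_after=True can be illustrated with a small in-memory stand-in. This is a sketch under assumptions: `InMemoryQueue` is hypothetical, it only mirrors the method and parameter names used in this README, and it ignores Azure-specific behavior such as visibility timeouts.

```python
from collections import deque
from typing import Optional


class InMemoryQueue:
    """Hypothetical stand-in that mimics the semantics described above."""

    def __init__(self, messages: list[str]):
        self._messages = deque(messages)

    def preview_message(self) -> Optional[str]:
        """Return the next message without removing it."""
        return self._messages[0] if self._messages else None

    def next_messages(self, count: Optional[int] = None,
                      delete_after: bool = False) -> list[str]:
        """Return up to `count` messages, or all of them if count is None."""
        available = len(self._messages)
        total = available if count is None else min(count, available)
        batch = [self._messages[i] for i in range(total)]
        if delete_after:
            # Only remove the messages that were actually returned.
            for _ in range(total):
                self._messages.popleft()
        return batch


queue = InMemoryQueue(["msg_1", "msg_2", "msg_3"])
previewed = queue.preview_message()                          # nothing removed
kept = queue.next_messages(count=2)                          # nothing removed
removed = queue.next_messages(count=2, delete_after=True)    # first two removed
remaining = queue.next_messages()                            # whatever is left
print(previewed, kept, removed, remaining)
```

In this stand-in, calling next_messages twice without delete_after returns the same messages both times. A real storage queue may behave differently between calls, so treat this only as a picture of the parameters.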
