Asynchronous message dispatcher for concurrent task processing

Project description

pyinsole is an asynchronous message dispatcher, inspired by loafer, that provides a flexible and efficient way to consume messages from Amazon SQS queues. It simplifies integration with SQS by offering multiple consumption strategies, so you can choose the approach that best fits your application's needs.


💻 Usage

The example below defines an asynchronous message handler function (my_handler) that is invoked whenever a message is received from the SQS queue. The SQSRoute class routes messages from example-queue to the handler.

Example Code

Here’s the main code that processes messages from example-queue. The script listens for messages on the queue and, for each message received, prints the message content, the associated metadata, and any additional keyword arguments.

import os

from pyinsole import Manager
from pyinsole.ext.aws import SQSRoute

async def my_handler(message: dict, metadata: dict, **kwargs):
    print(f"message={message}, metadata={metadata}, kwargs={kwargs}")
    return True

provider_options = {
    "endpoint_url": os.getenv("AWS_ENDPOINT_URL"),
    "options": {
        "MaxNumberOfMessages": 10,
        "WaitTimeSeconds": int(os.getenv("AWS_WAIT_TIME_SECONDS", "20")),
    },
}

routes = [
    SQSRoute('example-queue', handler=my_handler, provider_options=provider_options),
]

if __name__ == '__main__':
    manager = Manager(routes)
    manager.run()
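
Real handlers usually do more than print. The sketch below shows one possible shape for a handler that validates the payload before processing it. The field names in REQUIRED_FIELDS are hypothetical, and the meaning of the return value (True to acknowledge the message, False to leave it on the queue for redelivery) is an assumption carried over from loafer's convention; check the pyinsole source before relying on it.

```python
import asyncio

# Hypothetical payload fields, for illustration only.
REQUIRED_FIELDS = ("order_id", "amount")


async def validating_handler(message: dict, metadata: dict, **kwargs) -> bool:
    """Process a message only if it carries every required field."""
    missing = [field for field in REQUIRED_FIELDS if field not in message]
    if missing:
        # Assumption: a falsy return tells the dispatcher not to acknowledge.
        print(f"rejecting message, missing fields: {missing}")
        return False

    # Stand-in for non-blocking work such as a database or HTTP call.
    await asyncio.sleep(0)
    print(f"processed order {message['order_id']}")
    return True
```

Because it has the same signature as my_handler, it can be passed to SQSRoute in the same way.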

Or you can use class-based handlers if you prefer:

import os

from pyinsole import Manager
from pyinsole.ext.aws import SQSRoute
from pyinsole.ext.handlers import AsyncHandler

class MyHandler(AsyncHandler):
    async def process(self, message, metadata, **kwargs) -> bool:
        print(f"message={message}, metadata={metadata}, kwargs={kwargs}")
        return True

provider_options = {
    "endpoint_url": os.getenv("AWS_ENDPOINT_URL"),
    "options": {
        "MaxNumberOfMessages": 10,
        "WaitTimeSeconds": int(os.getenv("AWS_WAIT_TIME_SECONDS", "20")),
    },
}

routes = [
    SQSRoute('example-queue', handler=MyHandler(), provider_options=provider_options),
]

if __name__ == '__main__':
    manager = Manager(routes)
    manager.run()
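
Values read with os.getenv arrive as strings whenever the variable is set, while the SQS ReceiveMessage parameters are integers with fixed bounds (MaxNumberOfMessages from 1 to 10, WaitTimeSeconds from 0 to 20). A minimal sketch of a helper that builds provider_options with conversion and clamping follows. The helper names and the AWS_MAX_MESSAGES variable are hypothetical and not part of pyinsole.

```python
import os


def _int_env(name: str, default: int, low: int, high: int) -> int:
    """Read an integer environment variable and clamp it to [low, high]."""
    raw = os.getenv(name)
    value = default if raw is None or raw.strip() == "" else int(raw)
    return max(low, min(high, value))


def build_provider_options() -> dict:
    """Build the provider_options dict used by the examples above."""
    return {
        "endpoint_url": os.getenv("AWS_ENDPOINT_URL"),
        "options": {
            # AWS_MAX_MESSAGES is a hypothetical variable name.
            "MaxNumberOfMessages": _int_env("AWS_MAX_MESSAGES", 10, 1, 10),
            "WaitTimeSeconds": _int_env("AWS_WAIT_TIME_SECONDS", 20, 0, 20),
        },
    }
```

The result can be passed as provider_options=build_provider_options() when constructing a route.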

Running the Script

This setup allows you to easily process messages from an SQS queue using the pyinsole library. You can modify the my_handler function to implement your specific message processing logic.

  1. Start LocalStack (or ensure you have access to AWS SQS). If you are using LocalStack, make sure it's running and the example-queue is created:

    aws --endpoint-url=http://localhost:4566 sqs create-queue --queue-name example-queue
    
  2. Run the script:

    python your_script.py
    
  3. Push some messages:

    aws --endpoint-url=http://localhost:4566 sqs send-message --queue-url http://localhost:4566/000000000000/example-queue --message-body "Your message body"
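
To push several test messages at once, the same step can be sketched with boto3 instead of the AWS CLI. This assumes boto3 is installed and LocalStack is listening on http://localhost:4566. The function names are hypothetical, the dummy credentials are a common LocalStack convention, and sending JSON bodies is an assumption based on the handler's message: dict annotation.

```python
import json


def send_json_messages(sqs_client, queue_name: str, payloads: list) -> list:
    """Send each payload as a JSON body and return the resulting message IDs."""
    queue_url = sqs_client.get_queue_url(QueueName=queue_name)["QueueUrl"]
    message_ids = []
    for payload in payloads:
        response = sqs_client.send_message(
            QueueUrl=queue_url,
            MessageBody=json.dumps(payload),
        )
        message_ids.append(response["MessageId"])
    return message_ids


def make_localstack_client(endpoint_url: str = "http://localhost:4566"):
    """Create an SQS client pointed at LocalStack (assumed endpoint and region)."""
    import boto3  # imported lazily so the helper above works with any client

    return boto3.client(
        "sqs",
        endpoint_url=endpoint_url,
        region_name="us-east-1",
        aws_access_key_id="test",
        aws_secret_access_key="test",
    )
```

For example, send_json_messages(make_localstack_client(), "example-queue", [{"hello": "world"}]) sends one message to the queue created in step 1.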
    

🎯 Roadmap

You can find the project roadmap here.

This document outlines future improvements, features, and tasks planned for the project.


🫱🏻‍🫲🏽 How to contribute

We welcome contributions of all kinds to make pyinsole better! To contribute to the project, follow these steps:

  1. Fork the repository: Click on the "Fork" button at the top right of the repository page.

  2. Clone your fork:

    git clone https://github.com/edopneto/pyinsole
    cd pyinsole
    
  3. Create a new branch: It's best practice to create a feature branch for your changes.

    git checkout -b feature/your-feature-name
    
  4. Make your changes: Work on your feature, bug fix, or documentation improvement.

  5. Test your changes: Ensure everything is working as expected.

  6. Commit your changes:

    git add .
    git commit -m "Add a brief message describing your changes"
    
  7. Push to your branch:

    git push origin feature/your-feature-name
    
  8. Open a Pull Request: Go to the repository on GitHub, and you’ll see a button to "Compare & Pull Request." Submit a pull request with a clear title and description of your changes.

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pyinsole-0.2.0a3.tar.gz (706.6 kB view details)

Uploaded Source

Built Distribution

pyinsole-0.2.0a3-py3-none-any.whl (15.2 kB view details)

Uploaded Python 3

File details

Details for the file pyinsole-0.2.0a3.tar.gz.

File metadata

  • Download URL: pyinsole-0.2.0a3.tar.gz
  • Upload date:
  • Size: 706.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/5.1.1 CPython/3.12.7

File hashes

Hashes for pyinsole-0.2.0a3.tar.gz
Algorithm Hash digest
SHA256 e0448f6c9286745af6c9f9876a643d91a3b74641858e08934a84157142632699
MD5 2f802f60cb0586226339dc789022b16e
BLAKE2b-256 252ec4a47e45d566d2d934df1c3ddc25e89f8b349c90afcbf6e30e3d240bdeae

Provenance

The following attestation bundles were made for pyinsole-0.2.0a3.tar.gz:

Publisher: release.yml on pyinsole/pyinsole

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file pyinsole-0.2.0a3-py3-none-any.whl.

File metadata

  • Download URL: pyinsole-0.2.0a3-py3-none-any.whl
  • Upload date:
  • Size: 15.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/5.1.1 CPython/3.12.7

File hashes

Hashes for pyinsole-0.2.0a3-py3-none-any.whl
Algorithm Hash digest
SHA256 94b2ab743f537c4e928a3e9b3222f19772d6d33fb4effdd5d2bdd0b8ba24cb9d
MD5 41ca41bc5f3154a5e806b2b6d9dc9795
BLAKE2b-256 4200d19286fb7e34e744cd86910740f2924b7fb11054dc16f4a169e7a02903a1

Provenance

The following attestation bundles were made for pyinsole-0.2.0a3-py3-none-any.whl:

Publisher: release.yml on pyinsole/pyinsole

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.
