Skip to main content

Asynchronous message dispatcher for concurrent tasks processing

Project description

Circular Image

pyinsole is an asynchronous message dispatcher inpired by loafer designed to provide a flexible and efficient way to consume messages from Amazon SQS queues. The pyinsole simplifies the process of integrating with SQS by offering multiple consumption strategies, allowing you to choose the best approach for your application's needs.


💻 Usage

The script defines an asynchronous message handler function (my_handler) that will be invoked whenever a message is received from the SQS queue. The SQSRoute class is used to route messages from the example-queue to the handler.

Example Code

Here’s the main code that processes messages from the example-queue. The script will listen for messages on the example-queue, and for each message received, it will print the message content, associated metadata, and any additional keyword arguments.

import os

from pyinsole import Manager
from pyinsole.ext.aws import SQSRoute

async def my_handler(message: dict, metadata: dict, **kwargs):
    print(f"message={message}, metadata={metadata}, kwargs={kwargs}")
    return True

provider_options = {
    "endpoint_url": os.getenv("AWS_ENDPOINT_URL"),
    "options": {
        "MaxNumberOfMessages": 10,
        "WaitTimeSeconds": os.getenv("AWS_WAIT_TIME_SECONDS", 20),
    },
}

routes = [
    SQSRoute('example-queue', handler=my_handler, provider_options=provider_options),
]

if __name__ == '__main__':
    manager = Manager(routes)
    manager.run()

Or you can use class based handlers if you prefer:

import os

from pyinsole import Manager
from pyinsole.ext.aws import SQSRoute
from pyinsole.ext.handlers import AsyncHandler

class MyHandler(AsyncHandler):
    async def process(self, message, metadata, **kwargs) -> bool:
        print(f"message={message}, metadata={metadata}, kwargs={kwargs}")
        return  True

provider_options = {
    "endpoint_url": os.getenv("AWS_ENDPOINT_URL"),
    "options": {
        "MaxNumberOfMessages": 10,
        "WaitTimeSeconds": os.getenv("AWS_WAIT_TIME_SECONDS", 20),
    },
}

routes = [
    SQSRoute('example-queue', handler=MyHandler(), provider_options=provider_options),
]

if __name__ == '__main__':
    manager = Manager(routes)
    manager.run()

Running the Script

This setup allows you to easily process messages from an SQS queue using the pyinsole library. You can modify the my_handler function to implement your specific message processing logic.

  1. Start LocalStack (or ensure you have access to AWS SQS). If you are using LocalStack, make sure it's running and the example-queue is created:

    aws --endpoint-url=http://localhost:4566 sqs create-queue --queue-name example-queue
    
  2. Run the script:

    python your_script.py
    
  3. Push some messages:

    aws --endpoint-url=http://localhost:4566 sqs send-message --queue-url http://localhost:4566/000000000000/example-queue --message-body "Your message body"
    

🎯 Roadmap

You can find the project roadmap here.

This document outlines future improvements, features, and tasks planned for the project.


🫱🏻‍🫲🏽 How to contribute

We welcome contributions of all kinds to make pyinsole better! To contribute to the project, follow these steps:

  1. Fork the repository: Click on the "Fork" button at the top right of the repository page.

  2. Clone your fork:

    git clone https://github.com/edopneto/pyinsole
    cd pyinsole
    
  3. Create a new branch: It's best practice to create a feature branch for your changes.

    git checkout -b feature/your-feature-name
    
  4. Make your changes: Work on your feature, bug fix, or documentation improvement.

  5. Test your changes: Ensure everything is working as expected.

  6. Commit your changes:

    git add .
    git commit -m "Add a brief message describing your changes"
    
  7. Push to your branch:

    git push origin feature/your-feature-name
    
  8. Open a Pull Request: Go to the repository on GitHub, and you’ll see a button to "Compare & Pull Request." Submit a pull request with a clear title and description of your changes.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pyinsole-0.2.0a2.tar.gz (706.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pyinsole-0.2.0a2-py3-none-any.whl (15.0 kB view details)

Uploaded Python 3

File details

Details for the file pyinsole-0.2.0a2.tar.gz.

File metadata

  • Download URL: pyinsole-0.2.0a2.tar.gz
  • Upload date:
  • Size: 706.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/5.1.1 CPython/3.12.7

File hashes

Hashes for pyinsole-0.2.0a2.tar.gz
Algorithm Hash digest
SHA256 e8cab032d10c84c73a2847995a8c246bb5cb2bf76debd235b0d868cda56124af
MD5 e12da833fa4f564d0b4eb4ca67508380
BLAKE2b-256 893dbfdaff193e6f7d8b2097abcde7baaa99c4ac779f3b3feaca9091899dedb9

See more details on using hashes here.

Provenance

The following attestation bundles were made for pyinsole-0.2.0a2.tar.gz:

Publisher: release.yml on pyinsole/pyinsole

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file pyinsole-0.2.0a2-py3-none-any.whl.

File metadata

  • Download URL: pyinsole-0.2.0a2-py3-none-any.whl
  • Upload date:
  • Size: 15.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/5.1.1 CPython/3.12.7

File hashes

Hashes for pyinsole-0.2.0a2-py3-none-any.whl
Algorithm Hash digest
SHA256 54172988e8bd774701b6660bd1ad78ba890d25b26dc3bd16e1160bee60fd67bf
MD5 bc85c9a463878c95a88cf8e3accc1193
BLAKE2b-256 42cefe3e6d980978b61b893b9b793db716e449032702697e9899a25727918560

See more details on using hashes here.

Provenance

The following attestation bundles were made for pyinsole-0.2.0a2-py3-none-any.whl:

Publisher: release.yml on pyinsole/pyinsole

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page