No project description provided
Project description
Async Service
Install
pip install "async_processor[sqs]"
Quick start
Write the Processor
app.py
from async_processor import (
InputMessage,
Processor,
WorkerConfig,
SQSInputConfig,
SQSOutputConfig,
)
class MultiplicationProcessor(Processor):
def process(self, input_message: InputMessage) -> int:
body = input_message.body
return body["x"] * body["y"]
app = MultiplicationProcessor().build_app(
worker_config=WorkerConfig(
input_config=SQSInputConfig(
queue_url="YOUR_INPUT_SQS_URL",
visibility_timeout=2,
),
output_config=SQSOutputConfig(queue_url="YOUR_OUTPUT_SQS_URL"),
),
)
Run the app
gunicorn app:app --workers 1 --worker-class uvicorn.workers.UvicornWorker --bind 127.0.0.1:8000
Output:
✦6 ❯ gunicorn app:app --workers 1 --worker-class uvicorn.workers.UvicornWorker --bind 127.0.0.1:8000
[2023-08-17 16:10:33 +0530] [78736] [INFO] Starting gunicorn 21.2.0
[2023-08-17 16:10:33 +0530] [78736] [INFO] Listening at: http://127.0.0.1:8000 (78736)
[2023-08-17 16:10:33 +0530] [78736] [INFO] Using worker: uvicorn.workers.UvicornWorker
[2023-08-17 16:10:33 +0530] [78738] [INFO] Booting worker with pid: 78738
[2023-08-17 16:10:33 +0530] [78738] [INFO] Started server process [78738]
[2023-08-17 16:10:33 +0530] [78738] [INFO] Waiting for application startup.
2023-08-17 16:10:33,764 - app - INFO - Invoking the processor init method
2023-08-17 16:10:33,765 - app - INFO - Processor init method execution completed
2023-08-17 16:10:33,765 - app - INFO - Starting processor runner
2023-08-17 16:10:34,461 - app - INFO - Started processor runner
2023-08-17 16:10:34,462 - app - INFO - Polling messages
[2023-08-17 16:10:34 +0530] [78738] [INFO] Application startup complete.
Send a synchronus process request
curl 'http://localhost:8000/process' -H 'Content-Type: application/json' -d '{"request_id": "abc", "body": {"x": 1, "y": 2}}'
Output:
❯ curl 'http://localhost:8000/process' -H 'Content-Type: application/json' -d '{"request_id": "abc", "body": {"x": 1, "y": 2}}'
{"request_id":"abc","status":"SUCCESS","body":2,"error":null}
- A FastAPI documentation dashboard will be available on http://localhost
Send an asynchronus process request
send_async_request.py
import json
import uuid
from async_processor import InputMessage, OutputMessage, ProcessStatus
import boto3
def send_request(input_sqs_url: str, output_sqs_url: str):
sqs = boto3.client("sqs")
request_id = str(uuid.uuid4())
sqs.send_message(
QueueUrl=input_sqs_url,
MessageBody=json.dumps(
InputMessage(request_id=request_id, body={"x": 1, "y": 2}).dict()
),
)
while True:
response = sqs.receive_message(
QueueUrl=output_sqs_url, MaxNumberOfMessages=1, WaitTimeSeconds=19
)
if "Messages" not in response:
continue
msg = response["Messages"][0]
response = OutputMessage(**json.loads(msg["Body"]))
if ProcessStatus[response.status] is not ProcessStatus.SUCCESS:
raise Exception(f"processing failed: {response.error}")
print(response)
break
if __name__ == "__main__":
send_request(input_sqs_url="YOUR_INPUT_SQS_URL", output_sqs_url="YOUR_OUTPUT_SQS_URL")
Run the above python script
python send_async_request.py
Output:
❯ python send_async_request.py
request_id='46a4ebc6-afdb-46a0-8587-ba29abf0f0d4' status='SUCCESS' body=2 error=None
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
async_processor-0.1.17.tar.gz
(25.3 kB
view details)
Built Distribution
File details
Details for the file async_processor-0.1.17.tar.gz
.
File metadata
- Download URL: async_processor-0.1.17.tar.gz
- Upload date:
- Size: 25.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.1 CPython/3.9.20
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | ae5b12af0adf5ee1a1ef08effdf07f86087947043e82001ebfdd0b4ad4cc6ca7 |
|
MD5 | 3848e9fc2d0d3226af9dbd1a89942753 |
|
BLAKE2b-256 | 8c6370ee8ac6cb0a34dd69a70f18cea378d9a64aa31202bf11a539084c63d332 |
File details
Details for the file async_processor-0.1.17-py3-none-any.whl
.
File metadata
- Download URL: async_processor-0.1.17-py3-none-any.whl
- Upload date:
- Size: 31.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.1 CPython/3.9.20
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 0b857fbbadb298a8e558c6d1692ec4389af1857a802eed5a734e438c9d57a43a |
|
MD5 | 68a99e5b3b6fd84688b9ddeb23493e18 |
|
BLAKE2b-256 | 337ebe103e4d564ed2579f1a713ac8c9d6d81342dd64519d2585e268245cfaff |