Async Service
Install
pip install "async_processor[sqs]"
Quick start
Write the Processor
app.py
from async_processor import (
    InputMessageInterface,
    Processor,
    WorkerConfig,
    SQSInputConfig,
    SQSOutputConfig,
)


class MultiplicationProcessor(Processor):
    def process(self, input_message: InputMessageInterface) -> int:
        body = input_message.get_body()
        return body["x"] * body["y"]


app = MultiplicationProcessor().build_app(
    worker_config=WorkerConfig(
        input_config=SQSInputConfig(
            queue_url="YOUR_INPUT_SQS_URL",
            visibility_timeout=2,
        ),
        output_config=SQSOutputConfig(queue_url="YOUR_OUTPUT_SQS_URL"),
    ),
)
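Because `process` reads the message only through `get_body()`, the multiplication logic can be checked without any queue. The following is a minimal sketch under that assumption: `FakeInputMessage` and `multiply` are hypothetical names for illustration, not part of async_processor, and the stub implements only the one method used in app.py.

```python
from typing import Any, Dict


class FakeInputMessage:
    """Hypothetical stand-in that exposes only get_body(), as used by process()."""

    def __init__(self, body: Dict[str, Any]) -> None:
        self._body = body

    def get_body(self) -> Dict[str, Any]:
        return self._body


def multiply(input_message: FakeInputMessage) -> int:
    # Same logic as MultiplicationProcessor.process in app.py.
    body = input_message.get_body()
    return body["x"] * body["y"]


result = multiply(FakeInputMessage({"x": 1, "y": 2}))
print(result)
```

Keeping the arithmetic in a plain function like this is one way to unit-test it separately from the SQS configuration.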
Run the app
gunicorn app:app --workers 1 --worker-class uvicorn.workers.UvicornWorker --bind 127.0.0.1:8000
Output:
❯ gunicorn app:app --workers 1 --worker-class uvicorn.workers.UvicornWorker --bind 127.0.0.1:8000
[2023-08-17 16:10:33 +0530] [78736] [INFO] Starting gunicorn 21.2.0
[2023-08-17 16:10:33 +0530] [78736] [INFO] Listening at: http://127.0.0.1:8000 (78736)
[2023-08-17 16:10:33 +0530] [78736] [INFO] Using worker: uvicorn.workers.UvicornWorker
[2023-08-17 16:10:33 +0530] [78738] [INFO] Booting worker with pid: 78738
[2023-08-17 16:10:33 +0530] [78738] [INFO] Started server process [78738]
[2023-08-17 16:10:33 +0530] [78738] [INFO] Waiting for application startup.
2023-08-17 16:10:33,764 - app - INFO - Invoking the processor init method
2023-08-17 16:10:33,765 - app - INFO - Processor init method execution completed
2023-08-17 16:10:33,765 - app - INFO - Starting processor runner
2023-08-17 16:10:34,461 - app - INFO - Started processor runner
2023-08-17 16:10:34,462 - app - INFO - Polling messages
[2023-08-17 16:10:34 +0530] [78738] [INFO] Application startup complete.
Send a synchronous process request
curl 'http://localhost:8000/process' -H 'Content-Type: application/json' -d '{"request_id": "abc", "body": {"x": 1, "y": 2}}'
Output:
❯ curl 'http://localhost:8000/process' -H 'Content-Type: application/json' -d '{"request_id": "abc", "body": {"x": 1, "y": 2}}'
{"request_id":"abc","status":"SUCCESS","body":2,"error":null}
- A FastAPI documentation dashboard will be available on http://localhost
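The same synchronous request can be issued from Python instead of curl. This is a sketch using only the standard library; it assumes the app is listening on http://localhost:8000 as in the curl example, and `build_process_request` and `send_process_request` are hypothetical helper names, not part of async_processor.

```python
import json
import urllib.request
from typing import Any, Dict


def build_process_request(
    base_url: str, request_id: str, body: Dict[str, Any]
) -> urllib.request.Request:
    # Mirror the curl call: POST a JSON payload with request_id and body.
    payload = json.dumps({"request_id": request_id, "body": body}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/process",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send_process_request(request: urllib.request.Request) -> Dict[str, Any]:
    # Send the request and decode the JSON response.
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))


request = build_process_request("http://localhost:8000", "abc", {"x": 1, "y": 2})
# Requires the app from the previous step to be running:
# print(send_process_request(request))
```

Building the request separately from sending it lets the payload be inspected or tested without a running server.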
Send an asynchronous process request
send_async_request.py
import json
import uuid

import boto3
from async_processor import InputMessageV2, OutputMessage, ProcessStatus


def send_request(input_sqs_url: str, output_sqs_url: str):
    sqs = boto3.client("sqs")
    request_id = str(uuid.uuid4())

    # Publish the request to the input queue.
    sqs.send_message(
        QueueUrl=input_sqs_url,
        MessageBody=json.dumps(
            InputMessageV2(tfy_request_id=request_id, x=1, y=2).dict()
        ),
    )

    # Long-poll the output queue until a result arrives.
    while True:
        response = sqs.receive_message(
            QueueUrl=output_sqs_url, MaxNumberOfMessages=1, WaitTimeSeconds=19
        )
        if "Messages" not in response:
            continue
        msg = response["Messages"][0]
        output = OutputMessage(**json.loads(msg["Body"]))
        if ProcessStatus[output.status] is not ProcessStatus.SUCCESS:
            raise Exception(f"processing failed: {output.error}")
        print(output)
        break


if __name__ == "__main__":
    send_request(input_sqs_url="YOUR_INPUT_SQS_URL", output_sqs_url="YOUR_OUTPUT_SQS_URL")
Run the above Python script
python send_async_request.py
Output:
❯ python send_async_request.py
request_id='46a4ebc6-afdb-46a0-8587-ba29abf0f0d4' status='SUCCESS' body=2 error=None
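The script above accepts the first message it receives from the output queue. If several requests are in flight, the result may need to be matched to its request. The sketch below shows one way to do that on the dictionary returned by `receive_message`. It assumes each message body is JSON with the `request_id`, `status`, `body`, and `error` fields shown in the output above; `find_result` is a hypothetical helper, and the sample data is hand-written rather than real queue output.

```python
import json
from typing import Any, Dict, Optional


def find_result(response: Dict[str, Any], request_id: str) -> Optional[Dict[str, Any]]:
    """Return the decoded output for request_id, or None if it is not in this batch."""
    for msg in response.get("Messages", []):
        output = json.loads(msg["Body"])
        if output.get("request_id") != request_id:
            continue  # Result for a different request; skip it.
        if output.get("status") != "SUCCESS":
            raise RuntimeError(f"processing failed: {output.get('error')}")
        return output
    return None


# Hand-written sample in the shape of a receive_message response.
sample = {
    "Messages": [
        {"Body": json.dumps({"request_id": "other", "status": "SUCCESS", "body": 10, "error": None})},
        {"Body": json.dumps({"request_id": "abc", "status": "SUCCESS", "body": 2, "error": None})},
    ]
}
print(find_result(sample, "abc"))
```

A caller would keep polling while `find_result` returns `None`, and would likely request more than one message per poll so that unrelated results do not block the one it is waiting for.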