Async Service
Install
pip install "async_processor[sqs]"
Quick start
Write the Processor
app.py
from async_processor import (
    InputMessage,
    Processor,
    WorkerConfig,
    SQSInputConfig,
    SQSOutputConfig,
)


class MultiplicationProcessor(Processor):
    def process(self, input_message: InputMessage) -> int:
        body = input_message.body
        return body["x"] * body["y"]


app = MultiplicationProcessor().build_app(
    worker_config=WorkerConfig(
        input_config=SQSInputConfig(
            queue_url="YOUR_INPUT_SQS_URL",
            visibility_timeout=2,
        ),
        output_config=SQSOutputConfig(queue_url="YOUR_OUTPUT_SQS_URL"),
    ),
)
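Before wiring up the queues, it can help to check the processing logic on its own. The following is a minimal sketch, not part of the library: `StubInputMessage` and `multiply` are hypothetical names, and the stub only mimics the `request_id` and `body` fields that the processor above reads.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class StubInputMessage:
    """Hypothetical stand-in exposing only the fields the processor reads."""

    request_id: str
    body: Dict[str, Any]


def multiply(message: StubInputMessage) -> int:
    # Same arithmetic as MultiplicationProcessor.process, without any SQS setup.
    body = message.body
    return body["x"] * body["y"]


print(multiply(StubInputMessage(request_id="abc", body={"x": 1, "y": 2})))
```

Keeping the arithmetic in a plain function like this makes it easy to unit test; the real `process` method could then delegate to it.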
Run the app
gunicorn app:app --workers 1 --worker-class uvicorn.workers.UvicornWorker --bind 127.0.0.1:8000
Output:
❯ gunicorn app:app --workers 1 --worker-class uvicorn.workers.UvicornWorker --bind 127.0.0.1:8000
[2023-08-17 16:10:33 +0530] [78736] [INFO] Starting gunicorn 21.2.0
[2023-08-17 16:10:33 +0530] [78736] [INFO] Listening at: http://127.0.0.1:8000 (78736)
[2023-08-17 16:10:33 +0530] [78736] [INFO] Using worker: uvicorn.workers.UvicornWorker
[2023-08-17 16:10:33 +0530] [78738] [INFO] Booting worker with pid: 78738
[2023-08-17 16:10:33 +0530] [78738] [INFO] Started server process [78738]
[2023-08-17 16:10:33 +0530] [78738] [INFO] Waiting for application startup.
2023-08-17 16:10:33,764 - app - INFO - Invoking the processor init method
2023-08-17 16:10:33,765 - app - INFO - Processor init method execution completed
2023-08-17 16:10:33,765 - app - INFO - Starting processor runner
2023-08-17 16:10:34,461 - app - INFO - Started processor runner
2023-08-17 16:10:34,462 - app - INFO - Polling messages
[2023-08-17 16:10:34 +0530] [78738] [INFO] Application startup complete.
Send a synchronous process request
curl 'http://localhost:8000/process' -H 'Content-Type: application/json' -d '{"request_id": "abc", "body": {"x": 1, "y": 2}}'
Output:
❯ curl 'http://localhost:8000/process' -H 'Content-Type: application/json' -d '{"request_id": "abc", "body": {"x": 1, "y": 2}}'
{"request_id":"abc","status":"SUCCESS","body":2,"error":null}
- A FastAPI documentation dashboard will be available on http://localhost
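The same synchronous call can be made from Python. This is a rough sketch using only the standard library; the helper names `build_process_request` and `parse_process_response` are hypothetical, and the payload and response shapes are taken from the curl example above. Treating any status other than `SUCCESS` as a failure is an assumption.

```python
import json
import urllib.request
from typing import Any, Dict


def build_process_request(
    base_url: str, request_id: str, body: Dict[str, Any]
) -> urllib.request.Request:
    # Same JSON payload as the curl example: a request_id plus a body.
    payload = json.dumps({"request_id": request_id, "body": body}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/process",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def parse_process_response(raw: bytes) -> Any:
    # Assumption: any status other than "SUCCESS" counts as a failure.
    result = json.loads(raw)
    if result["status"] != "SUCCESS":
        raise RuntimeError(f"processing failed: {result['error']}")
    return result["body"]
```

To send the request, pass the result of `build_process_request("http://localhost:8000", "abc", {"x": 1, "y": 2})` to `urllib.request.urlopen` and hand the response bytes to `parse_process_response`.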
Send an asynchronous process request
send_async_request.py
import json
import uuid

from async_processor import InputMessage, OutputMessage, ProcessStatus
import boto3


def send_request(input_sqs_url: str, output_sqs_url: str):
    sqs = boto3.client("sqs")
    request_id = str(uuid.uuid4())

    sqs.send_message(
        QueueUrl=input_sqs_url,
        MessageBody=json.dumps(
            InputMessage(request_id=request_id, body={"x": 1, "y": 2}).dict()
        ),
    )

    while True:
        response = sqs.receive_message(
            QueueUrl=output_sqs_url, MaxNumberOfMessages=1, WaitTimeSeconds=19
        )
        if "Messages" not in response:
            continue
        msg = response["Messages"][0]
        response = OutputMessage(**json.loads(msg["Body"]))

        if ProcessStatus[response.status] is not ProcessStatus.SUCCESS:
            raise Exception(f"processing failed: {response.error}")
        print(response)
        break


if __name__ == "__main__":
    send_request(input_sqs_url="YOUR_INPUT_SQS_URL", output_sqs_url="YOUR_OUTPUT_SQS_URL")
Run the above Python script
python send_async_request.py
Output:
❯ python send_async_request.py
request_id='46a4ebc6-afdb-46a0-8587-ba29abf0f0d4' status='SUCCESS' body=2 error=None
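The script above takes the first message it receives from the output queue. If several clients share that queue, one option is to match on `request_id` before accepting a result. The following is a minimal sketch of that idea, not part of the library: `find_result` is a hypothetical helper that works on the dictionary returned by boto3's `receive_message`, and it assumes each message body is JSON containing a `request_id` field, as in the output shown above.

```python
import json
from typing import Any, Dict, Optional


def find_result(response: Dict[str, Any], request_id: str) -> Optional[Dict[str, Any]]:
    """Return the decoded output message for request_id, or None if absent."""
    for msg in response.get("Messages", []):
        payload = json.loads(msg["Body"])
        if payload.get("request_id") == request_id:
            # Keep the receipt handle so the caller can delete the message.
            return {"receipt_handle": msg["ReceiptHandle"], "payload": payload}
    return None
```

When a match is found, the caller could remove the message with `sqs.delete_message(QueueUrl=output_sqs_url, ReceiptHandle=...)` so it is not delivered again; when `None` is returned, the polling loop would simply continue.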