Wrapper for interacting with Nanowire platform
nanowire-service-py
Usage
This library is designed for tight integration with the Nanowire platform (created by Spotlight Data).
The primary code logic should be placed in a subclass of Handler. The user is expected to implement the validate_args and handle_body methods:
import os
from dotenv import load_dotenv
from fastapi import FastAPI, Response
from pydantic import BaseModel, validator
from typing import Any, List, Optional
import pandas as pd
from nanowire_service_py import Instance, Handler, Environment, TaskBody, RuntimeError
from toolbox import ClusterTool
load_dotenv()
allowed_methods = ["HDBSCAN", "DBSCAN"]
# pydantic is used to validate the arguments passed to the task
class Arguments(BaseModel):
    contentUrl: str
    textCol: str
    indexCol: str
    clusterSize: float = 0.2
    nLabels: int = 10
    method: str = "DBSCAN"
    customStops: Optional[List[str]] = []
    maxVocab: int = 5000
    memSave: bool = False
    withAnomalous: bool = False

    @validator('method')
    def method_check(cls, method):
        if method not in allowed_methods:
            raise ValueError("Method has to be one of: {}, received: {}".format(",".join(allowed_methods), method))
        return method
# Our custom handler
class MyHandler(Handler):
    def __init__(self, *args):
        super().__init__(*args)
        self.cluster_tool = ClusterTool(self.logger)

    def validate_args(self, args: Any) -> Arguments:
        return Arguments(**args)

    def handle_body(self, args: Arguments, meta: Any):
        df = pd.read_csv(args.contentUrl, dtype='unicode')
        if args.textCol not in df.columns:
            raise RuntimeError("Could not find text column '{}' in CSV".format(args.textCol), { "origin": "CSV"})
        if args.indexCol not in df.columns:
            raise RuntimeError("Could not find index column '{}' in CSV".format(args.indexCol), { "origin": "CSV"})
        result = self.cluster_tool.main(df, args)
        return (result, meta)
# Always handled by the library, pass environment directly
instance = Instance(Environment(**os.environ))
# Inherit worker specifications and logs from instance
handler = MyHandler(instance.setup(), instance.log_level)
# Router
app = FastAPI()

# Lets Dapr know which topics should be subscribed to
@app.get("/dapr/subscribe")
def subscribe():
    return instance.subscriptions()

# Primary endpoint, where requests will be delivered
# The TaskBody type here verifies the POST body
@app.post("/subscription")
def subscription(body: TaskBody, response: Response):
    task_id = body.data.id
    handler.handle_request(task_id, response)
    # Return an empty body so Dapr doesn't freak out
    return {}
Assuming the filename is main.py, the server can then be started via uvicorn main:app
Handling failure
The primary validation is done by the pydantic models inside the validate_args function. This is where anything related to input should be checked.
If at any point you want the current task to fail, raise RuntimeError. This tells the library that the task should fail and not be retried. For example:
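As a rough illustration of the kind of input check that belongs in validate_args, the sketch below rejects an unknown method value before any work starts. It uses a plain dataclass as a dependency-free stand-in for the pydantic model shown under Usage, so it does not coerce or type-check values the way pydantic does. The ArgumentsSketch class and the free-standing validate_args function are hypothetical names for this example, not part of the library.

```python
from dataclasses import dataclass
from typing import Any, Dict

ALLOWED_METHODS = ["HDBSCAN", "DBSCAN"]


@dataclass
class ArgumentsSketch:
    """Hypothetical stand-in for the pydantic Arguments model."""
    contentUrl: str
    textCol: str
    indexCol: str
    method: str = "DBSCAN"

    def __post_init__(self) -> None:
        # Same rule as the pydantic validator: only known methods pass.
        if self.method not in ALLOWED_METHODS:
            raise ValueError(
                "Method has to be one of: {}, received: {}".format(
                    ",".join(ALLOWED_METHODS), self.method
                )
            )


def validate_args(args: Dict[str, Any]) -> ArgumentsSketch:
    # Missing required keys raise TypeError; a bad method raises ValueError.
    return ArgumentsSketch(**args)
```

Keeping every input rule in one place means handle_body can assume its arguments are already well-formed.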
- CSV missing columns or having incorrect text format
- Not enough data passed
Anything else that raises an unexpected exception should be retried automatically.
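The fail-versus-retry rule described above can be sketched as follows. This is only an illustration of the policy, not the library's implementation: TaskFailure is a hypothetical stand-in for the RuntimeError exported by nanowire_service_py, and check_columns and run_task are made-up helpers that use only the standard library.

```python
import csv
import io


class TaskFailure(Exception):
    """Hypothetical stand-in for the library's RuntimeError."""


def check_columns(csv_text: str, required: list) -> None:
    # Read only the header row and fail deliberately if a column is missing.
    reader = csv.reader(io.StringIO(csv_text))
    header = next(reader, [])
    for col in required:
        if col not in header:
            raise TaskFailure("Could not find column '{}' in CSV".format(col))


def run_task(task) -> str:
    """Run a task callable and classify its outcome."""
    try:
        task()
    except TaskFailure:
        return "failed"  # deliberate failure: do not retry
    except Exception:
        return "retry"   # unexpected error: eligible for a retry
    return "done"
```

Under these assumptions, a CSV with a missing column is classified as "failed", while an unrelated error such as a ZeroDivisionError inside the task is classified as "retry".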
Contributing
Read CONTRIBUTING.md
🛡 License
This project is licensed under the terms of the MIT license. See LICENSE for more details.
Credits
This project was generated with python-package-template.
Download files
Source Distribution
Hashes for nanowire-service-py-0.1.4.tar.gz
Algorithm | Hash digest
---|---
SHA256 | b5c0b1e493b647e345b6a524cd355bd902d135a4c6390996113b7cbd361d6f1d
MD5 | 495943385603fab6737912cfd8023faf
BLAKE2b-256 | a2a741d80e6c4129e0a2a504702839986933fbd256729e80063ac5362438c88e
Built Distribution
Hashes for nanowire_service_py-0.1.4-py3-none-any.whl
Algorithm | Hash digest
---|---
SHA256 | 542461a9a4ca3a61ad2329ceeae32b20d03ede1d617e4f314fd1cd277b188309
MD5 | efde3c3c82243296a1a5382c7f157712
BLAKE2b-256 | 16b8156916a211e59d7f01d8d010429625f6dd84d0108e8dca974e4210a29c8f