Wrapper for interacting with Nanowire platform

nanowire-service-py

Usage

Install the library via pip install nanowire-service-py, or add it to your requirements file and run pip install -r requirements.txt.

This library is designed for tight integration with the Nanowire platform (created by Spotlight Data).

The library has no hard requirement for a specific web server, so a framework such as django or flask can be used. However, fastapi is recommended for its simplicity and speed.

Environment

The following environment variables need to be supplied:

import uuid
from typing import Union

from pydantic import BaseModel

class Environment(BaseModel):
    # Dapr spec
    DAPR_HTTP_PORT: int
    DAPR_APP_ID: str
    PUB_SUB: str
    # Where /pending requests get made
    SCHEDULER_PUB_SUB: str
    # Dapr related properties
    # Whether we should wait for the Dapr server to be active before loading
    NO_WAIT: bool = False
    # Whether the service should publish to the scheduler
    # This shouldn't be done if we have an "executor" worker
    NO_PUBLISH: bool = False

    LOG_LEVEL: Union[str, int] = "DEBUG"
    # Postgres connection details
    POSTGRES_URL: str
    POSTGRES_SCHEMA: str
    # Utilised for healthchecks and identifying the pod
    SERVICE_ID: str = str(uuid.uuid4())

These variables are validated on service startup.
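To catch a misconfigured deployment before the service is launched, the required variables can also be checked by hand. The following is a minimal, stdlib-only sketch and not the library's own validation (which relies on the pydantic model above). The helper name missing_required is hypothetical, and the list of required names is assumed from the fields above that have no default value.

```python
import os
from typing import List, Mapping

# Fields of the Environment model above that have no default value.
REQUIRED_VARS = (
    "DAPR_HTTP_PORT",
    "DAPR_APP_ID",
    "PUB_SUB",
    "SCHEDULER_PUB_SUB",
    "POSTGRES_URL",
    "POSTGRES_SCHEMA",
)


def missing_required(env: Mapping[str, str]) -> List[str]:
    """Return the required variable names that are absent or empty in env.

    Hypothetical helper for illustration; not part of nanowire-service-py.
    """
    return [name for name in REQUIRED_VARS if not env.get(name)]


if __name__ == "__main__":
    missing = missing_required(os.environ)
    if missing:
        print("Missing environment variables: " + ", ".join(missing))
    else:
        print("All required environment variables are set")
```

Running this as a script before starting the server reports any variable that still needs to be set. Type checks (for example, that DAPR_HTTP_PORT is an integer) are left to the library.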

Entrypoint

The primary code logic should be placed in a subclass of BaseHandler. You are expected to implement the validate_args and handle_body methods:

import os
from dotenv import load_dotenv
from fastapi import FastAPI, Response

from pydantic import BaseModel, validator
from typing import Any, List, Optional

import pandas as pd

from nanowire_service_py import BaseHandler, create, TaskBody
from toolbox import ClusterTool

load_dotenv()

allowed_methods = ["HDBSCAN", "DBSCAN"]
# pydantic is used to validate the task arguments
class Arguments(BaseModel):
    contentUrl: str
    textCol: str
    indexCol: str
    clusterSize: float = 0.2
    nLabels: int = 10
    method: str = "DBSCAN"
    customStops: Optional[List[str]] = []
    maxVocab: int = 5000
    memSave: bool = False
    withAnomalous: bool = False

    @validator('method')
    def method_check(cls, method):
        if method not in allowed_methods:
            raise ValueError("Method has to be one of: {}, received: {}".format(",".join(allowed_methods), method))
        return method

# Our custom handler
class MyHandler(BaseHandler):
    def __init__(self, *args):
        super().__init__(*args)
        self.cluster_tool = ClusterTool(self.logger)

    def validate_args(self, args: Any, task_id: str) -> Arguments:
        return Arguments(**args)

    def handle_body(self, args: Arguments, meta: Any, task_id: str):
        df = pd.read_csv(args.contentUrl, dtype='unicode')

        if args.textCol not in df.columns:
            raise RuntimeError("Could not find text column '{}' in CSV".format(args.textCol), { "origin": "CSV"})

        if args.indexCol not in df.columns:
            raise RuntimeError("Could not find index column '{}' in CSV".format(args.indexCol), { "origin": "CSV"})

        result = self.cluster_tool.main(df, args)
        return (result, meta)

# Always handled by the library, pass environment directly
executor = create(os.environ, MyHandler)

app = FastAPI()

# Lets Dapr know which topics should be subscribed to
@app.get("/dapr/subscribe")
def subscribe():
    return executor.subscriptions

# Primary endpoint, where request will be delivered to
# TaskBody type here verifies the post body
@app.post("/subscription")
def subscription(body: TaskBody, response: Response):
    status = executor.handle_request(body.data.id)
    response.status_code = status
    # Return empty body so dapr doesn't freak out
    return {}

# Start heartbeat thread, which will periodically send updates to database
executor.heartbeat()

Assuming the filename is main.py, the server can then be started via uvicorn main:app.

Handling failure

The primary validation happens within the validate_args function, using pydantic models. This is where anything related to input should be checked.

If at any point you want the current task to fail, raise RuntimeError or Exception. This tells the library that the task should fail and not be retried. For example:

  • CSV missing columns or having incorrect text format
  • Not enough data passed

Any other error that can be retried should be raised as a RetryError.
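The distinction between the two kinds of failure can be sketched as follows. This is an illustration only: RetryError here is a local stand-in class (the text above names RetryError but does not show where it is imported from, so it is not imported from the library), and fetch_rows and run_once are hypothetical helpers rather than part of the library's API.

```python
from typing import Callable, List


class RetryError(Exception):
    """Local stand-in for the library's RetryError (import path not shown above)."""


def fetch_rows(load: Callable[[], List[str]], min_rows: int) -> List[str]:
    """Load rows, mapping problems onto the two failure kinds.

    `load` is any callable returning rows; it is a placeholder for real I/O.
    """
    try:
        rows = load()
    except ConnectionError as exc:
        # Transient problem: a later attempt may succeed, so ask for a retry.
        raise RetryError("Data source temporarily unavailable") from exc
    if len(rows) < min_rows:
        # Bad input: retrying cannot fix it, so fail the task permanently.
        raise RuntimeError("Not enough data passed: {} rows".format(len(rows)))
    return rows


def run_once(load: Callable[[], List[str]], min_rows: int = 2) -> str:
    """Report how a single attempt ended: 'ok', 'retry' or 'failed'."""
    try:
        fetch_rows(load, min_rows)
    except RetryError:
        return "retry"
    except Exception:
        return "failed"
    return "ok"
```

Inside a real handler, the same pattern would live in handle_body: input problems raise RuntimeError, while temporary outages are re-raised as RetryError.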

Versioning

Versioning follows semver; however, it primarily applies to the create function exposed by the package. If you rely on any internal parts of the library, validate your code against the new release before updating the version.

Contributing

See CONTRIBUTING.md.

🛡 License

This project is licensed under the terms of the MIT license. See LICENSE for more details.

Credits

This project was generated with python-package-template.

