SDK for interacting with Corva

Corva python-sdk is a framework for building Corva DevCenter apps.

Requirements

Python 3.8+

Installation

$ pip install corva-sdk

App types

There are three app types that you can build:

  1. stream - works with real-time data
  2. scheduled - works with data at defined schedules/intervals (e.g. once an hour)
  3. task - works with data on-demand

Note: Use type hints like those in the examples below for better support from editors and tools.

Stream

Stream apps come in two types: time-based and depth-based.

Stream time app

from corva import Api, Cache, StreamTimeEvent, stream


@stream
def stream_time_app(event: StreamTimeEvent, api: Api, cache: Cache):
    # get some data from the api
    drillstrings = api.get_dataset(
        provider='corva',
        dataset='data.drillstring',
        query={
            'asset_id': event.asset_id,
        },
        sort={'timestamp': 1},
        limit=1,
    )

    # do some calculations/modifications to the data
    for drillstring in drillstrings:
        drillstring['id'] = drillstring.pop('_id')

    # save the data to private collection
    api.post(path='api/v1/data/my_provider/my_collection/', data=drillstrings)

Stream depth app

from corva import Api, Cache, StreamDepthEvent, stream


@stream
def stream_depth_app(event: StreamDepthEvent, api: Api, cache: Cache):
    # get some data from the api
    drillstrings = api.get_dataset(
        provider='corva',
        dataset='data.drillstring',
        query={
            'asset_id': event.asset_id,
        },
        sort={'timestamp': 1},
        limit=1,
    )

    # do some calculations/modifications to the data
    for drillstring in drillstrings:
        drillstring['id'] = drillstring.pop('_id')

    # save the data to private collection
    api.post(path='api/v1/data/my_provider/my_collection/', data=drillstrings)

Scheduled

from corva import Api, Cache, ScheduledEvent, scheduled


@scheduled
def scheduled_app(event: ScheduledEvent, api: Api, cache: Cache):
    print('Hello, World!')

Task

from corva import Api, TaskEvent, task


@task
def task_app(event: TaskEvent, api: Api):
    print('Hello, World!')

Event

An event is an object that contains the data an app function processes. An event instance is passed automatically as the first parameter of each app function. Each app type has its own event type: StreamTimeEvent and StreamDepthEvent for stream apps, ScheduledEvent for scheduled apps and TaskEvent for task apps.
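
As a small illustration of reading data from an event, the helper below collects the timestamps of the records in a stream time event. The field names (asset_id, company_id, records, timestamp) are the ones used in the Testing section of this README. The helper name record_timestamps and the SimpleNamespace stand-ins are hypothetical; they only keep the sketch runnable without the SDK installed.

```python
from types import SimpleNamespace
from typing import Any, List


def record_timestamps(event: Any) -> List[int]:
    """Collect the timestamp of every record carried by a stream time event."""
    return [record.timestamp for record in event.records]


# Stand-in object shaped like a StreamTimeEvent (an assumption for this sketch).
event = SimpleNamespace(
    asset_id=0,
    company_id=0,
    records=[SimpleNamespace(timestamp=10), SimpleNamespace(timestamp=20)],
)

print(record_timestamps(event))  # prints [10, 20]
```

In a real app the same helper would receive the event that the SDK passes to the decorated function.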

Api

Apps might need to communicate with the Corva Platform API and the Corva Data API. The SDK provides an Api class, which wraps the Python requests library and adds automatic authorization, convenient URL usage and reasonable timeouts to API requests. An Api instance is passed automatically as the second parameter of each app function. Api supports the following HTTP methods: GET, POST, PATCH, PUT and DELETE.

Examples:

from corva import Api, scheduled


@scheduled
def my_app(event, api: Api, cache):
    # Corva API calls
    api.get('/v2/pads', params={'param': 'val'})
    api.post('/v2/pads', data={'key': 'val'})
    api.patch('/v2/pads/123', data={'key': 'val'})
    api.delete('/v2/pads/123')

    # Corva Data API calls
    api.get('/api/v1/data/provider/dataset/', params={'param': 'val'})
    api.post('/api/v1/data/provider/dataset/', data={'key': 'val'})
    api.put('/api/v1/data/provider/dataset/', data={'key': 'val'})
    api.delete('/api/v1/data/provider/dataset/')
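
The examples above use short paths for both APIs: '/v2/pads' for the Corva API and '/api/v1/data/...' for the Corva Data API. One way such "convenient URL usage" could work is sketched below. This is an assumption made for illustration, not the SDK's actual logic: the function resolve_url, the routing rule and both base URLs are hypothetical placeholders.

```python
import re

# Placeholder base URLs (assumptions for this sketch, not real Corva endpoints).
PLATFORM_API_URL = 'https://platform.example.com'
DATA_API_URL = 'https://data.example.com'


def resolve_url(path: str) -> str:
    """Turn a short path into a full URL, choosing a base URL by path prefix."""
    if path.startswith('http'):
        return path  # already a full URL, leave it untouched

    path = path.lstrip('/')

    # Paths that look like 'api/v1/...' go to the Data API in this sketch.
    if re.match(r'api/v\d+/', path):
        return f'{DATA_API_URL}/{path}'

    return f'{PLATFORM_API_URL}/{path}'


print(resolve_url('/v2/pads'))
print(resolve_url('api/v1/data/my_provider/my_collection/'))
```

With a rule like this, a leading slash is optional, which matches the two path styles used in this README.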

Cache

Apps might need to share data between runs. The SDK provides a Cache class that lets you store, load and otherwise manage that data. A Cache instance is passed automatically as the third parameter of stream and scheduled app functions.
Note: task apps don't get a Cache parameter, as they aren't meant to store data between invocations.

Cache uses a dict-like database, so data is stored as key:value pairs. A key must be a str, and a value can be any of the following types: str, int, float or bytes.

Examples:

  1. Store and load:

    from corva import Cache, scheduled
    
    
    @scheduled
    def my_app(event, api, cache: Cache):
        cache.store(key='key', value='val')
        # cache: {'key': 'val'}
    
        cache.load(key='key')  # returns 'val'
    
  2. Store and load multiple:

    from corva import Cache, scheduled
    
    
    @scheduled
    def my_app(event, api, cache: Cache):
        cache.store(mapping={'key1': 'val1', 'key2': 'val2'})
        # cache: {'key1': 'val1', 'key2': 'val2'}
    
        cache.load_all()  # returns {'key1': 'val1', 'key2': 'val2'}
    
  3. Delete and delete all:

    from corva import Cache, scheduled
    
    
    @scheduled
    def my_app(event, api, cache: Cache):
        cache.store(mapping={'key1': 'val1', 'key2': 'val2', 'key3': 'val3'})
        # cache: {'key1': 'val1', 'key2': 'val2', 'key3': 'val3'}
    
        cache.delete(keys=['key1'])
        # cache: {'key2': 'val2', 'key3': 'val3'}
    
        cache.delete_all()
        # cache: {}
    
  4. Expiry, ttl and exists. Note: by default, Cache sets an expiry of 60 days.

    import time
    
    from corva import Cache, scheduled
    
    
    @scheduled
    def my_app(event, api, cache: Cache):
        cache.store(key='key', value='val', expiry=60)  # 60 seconds
        # cache: {'key': 'val'}
    
        cache.ttl()  # 60 seconds
        cache.pttl()  # 60000 milliseconds
        cache.exists()  # True
    
        time.sleep(60)  # wait 60 seconds
        # cache: {}
    
        cache.exists()  # False
        cache.ttl()  # -2: doesn't exist
        cache.pttl()  # -2: doesn't exist
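
The semantics shown in the four examples above can be summarized with a small in-memory stand-in. This is a sketch, not the SDK's Cache class: the name FakeCache, the injectable clock and the single expiry for the whole cache are assumptions inferred from the examples (ttl() takes no key, and the cache is empty after the expiry passes).

```python
import time
from typing import Callable, Dict, List, Optional, Union

Value = Union[str, int, float, bytes]
DEFAULT_EXPIRY = 60 * 24 * 60 * 60  # 60 days, in seconds


class FakeCache:
    """Hypothetical in-memory stand-in that mirrors the calls shown above."""

    def __init__(self, clock: Callable[[], float] = time.monotonic):
        self._clock = clock
        self._data: Dict[str, Value] = {}
        self._expires_at: Optional[float] = None

    def _purge_if_expired(self) -> None:
        if self._expires_at is not None and self._clock() >= self._expires_at:
            self.delete_all()

    def store(
        self,
        key: Optional[str] = None,
        value: Optional[Value] = None,
        mapping: Optional[Dict[str, Value]] = None,
        expiry: int = DEFAULT_EXPIRY,
    ) -> None:
        self._purge_if_expired()
        items = dict(mapping or {})
        if key is not None:
            items[key] = value
        self._data.update(items)
        # Assumption: every store resets one expiry shared by the whole cache.
        self._expires_at = self._clock() + expiry

    def load(self, key: str) -> Optional[Value]:
        self._purge_if_expired()
        return self._data.get(key)

    def load_all(self) -> Dict[str, Value]:
        self._purge_if_expired()
        return dict(self._data)

    def delete(self, keys: List[str]) -> None:
        self._purge_if_expired()
        for key in keys:
            self._data.pop(key, None)
        if not self._data:
            self._expires_at = None

    def delete_all(self) -> None:
        self._data.clear()
        self._expires_at = None

    def exists(self) -> bool:
        self._purge_if_expired()
        return bool(self._data)

    def pttl(self) -> int:
        """Remaining lifetime in milliseconds, or -2 if nothing is stored."""
        self._purge_if_expired()
        if not self._data or self._expires_at is None:
            return -2
        return round((self._expires_at - self._clock()) * 1000)

    def ttl(self) -> int:
        """Remaining lifetime in seconds, or -2 if nothing is stored."""
        remaining_ms = self.pttl()
        return remaining_ms if remaining_ms < 0 else round(remaining_ms / 1000)
```

Passing a controllable clock makes it possible to exercise expiry without calling time.sleep:

```python
now = [0.0]
cache = FakeCache(clock=lambda: now[0])
cache.store(key='key', value='val', expiry=60)
now[0] = 60.0  # pretend 60 seconds have passed
print(cache.exists())  # prints False
```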
    

Logging

As apps are executed very frequently (once a second or so), unlimited logging can lead to huge amounts of data.

The SDK provides a Logger object, which is a safe way to log in apps.

The Logger is a logging.Logger instance and should be used like any other Python logger.

The Logger has the following features:

  1. Log messages are injected with contextual information, which makes it easy to filter logs while debugging issues.
  2. Log message length is limited. Messages that exceed the limit are truncated. The limit is set by the LOG_THRESHOLD_MESSAGE_SIZE env variable. Default value is 1000 symbols or bytes.
  3. The number of log messages is limited. Once the limit is reached, logging is disabled. The limit is set by the LOG_THRESHOLD_MESSAGE_COUNT env variable. Default value is 15.
  4. The logging level can be set using the LOG_LEVEL env variable. Default value is INFO.
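
To make features 2 to 4 concrete, here is a minimal sketch of how message truncation and a message count limit could be built on top of the standard logging module. It is not the SDK's implementation: the ThresholdFilter class and build_logger function are hypothetical, and only the env variable names and default values come from the list above.

```python
import logging
import os


class ThresholdFilter(logging.Filter):
    """Hypothetical filter: truncates long messages and caps their number."""

    def __init__(self, max_size: int, max_count: int):
        super().__init__()
        self.max_size = max_size
        self.max_count = max_count
        self.seen = 0

    def filter(self, record: logging.LogRecord) -> bool:
        if self.seen >= self.max_count:
            return False  # limit reached: drop the record

        self.seen += 1
        message = record.getMessage()
        # Keep the truncated text and clear args so it is not formatted again.
        record.msg = message[: self.max_size]
        record.args = None
        return True


def build_logger(name: str = 'sketch') -> logging.Logger:
    # Env variable names and defaults are the ones listed above.
    max_size = int(os.getenv('LOG_THRESHOLD_MESSAGE_SIZE', '1000'))
    max_count = int(os.getenv('LOG_THRESHOLD_MESSAGE_COUNT', '15'))

    logger = logging.getLogger(name)
    logger.setLevel(os.getenv('LOG_LEVEL', 'INFO'))
    logger.addFilter(ThresholdFilter(max_size, max_count))
    return logger
```

Records below the configured level never reach the filter, so in this sketch they do not count toward the message limit.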

Logger usage example

from corva import Logger, stream


@stream
def stream_app(event, api, cache):
    Logger.debug('Debug message!')
    Logger.info('Info message!')
    Logger.warning('Warning message!')
    Logger.error('Error message!')
    try:
        0 / 0
    except ZeroDivisionError:
        Logger.exception('Exception message!')

Testing

Testing Corva applications is easy and enjoyable.

The SDK provides convenient testing tools through a pytest plugin. Write your tests using pytest to get access to the plugin. To install pytest, run pip install pytest.

Stream time app example test

from corva import StreamTimeEvent, StreamTimeRecord, stream


@stream
def stream_app(event, api, cache):
    return 'Hello, World!'


def test_stream_time_app(app_runner):
    event = StreamTimeEvent(
        asset_id=0, company_id=0, records=[StreamTimeRecord(timestamp=0)]
    )

    result = app_runner(stream_app, event=event)

    assert result == 'Hello, World!'

Stream depth app example test

from corva import StreamDepthEvent, StreamDepthRecord, stream


@stream
def stream_app(event, api, cache):
    return 'Hello, World!'


def test_stream_depth_app(app_runner):
    event = StreamDepthEvent(
        asset_id=0, company_id=0, records=[StreamDepthRecord(measured_depth=0)]
    )

    result = app_runner(stream_app, event=event)

    assert result == 'Hello, World!'

Scheduled app example test

from corva import ScheduledEvent, scheduled


@scheduled
def scheduled_app(event, api, cache):
    return 'Hello, World!'


def test_scheduled_app(app_runner):
    event = ScheduledEvent(asset_id=0, start_time=0, end_time=0)

    result = app_runner(scheduled_app, event=event)

    assert result == 'Hello, World!'

Task app example test

from corva import TaskEvent, task


@task
def task_app(event, api):
    return 'Hello, World!'


def test_task_app(app_runner):
    event = TaskEvent(asset_id=0, company_id=0)

    result = app_runner(task_app, event=event)

    assert result == 'Hello, World!'

Contributing

Set up the project

$ cd ~/YOUR_PATH/python-sdk
$ python -m venv venv
$ source venv/bin/activate
(venv) $ pip install -e .[dev]

Run tests

(venv) $ pytest

Run code linter

(venv) $ flake8

Changelog

All notable changes to this project will be documented in this file.

The format is based on Keep a Changelog, and this project adheres to Semantic Versioning.

Unreleased

Added

  • corva.stream, corva.scheduled and corva.task app decorators. See readme for usage examples.
  • ScheduledEvent.company_id field.

Removed

  • corva.Corva class.

0.0.18 - 2021-04-16

Changed

  • Events are allowed to have extra fields.

0.0.17 - 2021-04-15

Added

  • corva.Logger object, that should be used for app logging.
  • LOG_THRESHOLD_MESSAGE_SIZE and LOG_THRESHOLD_MESSAGE_COUNT env variables, that should be used to configure logging.

0.0.16 - 2021-04-02

Added

  • app_runner fixture for testing apps.

Changed

  • StreamEvent was split into StreamTimeEvent and StreamDepthEvent, which have corresponding StreamTimeRecord and StreamDepthRecord records.
  • Deleted all unused fields from ScheduledEvent, TaskEvent, StreamTimeEvent and StreamDepthEvent.

Removed

  • filter_mode parameter from Corva.stream. Filtering is now automatic.

0.0.15 - 2021-03-23

Added

  • Corva.task decorator for task apps.

0.0.14 - 2021-03-12

Added

  • Testing section to README.md.
  • Api.get_dataset method.

Changed

  • ScheduledEvent.schedule_end field is now optional.
  • ScheduledEvent.schedule_end and ScheduledEvent.schedule_start field types from datetime to int.

0.0.13 - 2021-03-04

Added

  • Tools for testing apps.

0.0.12 - 2021-02-11

Fixed

  • TaskEvent queue event parsing.

Changed

  • StreamEvent must have at least one record.
  • StreamEvent and ScheduledEvent:
    • Added descriptions to fields.
    • Simplified event structures.

0.0.11 - 2021-02-10

Fixed

  • ScheduledEvent queue event parsing.

0.0.10 - 2021-02-05

Changed

  • Api class:
    • Deleted retries.
    • Responses do not use raise_for_status anymore.
    • Lowered default_timeout to 30 seconds.
    • Fixed url build exceptions on Windows.

0.0.9 - 2021-02-05

Removed

  • Obsolete StreamEvent fields: app_version.
  • Obsolete ScheduledEvent fields: app_version.

0.0.8 - 2021-02-05

Fixed

  • api_key extraction from context.

0.0.7 - 2021-02-04

Fixed

  • StreamEvent queue event parsing.

0.0.5 - 2021-02-04

Added

  • Required context parameter to Corva.
  • Documentation in README.md.

0.0.4 - 2021-01-20

Added

  • Corva class, which contains stream and scheduled decorators for stream and scheduled apps.

Removed

  • StreamApp and ScheduledApp classes.

0.0.3 - 2020-12-15

Fixed

  • Deployment to PyPI.

0.0.2 - 2020-12-15

Added

  • StreamApp to build stream apps.
  • ScheduledApp to build scheduled apps.
  • TaskApp to build task apps.
  • Api class to access Platform and Data Corva APIs.
  • Cache class to share data between app invokes.
  • Event classes: StreamEvent, ScheduledEvent and TaskEvent.
