SDK for interacting with Corva
Corva python-sdk is a framework for building Corva DevCenter apps.
Requirements
Python 3.8+
Installation
$ pip install corva-sdk
App types
There are three app types that you can build:
- stream: works with real-time data
- scheduled: works with data at defined schedules/intervals (e.g. once an hour)
- task: works with data on-demand
Note: Use type hints like those in the examples below for better support from editors and tools.
Stream
Stream apps come in two types: time-based and depth-based.
Stream time app
from corva import Api, Cache, StreamTimeEvent, stream


@stream
def stream_time_app(event: StreamTimeEvent, api: Api, cache: Cache):
    # get some data from the api
    drillstrings = api.get_dataset(
        provider='corva',
        dataset='data.drillstring',
        query={
            'asset_id': event.asset_id,
        },
        sort={'timestamp': 1},
        limit=1,
    )

    # do some calculations/modifications to the data
    for drillstring in drillstrings:
        drillstring['id'] = drillstring.pop('_id')

    # save the data to private collection
    api.post(path='api/v1/data/my_provider/my_collection/', data=drillstrings)
Stream depth app
from corva import Api, Cache, StreamDepthEvent, stream


@stream
def stream_depth_app(event: StreamDepthEvent, api: Api, cache: Cache):
    # get some data from the api
    drillstrings = api.get_dataset(
        provider='corva',
        dataset='data.drillstring',
        query={
            'asset_id': event.asset_id,
        },
        sort={'timestamp': 1},
        limit=1,
    )

    # do some calculations/modifications to the data
    for drillstring in drillstrings:
        drillstring['id'] = drillstring.pop('_id')

    # save the data to private collection
    api.post(path='api/v1/data/my_provider/my_collection/', data=drillstrings)
Scheduled
from corva import Api, Cache, ScheduledEvent, scheduled


@scheduled
def scheduled_app(event: ScheduledEvent, api: Api, cache: Cache):
    print('Hello, World!')
Task
from corva import Api, TaskEvent, task


@task
def task_app(event: TaskEvent, api: Api):
    print('Hello, World!')
Event
An event is an object that contains data for an app function to process. An event instance is inserted automatically as the first parameter to each app type.
Each app type has its own event type: StreamTimeEvent, StreamDepthEvent, ScheduledEvent and TaskEvent.
Api
Apps might need to communicate with the Corva Platform API and the Corva Data API. The SDK provides an Api class, which wraps the Python requests library and adds automatic authorization, convenient URL usage and reasonable timeouts to API requests.
An Api instance is inserted automatically as the second parameter to each app type.
Api supports the following HTTP methods: GET, POST, PATCH, PUT and DELETE.
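To illustrate what convenient URL usage could look like, here is a minimal sketch of resolving a relative path to a full URL. It is not the SDK's implementation: the base URLs, the resolve_url name and the rule that paths shaped like api/v1/... go to the Data API are assumptions inferred from the example paths in this section.

```python
import posixpath
import re

# Hypothetical base URLs, used only for illustration.
PLATFORM_API_URL = 'https://platform.example.com'
DATA_API_URL = 'https://data.example.com'


def resolve_url(path: str) -> str:
    """Return a full URL for a relative API path (illustrative sketch)."""
    # A path that is already a full URL for either API is left untouched.
    if path.startswith((PLATFORM_API_URL, DATA_API_URL)):
        return path

    path = path.lstrip('/')  # accept both '/v2/pads' and 'v2/pads'

    # Assumption: paths shaped like 'api/v1/...' belong to the Data API.
    if re.match(r'api/v\d+/', path):
        return posixpath.join(DATA_API_URL, path)

    return posixpath.join(PLATFORM_API_URL, path)


print(resolve_url('/v2/pads'))
print(resolve_url('api/v1/data/provider/dataset/'))
```

With a helper like this, app code can pass short relative paths and never has to spell out a host name.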
Examples:
from corva import Api, scheduled


@scheduled
def my_app(event, api: Api, cache):
    # Corva API calls
    api.get('/v2/pads', params={'param': 'val'})
    api.post('/v2/pads', data={'key': 'val'})
    api.patch('/v2/pads/123', data={'key': 'val'})
    api.delete('/v2/pads/123')

    # Corva Data API calls
    api.get('/api/v1/data/provider/dataset/', params={'param': 'val'})
    api.post('/api/v1/data/provider/dataset/', data={'key': 'val'})
    api.put('/api/v1/data/provider/dataset/', data={'key': 'val'})
    api.delete('/api/v1/data/provider/dataset/')
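The changelog entry for 0.0.10 states that responses no longer use raise_for_status, so an app may want to check the status itself before using the body. The sketch below assumes that Api methods return a requests-style response exposing status_code and json(). StubResponse and json_or_default are hypothetical names that exist only to keep the example self-contained.

```python
from typing import Any


class StubResponse:
    """Hypothetical stand-in for a requests-style response."""

    def __init__(self, status_code: int, payload: Any) -> None:
        self.status_code = status_code
        self._payload = payload

    def json(self) -> Any:
        return self._payload


def json_or_default(response: Any, default: Any) -> Any:
    """Return the decoded body for 2xx responses, otherwise the default."""
    if 200 <= response.status_code < 300:
        return response.json()
    return default


# In an app, the response would come from a call such as api.get(...).
pads = json_or_default(StubResponse(200, [{'id': 123}]), default=[])
missing = json_or_default(StubResponse(404, None), default=[])
print(pads, missing)
```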
Cache
Apps might need to share data between runs. The SDK provides a Cache class that lets you store, load and otherwise manage such data.
A Cache instance is inserted automatically as the third parameter to stream and scheduled apps.
Note: task apps don't get a Cache parameter, as they aren't meant to store data between invocations.
Cache uses a dict-like database, so data is stored as key:value pairs. A key must be of str type, and a value can have any of the following types: str, int, float and bytes.
Examples:
- Store and load:

from corva import Cache, scheduled


@scheduled
def my_app(event, api, cache: Cache):
    cache.store(key='key', value='val')
    # cache: {'key': 'val'}
    cache.load(key='key')  # returns 'val'

- Store and load multiple:

from corva import Cache, scheduled


@scheduled
def my_app(event, api, cache: Cache):
    cache.store(mapping={'key1': 'val1', 'key2': 'val2'})
    # cache: {'key1': 'val1', 'key2': 'val2'}
    cache.load_all()  # returns {'key1': 'val1', 'key2': 'val2'}

- Delete and delete all:

from corva import Cache, scheduled


@scheduled
def my_app(event, api, cache: Cache):
    cache.store(mapping={'key1': 'val1', 'key2': 'val2', 'key3': 'val3'})
    # cache: {'key1': 'val1', 'key2': 'val2', 'key3': 'val3'}
    cache.delete(keys=['key1'])
    # cache: {'key2': 'val2', 'key3': 'val3'}
    cache.delete_all()
    # cache: {}

- Expiry, ttl and exists. Note: by default Cache sets an expiry of 60 days.

import time

from corva import Cache, scheduled


@scheduled
def my_app(event, api, cache: Cache):
    cache.store(key='key', value='val', expiry=60)  # 60 seconds
    # cache: {'key': 'val'}
    cache.ttl()  # 60 seconds
    cache.pttl()  # 60000 milliseconds
    cache.exists()  # True

    time.sleep(60)  # wait 60 seconds
    # cache: {}
    cache.exists()  # False
    cache.ttl()  # -2: doesn't exist
    cache.pttl()  # -2: doesn't exist
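Because values are limited to str, int, float and bytes, structured data such as a dict has to be serialized before it is stored. One possible approach, sketched below, is to JSON-encode it. The helpers store_json and load_json and the InMemoryCache stand-in are hypothetical and not part of the SDK; the sketch also assumes that loading a missing key returns None.

```python
import json
from typing import Any, Dict, Optional


class InMemoryCache:
    """Hypothetical stand-in exposing only store() and load()."""

    def __init__(self) -> None:
        self._data: Dict[str, Any] = {}

    def store(self, key: str, value: Any) -> None:
        self._data[key] = value

    def load(self, key: str) -> Optional[Any]:
        # Assumption: a missing key yields None.
        return self._data.get(key)


def store_json(cache: Any, key: str, obj: Any) -> None:
    """Serialize obj to a JSON string so that it fits the str value type."""
    cache.store(key=key, value=json.dumps(obj))


def load_json(cache: Any, key: str, default: Any = None) -> Any:
    """Load and decode a JSON value, or return default if nothing is stored."""
    raw = cache.load(key=key)
    if raw is None:
        return default
    return json.loads(raw)


cache = InMemoryCache()
store_json(cache, 'last_state', {'depth': 1234.5, 'rig_state': 'drilling'})
state = load_json(cache, 'last_state', default={})
print(state['depth'])
```

Inside an app, the cache argument passed to the app function would take the place of the InMemoryCache instance.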
Logging
As apps are executed very frequently (once a second or so), unlimited logging can lead to huge amounts of data. The SDK provides a Logger object, which is a safe way to log in apps.
The Logger is a logging.Logger instance and should be used like any other Python logger.
The Logger has the following features:
- Log messages are injected with contextual information, which makes it easy to filter through logs while debugging issues.
- Log message length is limited. Messages that are too long are truncated to fit the limit. Set by the LOG_THRESHOLD_MESSAGE_SIZE env variable. Default value is 1000 symbols or bytes.
- The number of log messages is limited. After reaching the limit, logging gets disabled. Set by the LOG_THRESHOLD_MESSAGE_COUNT env variable. Default value is 15.
- Logging level can be set using the LOG_LEVEL env variable. Default value is INFO.
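The two limits described above can be pictured with a plain logging.Filter from the standard library. This is only a sketch of the behavior, not the SDK's implementation: ThresholdFilter and the threshold_demo logger name are hypothetical, and only the environment variable names and their defaults come from this README.

```python
import logging
import os


class ThresholdFilter(logging.Filter):
    """Truncate long messages and drop records after a maximum count."""

    def __init__(self, max_size: int, max_count: int) -> None:
        super().__init__()
        self.max_size = max_size
        self.max_count = max_count
        self.seen = 0

    def filter(self, record: logging.LogRecord) -> bool:
        if self.seen >= self.max_count:
            return False  # limit reached: drop the record
        self.seen += 1
        message = record.getMessage()
        if len(message) > self.max_size:
            # Replace the message with its truncated form.
            record.msg = message[: self.max_size]
            record.args = None
        return True


max_size = int(os.getenv('LOG_THRESHOLD_MESSAGE_SIZE', '1000'))
max_count = int(os.getenv('LOG_THRESHOLD_MESSAGE_COUNT', '15'))

logger = logging.getLogger('threshold_demo')
logger.setLevel(os.getenv('LOG_LEVEL', 'INFO'))
logger.addHandler(logging.StreamHandler())
logger.addFilter(ThresholdFilter(max_size=max_size, max_count=max_count))

# With the default limits, only the first 15 of these messages are emitted.
for i in range(20):
    logger.info('message %d', i)
```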
Logger usage example
from corva import Logger, stream


@stream
def stream_app(event, api, cache):
    Logger.debug('Debug message!')
    Logger.info('Info message!')
    Logger.warning('Warning message!')
    Logger.error('Error message!')
    try:
        0 / 0
    except ZeroDivisionError:
        Logger.exception('Exception message!')
Testing
Testing Corva applications is easy and enjoyable.
The SDK provides convenient testing tools through a pytest plugin. Write your tests using pytest to get access to the plugin.
To install the library, run pip install pytest.
Stream time app example test
from corva import StreamTimeEvent, StreamTimeRecord, stream


@stream
def stream_app(event, api, cache):
    return 'Hello, World!'


def test_stream_time_app(app_runner):
    event = StreamTimeEvent(
        asset_id=0, company_id=0, records=[StreamTimeRecord(timestamp=0)]
    )

    result = app_runner(stream_app, event=event)

    assert result == 'Hello, World!'
Stream depth app example test
from corva import StreamDepthEvent, StreamDepthRecord, stream


@stream
def stream_app(event, api, cache):
    return 'Hello, World!'


def test_stream_depth_app(app_runner):
    event = StreamDepthEvent(
        asset_id=0, company_id=0, records=[StreamDepthRecord(measured_depth=0)]
    )

    result = app_runner(stream_app, event=event)

    assert result == 'Hello, World!'
Scheduled app example test
from corva import ScheduledEvent, scheduled


@scheduled
def scheduled_app(event, api, cache):
    return 'Hello, World!'


def test_scheduled_app(app_runner):
    event = ScheduledEvent(asset_id=0, start_time=0, end_time=0)

    result = app_runner(scheduled_app, event=event)

    assert result == 'Hello, World!'
Task app example test
from corva import TaskEvent, task


@task
def task_app(event, api):
    return 'Hello, World!'


def test_task_app(app_runner):
    event = TaskEvent(asset_id=0, company_id=0)

    result = app_runner(task_app, event=event)

    assert result == 'Hello, World!'
Contributing
Set up the project
$ cd ~/YOUR_PATH/python-sdk
$ python -m venv venv
$ source venv/bin/activate
(venv) $ pip install -e .[dev]
Run tests
(venv) $ pytest
Run code linter
(venv) $ flake8
Changelog
All notable changes to this project will be documented in this file.
The format is based on Keep a Changelog, and this project adheres to Semantic Versioning.
Unreleased
Added
- corva.stream, corva.scheduled and corva.task app decorators. See readme for usage examples.
- ScheduledEvent.company_id field.
Removed
- corva.Corva class.
0.0.18 - 2021-04-16
Changed
- Events are allowed to have extra fields.
0.0.17 - 2021-04-15
Added
- corva.Logger object, which should be used for app logging.
- LOG_THRESHOLD_MESSAGE_SIZE and LOG_THRESHOLD_MESSAGE_COUNT env variables, which should be used to configure logging.
0.0.16 - 2021-04-02
Added
- app_runner fixture for testing apps.
Changed
- StreamEvent was split into StreamTimeEvent and StreamDepthEvent, which have corresponding StreamTimeRecord and StreamDepthRecord records.
- Deleted all unused fields from ScheduledEvent, TaskEvent, StreamTimeEvent and StreamDepthEvent.
Removed
- filter_mode parameter from Corva.stream. Filtering is now automatic.
0.0.15 - 2021-03-23
Added
- Corva.task decorator for task apps.
0.0.14 - 2021-03-12
Added
- Testing section to README.md.
- Api.get_dataset method.
Changed
- ScheduledEvent.schedule_end field is now optional.
- ScheduledEvent.schedule_end and ScheduledEvent.schedule_start field types from datetime to int.
0.0.13 - 2021-03-04
Added
- Tools for testing apps.
0.0.12 - 2021-02-11
Fixed
- TaskEvent queue event parsing.
Changed
- StreamEvent must have at least one record.
- StreamEvent and ScheduledEvent:
  - Added descriptions to fields.
  - Simplified event structures.
0.0.11 - 2021-02-10
Fixed
- ScheduledEvent queue event parsing.
0.0.10 - 2021-02-05
Changed
- Api class:
  - Deleted retries.
  - Responses do not use raise_for_status anymore.
  - Lowered default_timeout to 30 seconds.
  - Fixed url build exceptions on Windows.
0.0.9 - 2021-02-05
Removed
- Obsolete StreamEvent fields: app_version.
- Obsolete ScheduledEvent fields: app_version.
0.0.8 - 2021-02-05
Fixed
- api_key extraction from context.
0.0.7 - 2021-02-04
Fixed
- StreamEvent queue event parsing.
0.0.5 - 2021-02-04
Added
- Required context parameter to Corva.
- Documentation in README.md.
0.0.4 - 2021-01-20
Added
- Corva class, which contains stream and scheduled decorators for stream and scheduled apps.
Removed
- StreamApp and ScheduledApp classes.
0.0.3 - 2020-12-15
Fixed
- Deployment to PyPI.
0.0.2 - 2020-12-15
Added
- StreamApp to build stream apps.
- ScheduledApp to build scheduled apps.
- TaskApp to build task apps.
- Api class to access Platform and Data Corva APIs.
- Cache class to share data between app invokes.
- Event classes: StreamEvent, ScheduledEvent and TaskEvent.