Skip to main content

Python SDK for AveniECA

Project description

avenieca-python

Python SDK for ingress and retrieval to an AveniECA instance. Stream state through Kafka, interact with the REST API.

pip install avenieca-python

Usage

Stream continuously to your twin's sub_topic

import os
from avenieca.config.broker import Broker
from avenieca.data import Signal
from avenieca.producers import Stream

# Define a handler that returns a Signal dataclass

def handler():
    signal = Signal(
            valence=10,
            state=[0.2, 0.3, 0.8],
        )
    return signal

# Initialize broker configuration for the Stream
config = Broker(
        url=os.environ["KAFKA_URL"],
        sub_topic="left_wheel", #ECA twin subscriber-topic
        group="test",
        pub_topic=""
    )

# Initialize the Stream object with a sync_rate 
stream = Stream(config=config, sync_rate=1)
stream.publish(handler)

Publish one signal as an event

import os
from avenieca.config.broker import Broker
from avenieca.data import Signal
from avenieca.producers import Event

config = Broker(
        url=os.environ["KAFKA_URL"],
        sub_topic="left_wheel",
        group="test",
        pub_topic=""
    )

# Define the signal
signal = Signal(
            valence=9.0,
            state=[0.2, 0.3, 0.8],
        )

event = Event(config=config)
event.publish(signal)

Consume from kafka topic

import os
import numpy as np
from avenieca.config.broker import Broker
from avenieca.data import Signal
from avenieca.utils.signal import get_state_as_list, get_state_as_array
from avenieca.consumer import Consumer

config = Broker(
        url=os.environ["KAFKA_URL"],
        sub_topic="left_wheel",
        group="test",
        pub_topic=""
    )

signal = Signal(
            valence=9.0,
            state=[0.2, 0.3, 0.8],
        )

# Define a handler to process incoming messages
def handler(data):
    valence = data["valence"]
    state = data["state"]
    assert valence == 10
    assert state == "[0.2, 0.3, 0.8]"

client = Consumer(config=config)
client.consume(handler, True) # pass in handler

# You can use util functions in your handler to 
# convert the state signal from byte string to 
# np.ndarray or python list
def handler1(data):
    assert data["valence"] == 10
    assert data["state"] == "[0.2, 0.3, 0.8]"
    get_state_as_list(data)
    assert data["state"] == [0.2, 0.3, 0.8]

def handler2(data):
    assert data["valence"] == 10
    assert data["state"] == "[0.2, 0.3, 0.8]"
    get_state_as_array(data)
    assert True, np.array_equal(data["state"], np.array([0.2, 0.3, 0.8]))

REST API

import os
from pprint import pprint

import avenieca.config.twin
from avenieca.api.model import *
from avenieca.api.eca import ECA

username = os.getenv("USERNAME")
password = os.getenv("PASSWORD")
config = Config(uri="http://localhost:2580/v1", username=username, password=password)

eca = ECA(config)

'''ESS'''
ess = ESSInsert(
    module_id="air_conditioner",
    state=[11],
    valence=10.0,
    score=4,
    embedding_input=1,
    context=None,
)
res, status = eca.ess.create(data=ess)
res, status = eca.ess.get_all(module_id="air_conditioner")
res, status = eca.ess.get_one(module_id="air_conditioner", db_id=8)
res, status = eca.ess.update(module_id="air_conditioner", db_id=8, data=ess)
res, status = eca.ess.get_one_sequence(module_id="air_conditioner", sequence_id=3)
res, status = eca.ess.get_all_sequence(module_id="air_conditioner")
res, status = eca.ess.get_one_pretty(module_id="gwp_record", db_id=1)
res, status = eca.ess.search(data=Search(
    module_id="air_conditioner",
    state=[18],
    limit=1
))
res, status = eca.ess.get_one_embedding(module_id="air_conditioner", embedding_input=1)
res, status = eca.ess.get_all_aggregates(module_id="team", aggregate_module_id="gwp_aggregate", ess_id=1)

'''Sequence'''
sequence = SequenceInsert(
    module_id="air_conditioner",
    instance_id=10,
    status="e",
    context=None,
)
res, status = eca.sequence.create(data=sequence)
res, status = eca.sequence.get_one(module_id="air_conditioner", db_id=4)
res, status = eca.sequence.get_all(module_id="air_conditioner")
res, status = eca.sequence.update(module_id="air_conditioner", db_id=4, data=sequence)
res, status = eca.sequence.delete(module_id="aggregate001", db_id=59)


nsr = NextStateRequest(
    module_id="aggregate001",
    recall=20,
    range=20,
    n=1,
    status="e",
)
res, status = eca.cortex.predictions(data=nsr)
res, status = eca.cortex.predictions_raw(data=nsr)

'''Document'''
document = DocumentInsert(
    doc_id="001",
    text="testing 123",
    embed=True,
)
res, status = eca.document.create(data=document)
res, status = eca.document.get_one(db_id=20)
res, status = eca.document.get_all()
res, status = eca.document.update(db_id=20, data=document)
res, status = eca.document.delete(db_id=100)

'''Embedding'''
input_hash = avenieca.encode("my_secret", "the inputs")
embedding = EmbeddingInputInsert(
    module_id="air_conditioner",
    input="the inputs",
    hash=input_hash
)
res, status = eca.embedding.create(data=embedding)
res, status = eca.embedding.get_one(module_id="air_conditioner", db_id=1)
res, status = eca.embedding.get_all(module_id="air_conditioner")
res, status = eca.embedding.update(module_id="air_conditioner", db_id=1, data=embedding)
res, status = eca.embedding.delete(module_id="air_conditioner", db_id=1)

'''Response'''
res, status = eca.response.get_one(db_id=100)
res, status = eca.response.get_all()

'''Retrieval'''
retrieval = RetrievalRequest(
    query="what is the temperature on 3rd of may at around 1pm?"
)
res, status = eca.retrieval.query(data=retrieval)

'''create ess, then create aggregate'''
ess_temperature = ESSResponse(
    id=2,
    created_at='',
    updated_at='',
    module_id="temperature",
    state=[28.0],
    valence=-90,
    score=1,
    context=None,
    embedding_input=None
)
ess_air_conditioner = ESSResponse(
    id=5,
    created_at='',
    updated_at='',
    state=[25.0],
    module_id='air_conditioner',
    valence=90.0,
    score=18,
    embedding_input=None,
    context=None
)
ess_occupancy = ESSResponse(
    id=7,
    created_at='',
    updated_at='',
    state=[10.0],
    module_id='occupancy',
    valence=-90.0,
    score=6,
    embedding_input=None,
    context=None
)
ess_purifier = ESSResponse(
    id=3,
    state=[2.0],
    module_id='purifier',
    valence=90.0,
    score=28,
    embedding_input=None,
    context=None)
ess_air_quality_index = ESSResponse(
    id=6,
    created_at='',
    updated_at='',
    state=[70.0],
    module_id='air_quality_index',
    valence=-90.0,
    score=2,
    embedding_input=None,
    context=None
)
aggregate_insert = ESSInsert(
    module_id="aggregate001",
    state=[],
    valence=10.0,
    avg_ess_valence=0.0,
    score=0,
    total_ess_score=0,
    avg_ess_score=0,
    aggregate_id=[],
    aggregate_valence=[],
    aggregate_score=[],
    aggregate_module_id=[],
    aggregate_shape=[],
    aggregate_context=[],
    aggregate_emb_inp=[],
    context=None,
)


def create_aggregate_from_ess(array_ess: List[ESSResponse], aggregate_insert: ESSInsert):
    total_ess_score = 0
    total_ess_valence = 0.0
    for ess in array_ess:
        aggregate_insert.state.extend(ess.state)
        aggregate_insert.aggregate_module_id.append(ess.module_id)
        aggregate_insert.aggregate_id.append(ess.id)
        aggregate_insert.aggregate_context.append(ess.context)
        aggregate_insert.aggregate_valence.append(ess.valence)
        aggregate_insert.aggregate_score.append(ess.score)
        aggregate_insert.aggregate_emb_inp.append(ess.embedding_input)
        aggregate_insert.aggregate_shape.append(len(ess.state))
        total_ess_score += ess.score
        total_ess_valence += ess.valence
    aggregate_insert.total_ess_score = total_ess_score
    aggregate_insert.avg_ess_score = int(total_ess_score / len(array_ess))
    aggregate_insert.avg_ess_valence = total_ess_valence / len(array_ess)
    aggregate_insert.valence = total_ess_valence
    return aggregate_insert

agg_in = create_aggregate_from_ess(
    [
        ess_air_conditioner,
        ess_air_quality_index,
        ess_occupancy,
        ess_purifier,
        ess_temperature
    ],
    aggregate_insert)

res, status = eca.ess.create(data=agg_in)
res, status = eca.ess.upsert(module_id="aggregate001", db_id=5)

res, status = eca.sequence.create(data=SequenceInsert(
    module_id="aggregate001",
    instance_id=34,
    status="sk"
))

try:
    pprint(res.__dict__)
except:
    print(len(res))
print(status)

Tests

python -m pytest test/

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

avenieca-python-0.1.4.tar.gz (28.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

avenieca_python-0.1.4-py3-none-any.whl (35.5 kB view details)

Uploaded Python 3

File details

Details for the file avenieca-python-0.1.4.tar.gz.

File metadata

  • Download URL: avenieca-python-0.1.4.tar.gz
  • Upload date:
  • Size: 28.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.9.16

File hashes

Hashes for avenieca-python-0.1.4.tar.gz
Algorithm Hash digest
SHA256 5f0efd04d68bd944fb6251223a45a637b487ed6d51f40627591cc4e7243f005c
MD5 62c5dfa282a847bb0d0c03c3e3570737
BLAKE2b-256 1cb19f61b00bb73523a406674f052e3be71f658ac53d70597fd62cefb86a89d6

See more details on using hashes here.

File details

Details for the file avenieca_python-0.1.4-py3-none-any.whl.

File metadata

File hashes

Hashes for avenieca_python-0.1.4-py3-none-any.whl
Algorithm Hash digest
SHA256 359d92e80a50548e55f2d0322a0d1abc8db27f7da01c68f05008de451ae0e0d9
MD5 523772316fbbc396e37ebe6c59665130
BLAKE2b-256 5f890cde8ddfdfa67d131b8c5361f412a23347785d739ff3a24a359dd11a7e72

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page