
A utility library based on KaithemAutomation featuring a GStreamer wrapper


scullery


A Python library that provides some core parts of KaithemAutomation. It's a general-purpose utility library that includes:

  • Data persistence
  • Internal MQTT-like message bus
  • Thread pool workers
  • State machines (with conditions and timers)
  • Scheduling, including repeating events
  • Media handling and JACK (now moved to IceMedia)

Intro

See example.py for more details. Also see the equally simple audio and video player.

Examples

"This file demonstrates a lot of scullery's functionality in a small package"
import scullery.workers

#Including just importing this, #Todo?
import scullery.messagebus

import scullery.persist

#Callback for the subscription below; it receives the topic and the message value
def subscriber(topic,val):
    print("Got a message")

#Unsubscribe happens automatically if we don't keep a ref to the function
scullery.messagebus.subscribe("/test/topic",subscriber)

#Post a message, the actual subscribers all run in the background worker pool
scullery.messagebus.post_message("/test/topic","TestPayload")

import os
import time

#Example data to persist (placeholder values, for this demo only)
myData = {"foo": "bar", "count": 1}

#Also supports YAML, txt, and bin for the appropriate data types, if the YAML library is installed.
#Can use .gz or .bz2 to compress. Saved atomically with tilde files and UNIX rename semantics.
#Checks whether it actually needs to save before writing the file.
#Get an absolute path next to this script
fn = os.path.join(os.path.dirname(os.path.abspath(__file__)),"testFile.json")
print("Going to save data to: "+fn)
scullery.persist.save(myData,fn)
assert scullery.persist.load(fn)==myData



#Keep the main thread alive; subscribers run in the background worker pool
while True:
    time.sleep(1)
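
The comments in the example above mention atomic saves using tilde files and rename semantics, plus a check that skips unnecessary writes. The following is a minimal sketch of that general technique using only the standard library. It is not scullery's implementation, and the name atomic_save_json is hypothetical.

```python
import json
import os
import tempfile


def atomic_save_json(data, path):
    """Write data as JSON so readers never see a half-written file.

    Hypothetical helper for illustration; returns True if a write happened.
    """
    new_text = json.dumps(data, indent=2, sort_keys=True)

    # Skip the write entirely if the file already holds identical content.
    if os.path.exists(path):
        with open(path, "r", encoding="utf-8") as f:
            if f.read() == new_text:
                return False

    # Write to a temporary "tilde file" next to the target first.
    tmp_path = path + "~"
    with open(tmp_path, "w", encoding="utf-8") as f:
        f.write(new_text)
        f.flush()
        os.fsync(f.fileno())

    # os.replace is atomic on POSIX when both paths are on the same filesystem.
    os.replace(tmp_path, path)
    return True


demo_dir = tempfile.mkdtemp()
demo_path = os.path.join(demo_dir, "demo.json")
first_write = atomic_save_json({"a": 1}, demo_path)   # file is created
second_write = atomic_save_json({"a": 1}, demo_path)  # unchanged, so skipped
```

Because the temporary file is fully written before the rename, a crash mid-save leaves either the old file or the new one, never a truncated mix.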

scullery.messagebus

scullery.messagebus.post_message(topic, message, timestamp=None, annotation=None)

Post a message to the internal system-wide message bus. Message topics are hierarchical, delimited by forward slashes, and the root is /. However, /foo is equivalent to foo.

Messages may be any Python object at all.

The timestamp will be set to time.time() if it is None.

Annotation is used for sending "extra" or "hidden" metadata, usually for preventing loops. It defaults to None.

scullery.messagebus.subscribe(topic, callback)

Request that the function callback be called whenever a message matching the topic is posted. The callback must take four arguments (topic, message, timestamp, annotation), two arguments (topic, message), or just one argument (message).
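
One way a bus could support callbacks of different arities is to inspect the signature before calling. This is a sketch under that assumption, not scullery's actual dispatch code; call_with_supported_args is a hypothetical name.

```python
import inspect


def call_with_supported_args(callback, topic, message, timestamp, annotation):
    """Call callback with 1, 2, or 4 positional arguments, based on its signature."""
    n_params = len(inspect.signature(callback).parameters)
    if n_params == 1:
        return callback(message)
    if n_params == 2:
        return callback(topic, message)
    if n_params == 4:
        return callback(topic, message, timestamp, annotation)
    raise TypeError(f"callback must take 1, 2, or 4 arguments, not {n_params}")


received = []
call_with_supported_args(lambda m: received.append(("one", m)), "/t", "hi", 0.0, None)
call_with_supported_args(lambda t, m: received.append(("two", t, m)), "/t", "hi", 0.0, None)
call_with_supported_args(
    lambda t, m, ts, a: received.append(("four", t, m, ts, a)), "/t", "hi", 0.0, None
)
```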

Wildcards follow MQTT subscription rules.

Should the topic end with a slash and a hash, it will also match all subtopics (e.g. "/foo/#" will match "/foo", "/foo/bar" and "/foo/anything").
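
The matching rules described above can be sketched as a small function. This is an illustration of MQTT-style matching as this document describes it (including treating "/foo" and "foo" as equivalent), not the library's own matcher; topic_matches is a hypothetical name.

```python
def topic_matches(pattern, topic):
    """Return True if topic matches an MQTT-style subscription pattern."""
    # The text treats "/foo" and "foo" as equivalent, so drop leading slashes.
    pattern_parts = pattern.lstrip("/").split("/")
    topic_parts = topic.lstrip("/").split("/")

    for i, part in enumerate(pattern_parts):
        if part == "#":
            # Multi-level wildcard: matches the parent and everything below it.
            return True
        if i >= len(topic_parts):
            return False
        # "+" matches exactly one level; anything else must match literally.
        if part != "+" and part != topic_parts[i]:
            return False

    return len(pattern_parts) == len(topic_parts)


examples = {
    ("/foo/#", "/foo"): topic_matches("/foo/#", "/foo"),
    ("/foo/#", "/foo/bar"): topic_matches("/foo/#", "/foo/bar"),
    ("/foo/+", "/foo/bar/baz"): topic_matches("/foo/+", "/foo/bar/baz"),
    ("/foo", "foo"): topic_matches("/foo", "foo"),
}
```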

Uncaught errors in the callback are ignored but logged.

You must always maintain a reference to the callback; otherwise, the callback will be garbage collected and automatically unsubscribed. Dropping that reference is also how you unsubscribe.

scullery.units

This module deals with unit conversions.

scullery.units.convert(value,from_unit, to_unit)

Try to convert the value, falling back to the (very slow) pint library for less common conversions not natively supported.

scullery.units.si_format_number(value, digits=2)

Format a number with an SI prefix, e.g. 2000 -> "2k". digits controls the maximum number of decimal places.
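
To make the formatting rule concrete, here is a rough standalone sketch of SI-prefix formatting. It is not the library's implementation, handles only the k, M, and G prefixes, and si_format_sketch is a hypothetical name.

```python
def si_format_sketch(value, digits=2):
    """Format value with an SI prefix, using at most `digits` decimal places."""
    for factor, prefix in ((1e9, "G"), (1e6, "M"), (1e3, "k")):
        if abs(value) >= factor:
            scaled = value / factor
            break
    else:
        # Smaller than 1000: no prefix needed.
        scaled, prefix = value, ""

    text = f"{scaled:.{digits}f}"
    if "." in text:
        # Trim trailing zeros, then a dangling decimal point.
        text = text.rstrip("0").rstrip(".")
    return text + prefix


formatted = [si_format_sketch(v) for v in (2000, 1500, 12, 1234567)]
```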

Scheduling

Wraps the very simple scheduling module in a way that supports repeating events, error reporting, and weakref-based cleanup.

Example

import logging
import sys
import time
import gc
import scullery.scheduling

# Set up logging, for demo purposes
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)

@scullery.scheduling.every(1)
def f():
    print("test")

time.sleep(4)

# You could delete f and it would go away, but
# let's close it properly.
# del f
f.unregister()


# Since you can't decorate the class methods
# the same way, we do this

class Foo():
    def __init__(self):
        # Must keep a reference to scheduled
        # Or it will stop
        self.scheduled = scullery.scheduling.every(self.bar,1)

    def bar(self):
        print("test 2")


f = Foo()
time.sleep(3)
# We don't call unregister, so we get a warning.
# f.scheduled.unregister()
del f
# Garbage collect so the deleter runs right away
gc.collect()

# Should stop running because we deleted the instance
time.sleep(3)
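
Weakref-based cleanup for scheduled methods can be pictured as follows. This is a deterministic sketch driven by explicit timestamps rather than a real background thread, and it is not how scullery.scheduling is implemented; WeakRepeatingTask and Counter are hypothetical names.

```python
import gc
import weakref


class WeakRepeatingTask:
    """Repeating task that does not keep its target object alive."""

    def __init__(self, bound_method, interval):
        # WeakMethod is needed because bound methods are short-lived objects.
        self._method_ref = weakref.WeakMethod(bound_method)
        self.interval = interval
        self.next_run = 0.0

    def run_if_due(self, now):
        """Run the target if it is due. Return False once the owner is gone."""
        method = self._method_ref()
        if method is None:
            return False
        if now >= self.next_run:
            method()
            self.next_run = now + self.interval
        return True


class Counter:
    def __init__(self):
        self.count = 0

    def bump(self):
        self.count += 1


counter = Counter()
task = WeakRepeatingTask(counter.bump, interval=1.0)

# Simulated clock values: the task runs at 0.0, 1.0 and 2.0, but not at 0.5.
alive_results = [task.run_if_due(t) for t in (0.0, 0.5, 1.0, 2.0)]
final_count = counter.count

# Once the owner is deleted, the task reports that it should be dropped.
del counter
gc.collect()
result_after_delete = task.run_if_due(3.0)
```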

State Machines

import time
import scullery.statemachines

sm = scullery.statemachines.StateMachine("start")

sm.add_state('start', exit=lambda: print('Exiting start state'))
sm.add_state('state2', enter=lambda: print('Entering state 2'))
sm.add_state('state3', enter=lambda: print('Entering state 3'))

sm.add_rule('start', 'my_event', 'state2')
sm.set_timer('state2', 2, 'state3')

# Event triggered, now we are in state2
# A timer is running in the background.
sm.event('my_event')

time.sleep(3)

# The timer went off, now we are in state3
# stateage returns a tuple of (state, age),
# where age is how long we've been in that state.
print(sm.stateage)
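
For readers who want to see the idea behind event rules and state age, here is a minimal standalone sketch. It omits timers and exit callbacks, and it is not the scullery.statemachines implementation; TinyStateMachine is a hypothetical class.

```python
import time


class TinyStateMachine:
    """Event-driven state machine that tracks how long it has been in a state."""

    def __init__(self, start):
        self.state = start
        self.entered_at = time.monotonic()
        self.rules = {}     # (state, event) -> destination state
        self.on_enter = {}  # state -> callable run on entry

    def add_rule(self, state, event, dest):
        self.rules[(state, event)] = dest

    def event(self, name):
        dest = self.rules.get((self.state, name))
        if dest is None:
            return  # no rule for this event in the current state
        self.state = dest
        self.entered_at = time.monotonic()
        callback = self.on_enter.get(dest)
        if callback:
            callback()

    @property
    def stateage(self):
        return self.state, time.monotonic() - self.entered_at


entered = []
tiny = TinyStateMachine("start")
tiny.on_enter["state2"] = lambda: entered.append("state2")
tiny.add_rule("start", "my_event", "state2")

tiny.event("unknown_event")  # ignored: no matching rule
tiny.event("my_event")       # start -> state2
state, age = tiny.stateage
```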

Rate Limiting (0.17.0 and up)

import scullery.ratelimits

# 1hz average, up to max burst of 50
rl = scullery.ratelimits.RateLimiter(hz=1, burst=50)

for i in range(100):
    # Returns number of credits remaining.
    # They refill at the given hz rate up to the burst limit

    if not rl.limit():
        raise RuntimeError("No rate limiter credits remaining")
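
The credit-and-refill behaviour described in the comments above resembles a token bucket. Below is a sketch of that general technique with an injectable clock so it can be tested without sleeping. It is not the RateLimiter source; TokenBucket is a hypothetical class, and unlike the description above its limit() returns the credits available before spending, so a successful call is always truthy.

```python
import time


class TokenBucket:
    """Token bucket: credits refill at `hz` per second, capped at `burst`."""

    def __init__(self, hz, burst, clock=time.monotonic):
        self.hz = hz
        self.burst = burst
        self.clock = clock
        self.credits = float(burst)
        self.last = clock()

    def limit(self):
        """Spend one credit. Return credits available before spending, or 0."""
        now = self.clock()
        # Refill according to elapsed time, never exceeding the burst size.
        self.credits = min(self.burst, self.credits + (now - self.last) * self.hz)
        self.last = now
        if self.credits < 1:
            return 0
        available = self.credits
        self.credits -= 1
        return available


# A fake clock makes the demo deterministic.
fake_now = [0.0]
bucket = TokenBucket(hz=1, burst=3, clock=lambda: fake_now[0])

burst_results = [bool(bucket.limit()) for _ in range(4)]  # fourth call is denied
fake_now[0] += 2.0                                        # two credits refill
after_refill = [bool(bucket.limit()) for _ in range(3)]
```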

snake_compat (0.17.0 and up)

This module converts between snake_case, camelCase, and kebab-case.

It can do so for one string, or it can make a shallow copy of a dict with all the keys converted and values untouched.

# Example only, * imports are bad!
from scullery.snake_compat import *

# def camel_to_kebab(s: str) -> str:

# def kebab_to_snake(s: str):

# def snake_to_kebab(s: str):

# def snake_to_camel(s: str):

# def camel_to_snake(s: str):

# def snakify_dict_keys(d: Dict[str, Any]) -> Dict[str, Any]:


# def kebabify_dict_keys(d: Dict[str, Any]) -> Dict[str, Any]:

# def camelify_dict_keys(d: Dict[str, Any]) -> Dict[str, Any]:
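
To show what these conversions do, here is a standalone sketch of a few of them written with the standard library. The functions carry a _sketch suffix to make clear they are illustrations, not the module's own code, and they may differ from it in edge cases such as consecutive capitals.

```python
import re


def camel_to_snake_sketch(s):
    # Insert an underscore before every capital letter except at the start.
    return re.sub(r"(?<!^)(?=[A-Z])", "_", s).lower()


def snake_to_camel_sketch(s):
    first, *rest = s.split("_")
    return first + "".join(word.capitalize() for word in rest)


def snake_to_kebab_sketch(s):
    return s.replace("_", "-")


def snakify_dict_keys_sketch(d):
    # Shallow copy: keys are converted, values are passed through untouched.
    return {camel_to_snake_sketch(k): v for k, v in d.items()}


inner = {"innerKey": 1}
converted = snakify_dict_keys_sketch({"fooBar": inner, "baz": 2})
```

The nested dict keeps its camelCase key because only the top-level keys are converted.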
