Skip to main content

A Svelte Stores implementation in Python.

Project description

Sveltish

Svelte Stores are one of the secret weapons of the Svelte framework (the recently voted most loved web framework).

Stores allow easy reactive programming by presenting an Observer pattern that is as simple as necessary, but not simpler.

Install

pip install sveltish

How to use

Sometimes, you’ll have values that need to be accessed by multiple unrelated objects.

For that, you can use stores. It is a very simple implementation (around 100 lines of code) of the Observer/Observable pattern.

A store is simply an object with a subscribe method that allows interested parties to be notified when its value changes.

Writable Stores

from sveltish.stores import writable
count = writable(0)
history = []  # logging for testing
# subscribe returns an unsubscriber
def record(x): 
    history.append(x)
    print(history)
stop = count.subscribe(record)

test_eq(history, [0])
[0]

We just created a count store. Its value can be accessed via a callback we pass in the count.subscribe method:

A Writable can be set from the outside. When it happens, all its subscribers will react.

def increment(): count.update(lambda x: x + 1)
def decrement(): count.update(lambda x: x - 1)
def reset(): count.set(0)

count.set(3)
increment()
decrement()
decrement()
reset()
count.set(42)

test_eq(history, [0, 3, 4, 3, 2, 0, 42])
[0, 3]
[0, 3, 4]
[0, 3, 4, 3]
[0, 3, 4, 3, 2]
[0, 3, 4, 3, 2, 0]
[0, 3, 4, 3, 2, 0, 42]

The unsubscriber, in this example the stop function, stops the notifications to the subscriber.

stop()
reset()
count.set(22)
test_eq(history, [0, 3, 4, 3, 2, 0, 42])
count
w<0> $int: 22

Notice that you can still change the store but there was no print message this time. There was no observer listening.

Note

Observer, Subscriber and Callback are used as synomyms here.

When we subscribe new callbacks, they will be promptly informed of the current state of the store.

stop  = count.subscribe(lambda x: print(f"Count is now {x}"))
stop2 = count.subscribe(lambda x: print(f"double of count is {2*x}"))
Count is now 22
double of count is 44
reset()
Count is now 0
double of count is 0
stop()
stop2()

You can create an empty Writable Store.

store = writable()
history = []
unsubscribe = store.subscribe(lambda x: history.append(x))
unsubscribe()
test_eq(history, [None])

If you try to unsubscribe twice, it won’t break. It just does nothing the second time… and in the third time… and…

unsubscribe(), unsubscribe(), unsubscribe()
(None, None, None)

Stores assume mutable objects.

Note

In Python everythong is an object. Here we are calling an object something that is not a primitive (eg. int, bool, etc)

class Bunch:
    __init__ = lambda self, **kw: setattr(self, '__dict__', kw)

obj = Bunch()
called = 0
store = writable(obj)
def callback(x):
    global called
    called += 1
stop = store.subscribe(callback)
test_eq(called, 1)
obj.a = 1 #type: ignore
store.set(obj)
test_eq(called, 2)

Readable Stores

However… It is clear that not all stores should be writable by whoever has a reference to them. Many times you want a single publisher of change in store that is only consumed (subscribed) by many other objects. For those cases, we have readable stores.

Note

The Publisher Subscriber (PubSub) pattern is a variant of the Observable/Observer pattern.

from sveltish.stores import readable

A Readable store without a start function is a constant value and has no meaning for us. Therefore, start is a required argument.

try:
    c = readable(0) # shoud fail
except Exception as error:
    print(error)

test_fail(lambda: readable(0))
readable() missing 1 required positional argument: 'start'
class Publisher:
    def __init__(self): self.set = lambda x: None
    def set_set(self, set): 
        self.set = set
        return lambda: None
    def use_set(self, value): self.set(value)
p = Publisher()
reader = readable(0, p.set_set)
reader
r<0> $int: 0

Ths store only starts updating after the first subscriber. Here, the publisher does not change the store.

p.use_set(1), reader
(None, r<0> $int: 0)
stop = reader.subscribe(lambda x: print(f"reader is now {x}"))
reader is now 0
p.use_set(2)
reader is now 2
stop()

Another example of Readable Store usage:

from threading import Event, Thread
import time
def start(set): # the start function is the publisher
    stopped = Event()
    def loop(): # needs to be in a separate thread
        while not stopped.wait(1): # in seconds
            set(time.localtime())
    Thread(target=loop).start()    
    return stopped.set
now = readable(time.localtime(), start)
now
r<0> $struct_time: time.struct_time(tm_year=2023, tm_mon=3, tm_mday=8, tm_hour=22, tm_min=12, tm_sec=41, tm_wday=2, tm_yday=67, tm_isdst=0)

Note

The loop needs to be in its own thread, otherwise the function would never return and we would wait forever.

While there is no subscriber, the Readable will not be updated.

now
r<0> $struct_time: time.struct_time(tm_year=2023, tm_mon=3, tm_mday=8, tm_hour=22, tm_min=12, tm_sec=41, tm_wday=2, tm_yday=67, tm_isdst=0)
OhPleaseStop = now.subscribe(lambda x: print(time.strftime(f"%H:%M:%S", x), end="\r"))
22:12:41
time.sleep(2)
OhPleaseStop()
22:12:43

Note

The Svelte Store api allow you to create a Readable Store without a Notifier. See discussion here.

Derived Stores

A Derived Store stores a value based on the value of another store.

from sveltish.stores import derived

For example:

count = writable(1)
stopCount = count.subscribe(lambda x: print(f"count is {x}"))
double = derived(count, lambda x: x * 2)
stopDouble = double.subscribe(lambda x: print(f"double is {x}"))
test_eq(double.get(), 2*count.get())
count is 1
double is 2
count.set(2)
test_eq(double.get(), 4)
count is 2
double is 4
stopCount(), stopDouble()
(None, None)

Building on our previous example, we can create a store that derives the elapsed time since the original store was started.

elapsing = None
def calc_elapsed(now):
    global elapsing
    if not elapsing: 
        elapsing = now
    return time.mktime(now) - time.mktime(elapsing)
now
r<0> $struct_time: time.struct_time(tm_year=2023, tm_mon=3, tm_mday=8, tm_hour=22, tm_min=12, tm_sec=43, tm_wday=2, tm_yday=67, tm_isdst=0)
elapsed = derived(now, lambda x: calc_elapsed(x))
elapsed
r<0> $float: 0.0
stopElapsed = elapsed.subscribe(lambda x: print(f"Elapsed time of source store: {x} seconds.", end="\r"))
Elapsed time of source store: 0.0 seconds.
time.sleep(1)
stopElapsed()
Elapsed time of source store: 2.0 seconds.

Derived stores allow us to transform the value of a store. In RxPy they are called operators. You can build several operators like: filter, fold, map, zip

Let’s build a custom filter operator:

user = writable({"name": "John", "age": 32})
stopLog = user.subscribe(lambda x: print(f"User: {x}"))
User: {'name': 'John', 'age': 32}
name = derived(user, lambda x: x["name"])
stopName = name.subscribe(lambda x: print(f"Name: {x}"))
Name: John
user.update(lambda x: x | {"age": 45})
User: {'name': 'John', 'age': 45}

Updating the age does not trigger the name subscriber. Let’s see what happens when we update the name.

user.update(lambda x: x | {"name": "Fred"})
Name: Fred
User: {'name': 'Fred', 'age': 45}

Only changes to the name of the user triggers the name subscriber.

stopName(), stopLog()
(None, None)

Another cool thing about Derived Stores is that you can derive from a list of stores. Let’s build a zip operator.

a = writable([1,2,3,4])
b = writable([5,6,7,8])
a,b
(w<0> $list: [1, 2, 3, 4], w<0> $list: [5, 6, 7, 8])
zipper = derived([a,b], lambda a,b: list(zip(a,b)))
test_eq(zipper.get(), [(1, 5), (2, 6), (3, 7), (4, 8)])

While zipper has no subscribers, it keeps the initial value, it is stopped.

a.set([4,3,2,1])
test_eq(zipper.get(), [(1, 5), (2, 6), (3, 7), (4, 8)])

A subscription starts zipper and it will start to react to the changes of the stores.

u = zipper.subscribe(lambda x: None)
test_eq(zipper.get(), [(4, 5), (3, 6), (2, 7), (1, 8)])
b.set([8,7,6,5])
test_eq(zipper.get(), [(4, 8), (3, 7), (2, 6), (1, 5)])
u()

Store composition with pipes

writable(1).pipe(lambda x: x + 1).pipe(lambda x: x * 2)
r<0> $int: 4
writable(1).pipe(lambda x: x+1, lambda x: x*2)
r<0> $int: 4
writable(1) | (lambda x: x+1) | (lambda x: x*2)
r<0> $int: 4
a = writable(1)
u5 = (a 
      | (lambda x: x*2) 
      | (lambda x: x*2) 
      | (lambda x: x*2)).subscribe(lambda x: print(f"u5: {x}"))
u5: 8
a.set(2)
u5: 16
u5()

Missing features

You may have noticed that along the way we had always to subscribe and then had to remember to unsubscribe when we were done. This is a bit of a nuisance. Svelte has a compiler that provide some syntatic sugar to make this easier. They call it auto-subscriptions.

Sveltish does not have auto-subscriptions yet. But if you have a nice idea how to implement it, please let me know.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

sveltish-1.2.9.tar.gz (19.6 kB view hashes)

Uploaded Source

Built Distribution

sveltish-1.2.9-py3-none-any.whl (16.1 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page