pgnotifier

A simple little module to capture, process, and dispatch PostgreSQL NOTIFY streams

Features

  • Monitor multiple channels
  • Register one or more callbacks with multiple channels
  • Register one or more channels with a callback
  • Control callbacks on a per channel basis
  • Add and remove channels at any time
  • Mute and un-mute channels
  • Add and remove subscribers at any time
  • Mute and un-mute subscribers
  • Abstract away asynchronous context for synchronous use
  • Automatic str -> type conversion of payloads that are valid Python literals, via ast.literal_eval
  • Persistent, immutable internal data structures
  • Minimalist API
  • Tight footprint

Install

pip install pgnotifier

Usage

import sys
from pgnotifier import Notifier

print("Ctrl-C to exit", file=sys.stderr)

conf = {
    'dbname': "my_db_name",
    'user': "my_db_user",
    'password': "my_password",
    'host': "my_host",
    'port': "my_port",
}
n = Notifier(conf)
n.add_channels(['ch1', 'ch3'])

n.subscribe(42, 'ch1',
    lambda id, channel, payload, pid: print(
        "callback id: ", id, ", channel: ", channel,
        ", payload: ", payload, ", pid: ", pid))

n.subscribe('an_id', 'ch2',
            lambda *_: print("I'm just going to ignore that."))

def do_complex_thing(id, channel, payload, pid):
    if isinstance(payload, dict):
        for k, v in payload.items():
            print("doing something with key:", k, "-> val", v)
            # do something else
            # I think you get the idea...
    else:
        print("payload of type: ",
              type(payload), "is not what I was expecting!")

# subscriber with tuple id
n.subscribe((2, 'another_id'), 'ch2', do_complex_thing)
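
The conf dict above hard-codes its credentials. As a minimal sketch, the same dict could be built from environment variables instead. The helper name conf_from_env and its fallback defaults are assumptions made for this example; the variable names (PGDATABASE, PGUSER, PGPASSWORD, PGHOST, PGPORT) are the ones libpq reads, and pgnotifier is not documented to read them itself.

```python
import os

def conf_from_env(env=os.environ):
    """Hypothetical helper: build a conf dict with the keys used in the usage example."""
    return {
        'dbname': env.get('PGDATABASE', 'my_db_name'),
        'user': env.get('PGUSER', 'my_db_user'),
        'password': env.get('PGPASSWORD', ''),
        'host': env.get('PGHOST', 'localhost'),   # assumed default
        'port': env.get('PGPORT', '5432'),        # assumed default, kept as str
    }

# A plain dict stands in for os.environ here so the example is repeatable.
conf = conf_from_env({'PGDATABASE': 'events', 'PGUSER': 'listener'})
print(conf['dbname'], conf['user'], conf['host'], conf['port'])
```

The resulting dict has the same shape as the one passed to Notifier(conf) in the usage example.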

Test

From the PostgreSQL end, send TEXT or JSON string notifications like so:

select pg_notify('ch1', '"WARNING: Something really bad happened"');
select pg_notify('ch1', '{"topic": "abc", "data": "some data", "something": "else"}');
select pg_notify('ch2', '{"topic": "xyz", "notice": "update", "data": [2, "stuff"]}');
select pg_notify('ch2', '[1,2,3,4,5]');
select pg_notify('ch3', '[1,2,3,4,5]');

Back in Python, the payload is passed to callbacks subscribed to channels ch1, ch2, etc. The payload is cast to its native Python type via ast.literal_eval. See https://docs.python.org/3/library/ast.html and https://docs.python.org/3/library/ast.html#ast.literal_eval
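
As a rough illustration of that conversion, the sketch below runs ast.literal_eval over payload strings like the ones sent above. The parse_payload helper, and its choice to fall back to the raw string, are assumptions made for this example rather than pgnotifier's documented behaviour. Note that ast.literal_eval only accepts Python literals, so JSON-only tokens such as true, false and null are not converted.

```python
import ast

def parse_payload(raw):
    """Hypothetical helper: convert a NOTIFY payload string to a Python value.

    Falls back to the unmodified string when the text is not a Python
    literal (an assumption for this sketch).
    """
    try:
        return ast.literal_eval(raw)
    except (ValueError, SyntaxError):
        return raw

samples = [
    '"WARNING: Something really bad happened"',
    '{"topic": "abc", "data": "some data", "something": "else"}',
    '[1,2,3,4,5]',
    '{"ok": true}',  # JSON-only token: not a Python literal, stays a str
]
for raw in samples:
    value = parse_payload(raw)
    print(type(value).__name__, value)
```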


[!IMPORTANT] PostgreSQL notification payloads must be text and, in the default configuration, must be shorter than 8000 bytes. It is recommended to send only the key of a record, a view or table name, a function reference, etc.
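
A minimal sketch of how a sender might respect that limit: check the encoded size before notifying, and send only a small key that the receiver can use to look up the full record. The helper names (fits_in_notify, key_only_payload) and the table/id fields are hypothetical, chosen for this example only.

```python
import json

MAX_NOTIFY_BYTES = 8000  # payloads must be shorter than this, per the note above

def fits_in_notify(payload_text):
    """True if the UTF-8 encoded payload is under the size limit."""
    return len(payload_text.encode('utf-8')) < MAX_NOTIFY_BYTES

def key_only_payload(table, record_id):
    """Hypothetical helper: reference a record instead of sending its contents."""
    return json.dumps({'table': table, 'id': record_id})

payload = key_only_payload('orders', 42)
print(payload, fits_in_notify(payload))
```

The size is measured in bytes rather than characters, so text with multi-byte characters reaches the limit sooner than its length suggests.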


Private methods

Documentation about the inner workings of pgnotifier is kept separately from the README. It can be found here: Private methods, or via the method links below.

Private methods


TODO

A list of stuff to look into at a later date can be found over here: TODO


API

The methods below provide everything needed to work with pgnotifier.

Notifier( dbconf )

Constructor.

Args:

  • dbconf database configuration, as dict.

from pgnotifier import Notifier

n = Notifier(dbconf)

Notifier.add_channels( channels )

Adds one or more channels to the set of channels to monitor. Is a no-op if channel already exists.

Args:

  • channels list of channels to add, as str (single channel), list or set.

from pgnotifier import Notifier

n = Notifier(conf)
n.add_channels(['ch1', 'ch2', 'ch3'])

Notifier.remove_channels( channels, autorun=True )

Removes one or more channels from the set of channels to monitor. Is a no-op if channel doesn't exist. Optionally restarts listener thread (if needed).

Args:

  • channels list of channels to remove, as str (single channel), list or set.
  • autorun restart listener thread with channels removed, as bool. Defaults to True.

[!IMPORTANT] Active channels, when removed, will only cease being monitored after a listener thread restart. Thread restarts happen automatically when autorun=True. Otherwise, if autorun=False, removed channels will continue to be monitored until a call to stop() and start(), or restart(), is made.

Inactive channels (e.g. channel is muted and/or has no subscribers and/or has all muted subscribers), when removed, do not require a restart as they will have already been removed from the listener thread.
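
The three inactive cases listed above can be summarised as a single predicate. This is a sketch of the rule as described, using a plain dict as a stand-in for channel state; the dict layout and the is_active name are assumptions, not pgnotifier's internal representation.

```python
def is_active(channel):
    """channel is a hypothetical plain dict:
    {'muted': bool, 'subscribers': {subscriber_id: subscriber_is_muted}}
    """
    if channel['muted']:
        return False
    # Active only if at least one subscriber is not muted.
    return any(not sub_muted for sub_muted in channel['subscribers'].values())

examples = {
    'muted channel': {'muted': True, 'subscribers': {42: False}},
    'no subscribers': {'muted': False, 'subscribers': {}},
    'all subscribers muted': {'muted': False, 'subscribers': {42: True, 'an_id': True}},
    'one live subscriber': {'muted': False, 'subscribers': {42: True, 'an_id': False}},
}
for label, channel in examples.items():
    print(label, '->', 'active' if is_active(channel) else 'inactive')
```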

Listener thread (re)starts are only required under certain, specific circumstances. It's advisable to let pgnotifier take care of listener thread management via the default autorun=True, unless there is a very good reason to manage it manually. See __maybe_restart for more detail.

from pgnotifier import Notifier

n = Notifier(conf)
# channels and/or subscribers, have been added, removed, etc. ...
n.remove_channels('ch2')
c = n.channels()
print("channels:", c)

Notifier.channels( )

Returns channel and subscriber data, as dict.

from pgnotifier import Notifier

n = Notifier(conf)
# channels and/or subscribers, have been added, removed, etc. ...
c = n.channels()
print("channels: ", c)

Notifier.subscribe( id, channel, fn, autorun=True )

Adds a callback function with id for notifications on channel. Creates channel if channel does not exist. Optionally restarts listener thread (if needed).

Args:

  • id subscriber id, as hashable (i.e. any immutable type such as strings, numbers, and tuples containing immutable types).
  • channel notification channel to subscribe to, as str.
  • fn callback function, as callable (i.e. function or method).
  • autorun restart listener thread (if needed), as bool. Defaults to True.
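
To make the "hashable" requirement on id concrete, the sketch below checks a few candidate ids with the built-in hash(). The is_valid_subscriber_id helper is hypothetical and only illustrates the rule; pgnotifier is not documented to expose such a check.

```python
def is_valid_subscriber_id(candidate):
    """Hypothetical check: a subscriber id must be hashable."""
    try:
        hash(candidate)
    except TypeError:
        return False
    return True

candidates = [42, 'an_id', (2, 'another_id'), [2, 'another_id'], (2, ['nested', 'list'])]
for candidate in candidates:
    print(repr(candidate), is_valid_subscriber_id(candidate))
```

A tuple is only hashable when everything inside it is hashable, which is why the last candidate is rejected.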

[!IMPORTANT] A channel that is newly created by this subscription, or that becomes active because of it, can only be monitored after a listener thread restart. Thread restarts happen automatically when autorun=True. Otherwise, if autorun=False, such channels will not be monitored until a call to stop() and start(), or restart(), is made.

Listener thread (re)starts are only required under certain, specific circumstances. It's advisable to let pgnotifier take care of listener thread management via the default autorun=True, unless there is a very good reason to manage it manually. See __maybe_restart for more detail.

When a notification is received on a channel, the callbacks subscribed to that channel are executed and passed the args below.

Callback args:

  • id the subscriber id, as hashable.
  • channel the notification channel, as str.
  • payload the notification received, as native type as cast by ast.literal_eval.
  • pid the notifying session's server process PID, as int.

from pgnotifier import Notifier

n = Notifier(conf)
n.subscribe(42, 'ch4',
    lambda id, channel, payload, pid: print("id: ", id, ", channel: ", channel,
        ", payload: ", payload, ", pid: ", pid))

Notifier.unsubscribe( id, channel, autorun=True )

Removes a callback function with id from notifications on channel. Optionally restarts listener thread (if needed).

Args:

  • id the subscriber id, as hashable.
  • channel notification channel to unsubscribe from, as str.
  • autorun restart listener thread (if needed), as bool. Defaults to True.

from pgnotifier import Notifier

n = Notifier(conf)
# channels and/or subscribers, have been added, removed, etc. ...
n.unsubscribe(42, 'ch1')

Notifier.subscribers( )

Returns subscriber and channel data, as dict.

from pgnotifier import Notifier

n = Notifier(conf)
# channels and/or subscribers, have been added, removed, etc. ...
s = n.subscribers()
print("subscribers:", s)

Notifier.mute_channels( channels=pyrsistent.PVector )

Mutes channels. Removes channels from listener thread, thereby muting all subscribers associated with those channels (no matter their mute status).

Subscribers will retain their mute status associated with those channels.

Args:

  • channels list of channels to mute, as str (single channel), list or set. If no channels given, ALL channels will be muted.

from pgnotifier import Notifier

n = Notifier(conf)
# channels and/or subscribers, have been added, removed, etc. ...
n.mute_channels('ch1')
m = n.muted_channels()
print("muted channels:", m)

Notifier.unmute_channels( channels=pyrsistent.PVector )

Un-mutes channels. Adds channels to the listener thread, thereby adding all un-muted subscribers associated with those channels.

Args:

  • channels list of channels to un-mute, as str (single channel), list or set. If no channels given, ALL channels will be un-muted.

[!NOTE] Channel will remain inactive (i.e. excluded from the listener thread) if it does not contain any non-muted subscribers.

from pgnotifier import Notifier

n = Notifier(conf)
# channels and/or subscribers, have been added, removed, etc. ...
n.unmute_channels()
m = n.non_muted_channels()
print("non muted channels:", m)
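
The mute_channels and unmute_channels descriptions above say that subscribers keep their own mute status while a channel is muted, and that an un-muted channel stays inactive if it has no non-muted subscribers. The sketch below models that with plain dicts; the state layout and both function names are assumptions for illustration and do not reflect pgnotifier's internals.

```python
def set_channel_muted(state, channel, muted):
    """Return a new state with one channel's mute flag changed.

    Subscriber mute flags are carried over untouched.
    """
    new_state = dict(state)
    new_state[channel] = {**state[channel], 'muted': muted}
    return new_state

def listened_channels(state):
    """Channels that would be in the listener thread under the rules above."""
    return sorted(
        name for name, ch in state.items()
        if not ch['muted'] and any(not m for m in ch['subscribers'].values())
    )

# subscribers maps subscriber id -> "is this subscriber muted?"
state = {
    'ch1': {'muted': False, 'subscribers': {42: False}},
    'ch2': {'muted': True, 'subscribers': {'an_id': True}},
}
after_mute = set_channel_muted(state, 'ch1', True)
after_unmute = set_channel_muted(after_mute, 'ch1', False)
ch2_unmuted = set_channel_muted(state, 'ch2', False)

print(listened_channels(state), listened_channels(after_mute),
      listened_channels(after_unmute), listened_channels(ch2_unmuted))
```

In this model, un-muting ch2 does not bring it back into the listened set because its only subscriber is still muted.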

Notifier.mute_subscriber( id, channels=pyrsistent.PVector )

Mutes subscriber on channels. If a channel no longer contains any non-muted subscribers, it is said to be inactive and is removed from the listener thread.

Args:

  • id subscriber id, as hashable (i.e. any immutable type such as strings, numbers, and tuples containing immutable types).
  • channels list of channels to mute the subscriber on, as str (single channel), list or set. If no channels given, the subscriber will be muted on ALL channels it is subscribed to.

from pgnotifier import Notifier

n = Notifier(conf)
# channels and/or subscribers, have been added, removed, etc. ...
n.mute_subscriber('an_id', 'ch2')
m = n.muted_subscribers()
print("muted subscribers:", m)

Notifier.unmute_subscriber( id, channels=pyrsistent.PVector )

Un-mutes subscriber on channels. If subscriber is on a non-muted, inactive channel, the channel becomes active and is added to the listener thread.

Args:

  • id subscriber id, as hashable (i.e. any immutable type such as strings, numbers, and tuples containing immutable types).
  • channels list of channels to un-mute the subscriber on, as str (single channel), list or set. If no channels given, the subscriber will be un-muted on ALL channels it is subscribed to.

from pgnotifier import Notifier

n = Notifier(conf)
# channels and/or subscribers, have been added, removed, etc. ...
n.unmute_subscriber('an_id')
m = n.muted_subscribers()
print("muted subscribers:", m)

Notifier.start( )

Starts the listener thread. Is a no-op if the thread is already running. This function is generally not needed in userland.

[!NOTE] Listener thread (re)starts are only required under certain, specific circumstances. See __maybe_restart for more detail.

from pgnotifier import Notifier

n = Notifier(conf)
# channels and/or subscribers, have been added, removed, etc. ...
n.start()

Notifier.stop( )

Stops the listener thread (if running). Is a no-op if thread is not running.

from pgnotifier import Notifier

n = Notifier(conf)
# channels and/or subscribers, have been added, removed, etc. ...
n.stop()

Notifier.restart( )

(Re)starts listener thread. This function is generally not needed in userland.

[!NOTE] Listener thread (re)starts are only required under certain, specific circumstances. See __maybe_restart for more detail.

from pgnotifier import Notifier

n = Notifier(conf)
# channels and/or subscribers, have been changed with arg autorun=False ...
n.restart()

Notifier.is_running( )

Returns True if the listener thread is currently running, else False, as bool.

from pgnotifier import Notifier

n = Notifier(conf)
# channels and/or subscribers, have been changed with arg autorun=False ...
b = n.is_running()
print("listener running?", b)

Internal helper functions

The functions below are not required outside the internals of pgnotifier. They are publicly exposed and included here as a matter of interest.

notify.assoc_in( m, pv, v )

A Clojure-esque nested associative map transformer for Pyrsistent. Associates a new value v at key path pv in map m. Returns a new map with associated changes, as pyrsistent.PMap.

Args:

  • m map to transform, as pyrsistent.PMap.
  • pv a path vector of keys indicating location of desired assoc, as pyrsistent.PVector or list
  • v new value to assoc into path given by pv, as whatever!
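
To show what a nested assoc does, here is a sketch of the same idea using plain dicts in place of pyrsistent.PMap. It is not pgnotifier's implementation: the function body is an assumption written for this example, and it expects every existing value along the path to be a dict.

```python
def assoc_in(m, pv, v):
    """Plain-dict sketch: return a copy of m with v set at key path pv."""
    if not pv:
        return v
    key, rest = pv[0], pv[1:]
    new_map = dict(m)  # shallow copy, so the input map is left unchanged
    new_map[key] = assoc_in(m.get(key, {}), rest, v)
    return new_map

original = {'ch1': {'muted': False, 'subscribers': {42: 'callback'}}}
updated = assoc_in(original, ['ch1', 'muted'], True)
created = assoc_in({}, ['ch9', 'muted'], False)

print(original['ch1']['muted'], updated['ch1']['muted'], created)
```

Missing intermediate keys are created as empty maps along the way, and the original map is never modified.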

notify.dissoc_in( m, pv )

Nested associative map key->val remover for Pyrsistent. Returns a new map with dissociated changes, as pyrsistent.PMap.

Args:

  • m map to transform, as pyrsistent.PMap.
  • pv a path vector of keys indicating location of desired dissoc, as pyrsistent.PVector or list
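
The nested removal can be sketched the same way with plain dicts standing in for pyrsistent.PMap. The body below is an assumption for illustration, not pgnotifier's code; in particular, returning the map unchanged when the path does not exist is a choice made for this example.

```python
def dissoc_in(m, pv):
    """Plain-dict sketch: return a copy of m without the key at path pv."""
    if not pv:
        return m
    key, rest = pv[0], pv[1:]
    if key not in m:
        return m  # nothing to remove (assumed behaviour)
    new_map = dict(m)  # shallow copy, so the input map is left unchanged
    if rest:
        new_map[key] = dissoc_in(m[key], rest)
    else:
        del new_map[key]
    return new_map

original = {'ch1': {'subscribers': {42: 'callback', 'an_id': 'other'}}}
removed = dissoc_in(original, ['ch1', 'subscribers', 42])

print(removed)
```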

notify.filterkv( m, f, *a )

Trivial associative map filter. Returns a new map with filtered changes, as pyrsistent.PMap.

Args:

  • m map to filter, as pyrsistent.PMap.
  • f filter function that accepts at least a key and a value as args, as callable.
  • *a optional additional args to pass to filter function, as whatever!
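
A plain-dict sketch of such a filter follows. It mirrors the documented signature (map, filter function, optional extra args) but is written for this example and returns a dict rather than a pyrsistent.PMap; the is_muted filter and the sample data are hypothetical.

```python
def filterkv(m, f, *a):
    """Plain-dict sketch: keep the entries for which f(key, value, *a) is truthy."""
    return {k: v for k, v in m.items() if f(k, v, *a)}

def is_muted(subscriber_id, info, wanted):
    """Hypothetical filter: compare a subscriber's mute flag with the wanted value."""
    return info['muted'] is wanted

subscribers = {
    42: {'muted': False},
    'an_id': {'muted': True},
    (2, 'another_id'): {'muted': False},
}
muted = filterkv(subscribers, is_muted, True)
non_muted = filterkv(subscribers, is_muted, False)

print(list(muted), list(non_muted))
```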

notify.as_async( f, *a )

Runs asynchronous and/or blocking functions in a new asyncio loop, as a task, in a thread. Designed to be called from a synchronous context. Returns concurrent.futures.Future.

Args:

  • f function to run in asyncio loop, as callable.
  • *a optional args to function f (e.g. a blocking function call that might produce a result in the future), as whatever!
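
The general technique described here, running a coroutine on its own event loop in a background thread and handing a concurrent.futures.Future back to synchronous code, can be sketched with the standard library alone. The run_in_thread_loop name and its details are assumptions for this example. It is not pgnotifier's as_async, and it only handles coroutine functions, not plain blocking ones.

```python
import asyncio
import concurrent.futures
import threading

def run_in_thread_loop(coro_fn, *args):
    """Hypothetical stand-in: run coro_fn(*args) on a fresh event loop in a
    background thread and return a concurrent.futures.Future."""
    loop = asyncio.new_event_loop()

    def runner():
        asyncio.set_event_loop(loop)
        try:
            loop.run_forever()
        finally:
            loop.close()

    threading.Thread(target=runner, daemon=True).start()
    future = asyncio.run_coroutine_threadsafe(coro_fn(*args), loop)
    # Stop the loop once the work is finished so the thread can exit.
    future.add_done_callback(lambda _: loop.call_soon_threadsafe(loop.stop))
    return future

async def add_later(a, b):
    await asyncio.sleep(0.01)
    return a + b

# Called from ordinary synchronous code: no async/await needed here.
future = run_in_thread_loop(add_later, 2, 3)
result = future.result(timeout=5)
print(result)
```

The caller can block on future.result() or attach its own done-callback, which is what lets asynchronous work be driven from a synchronous context.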
