Generates PL/pgSQL script creating LISTEN/NOTIFY channels
Project description
pg-channel
Generate ready-to-run PL/pgSQL scripts that wire up PostgreSQL LISTEN/NOTIFY channels for INSERT / UPDATE / DELETE events on a given table.
Why this project
PostgreSQL's LISTEN/NOTIFY is the cheapest way to push row-level changes to application code (workers, websockets, cache invalidators, etc.) without polling. But writing trigger functions by hand is repetitive and error-prone:
- you need a separate trigger function per table per event;
- payload formatting must be consistent across services that consume the channel;
- on
UPDATEyou usually don't want the full row — you want the diff (which fields changed and from what to what); - you often need conditional channels — e.g. notify only when specific fields change, with
AND/ORlogic between them.
pg-channel solves all of this with a single function plsql() that, from a table name and an optional dict describing update conditions, produces a complete PL/pgSQL script. Run the output once against your database — and the channels are live.
Channel payload conventions
tables_new— onINSERT, payload = the full new row as JSON (row_to_json(NEW)).tables_del— onDELETE, payload =OLD.id(as string).tables_upd(ortables_upd_<event>) — onUPDATE, payload = JSON array[NEW.id, diff], wherediffis an object of the form{field_name: [old_value, new_value], ...}containing only changed fields. If nothing changed,diffisnull.
The
delchannel intentionally sends only the id — a deleted row has no other useful state for consumers, and a numeric id keeps payloads small.
Installation
pip install pg-channel
Usage
from pg_channel import plsql, Act
plsql(table, ops=7, updates=None) returns a str containing the full PL/pgSQL script. Execute it against your database with any client (psycopg, asyncpg, psql -f, a migration tool, etc.).
Arguments
| name | type | default | description |
|---|---|---|---|
table |
str |
— | Table name. Used as-is for the channel prefix ({table}s_*) and is wrapped in double quotes inside the CREATE TRIGGER statement. |
ops |
int (bitmask of Act) |
7 |
Which triggers to generate. Act.DEL=1, Act.NEW=2, Act.UPD=4. Combine with bitwise OR. 7 = all three. |
updates |
dict[str, tuple | list] |
None |
Optional conditional routing for the update trigger. Keys are event names, values are field sets. See Conditional update channels. |
Examples
All three triggers, single update channel
plsql("my_ad")
Fires:
my_ads_newon insert,my_ads_delon delete,my_ads_updon any update.
Only the update trigger
plsql("my_ad", Act.UPD) # same as plsql("my_ad", 4)
The generated update function looks like this:
CREATE OR REPLACE FUNCTION my_ad_upd() returns trigger as $my_ad_upd_trg$
DECLARE
diff jsonb;
BEGIN
SELECT jsonb_object_agg(key, jsonb_build_array(to_jsonb(OLD) -> key, value))
INTO diff
FROM jsonb_each(to_jsonb(NEW))
WHERE value IS DISTINCT FROM (to_jsonb(OLD) -> key);
PERFORM pg_notify('my_ads_upd', jsonb_build_array(NEW.id, diff)::varchar);
RETURN NULL;
END
$my_ad_upd_trg$ LANGUAGE plpgsql;
CREATE OR REPLACE TRIGGER my_ad_upd AFTER UPDATE ON "my_ad" FOR EACH ROW EXECUTE FUNCTION my_ad_upd();
A consumer of my_ads_upd will receive payloads like:
[42, {"status": ["draft", "published"], "price": [100, 120]}]
Conditional update channels
The updates argument splits the single tables_upd channel into several named sub-channels, each fired only when its field condition is met.
- Key — event name; the channel becomes
{table}s_upd_{event}. - Value — fields that must change for the event to fire:
list→ OR (any of the listed fields changed),tuple→ AND (all of the listed fields changed).
- Key prefixed with
_→ the branch is emitted asELSIFinstead of an independentIF, so it fires only when no earlier branch matched (useful for mutually exclusive events).
One specific field
plsql("order", 7, {"change": ["status"]})
Fires orders_upd_change only when order.status changes.
AND / OR mix
plsql("table3", 5, {
"event1": ("field1", "field2"), # tuple → AND
"event2": ["field2", "field3"], # list → OR
})
Generates two independent IF blocks — both can fire on the same row update.
Mutually exclusive branches (ELSIF)
plsql("order", 4, {
"published": ["status"],
"_priced": ["price"], # leading "_" → ELSIF
})
If status changes, orders_upd_published fires and the price branch is skipped, even if price also changed.
Listening from Python
import asyncpg, asyncio, json
async def main():
conn = await asyncpg.connect(dsn=...)
await conn.add_listener("my_ads_upd", lambda c, pid, ch, payload: print(ch, json.loads(payload)))
await asyncio.Future() # keep the connection open
asyncio.run(main())
Caveats and nuances
NEW.idis assumed. The update trigger callsjsonb_build_array(NEW.id, diff), and the delete trigger usesOLD.id. Tables without anidcolumn require a manual edit of the generated SQL.- Channel name pluralisation is naive.
plsql("my_ad")yieldsmy_ads_*— the library just appendss. Pick table names that look reasonable when pluralised this way, or rename the channels after generation. - Table name is quoted, channel name is not.
CREATE TRIGGERuses"{table}"(so reserved words likeorderare safe), but channel names are raw — keep them to lowercase identifiers. row_to_jsonvsto_jsonb. TheNEWchannel usesrow_to_json(insert payload), theUPDdiff usesto_jsonb. Both serialise PostgreSQL types to JSON, but be aware of how your column types (especiallybytea, custom enums, ranges) render before relying on the payload format.- Payload size limit. PostgreSQL caps each
NOTIFYpayload at 8000 bytes by default. Wide rows or large text fields can exceed this — prefer narrow update channels (use theupdatesdict) or notify only ids and re-fetch on the consumer side. CREATE OR REPLACE. Both functions and triggers are emitted withCREATE OR REPLACE, so re-running the generated script is idempotent.- Trigger timing. All triggers are
AFTERrow triggers andRETURN NULL— they don't influence the operation itself.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file pg_channel-0.0.7.tar.gz.
File metadata
- Download URL: pg_channel-0.0.7.tar.gz
- Upload date:
- Size: 10.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
a8d078ad0f78be76e8df78f38f3864cdd81a478c8bf0038f68c786b355dedc05
|
|
| MD5 |
5e6f78864ca2e23235b1a0627ef60b52
|
|
| BLAKE2b-256 |
0066addbaa53d8e37a6c26b34c1164721f783a61e652b72695abe8a157b05856
|
File details
Details for the file pg_channel-0.0.7-py3-none-any.whl.
File metadata
- Download URL: pg_channel-0.0.7-py3-none-any.whl
- Upload date:
- Size: 5.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
301487b7c07131dfea9672dd50113a40a0664fa3da2991f9246b11334ff1fc2e
|
|
| MD5 |
3030367ea027bdfc884ed9b4991010a4
|
|
| BLAKE2b-256 |
22c5cc441459f949e5b6dd871874bfce073fd19abb87a055bf6c92fd3d09224a
|