Skip to main content

Generates PL/pgSQL script creating LISTEN/NOTIFY channels

Project description

pre-commit

pg-channel

Русская версия

Generate ready-to-run PL/pgSQL scripts that wire up PostgreSQL LISTEN/NOTIFY channels for INSERT / UPDATE / DELETE events on a given table.

Why this project

PostgreSQL's LISTEN/NOTIFY is the cheapest way to push row-level changes to application code (workers, websockets, cache invalidators, etc.) without polling. But writing trigger functions by hand is repetitive and error-prone:

  • you need a separate trigger function per table per event;
  • payload formatting must be consistent across services that consume the channel;
  • on UPDATE you usually don't want the full row — you want the diff (which fields changed and from what to what);
  • you often need conditional channels — e.g. notify only when specific fields change, with AND / OR logic between them.

pg-channel solves all of this with a single function plsql() that, from a table name and an optional dict describing update conditions, produces a complete PL/pgSQL script. Run the output once against your database — and the channels are live.

Channel payload conventions

  • tables_new — on INSERT, payload = the full new row as JSON (row_to_json(NEW)).
  • tables_del — on DELETE, payload = OLD.id (as string).
  • tables_upd (or tables_upd_<event>) — on UPDATE, payload = JSON array [NEW.id, diff], where diff is an object of the form {field_name: [old_value, new_value], ...} containing only changed fields. If nothing changed, diff is null.

The del channel intentionally sends only the id — a deleted row has no other useful state for consumers, and a numeric id keeps payloads small.

Installation

pip install pg-channel

Usage

from pg_channel import plsql, Act

plsql(table, ops=7, updates=None) returns a str containing the full PL/pgSQL script. Execute it against your database with any client (psycopg, asyncpg, psql -f, a migration tool, etc.).

Arguments

name type default description
table str Table name. Used as-is for the channel prefix ({table}s_*) and is wrapped in double quotes inside the CREATE TRIGGER statement.
ops int (bitmask of Act) 7 Which triggers to generate. Act.DEL=1, Act.NEW=2, Act.UPD=4. Combine with bitwise OR. 7 = all three.
updates dict[str, tuple | list] None Optional conditional routing for the update trigger. Keys are event names, values are field sets. See Conditional update channels.

Examples

All three triggers, single update channel

plsql("my_ad")

Fires:

  • my_ads_new on insert,
  • my_ads_del on delete,
  • my_ads_upd on any update.

Only the update trigger

plsql("my_ad", Act.UPD)   # same as plsql("my_ad", 4)

The generated update function looks like this:

CREATE OR REPLACE FUNCTION my_ad_upd() returns trigger as $my_ad_upd_trg$
    DECLARE
        diff jsonb;
    BEGIN
        SELECT jsonb_object_agg(key, jsonb_build_array(to_jsonb(OLD) -> key, value))
        INTO diff
        FROM jsonb_each(to_jsonb(NEW))
        WHERE value IS DISTINCT FROM (to_jsonb(OLD) -> key);

        PERFORM pg_notify('my_ads_upd', jsonb_build_array(NEW.id, diff)::varchar);
        RETURN NULL;
    END
    $my_ad_upd_trg$ LANGUAGE plpgsql;
CREATE OR REPLACE TRIGGER my_ad_upd AFTER UPDATE ON "my_ad" FOR EACH ROW EXECUTE FUNCTION my_ad_upd();

A consumer of my_ads_upd will receive payloads like:

[42, {"status": ["draft", "published"], "price": [100, 120]}]

Conditional update channels

The updates argument splits the single tables_upd channel into several named sub-channels, each fired only when its field condition is met.

  • Key — event name; the channel becomes {table}s_upd_{event}.
  • Value — fields that must change for the event to fire:
    • listOR (any of the listed fields changed),
    • tupleAND (all of the listed fields changed).
  • Key prefixed with _ → the branch is emitted as ELSIF instead of an independent IF, so it fires only when no earlier branch matched (useful for mutually exclusive events).

One specific field

plsql("order", 7, {"change": ["status"]})

Fires orders_upd_change only when order.status changes.

AND / OR mix

plsql("table3", 5, {
    "event1": ("field1", "field2"),   # tuple → AND
    "event2": ["field2", "field3"],   # list  → OR
})

Generates two independent IF blocks — both can fire on the same row update.

Mutually exclusive branches (ELSIF)

plsql("order", 4, {
    "published": ["status"],
    "_priced":   ["price"],   # leading "_" → ELSIF
})

If status changes, orders_upd_published fires and the price branch is skipped, even if price also changed.

Listening from Python

import asyncpg, asyncio, json

async def main():
    conn = await asyncpg.connect(dsn=...)
    await conn.add_listener("my_ads_upd", lambda c, pid, ch, payload: print(ch, json.loads(payload)))
    await asyncio.Future()   # keep the connection open

asyncio.run(main())

Caveats and nuances

  • NEW.id is assumed. The update trigger calls jsonb_build_array(NEW.id, diff), and the delete trigger uses OLD.id. Tables without an id column require a manual edit of the generated SQL.
  • Channel name pluralisation is naive. plsql("my_ad") yields my_ads_* — the library just appends s. Pick table names that look reasonable when pluralised this way, or rename the channels after generation.
  • Table name is quoted, channel name is not. CREATE TRIGGER uses "{table}" (so reserved words like order are safe), but channel names are raw — keep them to lowercase identifiers.
  • row_to_json vs to_jsonb. The NEW channel uses row_to_json (insert payload), the UPD diff uses to_jsonb. Both serialise PostgreSQL types to JSON, but be aware of how your column types (especially bytea, custom enums, ranges) render before relying on the payload format.
  • Payload size limit. PostgreSQL caps each NOTIFY payload at 8000 bytes by default. Wide rows or large text fields can exceed this — prefer narrow update channels (use the updates dict) or notify only ids and re-fetch on the consumer side.
  • CREATE OR REPLACE. Both functions and triggers are emitted with CREATE OR REPLACE, so re-running the generated script is idempotent.
  • Trigger timing. All triggers are AFTER row triggers and RETURN NULL — they don't influence the operation itself.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pg_channel-0.0.7.tar.gz (10.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pg_channel-0.0.7-py3-none-any.whl (5.5 kB view details)

Uploaded Python 3

File details

Details for the file pg_channel-0.0.7.tar.gz.

File metadata

  • Download URL: pg_channel-0.0.7.tar.gz
  • Upload date:
  • Size: 10.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.7

File hashes

Hashes for pg_channel-0.0.7.tar.gz
Algorithm Hash digest
SHA256 a8d078ad0f78be76e8df78f38f3864cdd81a478c8bf0038f68c786b355dedc05
MD5 5e6f78864ca2e23235b1a0627ef60b52
BLAKE2b-256 0066addbaa53d8e37a6c26b34c1164721f783a61e652b72695abe8a157b05856

See more details on using hashes here.

File details

Details for the file pg_channel-0.0.7-py3-none-any.whl.

File metadata

  • Download URL: pg_channel-0.0.7-py3-none-any.whl
  • Upload date:
  • Size: 5.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.7

File hashes

Hashes for pg_channel-0.0.7-py3-none-any.whl
Algorithm Hash digest
SHA256 301487b7c07131dfea9672dd50113a40a0664fa3da2991f9246b11334ff1fc2e
MD5 3030367ea027bdfc884ed9b4991010a4
BLAKE2b-256 22c5cc441459f949e5b6dd871874bfce073fd19abb87a055bf6c92fd3d09224a

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page