A PostgreSQL toy: async notification as a chat group
PostgreSQL provides a very interesting asynchronous messaging feature.
With it you can implement many fun ideas, such as offline chat groups, logical data replication, audit logging, and so on.
NOTIFY sends a message to a named channel.
postgres=# \h notify
Command: NOTIFY
Description: generate a notification
Syntax:
NOTIFY channel [ , payload ]
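When the channel name or payload is not a constant (for example inside a function), the built-in pg_notify(channel, payload) function can be used instead of the NOTIFY statement, which only accepts literal arguments. A minimal illustration (the channel name here is arbitrary):
select pg_notify('mychannel', 'hello from pg_notify');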
LISTEN subscribes to (listens on) a named channel.
postgres=# \h listen
Command: LISTEN
Description: listen for a notification
Syntax:
LISTEN channel
A single client can send messages to multiple channels, and it can also listen for messages from multiple channels.
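For instance, a session can subscribe to two channels at the same time; notifications sent to either channel will then be delivered to it (the second channel name below is made up for illustration):
postgres=# listen cnpug;
LISTEN
postgres=# listen cnpug_admin;
LISTEN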
A chat-group walkthrough:
1. First, everyone joins the chat group (a channel).
SESSION A:
postgres=# listen cnpug;
LISTEN
SESSION B:
postgres=# listen cnpug;
LISTEN
2. Anyone can send a message to the chat group; a message you send yourself is received (and displayed) immediately.
SESSION A:
postgres=# notify cnpug, 'hello, every body.';
NOTIFY
Asynchronous notification "cnpug" with payload "hello, every body." received from server process with PID 45729.
3. The group's backlog of messages can be picked up at any time. Messages sent by other sessions are not displayed while psql sits idle; they show up the next time the session executes a command, so here we simply issue LISTEN again to poll for them.
SESSION B:
postgres=# listen cnpug;
LISTEN
Asynchronous notification "cnpug" with payload "hello, every body." received from server process with PID 45729.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
SESSION A:
postgres=# listen cnpug;
LISTEN
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
If a session goes a long time without picking them up, messages pile up in the queue.
SESSION B:
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello1';
NOTIFY
Asynchronous notification "cnpug" with payload "hello1" received from server process with PID 46882.
SESSION A:
postgres=# listen cnpug;
LISTEN
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello1" received from server process with PID 46882.
4. If a new member joins the chat group, it only sees messages sent after it joined; earlier messages are not visible to it.
postgres=# listen cnpug;
LISTEN
Messages sent before the LISTEN are not displayed, even if they are still sitting in the queue.
5. Leaving the chat group
postgres=# unlisten cnpug;
UNLISTEN
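To drop every subscription of the current session in one go, UNLISTEN * can be used:
postgres=# unlisten *;
UNLISTEN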
Besides the chat-group toy, asynchronous notifications can also be used for scenarios such as auditing and data replication.
For example, the tcn (triggered change notifications) contrib extension implements exactly this kind of audit/replication scenario.
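The triggered_change_notification() trigger function ships with the tcn contrib extension, so install it first (this assumes the contrib modules are available on your server):
test=# create extension tcn;
CREATE EXTENSION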
test=# create table tcndata
test-# (
test(# a int not null,
test(# b date not null,
test(# c text,
test(# primary key (a, b)
test(# );
CREATE TABLE
Create a trigger that fires on DML and calls the triggered_change_notification() function, which sends a NOTIFY for each changed row.
test=# create trigger tcndata_tcn_trigger
test-# after insert or update or delete on tcndata
test-# for each row execute procedure triggered_change_notification();
CREATE TRIGGER
Listen on the tcn channel (the extension's default channel name):
test=# listen tcn;
LISTEN
Now, every time DML is executed against the table, the listening session receives the asynchronous messages published by triggered_change_notification() on the tcn channel.
test=# insert into tcndata values (1, date '2012-12-22', 'one'),
test-# (1, date '2012-12-23', 'another'),
test-# (2, date '2012-12-23', 'two');
INSERT 0 3
Asynchronous notification "tcn" with payload ""tcndata",I,"a"='1',"b"='2012-12-22'" received from server process with PID 22770.
Asynchronous notification "tcn" with payload ""tcndata",I,"a"='1',"b"='2012-12-23'" received from server process with PID 22770.
Asynchronous notification "tcn" with payload ""tcndata",I,"a"='2',"b"='2012-12-23'" received from server process with PID 22770.
test=# update tcndata set c = 'uno' where a = 1;
UPDATE 2
Asynchronous notification "tcn" with payload ""tcndata",U,"a"='1',"b"='2012-12-22'" received from server process with PID 22770.
Asynchronous notification "tcn" with payload ""tcndata",U,"a"='1',"b"='2012-12-23'" received from server process with PID 22770.
test=# delete from tcndata where a = 1 and b = date '2012-12-22';
DELETE 1
Asynchronous notification "tcn" with payload ""tcndata",D,"a"='1',"b"='2012-12-22'" received from server process with PID 22770.
Using this approach you can build auditing or asynchronous data replication.
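As a sketch of a hand-rolled variant (the function name audit_notify, the trigger name and the audit channel are all made up for illustration), a PL/pgSQL trigger can publish each changed row as JSON on a channel of your choosing, and a listener can then log or apply the changes. The payload has to stay within the size limit discussed below.
create or replace function audit_notify() returns trigger as $$
begin
    -- publish table name, operation and the affected row (as JSON) on the "audit" channel;
    -- for DELETE the NEW record is not defined, so fall back to OLD
    perform pg_notify('audit',
                      TG_TABLE_NAME || ',' || TG_OP || ',' ||
                      case when TG_OP = 'DELETE'
                           then row_to_json(OLD)::text
                           else row_to_json(NEW)::text
                      end);
    return null;   -- the return value of an AFTER ROW trigger is ignored
end;
$$ language plpgsql;

create trigger tcndata_audit_trigger
    after insert or update or delete on tcndata
    for each row execute procedure audit_notify();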
The source code of the asynchronous notification machinery is in:
src/backend/commands/async.c
Note the limit on the total size of the notification queue; if it is exceeded, NOTIFY fails with a queue-full error.
Also note the maximum length of a single message, which is roughly one data block (BLCKSZ).
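Both limits can be checked from SQL. pg_notification_queue_usage() is available from PostgreSQL 9.6 on, and the 8000-byte figure assumes the default BLCKSZ of 8192 and NAMEDATALEN of 64:
-- fraction of the notification queue currently in use (0 means empty)
select pg_notification_queue_usage();

-- a payload must be shorter than NOTIFY_PAYLOAD_MAX_LENGTH = BLCKSZ - NAMEDATALEN - 128,
-- i.e. shorter than 8000 bytes with the default build options
select octet_length('hello, every body.') < 8000;
The relevant excerpts from src/backend/commands/async.c follow.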
/*-------------------------------------------------------------------------
* Async Notification Model as of 9.0:
*
* 1. Multiple backends on same machine. Multiple backends listening on
* several channels. (Channels are also called "conditions" in other
* parts of the code.)
*
* 2. There is one central queue in disk-based storage (directory pg_notify/),
* with actively-used pages mapped into shared memory by the slru.c module.
* All notification messages are placed in the queue and later read out
* by listening backends.
*
* There is no central knowledge of which backend listens on which channel;
* every backend has its own list of interesting channels.
*
* Although there is only one queue, notifications are treated as being
* database-local; this is done by including the sender's database OID
* in each notification message. Listening backends ignore messages
* that don't match their database OID. This is important because it
* ensures senders and receivers have the same database encoding and won't
* misinterpret non-ASCII text in the channel name or payload string.
*
* Since notifications are not expected to survive database crashes,
* we can simply clean out the pg_notify data at any reboot, and there
* is no need for WAL support or fsync'ing.
*
* 3. Every backend that is listening on at least one channel registers by
* entering its PID into the array in AsyncQueueControl. It then scans all
* incoming notifications in the central queue and first compares the
* database OID of the notification with its own database OID and then
* compares the notified channel with the list of channels that it listens
* to. In case there is a match it delivers the notification event to its
* frontend. Non-matching events are simply skipped.
*
* 4. The NOTIFY statement (routine Async_Notify) stores the notification in
* a backend-local list which will not be processed until transaction end.
*
* Duplicate notifications from the same transaction are sent out as one
* notification only. This is done to save work when for example a trigger
* on a 2 million row table fires a notification for each row that has been
* changed. If the application needs to receive every single notification
* that has been sent, it can easily add some unique string into the extra
* payload parameter.
*
* When the transaction is ready to commit, PreCommit_Notify() adds the
* pending notifications to the head of the queue. The head pointer of the
* queue always points to the next free position and a position is just a
* page number and the offset in that page. This is done before marking the
* transaction as committed in clog. If we run into problems writing the
* notifications, we can still call elog(ERROR, ...) and the transaction
* will roll back.
*
* Once we have put all of the notifications into the queue, we return to
* CommitTransaction() which will then do the actual transaction commit.
*
* After commit we are called another time (AtCommit_Notify()). Here we
* make the actual updates to the effective listen state (listenChannels).
*
* Finally, after we are out of the transaction altogether, we check if
* we need to signal listening backends. In SignalBackends() we scan the
* list of listening backends and send a PROCSIG_NOTIFY_INTERRUPT signal
* to every listening backend (we don't know which backend is listening on
* which channel so we must signal them all). We can exclude backends that
* are already up to date, though. We don't bother with a self-signal
* either, but just process the queue directly.
*
* 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
* can call inbound-notify processing immediately if this backend is idle
* (ie, it is waiting for a frontend command and is not within a transaction
* block). Otherwise the handler may only set a flag, which will cause the
* processing to occur just before we next go idle.
*
* Inbound-notify processing consists of reading all of the notifications
* that have arrived since scanning last time. We read every notification
* until we reach either a notification from an uncommitted transaction or
* the head pointer's position. Then we check if we were the laziest
* backend: if our pointer is set to the same position as the global tail
* pointer is set, then we move the global tail pointer ahead to where the
* second-laziest backend is (in general, we take the MIN of the current
* head position and all active backends' new tail pointers). Whenever we
* move the global tail pointer we also truncate now-unused pages (i.e.,
* delete files in pg_notify/ that are no longer used).
*
* An application that listens on the same channel it notifies will get
* NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful,
* by comparing be_pid in the NOTIFY message to the application's own backend's
* PID. (As of FE/BE protocol 2.0, the backend's PID is provided to the
* frontend during startup.) The above design guarantees that notifies from
* other backends will never be missed by ignoring self-notifies.
*
* The amount of shared memory used for notify management (NUM_ASYNC_BUFFERS)
* can be varied without affecting anything but performance. The maximum
* amount of notification data that can be queued at one time is determined
* by slru.c's wraparound limit; see QUEUE_MAX_PAGE below.
*-------------------------------------------------------------------------
*/
/*
* Maximum size of a NOTIFY payload, including terminating NULL. This
* must be kept small enough so that a notification message fits on one
* SLRU page. The magic fudge factor here is noncritical as long as it's
* more than AsyncQueueEntryEmptySize --- we make it significantly bigger
* than that, so changes in that data structure won't affect user-visible
* restrictions.
*/
#define NOTIFY_PAYLOAD_MAX_LENGTH (BLCKSZ - NAMEDATALEN - 128)
/*
* Struct representing an entry in the global notify queue
*
* This struct declaration has the maximal length, but in a real queue entry
* the data area is only big enough for the actual channel and payload strings
* (each null-terminated). AsyncQueueEntryEmptySize is the minimum possible
* entry size, if both channel and payload strings are empty (but note it
* doesn't include alignment padding).
*
* The "length" field should always be rounded up to the next QUEUEALIGN
* multiple so that all fields are properly aligned.
*/
typedef struct AsyncQueueEntry
{
int length; /* total allocated length of entry */
Oid dboid; /* sender's database OID */
TransactionId xid; /* sender's XID */
int32 srcPid; /* sender's PID */
char data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
} AsyncQueueEntry;
/*
* slru.c currently assumes that all filenames are four characters of hex
* digits. That means that we can use segments 0000 through FFFF.
* Each segment contains SLRU_PAGES_PER_SEGMENT pages which gives us
* the pages from 0 to SLRU_PAGES_PER_SEGMENT * 0x10000 - 1.
*
* It's of course possible to enhance slru.c, but this gives us so much
* space already that it doesn't seem worth the trouble.
*
* The most data we can have in the queue at a time is QUEUE_MAX_PAGE/2
* pages, because more than that would confuse slru.c into thinking there
* was a wraparound condition. With the default BLCKSZ this means there
* can be up to 8GB of queued-and-not-read data.
*
* Note: it's possible to redefine QUEUE_MAX_PAGE with a smaller multiple of
* SLRU_PAGES_PER_SEGMENT, for easier testing of queue-full behaviour.
*/
#define QUEUE_MAX_PAGE (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1)
// src/include/access/slru.h:#define SLRU_PAGES_PER_SEGMENT 32
[References]
1. https://www.postgresql.org/docs/devel/static/tcn.html
2. https://www.postgresql.org/docs/9.4/static/libpq-notify.html
3. https://www.postgresql.org/docs/9.4/static/libpq-example.html#LIBPQ-EXAMPLE-2
4. contrib/tcn/tcn.c
5. https://www.postgresql.org/docs/9.4/static/sql-notify.html
6. https://www.postgresql.org/docs/9.4/static/sql-listen.html
7. https://www.postgresql.org/docs/9.4/static/sql-unlisten.html
8. https://www.postgresql.org/docs/9.4/static/contrib-dblink-get-notify.html
9. src/backend/commands/async.c
10. src/include/commands/async.h
Last updated: 2017-04-01 13:37:08