閱讀723 返回首頁    go 阿裏雲 go 技術社區[雲棲]


使用PostgreSQL_Notify實現多實例緩存同步

Parallel與Hierarchy是架構設計的兩大法寶,**緩存**是Hierarchy在IO領域的體現。單線程場景下緩存機製的實現可以簡單到不可思議,但很難想象成熟的應用會隻有一個實例。在使用緩存的同時引入並發,就不得不考慮一個問題:如何保證每個實例的緩存與底層數據副本的數據一致性。

分布式係統受到CAP定理的約束,分區一致性P是一般來說是不允許犧牲的,不可能讓兩個實例對同樣的請求卻給出不同的結果。用緩存是為了更好的性能,所以如果還要追求可用性A,就一定會犧牲C。我們能做的,就是通過巧妙設計讓AP係統的一致性損失最小化。

傳統方法

最簡單粗暴的辦法就是定時重新拉取,例如每個整點,所有應用一起去數據庫拉取一次最新版本的數據。很多應用都是這麼做的。當然問題也很多:拉的間隔長了,變更不能及時應用,用戶體驗差;拉的頻繁了,IO壓力大。而且實例數目和數據大小一旦膨脹起來,對於寶貴的IO資源是很大的浪費。

異步通知是一種更好的辦法,尤其是在讀請求遠多於寫請求的情況下。接受到寫請求的實例,通過發送廣播的方式通知其他實例。RedisPubSub就可以很好地實現這個功能。如果原本下層存儲就是Redis自然是再方便不過,但如果下層存儲是關係型數據庫的話,為這樣一個功能引入一個新的組件似乎有些得不償失。況且考慮到後台管理程序或者其他應用如果在修改了數據庫後也要去redis發布通知,實在太麻煩了。一種可行的辦法是通過數據庫中間件來監聽RDS變動並廣播通知,淘寶不少東西就是這麼做的。但如果DB本身就能搞定的事情,為什麼要加一個中間件呢?通過PostgreSQL的Notfiy-Listen機製,可以方便地實現這種功能。

目標

無論從任何渠道產生的數據庫記錄變更(增刪改)都能被所有相關應用實時感知,用於維護自身緩存與數據庫內容的一致性。

原理

PostgreSQL行級觸發器 + Notify機製 + 自定義協議 + Smart Client

  • 行級觸發器:通過為我們感興趣的表建立一個行級別的寫觸發器,對數據表中的每一行記錄的Update,Delete,Insert都會出發自定義函數的執行。
  • Notify:通過PostgreSQL內建的異步通知機製向指定的Channel發送通知
  • 自定義協議:協商消息格式,傳遞操作的類型與變更記錄的標識
  • Smart Client:客戶端監聽消息變更,根據消息對緩存執行相應的操作。

實際上這樣一套東西就是一個超簡易的WAL(Write After Log)實現,從而使應用內部的緩存狀態能與數據庫保持*實時*一致(compare to poll)。

實現

DDL

這裏以一個最簡單的表作為示例,一張以主鍵標識的users表。

-- 用戶表
CREATE TABLE users (
  id   TEXT,
  name TEXT,
  PRIMARY KEY (id)
);

觸發器

-- 通知觸發器
CREATE OR REPLACE FUNCTION notify_change() RETURNS TRIGGER AS $$
BEGIN
  IF    (TG_OP = 'INSERT') THEN 
    PERFORM pg_notify(TG_RELNAME || '_chan', 'I' || NEW.id); RETURN NEW;
  ELSIF (TG_OP = 'UPDATE') THEN 
    PERFORM pg_notify(TG_RELNAME || '_chan', 'U' || NEW.id); RETURN NEW;
  ELSIF (TG_OP = 'DELETE') THEN 
    PERFORM pg_notify(TG_RELNAME || '_chan', 'D' || OLD.id); RETURN OLD;
  END IF;
END; $$ LANGUAGE plpgsql SECURITY DEFINER;

這裏創建了一個觸發器函數,通過內置變量TG_OP獲取操作的名稱,TG_RELNAME獲取表名。每當觸發器執行時,它會向名為<table_name>_chan的通道發送指定格式的消息:[I|U|D]<id>

題外話:通過行級觸發器,還可以實現一些很實用的功能,例如In-DB Audit,自動更新字段值,統計信息,自定義備份策略與回滾邏輯等。

-- 為用戶表創建行級觸發器,監聽INSERT UPDATE DELETE 操作。
CREATE TRIGGER t_user_notify AFTER INSERT OR UPDATE OR DELETE ON users
FOR EACH ROW EXECUTE PROCEDURE notify_change();

創建觸發器也很簡單,表級觸發器對每次表變更執行一次,而行級觸發器對每條記錄都會執行一次。這樣,數據庫的裏的工作就算全部完成了。

消息格式

通知需要傳達出兩個信息:變更的操作類型,變更的實體標記。

  • 變更的操作類型就是增刪改:INSERT,DELETE,UPDATE。通過一個打頭的字符'[I|U|D]'就可以標識。
  • 變更的對象可以通過實體主鍵來標識。如果不是字符串類型,還需要確定一種無歧義的序列化方式。

這裏為了省事直接使用字符串類型作為ID,那麼插入一條id=1的記錄,對應的消息就是I1,更新一條id=5的記錄消息就是U5,刪除id=3的記錄消息就是D3

完全可以通過更複雜的消息協議實現更強大的功能。

SmartClient

數據庫的機製需要客戶端的配合才能生效,客戶端需要監聽數據庫的變更通知,才能將變更實時應用到自己的緩存副本中。對於插入和更新,客戶端需要根據ID重新拉取相應實體,對於刪除,客戶端需要刪除自己緩存副本的相應實體。以Go語言為例,編寫了一個簡單的客戶端模塊。

本例中使用一個以User.ID作為鍵,User對象作為值的並發安全字典Users sync.Map作為緩存。

作為演示,啟動了另一個goroutine對數據庫寫入了一些變更。

package main

import "sync"
import "strings"
import "github.com/go-pg/pg"
import . "github.com/Vonng/gopher/db/pg"
import log "github.com/Sirupsen/logrus"

type User struct {
    ID   string `sql:",pk"`
    Name string
}

// Users 內部數據緩存
var Users sync.Map 

// 輔助函數:加載全部用戶,初始化時使用
func LoadAllUser() {
    var users []User
    Pg.Query(&users, `SELECT ID,name FROM users;`)
    for _, user := range users {
        Users.Store(user.ID, user)
    }
}

// 輔助函數:根據ID重載單個用戶,當插入和更新時執行
func LoadUser(id string) {
    user := User{ID: id}
    Pg.Select(&user)
    Users.Store(user.ID, user)
}

// 打印緩存內部的Key列表
func PrintUsers() string {
    var buf []string
    Users.Range(func(key, value interface{}) bool {
        buf = append(buf, key.(string));
        return true
    })
    return strings.Join(buf, ",")
}

// ListenUserChange 會監聽PostgreSQL users數據表中的變動通知,並維護緩存狀態
func ListenUserChange() {
    go func(c <-chan *pg.Notification) {
        for notify := range c {
            action, id := notify.Payload[0], notify.Payload[1:]
            switch action {
            case 'I': fallthrough
            case 'U': LoadUser(id);
            case 'D': Users.Delete(id)
            }
            log.Infof("[NOTIFY] Action:%c ID:%s Users: %s", action, id, PrintUsers())
        }
    }(Pg.Listen("users_chan").Channel())
}

// MakeSomeChange 會向數據庫寫入一些變更
func MakeSomeChange() {
    Pg.Exec(`TRUNCATE TABLE users;`)
    Pg.Insert(&User{"001", "張三"})
    Pg.Insert(&User{"002", "李四"})
    Pg.Insert(&User{"003", "王五"})  // 插入
    Pg.Update(&User{"003", "王麻子"}) // 改名
    Pg.Delete(&User{ID: "002"})    // 刪除
}

func main() {
    LoadAllUser()
    ListenUserChange()
    go MakeSomeChange()
    <-make(chan struct{})
}

運行結果如下:

[NOTIFY] Action:I ID:001 Users: 001          
[NOTIFY] Action:I ID:002 Users: 001,002      
[NOTIFY] Action:I ID:003 Users: 002,003,001  
[NOTIFY] Action:U ID:003 Users: 001,002,003  
[NOTIFY] Action:D ID:002 Users: 001,003      

可以看出,緩存確是與數據庫保持了同樣的狀態。

應用場景

讀遠大於寫的場景。

最後更新:2017-08-13 22:33:04

  上一篇:go  RDS SQL Server - 專題分享 - 巧用執行計劃緩存之執行計劃編譯
  下一篇:go  RFID技術將給飲料零售行業帶來很多利益