模組間的解耦合:發佈/訂閱模型

Observer Pattern 是物件導向常用的架構,例如多個 Chart 與單一 Data Source 的互動,就可以使用 Observer Pattern 設計,好避免資料不同步的問題。而且 Observer Pattern 可以切開 Subject 跟 Observer,讓個別模組的功能更明確,修改副作用更小。

Golang 的哲學是簡單,語言上更強調小元件的延展與復用,例如使用組合取代繼承,使用 Goroutine 取代 Thread 等。本文會講解怎麼使用 channel 來實現 Observer Pattern,或者更現代的說法,實現 Publish/Subscribe 的架構,來建構彼此獨立的模組。

Introduce Observer Pattern

先來對 Observer Pattern 做個解說。依照 GoF 的物件導向經典《Design Pattern》,Observer Pattern 的需求場景是

定義對象間的一種一對多的依賴關係,當一個對象的狀態發生改變時,所有依賴於它的對象都得到通知並被自動更新。

我們可以想像成訂閱報紙的情境,當現在有個新的事件發生,所有有訂閱報紙的讀者,都可以收到最新的事件訊息,讀者可以根據這個訊息來採取反應,例如買賣股票、規劃行程、改變計劃等。報紙的發行人不知道讀者會採取什麼行動,它只負責將消息傳遞給讀者。

如果將 Observer Pattern 用 Class Diagram 來表示,會是

由圖中可以看到,主要分成兩個物件,Subject 知道有誰訂閱,當發生消息時,會 Notify 所有訂閱者,要求它們 Update;Observer 則實作 Update,會由 Subject 中取得最新資料,以供後續使用。將前後順序用 Sequence Diagram 來表示後,就會是

Design Pub/Sub Module

我們的需求是設計一個發佈/訂閱的模組,用於支撐商業邏輯的開發。這個模組類似於中間人的角色,發佈者透過這個模組來發佈訊息,模組也會負責將收到的訊息轉發給訂閱者。

專案架構為

project
├── cmd
├── pkg
│   └── pubsub
│       └── pubsub.go
├── scripts
│   └── build_win.bat
├── go.mod
└── README.md

完整的程式碼如下

package pubsub

import (
    "fmt"
    "sync"
)

// DataType is data type of message
type DataType string

// Client is a client of pub/sub pattern
type Client struct {
    writer  chan DataType
    readers []chan DataType
    mutex   sync.Mutex
}

// Pub publish message
func (m *Client) Pub(data DataType) {
    m.writer <- data
}

// Sub subscribe message
func (m *Client) Sub(handler func(DataType) error) {
    m.mutex.Lock()
    defer m.mutex.Unlock()
    readChannel := make(chan DataType, 10)
    m.readers = append(m.readers, readChannel)
    go func() {
        for {
            data := <-readChannel
            if err := handler(data); err != nil {
                fmt.Println(err)
            }
        }
    }()
}

// NewClient new a client
func NewClient() *Client {
    broker := &Client{
        writer: make(chan DataType, 10),
    }
    go func() {
        for {
            data := <-broker.writer
            for _, reader := range broker.readers {
                reader <- data
            }
        }
    }()
    return broker
}

一段一段來看。

首先定義訊息格式 DataType,假設為 string,但可依照需求自行定義,如果要傳輸的格式比較複雜,也可以定義成 struct

// DataType is data type of message
type DataType string

定義用戶端,讓創建此用戶端的人,可以進行 pub/sub。這邊用 channel 做為 pub/sub 溝通的管道。

channel 在 Golang 中,類似 Linux 的 pipeline 概念,常用於在兩個不同的 Go routine 間傳遞資料。

https://www.slideshare.net/ssuser9ebf46/golang-101
https://www.slideshare.net/ssuser9ebf46/golang-101

writer 這個 channel 用於接受發佈訊息,經由中間人的 Goroutine 後,會將訊息用各訂閱的 channel readers,轉交給訂閱者的 Goroutine

// Client is a client of pub/sub pattern
type Client struct {
    writer  chan DataType
    readers []chan DataType
    mutex   sync.Mutex
}

實現 pub 的邏輯,當使用者輸入訊息後,將此訊息丟到 channel 中

// Pub publish message
func (m *Client) Pub(data DataType) {
    m.writer <- data
}

實現 sub 的邏輯,當使用者訂閱時,創建一個新的 channel,並將它加入 readers 的陣容內,同時啟動 Goroutine,持續監聽這個 channel。如果有任何訊息進來,調用使用者註冊的 handler 來處理這則訊息。

Goroutine 是 Golang 的最大特色,類似其他語言中的 Thread,中文翻成協程,相對 Thread 輕量,適合用在高併發的場景。它的底層對應到內部的 scheduler,會根據當前的狀況來決定調用哪個 Goroutine。我們利用 Go routine 來實現 Publish/Subscribe 的架構,可以更有效率處理訂閱問題。

使用 Goroutine 只要用關鍵字 go 即可

// Sub subscribe message
func (m *Client) Sub(handler func(DataType) error) {
    m.mutex.Lock()
    defer m.mutex.Unlock()
    readChannel := make(chan DataType, 10)
    m.readers = append(m.readers, readChannel)
    go func() {
        for {
            data := <-readChannel
            if err := handler(data); err != nil {
                fmt.Println(err)
            }
        }
    }()
}

接著建立 writer 與 readers 間的轉發關係。

當 Client 初始化時,建立 writer,同時用 Goroutine 來監看 writer,如果 writer 內有任何的訊息,Goroutine 會遍歷 readers,將消息轉發給 readers。

// NewClient new a client
func NewClient() *Client {
    broker := &Client{
    writer: make(chan DataType, 10),
    }
    go func() {
        for {
            data := <-broker.writer
            for _, reader := range broker.readers {
                reader <- data
            }
        }
    }()
    return broker
}

Step 3: Use Pub/Sub Module

完成模組開發後,再來就是使用模組了,新增使用的主程式

project
├── cmd
│   └── pubsub
│       └── main.go
├── pkg
│   └── pubsub
│       └── pubsub.go
├── scripts
│   └── build_win.bat
├── go.mod
└── README.md

內容是

package main

import (
    "errors"
    "example/pkg/pubsub"
    "fmt"
    "time"
)

func main() {
    client := pubsub.NewClient()
    var printMessage func(pubsub.DataType) error
    printMessage = func(msg pubsub.DataType) error {
        if msg == "error" {
            return errors.New("this is an error")
        }
        fmt.Println(msg)
        return nil
    }
    client.Sub(printMessage)
    client.Pub("Hello")
    client.Pub("error")
    time.Sleep(time.Second)
}

使用模組內的初始化函式(因為 Golang 不是物件導向語言,沒有建構式),來取得要使用的 client

client := pubsub.NewClient()

建立 handler 做為訂閱時的 callback function,當訂閱的訊息出現時,調用這個 handler 來處理。這裡建立的 handler 會判斷訊息是不是 error 這個字串,如果是的話,回傳錯誤訊息,否則正常印出訊息

var printMessage func(pubsub.DataType) error
printMessage = func(msg pubsub.DataType) error {
    if msg == "error" {
        return errors.New("this is an error")
    }
    fmt.Println(msg)
    return nil
}

最後訂閱與發佈訊息

用 Sub 註冊訂閱用的 handler;用 Pub 發佈訊息。發佈的訊息有兩則,第一則是正常訊息,內容是 Hello;第二則是 error 訊息,如果正常運行的話,會讓 handler 回傳錯誤

client.Sub(printMessage)
client.Pub("Hello")
client.Pub("error")
time.Sleep(time.Second)

實際執行

D:\git\golang-project>.\bin\pubsub.exe
Hello
this is an error

得到訊息內容跟 error!

Multiple Topic

在前面的設計中,已經可以進行發佈跟訂閱了,但如果想要訂閱多個主題,就需要建立多個 client,用起來很麻煩,這衍生出新的需求:我們需要擴充原來的介面,使它可以支持多主題訂閱

回去修改模組為

package pubsub

import (
    "fmt"
    "sync"
)

// DataType is data type of message
type DataType string

// MessageChannel is a channel of pub/sub pattern
type MessageChannel struct {
    writer  chan DataType
    readers []chan DataType
    mutex   sync.Mutex
}

// Client is client of pub/sub pattern
type Client struct {
    topic map[string]*MessageChannel
}

// Pub publish message
func (m *Client) Pub(topic string, data DataType) {
    m.topic[topic].writer <- data
}

// Sub subscribe message
func (m *Client) Sub(topic string, handler func(DataType) error) {
    m.topic[topic].mutex.Lock()
    defer m.topic[topic].mutex.Unlock()
    readChannel := make(chan DataType, 10)
    m.topic[topic].readers = append(m.topic[topic].readers, readChannel)
    go func() {
        for {
            data := <-readChannel
            if err := handler(data); err != nil {
                fmt.Println(err)
            }
        }
    }()
}

// AddTopic publish message
func (m *Client) AddTopic(topic string) {
    m.topic[topic] = &MessageChannel{
        writer: make(chan DataType, 10),
    }
    go func() {
        for {
            data := <-m.topic[topic].writer
            for _, reader := range m.topic[topic].readers {
                reader <- data
            }
        }
    }()
}

// NewClient new a client
func NewClient() *Client {
    client := &Client{
        topic: make(map[string]*MessageChannel),
    }
    return client
}

在原來的架構上,再抽象一層,重新命名原來的 Client 為 MessageChannel,負責各 Topic 的實際執行。上面則建立新的 Client,內部是一個 map,可以放置多個 MessageChannel,好實現多主題的訂閱

// MessageChannel is a channel of pub/sub pattern
type MessageChannel struct {
    writer  chan DataType
    readers []chan DataType
    mutex   sync.Mutex
}

// Client is client of pub/sub pattern
type Client struct {
    topic map[string]*MessageChannel
}

當 Pub/Sub 時,會取出對應 Topic 的 MessageChannel,進行 Pub/Sub

// Pub publish message
func (m *Client) Pub(topic string, data DataType) {
    m.topic[topic].writer <- data
}

// Sub subscribe message
func (m *Client) Sub(topic string, handler func(DataType) error) {
    m.topic[topic].mutex.Lock()
    defer m.topic[topic].mutex.Unlock()
    readChannel := make(chan DataType, 10)
    m.topic[topic].readers = append(m.topic[topic].readers, readChannel)
    go func() {
        for {
            data := <-readChannel
            if err := handler(data); err != nil {
                fmt.Println(err)
            }
        }
    }()
}

新增 AddTopic,當使用者要使用新的 Topic 時,會在內部建立轉發機制

// AddTopic publish message
func (m *Client) AddTopic(topic string) {
    m.topic[topic] = &MessageChannel{
        writer: make(chan DataType, 10),
    }
    go func() {
        for {
            data := <-m.topic[topic].writer
            for _, reader := range m.topic[topic].readers {
                reader <- data
            }
        }
    }()
}

同時修改使用的程式碼 main.go,改為

func main() {
    client := pubsub.NewClient()
    client.AddTopic("hello")
    client.AddTopic("echo")
    var printMessage = func(msg pubsub.DataType) error {
        if msg == "error" {
            return errors.New("This is an error")
        }
        fmt.Println(msg)
        return nil
    }
    var echoMessage = func(msg pubsub.DataType) error {
        fmt.Println(msg + " nice to meet you!")
        return nil
    }
    client.Sub("hello", printMessage)
    client.Sub("echo", echoMessage)
    client.Pub("hello", "Hello")
    client.Pub("hello", "error")
    client.Pub("echo", "Go")
    time.Sleep(time.Second)
}

在這個例子中,對 hello、echo 兩個 topic 進行訂閱

client.AddTopic("hello")
client.AddTopic("echo")

兩個訂閱有不同的 handler。echo 除了印出訊息外,也副加其他內容

var echoMessage = func(msg pubsub.DataType) error {
    fmt.Println(msg + " nice to meet you!")
    return nil
}

對兩個 Topic 進行發佈/訂閱

client.Sub("hello", printMessage)
client.Sub("echo", echoMessage)
client.Pub("hello", "Hello")
client.Pub("hello", "error")
client.Pub("echo", "Go")

確認成果

Go nice to meet you!
Hello
This is an error

小結

用 Golang 完成 Pub/Sub 模型後,可以回去跟標準的 Observer Pattern 比較。兩者的概念是類似的,中間都有一段程式碼負責轉發訊息,在 Observer Pattern 是 Notify,在我們的設計中,是 AddTopic 的 Goroutine。

Observer Pattern 在設計上,是由發佈者調用 Subject 的函式 SetState 來發佈訊息,再由各訂閱者自行調用 Subject 的函式來獲得訂閱的訊息,Subject 在中間只充當通知的角色。整個動作是由發佈者發起,並在發佈者的 thread 中執行,如果想要併發的效果,則需要另外設計。

Golang 由於有 Goroutine 跟 channel,在設計上可以更精巧,發佈者與訂閱者都有各自獨立的 Goroutine 在處理,效率高得多;而且因為有 channel 的存在,使得 Goroutine 推送或接收訊息時,無須理會其他模組的執行狀況,即使有 Fail 的情形,也不會影響到其他模組。簡單講,因為併發實現的成本降低了,可以更容易享受到併發的優勢。

在設計處理上,Golang 似乎認為 OOP 過早強調設計,導致編寫程式碼時,都需要先定義類別,這樣會讓程式碼因為類別而硬化,重構時需要反覆對類別進行 Push Up 跟 Push Down,有看過 Martin Fowler 的《Refactoring》應該會很有感,整本書的精神就在講如何重新設計類別。

Golang 的概念更偏向使用時才設計,例如處理函式不用放在事先定義的 Update 中,而是訂閱時才有的 echoMessage,某程度上,Golang 不強調模型,而是強調使用。

Reference

Read more

Weekly Issue 第 22 期:Google 發布 Nano Banana Pro

最近大新聞要算 Cloudflare 出問題,以及 Google 發布新的 AI 模型。新的 Nano Banana Pro 不管在一致性還是文字呈現,都出乎意料地好。如果 Google 真的能在這場 AI 大戰中笑到最後,這一定會成為商業競爭的經典案例。 🗞️ 熱門新聞 How we’re bringing AI image verification to the Gemini app Google 幾天前發布的 AI 模型太強了,各種錦上添花的稱讚就不說了,在 Simon Willison 的 Blog 看到,Google 設計出防偽機制,避免假圖到處跑。 機制有兩種,一種是在生成的內容中,插入人眼不可辨識的 SynthID,

By Ken Chen

Weekly Issue 第 21 期:JetBrains 發表 2025 Go 生態系調查

最近在讀 Tony Fadell 的 "Build",作者曾經參與過 iPhone 的開發,各種經驗談讓人嘆為觀止,例如這段:「如果故事有某個部分銜接不上,那麼產品本身也會有某個地方行不通…這便是為什麼最後 iPhone 的表面是玻璃,而不是塑膠,以及為什麼 iPhone 沒有硬體鍵盤。」 好在哪呢?好在如果能掌握這個觀念,就能知道如何「閱讀」產品,看見一個產品,就像閱讀一則故事一樣,知道它的抑揚頓挫,知道它想表現的東西。我相信每個經歷過產品開發的人,看這本書都會很有感覺。   🗞️ 熱門新聞 The Go Ecosystem in 2025: Key Trends in Frameworks, Tools, and Developer Practices JetBrains 前陣子公布 Go 生態系的調查結果。

By Ken Chen

Weekly Issue 第 20 期:AI 泡沫的遺產

2000 年的 .com 泡沫雖然造成嚴重的經濟問題,但也給後續的網路世代留下豐富的遺產。我們現在使用的網路基礎建設,很多是因為泡沫的原因,才能一次性投資到位。而當下經歷的 AI 浪潮,在時間過去後,又會給我們留下什麼遺產呢? 🗞️ 熱門新聞 The Benefits of Bubbles 我看 Ben Thompson 的文章通常會有兩種感受,負面是他太囉唆了,把簡單的觀念講得太長(儘管容易懂),而正面是他的觀點一向很有創造性。 這篇也是,前陣子看到有篇談 AI 泡沫後,什麼都不會留下,因為 GPU 很快會隨著時間折舊掉。我持保留態度,我認為重點不僅是 GPU(正如我認為 .com 泡沫的重點不是 CPU),還有其他的東西,至於是什麼,我沒想到。 BT 認為是晶圓製造與電力,It's amazing,

By Ken Chen

Weekly Issue 第 19 期:Coursera 的預覽模式宣告 MOOC 終結

我有時會上課程網站買課,特別是國外的網站,有些課程內容品質高,而且還能無價體驗,我常常在想這在商業上怎麼行得通。Coursera 最近推出預覽功能,某方面來說,也是在宣告長期要往付費走。 網路最大的特點是開放,因為開放,我們看到不可思議的成長,也因為開放,我們有時會很惋惜理想的落幕。 🗞️ 熱門新聞 The Day MOOCs Truly Died: Coursera's Preview Mode Kills Free Learning 很有趣的一篇新聞:Coursera 的預覽模式給了 MOOC 最後一擊。 我對 Coursera 的商業模式不熟,看起來它之前是靠證書與服務營利。很難想像線上課程能用免費支撐這麼久,這幾乎是公益了,將內容鎖在付費牆後比較像可理解的商業行為。 讓我困惑的是,這些年 Coursera 是如何獲利?以及,當時投資人對它的想像是什麼? The PSF has withdrawn

By Ken Chen