模組間的解耦合:發佈/訂閱模型
Observer Pattern 是物件導向常用的架構,例如多個 Chart 與單一 Data Source 的互動,就可以使用 Observer Pattern 設計,好避免資料不同步的問題。而且 Observer Pattern 可以切開 Subject 跟 Observer,讓個別模組的功能更明確,修改副作用更小。
Golang 的哲學是簡單,語言上更強調小元件的延展與復用,例如使用組合取代繼承,使用 Goroutine 取代 Thread 等。本文會講解怎麼使用 channel 來實現 Observer Pattern,或者更現代的說法,實現 Publish/Subscribe 的架構,來建構彼此獨立的模組。
Introduce Observer Pattern
先來對 Observer Pattern 做個解說。依照 GoF 的物件導向經典《Design Pattern》,Observer Pattern 的需求場景是
定義對象間的一種一對多的依賴關係,當一個對象的狀態發生改變時,所有依賴於它的對象都得到通知並被自動更新。
我們可以想像成訂閱報紙的情境,當現在有個新的事件發生,所有有訂閱報紙的讀者,都可以收到最新的事件訊息,讀者可以根據這個訊息來採取反應,例如買賣股票、規劃行程、改變計劃等。報紙的發行人不知道讀者會採取什麼行動,它只負責將消息傳遞給讀者。
如果將 Observer Pattern 用 Class Diagram 來表示,會是
由圖中可以看到,主要分成兩個物件,Subject 知道有誰訂閱,當發生消息時,會 Notify 所有訂閱者,要求它們 Update;Observer 則實作 Update,會由 Subject 中取得最新資料,以供後續使用。將前後順序用 Sequence Diagram 來表示後,就會是
Design Pub/Sub Module
我們的需求是設計一個發佈/訂閱的模組,用於支撐商業邏輯的開發。這個模組類似於中間人的角色,發佈者透過這個模組來發佈訊息,模組也會負責將收到的訊息轉發給訂閱者。
專案架構為
project
├── cmd
├── pkg
│ └── pubsub
│ └── pubsub.go
├── scripts
│ └── build_win.bat
├── go.mod
└── README.md
完整的程式碼如下
package pubsub
import (
"fmt"
"sync"
)
// DataType is data type of message
type DataType string
// Client is a client of pub/sub pattern
type Client struct {
writer chan DataType
readers []chan DataType
mutex sync.Mutex
}
// Pub publish message
func (m *Client) Pub(data DataType) {
m.writer <- data
}
// Sub subscribe message
func (m *Client) Sub(handler func(DataType) error) {
m.mutex.Lock()
defer m.mutex.Unlock()
readChannel := make(chan DataType, 10)
m.readers = append(m.readers, readChannel)
go func() {
for {
data := <-readChannel
if err := handler(data); err != nil {
fmt.Println(err)
}
}
}()
}
// NewClient new a client
func NewClient() *Client {
broker := &Client{
writer: make(chan DataType, 10),
}
go func() {
for {
data := <-broker.writer
for _, reader := range broker.readers {
reader <- data
}
}
}()
return broker
}
一段一段來看。
首先定義訊息格式 DataType,假設為 string,但可依照需求自行定義,如果要傳輸的格式比較複雜,也可以定義成 struct
// DataType is data type of message
type DataType string
定義用戶端,讓創建此用戶端的人,可以進行 pub/sub。這邊用 channel 做為 pub/sub 溝通的管道。
channel 在 Golang 中,類似 Linux 的 pipeline 概念,常用於在兩個不同的 Go routine 間傳遞資料。
writer 這個 channel 用於接受發佈訊息,經由中間人的 Goroutine 後,會將訊息用各訂閱的 channel readers,轉交給訂閱者的 Goroutine
// Client is a client of pub/sub pattern
type Client struct {
writer chan DataType
readers []chan DataType
mutex sync.Mutex
}
實現 pub 的邏輯,當使用者輸入訊息後,將此訊息丟到 channel 中
// Pub publish message
func (m *Client) Pub(data DataType) {
m.writer <- data
}
實現 sub 的邏輯,當使用者訂閱時,創建一個新的 channel,並將它加入 readers 的陣容內,同時啟動 Goroutine,持續監聽這個 channel。如果有任何訊息進來,調用使用者註冊的 handler 來處理這則訊息。
Goroutine 是 Golang 的最大特色,類似其他語言中的 Thread,中文翻成協程,相對 Thread 輕量,適合用在高併發的場景。它的底層對應到內部的 scheduler,會根據當前的狀況來決定調用哪個 Goroutine。我們利用 Go routine 來實現 Publish/Subscribe 的架構,可以更有效率處理訂閱問題。
使用 Goroutine 只要用關鍵字 go 即可
// Sub subscribe message
func (m *Client) Sub(handler func(DataType) error) {
m.mutex.Lock()
defer m.mutex.Unlock()
readChannel := make(chan DataType, 10)
m.readers = append(m.readers, readChannel)
go func() {
for {
data := <-readChannel
if err := handler(data); err != nil {
fmt.Println(err)
}
}
}()
}
接著建立 writer 與 readers 間的轉發關係。
當 Client 初始化時,建立 writer,同時用 Goroutine 來監看 writer,如果 writer 內有任何的訊息,Goroutine 會遍歷 readers,將消息轉發給 readers。
// NewClient new a client
func NewClient() *Client {
broker := &Client{
writer: make(chan DataType, 10),
}
go func() {
for {
data := <-broker.writer
for _, reader := range broker.readers {
reader <- data
}
}
}()
return broker
}
Step 3: Use Pub/Sub Module
完成模組開發後,再來就是使用模組了,新增使用的主程式
project
├── cmd
│ └── pubsub
│ └── main.go
├── pkg
│ └── pubsub
│ └── pubsub.go
├── scripts
│ └── build_win.bat
├── go.mod
└── README.md
內容是
package main
import (
"errors"
"example/pkg/pubsub"
"fmt"
"time"
)
func main() {
client := pubsub.NewClient()
var printMessage func(pubsub.DataType) error
printMessage = func(msg pubsub.DataType) error {
if msg == "error" {
return errors.New("this is an error")
}
fmt.Println(msg)
return nil
}
client.Sub(printMessage)
client.Pub("Hello")
client.Pub("error")
time.Sleep(time.Second)
}
使用模組內的初始化函式(因為 Golang 不是物件導向語言,沒有建構式),來取得要使用的 client
client := pubsub.NewClient()
建立 handler 做為訂閱時的 callback function,當訂閱的訊息出現時,調用這個 handler 來處理。這裡建立的 handler 會判斷訊息是不是 error 這個字串,如果是的話,回傳錯誤訊息,否則正常印出訊息
var printMessage func(pubsub.DataType) error
printMessage = func(msg pubsub.DataType) error {
if msg == "error" {
return errors.New("this is an error")
}
fmt.Println(msg)
return nil
}
最後訂閱與發佈訊息
用 Sub 註冊訂閱用的 handler;用 Pub 發佈訊息。發佈的訊息有兩則,第一則是正常訊息,內容是 Hello;第二則是 error 訊息,如果正常運行的話,會讓 handler 回傳錯誤
client.Sub(printMessage)
client.Pub("Hello")
client.Pub("error")
time.Sleep(time.Second)
實際執行
D:\git\golang-project>.\bin\pubsub.exe
Hello
this is an error
得到訊息內容跟 error!
Multiple Topic
在前面的設計中,已經可以進行發佈跟訂閱了,但如果想要訂閱多個主題,就需要建立多個 client,用起來很麻煩,這衍生出新的需求:我們需要擴充原來的介面,使它可以支持多主題訂閱
回去修改模組為
package pubsub
import (
"fmt"
"sync"
)
// DataType is data type of message
type DataType string
// MessageChannel is a channel of pub/sub pattern
type MessageChannel struct {
writer chan DataType
readers []chan DataType
mutex sync.Mutex
}
// Client is client of pub/sub pattern
type Client struct {
topic map[string]*MessageChannel
}
// Pub publish message
func (m *Client) Pub(topic string, data DataType) {
m.topic[topic].writer <- data
}
// Sub subscribe message
func (m *Client) Sub(topic string, handler func(DataType) error) {
m.topic[topic].mutex.Lock()
defer m.topic[topic].mutex.Unlock()
readChannel := make(chan DataType, 10)
m.topic[topic].readers = append(m.topic[topic].readers, readChannel)
go func() {
for {
data := <-readChannel
if err := handler(data); err != nil {
fmt.Println(err)
}
}
}()
}
// AddTopic publish message
func (m *Client) AddTopic(topic string) {
m.topic[topic] = &MessageChannel{
writer: make(chan DataType, 10),
}
go func() {
for {
data := <-m.topic[topic].writer
for _, reader := range m.topic[topic].readers {
reader <- data
}
}
}()
}
// NewClient new a client
func NewClient() *Client {
client := &Client{
topic: make(map[string]*MessageChannel),
}
return client
}
在原來的架構上,再抽象一層,重新命名原來的 Client 為 MessageChannel,負責各 Topic 的實際執行。上面則建立新的 Client,內部是一個 map,可以放置多個 MessageChannel,好實現多主題的訂閱
// MessageChannel is a channel of pub/sub pattern
type MessageChannel struct {
writer chan DataType
readers []chan DataType
mutex sync.Mutex
}
// Client is client of pub/sub pattern
type Client struct {
topic map[string]*MessageChannel
}
當 Pub/Sub 時,會取出對應 Topic 的 MessageChannel,進行 Pub/Sub
// Pub publish message
func (m *Client) Pub(topic string, data DataType) {
m.topic[topic].writer <- data
}
// Sub subscribe message
func (m *Client) Sub(topic string, handler func(DataType) error) {
m.topic[topic].mutex.Lock()
defer m.topic[topic].mutex.Unlock()
readChannel := make(chan DataType, 10)
m.topic[topic].readers = append(m.topic[topic].readers, readChannel)
go func() {
for {
data := <-readChannel
if err := handler(data); err != nil {
fmt.Println(err)
}
}
}()
}
新增 AddTopic,當使用者要使用新的 Topic 時,會在內部建立轉發機制
// AddTopic publish message
func (m *Client) AddTopic(topic string) {
m.topic[topic] = &MessageChannel{
writer: make(chan DataType, 10),
}
go func() {
for {
data := <-m.topic[topic].writer
for _, reader := range m.topic[topic].readers {
reader <- data
}
}
}()
}
同時修改使用的程式碼 main.go,改為
func main() {
client := pubsub.NewClient()
client.AddTopic("hello")
client.AddTopic("echo")
var printMessage = func(msg pubsub.DataType) error {
if msg == "error" {
return errors.New("This is an error")
}
fmt.Println(msg)
return nil
}
var echoMessage = func(msg pubsub.DataType) error {
fmt.Println(msg + " nice to meet you!")
return nil
}
client.Sub("hello", printMessage)
client.Sub("echo", echoMessage)
client.Pub("hello", "Hello")
client.Pub("hello", "error")
client.Pub("echo", "Go")
time.Sleep(time.Second)
}
在這個例子中,對 hello、echo 兩個 topic 進行訂閱
client.AddTopic("hello")
client.AddTopic("echo")
兩個訂閱有不同的 handler。echo 除了印出訊息外,也副加其他內容
var echoMessage = func(msg pubsub.DataType) error {
fmt.Println(msg + " nice to meet you!")
return nil
}
對兩個 Topic 進行發佈/訂閱
client.Sub("hello", printMessage)
client.Sub("echo", echoMessage)
client.Pub("hello", "Hello")
client.Pub("hello", "error")
client.Pub("echo", "Go")
確認成果
Go nice to meet you!
Hello
This is an error
小結
用 Golang 完成 Pub/Sub 模型後,可以回去跟標準的 Observer Pattern 比較。兩者的概念是類似的,中間都有一段程式碼負責轉發訊息,在 Observer Pattern 是 Notify,在我們的設計中,是 AddTopic 的 Goroutine。
Observer Pattern 在設計上,是由發佈者調用 Subject 的函式 SetState 來發佈訊息,再由各訂閱者自行調用 Subject 的函式來獲得訂閱的訊息,Subject 在中間只充當通知的角色。整個動作是由發佈者發起,並在發佈者的 thread 中執行,如果想要併發的效果,則需要另外設計。
Golang 由於有 Goroutine 跟 channel,在設計上可以更精巧,發佈者與訂閱者都有各自獨立的 Goroutine 在處理,效率高得多;而且因為有 channel 的存在,使得 Goroutine 推送或接收訊息時,無須理會其他模組的執行狀況,即使有 Fail 的情形,也不會影響到其他模組。簡單講,因為併發實現的成本降低了,可以更容易享受到併發的優勢。
在設計處理上,Golang 似乎認為 OOP 過早強調設計,導致編寫程式碼時,都需要先定義類別,這樣會讓程式碼因為類別而硬化,重構時需要反覆對類別進行 Push Up 跟 Push Down,有看過 Martin Fowler 的《Refactoring》應該會很有感,整本書的精神就在講如何重新設計類別。
Golang 的概念更偏向使用時才設計,例如處理函式不用放在事先定義的 Update 中,而是訂閱時才有的 echoMessage,某程度上,Golang 不強調模型,而是強調使用。