模組間的解耦合:發佈/訂閱模型

Observer Pattern 是物件導向常用的架構,例如多個 Chart 與單一 Data Source 的互動,就可以使用 Observer Pattern 設計,好避免資料不同步的問題。而且 Observer Pattern 可以切開 Subject 跟 Observer,讓個別模組的功能更明確,修改副作用更小。

Golang 的哲學是簡單,語言上更強調小元件的延展與復用,例如使用組合取代繼承,使用 Goroutine 取代 Thread 等。本文會講解怎麼使用 channel 來實現 Observer Pattern,或者更現代的說法,實現 Publish/Subscribe 的架構,來建構彼此獨立的模組。

Introduce Observer Pattern

先來對 Observer Pattern 做個解說。依照 GoF 的物件導向經典《Design Pattern》,Observer Pattern 的需求場景是

定義對象間的一種一對多的依賴關係,當一個對象的狀態發生改變時,所有依賴於它的對象都得到通知並被自動更新。

我們可以想像成訂閱報紙的情境,當現在有個新的事件發生,所有有訂閱報紙的讀者,都可以收到最新的事件訊息,讀者可以根據這個訊息來採取反應,例如買賣股票、規劃行程、改變計劃等。報紙的發行人不知道讀者會採取什麼行動,它只負責將消息傳遞給讀者。

如果將 Observer Pattern 用 Class Diagram 來表示,會是

由圖中可以看到,主要分成兩個物件,Subject 知道有誰訂閱,當發生消息時,會 Notify 所有訂閱者,要求它們 Update;Observer 則實作 Update,會由 Subject 中取得最新資料,以供後續使用。將前後順序用 Sequence Diagram 來表示後,就會是

Design Pub/Sub Module

我們的需求是設計一個發佈/訂閱的模組,用於支撐商業邏輯的開發。這個模組類似於中間人的角色,發佈者透過這個模組來發佈訊息,模組也會負責將收到的訊息轉發給訂閱者。

專案架構為

project
├── cmd
├── pkg
│   └── pubsub
│       └── pubsub.go
├── scripts
│   └── build_win.bat
├── go.mod
└── README.md

完整的程式碼如下

package pubsub

import (
    "fmt"
    "sync"
)

// DataType is data type of message
type DataType string

// Client is a client of pub/sub pattern
type Client struct {
    writer  chan DataType
    readers []chan DataType
    mutex   sync.Mutex
}

// Pub publish message
func (m *Client) Pub(data DataType) {
    m.writer <- data
}

// Sub subscribe message
func (m *Client) Sub(handler func(DataType) error) {
    m.mutex.Lock()
    defer m.mutex.Unlock()
    readChannel := make(chan DataType, 10)
    m.readers = append(m.readers, readChannel)
    go func() {
        for {
            data := <-readChannel
            if err := handler(data); err != nil {
                fmt.Println(err)
            }
        }
    }()
}

// NewClient new a client
func NewClient() *Client {
    broker := &Client{
        writer: make(chan DataType, 10),
    }
    go func() {
        for {
            data := <-broker.writer
            for _, reader := range broker.readers {
                reader <- data
            }
        }
    }()
    return broker
}

一段一段來看。

首先定義訊息格式 DataType,假設為 string,但可依照需求自行定義,如果要傳輸的格式比較複雜,也可以定義成 struct

// DataType is data type of message
type DataType string

定義用戶端,讓創建此用戶端的人,可以進行 pub/sub。這邊用 channel 做為 pub/sub 溝通的管道。

channel 在 Golang 中,類似 Linux 的 pipeline 概念,常用於在兩個不同的 Go routine 間傳遞資料。

https://www.slideshare.net/ssuser9ebf46/golang-101
https://www.slideshare.net/ssuser9ebf46/golang-101

writer 這個 channel 用於接受發佈訊息,經由中間人的 Goroutine 後,會將訊息用各訂閱的 channel readers,轉交給訂閱者的 Goroutine

// Client is a client of pub/sub pattern
type Client struct {
    writer  chan DataType
    readers []chan DataType
    mutex   sync.Mutex
}

實現 pub 的邏輯,當使用者輸入訊息後,將此訊息丟到 channel 中

// Pub publish message
func (m *Client) Pub(data DataType) {
    m.writer <- data
}

實現 sub 的邏輯,當使用者訂閱時,創建一個新的 channel,並將它加入 readers 的陣容內,同時啟動 Goroutine,持續監聽這個 channel。如果有任何訊息進來,調用使用者註冊的 handler 來處理這則訊息。

Goroutine 是 Golang 的最大特色,類似其他語言中的 Thread,中文翻成協程,相對 Thread 輕量,適合用在高併發的場景。它的底層對應到內部的 scheduler,會根據當前的狀況來決定調用哪個 Goroutine。我們利用 Go routine 來實現 Publish/Subscribe 的架構,可以更有效率處理訂閱問題。

使用 Goroutine 只要用關鍵字 go 即可

// Sub subscribe message
func (m *Client) Sub(handler func(DataType) error) {
    m.mutex.Lock()
    defer m.mutex.Unlock()
    readChannel := make(chan DataType, 10)
    m.readers = append(m.readers, readChannel)
    go func() {
        for {
            data := <-readChannel
            if err := handler(data); err != nil {
                fmt.Println(err)
            }
        }
    }()
}

接著建立 writer 與 readers 間的轉發關係。

當 Client 初始化時,建立 writer,同時用 Goroutine 來監看 writer,如果 writer 內有任何的訊息,Goroutine 會遍歷 readers,將消息轉發給 readers。

// NewClient new a client
func NewClient() *Client {
    broker := &Client{
    writer: make(chan DataType, 10),
    }
    go func() {
        for {
            data := <-broker.writer
            for _, reader := range broker.readers {
                reader <- data
            }
        }
    }()
    return broker
}

Step 3: Use Pub/Sub Module

完成模組開發後,再來就是使用模組了,新增使用的主程式

project
├── cmd
│   └── pubsub
│       └── main.go
├── pkg
│   └── pubsub
│       └── pubsub.go
├── scripts
│   └── build_win.bat
├── go.mod
└── README.md

內容是

package main

import (
    "errors"
    "example/pkg/pubsub"
    "fmt"
    "time"
)

func main() {
    client := pubsub.NewClient()
    var printMessage func(pubsub.DataType) error
    printMessage = func(msg pubsub.DataType) error {
        if msg == "error" {
            return errors.New("this is an error")
        }
        fmt.Println(msg)
        return nil
    }
    client.Sub(printMessage)
    client.Pub("Hello")
    client.Pub("error")
    time.Sleep(time.Second)
}

使用模組內的初始化函式(因為 Golang 不是物件導向語言,沒有建構式),來取得要使用的 client

client := pubsub.NewClient()

建立 handler 做為訂閱時的 callback function,當訂閱的訊息出現時,調用這個 handler 來處理。這裡建立的 handler 會判斷訊息是不是 error 這個字串,如果是的話,回傳錯誤訊息,否則正常印出訊息

var printMessage func(pubsub.DataType) error
printMessage = func(msg pubsub.DataType) error {
    if msg == "error" {
        return errors.New("this is an error")
    }
    fmt.Println(msg)
    return nil
}

最後訂閱與發佈訊息

用 Sub 註冊訂閱用的 handler;用 Pub 發佈訊息。發佈的訊息有兩則,第一則是正常訊息,內容是 Hello;第二則是 error 訊息,如果正常運行的話,會讓 handler 回傳錯誤

client.Sub(printMessage)
client.Pub("Hello")
client.Pub("error")
time.Sleep(time.Second)

實際執行

D:\git\golang-project>.\bin\pubsub.exe
Hello
this is an error

得到訊息內容跟 error!

Multiple Topic

在前面的設計中,已經可以進行發佈跟訂閱了,但如果想要訂閱多個主題,就需要建立多個 client,用起來很麻煩,這衍生出新的需求:我們需要擴充原來的介面,使它可以支持多主題訂閱

回去修改模組為

package pubsub

import (
    "fmt"
    "sync"
)

// DataType is data type of message
type DataType string

// MessageChannel is a channel of pub/sub pattern
type MessageChannel struct {
    writer  chan DataType
    readers []chan DataType
    mutex   sync.Mutex
}

// Client is client of pub/sub pattern
type Client struct {
    topic map[string]*MessageChannel
}

// Pub publish message
func (m *Client) Pub(topic string, data DataType) {
    m.topic[topic].writer <- data
}

// Sub subscribe message
func (m *Client) Sub(topic string, handler func(DataType) error) {
    m.topic[topic].mutex.Lock()
    defer m.topic[topic].mutex.Unlock()
    readChannel := make(chan DataType, 10)
    m.topic[topic].readers = append(m.topic[topic].readers, readChannel)
    go func() {
        for {
            data := <-readChannel
            if err := handler(data); err != nil {
                fmt.Println(err)
            }
        }
    }()
}

// AddTopic publish message
func (m *Client) AddTopic(topic string) {
    m.topic[topic] = &MessageChannel{
        writer: make(chan DataType, 10),
    }
    go func() {
        for {
            data := <-m.topic[topic].writer
            for _, reader := range m.topic[topic].readers {
                reader <- data
            }
        }
    }()
}

// NewClient new a client
func NewClient() *Client {
    client := &Client{
        topic: make(map[string]*MessageChannel),
    }
    return client
}

在原來的架構上,再抽象一層,重新命名原來的 Client 為 MessageChannel,負責各 Topic 的實際執行。上面則建立新的 Client,內部是一個 map,可以放置多個 MessageChannel,好實現多主題的訂閱

// MessageChannel is a channel of pub/sub pattern
type MessageChannel struct {
    writer  chan DataType
    readers []chan DataType
    mutex   sync.Mutex
}

// Client is client of pub/sub pattern
type Client struct {
    topic map[string]*MessageChannel
}

當 Pub/Sub 時,會取出對應 Topic 的 MessageChannel,進行 Pub/Sub

// Pub publish message
func (m *Client) Pub(topic string, data DataType) {
    m.topic[topic].writer <- data
}

// Sub subscribe message
func (m *Client) Sub(topic string, handler func(DataType) error) {
    m.topic[topic].mutex.Lock()
    defer m.topic[topic].mutex.Unlock()
    readChannel := make(chan DataType, 10)
    m.topic[topic].readers = append(m.topic[topic].readers, readChannel)
    go func() {
        for {
            data := <-readChannel
            if err := handler(data); err != nil {
                fmt.Println(err)
            }
        }
    }()
}

新增 AddTopic,當使用者要使用新的 Topic 時,會在內部建立轉發機制

// AddTopic publish message
func (m *Client) AddTopic(topic string) {
    m.topic[topic] = &MessageChannel{
        writer: make(chan DataType, 10),
    }
    go func() {
        for {
            data := <-m.topic[topic].writer
            for _, reader := range m.topic[topic].readers {
                reader <- data
            }
        }
    }()
}

同時修改使用的程式碼 main.go,改為

func main() {
    client := pubsub.NewClient()
    client.AddTopic("hello")
    client.AddTopic("echo")
    var printMessage = func(msg pubsub.DataType) error {
        if msg == "error" {
            return errors.New("This is an error")
        }
        fmt.Println(msg)
        return nil
    }
    var echoMessage = func(msg pubsub.DataType) error {
        fmt.Println(msg + " nice to meet you!")
        return nil
    }
    client.Sub("hello", printMessage)
    client.Sub("echo", echoMessage)
    client.Pub("hello", "Hello")
    client.Pub("hello", "error")
    client.Pub("echo", "Go")
    time.Sleep(time.Second)
}

在這個例子中,對 hello、echo 兩個 topic 進行訂閱

client.AddTopic("hello")
client.AddTopic("echo")

兩個訂閱有不同的 handler。echo 除了印出訊息外,也副加其他內容

var echoMessage = func(msg pubsub.DataType) error {
    fmt.Println(msg + " nice to meet you!")
    return nil
}

對兩個 Topic 進行發佈/訂閱

client.Sub("hello", printMessage)
client.Sub("echo", echoMessage)
client.Pub("hello", "Hello")
client.Pub("hello", "error")
client.Pub("echo", "Go")

確認成果

Go nice to meet you!
Hello
This is an error

小結

用 Golang 完成 Pub/Sub 模型後,可以回去跟標準的 Observer Pattern 比較。兩者的概念是類似的,中間都有一段程式碼負責轉發訊息,在 Observer Pattern 是 Notify,在我們的設計中,是 AddTopic 的 Goroutine。

Observer Pattern 在設計上,是由發佈者調用 Subject 的函式 SetState 來發佈訊息,再由各訂閱者自行調用 Subject 的函式來獲得訂閱的訊息,Subject 在中間只充當通知的角色。整個動作是由發佈者發起,並在發佈者的 thread 中執行,如果想要併發的效果,則需要另外設計。

Golang 由於有 Goroutine 跟 channel,在設計上可以更精巧,發佈者與訂閱者都有各自獨立的 Goroutine 在處理,效率高得多;而且因為有 channel 的存在,使得 Goroutine 推送或接收訊息時,無須理會其他模組的執行狀況,即使有 Fail 的情形,也不會影響到其他模組。簡單講,因為併發實現的成本降低了,可以更容易享受到併發的優勢。

在設計處理上,Golang 似乎認為 OOP 過早強調設計,導致編寫程式碼時,都需要先定義類別,這樣會讓程式碼因為類別而硬化,重構時需要反覆對類別進行 Push Up 跟 Push Down,有看過 Martin Fowler 的《Refactoring》應該會很有感,整本書的精神就在講如何重新設計類別。

Golang 的概念更偏向使用時才設計,例如處理函式不用放在事先定義的 Update 中,而是訂閱時才有的 echoMessage,某程度上,Golang 不強調模型,而是強調使用。

Reference

Read more

OAuth 2.0 的身份認證:OpenID Connect

OAuth 2.0 的身份認證:OpenID Connect

OAuth 2 讓網路服務可以存取第三方的受保護資源,因此,有些開發者會進一步利用 OAuth 2 來進行使用者認證。但這中間存在著一些語義落差,因為 OAuth 2 當初設計目的是「授權」而不是「認證」,兩者關注的焦點會有些不同。OpenID Connect 是基於 OAuth 2 的一套身份認證協定,讓開發者可以在 OAuth 2 授權的基礎上,再加入標準的認證流程。在這篇文章中,我會說明授權跟認證的場景有何差異,並講解 OpenID Connect 如何滿足認證需求。 因為 OpenID Connect 是建構在 OAuth 2 的基礎上,我會假設這篇文章的讀者已經知道 OAuth 2 的組件與流程,如果你不熟悉,可以先閱讀另外兩篇文章 * OAuth 2.0:

By Ken Chen
更好的選擇?用 JWT 取代 Session 的風險

更好的選擇?用 JWT 取代 Session 的風險

因為 HTTP 是無狀態協定,為了保持使用者狀態,需要後端實作 Session 管理機制。在早期方式中,使用者狀態會跟 HTTP 的 Cookie 綁定,等到有需要的時候,例如驗證身份,就能使用 Cookie 內的資訊搭配後端 Session 來進行。但自從 JWT 出現後,使用者資訊可以編碼在 JWT 內,也開始有人用它來管理使用者身份。前些日子跟公司的資安團隊討論,發現 JWT 用來管理身份認證會有些風險。在這篇文章中,我會比較原本的 Session 管理跟 JWT 的差異,並說明可能的風險所在。 Session 管理 Session 是什麼意思?為什麼需要管理?我們可以從 HTTP 無狀態的特性聊起。所謂的無狀態,翻譯成白話,就是後面請求不會受前面請求的影響。想像現在有個朋友跟你借錢,

By Ken Chen

Goroutine 的併發治理:掌握生命週期

從併發的角度來看,Goroutine 跟 Thread 的概念很類似,都是將任務交給一個執行單元來處理。然而不同的是,Goroutine 將調度放在用戶態,因此更加輕量,也能避免多餘的 Context Switch。我們可以說,Go 的併發處理是由語言原生支援,有著更好的開發者體驗,但也因此更容易忘記底層仍存在著輕量成本,當這些成本積沙成塔,就會造成 Out of Memory。這篇文章會從 Goroutine 的生命週期切入,試著說明在併發的情境中,應該如何保持 Goroutine 的正常運作。 因為這篇講的內容會比較底層,如果對應用情境不熟的人,建議先看過同系列 * Goroutine 的併發治理:由錯誤處理談起 * Goroutine 的併發治理:值是怎麼傳遞? * Goroutine 的併發治理:管理 Worker Pool 再回來看這篇,應該會更容易理解。 Goroutine 的資源使用量 讓我們看個最簡單的例子,假設現在同時開

By Ken Chen

Goroutine 的併發治理:管理 Worker Pool

併發會需要多個 Goroutine 來同時執行任務,Goroutine 雖然輕量,也還是有配置成本,如果每次新的任務進來,都需要重新建立並配置 Goroutine,一方面不容易管理 Goroutine 的記憶體,一方面也會消耗 CPU 的運算效能。這時 Worker Pool 就登場了,我們可以在執行前,先將 Goroutine 配置好放到資源池中,要用時再調用閒置資源來處理,藉此資源回收重複利用。這篇文章會從 0 開始建立 Work Pool,試著丟進不同的場景需求,看看如何實現。 基本的 Worker Pool Worker Pool 的概念可以用這張圖來解釋 Job 會放在 Queue 中送給 Pool 內配置好的 Worker,Worker 處理完後再將結果送到另一個 Queue 內。因為這是很常見的併發模式,

By Ken Chen