Goroutine 的併發治理:由錯誤處理談起

當需要同時執行多個任務時,Go 開發者會多開 Goroutine 來分擔任務,這稱為併發。併發聽起來似乎很理想,能其他任務等待時,照樣執行需要運算的任務,有效利用 CPU 資源,但如果要用在生產環境,它也需要完善的管理機制。想想看,Goroutine 在哪個情況下會被啟動?哪個情況下會結束?如果任務需要回傳結果,它應該要怎麼回傳?而如果執行中發生錯誤,又應該怎麼處理?

我們可以稱呼這類主題為「併發治理」,需要開發者理解執行期的運作,而如何處理好 Goroutine 的開始與結束,讓錯誤能被意識到,可說是併發治理的第一關。

基本併發

來看個基本的併發操作。我們起 100 個 Goroutine,讓它們處理任務。如果執行時發生 error,就呼叫 HandleError 處理錯誤。

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) {
			err := DoTask(i)
			if err != nil {
				HandleError(err)
			}
			wg.Done()
		}(i)
	}
	wg.Wait()
}

func DoTask(i int) error {
	err := fmt.Errorf("%d: some err", i)
	return err
}

func HandleError(err error) {
	fmt.Println(err)
}

這裡用 Go 標準包的 wait group 來管理 Goroutine,啟動 Goroutine 前,先用 wg.Add 將計數器加 1,Goroutine 執行完後,再用 wg.Done 將計數器減 1。等所有計數器歸零,代表 Goroutine 全部執行完成。wait group 的功用是同步化,確保主程式結束前,所有的 Goroutine 都執行完畢。

在這個模型中,Goroutine 的錯誤是在 Goroutine 中被處理,這讓 Goroutine 承擔額外的任務,例如它可能會需要依賴 Logger 才能處理錯誤,這也降低 Goroutine 的可測試性。如果我們希望分離彼此的責任,集中管理錯誤的話,就得想個方式,把錯誤傳出來。

共享記憶體來通訊

第一種傳遞錯誤的方式稱為 Shared memory,可以想像成把 Goroutine 中發生的錯誤記錄在某個儲存空間,等待 Goroutine 執行完後再來處理,程式碼類似

var lock sync.Mutex
var errs []error

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) {
			err := DoTask(i)
			if err != nil {
				lock.Lock()
				errs = append(errs, err)
				lock.Unlock()
			}
			wg.Done()
		}(i)
	}
	wg.Wait()
	for _, err := range errs {
		HandleError(err)
	}
}

這段程式用 slice 來存放錯誤,因為 slice 沒保證併發安全,使用時要記得用 sync.Mutex 鎖起來再操作。

這個模型有什麼問題呢?因為引入互斥鎖,Goroutine 執行期間有了同步化機制,讓不同的 Goroutine 可能會互相等待;再來,當應用變得複雜的時候,可能會存在好幾個鎖,一不小心就會造成 Dead Lock;另外,使用 Shared Memory 意味著所有 Goroutine 都能 Access 共享區塊,如果有哪個 Goroutine 沒有遵守規範,修改了共享區塊內的值,就會影響到其他的 Goroutine。開發者原本從錯誤處理釋放出來的專注力,變成要轉投入到併發處理,從結果來講,對生產力幫助有限。

errGroup

既然目標是處理錯誤,我們可以建立一些前提,針對這個情境特化,讓併發治理跟業務邏輯分離開來。具體來講,希望對 wait group 與 Goroutine 的使用進行封裝。這就來到 golang.org/x/sync/errgroup 這個 package 了,先來上 code

import (
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	var eg errgroup.Group
	for i := 0; i < 100; i++ {
		eg.Go(func() error {
			return DoTask()
		})
	}
	if err := eg.Wait(); err != nil {
		HandleError(err)
	}
}

eg.Go 會啟動一個 Goroutine,而 eg.Wait 會等待所有的 Goroutine 都執行完畢,如果在執行過程中有發生錯誤,eg.Wait 會將錯誤回傳給處理函式。

從名稱看,eg 封裝了 wait group 的邏輯,可以讓操作變得更簡單,它的內部實現跟原本 wait group 的操作類似

func (g *Group) Go(f func() error) {
	if g.sem != nil {
		g.sem <- token{}
	}

	g.wg.Add(1)
	go func() {
		defer g.done()

		if err := f(); err != nil {
			g.errOnce.Do(func() {
				g.err = err
				if g.cancel != nil {
					g.cancel()
				}
			})
		}
	}()
}

只是用到 sync.Once 來鎖定 critical section。

errgroup 幫助開發者分離併發治理與業務邏輯,也降低無意中引發 Dead Lock 的可能性。

用通訊來共享資訊

還有沒有其他的可能呢?不妨換另一個角度來看待錯誤的傳遞。在 Go 中,錯誤是一種值,如果把 Goroutine 看成是處理值的處理程序,那只要能定義出程序的 input/output,就能將值傳遞出去。可能有人會想,這跟 function 不是差不多的意思嗎?是的,但關鍵在於,Goroutine 間不是順序式的關係,而是程序式的併發關係,在訊息經過 Goroutine 內部循序處理後,它會透過交談的方式,傳遞給另一個 Goroutine,這套模型又因此被稱為交談循序程式(CSP)。依照 CSP 的語法結構,可以修改程式為

func main() {
	var wg sync.WaitGroup
	errCh := make(chan error)
	routineEndCh := make(chan struct{})
	logEndCh := make(chan struct{})
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) {
			err := DoTask(i)
			if err != nil {
				errCh <- err
			}
			wg.Done()
		}(i)
	}
	go func() {
		for {
			select {
			case err := <-errCh:
				HandleError(err)
			case <-routineEndCh:
				close(logEndCh)
				return
			}
		}
	}()
	go func() {
		wg.Wait()
		close(routineEndCh)
	}()
	<-logEndCh
}

在這段程式中,Goroutine 產生的錯誤被送進 channel,而錯誤處理的函式則放在另一個 Goroutine,假設稱為 G2,G2 在 channel 的一端接收錯誤,收到後立刻進行錯誤處理。此外,我們需要明訂 G2 的結束時間,因此開了再一個 Goroutine G3 來協調,當 wait group 的任務都結束後,G3 會關閉 routineEndCh,讓 G2 的 case 2 可以執行並關閉,G2 關閉前同樣關閉 logEndCh,讓主程式順利結束。

儘管用到一些看起來很潮的字,在採用模型前,我們還是得先自問,這個做法真的有比較好嗎?程式碼長度由 20L 變成 30L,還有許多 channel 的同步處理問題,它對生產力真的有幫助嗎?

唔,這是個好問題,CSP 的設計類似數學,從設計層面上切開彼此的相依性(在數學中,沒有狀態這回事)。在程式碼中,我們可以察覺到,原本的 Goroutine 跟 G2 間變成像是生產者跟消費者的關係,當訊息一生產出來,G2 會立刻消費它,讓程式變成像是生產線一樣,訊息處理完後,會被送到下一站繼續處理。而在共享記憶體的例子中,訊息是先搜集起來放在記憶體中,等待 Goroutine 完成後再批次處理。使用 channel 串接的方式,儘管不見得有更好的總處理時間(total time),但理論上,避免了批次性的等待,它應該會具備更好的平均處理時間(average time)。

技術本質上,channel 也是使用加鎖後複製值來實現,但它具備更高級的應用語義,我們可以把 channel 看成是對底層技術的封裝,因為這層封裝,開發者可以區別出生產者與消費者,也保證了消息的唯一性,從而在設計上防止 race condition 的發生。

小結

在討論 Go 的併發時,質數篩是個很經典的例子,用共享記憶體的方式,質數篩會是

func main() {
	n := 20
	primes := make([]bool, n)
	for i := 2; i*i < n; i++ {
		if !primes[i] {
			for j := i; j*i < n; j++ {
				primes[i*j] = true
			}
		}
	}
	for i := 2; i < n; i++ {
		if !primes[i] {
			fmt.Println(i)
		}
	}
}

但如果用 CSP 方式,則會變成

func main() {
	c := make(chan int)
	go counter(c)

	for i := 0; i < 20; i++ {
		p := <- c
		fmt.Println(p)
		primes := make(chan int)
		go filter(p, c, primes)
		c = primes
	}
}

很明顯,兩個模型一對照,CSP 的可讀性更低,因為人類對訊息的理解是歷時性,而不是共時性的。我們可以輕易回想起某場棒球賽的再見全壘打,卻不容易記得某個賽季的平均打擊率。既然如此,為什麼我們會需要用反人類的方式來設計?因為當程式像數學一樣運作,它會變得無狀態、鬆耦合、更適合機器執行。至於可讀性方面,errgroup 給了一個靈感,我們可以將 channel 的操作封裝起來,透過框架來解決併發問題。

Reference

Read more

Weekly Issue 第 6 期:Duolingo CEO 看 AI 與遊戲化

現在是 AI 時代,大家都在想怎麼讓自己的產品跟 AI 掛勾,但具體要怎麼做呢?背後的思考有哪些?Duolingo 給出他們自己的觀點。 例如,現在的產品是否只是 AI 套皮,你接收使用者的問題,套上自己的提詞後,拿去給 OpenAI,要它回答你?在現在百家爭鳴的情況下,選擇哪個模型會有差嗎?AI 能帶來新用戶與新營收嗎?等等。 另外本週也選了一篇少數派的文章,談 AI 對 RSS 的影響,對 RSS 未來方向有興趣的人不妨看看。 🗞️ 熱門新聞 Duolingo CEO Luis von Ahn wants you addicted to learning Duolingo CEO 專訪,相當紮實,推薦閱讀。 「對我們來說,

By Ken Chen

Weekly Issue 第 5 期:OpenAI 的企業文化

我一直都喜歡看科技公司的願景與文化,原因是,我想知道別人是如何看待自己的使命,又是用什麼方式打造它。願景通常在官網都會有,但想要知道文化,只能聽內部人講講了。 Palantir 前陣子因為它不同於矽谷的文化,而引起很多討論。受此影響,前 OpenAI 的員工在離職創辦公司後,也發文談論他所見到的 OpenAI。最讓我震撼的是,他們幾乎沒有資金困擾,想的都是如何打造出色的 AI 模型。 🗞️ 熱門新聞 Reflections on OpenAI 前員工談 OpenAI 的內部文化。 讀起來最大的感觸是,有些價值觀、觀點、實踐,只有在世界級的公司跟資源下,才有可能建立起來。讓每個團隊各自為政,看誰能端出最好的成果,這對新創(特別是沒拿創投)實在太奢侈了。 我相信這種經歷會變成是「可以帶著走的饗宴」,那種衝擊也是最寶貴的。 AI Open Source Productivity METR 前陣子發了一篇研究,說使用 AI

By Ken Chen

Weekly Issue 第 4 期:Canonical 的面試經驗

這星期看了比較多職涯相關的內容,最讓我驚訝的是 Canonical 的面試流程,當我分享這則新聞後,有更多朋友紛紛補充他們的面試經驗:需要經歷三個 Tier,每個 Tier 都有三關,而內容甚至還包括問人選「高中成績」與「大學生活」。 我很難想像一家做 Linux 發行版的公司,會如此草率對待人選,這讓我對他們家的產品有了很大的問號。 🗞️ 熱門新聞 My experience with Canonical's interview process 這是一篇 Canonical 的面試經歷(如果你不知道什麼是 Canonical,就是開發 Ubuntu 的公司)。 整個過程讓人非常驚訝,甚至還需要人選回答「高中成績」,而在面試中做筆記居然是扣分項。我看完後有股移除 Ubuntu 的衝動。真的太扯啦。 What happens when engineers work

By Ken Chen

Weekly Issue 第 3 期:Cloudflare 宣布內容獨立日

最近用了很多 Cloudflare 的產品,像是 Zero Trust、WARP,還有 Cloudflare Tunnel。每次的體驗都讓我嘖嘖稱奇,好像它們預判了我的需求一樣。這家公司始終追求著「更好的網路」這個目標,內容付費又是另一個例子。 🗞️ 熱門新聞 Content Independence Day: no AI crawl without compensation! 賽博佛陀 Cloudflare 又來普渡眾生了。這次是針對 AI 爬蟲收費。 「網路正在改變。它的商業模式也將改變。在這個過程中,我們有機會從過去 30 年網路的優點中學習,並為未來的網路創造更好的環境。 」 Cloudflare 真的很有意思,連思考的角度都很有趣。 內容當然是有價的,只是價格會怎麼支付呢?在現代的內容創作,這題變得非常複雜。 Folklore.org: Joining Apple Computer

By Ken Chen