Goroutine 的併發治理:由錯誤處理談起

當需要同時執行多個任務時,Go 開發者會多開 Goroutine 來分擔任務,這稱為併發。併發聽起來似乎很理想,能其他任務等待時,照樣執行需要運算的任務,有效利用 CPU 資源,但如果要用在生產環境,它也需要完善的管理機制。想想看,Goroutine 在哪個情況下會被啟動?哪個情況下會結束?如果任務需要回傳結果,它應該要怎麼回傳?而如果執行中發生錯誤,又應該怎麼處理?

我們可以稱呼這類主題為「併發治理」,需要開發者理解執行期的運作,而如何處理好 Goroutine 的開始與結束,讓錯誤能被意識到,可說是併發治理的第一關。

基本併發

來看個基本的併發操作。我們起 100 個 Goroutine,讓它們處理任務。如果執行時發生 error,就呼叫 HandleError 處理錯誤。

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) {
			err := DoTask(i)
			if err != nil {
				HandleError(err)
			}
			wg.Done()
		}(i)
	}
	wg.Wait()
}

func DoTask(i int) error {
	err := fmt.Errorf("%d: some err", i)
	return err
}

func HandleError(err error) {
	fmt.Println(err)
}

這裡用 Go 標準包的 wait group 來管理 Goroutine,啟動 Goroutine 前,先用 wg.Add 將計數器加 1,Goroutine 執行完後,再用 wg.Done 將計數器減 1。等所有計數器歸零,代表 Goroutine 全部執行完成。wait group 的功用是同步化,確保主程式結束前,所有的 Goroutine 都執行完畢。

在這個模型中,Goroutine 的錯誤是在 Goroutine 中被處理,這讓 Goroutine 承擔額外的任務,例如它可能會需要依賴 Logger 才能處理錯誤,這也降低 Goroutine 的可測試性。如果我們希望分離彼此的責任,集中管理錯誤的話,就得想個方式,把錯誤傳出來。

共享記憶體來通訊

第一種傳遞錯誤的方式稱為 Shared memory,可以想像成把 Goroutine 中發生的錯誤記錄在某個儲存空間,等待 Goroutine 執行完後再來處理,程式碼類似

var lock sync.Mutex
var errs []error

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) {
			err := DoTask(i)
			if err != nil {
				lock.Lock()
				errs = append(errs, err)
				lock.Unlock()
			}
			wg.Done()
		}(i)
	}
	wg.Wait()
	for _, err := range errs {
		HandleError(err)
	}
}

這段程式用 slice 來存放錯誤,因為 slice 沒保證併發安全,使用時要記得用 sync.Mutex 鎖起來再操作。

這個模型有什麼問題呢?因為引入互斥鎖,Goroutine 執行期間有了同步化機制,讓不同的 Goroutine 可能會互相等待;再來,當應用變得複雜的時候,可能會存在好幾個鎖,一不小心就會造成 Dead Lock;另外,使用 Shared Memory 意味著所有 Goroutine 都能 Access 共享區塊,如果有哪個 Goroutine 沒有遵守規範,修改了共享區塊內的值,就會影響到其他的 Goroutine。開發者原本從錯誤處理釋放出來的專注力,變成要轉投入到併發處理,從結果來講,對生產力幫助有限。

errGroup

既然目標是處理錯誤,我們可以建立一些前提,針對這個情境特化,讓併發治理跟業務邏輯分離開來。具體來講,希望對 wait group 與 Goroutine 的使用進行封裝。這就來到 golang.org/x/sync/errgroup 這個 package 了,先來上 code

import (
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	var eg errgroup.Group
	for i := 0; i < 100; i++ {
		eg.Go(func() error {
			return DoTask()
		})
	}
	if err := eg.Wait(); err != nil {
		HandleError(err)
	}
}

eg.Go 會啟動一個 Goroutine,而 eg.Wait 會等待所有的 Goroutine 都執行完畢,如果在執行過程中有發生錯誤,eg.Wait 會將錯誤回傳給處理函式。

從名稱看,eg 封裝了 wait group 的邏輯,可以讓操作變得更簡單,它的內部實現跟原本 wait group 的操作類似

func (g *Group) Go(f func() error) {
	if g.sem != nil {
		g.sem <- token{}
	}

	g.wg.Add(1)
	go func() {
		defer g.done()

		if err := f(); err != nil {
			g.errOnce.Do(func() {
				g.err = err
				if g.cancel != nil {
					g.cancel()
				}
			})
		}
	}()
}

只是用到 sync.Once 來鎖定 critical section。

errgroup 幫助開發者分離併發治理與業務邏輯,也降低無意中引發 Dead Lock 的可能性。

用通訊來共享資訊

還有沒有其他的可能呢?不妨換另一個角度來看待錯誤的傳遞。在 Go 中,錯誤是一種值,如果把 Goroutine 看成是處理值的處理程序,那只要能定義出程序的 input/output,就能將值傳遞出去。可能有人會想,這跟 function 不是差不多的意思嗎?是的,但關鍵在於,Goroutine 間不是順序式的關係,而是程序式的併發關係,在訊息經過 Goroutine 內部循序處理後,它會透過交談的方式,傳遞給另一個 Goroutine,這套模型又因此被稱為交談循序程式(CSP)。依照 CSP 的語法結構,可以修改程式為

func main() {
	var wg sync.WaitGroup
	errCh := make(chan error)
	routineEndCh := make(chan struct{})
	logEndCh := make(chan struct{})
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) {
			err := DoTask(i)
			if err != nil {
				errCh <- err
			}
			wg.Done()
		}(i)
	}
	go func() {
		for {
			select {
			case err := <-errCh:
				HandleError(err)
			case <-routineEndCh:
				close(logEndCh)
				return
			}
		}
	}()
	go func() {
		wg.Wait()
		close(routineEndCh)
	}()
	<-logEndCh
}

在這段程式中,Goroutine 產生的錯誤被送進 channel,而錯誤處理的函式則放在另一個 Goroutine,假設稱為 G2,G2 在 channel 的一端接收錯誤,收到後立刻進行錯誤處理。此外,我們需要明訂 G2 的結束時間,因此開了再一個 Goroutine G3 來協調,當 wait group 的任務都結束後,G3 會關閉 routineEndCh,讓 G2 的 case 2 可以執行並關閉,G2 關閉前同樣關閉 logEndCh,讓主程式順利結束。

儘管用到一些看起來很潮的字,在採用模型前,我們還是得先自問,這個做法真的有比較好嗎?程式碼長度由 20L 變成 30L,還有許多 channel 的同步處理問題,它對生產力真的有幫助嗎?

唔,這是個好問題,CSP 的設計類似數學,從設計層面上切開彼此的相依性(在數學中,沒有狀態這回事)。在程式碼中,我們可以察覺到,原本的 Goroutine 跟 G2 間變成像是生產者跟消費者的關係,當訊息一生產出來,G2 會立刻消費它,讓程式變成像是生產線一樣,訊息處理完後,會被送到下一站繼續處理。而在共享記憶體的例子中,訊息是先搜集起來放在記憶體中,等待 Goroutine 完成後再批次處理。使用 channel 串接的方式,儘管不見得有更好的總處理時間(total time),但理論上,避免了批次性的等待,它應該會具備更好的平均處理時間(average time)。

技術本質上,channel 也是使用加鎖後複製值來實現,但它具備更高級的應用語義,我們可以把 channel 看成是對底層技術的封裝,因為這層封裝,開發者可以區別出生產者與消費者,也保證了消息的唯一性,從而在設計上防止 race condition 的發生。

小結

在討論 Go 的併發時,質數篩是個很經典的例子,用共享記憶體的方式,質數篩會是

func main() {
	n := 20
	primes := make([]bool, n)
	for i := 2; i*i < n; i++ {
		if !primes[i] {
			for j := i; j*i < n; j++ {
				primes[i*j] = true
			}
		}
	}
	for i := 2; i < n; i++ {
		if !primes[i] {
			fmt.Println(i)
		}
	}
}

但如果用 CSP 方式,則會變成

func main() {
	c := make(chan int)
	go counter(c)

	for i := 0; i < 20; i++ {
		p := <- c
		fmt.Println(p)
		primes := make(chan int)
		go filter(p, c, primes)
		c = primes
	}
}

很明顯,兩個模型一對照,CSP 的可讀性更低,因為人類對訊息的理解是歷時性,而不是共時性的。我們可以輕易回想起某場棒球賽的再見全壘打,卻不容易記得某個賽季的平均打擊率。既然如此,為什麼我們會需要用反人類的方式來設計?因為當程式像數學一樣運作,它會變得無狀態、鬆耦合、更適合機器執行。至於可讀性方面,errgroup 給了一個靈感,我們可以將 channel 的操作封裝起來,透過框架來解決併發問題。

Reference

Read more

Weekly Issue 第 27 期:Nvidia 收購 Groq

從 Windsurf 的案件後,我一直在想,新創成立時,支持創辦人的早期員工與投資人,是不是能得到合理的報酬?Groq 收購案給出很多細節,告訴我們即使是 Acquhire,也還是能盡可能公平。 改變世界很重要,但使用的方式也很重要。 🗞️ 熱門新聞 Nvidia deal a big win for Groq employees and investors Groq 收購案的消息出來後,我關心的事情是:這跟 Windsurf 的案子有什麼不同?早期投資人跟團隊有得到回報嗎? 先講結論:如果消息正確,該得到的報酬都有得到,皆大歡喜。但我覺得 Acquhire 存在太多灰色地帶,應該要納入監管,不是每家收購發起者跟創辦人都有同樣的素質。 按照 Axios 的說法,輝達付出的 200 億將被視為估值分配給股票持有人,90% 員工會加入輝達,到期股票會變現,

By Ken Chen

Weekly Issue 第 26 期:AI 批評指南

最近在讀《高效槓桿力》,書中提出一套變革管理框架:「尋找關鍵支點,重新配置資源。」當然,書裡給出很多案例,說明如何找到支點,只是我同時在想,如何將他們帶到我面對的情境呢? ✨ 科技觀點 Pluralistic: The Reverse-Centaur’s Guide to Criticizing AI 看到有人非常認真討論事情,即使是批評 AI,都會讓我有興趣。 附上一些我的觀點: 1) 成長型公司聽起來很美好,每個人都會想待在那,但當它變成前提時就是另一回事了。很多決策都會以成長為基礎,最後就是投資人跟企業都沒辦法接受不成長的代價。 2) 常常在爭論 AI 是否會取代工作,看的是 AI 的兩個面向,賦能與自動化,哪個會更符合當前情境。贊同賦能的人會認為 AI 帶來生產力的解放,並創造價值,可是實際上呢? 3) 很多人提過 AI 的解壓縮 / 壓縮特性,特別是在履歷或信件應用。

By Ken Chen

Weekly Issue 第 25 期:Slack 基礎設施爭議

因為地緣政治議題,我們會關心資料存放的地點是否足夠安全,即使當使用者被盯上,他仍然可以放心資料足夠隱密。這也是為什麼當網路上傳出 Slack 台灣的資料轉移到阿里雲時,會引起爭議的原因。 Slack 已經出面澄清並無此事,這也讓我們反思,當軟體業面臨這類公關危機時,應該要揭露到什麼程度。 🗞️ 熱門新聞 Slack 在臺服務將移轉至中國? Salesforce:臺灣用戶使用全球基礎設施,與阿里巴巴無關 前幾天 Salesforce 傳出要將 Slack 台灣資料轉移到阿里雲,立刻引起一陣討論,有 Salesforce 的人出來澄清,說沒有這回事。 「台灣市場一直以來都是採用 Global Infrastructure 全球基礎設施。簡單說,台灣用戶的資料是儲存在美洲或亞太區(如日本),跟中國的阿里雲在物理和邏輯上都是完全切開的。 」 讓我有興趣的是,Salesforce 沒有說他們是用哪個雲平台。我們以前有次遇到類似情況,也討論到是否揭露使用平台。當時我持反對意見,認為只需要揭露「使用全球基礎設施」已經夠了,頂多說非中國廠商的服務就好,不需要也不應該說明具體是哪個。

By Ken Chen

Weekly Issue 第 24 期:網路的精神高地

前陣子去了雪梨一趟,跟布里斯本或台北都形成有趣的對比,旅行中也不斷在想,一座城市如何發展出自己的文化?這有點像是網路平台如何形成聚落,而又如何消亡。 很喜歡本期談知乎的一篇文章,理想主義的光輝是最吸引人的,我常在想,有沒有辦法將那座「看不見的城市」帶到真實世界中。 🗞️ 熱門新聞 A ChatGPT prompt equals about 5.1 seconds of Netflix 看到 Simon Willison 提到,如果 Sam Altman 的資訊是對的,每個 LLM 提問相當於 5.1s 的 Netflix 影片耗能。 計算的需求讓輝達跟台積電挖到金礦,那電力需求又會讓誰挖到金礦呢? ✨ 科技觀點 我们失去的不只是知乎,而是中文互联网的精神高地 「那时的知乎,更像“思想沙龙”,而非“内容平台”。」 昨天跟朋友聊天,

By Ken Chen