閱讀495 返回首頁    go 阿裏雲 go 技術社區[雲棲]


《GO並發編程實戰》—— 條件變量

我們在第6章講多線程編程的時候詳細說明過條件變量的概念、原理和適用場景。因此,我們在本小節僅對sync代碼包中與條件變量相關的API進行簡單的介紹,並使用它們來改造我們之前實現的*myDataFile類型的相關方法。

在Go語言中,sync.Cond類型代表了條件變量。與互斥鎖和讀寫鎖不同,簡單的聲明無法創建出一個可用的條件變量。為了得到這樣一個條件變量,我們需要用到sync.NewCond函數。該函數的聲明如下:

1 func NewCond(l Locker) *Cond

我們在第6章中說過,條件變量總是要與互斥量組合使用。因此,sync.NewCond函數的唯一參數是sync.Locker類型的,而具體的參數值既可以是一個互斥鎖也可以是一個讀寫鎖。sync.NewCond函數在被調用之後會返回一個*sync.Cond類型的結果值。我們可以調用該值擁有的幾個方法來操縱對應的條件變量。

 

類型*sync.Cond的方法集合中有三個方法,即:Wait方法、Signal方法和Broadcast方法。它們分別代表了等待通知、單發通知和廣播通知的操作。

方法Wait會自動的對與該條件變量關聯的那個鎖進行解鎖,並且使調用方所在的Goroutine被阻塞。一旦該方法收到通知,就會試圖再次鎖定該鎖。如果鎖定成功,它就會喚醒那個被它阻塞的Goroutine。否則,該方法會等待下一個通知,那個Goroutine也會繼續被阻塞。而方法Signal和Broadcast的作用都是發送通知以喚醒正在為此而被阻塞的Goroutine。不同的是,前者的目標隻有一個,而後者的目標則是所有。

我們在第6章的“線程的同步”小節中詳細的描述過這些操作的行為和意義。讀者可以在需要時回顧其中的內容。

在上一小節,我們在*myDataFile類型的Read方法和Write方法的實現中使用到了讀寫鎖fmutex。在Read方法中,我們對一種邊界情況進行了特殊處理,即:如果*os.File類型的f字段的ReadAt方法在被調用後返回了一個非nil且等於io.EOF的錯誤值,那麼Read方法就忽略這個錯誤並再次嚐試讀取相同位置的數據塊,直到讀取成功為止。從這個特殊處理的具體流程上來看,似乎使用條件變量來作為輔助手段會帶來一些好處。下麵我們就來動手試驗一下。

我們先在結構體類型myDataFile增加一個類型為*sync.Cond的字段rcond。為了快速實現想法,我們暫時不考慮怎樣初始化這個字段,而直接去改造Read方法和Write方法。

在Read方法中,我們使用一個for循環來達到重新嚐試獲取數據塊的目的。為此,我們添加了若幹條重複的語句、降低了程序的性能,還造成了一個潛在的問題——在某個情況下讀寫鎖fmutex不會被讀解鎖。為了解決這一係列新生的問題,我們使用代表條件變量的字段rcond。Read方法的第三個版本如下:

01 func (df *myDataFile) Read() (rsn int64, d Data, err error) {
02  
03 // 讀取並更新讀偏移量
04  
05 // 省略若幹條語句
06  
07  
08  
09 //讀取一個數據塊
10  
11 rsn = offset / int64(df.dataLen)
12  
13 bytes := make([]byte, df.dataLen)
14  
15 df.fmutex.RLock()
16  
17 defer df.fmutex.RUnlock()
18  
19 for {
20  
21 _, err = df.f.ReadAt(bytes, offset)
22  
23 if err != nil {
24  
25 if err == io.EOF {
26  
27 df.rcond.Wait()
28  
29 continue
30  
31 }
32  
33 return
34  
35 }
36  
37 d = bytes
38  
39 return
40  
41 }
42  
43 }

在這裏,我們假設條件變量rcond與讀寫鎖fmutex中的“讀鎖”相關聯。可以看到,我們讓defer df.fmutex.RUnlock()語句回歸了,並刪除了所有return語句和continue語句前麵的針對fmutex的讀解鎖操作。這都得益於新增在continue語句前麵的df.rcond.Wait()。添加這條語句的意義在於:當發現由文件內容讀取造成的EOF錯誤時,要讓當前Goroutine暫時放棄fmutex的“讀鎖”並等待通知的到來。放棄fmutex的“讀鎖”也就意味著Write方法中的數據塊寫操作不會受到它的阻礙了。在寫操作完成之後,我們應該及時向條件變量rcond發送通知以喚醒為此而等待的Goroutine。請注意,在某個Goroutine被喚醒之後,應該再次檢查需要被滿足的條件。在這裏,這個需要被滿足的條件是在進行文件內容讀取時不會造成EOF錯誤。如果該條件被滿足,那麼就可以進行後續的操作了。否則,應該再次放棄“讀鎖”並等待通知。這也是我們依然保留for循環的原因。

這裏有兩點需要特別注意。

  • 一定要在調用rcond的Wait方法之前鎖定與之關聯的那個“讀鎖”,否則就會造成對Wait方法的調用永遠無法返回。這種情況會導致流程執行的停滯,甚至整個程序的死鎖!導致這種結果的原因與條件變量和讀寫鎖的內部實現方式有關(結果也許並不應該是這樣,作者已經向Go語言官方提交了一個issue;Go語言官方已經接受了這個issue,並承諾將會在Go 1.4版本中改進它)。假設,與條件變量rcond關聯的是某個讀寫鎖的“寫鎖”或普通的互斥鎖,那麼對rcond.Wait方法的調用將會引發一個運行時恐慌。原因是,該方法會先對與之關聯的鎖進行解鎖,而試圖解鎖未被鎖定的鎖就會引發一個運行時恐慌。
  • 一定不要忘記在讀操作完成之前解鎖與條件變量rcond關聯的那個“讀鎖”,否則對讀寫鎖的寫鎖定操作將會阻塞相關的Goroutine。其根本原因是,條件變量rcond的Wait方法在返回之前會重新鎖定與之關聯的那個“讀鎖”。因此,在結束這個從文件中讀取一個數據塊的流程之前,我們應該調用fmutex字段的RLock方法。那條defer語句就起到了這個作用。

我們對Read方法的這次改進使得它的實現變得更加簡潔和清晰了。不過,要想使其中的條件變量rcond真正發揮作用,還需要Write方法的配合。換句話說,為了讓rcond.Wait方法可以適時的返回,我們要在向文件寫入一個數據塊之後及時的向rcond發送通知。添加了這一操作的Write方法如下:

01 func (df *myDataFile) Write(d Data) (wsn int64, err error) {
02  
03 // 省略若幹條語句
04  
05 var bytes []byte
06  
07 // 省略若幹條語句
08  
09 df.fmutex.Lock()
10  
11 defer df.fmutex.Unlock()
12  
13 _, err = df.f.Write(bytes)
14  
15 df.rcond.Signal()
16  
17 return
18  
19 }

由於一個數據塊隻能由某一個讀操作讀取,所以我們隻是使用條件變量的Signal方法去通知某一個為此等待的Wait方法,並以此喚醒某一個相關的Goroutine。這可以免去其它相關的Goroutine中的一些無謂操作。

與Wait方法不同,我們在調用條件變量的Signal方法和Broadcast方法之前無需鎖定與之關聯的鎖。隨之,相應的解鎖操作也是不需要的。在這個Write方法中的鎖定操作和解鎖的操作針對的並不是df.rcond.Signal()語句。

我們一直在說,條件變量rcond是與讀寫鎖fmutex的“讀鎖”關聯的。這是怎樣做到的呢?讀者還記得我們在上一節提到讀寫鎖的RLocker方法嗎?它會返回當前讀寫鎖中的“讀鎖”。這個結果值同時也是sync.Locker接口的實現。因此,我們可以把它作為參數值傳給sync.NewCond函數。所以,我們在NewDataFile函數中的聲明df變量的語句的後麵加入了這樣一條語句:

1 df.rcond = sync.NewCond(df.fmutex.RLocker())

在這之後,我們就可以像前麵那樣使用這個條件變量了。

隨著對*myDataFile類型和NewDataFile函數的改造的完成,我們也將結束本節。Go語言提供的互斥鎖、讀寫鎖和條件變量都基本遵循了POSIX標準中描述的對應的同步工具的行為規範。它們簡單且高效。我們可以使用它們為複雜的類型提供並發安全的保證。在一些情況下,它們比通道更加靈活。在隻需對一個或多個臨界區進行保護的時候,使用鎖往往會對程序的性能損耗更小。

好了,現在簡單預告一下後麵的內容。在下一節中,我們將會介紹對程序性能損耗更小的同步工具——原子操作。同樣的,我們會使用這一工具進一步改造*myDataFile類型及其方法。

最後更新:2017-05-23 13:31:45

  上一篇:go  Java Reflection(三):構造器
  下一篇:go  Java IO: 字節和字符數組