949
阿裏雲
技術社區[雲棲]
《GO並發編程實戰》—— Concurrent Map
我們在本章前麵的部分中對Go語言提供的各種傳統同步工具和方法進行了逐一的介紹。在本節,我們將運用它們來構造一個並發安全的字典(Map)類型。
我們已經知道,Go語言提供的字典類型並不是並發安全的。因此,我們需要使用一些同步方法對它進行擴展。這看起來並不困難。我們隻要使用讀寫鎖將針對一個字典類型值的讀操作和寫操作保護起來就可以了。確實,讀寫鎖應該是我們首先想到的同步工具。不過,我們還不能確定隻使用它是否就足夠了。不管怎樣,讓我們先來編寫並發安全的字典類型的第一個版本。
我們先來確定並發安全的字典類型的行為。還記得嗎?依然,這需要聲明一個接口類型。我們在第4章帶領讀者編寫過OrderedMap接口類型及其實現類型。我們可以借鑒OrderedMap接口類型的聲明並編寫出需要在這裏聲明的接口類型ConcurrentMap。實際上,ConcurrentMap接口類型的方法集合應該是OrderedMap接口類型的方法集合的一個子集。我們隻需從OrderedMap中去除那些代表有序Map特有行為的方法聲明即可。既然是這樣,我何不從這兩個自定義的字典接口類型中抽出一個公共接口呢?
這個公共的字典接口類型可以是這樣的:
03 |
type GenericMap interface {
|
05 |
// 獲取給定鍵值對應的元素值。若沒有對應元素值則返回nil。 |
07 |
Get(key interface {}) interface {}
|
09 |
// 添加鍵值對,並返回與給定鍵值對應的舊的元素值。若沒有舊元素值則返回(nil, true)。 |
11 |
Put(key interface {}, elem interface {}) ( interface {}, bool)
|
13 |
// 刪除與給定鍵值對應的鍵值對,並返回舊的元素值。若沒有舊元素值則返回nil。 |
15 |
Remove(key interface {}) interface {}
|
27 |
Contains(key interface {}) bool
|
39 |
ToMap() map[ interface {}] interface {}
|
43 |
KeyType() reflect.Type |
47 |
ElemType() reflect.Type |
然後,我們把這個名為GenericMap的字典接口類型嵌入到OrderedMap接口類型中,並去掉後者中的已在前者內聲明的那些方法。修改後的OrderedMap接口類型如下:
03 |
type OrderedMap interface {
|
05 |
GenericMap // 泛化的Map接口
|
07 |
// 獲取第一個鍵值。若無任何鍵值對則返回nil。 |
09 |
FirstKey() interface {}
|
11 |
// 獲取最後一個鍵值。若無任何鍵值對則返回nil。 |
15 |
// 獲取由小於鍵值toKey的鍵值所對應的鍵值對組成的OrderedMap類型值。 |
17 |
HeadMap(toKey interface {}) OrderedMap
|
19 |
// 獲取由小於鍵值toKey且大於等於鍵值fromKey的鍵值所對應的鍵值對組成的OrderedMap類型值。 |
21 |
SubMap(fromKey interface {}, toKey interface {}) OrderedMap
|
23 |
// 獲取由大於等於鍵值fromKey的鍵值所對應的鍵值對組成的OrderedMap類型值。 |
25 |
TailMap(fromKey interface {}) OrderedMap
|
我們要記得在修改完成後立即使用go test命令重新運行相關的功能測試,並以此確保這樣的重構沒有破壞任何現有的功能。
有了GenericMap接口類型之後,我們的ConcurrentMap接口類型的聲明就相當簡單了。由於後者沒有任何特殊的行為,所以我們隻要簡單地將前者嵌入到後者的聲明中即可,就像這樣:
1 |
type ConcurrentMap interface {
|
下麵我們來編寫該接口類型的實現類型。我們依然使用一個結構體類型來充當,並把它命名為myConcurrentMap。myConcurrentMap類型的基本結構如下:
01 |
type myConcurrentMap struct { |
03 |
m map[ interface {}] interface {}
|
有了編寫myOrderedMap類型(還記得嗎?它的指針類型是OrderedMap的實現類型)的經驗,寫出myConcurrentMap類型的基本結構也是一件比較容易的事情。可以看到,在基本需要之外,我們隻為myConcurrentMap類型加入了一個代表了讀寫鎖的rwmutex字段。此外,我們需要為myConcurrentMap類型添加的那些指針方法的實現代碼實際上也可以以myOrderedMap類型中的相應方法為藍本。不過,在實現前者的過程中要注意合理運用同步方法以保證它們的並發安全性。下麵,我們就開始編寫它們。
首先,我們來看Put、Remove和Clear這幾個方法。它們都屬於寫操作,都會改變myConcurrentMap類型的m字段的值。
方法Put的功能是向myConcurrentMap類型值添加一個鍵值對。那麼,我們在這個操作的前後一定要分別鎖定和解鎖rwmutex的寫鎖。Put方法的實現如下:
01 |
func (cmap *myConcurrentMap) Put(key interface {}, elem interface {}) ( interface {}, bool) {
|
03 |
if !cmap.isAcceptablePair(key, elem) {
|
11 |
defer cmap.rwmutex.Unlock() |
13 |
oldElem := cmap.m[key] |
該實現中的isAcceptablePair方法的功能是檢查參數值key和elem是否均不為nil且它們的類型是否均與當前值允許的鍵類型和元素類型一致。在通過該檢查之後,我們就需要對rwmutex進行鎖定了。相應的,我們使用defer語句來保證對它的及時解鎖。與此類似,我們在Remove和Clear方法的實現中也應該加入相同的操作。
與這些代表著寫操作的方法相對應的,是代表讀操作的方法。在ConcurrentMap接口類型中,此類方法有Get、Len、Contains、Keys、Elems和ToMap。我們需要分別在這些方法的實現中加入對rwmutex的讀鎖的鎖定和解鎖操作。以Get方法為例,我們應該這樣來實現它:
1 |
func (cmap *myConcurrentMap) Get(key interface {}) interface {} {
|
5 |
defer cmap.rwmutex.RUnlock() |
這裏有兩點需要特別注意。
- 我們在使用寫鎖的時候,要注意方法間的調用關係。比如,一個代表寫操作的方法中調用了另一個代表寫操作的方法。顯然,我們在這兩個方法中都會用到讀寫鎖中的寫鎖。但如果使用不當,我們就會使前者被永遠鎖住。當然,對於代表寫操作的方法調用代表讀操作的方法的這種情況來說,也會是這樣。請看下麵的示例:
1 |
func (cmap *myConcurrentMap) Remove(key interface {}) interface {} { cmap.rwmutex.Lock() defer cmap.rwmutex.Unlock() oldElem := cmap.Get() delete(cmap.m, key) return oldElem }
|
可以看到,我們在Remove方法中調用了Get方法。並且,在這個調用之前,我們已經鎖定了rwmutex的寫鎖。然而,由前麵的展示可知,我們在Get方法的開始處對rwmutex的讀鎖進行了鎖定。由於這兩個鎖定操作之間的互斥性,所以我們一旦調用這個Remove方法就會使當前Goroutine永遠陷入阻塞。更嚴重的是,在這之後,其他Goroutine在調用該*myConcurrentMap類型值的一些方法(涉及到其中的rwmutex字段的讀鎖或寫鎖)的時候也會立即被阻塞住。
我們應該避免這種情況的方式。這裏有兩種解決方案。第一種解決方案是,把Remove方法中的oldElem := cmap.Get()語句與在它前麵的那兩條語句的位置互換,即變為:
1 |
oldElem := cmap.Get() cmap.rwmutex.Lock() defer cmap.rwmutex.Unlock() |
這樣可以保證在解鎖讀鎖之後才會去鎖定寫鎖。相比之下,第二種解決方案更加徹底一些,即:消除掉方法間的調用。也就是說,我們需要把oldElem := cmap.Get()語句替換掉。在Get方法中,體現其功能的語句是oldElem := cmap.m[key]。因此,我們把後者作為前者的替代品。若如此,那麼我們必須保證該語句出現在對寫鎖的鎖定操作之後。這樣,我們才能依然確保其在鎖的保護之下。實際上,通過這樣的修改,我們升級了Remove方法中的被用來保護從m字段中獲取對應元素值的這一操作的鎖(由讀鎖升級至寫鎖)。
- 對於rwmutex字段的讀鎖來說,雖然鎖定它的操作之間不是互斥的,但是這些操作與相應的寫鎖的鎖定操作之間卻是互斥的。我們在上一條注意事項中已經說明了這一點。因此,為了最小化對寫操作的性能的影響,我們應該在鎖定讀鎖之後盡快的對其進行解鎖。也就是說,我們要在相關的方法中盡量減少持有讀鎖的時間。這需要我們綜合的考量。
依據前麵的示例和注意事項說明,讀者可以試著實現Remove、Clear、Len、Contains、Keys、Elems和ToMap方法。它們實現起來並不困難。注意,我們想讓*myConcurrentMap類型成為ConcurrentMap接口類型的實現類型。因此,這些方法都必須是myConcurrentMap類型的指針方法。這包括馬上要提及的那兩個方法。
方法KeyType和ElemType的實現極其簡單。我們可以直接分別返回myConcurrentMap類型的keyType字段和elemType字段的值。這兩個字段的值應該是在myConcurrentMap類型值的使用方初始化它的時候給出的。
按照慣例,我們理應提供一個可以方便的創建和初始化並發安全的字典值的函數。我們把它命名為NewConcurrentMap,其實現如下:
01 |
func NewConcurrentMap(keyType, elemType reflect.Type) ConcurrentMap { |
03 |
return &myConcurrentMap{
|
09 |
m: make(map[ interface {}] interface {})}
|
這個函數並沒有什麼特別之處。由於myConcurrentMap類型的rwmutex字段並不需要額外的初始化,所以它並沒有出現在該函數中的那個複合字麵量中。此外,為了遵循麵向接口編程的原則,我們把該函數的結果的類型聲明為了ConcurrentMap,而不是它的實現類型*myConcurrentMap。如果將來我們編寫出了另一個ConcurrentMap接口類型的實現類型,那麼就應該考慮調整該函數的名稱。比如變更為NewDefaultConcurrentMap,或者其他。
待讀者把還未實現的*myConcurrentMap類型的那幾個方法都補全之後(可以利用NewConcurrentMap函數來檢驗這個類型是否是一個合格的ConcurrentMap接口的實現類型),我們就開始一起為該類型編寫功能和性能測試了。
參照我們之前為*myOrderedMap類型編寫的功能測試,我們可以很快的照貓畫虎的創建出*myConcurrentMap類型的功能測試函數。這些函數和本小節前麵講到的所有代碼都被放到了goc2p項目的basic/map1代碼包中。其中,接口類型ConcurrentMap的聲明和myConcurrentMap類型的基本結構及其所有的指針方法均在庫源碼文件cmap.go中。因此,我們應該把對應的測試代碼放到cmap_test.go文件中。
既然有了很好的參照,作者並不想再贅述*myConcurrentMap類型的功能測試函數了。我希望讀者能夠先獨立的編寫出來並通過go test命令的檢驗,然後再去與cmap_test.go文件中的代碼對照。
另外,在myConcurrentMap類型及其指針方法的實現中,我們多處用到了讀寫鎖和反射API(聲明在reflect代碼包中的那些公開的程序實體)。它們執行的都是可能會對程序性能造成一定影響的操作。因此,針對*myConcurrentMap類型的性能測試(或稱基準測試)是很有必要的。這樣我們才能知道它的值在性能上到底與官方的字典類型有怎樣的差別。
我們在測試源碼文件cmap_test.go文件中聲明兩個基準測試函數——BenchmarkConcurrentMap和BenchmarkMap。顧名思義,這兩個函數是分別被用來測試*myConcurrentMap類型和Go語言官方的字典類型的值的性能的。
在BenchmarkConcurrentMap函數中,我們執行這樣一個流程。
(1) 初始化一個*myConcurrentMap類型的值,同時設定鍵類型和元素類型均為int32類型。
(2) 執行迭代次數預先給定(即該函數的*testing.B類型的參數b的字段N的值)的循環。在單次迭代中,我們向字典類型值添加一個鍵值對,然後再試圖從該值中獲取與當前鍵值對應的元素值。
(3) 打印出一行提示信息,包含該值的鍵類型、元素類型以及長度等內容。
下麵是該函數的實現:
01 |
func BenchmarkConcurrentMap(b *testing.B) { |
03 |
keyType := reflect.TypeOf(int32( 2 ))
|
07 |
cmap := NewConcurrentMap(keyType, elemType) |
11 |
fmt.Printf("N=%d.\n", b.N) |
15 |
for i := 0 ; i < b.N; i++ {
|
23 |
elem = seed << 10
|
43 |
mapType := fmt.Sprintf("ConcurrentMap<%s, %s>", |
45 |
keyType.Kind().String(), elemType.Kind().String()) |
47 |
b.Logf("The length of % value is %d.\n", mapType, ml) |
在這段代碼中,我們用到了參數b的幾個方法。我們在第5章講基準測試的時候說明過它們的功用。這裏再簡單回顧一下。b.ResetTimer方法的功能是將針對該函數的本次執行的計時器歸零。而b.StartTimer方法和b.StopTimer方法的功能則分別是啟動和停止這個計時器。在該函數體中,我們使用這三個方法忽略掉一些無關緊要的語句的執行時間。更具體的講,我們隻對for語句的for子句及其代碼塊中的cmap.Put(key, elem)語句和_ = cmap.Get(key)語句,以及ml := cmap.Len()語句的執行時間進行計時。注意,隻要它們的耗時不超過1秒或由go test命令的benchtime標記給定的時間,那麼測試運行程序就會嚐試著多次執行該函數並在每次執行前增加b.N的值。所以,我們去掉無關語句的執行耗時也意味著會讓BenchmarkConcurrentMap函數被執行更多次。
除此之外,我們還用到了b.SetBytes方法。它的作用是記錄在單次操作中被處理的字節的數量。在這裏,我們每次記錄一個鍵值對所用的字節數量。由於鍵和元素的類型都是int32類型的,所以它們共會用掉8個字節。
在編寫完成BenchmarkConcurrentMap函數之後,我們便可以如法炮製針對Go官方的字典類型的基準測試函數BenchmarkMap了。請注意,為了公平起見,我們在初始化這個字典類型值的時候也要把它的鍵類型和元素類型都設定為interface{},就像這樣:
1 |
imap := make(map[ interface {}] interface {})
|
但是,在為其添加鍵值對的時候要讓鍵和元素值的類型均為int32類型。
在一切準備妥當之後,我們在相應目錄下使用命令
go test -bench=”.” -run=”^$” -benchtime=1s -v
運行goc2p項目的basic/map1代碼包中的基準測試。
稍等片刻,標準輸出上會出現如下內容:
03 |
BenchmarkConcurrentMap N= 1 .
|
11 |
1000000 1612 ns/op 4.96 MB/s
|
13 |
--- BENCH: BenchmarkConcurrentMap |
15 |
cmap_test.go: 240 : The length of ConcurrentMap<int32, int32>alue is 1 .
|
17 |
cmap_test.go: 240 : The length of ConcurrentMap<int32, int32>alue is 100 .
|
19 |
cmap_test.go: 240 : The length of ConcurrentMap<int32, int32>alue is 10000 .
|
21 |
cmap_test.go: 240 : The length of ConcurrentMap<int32, int32>alue is 1000000 .
|
33 |
2000000 856 ns/op 9.35 MB/s
|
35 |
--- BENCH: BenchmarkMap |
37 |
cmap_test.go: 268 : The length of Map<int32, int32> value is 1 .
|
39 |
cmap_test.go: 268 : The length of Map<int32, int32> value is 100 .
|
41 |
cmap_test.go: 268 : The length of Map<int32, int32> value is 10000 .
|
43 |
cmap_test.go: 268 : The length of Map<int32, int32> value is 1000000 .
|
45 |
cmap_test.go: 268 : The length of Map<int32, int32> value is 2000000 .
|
47 |
ok basic/map1 258 .327s
|
我們看到,測試運行程序執行BenchmarkConcurrentMap函數的次數是4,而執行BenchmarkMap函數的次數是5。這從以“N=”為起始的輸出內容和測試日誌的行數上都可以看得出來。由我們前麵提到的測試運行程序多次執行基準測試函數的前提條件已經可知,Go語言提供的字典類型的值的性能要比我們自行擴展的並發安全的*myConcurrentMap類型的值的性能好。具體的性能差距可以參看測試輸出中的那兩行代表了測試細節的內容,即:
1 |
1000000 1612 ns/op 4.96 MB/s
|
和
1 |
2000000 856 ns/op 9.35 MB/s
|
前者代表針對*myConcurrentMap類型值的測試細節。測試運行程序在1秒鍾之內最多可以執行相關操作(包括添加鍵值對、根據鍵值獲取元素值和獲取字典類型值的長度)的次數為一百萬,平均每次執行的耗時為1612納秒。並且,根據我們在BenchmarkConcurrentMap函數中的設置,它每秒可以處理4.86兆字節的數據。
另一方麵,Go語言方法的字典類型的值的測試細節是這樣的:測試運行程序在1秒鍾之內最多可以執行相關操作的次數為兩百萬,平均每次執行的耗時為856納秒,根據BenchmarkMap函數中的設置,它每秒可以處理9.35兆字節的數據。
從上述測試細節可以看出,前者在性能上要比後者差,且差距將近一倍。這樣的差距幾乎都是由*myConcurrentMap類型及其方法中使用的讀寫鎖造成的。
由此,我們也印證了,同步工具在為程序的並發安全提供支持的同時也會對其性能造成了不可忽視的損耗。這也使我們認識到:在使用同步工具的時候應該仔細斟酌並盡量平衡各個方麵的指標,以使其無論是在功能上還是在性能上都能達到我們的要求。
順便提一句,Go語言未對自定義泛型提供支持,以至於我們在編寫此類擴展的時候並不是那麼方便。有時候,我們不得不使用反射API。但是,眾所周知,它們對程序性能的負麵影響也是不可小覷的。因此,我們應該盡量減少對它們的使用。
最後更新:2017-05-23 12:02:34