閱讀416 返回首頁    go 阿裏雲 go 技術社區[雲棲]


軟件事務內存導論(七)阻塞事務

阻塞事務——有意識地等待

我們經常會遇到這樣一種情況,即某事務T能否成功完成依賴於某個變量是否發生了變化,並且由於這種原因所引起的事務運行失敗也可能隻是暫時性的。作 為對這種暫時性失敗的響應,我們可能會返回一個錯誤碼並告訴事務T等待一段時間之後再重試。然而在事務T等待期間,即使其他任務已經更改了事務T所依賴的 數據,事務T也沒法立即感知到並重試了。為了解決這一問題,Akka為我們提供了一個簡單的工具——retry(),該函數可以先將事務進行回滾,並將事 務置為阻塞狀態直到該事物所依賴的引用對象發生變化或事務阻塞的時間超過了之前配置的阻塞超時為止。我本人更願意將這一過程稱為“有意識地等待”,因為這 種說法聽起來比“阻塞”更合適一些。下麵讓我們將阻塞(或有意識地等待)用於下麵的兩個例子當中。


在Java中阻塞事務

程序員一般都會對咖啡因上癮,所以加班的時候任何主動要去拿些咖啡回來喝的人都知道不能空手而歸。但是這個拿咖啡的人很聰明,他沒有忙等(busy wait)至咖啡罐被重新填滿,而是在Akka的幫助下給自己設置了一個消息提醒,一旦咖啡罐有變化他就能收到這個通知。下麵讓我們用retry()來實 現這個可以有意識等待的fillCup()函數。

public  class  CoffeePot  {
	private  static  final  long  start  =  System.nanoTime();
	private  static  final  Ref<Integer>  cups  =  new  Ref<Integer>(24);
	private  static  void  fillCup(final  int  numberOfCups)  {
		final  TransactionFactory  factory  =
			new  TransactionFactoryBuilder()
			.setBlockingAllowed(true)
			.setTimeout(new  DurationInt(6).seconds())
			.build();
		new  Atomic<Object>(factory)  {
			public  Object  atomically()  {
				if(cups.get()  <  numberOfCups)  {
					System.out.println("retry........  at  "  +
						(System.nanoTime()  -  start)/1.0e9);
					retry();
				}
				cups.swap(cups.get()  -  numberOfCups);
				System.out.println("filled  up...."  +  numberOfCups);
				System.out.println("........  at  "  +
					(System.nanoTime()  -  start)/1.0e9);
				return  null;
			}
		}.execute();
	}

在fillCup()函數中,我們將事務配置成blockingAllowed,並將事務完成的超時時間設為6秒。當發現當前沒有足夠數量的咖啡 時,fillCups()函數沒有簡單地返回一個錯誤碼,而是調用了StmUtil的retry()函數進行有意識地等待。這將使得當前事務進入阻塞狀 態,直到與之相關的cups引用發生變化為止。一旦有任何相關的引用發生改變,係統將啟動一個新事務將之前包含retry的原子性代碼進行重做。

下麵讓我們通過調用fillCup()函數來觀察retry()的實際效果:

	public  static  void  main(final  String[]  args)  {
		final  Timer  timer  =  new  Timer(true);
		timer.schedule(new  TimerTask()  {
			public  void  run()  {
				System.out.println("Refilling....  at  "  +
					(System.nanoTime()  -  start)/1.0e9);
				cups.swap(24);
			}
		},  5000);
		fillCup(20);
		fillCup(10);
		try  {
			fillCup(22);
		}  catch(Exception  ex)  {
			System.out.println("Failed:  "  +  ex.getMessage());
		}
	}
}

在main()函數中,我們啟動了一個每隔大約5秒就往咖啡壺重新裝填咖啡的定時任務。隨後,第一個跑去拿咖啡的同事A立即取走了20杯咖啡。緊接 著,當我們這邊自告奮勇去取咖啡的同事B想再取走10杯咖啡時,他的動作將被阻塞直至重新裝填任務完成為止,而這種等待要比不斷重試的方案高效得多。重新 裝填的事務完成之後,同事B的請求將被自動重試,而這一次他的請求成功完成了。如果重新裝填任務沒有在我們設定的超時時間內發生,則請求咖啡的事務將會失 敗,在上例的try代碼塊中的那個請求就屬於這種情況。我們可以通過輸出日誌來觀察到這一行為,同時也可以更深入地體會到retry()為我們帶來的便 利:

filled  up....20
........  at  0.423589
retry........  at  0.425385
retry........  at  0.427569
Refilling....  at  5.130381
filled  up....10
........  at  5.131149
retry........  at  5.131357
retry........  at  5.131521
Failed:  Transaction  DefaultTransaction  has  timed  with  a
total  timeout  of  6000000000  ns

從上述輸出結果中我們可以看到,第一個倒20杯咖啡的請求是在程序開始運行之後0.4秒左右完成的。而第一個請求完成之後,咖啡壺裏就僅剩餘4杯咖 啡了,所以第二個倒10杯咖啡的請求就隻能被阻塞,直到程序運行至5秒左右時重新裝填的任務完成為止。在重新裝填的任務完成之後,倒10杯咖啡的事務被重 新啟動,並在程序運行到5秒多一點的時候成功完成。最後一個倒22杯咖啡的任務則由於在規定的超時時間內沒有再次發生重新裝填而以失敗告終。

其實我們在日常工作中並不會經常用到retry(),隻有當程序邏輯需要執行某些操作、而這些操作又依賴於某些相關數據發生變化的情況下,我們才能受益於這個監控數據變化的特性。

在Scala中阻塞事務

在上麵Java版的示例中,我們使用了一個提供了很多STM相關的便利接口的StmUtils對象。而在Scala中,我們可以直接使用 StmUtils裏提供的各種特性(trait)。此外,我們在Scala中同樣可以用工廠方法來創建TransactionFactory。

object  CoffeePot  {
	val  start  =  System.nanoTime()
	val  cups  =  Ref(24)
	def  fillCup(numberOfCups  :  Int)  =  {
		val  factory  =  TransactionFactory(blockingAllowed  =  true,
			timeout  =  6  seconds)
		atomic(factory)  {
			if(cups.get()  <  numberOfCups)  {
				println("retry........  at  "  +  (System.nanoTime()  -  start)/1.0e9)
				retry()
			}
			cups.swap(cups.get()  -  numberOfCups)
			println("filled  up...."  +  numberOfCups)
			println("........  at  "  +  (System.nanoTime()  -  start)/1.0e9)
		}
	}
	def  main(args  :  Array[String])  :  Unit  =  {
		val  timer  =  new  Timer(true)
		timer.schedule(new  TimerTask()  {
			def  run()  {
				println("Refilling....  at  "  +  (System.nanoTime()  -  start)/1.0e9)
				cups.swap(24)
			}
		},  5000)
		fillCup(20)
		fillCup(10)
		try  {
			fillCup(22)
		}  catch  {
			case  ex  =>  println("Failed:  "  +  ex.getMessage())
		}
	}
}

在創建TransactionFactory對象時,我們並沒有直接使用DurationInt來配置事務的超時時間,而是用 intToDurationInt()函數來完成從int到DurationInt的隱式轉換。通過Scala隱式轉換所帶來的語法上的便利,我們在初始 化TransactionFactory對象時隻需簡單調用6 seconds即可。示例代碼中餘下的部分就隻是從Java到Scala的一個簡單的翻譯而已,其最終的結果輸出如下所示:

filled up....20 ........ at 0.325964 retry........ at 0.327425 retry........ at 0.329587 Refilling.... at 5.105191 filled up....10 ........ at 5.106074 retry........ at 5.106296 retry........ at 5.106466 Failed: Transaction DefaultTransaction has timed with a total timeout of 6000000000 ns


文章轉自 並發編程網-ifeve.com

最後更新:2017-05-22 16:02:35

  上一篇:go  非阻塞同步算法實戰(一)
  下一篇:go  JDateTime