674
阿裏雲
技術社區[雲棲]
軟件事務內存導論(四)創建事務
創建事務
我們創建事務的目的是為了協調針對多個托管引用的變更。事務將會保證這些變更是原子的,也就是說,所有的托管引用要麼全部被提交要麼全部被丟棄,所
以在事務之外我們將不會看到有任何局部變更(partial
changes)出現。此外,我們也可以用創建事務的方式來解決對單個ref先讀後寫所引發的相關問題。
Akka是用Scala開發出來的,所以如果我們工作中用的是Scala的話,就可以直接幸福地享用Akka簡潔明了的API了。對於那些日常工作
中不能使用Scala開發的程序員,Akka同樣也提供了一組方便的API,以幫助他們通過Java語言來使用該類庫的功能。本節我們將會看到如何利用
Akka在Java和Scala中創建事務。
首先我們需要選一個適合用事務來解決的例子。我們在第5章中重構的EnergySource類使用了顯式的加鎖和解鎖操作(其最終版本詳見5.7節),下麵讓就我們將這些顯式的加鎖/解鎖操作換用Akka的事務API來實現。
在Java中創建事務
為了將代碼邏輯封裝到一個事務中,我們需要創建一個Atomic類的實例並將代碼放到該類的atomically()函數裏。隨後,我們可以通過調用Atomic實例的execute()函數來執行事務代碼。類似於下麵這樣:
1 |
return new Atomic<Object>() {
|
2 |
public Object atomically() {
|
3 |
//code to run in a transaction...
|
調用execute()函數的線程將負責執行atomically()函數裏的代碼。然而如果調用者本身並沒有處在一個事務中的話,那麼這個調用將會被封裝在一個新的事務中。
下麵讓我們用Akka事務來重新實現EnergySource。首先,讓我們將不可變狀態封裝到可變的Akka托管引用中去。
1 |
public class EnergySource {
|
2 |
private final long MAXLEVEL = 100 ;
|
3 |
final Ref<Long> level = new Ref<Long>(MAXLEVEL);
|
4 |
final Ref<Long> usageCount = new Ref<Long>(0L);
|
5 |
final Ref<Boolean> keepRunning = new Ref<Boolean>( true );
|
6 |
private static final ScheduledExecutorService replenishTimer =
|
7 |
Executors.newScheduledThreadPool( 10 );
|
在這段變量定義的代碼中,level和usageCount都被聲明為Akka Ref,並且各自持有一個不可變的Long類型的值。於是在Java中我們就不能更改這些Long類型的值了,但我們仍然可以通過更改托管引用(即實體)使其安全地指向新值。
在EnergySource的上一個版本中,ScheduledExecutorService會周期性地(每秒鍾一次)調用
replenish()函數直至整個任務結束,這就要求stopEnergySource()必須是同步的。而在這個版本中,我們不用再周期性地調用
replenish()函數,而隻會在對象實例初始化的時候執行一下調度操作。在每次調用replenish()函數時,我們都會根據
keepRunning的值來決定該函數是否應該在1秒之後再次被調度執行。這一變化消除了stopEnergySource()函數和調度器/計時器
(timer)之間的耦合。相反地,stopEnergySource()函數現在隻依賴於keepRunning這個標誌,而該標誌可以很容易地通過
STM事務來行管理。
在這一版的代碼中,由於可以依賴事務來保證安全性,所以我們沒必要再對stopEnergySource()函數進行同步了。同時,由於swap()函數本身就是以事務方式執行的,所以我們也無需顯式地為其創建事務。
01 |
private EnergySource() {} |
03 |
replenishTimer.schedule(new Runnable() {
|
06 |
if (keepRunning.get()) replenishTimer.schedule(
|
07 |
this, 1, TimeUnit.SECONDS);
|
09 |
}, 1, TimeUnit.SECONDS);
|
11 |
public static EnergySource create() { |
12 |
final EnergySource energySource = new EnergySource();
|
16 |
public void stopEnergySource() { keepRunning.swap(false); } |
如下所示,返回當前電量和使用次數的方法將會用到托管引用,但也隻是需要調用一下get()函數而已。
1 |
public long getUnitsAvailable() { return level.get(); }
|
2 |
public long getUsageCount() { return usageCount.get(); }
|
在getUnitsAvailable()函數和getUsageCount()函數中,由於其中的get()函數都是以事務方式運行的,所以無需顯式地將它們封裝在事務裏。
由於我們會在useEnergy()函數中同時修改電量和使用次數,所以useEnergy()函數需要使用一個顯式的事務來完成這些操作。在這
裏,我們需要保證對所有被讀取的值的變更都能保持一致性,即確保對這兩個字段的變更是原子的。為了實現這一目標,我們將使用Atomic接口,並用
atomically()函數將我們的邏輯代碼封裝到一個事務中。
01 |
public boolean useEnergy( final long units) {
|
02 |
return new Atomic<Boolean>() {
|
03 |
public Boolean atomically() {
|
04 |
long currentLevel = level.get();
|
05 |
if (units > 0 && currentLevel >= units) {
|
06 |
level.swap(currentLevel - units);
|
07 |
usageCount.swap(usageCount.get() + 1 );
|
useEnergy()函數的功能是從當前電量中減掉所消耗的電量(即unit——譯者注)。為了實現這一目標,我們需要保證所涉及到的get和
set操作都在同一個事務中完成,所以我們把所有相關操作都用atomically()函數封裝了起來。最後,我們會調用execute()函數來啟動事
務並順序執行的所有操作。
除了上述方法之外,我們還需要關注一下負責給電源充電的replenish()函數。由於這個方法也需要使用事務,所以其實現代碼同樣需要用Atomic進行封裝。
01 |
private void replenish() {
|
03 |
public Object atomically() {
|
04 |
long currentLevel = level.get();
|
05 |
if (currentLevel < MAXLEVEL) level.swap(currentLevel + 1 );
|
下麵是針對EnergySource類的測試代碼。其主要功能是,用多個線程並發地使用電池,每使用一次消耗一格電,每個線程最多會消耗7格電量。
01 |
public class UseEnergySource {
|
02 |
private static final EnergySource energySource = EnergySource.create();
|
03 |
public static void main( final String[] args)
|
04 |
throws InterruptedException, ExecutionException {
|
05 |
System.out.println( "Energy level at start: " +
|
06 |
energySource.getUnitsAvailable());
|
07 |
List<Callable<Object>> tasks = new ArrayList<Callable<Object>>();
|
08 |
for ( int i = 0 ; i < 10 ; i++) {
|
09 |
tasks.add( new Callable<Object>() {
|
10 |
public Object call() {
|
11 |
for ( int j = 0 ; j < 7 ; j++) energySource.useEnergy( 1 );
|
16 |
final ExecutorService service = Executors.newFixedThreadPool( 10 );
|
17 |
service.invokeAll(tasks);
|
18 |
System.out.println( "Energy level at end: " +
|
19 |
energySource.getUnitsAvailable());
|
20 |
System.out.println( "Usage: " + energySource.getUsageCount());
|
21 |
energySource.stopEnergySource();
|
上述代碼需要把Akka相關的Jar添加到Java的classpath中才能編譯通過,所以首先我們需要創建一個標識jar位置的環境變量:
export AKKA_JARS="$AKKA_HOME/lib/scala-library.jar:\
$AKKA_HOME/lib/akka/akka-stm-1.1.3.jar:\
$AKKA_HOME/lib/akka/akka-actor-1.1.3.jar:\
$AKKA_HOME/lib/akka/multiverse-alpha-0.6.2.jar:\
$AKKA_HOME/config:\
."
Classpath的定義取決於你使用的操作係統以及Akka在你的操作係統中被安裝的位置。我們可以用javac編譯器來編譯代碼,並用java命令來負責執行,具體細節如下所示:
javac -classpath $AKKA_JARS -d . EnergySource.java UseEnergySource.java
java -classpath $AKKA_JARS com.agiledeveloper.pcj.UseEnergySource
萬事俱備,下麵讓我們來編譯並執行這段代碼。通過代碼的實現邏輯我們知道,電源初始有100格電量,而我們創建的10個線程將會消耗掉其中的70格
電量,所以最後電源應該淨剩30格電量。但由於電池電量會每秒回複一格,所以每次運行結果可能會稍有不同,比如最後淨剩電量可能是31格而不是30格。
Energy level at start: 100
Energy level at end: 30
Usage: 70
默認情況下,Akka會將額外的日誌消息打印到標準輸出上。停掉這個默認的輸出也很容易,我們隻需要在$AKKA_HOME/config目錄下創
建一個名為logback.xml的文件,並在裏麵添加這項配置即可。由於這個文件位於classpath中,所以logger會自動找到這個文件、讀取
其中的配置並停掉消息輸出。除此之外,我們還可以在這個配置文件中設置很多其他有用的配置項。詳情請見https://logback.qos.ch/manual/configuration.html。
正如我們在本例中所看到的那樣,Akka是在後台默默地對事務進行管理的,所以請你多花些時間研究一下上述示例代碼,並對事務和線程的運作過程多做一些嚐試以便加深對這塊知識的理解。
在Scala中創建事務
我們之前已經看到了如何在Java中創建事務(並且我假設你已經閱讀過那一部分,所以這裏我們就不再贅述了),下麵我們將會在Scala中用更少的
代碼來完成同樣的功能。我們之所以能兼顧簡潔與功能,部分得益於Scala自身簡潔的特點,但更多還是由於Akka
API使用了閉包/函數值(closures/function values)的緣故。
相比Java的繁冗,我們在Scala中可以通過很簡潔的方法來創建事務。我們所需要做的隻是調用一下Stm的auomic()函數就行了,如下所示:
2 |
//code to run in a transaction....
|
3 |
/* return */ resultObject
|
其中,我們傳給atomic()的閉包/函數值僅在當前線程所運行的那個事務內可見。
下麵就是使用了Akka事務的Scala版本的EnergySource實現代碼:
01 |
class EnergySource private () {
|
02 |
private val MAXLEVEL = 100 L
|
03 |
val level = Ref(MAXLEVEL)
|
04 |
val usageCount = Ref( 0 L)
|
05 |
val keepRunning = Ref( true )
|
06 |
private def init() = {
|
07 |
EnergySource.replenishTimer.schedule( new Runnable() {
|
10 |
if (keepRunning.get) EnergySource.replenishTimer.schedule(
|
11 |
this , 1 , TimeUnit.SECONDS)
|
13 |
}, 1 , TimeUnit.SECONDS)
|
15 |
def stopEnergySource() = keepRunning.swap( false )
|
16 |
def getUnitsAvailable() = level.get
|
17 |
def getUsageCount() = usageCount.get
|
18 |
def useEnergy(units : Long) = {
|
20 |
val currentLevel = level.get
|
21 |
if (units > 0 && currentLevel > = units) {
|
22 |
level.swap(currentLevel - units)
|
23 |
usageCount.swap(usageCount.get + 1 )
|
28 |
private def replenish() =
|
29 |
atomic { if (level.get < MAXLEVEL) level.swap(level.get + 1 ) }
|
32 |
val replenishTimer = Executors.newScheduledThreadPool( 10 )
|
34 |
val energySource = new EnergySource
|
作為一個完全的麵向對象語言,Scala認為靜態方法是不適合放在類的定義中的,所以工廠方法create()就被移到其伴生對象裏麵去了。餘下的
代碼和Java版本非常相近,隻是較之更為簡潔。同時,由於使用了優雅的atomic()函數,我們就可以拋開Atomic類和execute()函數調
用了。
Scala版本的EnergySource的測試用例如下所示。在並發和線程控製的實現方麵,我們既可以像Java版本那樣采用JDK的ExecutorService來管理線程,也可以使用Scala的角色(actor)[1] 來為每個並發任務分配執行線程。這裏我們將采用第二種方式。當任務完成之後,每個任務都會給調用者返回一個響應,而調用者則需要等待所有任務結束之後才能繼續執行。
01 |
object UseEnergySource {
|
02 |
val energySource = EnergySource.create()
|
03 |
def main(args : Array[String]) {
|
04 |
println( "Energy level at start: " + energySource.getUnitsAvailable())
|
06 |
for (i <- 1 to 10 ) actor {
|
07 |
for (j <- 1 to 7 ) energySource.useEnergy( 1 )
|
10 |
for (i <- 1 to 10 ) { receiveWithin( 1000 ) { case message = > } }
|
11 |
println( "Energy level at end: " + energySource.getUnitsAvailable())
|
12 |
println( "Usage: " + energySource.getUsageCount())
|
13 |
energySource.stopEnergySource()
|
我們可以采用如下命令來引入Akka相關的Jar並編譯運行上述代碼,其中環境變量AKKA_JARS與我們在Java示例中的定義相同:
scalac -classpath $AKKA_JARS *.scala
java -classpath $AKKA_JARS com.agiledeveloper.pcj.UseEnergySource
Scala版本代碼的輸出結果與我們在Java版本中所看到的沒什麼兩樣,並同樣依賴於電量恢複的節奏,即可能最終剩餘電量是31而不是30。
Energy level at start: 100
Energy level at end: 30
Usage: 70
[1]這裏提到Scala的角色(actor)僅僅是為了說明有這種方法可供使用。後麵我們還將會學習如何使用功能更為強大的Akka actor。
文章轉自 並發編程網-ifeve.com
最後更新:2017-05-22 16:37:31