421
阿裏雲
技術社區[雲棲]
AKKA文檔(java版)—容錯
正如角色係統這一章中解釋的一樣,每一個角色都是它孩子的監管者,並且像這樣的角色都會定義錯誤處理監管策略。這個策略在成為角色係統結構的一個完整部分之後是不能被改變的。
錯誤處理實踐
首先,讓我們看一個處理數據存儲錯誤的例子,它是實踐應用中一個典型的錯誤根源。當然它基於真實應用,這個應用的數據存儲有可能是無效的,但我們在這個例子中會用一個最有效的重連方法來實現。
讀下麵的源代碼。內嵌的注釋解釋了錯誤處理的不同塊和為什麼要添加它們。強烈的建議去運行這個例子,這樣才能更簡單的去跟蹤這個日誌輸出,來了解運行的時候發生了什麼。
容錯例子的圖解(Diagrams of the Fault Tolerance Sample)
容錯例子的全部源代碼(Full Source Code of the Fault Tolerance Sample)
創建一個監管策略
下麵的章節解釋了錯誤處理機製和更深入的選擇。
根據示範的目的,讓我們來考慮如下策略:
01 |
private static SupervisorStrategy strategy =
|
02 |
new OneForOneStrategy( 10 , Duration.create( "1 minute" ),
|
03 |
new Function<Throwable, Directive>() {
|
05 |
public Directive apply(Throwable t) {
|
06 |
if (t instanceof ArithmeticException) {
|
08 |
} else if (t instanceof NullPointerException) {
|
10 |
} else if (t instanceof IllegalArgumentException) {
|
19 |
public SupervisorStrategy supervisorStrategy() {
|
我選擇了一些大家熟知的異常類型,是為了展示在監管和監視章節中描述的錯誤處理指令的應用。首先,它是一個一對一的策略,意味著每一個孩子都是分開處理的(多對一的策略工作非常類似,唯一的不同就是任何決策都會應用到監管者的所有孩子,不僅僅是發生錯誤的那個)。在重啟頻率上會有一些限製,最大是每分鍾重啟10次。-1和Duration.Inf()意味著限製沒有應用,拋開這個可能性,去指定一個絕對的上限或者去讓這個重啟的工作沒有上限。當超出這個限製,孩子角色就被停止。
注意:如果策略在監管角色(而不是一個單獨的類)中描述了,它的決策者可以在線程安全的形勢下訪問角色的所有內部狀態,包括獲得當前發生錯誤的孩子的引用(例如錯誤消息的getSender)。
默認監管策略
如果定義的策略沒有覆蓋拋出的異常,Escalate會被使用。當沒有為一個角色定義監管策略,如下的異常會按照默認來處理:
1. ActorInitializationException會停止發生錯誤的子角色
2. ActorKilledException會停止發生錯誤的子角色
3. Exception會重啟發生錯誤的子角色
4. 別的拋出類型會升級到父角色
如果異常升級到根監管者,會按上述的默認策略處理。
停止監管策略
跟Erlang方式類似的策略是當它們失敗的時候隻停止子角色,以及當DeathWatch通知丟失的子角色的時候會對監管者采取正確的動作。
記錄角色失敗的信息
默認SupervisorStrategy會記錄失敗信息除非它們被向上升級。建議在更高層次的結構中處理上升的錯誤,並潛在的記錄下來。
在初始化的時候你可以通過設置SupervisorStrategy的loggingEnabled為false來去掉默認的日誌。可以在Decider裏定製日誌。注意如getSender一樣,當SupervisorStrategy在監管角色中描述,當前失敗的子角色引用是有效的。
你可以通重寫logFailure方法在你自己的SupervisorStrategy實現中定製化日誌。
最高層次角色的監管
最高層次角色意味著它們是通過system.actorOf()創建的,並且它們是User Guardian的孩子。在這種情況下沒有特定的規則,守護者僅僅應用配置策略。
測試應用
下麵章節展示了實踐中不同指令的效果,wherefor測試啟動是需要的。首先,我們需要一個匹配的監管者。
01 |
public class Supervisor extends UntypedActor {
|
03 |
private static SupervisorStrategy strategy =
|
04 |
new OneForOneStrategy( 10 , Duration.create( "1 minute" ),
|
05 |
new Function<Throwable, Directive>() {
|
07 |
public Directive apply(Throwable t) {
|
08 |
if (t instanceof ArithmeticException) {
|
10 |
} else if (t instanceof NullPointerException) {
|
12 |
} else if (t instanceof IllegalArgumentException) {
|
21 |
public SupervisorStrategy supervisorStrategy() {
|
25 |
public void onReceive(Object o) {
|
26 |
if (o instanceof Props) {
|
27 |
getSender().tell(getContext().actorOf((Props) o), getSelf()); |
這個監管者會被用於創建子角色,我們可以實驗一下:
01 |
public class Child extends UntypedActor {
|
04 |
public void onReceive(Object o) throws Exception {
|
05 |
if (o instanceof Exception) {
|
07 |
} else if (o instanceof Integer) {
|
09 |
} else if (o.equals( "get" )) {
|
10 |
getSender().tell(state, getSelf()); |
測試使用Testing Actor Systems裏介紹的實用工具會比較簡單,TestProbe提供一個Actor Ref用於接收和檢查消息回複。
01 |
import akka.actor.ActorRef;
|
02 |
import akka.actor.ActorSystem;
|
03 |
import akka.actor.SupervisorStrategy;
|
04 |
import static akka.actor.SupervisorStrategy.resume;
|
05 |
import static akka.actor.SupervisorStrategy.restart;
|
06 |
import static akka.actor.SupervisorStrategy.stop;
|
07 |
import static akka.actor.SupervisorStrategy.escalate;
|
08 |
import akka.actor.SupervisorStrategy.Directive;
|
09 |
import akka.actor.OneForOneStrategy;
|
10 |
import akka.actor.Props;
|
11 |
import akka.actor.Terminated;
|
12 |
import akka.actor.UntypedActor;
|
13 |
import scala.collection.immutable.Seq;
|
14 |
import scala.concurrent.Await;
|
15 |
import static akka.pattern.Patterns.ask;
|
16 |
import scala.concurrent.duration.Duration;
|
17 |
import akka.testkit.AkkaSpec;
|
18 |
import akka.testkit.TestProbe;
|
20 |
public class FaultHandlingTest {
|
21 |
static ActorSystem system;
|
22 |
Duration timeout = Duration.create( 5 , SECONDS);
|
25 |
public static void start() {
|
26 |
system = ActorSystem.create( "test" , AkkaSpec.testConf());
|
30 |
public static void cleanup() {
|
31 |
JavaTestKit.shutdownActorSystem(system); |
36 |
public void mustEmploySupervisorStrategy() throws Exception {
|
讓我們創建角色
1 |
Props superprops = Props.create(Supervisor. class );
|
2 |
ActorRef supervisor = system.actorOf(superprops, "supervisor" );
|
3 |
ActorRef child = (ActorRef) Await.result(ask(supervisor, |
4 |
Props.create(Child. class ), 5000 ), timeout);
|
第一個測試會演示Resume指令,所以我們嚐試通過把角色的狀態設置成非初始化狀態並讓它失敗:
1 |
child.tell( 42 , ActorRef.noSender());
|
2 |
assert Await.result(ask(child, "get" , 5000 ), timeout).equals( 42 );
|
3 |
child.tell( new ArithmeticException(), ActorRef.noSender());
|
4 |
assert Await.result(ask(child, "get" , 5000 ), timeout).equals( 42 );
|
你可以看到值42讓錯誤處理指令存活下來。現在,如果我們把錯誤改成一個更嚴重的NullPointerException異常,則將不會是這種情況:
1 |
child.tell( new NullPointerException(), ActorRef.noSender());
|
2 |
assert Await.result(ask(child, "get" , 5000 ), timeout).equals( 0 );
|
發生IllegalArgumentException致命異常的情況下,最終會導致子角色會被監管者中斷:
1 |
final TestProbe probe = new TestProbe(system);
|
3 |
child.tell( new IllegalArgumentException(), ActorRef.noSender());
|
4 |
probe.expectMsgClass(Terminated. class );
|
到目前為止,監管者還沒有完全受到子角色失敗的影響,因為指令集會處理它。萬一拋出一個異常,監管者會向上拋出錯誤。
1 |
child = (ActorRef) Await.result(ask(supervisor, |
2 |
Props.create(Child. class ), 5000 ), timeout);
|
4 |
assert Await.result(ask(child, "get" , 5000 ), timeout).equals( 0 );
|
5 |
child.tell( new Exception(), ActorRef.noSender());
|
6 |
probe.expectMsgClass(Terminated. class );
|
監管者自己是由ActorSystem提供的最高等級的角色監管的,在所有的異常(注意這兩個異常
ActorInitializationException和ActorKilledException)情況下,默認策略是重新啟動。一旦默認的指令重啟去殺死所有的孩子,我們期望我們的窮孩子不要幸存這個錯誤。
這不是所希望的(這依賴於用例),我們需要去用一個不同的監管者重寫這個行為。
01 |
public class Supervisor2 extends UntypedActor {
|
03 |
private static SupervisorStrategy strategy = new OneForOneStrategy( 10 ,
|
04 |
Duration.create( "1 minute" ),
|
05 |
new Function<Throwable, Directive>() {
|
07 |
public Directive apply(Throwable t) {
|
08 |
if (t instanceof ArithmeticException) {
|
10 |
} else if (t instanceof NullPointerException) {
|
12 |
} else if (t instanceof IllegalArgumentException) {
|
21 |
public SupervisorStrategy supervisorStrategy() {
|
25 |
public void onReceive(Object o) {
|
26 |
if (o instanceof Props) {
|
27 |
getSender().tell(getContext().actorOf((Props) o), getSelf()); |
34 |
public void preRestart(Throwable cause, Option<Object> msg) {
|
35 |
// do not kill all children, which is the default here |
通過父親,孩子角色幸存向上升級重啟,如最後一段測試代碼所演示的:
1 |
superprops = Props.create(Supervisor2. class );
|
2 |
supervisor = system.actorOf(superprops); |
3 |
child = (ActorRef) Await.result(ask(supervisor, |
4 |
Props.create(Child. class ), 5000 ), timeout);
|
5 |
child.tell( 23 , ActorRef.noSender());
|
6 |
assert Await.result(ask(child, "get" , 5000 ), timeout).equals( 23 );
|
7 |
child.tell( new Exception(), ActorRef.noSender());
|
8 |
assert Await.result(ask(child, "get" , 5000 ), timeout).equals( 0 );
|
最後更新:2017-05-23 10:02:47