閱讀421 返回首頁    go 阿裏雲 go 技術社區[雲棲]


AKKA文檔(java版)—容錯

正如角色係統這一章中解釋的一樣,每一個角色都是它孩子的監管者,並且像這樣的角色都會定義錯誤處理監管策略。這個策略在成為角色係統結構的一個完整部分之後是不能被改變的。

錯誤處理實踐

首先,讓我們看一個處理數據存儲錯誤的例子,它是實踐應用中一個典型的錯誤根源。當然它基於真實應用,這個應用的數據存儲有可能是無效的,但我們在這個例子中會用一個最有效的重連方法來實現。
讀下麵的源代碼。內嵌的注釋解釋了錯誤處理的不同塊和為什麼要添加它們。強烈的建議去運行這個例子,這樣才能更簡單的去跟蹤這個日誌輸出,來了解運行的時候發生了什麼。

容錯例子的圖解(Diagrams of the Fault Tolerance Sample)
容錯例子的全部源代碼(Full Source Code of the Fault Tolerance Sample)

創建一個監管策略

下麵的章節解釋了錯誤處理機製和更深入的選擇。
根據示範的目的,讓我們來考慮如下策略:

01 private static SupervisorStrategy strategy =
02 new OneForOneStrategy(10, Duration.create("1 minute"),
03 new Function<Throwable, Directive>() {
04 @Override
05 public Directive apply(Throwable t) {
06 if (t instanceof ArithmeticException) {
07 return resume();
08 else if (t instanceof NullPointerException) {
09 return restart();
10 else if (t instanceof IllegalArgumentException) {
11 return stop();
12 else {
13 return escalate();
14 }
15 }
16 });
17  
18 @Override
19 public SupervisorStrategy supervisorStrategy() {
20 return strategy;
21 }

我選擇了一些大家熟知的異常類型,是為了展示在監管和監視章節中描述的錯誤處理指令的應用。首先,它是一個一對一的策略,意味著每一個孩子都是分開處理的(多對一的策略工作非常類似,唯一的不同就是任何決策都會應用到監管者的所有孩子,不僅僅是發生錯誤的那個)。在重啟頻率上會有一些限製,最大是每分鍾重啟10次。-1和Duration.Inf()意味著限製沒有應用,拋開這個可能性,去指定一個絕對的上限或者去讓這個重啟的工作沒有上限。當超出這個限製,孩子角色就被停止。

注意:如果策略在監管角色(而不是一個單獨的類)中描述了,它的決策者可以在線程安全的形勢下訪問角色的所有內部狀態,包括獲得當前發生錯誤的孩子的引用(例如錯誤消息的getSender)。

默認監管策略
如果定義的策略沒有覆蓋拋出的異常,Escalate會被使用。當沒有為一個角色定義監管策略,如下的異常會按照默認來處理:
1. ActorInitializationException會停止發生錯誤的子角色
2. ActorKilledException會停止發生錯誤的子角色
3. Exception會重啟發生錯誤的子角色
4. 別的拋出類型會升級到父角色
如果異常升級到根監管者,會按上述的默認策略處理。

停止監管策略

跟Erlang方式類似的策略是當它們失敗的時候隻停止子角色,以及當DeathWatch通知丟失的子角色的時候會對監管者采取正確的動作。

記錄角色失敗的信息

默認SupervisorStrategy會記錄失敗信息除非它們被向上升級。建議在更高層次的結構中處理上升的錯誤,並潛在的記錄下來。
在初始化的時候你可以通過設置SupervisorStrategy的loggingEnabled為false來去掉默認的日誌。可以在Decider裏定製日誌。注意如getSender一樣,當SupervisorStrategy在監管角色中描述,當前失敗的子角色引用是有效的。
你可以通重寫logFailure方法在你自己的SupervisorStrategy實現中定製化日誌。

最高層次角色的監管

最高層次角色意味著它們是通過system.actorOf()創建的,並且它們是User Guardian的孩子。在這種情況下沒有特定的規則,守護者僅僅應用配置策略。

測試應用

下麵章節展示了實踐中不同指令的效果,wherefor測試啟動是需要的。首先,我們需要一個匹配的監管者。

01 public class Supervisor extends UntypedActor {
02  
03 private static SupervisorStrategy strategy =
04 new OneForOneStrategy(10, Duration.create("1 minute"),
05 new Function<Throwable, Directive>() {
06 @Override
07 public Directive apply(Throwable t) {
08 if (t instanceof ArithmeticException) {
09 return resume();
10 else if (t instanceof NullPointerException) {
11 return restart();
12 else if (t instanceof IllegalArgumentException) {
13 return stop();
14 else {
15 return escalate();
16 }
17 }
18 });
19  
20 @Override
21 public SupervisorStrategy supervisorStrategy() {
22 return strategy;
23 }
24  
25 public void onReceive(Object o) {
26 if (o instanceof Props) {
27 getSender().tell(getContext().actorOf((Props) o), getSelf());
28 else {
29 unhandled(o);
30 }
31 }
32 }

這個監管者會被用於創建子角色,我們可以實驗一下:

01 public class Child extends UntypedActor {
02 int state = 0;
03  
04 public void onReceive(Object o) throws Exception {
05 if (o instanceof Exception) {
06 throw (Exception) o;
07 else if (o instanceof Integer) {
08 state = (Integer) o;
09 else if (o.equals("get")) {
10 getSender().tell(state, getSelf());
11 else {
12 unhandled(o);
13 }
14 }
15 }

測試使用Testing Actor Systems裏介紹的實用工具會比較簡單,TestProbe提供一個Actor Ref用於接收和檢查消息回複。

01 import akka.actor.ActorRef;
02 import akka.actor.ActorSystem;
03 import akka.actor.SupervisorStrategy;
04 import static akka.actor.SupervisorStrategy.resume;
05 import static akka.actor.SupervisorStrategy.restart;
06 import static akka.actor.SupervisorStrategy.stop;
07 import static akka.actor.SupervisorStrategy.escalate;
08 import akka.actor.SupervisorStrategy.Directive;
09 import akka.actor.OneForOneStrategy;
10 import akka.actor.Props;
11 import akka.actor.Terminated;
12 import akka.actor.UntypedActor;
13 import scala.collection.immutable.Seq;
14 import scala.concurrent.Await;
15 import static akka.pattern.Patterns.ask;
16 import scala.concurrent.duration.Duration;
17 import akka.testkit.AkkaSpec;
18 import akka.testkit.TestProbe;
19  
20 public class FaultHandlingTest {
21 static ActorSystem system;
22 Duration timeout = Duration.create(5, SECONDS);
23  
24 @BeforeClass
25 public static void start() {
26 system = ActorSystem.create("test", AkkaSpec.testConf());
27 }
28  
29 @AfterClass
30 public static void cleanup() {
31 JavaTestKit.shutdownActorSystem(system);
32 system = null;
33 }
34  
35 @Test
36 public void mustEmploySupervisorStrategy() throws Exception {
37 // code here
38 }
39  
40 }

讓我們創建角色

1 Props superprops = Props.create(Supervisor.class);
2 ActorRef supervisor = system.actorOf(superprops, "supervisor");
3 ActorRef child = (ActorRef) Await.result(ask(supervisor,
4 Props.create(Child.class), 5000), timeout);

第一個測試會演示Resume指令,所以我們嚐試通過把角色的狀態設置成非初始化狀態並讓它失敗:

1 child.tell(42, ActorRef.noSender());
2 assert Await.result(ask(child, "get"5000), timeout).equals(42);
3 child.tell(new ArithmeticException(), ActorRef.noSender());
4 assert Await.result(ask(child, "get"5000), timeout).equals(42);

你可以看到值42讓錯誤處理指令存活下來。現在,如果我們把錯誤改成一個更嚴重的NullPointerException異常,則將不會是這種情況:

1 child.tell(new NullPointerException(), ActorRef.noSender());
2 assert Await.result(ask(child, "get"5000), timeout).equals(0);

發生IllegalArgumentException致命異常的情況下,最終會導致子角色會被監管者中斷:

1 final TestProbe probe = new TestProbe(system);
2 probe.watch(child);
3 child.tell(new IllegalArgumentException(), ActorRef.noSender());
4 probe.expectMsgClass(Terminated.class);

到目前為止,監管者還沒有完全受到子角色失敗的影響,因為指令集會處理它。萬一拋出一個異常,監管者會向上拋出錯誤。

1 child = (ActorRef) Await.result(ask(supervisor,
2 Props.create(Child.class), 5000), timeout);
3 probe.watch(child);
4 assert Await.result(ask(child, "get"5000), timeout).equals(0);
5 child.tell(new Exception(), ActorRef.noSender());
6 probe.expectMsgClass(Terminated.class);

監管者自己是由ActorSystem提供的最高等級的角色監管的,在所有的異常(注意這兩個異常

ActorInitializationException和ActorKilledException)情況下,默認策略是重新啟動。一旦默認的指令重啟去殺死所有的孩子,我們期望我們的窮孩子不要幸存這個錯誤。

這不是所希望的(這依賴於用例),我們需要去用一個不同的監管者重寫這個行為。

01 public class Supervisor2 extends UntypedActor {
02  
03 private static SupervisorStrategy strategy = new OneForOneStrategy(10,
04 Duration.create("1 minute"),
05 new Function<Throwable, Directive>() {
06 @Override
07 public Directive apply(Throwable t) {
08 if (t instanceof ArithmeticException) {
09 return resume();
10 else if (t instanceof NullPointerException) {
11 return restart();
12 else if (t instanceof IllegalArgumentException) {
13 return stop();
14 else {
15 return escalate();
16 }
17 }
18 });
19  
20 @Override
21 public SupervisorStrategy supervisorStrategy() {
22 return strategy;
23 }
24  
25 public void onReceive(Object o) {
26 if (o instanceof Props) {
27 getSender().tell(getContext().actorOf((Props) o), getSelf());
28 else {
29 unhandled(o);
30 }
31 }
32  
33 @Override
34 public void preRestart(Throwable cause, Option<Object> msg) {
35 // do not kill all children, which is the default here
36 }
37 }

通過父親,孩子角色幸存向上升級重啟,如最後一段測試代碼所演示的:

1 superprops = Props.create(Supervisor2.class);
2 supervisor = system.actorOf(superprops);
3 child = (ActorRef) Await.result(ask(supervisor,
4 Props.create(Child.class), 5000), timeout);
5 child.tell(23, ActorRef.noSender());
6 assert Await.result(ask(child, "get"5000), timeout).equals(23);
7 child.tell(new Exception(), ActorRef.noSender());
8 assert Await.result(ask(child, "get"5000), timeout).equals(0);

最後更新:2017-05-23 10:02:47

  上一篇:go  並發網服務器遷移
  下一篇:go  如何使用Disruptor(二)如何從Ringbuffer讀取