閱讀375 返回首頁    go 阿裏雲 go 技術社區[雲棲]


AKKA文檔(java版)—類型化角色

3.2 類型化角色

Akka的類型化角色是活動對象(Active Object)模式的實現。Smalltalk誕生的時候,默認的方法調用由異步派發代替同步操作。
類型化角色由2部分組成,包括一個公共的接口和實現,如果你有“企業級”Java的開發經驗,這對你來說會非常熟悉。與普通的角色一樣,你有一個外部的API(公共接口實例),將異步的方法調用委托給實現類的一個私有實例。

類型化角色對比角色的優勢是你可以有一個靜態的約定,而不需要去定義你自己的消息,不好的一麵就是它會限製你能做什麼和不能做什麼,比如你不能使用become/unbecome。
類型化角色是利用JDK Proxies 來實現的,它提供一個非常簡單的API去調用攔截方法。
注意
正如普通的非類型化角色一樣,類型化角色每次處理一次調用。
3.2.1 什麼時候使用類型化角色
類型化角色是角色係統和非角色代碼之間的美好橋梁,因為它們允許你在外部編寫正常的麵向對象代碼。把它們想象成門:它們實際上是公共部分和私有部分之間的接口,但你並不想你的房子有很多的門,不是嗎?你可以通過this blog post查看更詳細的討論。
更多的背景:TypedActor很容易作為RPC被濫用,因此TypedActor並不是我們一開始想象中的那樣,能夠更加容易的去正確編寫高可擴展的並發軟件。我們要在合適的地方使用它們。

3.2.2 工具箱
在創建第一個類型化角色之前,我們先了解一下這個工具,掌握它的功能,它位於akka.actor.TypedActor。

01 //返回類型化角色的表達式
02 TypedActorExtension extension = TypedActor.get(system); //係統是一個ActorSystem對象</p>
03 //返回引用是否是一個類型化角色代理
04 TypedActor.get(system).isTypedActor(someReference);
05  
06 //返回一個外部類型化角色代理的AKKA角色
07 TypedActor.get(system).getActorRefFor(someReference);
08  
09 //返回當前ActorContext
10 //方法僅在一個TypedActor的方法實現中有效
11 ActorContext context = TypedActor.context();
12  
13 //返回當前類型化角色的外部代理
14 //方法隻在TypedActor的方法實現中有效
15 Squarer sq = TypedActor.<Squarer>self();
16  
17 //返回一個類型化角色的上下文實例
18 //這意味著如果你使用它創建了其它的類型化角色實例
19 //它們將是當前這個類型化角色的子角色
20 TypedActor.get(TypedActor.context());

警告

        類型化角色和akka角色一樣不暴露this引用,這一點很重要。你應該通過外部代理引用,它可以通過TypedActor.self來獲得,這是你的對外身份標識,就像akka角色的對外身份標識是ActorRef一樣。

3.2.3 創建類型化角色

創建類型化角色需要有一個以上的接口和一個實現接口的類。

假設入口如下所示:

001 import akka.actor.TypedActor;
002 import akka.actor.*;
003 import akka.japi.*;
004 import akka.dispatch.Futures;</p>
005 import scala.concurrent.Await;
006 import scala.concurrent.Future;
007 import scala.concurrent.duration.Duration;
008 import java.util.concurrent.TimeUnit;
009  
010 import java.util.List;
011 import java.util.ArrayList;
012 import java.util.Random;
013 import akka.routing.RoundRobinGroup;
014 public class TypedActorDocTest {
015     Object someReference = null;
016     ActorSystem system = null;
017  
018     static public interface Squarer {
019         void squareDontCare(int i); //fire-forget(審校者注:這個詞怎麼翻譯?)
020         Future<Integer> square(int i); //非阻塞send-request-reply
021         Option<Integer> squareNowPlease(int i);//阻塞send-request-reply
022         int squareNow(int i); //阻塞send-request-reply
023     }
024  
025     static class SquarerImpl implements Squarer {
026         private String name;
027         public SquarerImpl() {
028             this.name = "default";
029         }
030  
031         public SquarerImpl(String name) {
032             this.name = name;
033         }
034  
035         public void squareDontCare(int i) {
036             int sq = i * i; //Nobody cares <img src="https://ifeve.com/wp-includes/images/smilies/frownie.png" alt=":(" >
037         }
038  
039         public Future<Integer> square(int i) {
040             return Futures.successful(i * i);
041         }
042  
043         public Option<Integer> squareNowPlease(int i) {
044             return Option.some(i * i);
045         }
046  
047         public int squareNow(int i) {
048             return i * i;
049         }
050     }
051  
052     @Test public void mustGetTheTypedActorExtension() {
053         try {
054             //返回類型化角色的的表達式
055             TypedActorExtension extension = TypedActor.get(system); //係統是一個ActorSystem實例
056  
057             //返回引用是否是一個類型化角色代理
058             TypedActor.get(system).isTypedActor(someReference);
059  
060             //返回類型化角色代理的AKKA角色
061             TypedActor.get(system).getActorRefFor(someReference);
062  
063             //返回當前ActorContext
064             // 方法隻在TypedActor方法實現內部有效
065             ActorContext context = TypedActor.context();
066  
067             //返回當前類型化角色的外部代理
068             // 方法隻在TypedActor方法實現內部有效</pre>
069             Squarer sq = TypedActor.<Squarer>self();
070  
071             //返回類型化角色的上下文實例
072             //這意味著如果你用它創建了其它類型化角色
073             //它們就是當前類型化角色的子角色
074             TypedActor.get(TypedActor.context());
075         catch (Exception e) {
076             //dun care
077         }
078      }
079      @Test public void createATypedActor() {
080          try {
081              Squarer mySquarer = TypedActor.get(system).typedActorOf(
082                  new TypedProps<SquarerImpl>(Squarer.class, SquarerImpl.class));
083              Squarer otherSquarer = TypedActor.get(system).typedActorOf(
084                  new TypedProps<SquarerImpl>(Squarer.classnew Creator<SquarerImpl>() {
085                      public SquarerImpl create() { return new SquarerImpl("foo"); }
086              }),"name");
087  
088              mySquarer.squareDontCare(10);
089              Future<Integer> fSquare = mySquarer.square(10); //A Future[Int]
090              Option<Integer> oSquare = mySquarer.squareNowPlease(10); //Option[Int]
091              int iSquare = mySquarer.squareNow(10); //Int
092  
093              assertEquals(100, Await.result(fSquare, Duration.create(3, TimeUnit.SECONDS)).intValue());
094              assertEquals(100, oSquare.get().intValue());
095              assertEquals(100, iSquare);
096  
097              TypedActor.get(system).stop(mySquarer);
098              TypedActor.get(system).poisonPill(otherSquarer);
099         catch(Exception e) {
100              //忽略
101         }
102     }
103  
104     @Test public void createHierarchies() {
105         try {
106             Squarer childSquarer = TypedActor.get(TypedActor.context()).typedActorOf(
107                 new TypedProps<SquarerImpl>(Squarer.class, SquarerImpl.class)
108             );
109             //Use "childSquarer" as a Squarer
110         catch (Exception e) {
111             //dun care
112         }
113     }
114  
115     @Test public void proxyAnyActorRef() {
116         try {
117             final ActorRef actorRefToRemoteActor = system.deadLetters();
118             Squarer typedActor = TypedActor.get(system).typedActorOf(
119             new TypedProps<Squarer>(Squarer.class),actorRefToRemoteActor);
120             //Use "typedActor" as a FooBar
121         catch (Exception e) {
122             //dun care
123         }
124     }
125  
126     interface HasName {
127         String name();
128     }
129  
130    class Named implements HasName {
131         private int id = new Random().nextInt(1024);
132         @Override public String name() { return "name-" + id; }
133     }
134  
135     @Test public void typedRouterPattern() {
136        try {
137            // prepare routees
138            TypedActorExtension typed = TypedActor.get(system);
139            Named named1 = typed.typedActorOf(new TypedProps<Named>(Named.class));
140            Named named2 = typed.typedActorOf(new TypedProps<Named>(Named.class));
141  
142            List<Named> routees = new ArrayList<Named>();
143            routees.add(named1);
144            routees.add(named2);
145  
146            List<String> routeePaths = new ArrayList<String>();
147            routeePaths.add(typed.getActorRefFor(named1).path().toStringWithoutAddress());
148            routeePaths.add(typed.getActorRefFor(named2).path().toStringWithoutAddress());
149  
150            // prepare untyped router
151            ActorRef router = system.actorOf(new RoundRobinGroup(routeePaths).props(), "router");
152  
153           //準備類型化代理,向“router”轉發方法調用消息
154           Named typedRouter = typed.typedActorOf(new TypedProps<Named>(Named.class), router);
155  
156            System.out.println("actor was: " + typedRouter.name()); // name-243
157            System.out.println("actor was: " + typedRouter.name()); // name-614
158            System.out.println("actor was: " + typedRouter.name()); // name-243
159            System.out.println("actor was: " + typedRouter.name()); // name-614
160  
161            typed.poisonPill(named1);
162            typed.poisonPill(named2);
163            typed.poisonPill(typedRouter);
164        catch (Exception e) {
165            //dun care
166        }
167     }
168 }

接口的例子:

1     public interface Squarer {
2         //類型化的角色接口方法 ...
3     }

接口的實現類:

01 class SquarerImpl implements Squarer {
02     private String name;
03     public SquarerImpl() {
04         this.name = "default";
05     }
06  
07     public SquarerImpl(String name) {
08         this.name = name;
09     }
10  
11     //類型化的角色方法實現 ...
12 }

創建Squarer的類型化角色最簡單的方式如下:

1 Squarer mySquarer = TypedActor.get(system).typedActorOf(new TypedProps<SquarerImpl>(Squarer.class, SquarerImpl.class));

      第一種類型是代理類型,第二種是代理類型的實現。如果你需要去調用一個特殊的構造器,你可以這樣做:

1 Squarer otherSquarer = TypedActor.get(system).typedActorOf(new TypedProps<SquarerImpl>(Squarer.class,new Creator<SquarerImpl>() {
2     public SquarerImpl create() { return new SquarerImpl("foo"); }
3 }),"name");

 既然提供一個Props,你可以指定使用哪一個調度程序,應該使用缺省的timeout或者別的。目前,Squarer沒有定義任何方法,我們可以添加如下這些方法。

1 public interface Squarer {
2     void squareDontCare(int i); //fire-forget
3     Future<Integer> square(int i); //non-blocking send-request-reply
4     Option<Integer> squareNowPlease(int i);//blocking send-request-reply
5     int squareNow(int i); //blocking send-request-reply
6 }

那好,現在我們可以調用這些方法了,不過他們需要在SquarerImpl中實現。

01 class SquarerImpl implements Squarer {
02     private String name;
03     public SquarerImpl() {
04         this.name = "default";
05     }
06  
07     public SquarerImpl(String name) {
08         this.name = name;
09     }
10  
11     public void squareDontCare(int i) {
12         int sq = i * i; //Nobody cares <img src="https://ifeve.com/wp-includes/images/smilies/frownie.png" alt=":(" >
13     }
14  
15     public Future<Integer> square(int i) {
16         return Futures.successful(i * i);
17     }
18  
19     public Option<Integer> squareNowPlease(int i) {
20         return Option.some(i * i);
21     }
22  
23     public int squareNow(int i) {
24        return i * i;
25     }
26 }

很好,現在我們已經有一個接口和它的實現類,並且知道怎麼去創建一個類型化角色了,讓我們了解下這些方法。

3.2.4 方法調度語義

         方法返回:

        1. void會被fire-and-forget語義調度,和ActorRef.tell完全一樣

        2. scala.concurrent.Future<?>會使用send-request-reply語義,和ActorRef.ask完全一樣。

        3. akka.japi.Option<?>會使用send-request-reply語義,但會堵塞等待一個應答,並且如果在timeout內沒有回複,就會返回akka.japi.Option.None,或者,相反的返回akka.japi.Option.Some<?>。此調用過程中被拋出的任何異常都將被重新拋出。

        4. 任何別的類型值會使用send-request-reply語義,但會阻塞等待一個回答,如果在一個timeout內拋出異常或者在調用過程中出現重新拋異常的情況,就會拋出java.util.concurrent.TimeoutException。注意,基於Java異常和反射機製,一個TimeoutException會包裝在一個java.lang.reflect.UndeclaredThrowableException裏,除非接口方法明確的描述TimeoutException作為一個受檢異常拋出。

3.2.5 消息和不可變性

         雖然Akka不能強製轉換類型化角色方法的參數為不可變的,但是我們強烈建議把參數設置為不可變的。

3.2.5.1 單向(One-way)消息發送

1 mySquarer.squareDontCare(10);

就像上麵這麼簡單,方法會在另一個線程裏異步的執行。

3.2.5.2 雙向(Request-reply)消息發送

1 Option<Integer> oSquare = mySquare.squareNowPlease(10);//Option[Int]

如果需要,阻塞的時長可以配置類型化角色的Props的timeout。如果超時,它會返回None。

1 int iSquare = mySquarer.squareNow(10);//Int

如果需要,阻塞的時長可以配置類型化角色的Props的timeout。如果超時,它會拋出一個java.util.concurrent.TimeoutException。這裏需要注意一下,通過Java的反射機製,這樣一個TimeoutException會被包裝在一個java.lang.reflect.UndeclaredThrowableException中,因為接口方法沒有明確描述TimeoutException作為一個受檢異常拋出。為了直接得到TimeoutException,可以在接口方法中添加throws java.util.concurrent.TimeoutException。

3.2.5.3 (Request-reply-with-future) 消息發送

1 Future<Integer> fSquare = mySquarer.square(10);//一個Future對象[Int]

這個調用是異步的,並且future的返回可以用於異步成分。

 

3.2.6 停止類型化角色

一旦Akka的類型化角色被Akka角色阻塞,當不再需要它們,必須被停掉。

1 TypedActor.get(system).stop(mySquarer);
2 </span></span>

這個異步的方法會盡快的停掉類型化角色關聯的代理。

1 TypedActor.get(system).poisonPill(otherSquarer);

這個異步的方法會在所有調用都完成之後停掉類型化角色關聯的代理。

 

3.2.7 類型化角色層次結構

既然你可以通過傳遞一個ActorContext來獲得一個上下文的類型化角色,你可以通過在它上麵調用typeActorOf來創建子類型化角色。

1 Squarer childSquarer = TypedActor.get(TypedActor.context()).typedActorOf(
2     new TypedProps<SquarerImpl>(Squarer.class, SquarerImpl.class)
3 );
4 //Use "childSquarer" as a Squarer

你可以通過常規的Akka角色,把UntypedActorContext作為TypedActor.get的輸入參數創建一個子類型化角色。

 

3.2.8 監管者策略

通過你的類型化角色實現類實現TypedActor.supervisor。你可以定義策略去監管子角色,就像監管與監控(Supervision and Monitoring) and 容錯(Fault Tolerance)中描述的一樣。

 

3.2.9 接收任意的消息

如果你的TypedActor的實現類繼承akka.actor.TypedActor.Receiver,所有非方法調用的消息都會傳遞到onReceive方法。

這允許你處理DeathWatch的Terminated消息和別的類型的消息,例如當與非類型化角色進行交互的場景。

 

3.2.10 生命周期回調

通過你的類型化角色實現類實現如下任何一個或所有的方法:

  • TypedActor.PreStart
  • TypedActor.PostStop
  • TypedActor.PreRestart
  • TypedActor.PostRestart

你可以用鉤子方法連接到你的類型化角色的生命周期。

 

3.2.11 代理

你可以使用帶TypedProps和Actor引用參數的 typedActorOf以代理的方式將ActorRef引用轉換成類型化角色。如果你想和其他機器上的TypedActor進行遠程交互,隻要把ActorRef傳給typedActorOf即可。

 

3.2.12 查找和遠程處理

既然TypedActor是基於Akka Actor的,你可以用typedActorOf去代理有可能在遠程節點上的ActorRef。

1 Squarer typedActor = TypedActor.get(system).typedActorOf(
2     new TypedProps<Squarer>(Squarer.class),actorRefToRemoteActor);
3 //Use "typedActor" as a FooBar

3.2.13 類型化路由模式

有時候你想在多個角色之間傳遞消息。在Akka中最簡單的實現方法是用一個路由(Router),它可以實現一個特定的路由邏輯,例如最小郵箱(smallest-mailbox)或者一致性哈希(consistent-hashing)等等。

路由沒有直接提供給類型化角色,但可以通過利用一個非類型化路由和類型化代理來實現它。為了展示這一點讓我們創建一些類型化角色並給他們指派隨機的id,然後我們會看到路由把消息發送給了不同的角色:

01 @Test public void typedRouterPattern() {
02     try {
03         // prepare routees
04         TypedActorExtension typed = TypedActor.get(system);
05  
06         Named named1 = typed.typedActorOf(new TypedProps<Named>(Named.class));
07         Named named2 = typed.typedActorOf(new TypedProps<Named>(Named.class));
08  
09         List<Named> routees = new ArrayList<Named>();
10         routees.add(named1);
11         routees.add(named2);
12  
13         List<String> routeePaths = new ArrayList<String>();
14         routeePaths.add(typed.getActorRefFor(named1).path().toStringWithoutAddress());
15         routeePaths.add(typed.getActorRefFor(named2).path().toStringWithoutAddress());
16  
17         // 準備類型化路由
18         ActorRef router = system.actorOf(new RoundRobinGroup(routeePaths).props(), "router");
19  
20         // 準備類型化代理,向路由轉發MethodCall消息
21         Named typedRouter = typed.typedActorOf(new TypedProps<Named>(Named.class), router);
22  
23         System.out.println("actor was: " + typedRouter.name()); // name-243
24         System.out.println("actor was: " + typedRouter.name()); // name-614
25         System.out.println("actor was: " + typedRouter.name()); // name-243
26         System.out.println("actor was: " + typedRouter.name()); // name-614
27  
28         typed.poisonPill(named1);
29         typed.poisonPill(named2);
30         typed.poisonPill(typedRouter);
31  
32     catch (Exception e) {
33         //dun care
34     }
35 }

為了在這樣的角色實例中間輪詢,你可以創建一個簡單的非類型化路由,並用一個TypedActor實現這個路由的外觀模式,就像下麵的例子展示的。這樣會起作用是因為類型化角色利用和普通角色相同的通訊機製,以及它們的方法調用被轉化為MethodCall消息。

01 // prepare routees
02 TypedActorExtension typed = TypedActor.get(system);
03  
04 Named named1 = typed.typedActorOf(new TypedProps<Named>(Named.class));
05  
06 Named named2 = typed.typedActorOf(new TypedProps<Named>(Named.class));
07  
08 List<Named> routees = new ArrayList<Named>();
09 routees.add(named1);
10 routees.add(named2);
11  
12 List<String> routeePaths = new ArrayList<String>();
13 routeePaths.add(typed.getActorRefFor(named1).path().toStringWithoutAddress());
最後更新:2017-05-23 16:02:29

  上一篇:go  Bash漏洞那些事兒
  下一篇:go  NVIDIA發布首個基於AI的癌症分布式學習環境的框架——CANDLE


14 routeePaths.add(typed.getActorRefFor(named2).path().toStringWithoutAddress());