375
阿裏雲
技術社區[雲棲]
AKKA文檔(java版)—類型化角色
3.2 類型化角色
Akka的類型化角色是活動對象(Active Object)模式的實現。Smalltalk誕生的時候,默認的方法調用由異步派發代替同步操作。
類型化角色由2部分組成,包括一個公共的接口和實現,如果你有“企業級”Java的開發經驗,這對你來說會非常熟悉。與普通的角色一樣,你有一個外部的API(公共接口實例),將異步的方法調用委托給實現類的一個私有實例。
類型化角色對比角色的優勢是你可以有一個靜態的約定,而不需要去定義你自己的消息,不好的一麵就是它會限製你能做什麼和不能做什麼,比如你不能使用become/unbecome。
類型化角色是利用JDK Proxies 來實現的,它提供一個非常簡單的API去調用攔截方法。
注意
正如普通的非類型化角色一樣,類型化角色每次處理一次調用。
3.2.1 什麼時候使用類型化角色
類型化角色是角色係統和非角色代碼之間的美好橋梁,因為它們允許你在外部編寫正常的麵向對象代碼。把它們想象成門:它們實際上是公共部分和私有部分之間的接口,但你並不想你的房子有很多的門,不是嗎?你可以通過this blog post查看更詳細的討論。
更多的背景:TypedActor很容易作為RPC被濫用,因此TypedActor並不是我們一開始想象中的那樣,能夠更加容易的去正確編寫高可擴展的並發軟件。我們要在合適的地方使用它們。
3.2.2 工具箱
在創建第一個類型化角色之前,我們先了解一下這個工具,掌握它的功能,它位於akka.actor.TypedActor。
02 |
TypedActorExtension extension = TypedActor.get(system); //係統是一個ActorSystem對象</p>
|
04 |
TypedActor.get(system).isTypedActor(someReference); |
06 |
//返回一個外部類型化角色代理的AKKA角色 |
07 |
TypedActor.get(system).getActorRefFor(someReference); |
10 |
//方法僅在一個TypedActor的方法實現中有效 |
11 |
ActorContext context = TypedActor.context(); |
14 |
//方法隻在TypedActor的方法實現中有效 |
15 |
Squarer sq = TypedActor.<Squarer>self(); |
18 |
//這意味著如果你使用它創建了其它的類型化角色實例 |
20 |
TypedActor.get(TypedActor.context()); |
警告
類型化角色和akka角色一樣不暴露this引用,這一點很重要。你應該通過外部代理引用,它可以通過TypedActor.self來獲得,這是你的對外身份標識,就像akka角色的對外身份標識是ActorRef一樣。
3.2.3 創建類型化角色
創建類型化角色需要有一個以上的接口和一個實現接口的類。
假設入口如下所示:
001 |
import akka.actor.TypedActor;
|
004 |
import akka.dispatch.Futures;</p>
|
005 |
import scala.concurrent.Await;
|
006 |
import scala.concurrent.Future;
|
007 |
import scala.concurrent.duration.Duration;
|
008 |
import java.util.concurrent.TimeUnit;
|
010 |
import java.util.List;
|
011 |
import java.util.ArrayList;
|
012 |
import java.util.Random;
|
013 |
import akka.routing.RoundRobinGroup;
|
014 |
public class TypedActorDocTest {
|
015 |
Object someReference = null ;
|
016 |
ActorSystem system = null ;
|
018 |
static public interface Squarer {
|
019 |
void squareDontCare( int i); //fire-forget(審校者注:這個詞怎麼翻譯?)
|
020 |
Future<Integer> square( int i); //非阻塞send-request-reply
|
021 |
Option<Integer> squareNowPlease( int i); //阻塞send-request-reply
|
022 |
int squareNow( int i); //阻塞send-request-reply
|
025 |
static class SquarerImpl implements Squarer {
|
027 |
public SquarerImpl() {
|
028 |
this .name = "default" ;
|
031 |
public SquarerImpl(String name) {
|
035 |
public void squareDontCare( int i) {
|
039 |
public Future<Integer> square( int i) {
|
040 |
return Futures.successful(i * i);
|
043 |
public Option<Integer> squareNowPlease( int i) {
|
044 |
return Option.some(i * i);
|
047 |
public int squareNow( int i) {
|
052 |
@Test public void mustGetTheTypedActorExtension() {
|
055 |
TypedActorExtension extension = TypedActor.get(system); //係統是一個ActorSystem實例
|
058 |
TypedActor.get(system).isTypedActor(someReference);
|
061 |
TypedActor.get(system).getActorRefFor(someReference);
|
064 |
// 方法隻在TypedActor方法實現內部有效
|
065 |
ActorContext context = TypedActor.context();
|
068 |
// 方法隻在TypedActor方法實現內部有效</pre>
|
069 |
Squarer sq = TypedActor.<Squarer>self();
|
072 |
//這意味著如果你用它創建了其它類型化角色
|
074 |
TypedActor.get(TypedActor.context());
|
075 |
} catch (Exception e) {
|
079 |
@Test public void createATypedActor() {
|
081 |
Squarer mySquarer = TypedActor.get(system).typedActorOf(
|
082 |
new TypedProps<SquarerImpl>(Squarer. class , SquarerImpl. class ));
|
083 |
Squarer otherSquarer = TypedActor.get(system).typedActorOf(
|
084 |
new TypedProps<SquarerImpl>(Squarer. class , new Creator<SquarerImpl>() {
|
085 |
public SquarerImpl create() { return new SquarerImpl( "foo" ); }
|
088 |
mySquarer.squareDontCare( 10 );
|
089 |
Future<Integer> fSquare = mySquarer.square( 10 ); //A Future[Int]
|
090 |
Option<Integer> oSquare = mySquarer.squareNowPlease( 10 ); //Option[Int]
|
091 |
int iSquare = mySquarer.squareNow( 10 ); //Int
|
093 |
assertEquals( 100 , Await.result(fSquare, Duration.create( 3 , TimeUnit.SECONDS)).intValue());
|
094 |
assertEquals( 100 , oSquare.get().intValue());
|
095 |
assertEquals( 100 , iSquare);
|
097 |
TypedActor.get(system).stop(mySquarer);
|
098 |
TypedActor.get(system).poisonPill(otherSquarer);
|
099 |
} catch (Exception e) {
|
104 |
@Test public void createHierarchies() {
|
106 |
Squarer childSquarer = TypedActor.get(TypedActor.context()).typedActorOf(
|
107 |
new TypedProps<SquarerImpl>(Squarer. class , SquarerImpl. class )
|
109 |
//Use "childSquarer" as a Squarer
|
110 |
} catch (Exception e) {
|
115 |
@Test public void proxyAnyActorRef() {
|
117 |
final ActorRef actorRefToRemoteActor = system.deadLetters();
|
118 |
Squarer typedActor = TypedActor.get(system).typedActorOf(
|
119 |
new TypedProps<Squarer>(Squarer. class ),actorRefToRemoteActor);
|
120 |
//Use "typedActor" as a FooBar
|
121 |
} catch (Exception e) {
|
130 |
class Named implements HasName {
|
131 |
private int id = new Random().nextInt( 1024 );
|
132 |
@Override public String name() { return "name-" + id; }
|
135 |
@Test public void typedRouterPattern() {
|
138 |
TypedActorExtension typed = TypedActor.get(system);
|
139 |
Named named1 = typed.typedActorOf( new TypedProps<Named>(Named. class ));
|
140 |
Named named2 = typed.typedActorOf( new TypedProps<Named>(Named. class ));
|
142 |
List<Named> routees = new ArrayList<Named>();
|
146 |
List<String> routeePaths = new ArrayList<String>();
|
147 |
routeePaths.add(typed.getActorRefFor(named1).path().toStringWithoutAddress());
|
148 |
routeePaths.add(typed.getActorRefFor(named2).path().toStringWithoutAddress());
|
150 |
// prepare untyped router
|
151 |
ActorRef router = system.actorOf( new RoundRobinGroup(routeePaths).props(), "router" );
|
153 |
//準備類型化代理,向“router”轉發方法調用消息
|
154 |
Named typedRouter = typed.typedActorOf( new TypedProps<Named>(Named. class ), router);
|
156 |
System.out.println( "actor was: " + typedRouter.name()); // name-243
|
157 |
System.out.println( "actor was: " + typedRouter.name()); // name-614
|
158 |
System.out.println( "actor was: " + typedRouter.name()); // name-243
|
159 |
System.out.println( "actor was: " + typedRouter.name()); // name-614
|
161 |
typed.poisonPill(named1);
|
162 |
typed.poisonPill(named2);
|
163 |
typed.poisonPill(typedRouter);
|
164 |
} catch (Exception e) {
|
接口的例子:
1 |
public interface Squarer {
|
接口的實現類:
01 |
class SquarerImpl implements Squarer {
|
03 |
public SquarerImpl() {
|
04 |
this .name = "default" ;
|
07 |
public SquarerImpl(String name) {
|
創建Squarer的類型化角色最簡單的方式如下:
1 |
Squarer mySquarer = TypedActor.get(system).typedActorOf( new TypedProps<SquarerImpl>(Squarer. class , SquarerImpl. class ));
|
第一種類型是代理類型,第二種是代理類型的實現。如果你需要去調用一個特殊的構造器,你可以這樣做:
1 |
Squarer otherSquarer = TypedActor.get(system).typedActorOf( new TypedProps<SquarerImpl>(Squarer. class , new Creator<SquarerImpl>() {
|
2 |
public SquarerImpl create() { return new SquarerImpl( "foo" ); }
|
既然提供一個Props,你可以指定使用哪一個調度程序,應該使用缺省的timeout或者別的。目前,Squarer沒有定義任何方法,我們可以添加如下這些方法。
1 |
public interface Squarer {
|
2 |
void squareDontCare( int i); //fire-forget
|
3 |
Future<Integer> square( int i); //non-blocking send-request-reply
|
4 |
Option<Integer> squareNowPlease( int i); //blocking send-request-reply
|
5 |
int squareNow( int i); //blocking send-request-reply
|
那好,現在我們可以調用這些方法了,不過他們需要在SquarerImpl中實現。
01 |
class SquarerImpl implements Squarer {
|
03 |
public SquarerImpl() {
|
04 |
this .name = "default" ;
|
07 |
public SquarerImpl(String name) {
|
11 |
public void squareDontCare( int i) {
|
15 |
public Future<Integer> square( int i) {
|
16 |
return Futures.successful(i * i);
|
19 |
public Option<Integer> squareNowPlease( int i) {
|
20 |
return Option.some(i * i);
|
23 |
public int squareNow( int i) {
|
很好,現在我們已經有一個接口和它的實現類,並且知道怎麼去創建一個類型化角色了,讓我們了解下這些方法。
3.2.4 方法調度語義
方法返回:
1. void會被fire-and-forget語義調度,和ActorRef.tell完全一樣
2. scala.concurrent.Future<?>會使用send-request-reply語義,和ActorRef.ask完全一樣。
3. akka.japi.Option<?>會使用send-request-reply語義,但會堵塞等待一個應答,並且如果在timeout內沒有回複,就會返回akka.japi.Option.None,或者,相反的返回akka.japi.Option.Some<?>。此調用過程中被拋出的任何異常都將被重新拋出。
4. 任何別的類型值會使用send-request-reply語義,但會阻塞等待一個回答,如果在一個timeout內拋出異常或者在調用過程中出現重新拋異常的情況,就會拋出java.util.concurrent.TimeoutException。注意,基於Java異常和反射機製,一個TimeoutException會包裝在一個java.lang.reflect.UndeclaredThrowableException裏,除非接口方法明確的描述TimeoutException作為一個受檢異常拋出。
3.2.5 消息和不可變性
雖然Akka不能強製轉換類型化角色方法的參數為不可變的,但是我們強烈建議把參數設置為不可變的。
3.2.5.1 單向(One-way)消息發送
1 |
mySquarer.squareDontCare( 10 );
|
就像上麵這麼簡單,方法會在另一個線程裏異步的執行。
3.2.5.2 雙向(Request-reply)消息發送
1 |
Option<Integer> oSquare = mySquare.squareNowPlease( 10 ); //Option[Int]
|
如果需要,阻塞的時長可以配置類型化角色的Props的timeout。如果超時,它會返回None。
1 |
int iSquare = mySquarer.squareNow( 10 ); //Int
|
如果需要,阻塞的時長可以配置類型化角色的Props的timeout。如果超時,它會拋出一個java.util.concurrent.TimeoutException。這裏需要注意一下,通過Java的反射機製,這樣一個TimeoutException會被包裝在一個java.lang.reflect.UndeclaredThrowableException中,因為接口方法沒有明確描述TimeoutException作為一個受檢異常拋出。為了直接得到TimeoutException,可以在接口方法中添加throws java.util.concurrent.TimeoutException。
3.2.5.3 (Request-reply-with-future) 消息發送
1 |
Future<Integer> fSquare = mySquarer.square( 10 ); //一個Future對象[Int]
|
這個調用是異步的,並且future的返回可以用於異步成分。
3.2.6 停止類型化角色
一旦Akka的類型化角色被Akka角色阻塞,當不再需要它們,必須被停掉。
1 |
TypedActor.get(system).stop(mySquarer); |
這個異步的方法會盡快的停掉類型化角色關聯的代理。
1 |
TypedActor.get(system).poisonPill(otherSquarer); |
這個異步的方法會在所有調用都完成之後停掉類型化角色關聯的代理。
3.2.7 類型化角色層次結構
既然你可以通過傳遞一個ActorContext來獲得一個上下文的類型化角色,你可以通過在它上麵調用typeActorOf來創建子類型化角色。
1 |
Squarer childSquarer = TypedActor.get(TypedActor.context()).typedActorOf( |
2 |
new TypedProps<SquarerImpl>(Squarer. class , SquarerImpl. class )
|
4 |
//Use "childSquarer" as a Squarer |
你可以通過常規的Akka角色,把UntypedActorContext作為TypedActor.get的輸入參數創建一個子類型化角色。
3.2.8 監管者策略
通過你的類型化角色實現類實現TypedActor.supervisor。你可以定義策略去監管子角色,就像監管與監控(Supervision and Monitoring) and 容錯(Fault Tolerance)中描述的一樣。
3.2.9 接收任意的消息
如果你的TypedActor的實現類繼承akka.actor.TypedActor.Receiver,所有非方法調用的消息都會傳遞到onReceive方法。
這允許你處理DeathWatch的Terminated消息和別的類型的消息,例如當與非類型化角色進行交互的場景。
3.2.10 生命周期回調
通過你的類型化角色實現類實現如下任何一個或所有的方法:
- TypedActor.PreStart
- TypedActor.PostStop
- TypedActor.PreRestart
- TypedActor.PostRestart
你可以用鉤子方法連接到你的類型化角色的生命周期。
3.2.11 代理
你可以使用帶TypedProps和Actor引用參數的 typedActorOf以代理的方式將ActorRef引用轉換成類型化角色。如果你想和其他機器上的TypedActor進行遠程交互,隻要把ActorRef傳給typedActorOf即可。
3.2.12 查找和遠程處理
既然TypedActor是基於Akka Actor的,你可以用typedActorOf去代理有可能在遠程節點上的ActorRef。
1 |
Squarer typedActor = TypedActor.get(system).typedActorOf( |
2 |
new TypedProps<Squarer>(Squarer. class ),actorRefToRemoteActor);
|
3 |
//Use "typedActor" as a FooBar |
3.2.13 類型化路由模式
有時候你想在多個角色之間傳遞消息。在Akka中最簡單的實現方法是用一個路由(Router),它可以實現一個特定的路由邏輯,例如最小郵箱(smallest-mailbox)或者一致性哈希(consistent-hashing)等等。
路由沒有直接提供給類型化角色,但可以通過利用一個非類型化路由和類型化代理來實現它。為了展示這一點讓我們創建一些類型化角色並給他們指派隨機的id,然後我們會看到路由把消息發送給了不同的角色:
01 |
@Test public void typedRouterPattern() {
|
04 |
TypedActorExtension typed = TypedActor.get(system);
|
06 |
Named named1 = typed.typedActorOf( new TypedProps<Named>(Named. class ));
|
07 |
Named named2 = typed.typedActorOf( new TypedProps<Named>(Named. class ));
|
09 |
List<Named> routees = new ArrayList<Named>();
|
13 |
List<String> routeePaths = new ArrayList<String>();
|
14 |
routeePaths.add(typed.getActorRefFor(named1).path().toStringWithoutAddress());
|
15 |
routeePaths.add(typed.getActorRefFor(named2).path().toStringWithoutAddress());
|
18 |
ActorRef router = system.actorOf( new RoundRobinGroup(routeePaths).props(), "router" );
|
20 |
// 準備類型化代理,向路由轉發MethodCall消息
|
21 |
Named typedRouter = typed.typedActorOf( new TypedProps<Named>(Named. class ), router);
|
23 |
System.out.println( "actor was: " + typedRouter.name()); // name-243
|
24 |
System.out.println( "actor was: " + typedRouter.name()); // name-614
|
25 |
System.out.println( "actor was: " + typedRouter.name()); // name-243
|
26 |
System.out.println( "actor was: " + typedRouter.name()); // name-614
|
28 |
typed.poisonPill(named1);
|
29 |
typed.poisonPill(named2);
|
30 |
typed.poisonPill(typedRouter);
|
32 |
} catch (Exception e) {
|
為了在這樣的角色實例中間輪詢,你可以創建一個簡單的非類型化路由,並用一個TypedActor實現這個路由的外觀模式,就像下麵的例子展示的。這樣會起作用是因為類型化角色利用和普通角色相同的通訊機製,以及它們的方法調用被轉化為MethodCall消息。
02 |
TypedActorExtension typed = TypedActor.get(system); |
04 |
Named named1 = typed.typedActorOf( new TypedProps<Named>(Named. class ));
|
06 |
Named named2 = typed.typedActorOf( new TypedProps<Named>(Named. class ));
|
08 |
List<Named> routees = new ArrayList<Named>();
|
12 |
List<String> routeePaths = new ArrayList<String>();
|
13 |
routeePaths.add(typed.getActorRefFor(named1).path().toStringWithoutAddress()); |
14 |
routeePaths.add(typed.getActorRefFor(named2).path().toStringWithoutAddress()); |
最後更新:2017-05-23 16:02:29