127
技術社區[雲棲]
AKKA文檔(java版)—角色(一)
角色
角色模型對編寫並發、分布式係統進行了高度抽象。它減輕了開發者必須對互斥鎖與線程管理的負擔,更容易編寫出正確的並發與並行係統。早在1973 年 Carl Hewitt 發表的論文中定義了角色,但一直流行於Erlang 語言中,隨後被愛立信公司應用於建立高並發、可靠通信係統,取得了巨大成功。
Akka 框架裏麵角色的API 跟Scala 框架裏麵角色相似,後者一些語法曾經模仿Erlang語言。
創建角色
注意:由於Akka強迫父級監管者監督每一個角色和(潛在的子級)監管者,建議你熟悉角色係統、監管、監控,這將可能幫助你閱讀角色參考、路徑和地址。
定義一個角色類
在Java裏麵,角色是通過繼承UntypedActor 類及實現onReceive方法來實現的.這個方法將message作為參數。
這裏有個例子:
01 |
import akka.actor.UntypedActor;
|
02 |
import akka.event.Logging;
|
03 |
import akka.event.LoggingAdapter;
|
04 |
05 |
public class MyUntypedActor extends UntypedActor {
|
06 |
LoggingAdapter log = Logging.getLogger(getContext().system(), this );
|
07 |
08 |
public void onReceive(Object message) throws Exception {
|
09 |
if (message instanceof String) {
|
10 |
log.info( "Received String message: {}" , message);
|
11 |
getSender().tell(message, getSelf());
|
12 |
} else
|
13 |
unhandled(message);
|
14 |
}
|
15 |
} |
Props
Props 是一個配置類,它的作用是對創建角色確認選項。把它作為不可變的、因此可自由共享規則對創建一個角色包括相關部署信息(例如:使用調度,詳見下文)。下麵是如何創建一個Props 實例的一些例子:
1 |
import akka.actor.Props;
|
2 |
import akka.japi.Creator;
|
1 |
static class MyActorC implements Creator<MyActor> {
|
2 |
@Override public MyActor create() {
|
3 |
return new MyActor( "..." );
|
4 |
}
|
5 |
} |
6 |
7 |
Props props1 = Props.create(MyUntypedActor. class );
|
8 |
Props props2 = Props.create(MyActor. class , "..." );
|
9 |
Props props3 = Props.create( new MyActorC());
|
第二行顯示如何傳遞構造參數給Actor去創建。在構建Props對象時,存在匹配的構造是被驗證的,如果發現不存在或者存在多個匹配構造,會導致一個IllegalArgumentEception。
第三行驗證Creator使用。用來驗證Props構造的Creator必須是靜態。類型參數是用來判斷生成角色類的,如果充分擦除,將落回到Actor類,一個參數化工廠例子,可以是:
1 |
static class ParametricCreator<T extends MyActor> implements Creator<T> {
|
2 |
@Override public T create() {
|
3 |
// ... fabricate actor here
|
4 |
}
|
5 |
} |
注意:
由於郵箱要求——如使用雙端隊列為基礎的郵箱使用的隱藏角色——被拾起,在創建之前,角色類型需要已知的,這是Creator類型參數允許的。因此對你用到角色一定盡可能使用特定類型。
建議準則
這是個好的主意在UntypedActor類裏麵提供靜態工廠方法,該方法幫助創建盡可能接近角色定義的合適Props 類。這也允許使用基於Creator方法,該方法靜態驗證所使用的構造函數確實存在,而不是隻在運行時檢查依賴。
01 |
public class DemoActor extends UntypedActor {
|
02 |
03 |
/**
|
04 |
* Create Props for an actor of this type.
|
05 |
* @param magicNumber The magic number to be passed to this actor’s constructor.
|
06 |
* @return a Props for creating this actor, which can then be further configured
|
07 |
* (e.g. calling `.withDispatcher()` on it)
|
08 |
*/
|
09 |
public static Props props( final int magicNumber) {
|
10 |
return Props.create( new Creator<DemoActor>() {
|
11 |
private static final long serialVersionUID = 1L;
|
12 |
13 |
@Override
|
14 |
public DemoActor create() throws Exception {
|
15 |
return new DemoActor(magicNumber);
|
16 |
}
|
17 |
});
|
18 |
}
|
19 |
20 |
final int magicNumber;
|
21 |
22 |
public DemoActor( int magicNumber) {
|
23 |
this .magicNumber = magicNumber;
|
24 |
}
|
25 |
26 |
@Override
|
27 |
public void onReceive(Object msg) {
|
28 |
// some behavior here
|
29 |
}
|
30 |
31 |
} |
32 |
33 |
system.actorOf(DemoActor.props( 42 ), "demo" );
|
用Props創建角色
角色通過傳入Props實例進入actorOf 工廠方法,該工廠方法在ActorSystem 和ActorContext類中提供使用。
1 |
import akka.actor.ActorRef;
|
2 |
import akka.actor.ActorSystem;
|
1 |
// ActorSystem is a heavy object: create only one per application |
2 |
final ActorSystem system = ActorSystem.create( "MySystem" );
|
3 |
final ActorRef myActor = system.actorOf(Props.create(MyUntypedActor. class ),
|
4 |
"myactor" );
|
使用ActorSystem 將創建頂級角色,由角色係統提供守護的角色監管,同時使用一個角色的上下文將創建一個子角色。
1 |
class A extends UntypedActor {
|
2 |
final ActorRef child =
|
3 |
getContext().actorOf(Props.create(MyUntypedActor. class ), "myChild" );
|
4 |
// plus some behavior ...
|
5 |
} |
建議創建一個包含子類、超子類等等的層次結構,使得它適合具有邏輯性故障處理應用程序結構,詳見Actor Systems。
actorOf 方法調用返回ActorRef實例。這是對角色實例處理,並與它進行交互的唯一途徑。該ActorRef是不可變的,並有一個與它代表的一對一關係角色。該ActorRef是可序列化的和具備網絡意識的。這意味著,你可以把它進行序列化,將它通過網絡發送,在遠程主機上使用它,它仍然代表著在原始的節點上相同的角色,橫跨網絡。
名稱參數是可選的,但是你應該給你的角色起個更好名稱,因為這是用在日誌消息裏麵,並確定角色。該名稱不能為空或以$開頭,但它可能包含URL編碼的字符(例如,%20代表空格)。如果給定的名稱已被相同父類中的其他子類使用,那將拋出InvalidActorNameException異常。
角色是自動異步啟動當被創建時候。
依賴注入
如果你的未類型化的角色有一個攜帶參數的構造函數,然後那些需要Prosp的一部分,以及,如上所述。但在某些情況下,必須使用一個工廠方法,例如當實際構造函數參數由一個依賴注入框架決定時。
1 |
import akka.actor.Actor;
|
2 |
import akka.actor.IndirectActorProducer;
|
01 |
class DependencyInjector implements IndirectActorProducer {
|
02 |
final Object applicationContext;
|
03 |
final String beanName;
|
04 |
05 |
public DependencyInjector(Object applicationContext, String beanName) {
|
06 |
this .applicationContext = applicationContext;
|
07 |
this .beanName = beanName;
|
08 |
}
|
09 |
10 |
@Override
|
11 |
public Class<? extends Actor> actorClass() {
|
12 |
return MyActor. class ;
|
13 |
}
|
14 |
15 |
@Override
|
16 |
public MyActor produce() {
|
17 |
MyActor result;
|
18 |
// obtain fresh Actor instance from DI framework ...
|
19 |
return result;
|
20 |
}
|
21 |
} |
22 |
23 |
final ActorRef myActor = getContext().actorOf(
|
24 |
Props.create(DependencyInjector. class , applicationContext, "MyActor" ),
|
25 |
"myactor3" );
|
警告:
你可能有時會傾向於提供一個IndirectActorProducer它總是返回相同的實例,例如:通過使用一個靜態字段。這是不支持的,因為它違背了一個角色重啟含義,這是這裏所描述的含義:什麼重新啟動方式。當使用一個依賴注入框架時,角色Beans 一定不能是單例模式範圍。
依賴注入和依賴注入框架集成技術更深入地介紹了使用Akka與依賴注入指導方針和在類型安全的活化劑方麵的Akka Java Spring 指導。
Inbox
當寫在角色外麵的代碼,應與角色進行溝通,在ask模式可以是一個解決方案(見下文),但有兩個事情不能做:接收多個回複(例如:通過訂閱的ActorRef到通知服務)和監控其他角色的生命周期。為了這些目的這裏有個Inbox 類:
1 |
final Inbox inbox = Inbox.create(system);
|
2 |
inbox.send(target, "hello" );
|
3 |
assert inbox.receive(Duration.create( 1 , TimeUnit.SECONDS)).equals( "world" );
|
send方法包裝一個標準的tell和提供一個內部的角色引用作為發送者。在最後一行將允許該回複被接收。監控一個角色同時也十分簡單。
1 |
final Inbox inbox = Inbox.create(system);
|
2 |
inbox.watch(target); |
3 |
target.tell(PoisonPill.getInstance(), ActorRef.noSender()); |
4 |
assert inbox.receive(Duration.create( 1 , TimeUnit.SECONDS)) instanceof Terminated;
|
UntypedActor 應用程序接口
UntypedActor 類僅僅定義一個抽象方法,就是上麵提到onReceive(Object message)方法,該方法實現了角色行為。如果當前角色行為不匹配一個接收信息,建議調用unhandled 方法,該方法默認將發出一個new akka.actor.UnhandledMessage(message, sender, recipient)在係統角色事件流中(設置配置項akka.actor.debug.unhandled 到on 讓它們轉化為實際調試信息)。另外,它提供:
- getSelf 角色ActorRef的引用
- getSender 最後接收到信息角色發送者角色引用,典型使用方法將在Reply to messages裏麵介紹。
- supervisorStrategy 用戶可重寫定義的策略,用來監管子角色。這種策略通常是角色中聲明,以便在裁決者函數中使用了角色的內部狀態:由於一個消息未能傳達發送到監管者並處理,像其他的消息(盡管是正常的行為之外),在角色中所有的值和變量都是可用的,就像getSender引用(這將由直接的子類報告故障,如果在一個深層次後代類發生原始故障,則仍立即向上一級報告故障)。
-
getContext 對角色與當前信息暴露上下文信息
- 創建一個子角色工廠方法actorOf
- 屬於角色的係統
- 父級監管者
- 監管的子類
- 監控生命周期
- 熱插拔行為棧,在熱插拔將介紹
剩餘的可見的方法是用戶可重寫的生命周期鉤子,將在以下描述:
01 |
public void preStart() {
|
02 |
} |
03 |
04 |
public void preRestart(Throwable reason, scala.Option<Object> message) {
|
05 |
for (ActorRef each : getContext().getChildren()) {
|
06 |
getContext().unwatch(each);
|
07 |
getContext().stop(each);
|
08 |
}
|
09 |
postStop();
|
10 |
} |
11 |
12 |
public void postRestart(Throwable reason) {
|
13 |
preStart();
|
14 |
} |
15 |
16 |
public void postStop() {
|
17 |
} |
上麵顯示實現是默認由UntypedActor 類提供。
角色生命周期
在角色係統中的路徑代表一個“地方”,這可能被一個存活著的的角色占用著。最初,(除了係統初始化角色)的路徑是空的。當actorOf()被調用時,指定一個由通過Props 描述給定的路徑角色的化身。一個角色化身由路徑和一個UID確定。重新啟動僅僅交換Props 定義的Actor 實例,但化身與UID依然是相同的。
當該角色停止時,化身的生命周期也相應結束了。在這一刻時間上相對應的生命周期事件也將被調用和監管角色也被通知終止結束。化身被停止之後,路徑也可以重複被通過actorOf() 方法創建的角色使用。在這種情況下,新的化身的名稱跟與前一個將是相同的而是UIDs將會有所不同。
一個ActorRef 總是代表一個化身(路徑和UID)而不隻是一個給定的路徑。因此,如果一個角色停止,一個新的具有相同名稱創建的舊化身的ActorRef不會指向新的。
在另一方麵ActorSelection 指向該路徑(或多個路徑在使用通配符時),並且是完全不知道其化身當前占用著它。 由於這個原因導致ActorSelection 不能被監視到。通過發送識別信息到將被回複包含正確地引用(見通過角色選擇集識別角色)的 ActorIdentity 的ActorSelection 來解決當前化身ActorRef 存在該路徑之下。這也可以用ActorSelection 類的resolveOne方法來解決,這將返回一個匹配ActorRef 的Future 。
生命周期監控又名臨終看護
當另一個角色終止時,為了通知被通知(即永久性地停止,而不是暫時的失敗和重新啟動),一個角色可以自己注冊為接收在終止上層的其他角色發送的終止消息,其他演員出動(請參閱停止演員)。這項服務是由角色係統的臨終看護組件提供。
注冊一個監視器是很容易的(見第四行,剩下的就是用於展示整個功能):
1 |
import akka.actor.Terminated;
|
01 |
public class WatchActor extends UntypedActor {
|
02 |
final ActorRef child = this .getContext().actorOf(Props.empty(), "child" );
|
03 |
{
|
04 |
this .getContext().watch(child); // <-- the only call needed for registration
|
05 |
}
|
06 |
ActorRef lastSender = getContext().system().deadLetters();
|
07 |
08 |
@Override
|
09 |
public void onReceive(Object message) {
|
10 |
if (message.equals( "kill" )) {
|
11 |
getContext().stop(child);
|
12 |
lastSender = getSender();
|
13 |
} else if (message instanceof Terminated) {
|
14 |
final Terminated t = (Terminated) message;
|
15 |
if (t.getActor() == child) {
|
16 |
lastSender.tell( "finished" , getSelf());
|
17 |
}
|
18 |
} else {
|
19 |
unhandled(message);
|
20 |
}
|
21 |
}
|
22 |
} |
但是應當注意的是,產生的終止消息獨立於注冊和終止發生的順序。特別是,監控角色將接收一個終止信息即使被監控角色已經被終止在注冊時候。
注冊多次並不必然導致對多個消息被產生,但不保證隻有一個對應這樣的消息被接收:如果被監控角色終止已經發生和發送的消息排隊等候著,在另一個注冊完成之前,該消息已經處理完,然後第二消息將會排隊,是因為已經結束角色的監控的注冊導致終止信息立刻產生。
使用getContext().unwatch(target)方法從監控另一個角色生命活力撤銷下來也是有可能的。這個工作即使已終止消息已經排隊於郵箱中,在調用unwatch方法後對於那個角色將沒有終止消息被處理。
啟動鉤子
在正確啟動角色之後,preStart方法被調用。
1 |
@Override |
2 |
public void preStart() {
|
3 |
child = getContext().actorOf(Props.empty());
|
4 |
} |
第一次創建角色時,該方法被調用。在重新啟動期間,它被postRestart的默認實現調用,這意味著通過重寫該方法,你可以選擇此方法中初始化代碼是否被調用,對這個角色或每次重啟僅隻調用一次。在一個角色類的實例創建時,角色的構造函數的一部分的初始化代碼將每次都被調用,這發生在每次重啟時。
重啟鉤子
所有角色被監督著,即用故障處理策略鏈接到另一個角色。當處理一個消息是,拋出一個異常的情況下,演員可能重新啟動(見監管與監控)拋出一個異常。這重啟涉及上述提到鉤子:
1. 舊角色是通過調用preRestart方法進行通知的,這伴隨著造成重啟的異常與綁定該異常的消息;處理一個消息沒有造成這個重啟發生,則後者可能也沒有發生,例如,當一個監管者不捕獲該異常,則由其監管者重啟又或者如果由於一個同類的失敗,一個角色將被重新啟動。如果消息是可用的,那麼該消息的發件人也可以通過正常方式訪問的(即通過調用getSender())。
這個方法用在這些地方時最好的,例如:清除,準備交到新的角色實例等等。默認它停止所有子實例和調用postStop方法。
2. 來自actorOf方法調用的初始化工廠用來產生新的實例。
3. 新角色的postRestart方法被調用時這引起了重啟異常。默認情況下,preStart 是被調用,就如同在正常啟動的情況下。
一個角色重啟僅替換實際角色的對象;郵箱中的內容是不受重啟影響,所以消息的處理將在postRestart鉤子返回後恢複。引發異常的消息將不會再接收。當重啟時候,發送到角色的任何消息將像平常一樣排隊到它的郵箱。
注意:要知道,相關用戶失敗消息的順序是不確定的。特別是,一個父類可能會重新啟動其子類之前它已經處理了在失敗之前子類發送故障的的最後消息。見討論:消息順序的詳細信息。
終止鉤子
終止一個角色之後,其postStop鉤子被調用時,其可能用於例如從其他服務注銷這個角色。在這個角色的消息隊列已禁用之後,這個鉤子仍保證運行,即送到已終止角色的信息將被重定向到ActorSystem的deadLetters。
識別角色通過角色選擇集
作為角色的引用,路徑和地址描述,每個角色都有一個唯一的邏輯路徑,這是由以下的子類到父類直到達到角色係統的根的角色的鏈得到的,它有一個物理路徑,如果監管鏈包括任何遠程監管者,這可能會有所不同。這些路徑是由係統使用來查找角色,如當接收到一個遠程的消息和收件人進行搜索,但他們也有更直接用法:角色可以查找其他角色通過指定絕對或相對路徑,邏輯或物理,並接收返回的結果的ActorSelection:
1 |
// will look up this absolute path |
2 |
getContext().actorSelection( "/user/serviceA/actor" );
|
3 |
// will look up sibling beneath same supervisor |
4 |
getContext().actorSelection( "../joe" );
|
其中指定的路徑被解釋為一個java.net.URI, 它以 / 分隔成路徑段. 如果路徑以 /開始, 表示一個絕對路徑,從根監管者 ( “/user”的父親)開始查找; 否則是從當前角色開始。如果某一個路徑段為 .., 會找到當前所遍曆到的角色的上一級, 否則則會向下一級尋找具有該名字的子角色。 必須注意的是角色路徑中的.. 總是表示邏輯結構,也就是其監管者。
一個角色選擇集的路徑元素可以包含通配符,允許消息額廣播到該選擇集:
1 |
// will look all children to serviceB with names starting with worker |
2 |
getContext().actorSelection( "/user/serviceB/worker*" );
|
3 |
// will look up all siblings beneath same supervisor |
4 |
getContext().actorSelection( "../*" );
|
信息可以通過ActorSelection發送和當傳送的每個消息時,查找ActorSelection的路徑。如果選擇集不匹配任何角色的消息將被丟棄。
為了獲得一個ActorSelection的ActorRef,你需要發送一個消息到選擇集和使用來自橘色的回複的getSender引用。有一個內置的識別信息,即所有角色都理解並自動回複一個包含ActorRef的ActorIdentity消息。此消息由該角色特殊處理,在這個意義上說是穿越的,如果一個具體的名稱查找失敗(即非通配符路徑元素不符合一個存在的角色)然後產生一個消極結果。請注意,這並不意味著傳遞的答複是有保障的,但它仍然是一個正常的消息。
1 |
import akka.actor.ActorIdentity;
|
2 |
import akka.actor.ActorSelection;
|
3 |
import akka.actor.Identify;
|
01 |
public class Follower extends UntypedActor {
|
02 |
final String identifyId = "1" ;
|
03 |
{
|
04 |
ActorSelection selection =
|
05 |
getContext().actorSelection( "/user/another" );
|
06 |
selection.tell( new Identify(identifyId), getSelf());
|
07 |
}
|
08 |
ActorRef another;
|
09 |
10 |
final ActorRef probe;
|
11 |
public Follower(ActorRef probe) {
|
12 |
this .probe = probe;
|
13 |
}
|
14 |
15 |
@Override
|
16 |
public void onReceive(Object message) {
|
17 |
if (message instanceof ActorIdentity) {
|
18 |
ActorIdentity identity = (ActorIdentity) message;
|
19 |
if (identity.correlationId().equals(identifyId)) {
|
20 |
ActorRef ref = identity.getRef();
|
21 |
if (ref == null )
|
22 |
getContext().stop(getSelf());
|
23 |
else {
|
24 |
another = ref;
|
25 |
getContext().watch(another);
|
26 |
probe.tell(ref, getSelf());
|
27 |
}
|
28 |
}
|
29 |
} else if (message instanceof Terminated) {
|
30 |
final Terminated t = (Terminated) message;
|
31 |
if (t.getActor().equals(another)) {
|
32 |
getContext().stop(getSelf());
|
33 |
}
|
34 |
} else {
|
35 |
unhandled(message);
|
36 |
}
|
37 |
}
|
38 |
} |
您也可以取得一個ActorSelection的ActorRef通過ActorSelection的resolveOne方法。它返回匹配ActorRef的Future,如果這樣一個角色存在。如果沒有這樣的角色存在或鑒定所提供的超時時間內沒有完成,它將已失敗告終akka.actor.ActorNotFound。
遠程角色地址也可以查找,如果遠程被啟用:
1 |
getContext().actorSelection( "akka.tcp://app@otherhost:1234/user/serviceB" );
|
一個關於actor查找的示例見 遠程查找.
注意:在支持actorSelection,actorFor是被廢棄,因為用actorFor獲得的角色引用對本地與遠程角色表現不同。在一個本地角色引用的情況下,查找之前命名的演員需要存在,否則所獲取的引用將是一個EmptyLocalActorRef。即使在獲取角色引用之後,一個真實路徑的角色才被創建,這時也是可以獲取的。對於遠程角色引用通過actorFor來獲取的行為不同的,發送信息到該引用上將在覆蓋下通過在遠程係統給每一個消息發送的路徑查找角色。
最後更新:2017-05-23 14:35:58