66
技術社區[雲棲]
AKKA文檔(java版)—角色(二)
發送信息
向actor發送消息是使用下列方法之一。
- 意思是“fire-and-forget”, e.g. 異步發送一個消息並立即返回。也稱為 tell.
- 異步發送一條消息並返回一個 Future代表一個可能的回應。也稱為 ask.
每一個消息發送者分別保證自己的消息的次序.
注意:使用ask會造成性能影響,因為當超時是,一些事情需要保持追蹤。這需要一些東西來將一個Promise連接進入ActorRef,並且需要通過遠程連接可到達的。所以總是使用tell更偏向性能,除非必須才用ask.
在所有這些方法你可以傳遞自己的ActorRef。讓它這樣做,因為這將允許接收的角色才能夠回複您的郵件,因為發件人引用隨該信息一起發送的。
Tell: Fire-forget
這是發送消息的推薦方式。 不會阻塞地等待消息。它擁有最好的並發性和可擴展性。
1 |
// don’t forget to think about who is the sender (2nd argument) |
2 |
target.tell(message, getSelf()); |
發送者引用是伴隨著消息傳遞的,在接收角色可用範圍內,當處理該消息時,通過getSender方法。在一個角色內部通常是getSelf,這應該為發送者,但也可能是這種情況,回複被路由到一些其他角色即該父類的第二個參數tell將是不同的一個。在角色外部,如果沒有回複,第二個參數可以為null;如果在角色外部需要一個回複,你可以使用問答模式描,下麵描述..
Ask: Send-And-Receive-Future
ask 模式既包含actor也包含future, 所以它是作為一種使用模式,而不是ActorRef的方法:
1 |
import static akka.pattern.Patterns.ask;
|
2 |
import static akka.pattern.Patterns.pipe;
|
3 |
import scala.concurrent.Future;
|
4 |
import scala.concurrent.duration.Duration;
|
5 |
import akka.dispatch.Futures;
|
6 |
import akka.dispatch.Mapper;
|
7 |
import akka.util.Timeout;
|
01 |
final Timeout t = new Timeout(Duration.create( 5 , TimeUnit.SECONDS));
|
02 |
03 |
final ArrayList<Future<Object>> futures = new ArrayList<Future<Object>>();
|
04 |
futures.add(ask(actorA, "request" , 1000 )); // using 1000ms timeout
|
05 |
futures.add(ask(actorB, "another request" , t)); // using timeout from
|
06 |
// above
|
07 |
08 |
final Future<Iterable<Object>> aggregate = Futures.sequence(futures,
|
09 |
system.dispatcher());
|
10 |
11 |
final Future<Result> transformed = aggregate.map(
|
12 |
new Mapper<Iterable<Object>, Result>() {
|
13 |
public Result apply(Iterable<Object> coll) {
|
14 |
final Iterator<Object> it = coll.iterator();
|
15 |
final String x = (String) it.next();
|
16 |
final String s = (String) it.next();
|
17 |
return new Result(x, s);
|
18 |
}
|
19 |
}, system.dispatcher());
|
20 |
21 |
pipe(transformed, system.dispatcher()).to(actorC); |
上麵的例子展示了將 ask與 future上的 pipe 模式一起使用,因為這是一種非常常用的組合。 請注意上麵所有的調用都是完全非阻塞和異步的: ask 產生 Future, 兩個通過Futures.sequence和map方法組合成一個新的Future,然後用 pipe 在future上安裝一個 onComplete-處理器來完成將收集到的 Result 發送到其它actor的動作。
使用 ask 將會像tell 一樣發送消息給接收方, 接收方必須通過getSender().tell(reply, getSelf()) 發送回應來為返回的 Future 填充數據。ask 操作包括創建一個內部actor來處理回應,必須為這個內部actor指定一個超時期限,過了超時期限內部actor將被銷毀以防止內存泄露。詳見下麵:
注意:如果要以異常來填充future你需要發送一個 Failure 消息給發送方。這個操作不會在actor處理消息發生異常時自動完成。
1 |
try {
|
2 |
String result = operation();
|
3 |
getSender().tell(result, getSelf());
|
4 |
} catch (Exception e) {
|
5 |
getSender().tell( new akka.actor.Status.Failure(e), getSelf());
|
6 |
throw e;
|
7 |
} |
如果一個actor 沒有完成future , 它會在超時時限到來時過期, 明確作為一個參數傳給ask方法,以 AskTimeoutException來完成Future。
關於如何等待或查詢一個future,更多信息請見Futures 。
Future的onComplete, onResult, 或 onTimeout 方法可以用來注冊一個回調,以便在Future完成時得到通知。從而提供一種避免阻塞的方法。
在使用future回調時,在角色內部你要小心避免關閉該角色的引用, 即不要在回調中調用該角色的方法或訪問其可變狀態。這會破壞角色的封裝,會引用同步bugbug和race condition, 因為回調會與此角色一同被並發調度。 不幸的是目前還沒有一種編譯時的方法能夠探測到這種非法訪問。 參閱: 角色與共享可變狀態
轉發消息
你可以將消息從一個角色轉發給另一個。雖然經過了一個‘中轉’,但最初的發送者地址/引用將保持不變。當實現功能類似路由器、負載均衡器、備份等的角色時會很有用。同時你需要傳遞你的上下文變量。
1 |
target.forward(result, getContext()); |
接收信息
當一個角色收到被傳遞到onReceive方法的消息,這是在需要被定義的UntypedActor基類的抽象方法。
下麵是個例子:
01 |
import akka.actor.UntypedActor;
|
02 |
import akka.event.Logging;
|
03 |
import akka.event.LoggingAdapter;
|
04 |
05 |
public class MyUntypedActor extends UntypedActor {
|
06 |
LoggingAdapter log = Logging.getLogger(getContext().system(), this );
|
07 |
08 |
public void onReceive(Object message) throws Exception {
|
09 |
if (message instanceof String) {
|
10 |
log.info( "Received String message: {}" , message);
|
11 |
getSender().tell(message, getSelf());
|
12 |
} else
|
13 |
unhandled(message);
|
14 |
}
|
15 |
} |
除了使用IF-instanceof檢查,還有一種方法是使用Apache Commons MethodUtils調用指定的參數類型相匹配的消息類型方法。
回複信息
如果你需要一個用來發送回應消息的目標,可以使用 getSender, 它是一個角色引用。 你可以用 getSender().tell(replyMsg, getSelf())向這個引用發送回應消息。 你也可以將這個ActorRef保存起來將來再作回應。如果沒有sender (不是從actor發送的消息或者沒有future上下文) 那麼 sender 缺省為 ‘死信’ 角色的引用。
1 |
@Override |
2 |
public void onReceive(Object msg) {
|
3 |
Object result =
|
4 |
// calculate result ...
|
5 |
6 |
// do not forget the second argument!
|
7 |
getSender().tell(result, getSelf());
|
8 |
} |
接收超時
在一個ReceiveTimeout消息發送觸發之後,該UntypedActorContext setReceiveTimeout定義不活動超時時間。當指定時,接收功能應該能夠處理一個akka.actor.ReceiveTimeout消息。 1毫秒為最小支持超時。
請注意,接受超時可能會觸發和在另一條消息是入隊後,該ReceiveTimeout消息將重排隊;因此,它不能保證在接收到接收超時的一定有預先通過該方法所配置的空閑時段。
一旦設置,接收超時保持有效(即持續重複觸發超過不活動時間後)。傳遞Duration.Undefined關掉此功能。
01 |
import akka.actor.ActorRef;
|
02 |
import akka.actor.ReceiveTimeout;
|
03 |
import akka.actor.UntypedActor;
|
04 |
import scala.concurrent.duration.Duration;
|
05 |
06 |
public class MyReceiveTimeoutUntypedActor extends UntypedActor {
|
07 |
08 |
public MyReceiveTimeoutUntypedActor() {
|
09 |
// To set an initial delay
|
10 |
getContext().setReceiveTimeout(Duration.create( "30 seconds" ));
|
11 |
}
|
12 |
13 |
public void onReceive(Object message) {
|
14 |
if (message.equals( "Hello" )) {
|
15 |
// To set in a response to a message
|
16 |
getContext().setReceiveTimeout(Duration.create( "1 second" ));
|
17 |
} else if (message instanceof ReceiveTimeout) {
|
18 |
// To turn it off
|
19 |
getContext().setReceiveTimeout(Duration.Undefined());
|
20 |
} else {
|
21 |
unhandled(message);
|
22 |
}
|
23 |
}
|
24 |
} |
終止角色
通過調用ActorRefFactory 即 ActorContext 或 ActorSystem 的 stop 方法來終止一個角色, 通常 context 用來終止子角色,而 system 用來終止頂級角色. 實際的終止操作是異步執行的, 即stop 可能在角色被終止之前返回。
如果當前有正在處理的消息,對該消息的處理將在actor被終止之前完成,但是郵箱中的後續消息將不會被處理。缺省情況下這些消息會被送到 ActorSystem 的 死信, 但是這取決於郵箱的實現。
角色的終止分兩步: 第一步角色將停止對郵箱的處理,向所有子角色發送終止命令,然後處理來自子角色的終止消息直到所有的子角色都完成終止, 最後終止自己 (調用 postStop, 銷毀郵箱, 向 DeathWatch 發布 Terminated , 通知其監管者). 這個過程保證角色係統中的子樹以一種有序的方式終止, 將終止命令傳播到葉子結點並收集它們回送的確認消息給被終止的監管者。如果其中某個角色沒有響應 (即由於處理消息用了太長時間以至於沒有收到終止命令), 整個過程將會被阻塞。
在 ActorSystem.shutdown被調用時, 係統根監管角色會被終止,以上的過程將保證整個係統的正確終止。
postStop hook 是在角色被完全終止以後調用的。這是為了清理資源:
1 |
@Override |
2 |
public void postStop() {
|
3 |
// clean up resources here ...
|
4 |
} |
注意:由於角色的終止是異步的, 你不能馬上使用你剛剛終止的子角色的名字;這會導致 InvalidActorNameException. 你應該 watch 正在終止的 介紹而在最終到達的 Terminated消息的處理中創建它的替代者。
PoisonPill
你也可以向角色發送 akka.actor.PoisonPill 消息, 這個消息處理完成後角色會被終止。 PoisonPill 與普通消息一樣被放進隊列,因此會在已經入隊列的其它消息之後被執行。
像下麵使用:
1 |
myActor.tell(akka.actor.PoisonPill.getInstance(), sender); |
優雅地終止
如果你想等待終止過程的結束,或者組合若幹actor的終止次序,可以使用gracefulStop:
1 |
import static akka.pattern.Patterns.gracefulStop;
|
2 |
import scala.concurrent.Await;
|
3 |
import scala.concurrent.Future;
|
4 |
import scala.concurrent.duration.Duration;
|
5 |
import akka.pattern.AskTimeoutException;
|
1 |
try {
|
2 |
Future<Boolean> stopped =
|
3 |
gracefulStop(actorRef, Duration.create( 5 , TimeUnit.SECONDS), Manager.SHUTDOWN);
|
4 |
Await.result(stopped, Duration.create( 6 , TimeUnit.SECONDS));
|
5 |
// the actor has been stopped
|
6 |
} catch (AskTimeoutException e) {
|
7 |
// the actor wasn't stopped within 5 seconds
|
8 |
} |
01 |
public class Manager extends UntypedActor {
|
02 |
03 |
public static final String SHUTDOWN = "shutdown" ;
|
04 |
05 |
ActorRef worker = getContext().watch(getContext().actorOf(
|
06 |
Props.create(Cruncher. class ), "worker" ));
|
07 |
08 |
public void onReceive(Object message) {
|
09 |
if (message.equals( "job" )) {
|
10 |
worker.tell( "crunch" , getSelf());
|
11 |
} else if (message.equals(SHUTDOWN)) {
|
12 |
worker.tell(PoisonPill.getInstance(), getSelf());
|
13 |
getContext().become(shuttingDown);
|
14 |
}
|
15 |
}
|
16 |
17 |
Procedure<Object> shuttingDown = new Procedure<Object>() {
|
18 |
@Override
|
19 |
public void apply(Object message) {
|
20 |
if (message.equals( "job" )) {
|
21 |
getSender().tell( "service unavailable, shutting down" , getSelf());
|
22 |
} else if (message instanceof Terminated) {
|
23 |
getContext().stop(getSelf());
|
24 |
}
|
25 |
}
|
26 |
};
|
27 |
} |
當gracefulStop()成功返回時,角色的postStop()鉤子將被執行:存在一個情況,happens-before 邊緣在postStop()結尾和gracefulStop()返回之間。
在上麵的例子中一個自定義的Manager.SHUTDOWN消息被發送到目標角色為了初始化正在終止角色的處理。您可以使用PoisonPill為這一點,但在阻止目標的角色之前,你擁有很少機會與其他角色進行交互。簡單的清除任務可以在postStop中處理。
注意:請記住,一個角色終止和它的名字被注銷是互相異步發生的獨立事件。因此,在gracefulStop()後返回,它可能是你會發現名稱仍然在使用。為了保證正確的注銷,隻能重複使用來自你控製監管者內與一個終止的消息的回應的名稱,即不屬於頂級的角色。
熱插拔
升級
Akka支持在運行時對角色消息循環 (例如它的的實現)進行實時替換: 在角色中調用getContext.become 方法。 熱替換的代碼被存在一個棧中,可以被pushed(replacing 或 adding 在頂部)和popped。
注意:請注意角色被其監管者重啟後將恢複其最初的行為。
熱替換角色使用getContext().become:
1 |
import akka.japi.Procedure;
|
01 |
public class HotSwapActor extends UntypedActor {
|
02 |
03 |
Procedure<Object> angry = new Procedure<Object>() {
|
04 |
@Override
|
05 |
public void apply(Object message) {
|
06 |
if (message.equals( "bar" )) {
|
07 |
getSender().tell( "I am already angry?" , getSelf());
|
08 |
} else if (message.equals( "foo" )) {
|
09 |
getContext().become(happy);
|
10 |
}
|
11 |
}
|
12 |
};
|
13 |
14 |
Procedure<Object> happy = new Procedure<Object>() {
|
15 |
@Override
|
16 |
public void apply(Object message) {
|
17 |
if (message.equals( "bar" )) {
|
18 |
getSender().tell( "I am already happy :-)" , getSelf());
|
19 |
} else if (message.equals( "foo" )) {
|
20 |
getContext().become(angry);
|
21 |
}
|
22 |
}
|
23 |
};
|
24 |
25 |
public void onReceive(Object message) {
|
26 |
if (message.equals( "bar" )) {
|
27 |
getContext().become(angry);
|
28 |
} else if (message.equals( "foo" )) {
|
29 |
getContext().become(happy);
|
30 |
} else {
|
31 |
unhandled(message);
|
32 |
}
|
33 |
}
|
34 |
} |
become 方法還有很多其它的用處,一個特別好的例子是用它來實現一個有限狀態機(FSM)。這將代替當前行為(即行為棧頂部),這意味著你不用使用unbecome,而是下一個行為將明確被安裝。
使用become另一個方式:不代替而是添加到行為棧頂部。這種情況是必須要保證在長期運行中“pop”操作(即unbecome)數目匹配“push”數目,否則這個數目將導致內存泄露(這就是該行為不是默認原因)。
01 |
public class UntypedActorSwapper {
|
02 |
03 |
public static class Swap {
|
04 |
public static Swap SWAP = new Swap();
|
05 |
06 |
private Swap() {
|
07 |
}
|
08 |
}
|
09 |
10 |
public static class Swapper extends UntypedActor {
|
11 |
LoggingAdapter log = Logging.getLogger(getContext().system(), this );
|
12 |
13 |
public void onReceive(Object message) {
|
14 |
if (message == SWAP) {
|
15 |
log.info( "Hi" );
|
16 |
getContext().become( new Procedure<Object>() {
|
17 |
@Override
|
18 |
public void apply(Object message) {
|
19 |
log.info( "Ho" );
|
20 |
getContext().unbecome(); // resets the latest 'become'
|
21 |
}
|
22 |
}, false ); // this signals stacking of the new behavior
|
23 |
} else {
|
24 |
unhandled(message);
|
25 |
}
|
26 |
}
|
27 |
}
|
28 |
29 |
public static void main(String... args) {
|
30 |
ActorSystem system = ActorSystem.create( "MySystem" );
|
31 |
ActorRef swap = system.actorOf(Props.create(Swapper. class ));
|
32 |
swap.tell(SWAP, ActorRef.noSender()); // logs Hi
|
33 |
swap.tell(SWAP, ActorRef.noSender()); // logs Ho
|
34 |
swap.tell(SWAP, ActorRef.noSender()); // logs Hi
|
35 |
swap.tell(SWAP, ActorRef.noSender()); // logs Ho
|
36 |
swap.tell(SWAP, ActorRef.noSender()); // logs Hi
|
37 |
swap.tell(SWAP, ActorRef.noSender()); // logs Ho
|
38 |
}
|
39 |
40 |
} |
貯藏
該UntypedActorWithStash類使一個角色臨時藏匿不能或不應該使用角色的當前行為處理的消息。在改變角色的消息處理函數,即調用getContext().become()或getContext().unbecome(),所有藏匿的消息可以“unstashed”,因此前麵加上他們角色的郵箱。這樣一來,藏消息可以在他們已經收到原先相同的順序進行處理。擴展UntypedActorWithStash角色將自動獲得一個雙端隊列為基礎的郵箱。
注意:抽象類UntypedActorWithStash實現標記接口RequiresMessageQueue這要求係統能夠為該角色自動選擇基於雙端隊列的郵箱實現。如果你想更多的控製權郵箱,請見郵箱文檔:郵箱
。
這裏是UntypedActorWithStash類中操作的示例:
1 |
import akka.actor.UntypedActorWithStash;
|
01 |
public class ActorWithProtocol extends UntypedActorWithStash {
|
02 |
public void onReceive(Object msg) {
|
03 |
if (msg.equals( "open" )) {
|
04 |
unstashAll();
|
05 |
getContext().become( new Procedure<Object>() {
|
06 |
public void apply(Object msg) throws Exception {
|
07 |
if (msg.equals( "write" )) {
|
08 |
// do writing...
|
09 |
} else if (msg.equals( "close" )) {
|
10 |
unstashAll();
|
11 |
getContext().unbecome();
|
12 |
} else {
|
13 |
stash();
|
14 |
}
|
15 |
}
|
16 |
}, false ); // add behavior on top instead of replacing
|
17 |
} else {
|
18 |
stash();
|
19 |
}
|
20 |
}
|
21 |
} |
調用stash()將當前的消息(即角色最後收到的消息)到角色的藏匿處。當在處理默認情況下在角色的消息處理函數來隱藏那些沒有被其他案件處理的情況時,這是典型調用。同一消息的兩次是非法藏匿;這樣做會導致一個IllegalStateException被拋出。藏匿也可以此情況下,調用stath()能會導致容量違規,這導致StashOverflowException。藏匿的容量可以使用郵箱的配置的藏匿容量設置(一個Int類型)進行配置。
調用unstashAll()從藏匿到角色的郵箱進入隊列消息,直到信箱(如果有的話)已經達到的能力(請注意,從藏匿處的消息前加上郵箱)。如果一個有界的郵箱溢出,一個MessageQueueAppendFailedException被拋出。在調用unstashAll()後,藏匿保證為空。
藏匿由scala.collection.immutable.Vector支持。這樣一來,即使是非常大量的消息在不會對性能產生重大影響下被藏匿。
注意,藏匿是短暫的角色狀態的一部分,該郵箱不像。因此,應該像具有相同屬性的角色狀態的其他部分進行管理。該preRestart的UntypedActorWithStash的實現將調用unstashAll(),它通常是所期望的行為。
注意:如果要強製執行,你的角色隻能用一個無上限stash進行工作,那麼你應該使用UntypedActorWithUnboundedStash類代替。
殺死一個角色
您可以通過發送一個
kill消息殺一個角色。這將導致角色拋出一個
ActorKilledException,引發故障。角色將暫停運作,其監管這將被要求如何處理失敗,這可能意味著恢複的角色,重新啟動,或完全終止它。請見 監管手段以獲取更多信息。
使用Kill像下麵:
1 |
victim.tell(akka.actor.Kill.getInstance(), ActorRef.noSender()); |
角色與異常
在消息被actor處理的過程中可能會拋出異常,例如數據庫異常。
消息會怎樣
如果消息處理過程中(即從郵箱中取出並交給receive後)發生了異常,這個消息將被丟失。必須明白它不會被放回到郵箱中。所以如果你希望重試對消息的處理,你需要自己抓住異常然後在異常處理流程中重試. 請確保你限製重試的次數,因為你不會希望係統產生活鎖 (從而消耗大量CPU而於事無補)。另一種可能性請查看 PeekMailbox pattern
郵箱會怎樣
如果消息處理過程中發生異常,郵箱沒有任何變化。如果actor被重啟,郵箱會被保留。郵箱中的所有消息不會丟失。
角色會怎樣
如果角色內代碼拋出了異常,那麼角色將被暫停,接著監管者處理開始(見監管與監控)。依賴監管者決策角色將被恢複(就像什麼事情沒發生),重啟(擦除內部狀態重新開始)或終止。
初始化模式
角色鉤子的豐富的生命周期提供了實現各種初始化模式的有用工具。在一個ActorRef的生命周期,一個角色可能會經曆多次重新啟動後,當舊的實例替換為新的,對外麵觀察這是不可見的,僅僅看見ActorRef。
有人可能會想到“化身”的新實例。初始化可能需要一個角色的每一個化身,但有時人們需要初始化僅發生在第一個實例誕生時,當ActorRef被創建。以下各節提供的模式為不同的初始化需求。
通過構造函數初始化
使用構造函數初始化有著各種好處。首先,它使得有可能使用的val字段來存儲任何狀態,這並不在角色實例的生命周期中變化,使得角色實現更加健壯。該構造函數被角色的每一個化身調用,所以角色的內部總是可以認為正確的初始化發生。這也是這種方法的缺點,因為有當一個人想避免在重新啟動時重新初始化的內部情況。例如,在重啟過程,保持整個子角色通常是有用。以下部分提供了這種情況下的模式。
通過preStart初始化
在的第一個實例的初始化過程中,一個角色的preStart()方法僅僅被直接調用一次,那就是,在ActorRef的創建。在重新啟動的情況下,preStart()從postRestart()被調用,因此,如果不重寫,preStart()被每一個化身調用。然而,覆蓋postRestart(),可以禁用此行為,並確保隻調用一次preStart()。
這種模式的一個有用的用法是在重新啟動時禁止創建子類新的ActorRef。這可以通過覆蓋preRestart()來實現:
01 |
@Override |
02 |
public void preStart() {
|
03 |
// Initialize children here
|
04 |
} |
05 |
06 |
// Overriding postRestart to disable the call to preStart() |
07 |
// after restarts |
08 |
@Override |
09 |
public void postRestart(Throwable reason) {
|
10 |
} |
11 |
12 |
// The default implementation of preRestart() stops all the children |
13 |
// of the actor. To opt-out from stopping the children, we |
14 |
// have to override preRestart() |
15 |
@Override |
16 |
public void preRestart(Throwable reason, Option<Object> message)
|
17 |
throws Exception {
|
18 |
// Keep the call to postStop(), but no stopping of children
|
19 |
postStop();
|
20 |
} |
請注意,該子角色還在重新啟動,但不會創建新的ActorRef。對子類可以遞歸應用相同的原則,確保他們的preStart()方法被隻在創建自己的引用時調用。
了解更多信息,請參閱What Restarting Means:
通過消息傳遞初始化
有這樣的情況,在構造函數中,當它是不可能傳遞所需的所有角色初始化的信息,例如,在存在循環的依賴關係。在這種情況下,角色應該聽一個初始化消息,並利用become()或有限狀態機的狀態對角色的初始化和未初始化的狀態進行編碼。
01 |
private String initializeMe = null ;
|
02 |
03 |
@Override |
04 |
public void onReceive(Object message) throws Exception {
|
05 |
if (message.equals( "init" )) {
|
06 |
initializeMe = "Up and running" ;
|
07 |
getContext().become( new Procedure<Object>() {
|
08 |
@Override
|
09 |
public void apply(Object message) throws Exception {
|
10 |
if (message.equals( "U OK?" ))
|
11 |
getSender().tell(initializeMe, getSelf());
|
12 |
}
|
13 |
});
|
14 |
}
|
15 |
} |
如果在初始化之前,角色可以接收消息,一個有用的工具可能是Stash,可以保存消息直到初始化完成,在角色初始化之後重新放回。
注意:這個模式應小心使用,並且當上述的模式都不適用才使用。其中一個潛在的問題是,消息可能會在發送給遠程角色丟失。另外,在未初始化狀態發布一個ActorRef可能導致在其接收用戶信息的初始化之前已經完成。