阅读66 返回首页    go 阿里云 go 技术社区[云栖]


AKKA文档(java版)—角色(二)

发送信息

向actor发送消息是使用下列方法之一。

  • 意思是“fire-and-forget”, e.g. 异步发送一个消息并立即返回。也称为 tell.
  • 异步发送一条消息并返回一个 Future代表一个可能的回应。也称为 ask.

每一个消息发送者分别保证自己的消息的次序.

注意:使用ask会造成性能影响,因为当超时是,一些事情需要保持追踪。这需要一些东西来将一个Promise连接进入ActorRef,并且需要通过远程连接可到达的。所以总是使用tell更偏向性能,除非必须才用ask.

在所有这些方法你可以传递自己的ActorRef。让它这样做,因为这将允许接收的角色才能够回复您的邮件,因为发件人引用随该信息一起发送的。

Tell: Fire-forget

这是发送消息的推荐方式。 不会阻塞地等待消息。它拥有最好的并发性和可扩展性。

1 // don’t forget to think about who is the sender (2nd argument)
2 target.tell(message, getSelf());

发送者引用是伴随着消息传递的,在接收角色可用范围内,当处理该消息时,通过getSender方法。在一个角色内部通常是getSelf,这应该为发送者,但也可能是这种情况,回复被路由到一些其他角色即该父类的第二个参数tell将是不同的一个。在角色外部,如果没有回复,第二个参数可以为null;如果在角色外部需要一个回复,你可以使用问答模式描,下面描述..

Ask: Send-And-Receive-Future

ask 模式既包含actor也包含future, 所以它是作为一种使用模式,而不是ActorRef的方法:

1 import static akka.pattern.Patterns.ask;
2 import static akka.pattern.Patterns.pipe;
3 import scala.concurrent.Future;
4 import scala.concurrent.duration.Duration;
5 import akka.dispatch.Futures;
6 import akka.dispatch.Mapper;
7 import akka.util.Timeout;

 

01 final Timeout t = new Timeout(Duration.create(5, TimeUnit.SECONDS));
02  
03 final ArrayList<Future<Object>> futures = new ArrayList<Future<Object>>();
04 futures.add(ask(actorA, "request"1000)); // using 1000ms timeout
05 futures.add(ask(actorB, "another request", t)); // using timeout from
06                                                 // above
07  
08 final Future<Iterable<Object>> aggregate = Futures.sequence(futures,
09     system.dispatcher());
10  
11 final Future<Result> transformed = aggregate.map(
12     new Mapper<Iterable<Object>, Result>() {
13       public Result apply(Iterable<Object> coll) {
14         final Iterator<Object> it = coll.iterator();
15         final String x = (String) it.next();
16         final String s = (String) it.next();
17         return new Result(x, s);
18       }
19     }, system.dispatcher());
20  
21 pipe(transformed, system.dispatcher()).to(actorC);

上面的例子展示了将 ask与 future上的 pipe 模式一起使用,因为这是一种非常常用的组合。 请注意上面所有的调用都是完全非阻塞和异步的: ask 产生 Future, 两个通过Futures.sequencemap方法组合成一个新的Future,然后用 pipe future上安装一个 onComplete-处理器来完成将收集到的 Result 发送到其它actor的动作。

使用 ask 将会像tell 一样发送消息给接收方, 接收方必须通过getSender().tell(reply, getSelf()) 发送回应来为返回的 Future 填充数据。ask 操作包括创建一个内部actor来处理回应,必须为这个内部actor指定一个超时期限,过了超时期限内部actor将被销毁以防止内存泄露。详见下面:

注意:如果要以异常来填充future你需要发送一个 Failure 消息给发送方。这个操作不会在actor处理消息发生异常时自动完成。

1 try {
2   String result = operation();
3   getSender().tell(result, getSelf());
4 catch (Exception e) {
5   getSender().tell(new akka.actor.Status.Failure(e), getSelf());
6   throw e;
7 }

如果一个actor 没有完成future , 它会在超时时限到来时过期, 明确作为一个参数传给ask方法,以 AskTimeoutException来完成Future

关于如何等待或查询一个future,更多信息请见Futures 。

FutureonCompleteonResult, 或 onTimeout 方法可以用来注册一个回调,以便在Future完成时得到通知。从而提供一种避免阻塞的方法。

在使用future回调时,在角色内部你要小心避免关闭该角色的引用, 即不要在回调中调用该角色的方法或访问其可变状态。这会破坏角色的封装,会引用同步bugbug和race condition, 因为回调会与此角色一同被并发调度。 不幸的是目前还没有一种编译时的方法能够探测到这种非法访问。 参阅: 角色与共享可变状态

转发消息

你可以将消息从一个角色转发给另一个。虽然经过了一个‘中转’,但最初的发送者地址/引用将保持不变。当实现功能类似路由器、负载均衡器、备份等的角色时会很有用。同时你需要传递你的上下文变量。

1 target.forward(result, getContext());

接收信息

当一个角色收到被传递到onReceive方法的消息,这是在需要被定义的UntypedActor基类的抽象方法。

下面是个例子:

01 import akka.actor.UntypedActor;
02 import akka.event.Logging;
03 import akka.event.LoggingAdapter;
04  
05 public class MyUntypedActor extends UntypedActor {
06   LoggingAdapter log = Logging.getLogger(getContext().system(), this);
07  
08   public void onReceive(Object message) throws Exception {
09     if (message instanceof String) {
10       log.info("Received String message: {}", message);
11       getSender().tell(message, getSelf());
12     else
13       unhandled(message);
14   }
15 }

除了使用IF-instanceof检查,还有一种方法是使用Apache Commons MethodUtils调用指定的参数类型相匹配的消息类型方法。

回复信息

如果你需要一个用来发送回应消息的目标,可以使用 getSender, 它是一个角色引用。 你可以用 getSender().tell(replyMsg, getSelf())向这个引用发送回应消息。 你也可以将这个ActorRef保存起来将来再作回应。如果没有sender (不是从actor发送的消息或者没有future上下文) 那么 sender 缺省为 ‘死信’ 角色的引用。

1 @Override
2 public void onReceive(Object msg) {
3   Object result =
4       // calculate result ...
5  
6   // do not forget the second argument!
7   getSender().tell(result, getSelf());
8 }

接收超时

在一个ReceiveTimeout消息发送触发之后,该UntypedActorContext setReceiveTimeout定义不活动超时时间。当指定时,接收功能应该能够处理一个akka.actor.ReceiveTimeout消息。 1毫秒为最小支持超时。

请注意,接受超时可能会触发和在另一条消息是入队后,该ReceiveTimeout消息将重排队;因此,它不能保证在接收到接收超时的一定有预先通过该方法所配置的空闲时段。

一旦设置,接收超时保持有效(即持续重复触发超过不活动时间后)。传递Duration.Undefined关掉此功能。

01 import akka.actor.ActorRef;
02 import akka.actor.ReceiveTimeout;
03 import akka.actor.UntypedActor;
04 import scala.concurrent.duration.Duration;
05  
06 public class MyReceiveTimeoutUntypedActor extends UntypedActor {
07  
08   public MyReceiveTimeoutUntypedActor() {
09     // To set an initial delay
10     getContext().setReceiveTimeout(Duration.create("30 seconds"));
11   }
12  
13   public void onReceive(Object message) {
14     if (message.equals("Hello")) {
15       // To set in a response to a message
16       getContext().setReceiveTimeout(Duration.create("1 second"));
17     else if (message instanceof ReceiveTimeout) {
18       // To turn it off
19       getContext().setReceiveTimeout(Duration.Undefined());
20     else {
21       unhandled(message);
22     }
23   }
24 }

终止角色

通过调用ActorRefFactory 即 ActorContext 或 ActorSystem 的 stop 方法来终止一个角色, 通常 context 用来终止子角色,而 system 用来终止顶级角色. 实际的终止操作是异步执行的, 即stop 可能在角色被终止之前返回。

如果当前有正在处理的消息,对该消息的处理将在actor被终止之前完成,但是邮箱中的后续消息将不会被处理。缺省情况下这些消息会被送到 ActorSystem 的 死信, 但是这取决于邮箱的实现。

角色的终止分两步: 第一步角色将停止对邮箱的处理,向所有子角色发送终止命令,然后处理来自子角色的终止消息直到所有的子角色都完成终止, 最后终止自己 (调用 postStop, 销毁邮箱, 向 DeathWatch 发布 Terminated , 通知其监管者). 这个过程保证角色系统中的子树以一种有序的方式终止, 将终止命令传播到叶子结点并收集它们回送的确认消息给被终止的监管者。如果其中某个角色没有响应 (即由于处理消息用了太长时间以至于没有收到终止命令), 整个过程将会被阻塞。

在 ActorSystem.shutdown被调用时, 系统根监管角色会被终止,以上的过程将保证整个系统的正确终止。

postStop hook 是在角色被完全终止以后调用的。这是为了清理资源:

1 @Override
2 public void postStop() {
3   // clean up resources here ...
4 }

注意:由于角色的终止是异步的, 你不能马上使用你刚刚终止的子角色的名字;这会导致 InvalidActorNameException. 你应该 watch 正在终止的 介绍而在最终到达的 Terminated消息的处理中创建它的替代者。

PoisonPill

你也可以向角色发送 akka.actor.PoisonPill 消息, 这个消息处理完成后角色会被终止。 PoisonPill 与普通消息一样被放进队列,因此会在已经入队列的其它消息之后被执行。

像下面使用:

1 myActor.tell(akka.actor.PoisonPill.getInstance(), sender);

优雅地终止

如果你想等待终止过程的结束,或者组合若干actor的终止次序,可以使用gracefulStop:

1 import static akka.pattern.Patterns.gracefulStop;
2 import scala.concurrent.Await;
3 import scala.concurrent.Future;
4 import scala.concurrent.duration.Duration;
5 import akka.pattern.AskTimeoutException;

 

1 try {
2   Future<Boolean> stopped =
3     gracefulStop(actorRef, Duration.create(5, TimeUnit.SECONDS), Manager.SHUTDOWN);
4   Await.result(stopped, Duration.create(6, TimeUnit.SECONDS));
5   // the actor has been stopped
6 catch (AskTimeoutException e) {
7   // the actor wasn't stopped within 5 seconds
8 }

 

01 public class Manager extends UntypedActor {
02  
03   public static final String SHUTDOWN = "shutdown";
04  
05   ActorRef worker = getContext().watch(getContext().actorOf(
06       Props.create(Cruncher.class), "worker"));
07  
08   public void onReceive(Object message) {
09     if (message.equals("job")) {
10       worker.tell("crunch", getSelf());
11     else if (message.equals(SHUTDOWN)) {
12       worker.tell(PoisonPill.getInstance(), getSelf());
13       getContext().become(shuttingDown);
14     }
15   }
16  
17   Procedure<Object> shuttingDown = new Procedure<Object>() {
18     @Override
19     public void apply(Object message) {
20       if (message.equals("job")) {
21         getSender().tell("service unavailable, shutting down", getSelf());
22       else if (message instanceof Terminated) {
23         getContext().stop(getSelf());
24       }
25     }
26   };
27 }

gracefulStop()成功返回时,角色的postStop()钩子将被执行:存在一个情况,happens-before 边缘在postStop()结尾和gracefulStop()返回之间。

在上面的例子中一个自定义的Manager.SHUTDOWN消息被发送到目标角色为了初始化正在终止角色的处理。您可以使用PoisonPill为这一点,但在阻止目标的角色之前,你拥有很少机会与其他角色进行交互。简单的清除任务可以在postStop中处理。

注意:请记住,一个角色终止和它的名字被注销是互相异步发生的独立事件。因此,在gracefulStop()后返回,它可能是你会发现名称仍然在使用。为了保证正确的注销,只能重复使用来自你控制监管者内与一个终止的消息的回应的名称,即不属于顶级的角色。

热插拔

升级

Akka支持在运行时对角色消息循环 (例如它的的实现)进行实时替换: 在角色中调用getContext.become 方法。 热替换的代码被存在一个栈中,可以被pushed(replacing 或 adding 在顶部)和popped。

注意:请注意角色被其监管者重启后将恢复其最初的行为。

热替换角色使用getContext().become:

1 import akka.japi.Procedure;

 

01 public class HotSwapActor extends UntypedActor {
02  
03   Procedure<Object> angry = new Procedure<Object>() {
04     @Override
05     public void apply(Object message) {
06       if (message.equals("bar")) {
07         getSender().tell("I am already angry?", getSelf());
08       else if (message.equals("foo")) {
09         getContext().become(happy);
10       }
11     }
12   };
13  
14   Procedure<Object> happy = new Procedure<Object>() {
15     @Override
16     public void apply(Object message) {
17       if (message.equals("bar")) {
18         getSender().tell("I am already happy :-)", getSelf());
19       else if (message.equals("foo")) {
20         getContext().become(angry);
21       }
22     }
23   };
24  
25   public void onReceive(Object message) {
26     if (message.equals("bar")) {
27       getContext().become(angry);
28     else if (message.equals("foo")) {
29       getContext().become(happy);
30     else {
31       unhandled(message);
32     }
33   }
34 }

become 方法还有很多其它的用处,一个特别好的例子是用它来实现一个有限状态机(FSM)。这将代替当前行为(即行为栈顶部),这意味着你不用使用unbecome,而是下一个行为将明确被安装。

使用become另一个方式:不代替而是添加到行为栈顶部。这种情况是必须要保证在长期运行中“pop”操作(即unbecome)数目匹配“push”数目,否则这个数目将导致内存泄露(这就是该行为不是默认原因)。

01 public class UntypedActorSwapper {
02  
03   public static class Swap {
04     public static Swap SWAP = new Swap();
05  
06     private Swap() {
07     }
08   }
09  
10   public static class Swapper extends UntypedActor {
11     LoggingAdapter log = Logging.getLogger(getContext().system(), this);
12  
13     public void onReceive(Object message) {
14       if (message == SWAP) {
15         log.info("Hi");
16         getContext().become(new Procedure<Object>() {
17           @Override
18           public void apply(Object message) {
19             log.info("Ho");
20             getContext().unbecome(); // resets the latest 'become'
21           }
22         }, false); // this signals stacking of the new behavior
23       else {
24         unhandled(message);
25       }
26     }
27   }
28  
29   public static void main(String... args) {
30     ActorSystem system = ActorSystem.create("MySystem");
31     ActorRef swap = system.actorOf(Props.create(Swapper.class));
32     swap.tell(SWAP, ActorRef.noSender()); // logs Hi
33     swap.tell(SWAP, ActorRef.noSender()); // logs Ho
34     swap.tell(SWAP, ActorRef.noSender()); // logs Hi
35     swap.tell(SWAP, ActorRef.noSender()); // logs Ho
36     swap.tell(SWAP, ActorRef.noSender()); // logs Hi
37     swap.tell(SWAP, ActorRef.noSender()); // logs Ho
38   }
39  
40 }

贮藏

该UntypedActorWithStash类使一个角色临时藏匿不能或不应该使用角色的当前行为处理的消息。在改变角色的消息处理函数,即调用getContext().become()或getContext().unbecome(),所有藏匿的消息可以“unstashed”,因此前面加上他们角色的邮箱。这样一来,藏消息可以在他们已经收到原先相同的顺序进行处理。扩展UntypedActorWithStash角色将自动获得一个双端队列为基础的邮箱。

注意:抽象类UntypedActorWithStash实现标记接口RequiresMessageQueue这要求系统能够为该角色自动选择基于双端队列的邮箱实现。如果你想更多的控制权邮箱,请见邮箱文档:邮箱

这里是UntypedActorWithStash类中操作的示例:

1 import akka.actor.UntypedActorWithStash;

 

01 public class ActorWithProtocol extends UntypedActorWithStash {
02   public void onReceive(Object msg) {
03     if (msg.equals("open")) {
04       unstashAll();
05       getContext().become(new Procedure<Object>() {
06         public void apply(Object msg) throws Exception {
07           if (msg.equals("write")) {
08             // do writing...
09           else if (msg.equals("close")) {
10             unstashAll();
11             getContext().unbecome();
12           else {
13             stash();
14           }
15         }
16       }, false); // add behavior on top instead of replacing
17     else {
18       stash();
19     }
20   }
21 }

调用stash()将当前的消息(即角色最后收到的消息)到角色的藏匿处。当在处理默认情况下在角色的消息处理函数来隐藏那些没有被其他案件处理的情况时,这是典型调用。同一消息的两次是非法藏匿;这样做会导致一个IllegalStateException被抛出。藏匿也可以此情况下,调用stath()能会导致容量违规,这导致StashOverflowException。藏匿的容量可以使用邮箱的配置的藏匿容量设置(一个Int类型)进行配置。

调用unstashAll()从藏匿到角色的邮箱进入队列消息,直到信箱(如果有的话)已经达到的能力(请注意,从藏匿处的消息前加上邮箱)。如果一个有界的邮箱溢出,一个MessageQueueAppendFailedException被抛出。在调用unstashAll()后,藏匿保证为空。

藏匿由scala.collection.immutable.Vector支持。这样一来,即使是非常大量的消息在不会对性能产生重大影响下被藏匿。

注意,藏匿是短暂的角色状态的一部分,该邮箱不像。因此,应该像具有相同属性的角色状态的其他部分进行管理。该preRestart的UntypedActorWithStash的实现将调用unstashAll(),它通常是所期望的行为。

注意:如果要强制执行,你的角色只能用一个无上限stash进行工作,那么你应该使用UntypedActorWithUnboundedStash类代替。

杀死一个角色

您可以通过发送一个
kill消息杀一个角色。这将导致角色抛出一个
ActorKilledException,引发故障。角色将暂停运作,其监管这将被要求如何处理失败,这可能意味着恢复的角色,重新启动,或完全终止它。请见 监管手段以获取更多信息。

使用Kill像下面:

1 victim.tell(akka.actor.Kill.getInstance(), ActorRef.noSender());

角色与异常

在消息被actor处理的过程中可能会抛出异常,例如数据库异常。

消息会怎样

如果消息处理过程中(即从邮箱中取出并交给receive后)发生了异常,这个消息将被丢失。必须明白它不会被放回到邮箱中。所以如果你希望重试对消息的处理,你需要自己抓住异常然后在异常处理流程中重试. 请确保你限制重试的次数,因为你不会希望系统产生活锁 (从而消耗大量CPU而于事无补)。另一种可能性请查看 PeekMailbox pattern

邮箱会怎样

如果消息处理过程中发生异常,邮箱没有任何变化。如果actor被重启,邮箱会被保留。邮箱中的所有消息不会丢失。

角色会怎样

如果角色内代码抛出了异常,那么角色将被暂停,接着监管者处理开始(见监管与监控)。依赖监管者决策角色将被恢复(就像什么事情没发生),重启(擦除内部状态重新开始)或终止。

初始化模式

角色钩子的丰富的生命周期提供了实现各种初始化模式的有用工具。在一个ActorRef的生命周期,一个角色可能会经历多次重新启动后,当旧的实例替换为新的,对外面观察这是不可见的,仅仅看见ActorRef。

有人可能会想到“化身”的新实例。初始化可能需要一个角色的每一个化身,但有时人们需要初始化仅发生在第一个实例诞生时,当ActorRef被创建。以下各节提供的模式为不同的初始化需求。

通过构造函数初始化

使用构造函数初始化有着各种好处。首先,它使得有可能使用的val字段来存储任何状态,这并不在角色实例的生命周期中变化,使得角色实现更加健壮。该构造函数被角色的每一个化身调用,所以角色的内部总是可以认为正确的初始化发生。这也是这种方法的缺点,因为有当一个人想避免在重新启动时重新初始化的内部情况。例如,在重启过程,保持整个子角色通常是有用。以下部分提供了这种情况下的模式。

通过preStart初始化

在的第一个实例的初始化过程中,一个角色的preStart()方法仅仅被直接调用一次,那就是,在ActorRef的创建。在重新启动的情况下,preStart()从postRestart()被调用,因此,如果不重写,preStart()被每一个化身调用。然而,覆盖postRestart(),可以禁用此行为,并确保只调用一次preStart()。

这种模式的一个有用的用法是在重新启动时禁止创建子类新的ActorRef。这可以通过覆盖preRestart()来实现:

01 @Override
02 public void preStart() {
03   // Initialize children here
04 }
05  
06 // Overriding postRestart to disable the call to preStart()
07 // after restarts
08 @Override
09 public void postRestart(Throwable reason) {
10 }
11  
12 // The default implementation of preRestart() stops all the children
13 // of the actor. To opt-out from stopping the children, we
14 // have to override preRestart()
15 @Override
16 public void preRestart(Throwable reason, Option<Object> message)
17   throws Exception {
18   // Keep the call to postStop(), but no stopping of children
19   postStop();
20 }

请注意,该子角色还在重新启动,但不会创建新的ActorRef。对子类可以递归应用相同的原则,确保他们的preStart()方法被只在创建自己的引用时调用。

了解更多信息,请参阅What Restarting Means:

通过消息传递初始化

有这样的情况,在构造函数中,当它是不可能传递所需的所有角色初始化的信息,例如,在存在循环的依赖关系。在这种情况下,角色应该听一个初始化消息,并利用become()或有限状态机的状态对角色的初始化和未初始化的状态进行编码。

01 private String initializeMe = null;
02  
03 @Override
04 public void onReceive(Object message) throws Exception {
05   if (message.equals("init")) {
06     initializeMe = "Up and running";
07     getContext().become(new Procedure<Object>() {
08       @Override
09       public void apply(Object message) throws Exception {
10         if (message.equals("U OK?"))
11           getSender().tell(initializeMe, getSelf());
12       }
13     });
14   }
15 }

如果在初始化之前,角色可以接收消息,一个有用的工具可能是Stash,可以保存消息直到初始化完成,在角色初始化之后重新放回。

注意:这个模式应小心使用,并且当上述的模式都不适用才使用。其中一个潜在的问题是,消息可能会在发送给远程角色丢失。另外,在未初始化状态发布一个ActorRef可能导致在其接收用户信息的初始化之前已经完成。