AKKA文档(java版)—角色(一)
角色
角色模型对编写并发、分布式系统进行了高度抽象。它减轻了开发者必须对互斥锁与线程管理的负担,更容易编写出正确的并发与并行系统。早在1973 年 Carl Hewitt 发表的论文中定义了角色,但一直流行于Erlang 语言中,随后被爱立信公司应用于建立高并发、可靠通信系统,取得了巨大成功。
Akka 框架里面角色的API 跟Scala 框架里面角色相似,后者一些语法曾经模仿Erlang语言。
创建角色
注意:由于Akka强迫父级监管者监督每一个角色和(潜在的子级)监管者,建议你熟悉角色系统、监管、监控,这将可能帮助你阅读角色参考、路径和地址。
定义一个角色类
在Java里面,角色是通过继承UntypedActor 类及实现onReceive方法来实现的.这个方法将message作为参数。
这里有个例子:
01 |
import akka.actor.UntypedActor;
|
02 |
import akka.event.Logging;
|
03 |
import akka.event.LoggingAdapter;
|
04 |
05 |
public class MyUntypedActor extends UntypedActor {
|
06 |
LoggingAdapter log = Logging.getLogger(getContext().system(), this );
|
07 |
08 |
public void onReceive(Object message) throws Exception {
|
09 |
if (message instanceof String) {
|
10 |
log.info( "Received String message: {}" , message);
|
11 |
getSender().tell(message, getSelf());
|
12 |
} else
|
13 |
unhandled(message);
|
14 |
}
|
15 |
} |
Props
Props 是一个配置类,它的作用是对创建角色确认选项。把它作为不可变的、因此可自由共享规则对创建一个角色包括相关部署信息(例如:使用调度,详见下文)。下面是如何创建一个Props 实例的一些例子:
1 |
import akka.actor.Props;
|
2 |
import akka.japi.Creator;
|
1 |
static class MyActorC implements Creator<MyActor> {
|
2 |
@Override public MyActor create() {
|
3 |
return new MyActor( "..." );
|
4 |
}
|
5 |
} |
6 |
7 |
Props props1 = Props.create(MyUntypedActor. class );
|
8 |
Props props2 = Props.create(MyActor. class , "..." );
|
9 |
Props props3 = Props.create( new MyActorC());
|
第二行显示如何传递构造参数给Actor去创建。在构建Props对象时,存在匹配的构造是被验证的,如果发现不存在或者存在多个匹配构造,会导致一个IllegalArgumentEception。
第三行验证Creator使用。用来验证Props构造的Creator必须是静态。类型参数是用来判断生成角色类的,如果充分擦除,将落回到Actor类,一个参数化工厂例子,可以是:
1 |
static class ParametricCreator<T extends MyActor> implements Creator<T> {
|
2 |
@Override public T create() {
|
3 |
// ... fabricate actor here
|
4 |
}
|
5 |
} |
注意:
由于邮箱要求——如使用双端队列为基础的邮箱使用的隐藏角色——被拾起,在创建之前,角色类型需要已知的,这是Creator类型参数允许的。因此对你用到角色一定尽可能使用特定类型。
建议准则
这是个好的主意在UntypedActor类里面提供静态工厂方法,该方法帮助创建尽可能接近角色定义的合适Props 类。这也允许使用基于Creator方法,该方法静态验证所使用的构造函数确实存在,而不是只在运行时检查依赖。
01 |
public class DemoActor extends UntypedActor {
|
02 |
03 |
/**
|
04 |
* Create Props for an actor of this type.
|
05 |
* @param magicNumber The magic number to be passed to this actor’s constructor.
|
06 |
* @return a Props for creating this actor, which can then be further configured
|
07 |
* (e.g. calling `.withDispatcher()` on it)
|
08 |
*/
|
09 |
public static Props props( final int magicNumber) {
|
10 |
return Props.create( new Creator<DemoActor>() {
|
11 |
private static final long serialVersionUID = 1L;
|
12 |
13 |
@Override
|
14 |
public DemoActor create() throws Exception {
|
15 |
return new DemoActor(magicNumber);
|
16 |
}
|
17 |
});
|
18 |
}
|
19 |
20 |
final int magicNumber;
|
21 |
22 |
public DemoActor( int magicNumber) {
|
23 |
this .magicNumber = magicNumber;
|
24 |
}
|
25 |
26 |
@Override
|
27 |
public void onReceive(Object msg) {
|
28 |
// some behavior here
|
29 |
}
|
30 |
31 |
} |
32 |
33 |
system.actorOf(DemoActor.props( 42 ), "demo" );
|
用Props创建角色
角色通过传入Props实例进入actorOf 工厂方法,该工厂方法在ActorSystem 和ActorContext类中提供使用。
1 |
import akka.actor.ActorRef;
|
2 |
import akka.actor.ActorSystem;
|
1 |
// ActorSystem is a heavy object: create only one per application |
2 |
final ActorSystem system = ActorSystem.create( "MySystem" );
|
3 |
final ActorRef myActor = system.actorOf(Props.create(MyUntypedActor. class ),
|
4 |
"myactor" );
|
使用ActorSystem 将创建顶级角色,由角色系统提供守护的角色监管,同时使用一个角色的上下文将创建一个子角色。
1 |
class A extends UntypedActor {
|
2 |
final ActorRef child =
|
3 |
getContext().actorOf(Props.create(MyUntypedActor. class ), "myChild" );
|
4 |
// plus some behavior ...
|
5 |
} |
建议创建一个包含子类、超子类等等的层次结构,使得它适合具有逻辑性故障处理应用程序结构,详见Actor Systems。
actorOf 方法调用返回ActorRef实例。这是对角色实例处理,并与它进行交互的唯一途径。该ActorRef是不可变的,并有一个与它代表的一对一关系角色。该ActorRef是可序列化的和具备网络意识的。这意味着,你可以把它进行序列化,将它通过网络发送,在远程主机上使用它,它仍然代表着在原始的节点上相同的角色,横跨网络。
名称参数是可选的,但是你应该给你的角色起个更好名称,因为这是用在日志消息里面,并确定角色。该名称不能为空或以$开头,但它可能包含URL编码的字符(例如,%20代表空格)。如果给定的名称已被相同父类中的其他子类使用,那将抛出InvalidActorNameException异常。
角色是自动异步启动当被创建时候。
依赖注入
如果你的未类型化的角色有一个携带参数的构造函数,然后那些需要Prosp的一部分,以及,如上所述。但在某些情况下,必须使用一个工厂方法,例如当实际构造函数参数由一个依赖注入框架决定时。
1 |
import akka.actor.Actor;
|
2 |
import akka.actor.IndirectActorProducer;
|
01 |
class DependencyInjector implements IndirectActorProducer {
|
02 |
final Object applicationContext;
|
03 |
final String beanName;
|
04 |
05 |
public DependencyInjector(Object applicationContext, String beanName) {
|
06 |
this .applicationContext = applicationContext;
|
07 |
this .beanName = beanName;
|
08 |
}
|
09 |
10 |
@Override
|
11 |
public Class<? extends Actor> actorClass() {
|
12 |
return MyActor. class ;
|
13 |
}
|
14 |
15 |
@Override
|
16 |
public MyActor produce() {
|
17 |
MyActor result;
|
18 |
// obtain fresh Actor instance from DI framework ...
|
19 |
return result;
|
20 |
}
|
21 |
} |
22 |
23 |
final ActorRef myActor = getContext().actorOf(
|
24 |
Props.create(DependencyInjector. class , applicationContext, "MyActor" ),
|
25 |
"myactor3" );
|
警告:
你可能有时会倾向于提供一个IndirectActorProducer它总是返回相同的实例,例如:通过使用一个静态字段。这是不支持的,因为它违背了一个角色重启含义,这是这里所描述的含义:什么重新启动方式。当使用一个依赖注入框架时,角色Beans 一定不能是单例模式范围。
依赖注入和依赖注入框架集成技术更深入地介绍了使用Akka与依赖注入指导方针和在类型安全的活化剂方面的Akka Java Spring 指导。
Inbox
当写在角色外面的代码,应与角色进行沟通,在ask模式可以是一个解决方案(见下文),但有两个事情不能做:接收多个回复(例如:通过订阅的ActorRef到通知服务)和监控其他角色的生命周期。为了这些目的这里有个Inbox 类:
1 |
final Inbox inbox = Inbox.create(system);
|
2 |
inbox.send(target, "hello" );
|
3 |
assert inbox.receive(Duration.create( 1 , TimeUnit.SECONDS)).equals( "world" );
|
send方法包装一个标准的tell和提供一个内部的角色引用作为发送者。在最后一行将允许该回复被接收。监控一个角色同时也十分简单。
1 |
final Inbox inbox = Inbox.create(system);
|
2 |
inbox.watch(target); |
3 |
target.tell(PoisonPill.getInstance(), ActorRef.noSender()); |
4 |
assert inbox.receive(Duration.create( 1 , TimeUnit.SECONDS)) instanceof Terminated;
|
UntypedActor 应用程序接口
UntypedActor 类仅仅定义一个抽象方法,就是上面提到onReceive(Object message)方法,该方法实现了角色行为。如果当前角色行为不匹配一个接收信息,建议调用unhandled 方法,该方法默认将发出一个new akka.actor.UnhandledMessage(message, sender, recipient)在系统角色事件流中(设置配置项akka.actor.debug.unhandled 到on 让它们转化为实际调试信息)。另外,它提供:
- getSelf 角色ActorRef的引用
- getSender 最后接收到信息角色发送者角色引用,典型使用方法将在Reply to messages里面介绍。
- supervisorStrategy 用户可重写定义的策略,用来监管子角色。这种策略通常是角色中声明,以便在裁决者函数中使用了角色的内部状态:由于一个消息未能传达发送到监管者并处理,像其他的消息(尽管是正常的行为之外),在角色中所有的值和变量都是可用的,就像getSender引用(这将由直接的子类报告故障,如果在一个深层次后代类发生原始故障,则仍立即向上一级报告故障)。
-
getContext 对角色与当前信息暴露上下文信息
- 创建一个子角色工厂方法actorOf
- 属于角色的系统
- 父级监管者
- 监管的子类
- 监控生命周期
- 热插拔行为栈,在热插拔将介绍
剩余的可见的方法是用户可重写的生命周期钩子,将在以下描述:
01 |
public void preStart() {
|
02 |
} |
03 |
04 |
public void preRestart(Throwable reason, scala.Option<Object> message) {
|
05 |
for (ActorRef each : getContext().getChildren()) {
|
06 |
getContext().unwatch(each);
|
07 |
getContext().stop(each);
|
08 |
}
|
09 |
postStop();
|
10 |
} |
11 |
12 |
public void postRestart(Throwable reason) {
|
13 |
preStart();
|
14 |
} |
15 |
16 |
public void postStop() {
|
17 |
} |
上面显示实现是默认由UntypedActor 类提供。
角色生命周期
在角色系统中的路径代表一个“地方”,这可能被一个存活着的的角色占用着。最初,(除了系统初始化角色)的路径是空的。当actorOf()被调用时,指定一个由通过Props 描述给定的路径角色的化身。一个角色化身由路径和一个UID确定。重新启动仅仅交换Props 定义的Actor 实例,但化身与UID依然是相同的。
当该角色停止时,化身的生命周期也相应结束了。在这一刻时间上相对应的生命周期事件也将被调用和监管角色也被通知终止结束。化身被停止之后,路径也可以重复被通过actorOf() 方法创建的角色使用。在这种情况下,新的化身的名称跟与前一个将是相同的而是UIDs将会有所不同。
一个ActorRef 总是代表一个化身(路径和UID)而不只是一个给定的路径。因此,如果一个角色停止,一个新的具有相同名称创建的旧化身的ActorRef不会指向新的。
在另一方面ActorSelection 指向该路径(或多个路径在使用通配符时),并且是完全不知道其化身当前占用着它。 由于这个原因导致ActorSelection 不能被监视到。通过发送识别信息到将被回复包含正确地引用(见通过角色选择集识别角色)的 ActorIdentity 的ActorSelection 来解决当前化身ActorRef 存在该路径之下。这也可以用ActorSelection 类的resolveOne方法来解决,这将返回一个匹配ActorRef 的Future 。
生命周期监控又名临终看护
当另一个角色终止时,为了通知被通知(即永久性地停止,而不是暂时的失败和重新启动),一个角色可以自己注册为接收在终止上层的其他角色发送的终止消息,其他演员出动(请参阅停止演员)。这项服务是由角色系统的临终看护组件提供。
注册一个监视器是很容易的(见第四行,剩下的就是用于展示整个功能):
1 |
import akka.actor.Terminated;
|
01 |
public class WatchActor extends UntypedActor {
|
02 |
final ActorRef child = this .getContext().actorOf(Props.empty(), "child" );
|
03 |
{
|
04 |
this .getContext().watch(child); // <-- the only call needed for registration
|
05 |
}
|
06 |
ActorRef lastSender = getContext().system().deadLetters();
|
07 |
08 |
@Override
|
09 |
public void onReceive(Object message) {
|
10 |
if (message.equals( "kill" )) {
|
11 |
getContext().stop(child);
|
12 |
lastSender = getSender();
|
13 |
} else if (message instanceof Terminated) {
|
14 |
final Terminated t = (Terminated) message;
|
15 |
if (t.getActor() == child) {
|
16 |
lastSender.tell( "finished" , getSelf());
|
17 |
}
|
18 |
} else {
|
19 |
unhandled(message);
|
20 |
}
|
21 |
}
|
22 |
} |
但是应当注意的是,产生的终止消息独立于注册和终止发生的顺序。特别是,监控角色将接收一个终止信息即使被监控角色已经被终止在注册时候。
注册多次并不必然导致对多个消息被产生,但不保证只有一个对应这样的消息被接收:如果被监控角色终止已经发生和发送的消息排队等候着,在另一个注册完成之前,该消息已经处理完,然后第二消息将会排队,是因为已经结束角色的监控的注册导致终止信息立刻产生。
使用getContext().unwatch(target)方法从监控另一个角色生命活力撤销下来也是有可能的。这个工作即使已终止消息已经排队于邮箱中,在调用unwatch方法后对于那个角色将没有终止消息被处理。
启动钩子
在正确启动角色之后,preStart方法被调用。
1 |
@Override |
2 |
public void preStart() {
|
3 |
child = getContext().actorOf(Props.empty());
|
4 |
} |
第一次创建角色时,该方法被调用。在重新启动期间,它被postRestart的默认实现调用,这意味着通过重写该方法,你可以选择此方法中初始化代码是否被调用,对这个角色或每次重启仅只调用一次。在一个角色类的实例创建时,角色的构造函数的一部分的初始化代码将每次都被调用,这发生在每次重启时。
重启钩子
所有角色被监督着,即用故障处理策略链接到另一个角色。当处理一个消息是,抛出一个异常的情况下,演员可能重新启动(见监管与监控)抛出一个异常。这重启涉及上述提到钩子:
1. 旧角色是通过调用preRestart方法进行通知的,这伴随着造成重启的异常与绑定该异常的消息;处理一个消息没有造成这个重启发生,则后者可能也没有发生,例如,当一个监管者不捕获该异常,则由其监管者重启又或者如果由于一个同类的失败,一个角色将被重新启动。如果消息是可用的,那么该消息的发件人也可以通过正常方式访问的(即通过调用getSender())。
这个方法用在这些地方时最好的,例如:清除,准备交到新的角色实例等等。默认它停止所有子实例和调用postStop方法。
2. 来自actorOf方法调用的初始化工厂用来产生新的实例。
3. 新角色的postRestart方法被调用时这引起了重启异常。默认情况下,preStart 是被调用,就如同在正常启动的情况下。
一个角色重启仅替换实际角色的对象;邮箱中的内容是不受重启影响,所以消息的处理将在postRestart钩子返回后恢复。引发异常的消息将不会再接收。当重启时候,发送到角色的任何消息将像平常一样排队到它的邮箱。
注意:要知道,相关用户失败消息的顺序是不确定的。特别是,一个父类可能会重新启动其子类之前它已经处理了在失败之前子类发送故障的的最后消息。见讨论:消息顺序的详细信息。
终止钩子
终止一个角色之后,其postStop钩子被调用时,其可能用于例如从其他服务注销这个角色。在这个角色的消息队列已禁用之后,这个钩子仍保证运行,即送到已终止角色的信息将被重定向到ActorSystem的deadLetters。
识别角色通过角色选择集
作为角色的引用,路径和地址描述,每个角色都有一个唯一的逻辑路径,这是由以下的子类到父类直到达到角色系统的根的角色的链得到的,它有一个物理路径,如果监管链包括任何远程监管者,这可能会有所不同。这些路径是由系统使用来查找角色,如当接收到一个远程的消息和收件人进行搜索,但他们也有更直接用法:角色可以查找其他角色通过指定绝对或相对路径,逻辑或物理,并接收返回的结果的ActorSelection:
1 |
// will look up this absolute path |
2 |
getContext().actorSelection( "/user/serviceA/actor" );
|
3 |
// will look up sibling beneath same supervisor |
4 |
getContext().actorSelection( "../joe" );
|
其中指定的路径被解释为一个java.net.URI, 它以 / 分隔成路径段. 如果路径以 /开始, 表示一个绝对路径,从根监管者 ( “/user”的父亲)开始查找; 否则是从当前角色开始。如果某一个路径段为 .., 会找到当前所遍历到的角色的上一级, 否则则会向下一级寻找具有该名字的子角色。 必须注意的是角色路径中的.. 总是表示逻辑结构,也就是其监管者。
一个角色选择集的路径元素可以包含通配符,允许消息额广播到该选择集:
1 |
// will look all children to serviceB with names starting with worker |
2 |
getContext().actorSelection( "/user/serviceB/worker*" );
|
3 |
// will look up all siblings beneath same supervisor |
4 |
getContext().actorSelection( "../*" );
|
信息可以通过ActorSelection发送和当传送的每个消息时,查找ActorSelection的路径。如果选择集不匹配任何角色的消息将被丢弃。
为了获得一个ActorSelection的ActorRef,你需要发送一个消息到选择集和使用来自橘色的回复的getSender引用。有一个内置的识别信息,即所有角色都理解并自动回复一个包含ActorRef的ActorIdentity消息。此消息由该角色特殊处理,在这个意义上说是穿越的,如果一个具体的名称查找失败(即非通配符路径元素不符合一个存在的角色)然后产生一个消极结果。请注意,这并不意味着传递的答复是有保障的,但它仍然是一个正常的消息。
1 |
import akka.actor.ActorIdentity;
|
2 |
import akka.actor.ActorSelection;
|
3 |
import akka.actor.Identify;
|
01 |
public class Follower extends UntypedActor {
|
02 |
final String identifyId = "1" ;
|
03 |
{
|
04 |
ActorSelection selection =
|
05 |
getContext().actorSelection( "/user/another" );
|
06 |
selection.tell( new Identify(identifyId), getSelf());
|
07 |
}
|
08 |
ActorRef another;
|
09 |
10 |
final ActorRef probe;
|
11 |
public Follower(ActorRef probe) {
|
12 |
this .probe = probe;
|
13 |
}
|
14 |
15 |
@Override
|
16 |
public void onReceive(Object message) {
|
17 |
if (message instanceof ActorIdentity) {
|
18 |
ActorIdentity identity = (ActorIdentity) message;
|
19 |
if (identity.correlationId().equals(identifyId)) {
|
20 |
ActorRef ref = identity.getRef();
|
21 |
if (ref == null )
|
22 |
getContext().stop(getSelf());
|
23 |
else {
|
24 |
another = ref;
|
25 |
getContext().watch(another);
|
26 |
probe.tell(ref, getSelf());
|
27 |
}
|
28 |
}
|
29 |
} else if (message instanceof Terminated) {
|
30 |
final Terminated t = (Terminated) message;
|
31 |
if (t.getActor().equals(another)) {
|
32 |
getContext().stop(getSelf());
|
33 |
}
|
34 |
} else {
|
35 |
unhandled(message);
|
36 |
}
|
37 |
}
|
38 |
} |
您也可以取得一个ActorSelection的ActorRef通过ActorSelection的resolveOne方法。它返回匹配ActorRef的Future,如果这样一个角色存在。如果没有这样的角色存在或鉴定所提供的超时时间内没有完成,它将已失败告终akka.actor.ActorNotFound。
远程角色地址也可以查找,如果远程被启用:
1 |
getContext().actorSelection( "akka.tcp://app@otherhost:1234/user/serviceB" );
|
一个关于actor查找的示例见 远程查找.
注意:在支持actorSelection,actorFor是被废弃,因为用actorFor获得的角色引用对本地与远程角色表现不同。在一个本地角色引用的情况下,查找之前命名的演员需要存在,否则所获取的引用将是一个EmptyLocalActorRef。即使在获取角色引用之后,一个真实路径的角色才被创建,这时也是可以获取的。对于远程角色引用通过actorFor来获取的行为不同的,发送信息到该引用上将在覆盖下通过在远程系统给每一个消息发送的路径查找角色。
最后更新:2017-05-23 14:35:58