375
京东网上商城
AKKA文档(java版)—类型化角色
3.2 类型化角色
Akka的类型化角色是活动对象(Active Object)模式的实现。Smalltalk诞生的时候,默认的方法调用由异步派发代替同步操作。
类型化角色由2部分组成,包括一个公共的接口和实现,如果你有“企业级”Java的开发经验,这对你来说会非常熟悉。与普通的角色一样,你有一个外部的API(公共接口实例),将异步的方法调用委托给实现类的一个私有实例。
类型化角色对比角色的优势是你可以有一个静态的约定,而不需要去定义你自己的消息,不好的一面就是它会限制你能做什么和不能做什么,比如你不能使用become/unbecome。
类型化角色是利用JDK Proxies 来实现的,它提供一个非常简单的API去调用拦截方法。
注意
正如普通的非类型化角色一样,类型化角色每次处理一次调用。
3.2.1 什么时候使用类型化角色
类型化角色是角色系统和非角色代码之间的美好桥梁,因为它们允许你在外部编写正常的面向对象代码。把它们想象成门:它们实际上是公共部分和私有部分之间的接口,但你并不想你的房子有很多的门,不是吗?你可以通过this blog post查看更详细的讨论。
更多的背景:TypedActor很容易作为RPC被滥用,因此TypedActor并不是我们一开始想象中的那样,能够更加容易的去正确编写高可扩展的并发软件。我们要在合适的地方使用它们。
3.2.2 工具箱
在创建第一个类型化角色之前,我们先了解一下这个工具,掌握它的功能,它位于akka.actor.TypedActor。
02 |
TypedActorExtension extension = TypedActor.get(system); //系统是一个ActorSystem对象</p>
|
04 |
TypedActor.get(system).isTypedActor(someReference); |
06 |
//返回一个外部类型化角色代理的AKKA角色 |
07 |
TypedActor.get(system).getActorRefFor(someReference); |
10 |
//方法仅在一个TypedActor的方法实现中有效 |
11 |
ActorContext context = TypedActor.context(); |
14 |
//方法只在TypedActor的方法实现中有效 |
15 |
Squarer sq = TypedActor.<Squarer>self(); |
18 |
//这意味着如果你使用它创建了其它的类型化角色实例 |
20 |
TypedActor.get(TypedActor.context()); |
警告
类型化角色和akka角色一样不暴露this引用,这一点很重要。你应该通过外部代理引用,它可以通过TypedActor.self来获得,这是你的对外身份标识,就像akka角色的对外身份标识是ActorRef一样。
3.2.3 创建类型化角色
创建类型化角色需要有一个以上的接口和一个实现接口的类。
假设入口如下所示:
001 |
import akka.actor.TypedActor;
|
004 |
import akka.dispatch.Futures;</p>
|
005 |
import scala.concurrent.Await;
|
006 |
import scala.concurrent.Future;
|
007 |
import scala.concurrent.duration.Duration;
|
008 |
import java.util.concurrent.TimeUnit;
|
010 |
import java.util.List;
|
011 |
import java.util.ArrayList;
|
012 |
import java.util.Random;
|
013 |
import akka.routing.RoundRobinGroup;
|
014 |
public class TypedActorDocTest {
|
015 |
Object someReference = null ;
|
016 |
ActorSystem system = null ;
|
018 |
static public interface Squarer {
|
019 |
void squareDontCare( int i); //fire-forget(审校者注:这个词怎么翻译?)
|
020 |
Future<Integer> square( int i); //非阻塞send-request-reply
|
021 |
Option<Integer> squareNowPlease( int i); //阻塞send-request-reply
|
022 |
int squareNow( int i); //阻塞send-request-reply
|
025 |
static class SquarerImpl implements Squarer {
|
027 |
public SquarerImpl() {
|
028 |
this .name = "default" ;
|
031 |
public SquarerImpl(String name) {
|
035 |
public void squareDontCare( int i) {
|
039 |
public Future<Integer> square( int i) {
|
040 |
return Futures.successful(i * i);
|
043 |
public Option<Integer> squareNowPlease( int i) {
|
044 |
return Option.some(i * i);
|
047 |
public int squareNow( int i) {
|
052 |
@Test public void mustGetTheTypedActorExtension() {
|
055 |
TypedActorExtension extension = TypedActor.get(system); //系统是一个ActorSystem实例
|
058 |
TypedActor.get(system).isTypedActor(someReference);
|
061 |
TypedActor.get(system).getActorRefFor(someReference);
|
064 |
// 方法只在TypedActor方法实现内部有效
|
065 |
ActorContext context = TypedActor.context();
|
068 |
// 方法只在TypedActor方法实现内部有效</pre>
|
069 |
Squarer sq = TypedActor.<Squarer>self();
|
072 |
//这意味着如果你用它创建了其它类型化角色
|
074 |
TypedActor.get(TypedActor.context());
|
075 |
} catch (Exception e) {
|
079 |
@Test public void createATypedActor() {
|
081 |
Squarer mySquarer = TypedActor.get(system).typedActorOf(
|
082 |
new TypedProps<SquarerImpl>(Squarer. class , SquarerImpl. class ));
|
083 |
Squarer otherSquarer = TypedActor.get(system).typedActorOf(
|
084 |
new TypedProps<SquarerImpl>(Squarer. class , new Creator<SquarerImpl>() {
|
085 |
public SquarerImpl create() { return new SquarerImpl( "foo" ); }
|
088 |
mySquarer.squareDontCare( 10 );
|
089 |
Future<Integer> fSquare = mySquarer.square( 10 ); //A Future[Int]
|
090 |
Option<Integer> oSquare = mySquarer.squareNowPlease( 10 ); //Option[Int]
|
091 |
int iSquare = mySquarer.squareNow( 10 ); //Int
|
093 |
assertEquals( 100 , Await.result(fSquare, Duration.create( 3 , TimeUnit.SECONDS)).intValue());
|
094 |
assertEquals( 100 , oSquare.get().intValue());
|
095 |
assertEquals( 100 , iSquare);
|
097 |
TypedActor.get(system).stop(mySquarer);
|
098 |
TypedActor.get(system).poisonPill(otherSquarer);
|
099 |
} catch (Exception e) {
|
104 |
@Test public void createHierarchies() {
|
106 |
Squarer childSquarer = TypedActor.get(TypedActor.context()).typedActorOf(
|
107 |
new TypedProps<SquarerImpl>(Squarer. class , SquarerImpl. class )
|
109 |
//Use "childSquarer" as a Squarer
|
110 |
} catch (Exception e) {
|
115 |
@Test public void proxyAnyActorRef() {
|
117 |
final ActorRef actorRefToRemoteActor = system.deadLetters();
|
118 |
Squarer typedActor = TypedActor.get(system).typedActorOf(
|
119 |
new TypedProps<Squarer>(Squarer. class ),actorRefToRemoteActor);
|
120 |
//Use "typedActor" as a FooBar
|
121 |
} catch (Exception e) {
|
130 |
class Named implements HasName {
|
131 |
private int id = new Random().nextInt( 1024 );
|
132 |
@Override public String name() { return "name-" + id; }
|
135 |
@Test public void typedRouterPattern() {
|
138 |
TypedActorExtension typed = TypedActor.get(system);
|
139 |
Named named1 = typed.typedActorOf( new TypedProps<Named>(Named. class ));
|
140 |
Named named2 = typed.typedActorOf( new TypedProps<Named>(Named. class ));
|
142 |
List<Named> routees = new ArrayList<Named>();
|
146 |
List<String> routeePaths = new ArrayList<String>();
|
147 |
routeePaths.add(typed.getActorRefFor(named1).path().toStringWithoutAddress());
|
148 |
routeePaths.add(typed.getActorRefFor(named2).path().toStringWithoutAddress());
|
150 |
// prepare untyped router
|
151 |
ActorRef router = system.actorOf( new RoundRobinGroup(routeePaths).props(), "router" );
|
153 |
//准备类型化代理,向“router”转发方法调用消息
|
154 |
Named typedRouter = typed.typedActorOf( new TypedProps<Named>(Named. class ), router);
|
156 |
System.out.println( "actor was: " + typedRouter.name()); // name-243
|
157 |
System.out.println( "actor was: " + typedRouter.name()); // name-614
|
158 |
System.out.println( "actor was: " + typedRouter.name()); // name-243
|
159 |
System.out.println( "actor was: " + typedRouter.name()); // name-614
|
161 |
typed.poisonPill(named1);
|
162 |
typed.poisonPill(named2);
|
163 |
typed.poisonPill(typedRouter);
|
164 |
} catch (Exception e) {
|
接口的例子:
1 |
public interface Squarer {
|
接口的实现类:
01 |
class SquarerImpl implements Squarer {
|
03 |
public SquarerImpl() {
|
04 |
this .name = "default" ;
|
07 |
public SquarerImpl(String name) {
|
创建Squarer的类型化角色最简单的方式如下:
1 |
Squarer mySquarer = TypedActor.get(system).typedActorOf( new TypedProps<SquarerImpl>(Squarer. class , SquarerImpl. class ));
|
第一种类型是代理类型,第二种是代理类型的实现。如果你需要去调用一个特殊的构造器,你可以这样做:
1 |
Squarer otherSquarer = TypedActor.get(system).typedActorOf( new TypedProps<SquarerImpl>(Squarer. class , new Creator<SquarerImpl>() {
|
2 |
public SquarerImpl create() { return new SquarerImpl( "foo" ); }
|
既然提供一个Props,你可以指定使用哪一个调度程序,应该使用缺省的timeout或者别的。目前,Squarer没有定义任何方法,我们可以添加如下这些方法。
1 |
public interface Squarer {
|
2 |
void squareDontCare( int i); //fire-forget
|
3 |
Future<Integer> square( int i); //non-blocking send-request-reply
|
4 |
Option<Integer> squareNowPlease( int i); //blocking send-request-reply
|
5 |
int squareNow( int i); //blocking send-request-reply
|
那好,现在我们可以调用这些方法了,不过他们需要在SquarerImpl中实现。
01 |
class SquarerImpl implements Squarer {
|
03 |
public SquarerImpl() {
|
04 |
this .name = "default" ;
|
07 |
public SquarerImpl(String name) {
|
11 |
public void squareDontCare( int i) {
|
15 |
public Future<Integer> square( int i) {
|
16 |
return Futures.successful(i * i);
|
19 |
public Option<Integer> squareNowPlease( int i) {
|
20 |
return Option.some(i * i);
|
23 |
public int squareNow( int i) {
|
很好,现在我们已经有一个接口和它的实现类,并且知道怎么去创建一个类型化角色了,让我们了解下这些方法。
3.2.4 方法调度语义
方法返回:
1. void会被fire-and-forget语义调度,和ActorRef.tell完全一样
2. scala.concurrent.Future<?>会使用send-request-reply语义,和ActorRef.ask完全一样。
3. akka.japi.Option<?>会使用send-request-reply语义,但会堵塞等待一个应答,并且如果在timeout内没有回复,就会返回akka.japi.Option.None,或者,相反的返回akka.japi.Option.Some<?>。此调用过程中被抛出的任何异常都将被重新抛出。
4. 任何别的类型值会使用send-request-reply语义,但会阻塞等待一个回答,如果在一个timeout内抛出异常或者在调用过程中出现重新抛异常的情况,就会抛出java.util.concurrent.TimeoutException。注意,基于Java异常和反射机制,一个TimeoutException会包装在一个java.lang.reflect.UndeclaredThrowableException里,除非接口方法明确的描述TimeoutException作为一个受检异常抛出。
3.2.5 消息和不可变性
虽然Akka不能强制转换类型化角色方法的参数为不可变的,但是我们强烈建议把参数设置为不可变的。
3.2.5.1 单向(One-way)消息发送
1 |
mySquarer.squareDontCare( 10 );
|
就像上面这么简单,方法会在另一个线程里异步的执行。
3.2.5.2 双向(Request-reply)消息发送
1 |
Option<Integer> oSquare = mySquare.squareNowPlease( 10 ); //Option[Int]
|
如果需要,阻塞的时长可以配置类型化角色的Props的timeout。如果超时,它会返回None。
1 |
int iSquare = mySquarer.squareNow( 10 ); //Int
|
如果需要,阻塞的时长可以配置类型化角色的Props的timeout。如果超时,它会抛出一个java.util.concurrent.TimeoutException。这里需要注意一下,通过Java的反射机制,这样一个TimeoutException会被包装在一个java.lang.reflect.UndeclaredThrowableException中,因为接口方法没有明确描述TimeoutException作为一个受检异常抛出。为了直接得到TimeoutException,可以在接口方法中添加throws java.util.concurrent.TimeoutException。
3.2.5.3 (Request-reply-with-future) 消息发送
1 |
Future<Integer> fSquare = mySquarer.square( 10 ); //一个Future对象[Int]
|
这个调用是异步的,并且future的返回可以用于异步成分。
3.2.6 停止类型化角色
一旦Akka的类型化角色被Akka角色阻塞,当不再需要它们,必须被停掉。
1 |
TypedActor.get(system).stop(mySquarer); |
这个异步的方法会尽快的停掉类型化角色关联的代理。
1 |
TypedActor.get(system).poisonPill(otherSquarer); |
这个异步的方法会在所有调用都完成之后停掉类型化角色关联的代理。
3.2.7 类型化角色层次结构
既然你可以通过传递一个ActorContext来获得一个上下文的类型化角色,你可以通过在它上面调用typeActorOf来创建子类型化角色。
1 |
Squarer childSquarer = TypedActor.get(TypedActor.context()).typedActorOf( |
2 |
new TypedProps<SquarerImpl>(Squarer. class , SquarerImpl. class )
|
4 |
//Use "childSquarer" as a Squarer |
你可以通过常规的Akka角色,把UntypedActorContext作为TypedActor.get的输入参数创建一个子类型化角色。
3.2.8 监管者策略
通过你的类型化角色实现类实现TypedActor.supervisor。你可以定义策略去监管子角色,就像监管与监控(Supervision and Monitoring) and 容错(Fault Tolerance)中描述的一样。
3.2.9 接收任意的消息
如果你的TypedActor的实现类继承akka.actor.TypedActor.Receiver,所有非方法调用的消息都会传递到onReceive方法。
这允许你处理DeathWatch的Terminated消息和别的类型的消息,例如当与非类型化角色进行交互的场景。
3.2.10 生命周期回调
通过你的类型化角色实现类实现如下任何一个或所有的方法:
- TypedActor.PreStart
- TypedActor.PostStop
- TypedActor.PreRestart
- TypedActor.PostRestart
你可以用钩子方法连接到你的类型化角色的生命周期。
3.2.11 代理
你可以使用带TypedProps和Actor引用参数的 typedActorOf以代理的方式将ActorRef引用转换成类型化角色。如果你想和其他机器上的TypedActor进行远程交互,只要把ActorRef传给typedActorOf即可。
3.2.12 查找和远程处理
既然TypedActor是基于Akka Actor的,你可以用typedActorOf去代理有可能在远程节点上的ActorRef。
1 |
Squarer typedActor = TypedActor.get(system).typedActorOf( |
2 |
new TypedProps<Squarer>(Squarer. class ),actorRefToRemoteActor);
|
3 |
//Use "typedActor" as a FooBar |
3.2.13 类型化路由模式
有时候你想在多个角色之间传递消息。在Akka中最简单的实现方法是用一个路由(Router),它可以实现一个特定的路由逻辑,例如最小邮箱(smallest-mailbox)或者一致性哈希(consistent-hashing)等等。
路由没有直接提供给类型化角色,但可以通过利用一个非类型化路由和类型化代理来实现它。为了展示这一点让我们创建一些类型化角色并给他们指派随机的id,然后我们会看到路由把消息发送给了不同的角色:
01 |
@Test public void typedRouterPattern() {
|
04 |
TypedActorExtension typed = TypedActor.get(system);
|
06 |
Named named1 = typed.typedActorOf( new TypedProps<Named>(Named. class ));
|
07 |
Named named2 = typed.typedActorOf( new TypedProps<Named>(Named. class ));
|
09 |
List<Named> routees = new ArrayList<Named>();
|
13 |
List<String> routeePaths = new ArrayList<String>();
|
14 |
routeePaths.add(typed.getActorRefFor(named1).path().toStringWithoutAddress());
|
15 |
routeePaths.add(typed.getActorRefFor(named2).path().toStringWithoutAddress());
|
18 |
ActorRef router = system.actorOf( new RoundRobinGroup(routeePaths).props(), "router" );
|
20 |
// 准备类型化代理,向路由转发MethodCall消息
|
21 |
Named typedRouter = typed.typedActorOf( new TypedProps<Named>(Named. class ), router);
|
23 |
System.out.println( "actor was: " + typedRouter.name()); // name-243
|
24 |
System.out.println( "actor was: " + typedRouter.name()); // name-614
|
25 |
System.out.println( "actor was: " + typedRouter.name()); // name-243
|
26 |
System.out.println( "actor was: " + typedRouter.name()); // name-614
|
28 |
typed.poisonPill(named1);
|
29 |
typed.poisonPill(named2);
|
30 |
typed.poisonPill(typedRouter);
|
32 |
} catch (Exception e) {
|
为了在这样的角色实例中间轮询,你可以创建一个简单的非类型化路由,并用一个TypedActor实现这个路由的外观模式,就像下面的例子展示的。这样会起作用是因为类型化角色利用和普通角色相同的通讯机制,以及它们的方法调用被转化为MethodCall消息。
02 |
TypedActorExtension typed = TypedActor.get(system); |
04 |
Named named1 = typed.typedActorOf( new TypedProps<Named>(Named. class ));
|
06 |
Named named2 = typed.typedActorOf( new TypedProps<Named>(Named. class ));
|
08 |
List<Named> routees = new ArrayList<Named>();
|
12 |
List<String> routeePaths = new ArrayList<String>();
|
13 |
routeePaths.add(typed.getActorRefFor(named1).path().toStringWithoutAddress()); |
14 |
routeePaths.add(typed.getActorRefFor(named2).path().toStringWithoutAddress()); |
最后更新:2017-05-23 16:02:29