阅读375 返回首页    go 京东网上商城


AKKA文档(java版)—类型化角色

3.2 类型化角色

Akka的类型化角色是活动对象(Active Object)模式的实现。Smalltalk诞生的时候,默认的方法调用由异步派发代替同步操作。
类型化角色由2部分组成,包括一个公共的接口和实现,如果你有“企业级”Java的开发经验,这对你来说会非常熟悉。与普通的角色一样,你有一个外部的API(公共接口实例),将异步的方法调用委托给实现类的一个私有实例。

类型化角色对比角色的优势是你可以有一个静态的约定,而不需要去定义你自己的消息,不好的一面就是它会限制你能做什么和不能做什么,比如你不能使用become/unbecome。
类型化角色是利用JDK Proxies 来实现的,它提供一个非常简单的API去调用拦截方法。
注意
正如普通的非类型化角色一样,类型化角色每次处理一次调用。
3.2.1 什么时候使用类型化角色
类型化角色是角色系统和非角色代码之间的美好桥梁,因为它们允许你在外部编写正常的面向对象代码。把它们想象成门:它们实际上是公共部分和私有部分之间的接口,但你并不想你的房子有很多的门,不是吗?你可以通过this blog post查看更详细的讨论。
更多的背景:TypedActor很容易作为RPC被滥用,因此TypedActor并不是我们一开始想象中的那样,能够更加容易的去正确编写高可扩展的并发软件。我们要在合适的地方使用它们。

3.2.2 工具箱
在创建第一个类型化角色之前,我们先了解一下这个工具,掌握它的功能,它位于akka.actor.TypedActor。

01 //返回类型化角色的表达式
02 TypedActorExtension extension = TypedActor.get(system); //系统是一个ActorSystem对象</p>
03 //返回引用是否是一个类型化角色代理
04 TypedActor.get(system).isTypedActor(someReference);
05  
06 //返回一个外部类型化角色代理的AKKA角色
07 TypedActor.get(system).getActorRefFor(someReference);
08  
09 //返回当前ActorContext
10 //方法仅在一个TypedActor的方法实现中有效
11 ActorContext context = TypedActor.context();
12  
13 //返回当前类型化角色的外部代理
14 //方法只在TypedActor的方法实现中有效
15 Squarer sq = TypedActor.<Squarer>self();
16  
17 //返回一个类型化角色的上下文实例
18 //这意味着如果你使用它创建了其它的类型化角色实例
19 //它们将是当前这个类型化角色的子角色
20 TypedActor.get(TypedActor.context());

警告

        类型化角色和akka角色一样不暴露this引用,这一点很重要。你应该通过外部代理引用,它可以通过TypedActor.self来获得,这是你的对外身份标识,就像akka角色的对外身份标识是ActorRef一样。

3.2.3 创建类型化角色

创建类型化角色需要有一个以上的接口和一个实现接口的类。

假设入口如下所示:

001 import akka.actor.TypedActor;
002 import akka.actor.*;
003 import akka.japi.*;
004 import akka.dispatch.Futures;</p>
005 import scala.concurrent.Await;
006 import scala.concurrent.Future;
007 import scala.concurrent.duration.Duration;
008 import java.util.concurrent.TimeUnit;
009  
010 import java.util.List;
011 import java.util.ArrayList;
012 import java.util.Random;
013 import akka.routing.RoundRobinGroup;
014 public class TypedActorDocTest {
015     Object someReference = null;
016     ActorSystem system = null;
017  
018     static public interface Squarer {
019         void squareDontCare(int i); //fire-forget(审校者注:这个词怎么翻译?)
020         Future<Integer> square(int i); //非阻塞send-request-reply
021         Option<Integer> squareNowPlease(int i);//阻塞send-request-reply
022         int squareNow(int i); //阻塞send-request-reply
023     }
024  
025     static class SquarerImpl implements Squarer {
026         private String name;
027         public SquarerImpl() {
028             this.name = "default";
029         }
030  
031         public SquarerImpl(String name) {
032             this.name = name;
033         }
034  
035         public void squareDontCare(int i) {
036             int sq = i * i; //Nobody cares <img src="https://ifeve.com/wp-includes/images/smilies/frownie.png" alt=":(" >
037         }
038  
039         public Future<Integer> square(int i) {
040             return Futures.successful(i * i);
041         }
042  
043         public Option<Integer> squareNowPlease(int i) {
044             return Option.some(i * i);
045         }
046  
047         public int squareNow(int i) {
048             return i * i;
049         }
050     }
051  
052     @Test public void mustGetTheTypedActorExtension() {
053         try {
054             //返回类型化角色的的表达式
055             TypedActorExtension extension = TypedActor.get(system); //系统是一个ActorSystem实例
056  
057             //返回引用是否是一个类型化角色代理
058             TypedActor.get(system).isTypedActor(someReference);
059  
060             //返回类型化角色代理的AKKA角色
061             TypedActor.get(system).getActorRefFor(someReference);
062  
063             //返回当前ActorContext
064             // 方法只在TypedActor方法实现内部有效
065             ActorContext context = TypedActor.context();
066  
067             //返回当前类型化角色的外部代理
068             // 方法只在TypedActor方法实现内部有效</pre>
069             Squarer sq = TypedActor.<Squarer>self();
070  
071             //返回类型化角色的上下文实例
072             //这意味着如果你用它创建了其它类型化角色
073             //它们就是当前类型化角色的子角色
074             TypedActor.get(TypedActor.context());
075         catch (Exception e) {
076             //dun care
077         }
078      }
079      @Test public void createATypedActor() {
080          try {
081              Squarer mySquarer = TypedActor.get(system).typedActorOf(
082                  new TypedProps<SquarerImpl>(Squarer.class, SquarerImpl.class));
083              Squarer otherSquarer = TypedActor.get(system).typedActorOf(
084                  new TypedProps<SquarerImpl>(Squarer.classnew Creator<SquarerImpl>() {
085                      public SquarerImpl create() { return new SquarerImpl("foo"); }
086              }),"name");
087  
088              mySquarer.squareDontCare(10);
089              Future<Integer> fSquare = mySquarer.square(10); //A Future[Int]
090              Option<Integer> oSquare = mySquarer.squareNowPlease(10); //Option[Int]
091              int iSquare = mySquarer.squareNow(10); //Int
092  
093              assertEquals(100, Await.result(fSquare, Duration.create(3, TimeUnit.SECONDS)).intValue());
094              assertEquals(100, oSquare.get().intValue());
095              assertEquals(100, iSquare);
096  
097              TypedActor.get(system).stop(mySquarer);
098              TypedActor.get(system).poisonPill(otherSquarer);
099         catch(Exception e) {
100              //忽略
101         }
102     }
103  
104     @Test public void createHierarchies() {
105         try {
106             Squarer childSquarer = TypedActor.get(TypedActor.context()).typedActorOf(
107                 new TypedProps<SquarerImpl>(Squarer.class, SquarerImpl.class)
108             );
109             //Use "childSquarer" as a Squarer
110         catch (Exception e) {
111             //dun care
112         }
113     }
114  
115     @Test public void proxyAnyActorRef() {
116         try {
117             final ActorRef actorRefToRemoteActor = system.deadLetters();
118             Squarer typedActor = TypedActor.get(system).typedActorOf(
119             new TypedProps<Squarer>(Squarer.class),actorRefToRemoteActor);
120             //Use "typedActor" as a FooBar
121         catch (Exception e) {
122             //dun care
123         }
124     }
125  
126     interface HasName {
127         String name();
128     }
129  
130    class Named implements HasName {
131         private int id = new Random().nextInt(1024);
132         @Override public String name() { return "name-" + id; }
133     }
134  
135     @Test public void typedRouterPattern() {
136        try {
137            // prepare routees
138            TypedActorExtension typed = TypedActor.get(system);
139            Named named1 = typed.typedActorOf(new TypedProps<Named>(Named.class));
140            Named named2 = typed.typedActorOf(new TypedProps<Named>(Named.class));
141  
142            List<Named> routees = new ArrayList<Named>();
143            routees.add(named1);
144            routees.add(named2);
145  
146            List<String> routeePaths = new ArrayList<String>();
147            routeePaths.add(typed.getActorRefFor(named1).path().toStringWithoutAddress());
148            routeePaths.add(typed.getActorRefFor(named2).path().toStringWithoutAddress());
149  
150            // prepare untyped router
151            ActorRef router = system.actorOf(new RoundRobinGroup(routeePaths).props(), "router");
152  
153           //准备类型化代理,向“router”转发方法调用消息
154           Named typedRouter = typed.typedActorOf(new TypedProps<Named>(Named.class), router);
155  
156            System.out.println("actor was: " + typedRouter.name()); // name-243
157            System.out.println("actor was: " + typedRouter.name()); // name-614
158            System.out.println("actor was: " + typedRouter.name()); // name-243
159            System.out.println("actor was: " + typedRouter.name()); // name-614
160  
161            typed.poisonPill(named1);
162            typed.poisonPill(named2);
163            typed.poisonPill(typedRouter);
164        catch (Exception e) {
165            //dun care
166        }
167     }
168 }

接口的例子:

1     public interface Squarer {
2         //类型化的角色接口方法 ...
3     }

接口的实现类:

01 class SquarerImpl implements Squarer {
02     private String name;
03     public SquarerImpl() {
04         this.name = "default";
05     }
06  
07     public SquarerImpl(String name) {
08         this.name = name;
09     }
10  
11     //类型化的角色方法实现 ...
12 }

创建Squarer的类型化角色最简单的方式如下:

1 Squarer mySquarer = TypedActor.get(system).typedActorOf(new TypedProps<SquarerImpl>(Squarer.class, SquarerImpl.class));

      第一种类型是代理类型,第二种是代理类型的实现。如果你需要去调用一个特殊的构造器,你可以这样做:

1 Squarer otherSquarer = TypedActor.get(system).typedActorOf(new TypedProps<SquarerImpl>(Squarer.class,new Creator<SquarerImpl>() {
2     public SquarerImpl create() { return new SquarerImpl("foo"); }
3 }),"name");

 既然提供一个Props,你可以指定使用哪一个调度程序,应该使用缺省的timeout或者别的。目前,Squarer没有定义任何方法,我们可以添加如下这些方法。

1 public interface Squarer {
2     void squareDontCare(int i); //fire-forget
3     Future<Integer> square(int i); //non-blocking send-request-reply
4     Option<Integer> squareNowPlease(int i);//blocking send-request-reply
5     int squareNow(int i); //blocking send-request-reply
6 }

那好,现在我们可以调用这些方法了,不过他们需要在SquarerImpl中实现。

01 class SquarerImpl implements Squarer {
02     private String name;
03     public SquarerImpl() {
04         this.name = "default";
05     }
06  
07     public SquarerImpl(String name) {
08         this.name = name;
09     }
10  
11     public void squareDontCare(int i) {
12         int sq = i * i; //Nobody cares <img src="https://ifeve.com/wp-includes/images/smilies/frownie.png" alt=":(" >
13     }
14  
15     public Future<Integer> square(int i) {
16         return Futures.successful(i * i);
17     }
18  
19     public Option<Integer> squareNowPlease(int i) {
20         return Option.some(i * i);
21     }
22  
23     public int squareNow(int i) {
24        return i * i;
25     }
26 }

很好,现在我们已经有一个接口和它的实现类,并且知道怎么去创建一个类型化角色了,让我们了解下这些方法。

3.2.4 方法调度语义

         方法返回:

        1. void会被fire-and-forget语义调度,和ActorRef.tell完全一样

        2. scala.concurrent.Future<?>会使用send-request-reply语义,和ActorRef.ask完全一样。

        3. akka.japi.Option<?>会使用send-request-reply语义,但会堵塞等待一个应答,并且如果在timeout内没有回复,就会返回akka.japi.Option.None,或者,相反的返回akka.japi.Option.Some<?>。此调用过程中被抛出的任何异常都将被重新抛出。

        4. 任何别的类型值会使用send-request-reply语义,但会阻塞等待一个回答,如果在一个timeout内抛出异常或者在调用过程中出现重新抛异常的情况,就会抛出java.util.concurrent.TimeoutException。注意,基于Java异常和反射机制,一个TimeoutException会包装在一个java.lang.reflect.UndeclaredThrowableException里,除非接口方法明确的描述TimeoutException作为一个受检异常抛出。

3.2.5 消息和不可变性

         虽然Akka不能强制转换类型化角色方法的参数为不可变的,但是我们强烈建议把参数设置为不可变的。

3.2.5.1 单向(One-way)消息发送

1 mySquarer.squareDontCare(10);

就像上面这么简单,方法会在另一个线程里异步的执行。

3.2.5.2 双向(Request-reply)消息发送

1 Option<Integer> oSquare = mySquare.squareNowPlease(10);//Option[Int]

如果需要,阻塞的时长可以配置类型化角色的Props的timeout。如果超时,它会返回None。

1 int iSquare = mySquarer.squareNow(10);//Int

如果需要,阻塞的时长可以配置类型化角色的Props的timeout。如果超时,它会抛出一个java.util.concurrent.TimeoutException。这里需要注意一下,通过Java的反射机制,这样一个TimeoutException会被包装在一个java.lang.reflect.UndeclaredThrowableException中,因为接口方法没有明确描述TimeoutException作为一个受检异常抛出。为了直接得到TimeoutException,可以在接口方法中添加throws java.util.concurrent.TimeoutException。

3.2.5.3 (Request-reply-with-future) 消息发送

1 Future<Integer> fSquare = mySquarer.square(10);//一个Future对象[Int]

这个调用是异步的,并且future的返回可以用于异步成分。

 

3.2.6 停止类型化角色

一旦Akka的类型化角色被Akka角色阻塞,当不再需要它们,必须被停掉。

1 TypedActor.get(system).stop(mySquarer);
2 </span></span>

这个异步的方法会尽快的停掉类型化角色关联的代理。

1 TypedActor.get(system).poisonPill(otherSquarer);

这个异步的方法会在所有调用都完成之后停掉类型化角色关联的代理。

 

3.2.7 类型化角色层次结构

既然你可以通过传递一个ActorContext来获得一个上下文的类型化角色,你可以通过在它上面调用typeActorOf来创建子类型化角色。

1 Squarer childSquarer = TypedActor.get(TypedActor.context()).typedActorOf(
2     new TypedProps<SquarerImpl>(Squarer.class, SquarerImpl.class)
3 );
4 //Use "childSquarer" as a Squarer

你可以通过常规的Akka角色,把UntypedActorContext作为TypedActor.get的输入参数创建一个子类型化角色。

 

3.2.8 监管者策略

通过你的类型化角色实现类实现TypedActor.supervisor。你可以定义策略去监管子角色,就像监管与监控(Supervision and Monitoring) and 容错(Fault Tolerance)中描述的一样。

 

3.2.9 接收任意的消息

如果你的TypedActor的实现类继承akka.actor.TypedActor.Receiver,所有非方法调用的消息都会传递到onReceive方法。

这允许你处理DeathWatch的Terminated消息和别的类型的消息,例如当与非类型化角色进行交互的场景。

 

3.2.10 生命周期回调

通过你的类型化角色实现类实现如下任何一个或所有的方法:

  • TypedActor.PreStart
  • TypedActor.PostStop
  • TypedActor.PreRestart
  • TypedActor.PostRestart

你可以用钩子方法连接到你的类型化角色的生命周期。

 

3.2.11 代理

你可以使用带TypedProps和Actor引用参数的 typedActorOf以代理的方式将ActorRef引用转换成类型化角色。如果你想和其他机器上的TypedActor进行远程交互,只要把ActorRef传给typedActorOf即可。

 

3.2.12 查找和远程处理

既然TypedActor是基于Akka Actor的,你可以用typedActorOf去代理有可能在远程节点上的ActorRef。

1 Squarer typedActor = TypedActor.get(system).typedActorOf(
2     new TypedProps<Squarer>(Squarer.class),actorRefToRemoteActor);
3 //Use "typedActor" as a FooBar

3.2.13 类型化路由模式

有时候你想在多个角色之间传递消息。在Akka中最简单的实现方法是用一个路由(Router),它可以实现一个特定的路由逻辑,例如最小邮箱(smallest-mailbox)或者一致性哈希(consistent-hashing)等等。

路由没有直接提供给类型化角色,但可以通过利用一个非类型化路由和类型化代理来实现它。为了展示这一点让我们创建一些类型化角色并给他们指派随机的id,然后我们会看到路由把消息发送给了不同的角色:

01 @Test public void typedRouterPattern() {
02     try {
03         // prepare routees
04         TypedActorExtension typed = TypedActor.get(system);
05  
06         Named named1 = typed.typedActorOf(new TypedProps<Named>(Named.class));
07         Named named2 = typed.typedActorOf(new TypedProps<Named>(Named.class));
08  
09         List<Named> routees = new ArrayList<Named>();
10         routees.add(named1);
11         routees.add(named2);
12  
13         List<String> routeePaths = new ArrayList<String>();
14         routeePaths.add(typed.getActorRefFor(named1).path().toStringWithoutAddress());
15         routeePaths.add(typed.getActorRefFor(named2).path().toStringWithoutAddress());
16  
17         // 准备类型化路由
18         ActorRef router = system.actorOf(new RoundRobinGroup(routeePaths).props(), "router");
19  
20         // 准备类型化代理,向路由转发MethodCall消息
21         Named typedRouter = typed.typedActorOf(new TypedProps<Named>(Named.class), router);
22  
23         System.out.println("actor was: " + typedRouter.name()); // name-243
24         System.out.println("actor was: " + typedRouter.name()); // name-614
25         System.out.println("actor was: " + typedRouter.name()); // name-243
26         System.out.println("actor was: " + typedRouter.name()); // name-614
27  
28         typed.poisonPill(named1);
29         typed.poisonPill(named2);
30         typed.poisonPill(typedRouter);
31  
32     catch (Exception e) {
33         //dun care
34     }
35 }

为了在这样的角色实例中间轮询,你可以创建一个简单的非类型化路由,并用一个TypedActor实现这个路由的外观模式,就像下面的例子展示的。这样会起作用是因为类型化角色利用和普通角色相同的通讯机制,以及它们的方法调用被转化为MethodCall消息。

01 // prepare routees
02 TypedActorExtension typed = TypedActor.get(system);
03  
04 Named named1 = typed.typedActorOf(new TypedProps<Named>(Named.class));
05  
06 Named named2 = typed.typedActorOf(new TypedProps<Named>(Named.class));
07  
08 List<Named> routees = new ArrayList<Named>();
09 routees.add(named1);
10 routees.add(named2);
11  
12 List<String> routeePaths = new ArrayList<String>();
13 routeePaths.add(typed.getActorRefFor(named1).path().toStringWithoutAddress());
最后更新:2017-05-23 16:02:29

  上一篇:go  Bash漏洞那些事儿
  下一篇:go  NVIDIA发布首个基于AI的癌症分布式学习环境的框架——CANDLE


14 routeePaths.add(typed.getActorRefFor(named2).path().toStringWithoutAddress());