321
技術社區[雲棲]
Akka筆記之消息傳遞
在Akka筆記第一篇的介紹中,我們大致介紹了下Akka工具箱中的Actor。在第二篇當中,我們來看一下Actor消息傳遞的功能。這裏還是延用之前使用的那個學生-老師的例子。
在Actor消息的第一部分中,我們會創建一個老師的Actor,但學生Actor則先不創建,而是使用一個叫做StudentSimulatorApp的主程序。
仔細回顧下學生-老師模型
我們現在隻考慮StudentSimulatorApp發送給TeacherActor的消息。這裏我所說的StudentSimulatorApp指的是一個正常的主程序。
從圖中可以看到:
(如果有陌生的術語,沒關係,後麵我們會詳細解釋的)
1. 學生創建了一個叫ActorSystem的東西。
2. 他通過ActorSystem來創建了一個叫ActorRef的對象。QuoteRequest消息就是發送給ActorRef的(它是TeacherActor的一個代理)
3. ActorRef將消息發送給Dispatcher
4. Dispatcher將消息投遞到目標Actor的郵箱中。
5. 隨後Dispatcher將Mailbox扔給一個線程去執行(這點下節會重點講到)
6. MailBox將消息出隊並最終將其委托給真實的Teacher Actor的接收方法去處理。
正如我所說的,看不懂也別擔心。現在我們來一步步地詳細地分析下。全部講完後你可以再回過頭來看下這五個步驟。
STUDENTSIMULATORAPP程序
我們用這個STUDENTSIMULATORAPP來啟動JVM並初始化ActorSystem。
從圖中可以看到,StudentSimulatorApp
1. 創建了一個ActorSystem
2. 通過ActorSystem創建了一個Teacher Actor的代理(ActorRef)
3. 將QuoteRequest消息發送給代理
我們現在隻關注這三點。
1. 創建ActorSystem
ActorSystem是進入到Actor的世界的一扇大門。通過它你可以創建或中止Actor。甚至還可以把整個Actor環境給關閉掉。
另一方麵來說,Actor是一個分層的結構,ActorSystem之於Actor有點類似於java.lang.Object或者scala.Any的角色——也就是說,它是所有Actor的根對象。當你通過ActorSystem的actorOf方法創建了一個Actor時,你其實創建的是ActorSystem下麵的一個Actor。
初始化ActorSystem的代碼是這樣的:
val system=ActorSystem("UniversityMessageSystem")
UniversityMessageSystem隻是你給ActorSystem起的一個可愛的名字而已。
2. 創建一個TeacherActor的代理?
我們來看下下麵這段代碼:
val teacherActorRef:ActorRef=actorSystem.actorOf(Props[TeacherActor])
actorOf是ActorSystem中創建Actor的方法。但是正如你所看到的,它並不會返回我們所需要的TeacherActor。它返回的是一個ActorRef。
這個ActorRef扮演了真實的Actor的一個代理的角色。客戶端並不會直接和Actor通信。這也正是Actor模型中避免直接訪問TeacherActor中任何的自定義/私有方法或者變量的一種方式。
再重複一遍,消息隻會發送給ActorRef,最終才會到達真正的Actor。你是絕對無法直接和Actor進行通信的。如果你真的找到了什麼拙劣的方式來直接通信,大家會恨你入骨的。
將消息發送給代理
還是隻有一行代碼。你隻需告訴說把QuoteRequest消息發送到ActorRef就好了。Actor中的這個告訴的方式就是一個!號。(ActorRef中確實也有一個tell方法,不過它隻是把這個調用委托給了!號)
//send a message to the Teacher Actor teacherActorRef!QuoteRequest
這就可以了!
如果你認為我在騙你的話,看一下下麵StudentSimulatorApp的完整代碼:
STUDENTSIMULATORAPP.SCALA
package me.rerun.akkanotes.messaging.actormsg1 import akka.actor.ActorSystem import akka.actor.Props import akka.actor.actorRef2Scala import me.rerun.akkanotes.messaging.protocols.TeacherProtocol._ object StudentSimulatorApp extends App{ //Initialize the ActorSystem val actorSystem=ActorSystem("UniversityMessageSystem") //construct the Teacher Actor Ref val teacherActorRef=actorSystem.actorOf(Props[TeacherActor]) //send a message to the Teacher Actor teacherActorRef!QuoteRequest //Let's wait for a couple of seconds before we shut down the system Thread.sleep (2000) //Shut down the ActorSystem. actorSystem.shutdown() }
好吧,我承認我撒了點小謊。你還得關掉ActorSystem,不然JVM會一直運行下去的。我還讓主線程睡眠了一小會兒,以便給點時間讓TeacherActor去完成它的任務。我知道這聽起來很愚蠢。別擔心。後麵我們會通過些優雅的測試用例來替換掉這種取巧的方式。
消息
我們剛發送了一個QuoteMessage給ActorRef,但是,還壓根兒沒看著過這個消息類呢!
說曹操,曹操到:
(實踐中推薦你把消息封裝成一個好點的對象,這樣維護起來容易些)
TeacherProtocol
package me.rerun.akkanotes.messaging.protocols object TeacherProtocol{ case class QuoteRequest() case class QuoteResponse(quoteString:String) }
正如你所想的那樣,QuoteRequest就是發送給TeacherActor的那個消息。Actor會回複一個QuoteResponse。
分發器及郵箱
ActorRef把消息處理功能委托給了Dispatcher。實際上,當我們創建ActorSystem和ActorRef的時候,就已經創建了一個Dispatcher和MailBox了。我們來看下它們是幹什麼的。
郵箱
每個Actor都有一個MailBox(後麵會介紹一種特殊的情況)。在我們這個比喻當中,每個老師也有一個郵箱。老師得去檢查郵箱並處理消息。在Actor的世界中,則是另一種形式——郵箱一有機會就會要求Actor去完成自己的任務。
同樣的,郵箱裏也有一個隊列來以FIFO的方式來存儲並處理消息——它和實際的郵箱還有點不同,真實的郵箱新的信總是在最上麵的。
現在講到分發器了
Dispatcher會完成一些很酷的事。從它的角度來看,它隻是從ActorRef中取出一條消息然後將它傳給了MailBox。但是,在這後麵發生了一件不可意義的事情:
Dispatcher會封裝一個ExecutorService(ForkJoinPoll或者ThreadPoolExecutor)。它把MailBox扔到ExecutorService中去運行。
看下Dispatcher裏麵的一段代碼:
protected[akka] override def registerForExecution(mbox: Mailbox, ...): Boolean = { ... try { executorService execute mbox ... }
什麼,你是說要執行一下郵箱?
是的。我們看到MailBox中包含了隊列裏麵的消息。由於Executor得去執行MailBox,所以它得是一個Thread類型。是的沒錯。MailBox的聲明及構造器就是這樣的。
下麵是MailBox的簽名信息。
private[akka] abstract class Mailbox(val messageQueue: MessageQueue) extends SystemMessageQueue with Runnable
TeacherActor
當MailBox的run方法運行的時候,它會從隊列中取出一條消息,然後將它傳給Actor去處理。
當你把消息傳給ActorRef的時候,最終調用的實際是目標Actor裏麵的一個receive方法。
TeacherActor隻是一個很簡單的類,它有一個名言的列表,而receive方法很明顯就是用來處理消息的。
來看下代碼:
TeacherActor.scala
package me.rerun.akkanotes.messaging.actormsg1 import scala.util.Random import akka.actor.Actor import me.rerun.akkanotes.messaging.protocols.TeacherProtocol._ /* * Your Teacher Actor class. * * The class could use refinement by way of * using ActorLogging which uses the EventBus of the Actor framework * instead of the plain old System out * */ class TeacherActor extends Actor { val quotes = List( "Moderation is for cowards", "Anything worth doing is worth overdoing", "The trouble is you think you have time", "You never gonna know if you never even try") def receive = { case QuoteRequest => { import util.Random //Get a random Quote from the list and construct a response val quoteResponse=QuoteResponse(quotes(Random.nextInt(quotes.size))) println (quoteResponse) } } }
TeacherActor的receive方法的模式匹配隻會匹配一種消息——QuoteRequest (事實上,模式匹配中最好匹配下默認的情況,不過這個就說來話長了)
receive方法做的就是
1. 匹配QuoteRequest的模式
2. 從名言列表中隨機選取一條
3. 構造出一個QuoteResponse
4. 將QuoteResponse打印到控製台上
代碼
整個項目的代碼可以從Github中下載到。
本文最早發布於我的個人博客: Java譯站
最後更新:2017-05-23 14:35:00