閱讀321 返回首頁    go 技術社區[雲棲]


Akka筆記之消息傳遞

在Akka筆記第一篇的介紹中,我們大致介紹了下Akka工具箱中的Actor。在第二篇當中,我們來看一下Actor消息傳遞的功能。這裏還是延用之前使用的那個學生-老師的例子。

在Actor消息的第一部分中,我們會創建一個老師的Actor,但學生Actor則先不創建,而是使用一個叫做StudentSimulatorApp的主程序。

仔細回顧下學生-老師模型

我們現在隻考慮StudentSimulatorApp發送給TeacherActor的消息。這裏我所說的StudentSimulatorApp指的是一個正常的主程序。

從圖中可以看到:
(如果有陌生的術語,沒關係,後麵我們會詳細解釋的)

1. 學生創建了一個叫ActorSystem的東西。
2. 他通過ActorSystem來創建了一個叫ActorRef的對象。QuoteRequest消息就是發送給ActorRef的(它是TeacherActor的一個代理)
3. ActorRef將消息發送給Dispatcher
4. Dispatcher將消息投遞到目標Actor的郵箱中。
5. 隨後Dispatcher將Mailbox扔給一個線程去執行(這點下節會重點講到)
6. MailBox將消息出隊並最終將其委托給真實的Teacher Actor的接收方法去處理。

正如我所說的,看不懂也別擔心。現在我們來一步步地詳細地分析下。全部講完後你可以再回過頭來看下這五個步驟。

STUDENTSIMULATORAPP程序

我們用這個STUDENTSIMULATORAPP來啟動JVM並初始化ActorSystem。

從圖中可以看到,StudentSimulatorApp

1. 創建了一個ActorSystem
2. 通過ActorSystem創建了一個Teacher Actor的代理(ActorRef)
3. 將QuoteRequest消息發送給代理

我們現在隻關注這三點。

1. 創建ActorSystem

ActorSystem是進入到Actor的世界的一扇大門。通過它你可以創建或中止Actor。甚至還可以把整個Actor環境給關閉掉。

另一方麵來說,Actor是一個分層的結構,ActorSystem之於Actor有點類似於java.lang.Object或者scala.Any的角色——也就是說,它是所有Actor的根對象。當你通過ActorSystem的actorOf方法創建了一個Actor時,你其實創建的是ActorSystem下麵的一個Actor。

初始化ActorSystem的代碼是這樣的:

val system=ActorSystem("UniversityMessageSystem")





UniversityMessageSystem隻是你給ActorSystem起的一個可愛的名字而已。

2. 創建一個TeacherActor的代理?

我們來看下下麵這段代碼:

val teacherActorRef:ActorRef=actorSystem.actorOf(Props[TeacherActor])

actorOf是ActorSystem中創建Actor的方法。但是正如你所看到的,它並不會返回我們所需要的TeacherActor。它返回的是一個ActorRef。

這個ActorRef扮演了真實的Actor的一個代理的角色。客戶端並不會直接和Actor通信。這也正是Actor模型中避免直接訪問TeacherActor中任何的自定義/私有方法或者變量的一種方式。

再重複一遍,消息隻會發送給ActorRef,最終才會到達真正的Actor。你是絕對無法直接和Actor進行通信的。如果你真的找到了什麼拙劣的方式來直接通信,大家會恨你入骨的。

image

將消息發送給代理

還是隻有一行代碼。你隻需告訴說把QuoteRequest消息發送到ActorRef就好了。Actor中的這個告訴的方式就是一個!號。(ActorRef中確實也有一個tell方法,不過它隻是把這個調用委托給了!號)

//send a message to the Teacher Actor
teacherActorRef!QuoteRequest


這就可以了!

如果你認為我在騙你的話,看一下下麵StudentSimulatorApp的完整代碼:

STUDENTSIMULATORAPP.SCALA
package me.rerun.akkanotes.messaging.actormsg1

import akka.actor.ActorSystem  
import akka.actor.Props  
import akka.actor.actorRef2Scala  
import me.rerun.akkanotes.messaging.protocols.TeacherProtocol._


object StudentSimulatorApp extends App{

  //Initialize the ActorSystem
  val actorSystem=ActorSystem("UniversityMessageSystem")

  //construct the Teacher Actor Ref
  val teacherActorRef=actorSystem.actorOf(Props[TeacherActor])

  //send a message to the Teacher Actor
  teacherActorRef!QuoteRequest

  //Let's wait for a couple of seconds before we shut down the system
  Thread.sleep (2000) 

  //Shut down the ActorSystem.
  actorSystem.shutdown()

} 


好吧,我承認我撒了點小謊。你還得關掉ActorSystem,不然JVM會一直運行下去的。我還讓主線程睡眠了一小會兒,以便給點時間讓TeacherActor去完成它的任務。我知道這聽起來很愚蠢。別擔心。後麵我們會通過些優雅的測試用例來替換掉這種取巧的方式。

消息

我們剛發送了一個QuoteMessage給ActorRef,但是,還壓根兒沒看著過這個消息類呢!

說曹操,曹操到:

(實踐中推薦你把消息封裝成一個好點的對象,這樣維護起來容易些)

TeacherProtocol
package me.rerun.akkanotes.messaging.protocols

object TeacherProtocol{

  case class QuoteRequest()
  case class QuoteResponse(quoteString:String)

}


正如你所想的那樣,QuoteRequest就是發送給TeacherActor的那個消息。Actor會回複一個QuoteResponse。

分發器及郵箱

ActorRef把消息處理功能委托給了Dispatcher。實際上,當我們創建ActorSystem和ActorRef的時候,就已經創建了一個Dispatcher和MailBox了。我們來看下它們是幹什麼的。

郵箱

每個Actor都有一個MailBox(後麵會介紹一種特殊的情況)。在我們這個比喻當中,每個老師也有一個郵箱。老師得去檢查郵箱並處理消息。在Actor的世界中,則是另一種形式——郵箱一有機會就會要求Actor去完成自己的任務。

同樣的,郵箱裏也有一個隊列來以FIFO的方式來存儲並處理消息——它和實際的郵箱還有點不同,真實的郵箱新的信總是在最上麵的。

現在講到分發器了

Dispatcher會完成一些很酷的事。從它的角度來看,它隻是從ActorRef中取出一條消息然後將它傳給了MailBox。但是,在這後麵發生了一件不可意義的事情:

Dispatcher會封裝一個ExecutorService(ForkJoinPoll或者ThreadPoolExecutor)。它把MailBox扔到ExecutorService中去運行。

看下Dispatcher裏麵的一段代碼:

protected[akka] override def registerForExecution(mbox: Mailbox, ...): Boolean = {  
    ...
    try {
        executorService execute mbox
    ...
}


什麼,你是說要執行一下郵箱?

是的。我們看到MailBox中包含了隊列裏麵的消息。由於Executor得去執行MailBox,所以它得是一個Thread類型。是的沒錯。MailBox的聲明及構造器就是這樣的。

下麵是MailBox的簽名信息。

private[akka] abstract class Mailbox(val messageQueue: MessageQueue) extends SystemMessageQueue with Runnable


TeacherActor

當MailBox的run方法運行的時候,它會從隊列中取出一條消息,然後將它傳給Actor去處理。

當你把消息傳給ActorRef的時候,最終調用的實際是目標Actor裏麵的一個receive方法。

TeacherActor隻是一個很簡單的類,它有一個名言的列表,而receive方法很明顯就是用來處理消息的。

來看下代碼:

TeacherActor.scala

package me.rerun.akkanotes.messaging.actormsg1

import scala.util.Random

import akka.actor.Actor  
import me.rerun.akkanotes.messaging.protocols.TeacherProtocol._

/*
 * Your Teacher Actor class. 
 * 
 * The class could use refinement by way of  
 * using ActorLogging which uses the EventBus of the Actor framework
 * instead of the plain old System out
 * 
 */

class TeacherActor extends Actor {

  val quotes = List(
    "Moderation is for cowards",
    "Anything worth doing is worth overdoing",
    "The trouble is you think you have time",
    "You never gonna know if you never even try")

  def receive = {

    case QuoteRequest => {

      import util.Random

      //Get a random Quote from the list and construct a response
      val quoteResponse=QuoteResponse(quotes(Random.nextInt(quotes.size)))

      println (quoteResponse)

    }

  }

}

TeacherActor的receive方法的模式匹配隻會匹配一種消息——QuoteRequest (事實上,模式匹配中最好匹配下默認的情況,不過這個就說來話長了)

receive方法做的就是

1. 匹配QuoteRequest的模式
2. 從名言列表中隨機選取一條
3. 構造出一個QuoteResponse
4. 將QuoteResponse打印到控製台上

代碼

整個項目的代碼可以從Github中下載到。

本文最早發布於我的個人博客: Java譯站

最後更新:2017-05-23 14:35:00

  上一篇:go  Akka筆記之Actor簡介
  下一篇:go  Akka筆記之日誌及測試