閱讀807 返回首頁    go 阿裏雲 go 技術社區[雲棲]


AKKA文檔(JAVA版)—派發器

派發器

Akka MessageDispatcher 是維持 Akka Actor “運作”的部分, 可以說它是整個機器的引擎. 所有的MessageDispatcher 實現也同時是一個 ExecutionContext, 這意味著它們可以用來執行任何代碼, 例如 Future.

 

缺省派發器

在沒有為 Actor作配置的情況下,每個 ActorSystem 將有一個缺省的派發器。 缺省派發器是可配置的,缺省情況下是一個確定的default-executor的 Dispatcher。如果通過傳遞ExecutionContext 來創建ActorSystem ,在ActorSystem中,此ExecutionContext 將作為所有派發器的defalut-executor 。如果沒有指定ExecutionContext,將後退到akka.actor.default-dispatcher.default-executor.fallback 的executor。缺省情況下的”fork-join-executor”,在大多數情況下擁有非常好的性能。

查找派發器

派發器實現ExecutionContext 接口,因此可以用來運行Future 調用 等待。

// for use with Futures, Scheduler, etc.
final ExecutionContext ex = system.dispatchers().lookup("my-dispatcher");

 

為角色指定派發器

在你想為Actor配置一個不同派發器而不是默認情況下,你需要做兩樣東西,首先是配置派發器:

my-dispatcher {
  # Dispatcher is the name of the event-based dispatcher
  type = Dispatcher
  # What kind of ExecutionService to use
  executor = "fork-join-executor"
  # Configuration for the fork join pool
  fork-join-executor {
    # Min number of threads to cap factor-based parallelism number to
    parallelism-min = 2
    # Parallelism (threads) ... ceil(available processors * factor)
    parallelism-factor = 2.0
    # Max number of threads to cap factor-based parallelism number to
    parallelism-max = 10
  }
  # Throughput defines the maximum number of messages to be
  # processed per actor before the thread jumps to the next actor.
  # Set to 1 for as fair as possible.
  throughput = 100
}

接著使用 “thread-pool-executor”:

my-thread-pool-dispatcher {
  # Dispatcher is the name of the event-based dispatcher
  type = Dispatcher
  # What kind of ExecutionService to use
  executor = "thread-pool-executor"
  # Configuration for the thread pool
  thread-pool-executor {
    # minimum number of threads to cap factor-based core number to
    core-pool-size-min = 2
    # No of core threads ... ceil(available processors * factor)
    core-pool-size-factor = 2.0
    # maximum number of threads to cap factor-based number to
    core-pool-size-max = 10
  }
  # Throughput defines the maximum number of messages to be
  # processed per actor before the thread jumps to the next actor.
  # Set to 1 for as fair as possible.
  throughput = 100
}

更多細節選項,請見默認派發器配置章節

ActorRef myActor =
  system.actorOf(Props.create(MyUntypedActor.class),
    "myactor");

 

akka.actor.deployment {
  /myactor {
    dispatcher = my-dispatcher
  }
}

一種代替部署配置方法是定義派發器在代碼裏麵。如果在部署配置裏麵定義派發器則該值將被使用代替編碼設置參數。

ActorRef myActor =
  system.actorOf(Props.create(MyUntypedActor.class).withDispatcher("my-dispatcher"),
    "myactor3");

 

注意:你在withDispatcher中指定的 “dispatcherId” 其實是配置中的一個路徑. 所以在這種情況下它位於配置的頂層,但你可以把它放在下麵的層次,用.來代表子層次,象這樣: “foo.bar.my-dispatcher”。

派發器的類型

一共有4種類型的消息派發器:

  • Dispatcher
    • 這是個基於事件派發器,該派發器綁定一組角色到一個線程池中。如果沒有一個明確定義,這將是一個默認派發器使用。
    • 可共享性: 無限製
    • 郵箱: 任何,為每一個Actor創建一個
    • 使用場景: 缺省派發器,Bulkheading
    • 底層使用: java.util.concurrent.ExecutorService specify using “executor” using “fork-join-executor”, “thread-pool-executor” or the FQCN of an akka.dispatcher.ExecutorServiceConfigurator
  • PinnedDispatcher
    • 該派發器致力於為每一個使用它的角色提供一個唯一的線程,例如:每一個角色將有自己的僅包含一個線程的線程池。
    • 可共享性: 無
    • 郵箱: 任何,為每個Actor創建一個
    • 使用場景: Bulkheading
    • 底層使用: 任何 akka.dispatch.ThreadPoolExecutorConfigurator缺省為一個 “thread-pool-executor”
  • CallingThreadDispatcher
    • 該派發器僅在當前線程上運行調用。此派發器不會創建任何新的線程,但可以使用來自不同的線程同時為相同的角色。請見 CallingThreadDispatcher for細節和限製
    • 可共享性: 無限製
    • 郵箱: 任何,每Actor每線程創建一個(需要時)
    • 使用場景: 測試
    • 底層使用: 調用的線程 (duh)

更多派發器配置例子

配置PinnedDispatcher:

my-pinned-dispatcher {
  executor = "thread-pool-executor"
  type = PinnedDispatcher
}

接著使用它:

ActorRef myActor = system.actorOf(Props.create(MyUntypedActor.class)
    .withDispatcher("my-pinned-dispatcher"));

 

注意:thread-pool-executor 配置按照上麵my-thread-pool-dispatcher例子是不適用的。這是因為當使用PinnedDispatcher時候,每一個角色將有自己的線程池,線程池將隻有一個線程。

注意:隨著時間推移這將不保證一直使用相同線程,由於核心池超時用於PinnedDispatcher 在閑置角色情況下,降低資源使用。為了一直使用相同的線程,你需要添加 thread-pool-executor.allow-core-timeout=off到PinnedDispatcher配置中。

最後更新:2017-05-23 10:32:11

  上一篇:go  Java FP: 偽造閉包工廠,創建域對象
  下一篇:go  Disruptor 2.0更新摘要