AKKA文檔(JAVA版)—派發器
派發器
Akka MessageDispatcher 是維持 Akka Actor “運作”的部分, 可以說它是整個機器的引擎. 所有的MessageDispatcher 實現也同時是一個 ExecutionContext, 這意味著它們可以用來執行任何代碼, 例如 Future.
缺省派發器
在沒有為 Actor作配置的情況下,每個 ActorSystem 將有一個缺省的派發器。 缺省派發器是可配置的,缺省情況下是一個確定的default-executor的 Dispatcher。如果通過傳遞ExecutionContext 來創建ActorSystem ,在ActorSystem中,此ExecutionContext 將作為所有派發器的defalut-executor 。如果沒有指定ExecutionContext,將後退到akka.actor.default-dispatcher.default-executor.fallback 的executor。缺省情況下的”fork-join-executor”,在大多數情況下擁有非常好的性能。
查找派發器
派發器實現ExecutionContext 接口,因此可以用來運行Future 調用 等待。
// for use with Futures, Scheduler, etc. final ExecutionContext ex = system.dispatchers().lookup("my-dispatcher");
為角色指定派發器
在你想為Actor配置一個不同派發器而不是默認情況下,你需要做兩樣東西,首先是配置派發器:
my-dispatcher { # Dispatcher is the name of the event-based dispatcher type = Dispatcher # What kind of ExecutionService to use executor = "fork-join-executor" # Configuration for the fork join pool fork-join-executor { # Min number of threads to cap factor-based parallelism number to parallelism-min = 2 # Parallelism (threads) ... ceil(available processors * factor) parallelism-factor = 2.0 # Max number of threads to cap factor-based parallelism number to parallelism-max = 10 } # Throughput defines the maximum number of messages to be # processed per actor before the thread jumps to the next actor. # Set to 1 for as fair as possible. throughput = 100 }
接著使用 “thread-pool-executor”:
my-thread-pool-dispatcher { # Dispatcher is the name of the event-based dispatcher type = Dispatcher # What kind of ExecutionService to use executor = "thread-pool-executor" # Configuration for the thread pool thread-pool-executor { # minimum number of threads to cap factor-based core number to core-pool-size-min = 2 # No of core threads ... ceil(available processors * factor) core-pool-size-factor = 2.0 # maximum number of threads to cap factor-based number to core-pool-size-max = 10 } # Throughput defines the maximum number of messages to be # processed per actor before the thread jumps to the next actor. # Set to 1 for as fair as possible. throughput = 100 }
更多細節選項,請見默認派發器配置章節
ActorRef myActor = system.actorOf(Props.create(MyUntypedActor.class), "myactor");
akka.actor.deployment { /myactor { dispatcher = my-dispatcher } }
一種代替部署配置方法是定義派發器在代碼裏麵。如果在部署配置裏麵定義派發器則該值將被使用代替編碼設置參數。
ActorRef myActor = system.actorOf(Props.create(MyUntypedActor.class).withDispatcher("my-dispatcher"), "myactor3");
注意:你在withDispatcher中指定的 “dispatcherId” 其實是配置中的一個路徑. 所以在這種情況下它位於配置的頂層,但你可以把它放在下麵的層次,用.來代表子層次,象這樣: “foo.bar.my-dispatcher”。
派發器的類型
一共有4種類型的消息派發器:
- Dispatcher
- 這是個基於事件派發器,該派發器綁定一組角色到一個線程池中。如果沒有一個明確定義,這將是一個默認派發器使用。
- 可共享性: 無限製
- 郵箱: 任何,為每一個Actor創建一個
- 使用場景: 缺省派發器,Bulkheading
- 底層使用: java.util.concurrent.ExecutorService specify using “executor” using “fork-join-executor”, “thread-pool-executor” or the FQCN of an akka.dispatcher.ExecutorServiceConfigurator
- PinnedDispatcher
- 該派發器致力於為每一個使用它的角色提供一個唯一的線程,例如:每一個角色將有自己的僅包含一個線程的線程池。
- 可共享性: 無
- 郵箱: 任何,為每個Actor創建一個
- 使用場景: Bulkheading
- 底層使用: 任何 akka.dispatch.ThreadPoolExecutorConfigurator缺省為一個 “thread-pool-executor”
- CallingThreadDispatcher
- 該派發器僅在當前線程上運行調用。此派發器不會創建任何新的線程,但可以使用來自不同的線程同時為相同的角色。請見 CallingThreadDispatcher for細節和限製
- 可共享性: 無限製
- 郵箱: 任何,每Actor每線程創建一個(需要時)
- 使用場景: 測試
- 底層使用: 調用的線程 (duh)
更多派發器配置例子
配置PinnedDispatcher:
my-pinned-dispatcher { executor = "thread-pool-executor" type = PinnedDispatcher }
接著使用它:
ActorRef myActor = system.actorOf(Props.create(MyUntypedActor.class) .withDispatcher("my-pinned-dispatcher"));
注意:thread-pool-executor 配置按照上麵my-thread-pool-dispatcher例子是不適用的。這是因為當使用PinnedDispatcher時候,每一個角色將有自己的線程池,線程池將隻有一個線程。
注意:隨著時間推移這將不保證一直使用相同線程,由於核心池超時用於PinnedDispatcher 在閑置角色情況下,降低資源使用。為了一直使用相同的線程,你需要添加 thread-pool-executor.allow-core-timeout=off到PinnedDispatcher配置中。
最後更新:2017-05-23 10:32:11