parent
08de65f0f1
commit
336400832b
6 changed files with 164 additions and 0 deletions
|
|
@ -0,0 +1,27 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package jdocs.akka.typed;
|
||||||
|
|
||||||
|
import akka.actor.typed.Behavior;
|
||||||
|
import akka.actor.typed.javadsl.*;
|
||||||
|
import org.scalatest.junit.JUnitSuite;
|
||||||
|
import akka.actor.typed.DispatcherSelector;
|
||||||
|
|
||||||
|
public class DispatchersDocTest {
|
||||||
|
|
||||||
|
private static Behavior<String> yourBehavior = Behaviors.empty();
|
||||||
|
|
||||||
|
private static Behavior<Object> example = Behaviors.receive((ctx, msg) -> {
|
||||||
|
|
||||||
|
//#spawn-dispatcher
|
||||||
|
ctx.spawn(yourBehavior, "DefaultDispatcher");
|
||||||
|
ctx.spawn(yourBehavior, "ExplicitDefaultDispatcher", DispatcherSelector.defaultDispatcher());
|
||||||
|
ctx.spawn(yourBehavior, "BlockingDispatcher", DispatcherSelector.blocking());
|
||||||
|
ctx.spawn(yourBehavior, "DispatcherFromConfig", DispatcherSelector.fromConfig("your-dispatcher"));
|
||||||
|
//#spawn-dispatcher
|
||||||
|
|
||||||
|
return Behaviors.same();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,80 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package docs.akka.typed
|
||||||
|
|
||||||
|
import akka.actor.testkit.typed.scaladsl.{ ActorTestKit, TestProbe }
|
||||||
|
import akka.actor.typed.scaladsl.AskPattern._
|
||||||
|
import akka.actor.typed.SpawnProtocol.Spawn
|
||||||
|
import akka.actor.typed.scaladsl.Behaviors
|
||||||
|
import akka.actor.typed.{ ActorRef, Behavior, SpawnProtocol, TypedAkkaSpecWithShutdown, Props, DispatcherSelector }
|
||||||
|
import akka.dispatch.Dispatcher
|
||||||
|
import org.scalatest.concurrent.ScalaFutures
|
||||||
|
import DispatchersDocSpec._
|
||||||
|
import com.typesafe.config.{ Config, ConfigFactory }
|
||||||
|
|
||||||
|
object DispatchersDocSpec {
|
||||||
|
|
||||||
|
val config = ConfigFactory.parseString(
|
||||||
|
"""
|
||||||
|
//#config
|
||||||
|
your-dispatcher {
|
||||||
|
type = Dispatcher
|
||||||
|
executor = "thread-pool-executor"
|
||||||
|
thread-pool-executor {
|
||||||
|
fixed-pool-size = 32
|
||||||
|
}
|
||||||
|
throughput = 1
|
||||||
|
}
|
||||||
|
//#config
|
||||||
|
""".stripMargin)
|
||||||
|
|
||||||
|
case class WhichDispatcher(replyTo: ActorRef[Dispatcher])
|
||||||
|
|
||||||
|
val giveMeYourDispatcher = Behaviors.receive[WhichDispatcher] { (ctx, msg) ⇒
|
||||||
|
msg.replyTo ! ctx.executionContext.asInstanceOf[Dispatcher]
|
||||||
|
Behaviors.same
|
||||||
|
}
|
||||||
|
|
||||||
|
val yourBehavior: Behavior[String] = Behaviors.same
|
||||||
|
|
||||||
|
val example = Behaviors.receive[Any] { (ctx, msg) ⇒
|
||||||
|
|
||||||
|
//#spawn-dispatcher
|
||||||
|
import akka.actor.typed.DispatcherSelector
|
||||||
|
|
||||||
|
ctx.spawn(yourBehavior, "DefaultDispatcher")
|
||||||
|
ctx.spawn(yourBehavior, "ExplicitDefaultDispatcher", DispatcherSelector.default())
|
||||||
|
ctx.spawn(yourBehavior, "BlockingDispatcher", DispatcherSelector.blocking())
|
||||||
|
ctx.spawn(yourBehavior, "DispatcherFromConfig", DispatcherSelector.fromConfig("your-dispatcher"))
|
||||||
|
//#spawn-dispatcher
|
||||||
|
|
||||||
|
Behaviors.same
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
class DispatchersDocSpec extends ActorTestKit with TypedAkkaSpecWithShutdown with ScalaFutures {
|
||||||
|
|
||||||
|
override def config: Config = DispatchersDocSpec.config
|
||||||
|
|
||||||
|
"Actor Dispatchers" should {
|
||||||
|
"support default and blocking dispatcher" in {
|
||||||
|
val probe = TestProbe[Dispatcher]()
|
||||||
|
val actor: ActorRef[SpawnProtocol] = spawn(SpawnProtocol.behavior)
|
||||||
|
|
||||||
|
val withDefault = (actor ? Spawn(giveMeYourDispatcher, "default", Props.empty)).futureValue
|
||||||
|
withDefault ! WhichDispatcher(probe.ref)
|
||||||
|
probe.expectMessageType[Dispatcher].id shouldEqual "akka.actor.default-dispatcher"
|
||||||
|
|
||||||
|
val withBlocking = (actor ? Spawn(giveMeYourDispatcher, "default", DispatcherSelector.blocking())).futureValue
|
||||||
|
withBlocking ! WhichDispatcher(probe.ref)
|
||||||
|
probe.expectMessageType[Dispatcher].id shouldEqual "akka.actor.default-blocking-io-dispatcher"
|
||||||
|
|
||||||
|
val withCustom = (actor ? Spawn(giveMeYourDispatcher, "default", DispatcherSelector.fromConfig("your-dispatcher"))).futureValue
|
||||||
|
withCustom ! WhichDispatcher(probe.ref)
|
||||||
|
probe.expectMessageType[Dispatcher].id shouldEqual "your-dispatcher"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -164,6 +164,12 @@ object DispatcherSelector {
|
||||||
*/
|
*/
|
||||||
def defaultDispatcher(): DispatcherSelector = default()
|
def defaultDispatcher(): DispatcherSelector = default()
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Run the actor on the default blocking dispatcher that is
|
||||||
|
* configured under default-blocking-io-dispatcher
|
||||||
|
*/
|
||||||
|
def blocking(): DispatcherSelector = fromConfig("akka.actor.default-blocking-io-dispatcher")
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Look up an executor definition in the [[ActorSystem]] configuration.
|
* Look up an executor definition in the [[ActorSystem]] configuration.
|
||||||
* ExecutorServices created in this fashion will be shut down when the
|
* ExecutorServices created in this fashion will be shut down when the
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,12 @@ object SpawnProtocol {
|
||||||
*/
|
*/
|
||||||
def apply[T](behavior: Behavior[T], name: String, props: Props): ActorRef[ActorRef[T]] ⇒ Spawn[T] =
|
def apply[T](behavior: Behavior[T], name: String, props: Props): ActorRef[ActorRef[T]] ⇒ Spawn[T] =
|
||||||
replyTo ⇒ new Spawn(behavior, name, props, replyTo)
|
replyTo ⇒ new Spawn(behavior, name, props, replyTo)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Special factory to make using Spawn with ask easier. Props defaults to Props.empty
|
||||||
|
*/
|
||||||
|
def apply[T](behavior: Behavior[T], name: String): ActorRef[ActorRef[T]] ⇒ Spawn[T] =
|
||||||
|
replyTo ⇒ new Spawn(behavior, name, Props.empty, replyTo)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
44
akka-docs/src/main/paradox/typed/dispatchers.md
Normal file
44
akka-docs/src/main/paradox/typed/dispatchers.md
Normal file
|
|
@ -0,0 +1,44 @@
|
||||||
|
# Dispatchers
|
||||||
|
|
||||||
|
## Dependency
|
||||||
|
|
||||||
|
Dispatchers are part of core akka, which means that they are part of the akka-actor-typed dependency:
|
||||||
|
|
||||||
|
@@dependency[sbt,Maven,Gradle] {
|
||||||
|
group="com.typesafe.akka"
|
||||||
|
artifact="akka-actor-typed_$scala.binary_version$"
|
||||||
|
version="$akka.version$"
|
||||||
|
}
|
||||||
|
|
||||||
|
## Introduction
|
||||||
|
|
||||||
|
An Akka `MessageDispatcher` is what makes Akka Actors "tick", it is the engine of the machine so to speak.
|
||||||
|
All `MessageDispatcher` implementations are also an @scala[`ExecutionContext`]@java[`Executor`], which means that they can be used
|
||||||
|
to execute arbitrary code, for instance @ref:[Futures](../futures.md).
|
||||||
|
|
||||||
|
## Selecting a dispatcher
|
||||||
|
|
||||||
|
A default dispatcher is used for all actors that are spawned without specifying a custom dispatcher.
|
||||||
|
This is suitable for all actors that don't block. Blocking in actors needs to be carefully managed, more
|
||||||
|
details @ref:[here](../dispatchers.md#blocking-needs-careful-management).
|
||||||
|
|
||||||
|
To select a dispatcher use `DispatcherSelector` to create a `Props` instance for spawning your actor:
|
||||||
|
|
||||||
|
Scala
|
||||||
|
: @@snip [DispatcherDocSpec.scala]($akka$/akka-actor-typed-tests/src/test/scala/docs/akka/typed/DispatchersDocSpec.scala) { #spawn-dispatcher }
|
||||||
|
|
||||||
|
Java
|
||||||
|
: @@snip [DispatcherDocTest.java]($akka$/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/DispatchersDocTest.java) { #spawn-dispatcher }
|
||||||
|
|
||||||
|
`DispatcherSelector` has two convenience methods to look up the default dispatcher and a dispatcher you can use to
|
||||||
|
execute actors that block e.g. a legacy database API that does not support @scala[`Future`]@java[`CompletionStage`]s.
|
||||||
|
|
||||||
|
The final example shows how to load a custom dispatcher from configuration and replies on this being in your application.conf:
|
||||||
|
|
||||||
|
Scala
|
||||||
|
: @@snip [DispatcherDocSpec.scala]($akka$/akka-actor-typed-tests/src/test/scala/docs/akka/typed/DispatchersDocSpec.scala) { #config }
|
||||||
|
|
||||||
|
Java
|
||||||
|
: @@snip [DispatcherDocSpec.scala]($akka$/akka-actor-typed-tests/src/test/scala/docs/akka/typed/DispatchersDocSpec.scala) { #config }
|
||||||
|
|
||||||
|
For full details on how to configure custom dispatchers see the @ref:[untyped docs](../dispatchers.md#types-of-dispatchers).
|
||||||
|
|
@ -5,6 +5,7 @@
|
||||||
@@@ index
|
@@@ index
|
||||||
|
|
||||||
* [actors](actors.md)
|
* [actors](actors.md)
|
||||||
|
* [dispatchers](dispatchers.md)
|
||||||
* [coexisting](coexisting.md)
|
* [coexisting](coexisting.md)
|
||||||
* [actor-lifecycle](actor-lifecycle.md)
|
* [actor-lifecycle](actor-lifecycle.md)
|
||||||
* [interaction patterns](interaction-patterns.md)
|
* [interaction patterns](interaction-patterns.md)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue