Support same dispatcher as parent in Typed, #27123 (#27127)

* Support same dispatcher as parent in Typed, #27123

* remove apply in internal DispatcherDefault and DispatcherSameAsParent
This commit is contained in:
Patrik Nordwall 2019-07-04 11:00:04 +02:00 committed by GitHub
parent d88db078ad
commit 76c3271575
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 250 additions and 84 deletions

View file

@ -10,4 +10,5 @@ public class DispatcherSelectorTest {
private DispatcherSelector def = DispatcherSelector.defaultDispatcher();
private DispatcherSelector conf = DispatcherSelector.fromConfig("somepath");
private DispatcherSelector parent = DispatcherSelector.sameAsParent();
}

View file

@ -26,6 +26,7 @@ public class DispatchersDocTest {
yourBehavior,
"DispatcherFromConfig",
DispatcherSelector.fromConfig("your-dispatcher"));
context.spawn(yourBehavior, "ParentDispatcher", DispatcherSelector.sameAsParent());
// #spawn-dispatcher
return Behaviors.same();

View file

@ -9,7 +9,7 @@ import org.scalatest.WordSpec
class PropsSpec extends WordSpec with Matchers {
val dispatcherFirst = DispatcherDefault(DispatcherFromConfig("pool"))
val dispatcherFirst = Props.empty.withDispatcherFromConfig("pool").withDispatcherDefault
"A Props" must {

View file

@ -0,0 +1,109 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed.scaladsl
import akka.actor.BootstrapSetup
import akka.actor.setup.ActorSystemSetup
import akka.actor.testkit.typed.scaladsl.ActorTestKit
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.Props
import akka.actor.typed.SpawnProtocol
import com.typesafe.config.ConfigFactory
import org.scalatest.WordSpecLike
object DispatcherSelectorSpec {
val config = ConfigFactory.parseString("""
ping-pong-dispatcher {
executor = thread-pool-executor
type = PinnedDispatcher
}
""")
object PingPong {
case class Ping(replyTo: ActorRef[Pong])
case class Pong(threadName: String)
def apply(): Behavior[Ping] =
Behaviors.receiveMessage[Ping] { message =>
message.replyTo ! Pong(Thread.currentThread().getName)
Behaviors.same
}
}
}
class DispatcherSelectorSpec extends ScalaTestWithActorTestKit(DispatcherSelectorSpec.config) with WordSpecLike {
import DispatcherSelectorSpec.PingPong
import DispatcherSelectorSpec.PingPong._
"DispatcherSelector" must {
"select dispatcher from config" in {
val probe = createTestProbe[Pong]()
val pingPong = spawn(PingPong(), Props.empty.withDispatcherFromConfig("ping-pong-dispatcher"))
pingPong ! Ping(probe.ref)
val response = probe.receiveMessage()
response.threadName should startWith("DispatcherSelectorSpec-ping-pong-dispatcher")
}
"select same dispatcher as parent" in {
val parent = spawn(SpawnProtocol.behavior, Props.empty.withDispatcherFromConfig("ping-pong-dispatcher"))
val childProbe = createTestProbe[ActorRef[Ping]]()
parent ! SpawnProtocol.Spawn(PingPong(), "child", Props.empty.withDispatcherSameAsParent, childProbe.ref)
val probe = createTestProbe[Pong]()
val child = childProbe.receiveMessage()
child ! Ping(probe.ref)
val response = probe.receiveMessage()
response.threadName should startWith("DispatcherSelectorSpec-ping-pong-dispatcher")
}
"select same dispatcher as parent, several levels" in {
val grandParent = spawn(SpawnProtocol.behavior, Props.empty.withDispatcherFromConfig("ping-pong-dispatcher"))
val parentProbe = createTestProbe[ActorRef[SpawnProtocol.Spawn[Ping]]]()
grandParent ! SpawnProtocol.Spawn(
SpawnProtocol.behavior,
"parent",
Props.empty.withDispatcherSameAsParent,
parentProbe.ref)
val childProbe = createTestProbe[ActorRef[Ping]]()
grandParent ! SpawnProtocol.Spawn(PingPong(), "child", Props.empty.withDispatcherSameAsParent, childProbe.ref)
val probe = createTestProbe[Pong]()
val child = childProbe.receiveMessage()
child ! Ping(probe.ref)
val response = probe.receiveMessage()
response.threadName should startWith("DispatcherSelectorSpec-ping-pong-dispatcher")
}
"use default dispatcher if selecting parent dispatcher for user guardian" in {
val sys = ActorSystem(
PingPong(),
"DispatcherSelectorSpec2",
ActorSystemSetup.create(BootstrapSetup()),
Props.empty.withDispatcherSameAsParent)
try {
val probe = TestProbe[Pong]()(sys)
sys ! Ping(probe.ref)
val response = probe.receiveMessage()
response.threadName should startWith("DispatcherSelectorSpec2-akka.actor.default-dispatcher")
} finally {
ActorTestKit.shutdown(sys)
}
}
}
}

View file

@ -47,6 +47,7 @@ object DispatchersDocSpec {
context.spawn(yourBehavior, "ExplicitDefaultDispatcher", DispatcherSelector.default())
context.spawn(yourBehavior, "BlockingDispatcher", DispatcherSelector.blocking())
context.spawn(yourBehavior, "DispatcherFromConfig", DispatcherSelector.fromConfig("your-dispatcher"))
context.spawn(yourBehavior, "ParentDispatcher", DispatcherSelector.sameAsParent())
//#spawn-dispatcher
Behaviors.same

View file

@ -10,6 +10,8 @@ import akka.annotation.InternalApi
import scala.annotation.tailrec
import scala.reflect.ClassTag
import akka.actor.typed.internal.PropsImpl._
object Props {
/**
@ -68,6 +70,11 @@ abstract class Props private[akka] () extends Product with Serializable {
*/
def withDispatcherFromConfig(path: String): Props = DispatcherFromConfig(path, this)
/**
* Prepend a selection of the same executor as the parent actor to this Props.
*/
def withDispatcherSameAsParent: Props = DispatcherSameAsParent(this)
/**
* Find the first occurrence of a configuration node of the given type, falling
* back to the provided default if none is found.
@ -125,21 +132,11 @@ abstract class Props private[akka] () extends Product with Serializable {
}
}
/**
* The empty configuration node, used as a terminator for the internally linked
* list of each Props.
*/
@InternalApi
private[akka] case object EmptyProps extends Props {
override def next = throw new NoSuchElementException("EmptyProps has no next")
override def withNext(next: Props): Props = next
}
/**
* Not for user extension.
*/
@DoNotInherit
sealed abstract class DispatcherSelector extends Props
abstract class DispatcherSelector extends Props
/**
* Factories for [[DispatcherSelector]]s which describe which thread pool shall be used to run
@ -155,7 +152,7 @@ object DispatcherSelector {
* Scala API:
* Run the actor on the default [[ActorSystem]] executor.
*/
def default(): DispatcherSelector = DispatcherDefault()
def default(): DispatcherSelector = DispatcherDefault.empty
/**
* Java API:
@ -175,38 +172,10 @@ object DispatcherSelector {
* ActorSystem terminates.
*/
def fromConfig(path: String): DispatcherSelector = DispatcherFromConfig(path)
}
/**
* INTERNAL API
*
* Use the [[ActorSystem]] default executor to run the actor.
*/
@DoNotInherit
@InternalApi
private[akka] sealed case class DispatcherDefault(next: Props) extends DispatcherSelector {
@InternalApi
override def withNext(next: Props): Props = copy(next = next)
}
object DispatcherDefault {
// this is hidden in order to avoid having people match on this object
private val empty = DispatcherDefault(EmptyProps)
/**
* Retrieve an instance for this configuration node with empty `next` reference.
* Run the actor on the same executor as the parent actor.
* @return
*/
def apply(): DispatcherDefault = empty
}
/**
* Look up an executor definition in the [[ActorSystem]] configuration.
* ExecutorServices created in this fashion will be shut down when the
* ActorSystem terminates.
*
* INTERNAL API
*/
@InternalApi
private[akka] final case class DispatcherFromConfig(path: String, next: Props = Props.empty)
extends DispatcherSelector {
override def withNext(next: Props): Props = copy(next = next)
def sameAsParent(): DispatcherSelector = DispatcherSameAsParent.empty
}

View file

@ -0,0 +1,43 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed.internal
import akka.actor.typed.DispatcherSelector
import akka.actor.typed.Props
import akka.annotation.InternalApi
/**
* INTERNAL API
*/
@InternalApi private[akka] object PropsImpl {
/**
* The empty configuration node, used as a terminator for the internally linked
* list of each Props.
*/
case object EmptyProps extends Props {
override def next = throw new NoSuchElementException("EmptyProps has no next")
override def withNext(next: Props): Props = next
}
final case class DispatcherDefault(next: Props) extends DispatcherSelector {
override def withNext(next: Props): Props = copy(next = next)
}
object DispatcherDefault {
val empty = DispatcherDefault(EmptyProps)
}
final case class DispatcherFromConfig(path: String, next: Props = Props.empty) extends DispatcherSelector {
override def withNext(next: Props): Props = copy(next = next)
}
final case class DispatcherSameAsParent(next: Props) extends DispatcherSelector {
override def withNext(next: Props): Props = copy(next = next)
}
object DispatcherSameAsParent {
val empty = DispatcherSameAsParent(EmptyProps)
}
}

View file

@ -3,9 +3,10 @@
*/
package akka.actor.typed.internal.adapter
import akka.ConfigurationException
import akka.actor.typed._
import akka.annotation.InternalApi
import akka.ConfigurationException
import akka.util.ErrorMessages
/**

View file

@ -2,27 +2,39 @@
* Copyright (C) 2016-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed
package internal
package adapter
package akka.actor.typed.internal.adapter
import java.util.concurrent.CompletionStage
import akka.actor
import scala.compat.java8.FutureConverters
import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.Future
import akka.Done
import akka.actor
import akka.actor.ActorRefProvider
import akka.actor.ExtendedActorSystem
import akka.actor.InvalidMessageException
import akka.{ actor => untyped }
import scala.concurrent.ExecutionContextExecutor
import akka.util.Timeout
import scala.concurrent.Future
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.DispatcherSelector
import akka.actor.typed.Dispatchers
import akka.actor.typed.Logger
import akka.actor.typed.Props
import akka.actor.typed.Scheduler
import akka.actor.typed.Settings
import akka.actor.typed.internal.ActorRefImpl
import akka.actor.typed.internal.ExtensionsImpl
import akka.actor.typed.internal.InternalRecipientRef
import akka.actor.typed.internal.PropsImpl.DispatcherDefault
import akka.actor.typed.internal.PropsImpl.DispatcherFromConfig
import akka.actor.typed.internal.PropsImpl.DispatcherSameAsParent
import akka.actor.typed.internal.SystemMessage
import akka.annotation.InternalApi
import scala.compat.java8.FutureConverters
import akka.actor.ActorRefProvider
import akka.event.LoggingFilterWithMarker
import akka.util.Timeout
import akka.{ actor => untyped }
/**
* INTERNAL API. Lightweight wrapper for presenting an untyped ActorSystem to a Behavior (via the context).
@ -34,8 +46,8 @@ import akka.event.LoggingFilterWithMarker
@InternalApi private[akka] class ActorSystemAdapter[-T](val untypedSystem: untyped.ActorSystemImpl)
extends ActorSystem[T]
with ActorRef[T]
with internal.ActorRefImpl[T]
with internal.InternalRecipientRef[T]
with ActorRefImpl[T]
with InternalRecipientRef[T]
with ExtensionsImpl {
// note that the untypedSystem may not be initialized yet here, and that is fine because
@ -52,7 +64,7 @@ import akka.event.LoggingFilterWithMarker
// impl ActorRefImpl
override def isLocal: Boolean = true
// impl ActorRefImpl
override def sendSystem(signal: internal.SystemMessage): Unit = sendSystemMessage(untypedSystem.guardian, signal)
override def sendSystem(signal: SystemMessage): Unit = sendSystemMessage(untypedSystem.guardian, signal)
// impl InternalRecipientRef
override def provider: ActorRefProvider = untypedSystem.provider
@ -71,6 +83,7 @@ import akka.event.LoggingFilterWithMarker
selector match {
case DispatcherDefault(_) => untypedSystem.dispatcher
case DispatcherFromConfig(str, _) => untypedSystem.dispatchers.lookup(str)
case DispatcherSameAsParent(_) => untypedSystem.dispatcher
}
override def shutdown(): Unit = () // there was no shutdown in untyped Akka
}

View file

@ -2,11 +2,13 @@
* Copyright (C) 2017-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed
package internal
package adapter
package akka.actor.typed.internal.adapter
import akka.actor.Deploy
import akka.actor.typed.Behavior
import akka.actor.typed.DispatcherSelector
import akka.actor.typed.Props
import akka.actor.typed.internal.PropsImpl._
import akka.annotation.InternalApi
/**
@ -19,9 +21,10 @@ import akka.annotation.InternalApi
rethrowTypedFailure: Boolean = true): akka.actor.Props = {
val props = akka.actor.Props(new ActorAdapter(behavior(), rethrowTypedFailure))
(deploy.firstOrElse[DispatcherSelector](DispatcherDefault()) match {
(deploy.firstOrElse[DispatcherSelector](DispatcherDefault.empty) match {
case _: DispatcherDefault => props
case DispatcherFromConfig(name, _) => props.withDispatcher(name)
case _: DispatcherSameAsParent => props.withDispatcher(Deploy.DispatcherSameAsParent)
}).withDeploy(Deploy.local) // disallow remote deployment for typed actors
}

View file

@ -7,7 +7,6 @@ package akka.actor.typed.javadsl
import akka.actor
import akka.actor.typed.Behavior
import akka.actor.typed.Props
import akka.actor.typed.EmptyProps
import akka.actor.typed.ActorRef
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.ActorSystem
@ -36,7 +35,7 @@ object Adapter {
* `Behaviors.supervise`.
*/
def spawnAnonymous[T](sys: akka.actor.ActorSystem, behavior: Behavior[T]): ActorRef[T] =
spawnAnonymous(sys, behavior, EmptyProps)
spawnAnonymous(sys, behavior, Props.empty)
/**
* Spawn the given behavior as a child of the user actor in an untyped ActorSystem.
@ -52,7 +51,7 @@ object Adapter {
* `Behaviors.supervise`.
*/
def spawn[T](sys: akka.actor.ActorSystem, behavior: Behavior[T], name: String): ActorRef[T] =
spawn(sys, behavior, name, EmptyProps)
spawn(sys, behavior, name, Props.empty)
/**
* Spawn the given behavior as a child of the user actor in an untyped ActorSystem.
@ -68,7 +67,7 @@ object Adapter {
* `Behaviors.supervise`.
*/
def spawnAnonymous[T](ctx: akka.actor.ActorContext, behavior: Behavior[T]): ActorRef[T] =
spawnAnonymous(ctx, behavior, EmptyProps)
spawnAnonymous(ctx, behavior, Props.empty)
/**
* Spawn the given behavior as a child of the user actor in an untyped ActorContext.
@ -84,7 +83,7 @@ object Adapter {
* `Behaviors.supervise`.
*/
def spawn[T](ctx: akka.actor.ActorContext, behavior: Behavior[T], name: String): ActorRef[T] =
spawn(ctx, behavior, name, EmptyProps)
spawn(ctx, behavior, name, Props.empty)
/**
* Spawn the given behavior as a child of the user actor in an untyped ActorContext.
@ -155,5 +154,5 @@ object Adapter {
* example of that.
*/
def props[T](behavior: Creator[Behavior[T]]): akka.actor.Props =
props(behavior, EmptyProps)
props(behavior, Props.empty)
}

View file

@ -10,15 +10,16 @@ import akka.routing._
import akka.event._
import akka.util.Helpers
import akka.util.Collections.EmptyImmutableSeq
import scala.util.control.NonFatal
import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.{ ExecutionContextExecutor, Future, Promise }
import scala.annotation.implicitNotFound
import akka.ConfigurationException
import akka.annotation.DoNotInherit
import akka.annotation.InternalApi
import akka.dispatch.Dispatchers
import akka.serialization.Serialization
import akka.util.OptionVal
@ -513,8 +514,12 @@ private[akka] class LocalActorRefProvider private[akka] (
// make user provided guardians not run on internal dispatcher
val dispatcher =
system.guardianProps match {
case None => internalDispatcher
case Some(props) => system.dispatchers.lookup(props.dispatcher)
case None => internalDispatcher
case Some(props) =>
val dispatcherId =
if (props.deploy.dispatcher == Deploy.DispatcherSameAsParent) Dispatchers.DefaultDispatcherId
else props.dispatcher
system.dispatchers.lookup(dispatcherId)
}
val ref = new LocalActorRef(
system,
@ -621,17 +626,29 @@ private[akka] class LocalActorRefProvider private[akka] (
}
}
def parentDispatcher: String = supervisor match {
case withCell: ActorRefWithCell => withCell.underlying.props.dispatcher
case _ => Deploy.NoDispatcherGiven
}
val props2 =
// mailbox and dispatcher defined in deploy should override props
(if (lookupDeploy) deployer.lookup(path) else deploy) match {
case Some(d) =>
(d.dispatcher, d.mailbox) match {
case (Deploy.NoDispatcherGiven, Deploy.NoMailboxGiven) => props
case (dsp, Deploy.NoMailboxGiven) => props.withDispatcher(dsp)
case (Deploy.NoMailboxGiven, mbx) => props.withMailbox(mbx)
case (dsp, mbx) => props.withDispatcher(dsp).withMailbox(mbx)
case (Deploy.NoDispatcherGiven, Deploy.NoMailboxGiven) => props
case (Deploy.DispatcherSameAsParent, Deploy.NoMailboxGiven) => props.withDispatcher(parentDispatcher)
case (dsp, Deploy.NoMailboxGiven) => props.withDispatcher(dsp)
case (Deploy.NoDispatcherGiven, mbx) => props.withMailbox(mbx)
case (Deploy.DispatcherSameAsParent, mbx) => props.withDispatcher(parentDispatcher).withMailbox(mbx)
case (dsp, mbx) => props.withDispatcher(dsp).withMailbox(mbx)
}
case _ => props // no deployment config found
case _ =>
// no deployment config found
if (props.deploy.dispatcher == Deploy.DispatcherSameAsParent)
props.withDispatcher(parentDispatcher)
else
props
}
if (!system.dispatchers.hasDispatcher(props2.dispatcher))

View file

@ -10,13 +10,19 @@ import akka.routing._
import akka.util.WildcardIndex
import com.github.ghik.silencer.silent
import com.typesafe.config._
import scala.annotation.tailrec
import akka.annotation.InternalApi
object Deploy {
final val NoDispatcherGiven = ""
final val NoMailboxGiven = ""
val local = Deploy(scope = LocalScope)
/**
* INTERNAL API
*/
@InternalApi private[akka] final val DispatcherSameAsParent = ".."
}
/**

View file

@ -171,7 +171,7 @@ private[akka] class Mailboxes(
if (deploy.mailbox != Deploy.NoMailboxGiven) {
verifyRequirements(lookup(deploy.mailbox))
} else if (deploy.dispatcher != Deploy.NoDispatcherGiven && hasMailboxType) {
} else if (deploy.dispatcher != Deploy.NoDispatcherGiven && deploy.dispatcher != Deploy.DispatcherSameAsParent && hasMailboxType) {
verifyRequirements(lookup(dispatcherConfig.getString("id")))
} else if (hasRequiredType(actorClass)) {
try verifyRequirements(lookupByQueueType(getRequiredType(actorClass)))

View file

@ -2,7 +2,7 @@
## Dependency
Dispatchers are part of core akka, which means that they are part of the akka-actor-typed dependency:
Dispatchers are part of core Akka, which means that they are part of the akka-actor-typed dependency:
@@dependency[sbt,Maven,Gradle] {
group="com.typesafe.akka"
@ -30,8 +30,11 @@ Scala
Java
: @@snip [DispatcherDocTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/DispatchersDocTest.java) { #spawn-dispatcher }
`DispatcherSelector` has two convenience methods to look up the default dispatcher and a dispatcher you can use to
execute actors that block e.g. a legacy database API that does not support @scala[`Future`]@java[`CompletionStage`]s.
`DispatcherSelector` has a few convenience methods:
* @scala[`DispatcherSelector.default`]@java[`DispatcherSelector.defaultDispatcher`] to look up the default dispatcher
* `DispatcherSelector.blocking` can be used to execute actors that block e.g. a legacy database API that does not support @scala[`Future`]@java[`CompletionStage`]s
* `DispatcherSelector.sameAsParent` to use the same dispatcher as the parent actor
The final example shows how to load a custom dispatcher from configuration and replies on this being in your application.conf: