fix DispatcherSameAsParent when using ClusterActorRefProvider, #28842
* add test * the check for valid config is not needed in RemoteActorRefProvider, since it is delegating to LocalActorRefProvider for all cases but the remote deployment case
This commit is contained in:
parent
f31b58b61a
commit
5ffa4b077d
4 changed files with 54 additions and 9 deletions
|
|
@ -6,11 +6,16 @@ package akka.actor.testkit.typed.internal
|
||||||
|
|
||||||
import java.lang.reflect.Modifier
|
import java.lang.reflect.Modifier
|
||||||
|
|
||||||
|
import scala.util.control.Exception.Catcher
|
||||||
|
|
||||||
import akka.actor.typed.scaladsl.Behaviors
|
import akka.actor.typed.scaladsl.Behaviors
|
||||||
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Props }
|
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Props }
|
||||||
import akka.annotation.InternalApi
|
import akka.annotation.InternalApi
|
||||||
import scala.concurrent.{ Await, TimeoutException }
|
import scala.concurrent.{ Await, TimeoutException }
|
||||||
import scala.concurrent.duration.Duration
|
import scala.concurrent.duration.Duration
|
||||||
|
import scala.util.control.NonFatal
|
||||||
|
|
||||||
|
import akka.actor.typed.scaladsl.ActorContext
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
|
|
@ -29,11 +34,15 @@ private[akka] object ActorTestKitGuardian {
|
||||||
|
|
||||||
val testKitGuardian: Behavior[TestKitCommand] = Behaviors.receive[TestKitCommand] {
|
val testKitGuardian: Behavior[TestKitCommand] = Behaviors.receive[TestKitCommand] {
|
||||||
case (context, SpawnActor(name, behavior, reply, props)) =>
|
case (context, SpawnActor(name, behavior, reply, props)) =>
|
||||||
reply ! context.spawn(behavior, name, props)
|
try {
|
||||||
Behaviors.same
|
reply ! context.spawn(behavior, name, props)
|
||||||
|
Behaviors.same
|
||||||
|
} catch handleSpawnException(context, reply, props)
|
||||||
case (context, SpawnActorAnonymous(behavior, reply, props)) =>
|
case (context, SpawnActorAnonymous(behavior, reply, props)) =>
|
||||||
reply ! context.spawnAnonymous(behavior, props)
|
try {
|
||||||
Behaviors.same
|
reply ! context.spawnAnonymous(behavior, props)
|
||||||
|
Behaviors.same
|
||||||
|
} catch handleSpawnException(context, reply, props)
|
||||||
case (context, StopActor(ref, reply)) =>
|
case (context, StopActor(ref, reply)) =>
|
||||||
context.watchWith(ref, ActorStopped(reply))
|
context.watchWith(ref, ActorStopped(reply))
|
||||||
context.stop(ref)
|
context.stop(ref)
|
||||||
|
|
@ -42,6 +51,16 @@ private[akka] object ActorTestKitGuardian {
|
||||||
reply ! Ack
|
reply ! Ack
|
||||||
Behaviors.same
|
Behaviors.same
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private def handleSpawnException[T](
|
||||||
|
context: ActorContext[ActorTestKitGuardian.TestKitCommand],
|
||||||
|
reply: ActorRef[ActorRef[T]],
|
||||||
|
props: Props): Catcher[Behavior[TestKitCommand]] = {
|
||||||
|
case NonFatal(e) =>
|
||||||
|
context.log.error(s"Spawn failed, props [$props]", e)
|
||||||
|
reply ! context.spawnAnonymous(Behaviors.stopped)
|
||||||
|
Behaviors.same
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -10,11 +10,13 @@ import akka.actor.testkit.typed.scaladsl.ActorTestKit
|
||||||
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
|
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
|
||||||
import akka.actor.testkit.typed.scaladsl.TestProbe
|
import akka.actor.testkit.typed.scaladsl.TestProbe
|
||||||
import akka.actor.testkit.typed.scaladsl.LogCapturing
|
import akka.actor.testkit.typed.scaladsl.LogCapturing
|
||||||
|
import akka.actor.testkit.typed.scaladsl.LoggingTestKit
|
||||||
import akka.actor.typed.ActorRef
|
import akka.actor.typed.ActorRef
|
||||||
import akka.actor.typed.ActorSystem
|
import akka.actor.typed.ActorSystem
|
||||||
import akka.actor.typed.Behavior
|
import akka.actor.typed.Behavior
|
||||||
import akka.actor.typed.Props
|
import akka.actor.typed.Props
|
||||||
import akka.actor.typed.SpawnProtocol
|
import akka.actor.typed.SpawnProtocol
|
||||||
|
import com.typesafe.config.Config
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
import org.scalatest.wordspec.AnyWordSpecLike
|
import org.scalatest.wordspec.AnyWordSpecLike
|
||||||
|
|
||||||
|
|
@ -40,13 +42,15 @@ object DispatcherSelectorSpec {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
class DispatcherSelectorSpec
|
class DispatcherSelectorSpec(config: Config)
|
||||||
extends ScalaTestWithActorTestKit(DispatcherSelectorSpec.config)
|
extends ScalaTestWithActorTestKit(config)
|
||||||
with AnyWordSpecLike
|
with AnyWordSpecLike
|
||||||
with LogCapturing {
|
with LogCapturing {
|
||||||
import DispatcherSelectorSpec.PingPong
|
import DispatcherSelectorSpec.PingPong
|
||||||
import DispatcherSelectorSpec.PingPong._
|
import DispatcherSelectorSpec.PingPong._
|
||||||
|
|
||||||
|
def this() = this(DispatcherSelectorSpec.config)
|
||||||
|
|
||||||
"DispatcherSelector" must {
|
"DispatcherSelector" must {
|
||||||
|
|
||||||
"select dispatcher from config" in {
|
"select dispatcher from config" in {
|
||||||
|
|
@ -58,6 +62,14 @@ class DispatcherSelectorSpec
|
||||||
response.threadName should startWith("DispatcherSelectorSpec-ping-pong-dispatcher")
|
response.threadName should startWith("DispatcherSelectorSpec-ping-pong-dispatcher")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"detect unknown dispatcher from config" in {
|
||||||
|
val probe = createTestProbe[Pong]()
|
||||||
|
LoggingTestKit.error("Spawn failed").expect {
|
||||||
|
val ref = spawn(PingPong(), Props.empty.withDispatcherFromConfig("unknown"))
|
||||||
|
probe.expectTerminated(ref)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
"select same dispatcher as parent" in {
|
"select same dispatcher as parent" in {
|
||||||
val parent = spawn(SpawnProtocol(), Props.empty.withDispatcherFromConfig("ping-pong-dispatcher"))
|
val parent = spawn(SpawnProtocol(), Props.empty.withDispatcherFromConfig("ping-pong-dispatcher"))
|
||||||
val childProbe = createTestProbe[ActorRef[Ping]]()
|
val childProbe = createTestProbe[ActorRef[Ping]]()
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,17 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package akka.cluster.typed
|
||||||
|
|
||||||
|
import akka.actor.typed.scaladsl.DispatcherSelectorSpec
|
||||||
|
import com.typesafe.config.ConfigFactory
|
||||||
|
|
||||||
|
class ClusterDispatcherSelectorSpec
|
||||||
|
extends DispatcherSelectorSpec(ConfigFactory.parseString("""
|
||||||
|
akka.actor.provider = cluster
|
||||||
|
""").withFallback(DispatcherSelectorSpec.config)) {
|
||||||
|
|
||||||
|
// same tests as in DispatcherSelectorSpec
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -372,9 +372,6 @@ private[akka] class RemoteActorRefProvider(
|
||||||
if (systemService) local.actorOf(system, props, supervisor, path, systemService, deploy, lookupDeploy, async)
|
if (systemService) local.actorOf(system, props, supervisor, path, systemService, deploy, lookupDeploy, async)
|
||||||
else {
|
else {
|
||||||
|
|
||||||
if (!system.dispatchers.hasDispatcher(props.dispatcher))
|
|
||||||
throw new ConfigurationException(s"Dispatcher [${props.dispatcher}] not configured for path $path")
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* This needs to deal with “mangled” paths, which are created by remote
|
* This needs to deal with “mangled” paths, which are created by remote
|
||||||
* deployment, also in this method. The scheme is the following:
|
* deployment, also in this method. The scheme is the following:
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue