diff --git a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/delivery/DeliveryThroughputSpec.scala b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/delivery/DeliveryThroughputSpec.scala index c0ff1853e6..545708fc60 100644 --- a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/delivery/DeliveryThroughputSpec.scala +++ b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/delivery/DeliveryThroughputSpec.scala @@ -11,8 +11,6 @@ import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory -import akka.actor.ActorIdentity -import akka.actor.Identify import akka.actor.testkit.typed.scaladsl.TestProbe import akka.actor.typed.ActorRef import akka.actor.typed.Behavior @@ -22,7 +20,7 @@ import akka.actor.typed.delivery.ProducerController import akka.actor.typed.delivery.WorkPullingProducerController import akka.actor.typed.receptionist.ServiceKey import akka.actor.typed.scaladsl.Behaviors -import akka.actor.typed.scaladsl.adapter._ +import akka.actor.typed.scaladsl.LoggerOps import akka.cluster.MultiNodeClusterSpec import akka.cluster.sharding.typed.ShardingEnvelope import akka.cluster.sharding.typed.scaladsl.ClusterSharding @@ -32,12 +30,10 @@ import akka.cluster.typed.MultiNodeTypedClusterSpec import akka.remote.artery.BenchmarkFileReporter import akka.remote.artery.PlotResult import akka.remote.artery.TestRateReporter -import akka.remote.testconductor.RoleName import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.remote.testkit.PerfFlamesSupport import akka.serialization.jackson.CborSerializable -import akka.actor.typed.scaladsl.LoggerOps object DeliveryThroughputSpec extends MultiNodeConfig { val first = role("first") @@ -324,12 +320,6 @@ abstract class DeliveryThroughputSpec super.afterAll() } - def identify(name: String, r: RoleName): akka.actor.ActorRef = { - val sel = system.actorSelection(node(r) / "user" / name) - sel.tell(Identify(None), testActor) - expectMsgType[ActorIdentity](10.seconds).ref.get - } - private val settingsToReport = List( "akka.test.DeliveryThroughputSpec.totalMessagesFactor", "akka.reliable-delivery.consumer-controller.flow-control-window") @@ -341,8 +331,8 @@ abstract class DeliveryThroughputSpec runPerfFlames(first, second)(delay = 5.seconds) runOn(second) { - val consumerController = system.spawn(ConsumerController[Consumer.Command](), s"consumerController-$testName") - val consumer = system.spawn(Consumer(consumerController), s"consumer-$testName") + val consumerController = spawn(ConsumerController[Consumer.Command](), s"consumerController-$testName") + val consumer = spawn(Consumer(consumerController), s"consumer-$testName") enterBarrier(testName + "-consumer-started") enterBarrier(testName + "-done") consumer ! Consumer.Stop @@ -350,14 +340,13 @@ abstract class DeliveryThroughputSpec runOn(first) { enterBarrier(testName + "-consumer-started") - val consumerController = - identify(s"consumerController-$testName", second).toTyped[ConsumerController.Command[Consumer.Command]] + val consumerController = identify(s"consumerController-$testName", second) val plotProbe = TestProbe[PlotResult]() - val producerController = system.spawn( + val producerController = spawn( ProducerController[Consumer.Command](testName, durableQueueBehavior = None), s"producerController-$testName") val producer = - system.spawn(Producer(producerController, testSettings, plotProbe.ref, resultReporter), s"producer-$testName") + spawn(Producer(producerController, testSettings, plotProbe.ref, resultReporter), s"producer-$testName") producerController ! ProducerController.RegisterConsumer(consumerController) producer ! Producer.Run val terminationProbe = TestProbe() @@ -384,8 +373,8 @@ abstract class DeliveryThroughputSpec val range = if (myself == second) (1 to numberOfConsumers by 2) else (2 to numberOfConsumers by 2) val consumers = range.map { n => val consumerController = - system.spawn(ConsumerController[Consumer.Command](serviceKey(testName)), s"consumerController-$n-$testName") - system.spawn(Consumer(consumerController), s"consumer-$n-$testName") + spawn(ConsumerController[Consumer.Command](serviceKey(testName)), s"consumerController-$n-$testName") + spawn(Consumer(consumerController), s"consumer-$n-$testName") } enterBarrier(testName + "-consumer-started") enterBarrier(testName + "-done") @@ -395,11 +384,11 @@ abstract class DeliveryThroughputSpec runOn(first) { enterBarrier(testName + "-consumer-started") val plotProbe = TestProbe[PlotResult]() - val producerController = system.spawn( + val producerController = spawn( WorkPullingProducerController[Consumer.Command](testName, serviceKey(testName), durableQueueBehavior = None), s"producerController-$testName") val producer = - system.spawn( + spawn( WorkPullingProducer(producerController, testSettings, plotProbe.ref, resultReporter), s"producer-$testName") producer ! WorkPullingProducer.Run @@ -424,13 +413,11 @@ abstract class DeliveryThroughputSpec runOn(first) { val plotProbe = TestProbe[PlotResult]() - val producerController = system.spawn( + val producerController = spawn( ShardingProducerController[Consumer.Command](testName, region, durableQueueBehavior = None), s"producerController-$testName") val producer = - system.spawn( - ShardingProducer(producerController, testSettings, plotProbe.ref, resultReporter), - s"producer-$testName") + spawn(ShardingProducer(producerController, testSettings, plotProbe.ref, resultReporter), s"producer-$testName") producer ! ShardingProducer.Run val terminationProbe = TestProbe() terminationProbe.expectTerminated(producer, 1.minute) diff --git a/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/ChunkLargeMessageSpec.scala b/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/ChunkLargeMessageSpec.scala index 548a94769b..3187683de2 100644 --- a/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/ChunkLargeMessageSpec.scala +++ b/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/ChunkLargeMessageSpec.scala @@ -10,8 +10,6 @@ import scala.util.Random import com.typesafe.config.ConfigFactory import org.HdrHistogram.Histogram -import akka.actor.ActorIdentity -import akka.actor.Identify import akka.actor.testkit.typed.scaladsl.TestProbe import akka.actor.typed.ActorRef import akka.actor.typed.Behavior @@ -20,7 +18,6 @@ import akka.actor.typed.delivery.ConsumerController import akka.actor.typed.delivery.ProducerController import akka.actor.typed.scaladsl.Behaviors import akka.cluster.MultiNodeClusterSpec -import akka.remote.testconductor.RoleName import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.serialization.jackson.CborSerializable @@ -144,14 +141,6 @@ class ChunkLargeMessageMultiJvmNode2 extends ChunkLargeMessageSpec abstract class ChunkLargeMessageSpec extends MultiNodeSpec(ChunkLargeMessageSpec) with MultiNodeTypedClusterSpec { import ChunkLargeMessageSpec._ - // TODO move this to MultiNodeTypedClusterSpec - def identify[A](name: String, r: RoleName): ActorRef[A] = { - import akka.actor.typed.scaladsl.adapter._ - val sel = system.actorSelection(node(r) / "user" / "testSpawn" / name) - sel.tell(Identify(None), testActor) - expectMsgType[ActorIdentity](10.seconds).ref.get.toTyped - } - private def test(n: Int, numberOfMessages: Int, includeLarge: Boolean): Unit = { runOn(first) { val producerController = spawn(ProducerController[Consumer.TheMessage](s"p$n", None), s"producerController$n") diff --git a/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/MultiNodeTypedClusterSpec.scala b/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/MultiNodeTypedClusterSpec.scala index f1fdd516a1..5b8a710f2e 100644 --- a/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/MultiNodeTypedClusterSpec.scala +++ b/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/MultiNodeTypedClusterSpec.scala @@ -14,6 +14,7 @@ import scala.language.implicitConversions import org.scalatest.Suite import org.scalatest.matchers.should.Matchers +import akka.actor.ActorIdentity import akka.actor.typed.ActorRef import akka.actor.typed.ActorSystem import akka.actor.typed.Behavior @@ -22,6 +23,7 @@ import akka.actor.typed.SpawnProtocol import akka.actor.typed.scaladsl.AskPattern._ import akka.actor.typed.scaladsl.adapter._ import akka.actor.Address +import akka.actor.Identify import akka.actor.Scheduler import akka.cluster.ClusterEvent import akka.cluster.MemberStatus @@ -96,4 +98,11 @@ trait MultiNodeTypedClusterSpec extends Suite with STMultiNodeSpec with WatchedB Await.result(f, timeout.duration * 2) } + def identify[A](name: String, r: RoleName): ActorRef[A] = { + import akka.actor.typed.scaladsl.adapter._ + val sel = system.actorSelection(node(r) / "user" / "testSpawn" / name) + sel.tell(Identify(None), testActor) + expectMsgType[ActorIdentity].ref.get.toTyped + } + }