move identify to /MultiNodeTypedClusterSpec (#29821)

* just a small cleanup
This commit is contained in:
Patrik Nordwall 2020-11-16 13:34:18 +01:00 committed by GitHub
parent 02fc67f6a1
commit 0be3712694
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 21 additions and 36 deletions

View file

@ -11,8 +11,6 @@ import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import akka.actor.ActorIdentity
import akka.actor.Identify
import akka.actor.testkit.typed.scaladsl.TestProbe import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.actor.typed.ActorRef import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior import akka.actor.typed.Behavior
@ -22,7 +20,7 @@ import akka.actor.typed.delivery.ProducerController
import akka.actor.typed.delivery.WorkPullingProducerController import akka.actor.typed.delivery.WorkPullingProducerController
import akka.actor.typed.receptionist.ServiceKey import akka.actor.typed.receptionist.ServiceKey
import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._ import akka.actor.typed.scaladsl.LoggerOps
import akka.cluster.MultiNodeClusterSpec import akka.cluster.MultiNodeClusterSpec
import akka.cluster.sharding.typed.ShardingEnvelope import akka.cluster.sharding.typed.ShardingEnvelope
import akka.cluster.sharding.typed.scaladsl.ClusterSharding import akka.cluster.sharding.typed.scaladsl.ClusterSharding
@ -32,12 +30,10 @@ import akka.cluster.typed.MultiNodeTypedClusterSpec
import akka.remote.artery.BenchmarkFileReporter import akka.remote.artery.BenchmarkFileReporter
import akka.remote.artery.PlotResult import akka.remote.artery.PlotResult
import akka.remote.artery.TestRateReporter import akka.remote.artery.TestRateReporter
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.PerfFlamesSupport import akka.remote.testkit.PerfFlamesSupport
import akka.serialization.jackson.CborSerializable import akka.serialization.jackson.CborSerializable
import akka.actor.typed.scaladsl.LoggerOps
object DeliveryThroughputSpec extends MultiNodeConfig { object DeliveryThroughputSpec extends MultiNodeConfig {
val first = role("first") val first = role("first")
@ -324,12 +320,6 @@ abstract class DeliveryThroughputSpec
super.afterAll() super.afterAll()
} }
def identify(name: String, r: RoleName): akka.actor.ActorRef = {
val sel = system.actorSelection(node(r) / "user" / name)
sel.tell(Identify(None), testActor)
expectMsgType[ActorIdentity](10.seconds).ref.get
}
private val settingsToReport = List( private val settingsToReport = List(
"akka.test.DeliveryThroughputSpec.totalMessagesFactor", "akka.test.DeliveryThroughputSpec.totalMessagesFactor",
"akka.reliable-delivery.consumer-controller.flow-control-window") "akka.reliable-delivery.consumer-controller.flow-control-window")
@ -341,8 +331,8 @@ abstract class DeliveryThroughputSpec
runPerfFlames(first, second)(delay = 5.seconds) runPerfFlames(first, second)(delay = 5.seconds)
runOn(second) { runOn(second) {
val consumerController = system.spawn(ConsumerController[Consumer.Command](), s"consumerController-$testName") val consumerController = spawn(ConsumerController[Consumer.Command](), s"consumerController-$testName")
val consumer = system.spawn(Consumer(consumerController), s"consumer-$testName") val consumer = spawn(Consumer(consumerController), s"consumer-$testName")
enterBarrier(testName + "-consumer-started") enterBarrier(testName + "-consumer-started")
enterBarrier(testName + "-done") enterBarrier(testName + "-done")
consumer ! Consumer.Stop consumer ! Consumer.Stop
@ -350,14 +340,13 @@ abstract class DeliveryThroughputSpec
runOn(first) { runOn(first) {
enterBarrier(testName + "-consumer-started") enterBarrier(testName + "-consumer-started")
val consumerController = val consumerController = identify(s"consumerController-$testName", second)
identify(s"consumerController-$testName", second).toTyped[ConsumerController.Command[Consumer.Command]]
val plotProbe = TestProbe[PlotResult]() val plotProbe = TestProbe[PlotResult]()
val producerController = system.spawn( val producerController = spawn(
ProducerController[Consumer.Command](testName, durableQueueBehavior = None), ProducerController[Consumer.Command](testName, durableQueueBehavior = None),
s"producerController-$testName") s"producerController-$testName")
val producer = val producer =
system.spawn(Producer(producerController, testSettings, plotProbe.ref, resultReporter), s"producer-$testName") spawn(Producer(producerController, testSettings, plotProbe.ref, resultReporter), s"producer-$testName")
producerController ! ProducerController.RegisterConsumer(consumerController) producerController ! ProducerController.RegisterConsumer(consumerController)
producer ! Producer.Run producer ! Producer.Run
val terminationProbe = TestProbe() val terminationProbe = TestProbe()
@ -384,8 +373,8 @@ abstract class DeliveryThroughputSpec
val range = if (myself == second) (1 to numberOfConsumers by 2) else (2 to numberOfConsumers by 2) val range = if (myself == second) (1 to numberOfConsumers by 2) else (2 to numberOfConsumers by 2)
val consumers = range.map { n => val consumers = range.map { n =>
val consumerController = val consumerController =
system.spawn(ConsumerController[Consumer.Command](serviceKey(testName)), s"consumerController-$n-$testName") spawn(ConsumerController[Consumer.Command](serviceKey(testName)), s"consumerController-$n-$testName")
system.spawn(Consumer(consumerController), s"consumer-$n-$testName") spawn(Consumer(consumerController), s"consumer-$n-$testName")
} }
enterBarrier(testName + "-consumer-started") enterBarrier(testName + "-consumer-started")
enterBarrier(testName + "-done") enterBarrier(testName + "-done")
@ -395,11 +384,11 @@ abstract class DeliveryThroughputSpec
runOn(first) { runOn(first) {
enterBarrier(testName + "-consumer-started") enterBarrier(testName + "-consumer-started")
val plotProbe = TestProbe[PlotResult]() val plotProbe = TestProbe[PlotResult]()
val producerController = system.spawn( val producerController = spawn(
WorkPullingProducerController[Consumer.Command](testName, serviceKey(testName), durableQueueBehavior = None), WorkPullingProducerController[Consumer.Command](testName, serviceKey(testName), durableQueueBehavior = None),
s"producerController-$testName") s"producerController-$testName")
val producer = val producer =
system.spawn( spawn(
WorkPullingProducer(producerController, testSettings, plotProbe.ref, resultReporter), WorkPullingProducer(producerController, testSettings, plotProbe.ref, resultReporter),
s"producer-$testName") s"producer-$testName")
producer ! WorkPullingProducer.Run producer ! WorkPullingProducer.Run
@ -424,13 +413,11 @@ abstract class DeliveryThroughputSpec
runOn(first) { runOn(first) {
val plotProbe = TestProbe[PlotResult]() val plotProbe = TestProbe[PlotResult]()
val producerController = system.spawn( val producerController = spawn(
ShardingProducerController[Consumer.Command](testName, region, durableQueueBehavior = None), ShardingProducerController[Consumer.Command](testName, region, durableQueueBehavior = None),
s"producerController-$testName") s"producerController-$testName")
val producer = val producer =
system.spawn( spawn(ShardingProducer(producerController, testSettings, plotProbe.ref, resultReporter), s"producer-$testName")
ShardingProducer(producerController, testSettings, plotProbe.ref, resultReporter),
s"producer-$testName")
producer ! ShardingProducer.Run producer ! ShardingProducer.Run
val terminationProbe = TestProbe() val terminationProbe = TestProbe()
terminationProbe.expectTerminated(producer, 1.minute) terminationProbe.expectTerminated(producer, 1.minute)

View file

@ -10,8 +10,6 @@ import scala.util.Random
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import org.HdrHistogram.Histogram import org.HdrHistogram.Histogram
import akka.actor.ActorIdentity
import akka.actor.Identify
import akka.actor.testkit.typed.scaladsl.TestProbe import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.actor.typed.ActorRef import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior import akka.actor.typed.Behavior
@ -20,7 +18,6 @@ import akka.actor.typed.delivery.ConsumerController
import akka.actor.typed.delivery.ProducerController import akka.actor.typed.delivery.ProducerController
import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.MultiNodeClusterSpec import akka.cluster.MultiNodeClusterSpec
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec import akka.remote.testkit.MultiNodeSpec
import akka.serialization.jackson.CborSerializable import akka.serialization.jackson.CborSerializable
@ -144,14 +141,6 @@ class ChunkLargeMessageMultiJvmNode2 extends ChunkLargeMessageSpec
abstract class ChunkLargeMessageSpec extends MultiNodeSpec(ChunkLargeMessageSpec) with MultiNodeTypedClusterSpec { abstract class ChunkLargeMessageSpec extends MultiNodeSpec(ChunkLargeMessageSpec) with MultiNodeTypedClusterSpec {
import ChunkLargeMessageSpec._ import ChunkLargeMessageSpec._
// TODO move this to MultiNodeTypedClusterSpec
def identify[A](name: String, r: RoleName): ActorRef[A] = {
import akka.actor.typed.scaladsl.adapter._
val sel = system.actorSelection(node(r) / "user" / "testSpawn" / name)
sel.tell(Identify(None), testActor)
expectMsgType[ActorIdentity](10.seconds).ref.get.toTyped
}
private def test(n: Int, numberOfMessages: Int, includeLarge: Boolean): Unit = { private def test(n: Int, numberOfMessages: Int, includeLarge: Boolean): Unit = {
runOn(first) { runOn(first) {
val producerController = spawn(ProducerController[Consumer.TheMessage](s"p$n", None), s"producerController$n") val producerController = spawn(ProducerController[Consumer.TheMessage](s"p$n", None), s"producerController$n")

View file

@ -14,6 +14,7 @@ import scala.language.implicitConversions
import org.scalatest.Suite import org.scalatest.Suite
import org.scalatest.matchers.should.Matchers import org.scalatest.matchers.should.Matchers
import akka.actor.ActorIdentity
import akka.actor.typed.ActorRef import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior import akka.actor.typed.Behavior
@ -22,6 +23,7 @@ import akka.actor.typed.SpawnProtocol
import akka.actor.typed.scaladsl.AskPattern._ import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.adapter._ import akka.actor.typed.scaladsl.adapter._
import akka.actor.Address import akka.actor.Address
import akka.actor.Identify
import akka.actor.Scheduler import akka.actor.Scheduler
import akka.cluster.ClusterEvent import akka.cluster.ClusterEvent
import akka.cluster.MemberStatus import akka.cluster.MemberStatus
@ -96,4 +98,11 @@ trait MultiNodeTypedClusterSpec extends Suite with STMultiNodeSpec with WatchedB
Await.result(f, timeout.duration * 2) Await.result(f, timeout.duration * 2)
} }
def identify[A](name: String, r: RoleName): ActorRef[A] = {
import akka.actor.typed.scaladsl.adapter._
val sel = system.actorSelection(node(r) / "user" / "testSpawn" / name)
sel.tell(Identify(None), testActor)
expectMsgType[ActorIdentity].ref.get.toTyped
}
} }