From 1f708550eeaa72ddaf1933467e29e2cf66a38f18 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 16 Jan 2018 16:49:28 +0100 Subject: [PATCH] harden AeronStreamConsistencySpec, #24190 (#24326) This test is using the tcp ports assigned from the multi-jvm infra (classic remoting) for the Aeron udp. Even though tcp and udp ports are independent and same port number can be used at the same time for tcp and udp there is no guarantee that the udp port is free just because the tcp port was. --- .../artery/AeronStreamConcistencySpec.scala | 21 ++++++++++----- .../artery/AeronStreamLatencySpec.scala | 11 ++++++-- .../artery/AeronStreamMaxThroughputSpec.scala | 11 ++++++-- .../akka/remote/artery/UdpPortActor.scala | 26 +++++++++++++++++++ 4 files changed, 58 insertions(+), 11 deletions(-) create mode 100644 akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/UdpPortActor.scala diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamConcistencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamConcistencySpec.scala index c2e1b8d92c..3148ce7d7b 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamConcistencySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamConcistencySpec.scala @@ -3,11 +3,15 @@ */ package akka.remote.artery +import java.io.File import java.util.concurrent.atomic.AtomicInteger import scala.concurrent.Await import scala.concurrent.duration._ + import akka.Done +import akka.actor.ExtendedActorSystem +import akka.actor.Props import akka.remote.testconductor.RoleName import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec @@ -17,14 +21,11 @@ import akka.stream.KillSwitches import akka.stream.ThrottleMode import akka.stream.scaladsl.Source import akka.testkit._ +import akka.util.ByteString import com.typesafe.config.ConfigFactory import io.aeron.Aeron import io.aeron.driver.MediaDriver -import akka.actor.ExtendedActorSystem import org.agrona.IoUtil -import java.io.File - -import akka.util.ByteString object AeronStreamConsistencySpec extends MultiNodeConfig { val first = role("first") @@ -42,7 +43,6 @@ object AeronStreamConsistencySpec extends MultiNodeConfig { remote.artery.enabled = off } """))) - } class AeronStreamConsistencySpecMultiJvmNode1 extends AeronStreamConsistencySpec @@ -77,8 +77,10 @@ abstract class AeronStreamConsistencySpec override def initialParticipants = roles.size def channel(roleName: RoleName) = { - val a = node(roleName).address - s"aeron:udp?endpoint=${a.host.get}:${a.port.get}" + val n = node(roleName) + system.actorSelection(n / "user" / "updPort") ! UdpPortActor.GetUdpPort + val port = expectMsgType[Int] + s"aeron:udp?endpoint=${n.address.host.get}:$port" } val streamId = 1 @@ -94,6 +96,11 @@ abstract class AeronStreamConsistencySpec "Message consistency of Aeron Streams" must { + "start upd port" in { + system.actorOf(Props[UdpPortActor], "updPort") + enterBarrier("udp-port-started") + } + "start echo" in { runOn(second) { // just echo back diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala index 3d49bc222d..7e83240674 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala @@ -110,8 +110,10 @@ abstract class AeronStreamLatencySpec override def initialParticipants = roles.size def channel(roleName: RoleName) = { - val a = node(roleName).address - s"aeron:udp?endpoint=${a.host.get}:${a.port.get}" + val n = node(roleName) + system.actorSelection(n / "user" / "updPort") ! UdpPortActor.GetUdpPort + val port = expectMsgType[Int] + s"aeron:udp?endpoint=${n.address.host.get}:$port" } val streamId = 1 @@ -304,6 +306,11 @@ abstract class AeronStreamLatencySpec "Latency of Aeron Streams" must { + "start upd port" in { + system.actorOf(Props[UdpPortActor], "updPort") + enterBarrier("udp-port-started") + } + "start echo" in { runOn(second) { // just echo back diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala index 2dedbc157d..02aa5f7a9f 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala @@ -110,8 +110,10 @@ abstract class AeronStreamMaxThroughputSpec override def initialParticipants = roles.size def channel(roleName: RoleName) = { - val a = node(roleName).address - s"aeron:udp?endpoint=${a.host.get}:${a.port.get}" + val n = node(roleName) + system.actorSelection(n / "user" / "updPort") ! UdpPortActor.GetUdpPort + val port = expectMsgType[Int] + s"aeron:udp?endpoint=${n.address.host.get}:$port" } val streamId = 1 @@ -226,6 +228,11 @@ abstract class AeronStreamMaxThroughputSpec "Max throughput of Aeron Streams" must { + "start upd port" in { + system.actorOf(Props[UdpPortActor], "updPort") + enterBarrier("udp-port-started") + } + for (s ← scenarios) { s"be great for ${s.testName}, payloadSize = ${s.payloadSize}" in test(s) } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/UdpPortActor.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/UdpPortActor.scala new file mode 100644 index 0000000000..a4553c4784 --- /dev/null +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/UdpPortActor.scala @@ -0,0 +1,26 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ +package akka.remote.artery + +import akka.actor.Actor +import akka.remote.RARP +import akka.testkit.SocketUtil + +object UdpPortActor { + case object GetUdpPort +} + +/** + * Used for exchanging free udp port between multi-jvm nodes + */ +class UdpPortActor extends Actor { + import UdpPortActor._ + + val port = SocketUtil.temporaryServerAddress(RARP(context.system).provider + .getDefaultAddress.host.get, udp = true).getPort + + def receive = { + case GetUdpPort ⇒ sender() ! port + } +}