diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamConcistencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamConcistencySpec.scala index c2e1b8d92c..3148ce7d7b 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamConcistencySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamConcistencySpec.scala @@ -3,11 +3,15 @@ */ package akka.remote.artery +import java.io.File import java.util.concurrent.atomic.AtomicInteger import scala.concurrent.Await import scala.concurrent.duration._ + import akka.Done +import akka.actor.ExtendedActorSystem +import akka.actor.Props import akka.remote.testconductor.RoleName import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec @@ -17,14 +21,11 @@ import akka.stream.KillSwitches import akka.stream.ThrottleMode import akka.stream.scaladsl.Source import akka.testkit._ +import akka.util.ByteString import com.typesafe.config.ConfigFactory import io.aeron.Aeron import io.aeron.driver.MediaDriver -import akka.actor.ExtendedActorSystem import org.agrona.IoUtil -import java.io.File - -import akka.util.ByteString object AeronStreamConsistencySpec extends MultiNodeConfig { val first = role("first") @@ -42,7 +43,6 @@ object AeronStreamConsistencySpec extends MultiNodeConfig { remote.artery.enabled = off } """))) - } class AeronStreamConsistencySpecMultiJvmNode1 extends AeronStreamConsistencySpec @@ -77,8 +77,10 @@ abstract class AeronStreamConsistencySpec override def initialParticipants = roles.size def channel(roleName: RoleName) = { - val a = node(roleName).address - s"aeron:udp?endpoint=${a.host.get}:${a.port.get}" + val n = node(roleName) + system.actorSelection(n / "user" / "updPort") ! UdpPortActor.GetUdpPort + val port = expectMsgType[Int] + s"aeron:udp?endpoint=${n.address.host.get}:$port" } val streamId = 1 @@ -94,6 +96,11 @@ abstract class AeronStreamConsistencySpec "Message consistency of Aeron Streams" must { + "start upd port" in { + system.actorOf(Props[UdpPortActor], "updPort") + enterBarrier("udp-port-started") + } + "start echo" in { runOn(second) { // just echo back diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala index 3d49bc222d..7e83240674 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala @@ -110,8 +110,10 @@ abstract class AeronStreamLatencySpec override def initialParticipants = roles.size def channel(roleName: RoleName) = { - val a = node(roleName).address - s"aeron:udp?endpoint=${a.host.get}:${a.port.get}" + val n = node(roleName) + system.actorSelection(n / "user" / "updPort") ! UdpPortActor.GetUdpPort + val port = expectMsgType[Int] + s"aeron:udp?endpoint=${n.address.host.get}:$port" } val streamId = 1 @@ -304,6 +306,11 @@ abstract class AeronStreamLatencySpec "Latency of Aeron Streams" must { + "start upd port" in { + system.actorOf(Props[UdpPortActor], "updPort") + enterBarrier("udp-port-started") + } + "start echo" in { runOn(second) { // just echo back diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala index 2dedbc157d..02aa5f7a9f 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala @@ -110,8 +110,10 @@ abstract class AeronStreamMaxThroughputSpec override def initialParticipants = roles.size def channel(roleName: RoleName) = { - val a = node(roleName).address - s"aeron:udp?endpoint=${a.host.get}:${a.port.get}" + val n = node(roleName) + system.actorSelection(n / "user" / "updPort") ! UdpPortActor.GetUdpPort + val port = expectMsgType[Int] + s"aeron:udp?endpoint=${n.address.host.get}:$port" } val streamId = 1 @@ -226,6 +228,11 @@ abstract class AeronStreamMaxThroughputSpec "Max throughput of Aeron Streams" must { + "start upd port" in { + system.actorOf(Props[UdpPortActor], "updPort") + enterBarrier("udp-port-started") + } + for (s ← scenarios) { s"be great for ${s.testName}, payloadSize = ${s.payloadSize}" in test(s) } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/UdpPortActor.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/UdpPortActor.scala new file mode 100644 index 0000000000..a4553c4784 --- /dev/null +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/UdpPortActor.scala @@ -0,0 +1,26 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ +package akka.remote.artery + +import akka.actor.Actor +import akka.remote.RARP +import akka.testkit.SocketUtil + +object UdpPortActor { + case object GetUdpPort +} + +/** + * Used for exchanging free udp port between multi-jvm nodes + */ +class UdpPortActor extends Actor { + import UdpPortActor._ + + val port = SocketUtil.temporaryServerAddress(RARP(context.system).provider + .getDefaultAddress.host.get, udp = true).getPort + + def receive = { + case GetUdpPort ⇒ sender() ! port + } +}