harden AeronStreamConsistencySpec, #24190 (#24326)

This test is using the tcp ports assigned from the multi-jvm infra (classic remoting)
for the Aeron udp. Even though tcp and udp ports are independent and same port number
can be used at the same time for tcp and udp there is no guarantee that the udp port
is free just because the tcp port was.
This commit is contained in:
Patrik Nordwall 2018-01-16 16:49:28 +01:00 committed by Christopher Batey
parent accfa21731
commit 1f708550ee
4 changed files with 58 additions and 11 deletions

View file

@ -3,11 +3,15 @@
*/
package akka.remote.artery
import java.io.File
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.Done
import akka.actor.ExtendedActorSystem
import akka.actor.Props
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
@ -17,14 +21,11 @@ import akka.stream.KillSwitches
import akka.stream.ThrottleMode
import akka.stream.scaladsl.Source
import akka.testkit._
import akka.util.ByteString
import com.typesafe.config.ConfigFactory
import io.aeron.Aeron
import io.aeron.driver.MediaDriver
import akka.actor.ExtendedActorSystem
import org.agrona.IoUtil
import java.io.File
import akka.util.ByteString
object AeronStreamConsistencySpec extends MultiNodeConfig {
val first = role("first")
@ -42,7 +43,6 @@ object AeronStreamConsistencySpec extends MultiNodeConfig {
remote.artery.enabled = off
}
""")))
}
class AeronStreamConsistencySpecMultiJvmNode1 extends AeronStreamConsistencySpec
@ -77,8 +77,10 @@ abstract class AeronStreamConsistencySpec
override def initialParticipants = roles.size
def channel(roleName: RoleName) = {
val a = node(roleName).address
s"aeron:udp?endpoint=${a.host.get}:${a.port.get}"
val n = node(roleName)
system.actorSelection(n / "user" / "updPort") ! UdpPortActor.GetUdpPort
val port = expectMsgType[Int]
s"aeron:udp?endpoint=${n.address.host.get}:$port"
}
val streamId = 1
@ -94,6 +96,11 @@ abstract class AeronStreamConsistencySpec
"Message consistency of Aeron Streams" must {
"start upd port" in {
system.actorOf(Props[UdpPortActor], "updPort")
enterBarrier("udp-port-started")
}
"start echo" in {
runOn(second) {
// just echo back

View file

@ -110,8 +110,10 @@ abstract class AeronStreamLatencySpec
override def initialParticipants = roles.size
def channel(roleName: RoleName) = {
val a = node(roleName).address
s"aeron:udp?endpoint=${a.host.get}:${a.port.get}"
val n = node(roleName)
system.actorSelection(n / "user" / "updPort") ! UdpPortActor.GetUdpPort
val port = expectMsgType[Int]
s"aeron:udp?endpoint=${n.address.host.get}:$port"
}
val streamId = 1
@ -304,6 +306,11 @@ abstract class AeronStreamLatencySpec
"Latency of Aeron Streams" must {
"start upd port" in {
system.actorOf(Props[UdpPortActor], "updPort")
enterBarrier("udp-port-started")
}
"start echo" in {
runOn(second) {
// just echo back

View file

@ -110,8 +110,10 @@ abstract class AeronStreamMaxThroughputSpec
override def initialParticipants = roles.size
def channel(roleName: RoleName) = {
val a = node(roleName).address
s"aeron:udp?endpoint=${a.host.get}:${a.port.get}"
val n = node(roleName)
system.actorSelection(n / "user" / "updPort") ! UdpPortActor.GetUdpPort
val port = expectMsgType[Int]
s"aeron:udp?endpoint=${n.address.host.get}:$port"
}
val streamId = 1
@ -226,6 +228,11 @@ abstract class AeronStreamMaxThroughputSpec
"Max throughput of Aeron Streams" must {
"start upd port" in {
system.actorOf(Props[UdpPortActor], "updPort")
enterBarrier("udp-port-started")
}
for (s scenarios) {
s"be great for ${s.testName}, payloadSize = ${s.payloadSize}" in test(s)
}

View file

@ -0,0 +1,26 @@
/**
* Copyright (C) 2018 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import akka.actor.Actor
import akka.remote.RARP
import akka.testkit.SocketUtil
object UdpPortActor {
case object GetUdpPort
}
/**
* Used for exchanging free udp port between multi-jvm nodes
*/
class UdpPortActor extends Actor {
import UdpPortActor._
val port = SocketUtil.temporaryServerAddress(RARP(context.system).provider
.getDefaultAddress.host.get, udp = true).getPort
def receive = {
case GetUdpPort sender() ! port
}
}