diff --git a/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionIntegrationSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionIntegrationSpec.scala index 655b8f7401..e59e790918 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionIntegrationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionIntegrationSpec.scala @@ -268,8 +268,10 @@ class CompressionIntegrationSpec extends ArteryMultiNodeSpec(CompressionIntegrat val systemWrap = newRemoteSystem(extraConfig = Some(extraConfig)) - val wrapRefProbe = TestProbe()(systemWrap) - system.eventStream.subscribe(wrapRefProbe.ref, classOf[CompressionProtocol.Events.ReceivedActorRefCompressionTable]) + val receivedActorRefCompressionTableProbe = TestProbe()(system) + system.eventStream.subscribe( + receivedActorRefCompressionTableProbe.ref, + classOf[CompressionProtocol.Events.ReceivedActorRefCompressionTable]) def createAndIdentify(i: Int) = { val echoWrap = systemWrap.actorOf(TestActors.echoActorProps, s"echo_$i") @@ -278,32 +280,28 @@ class CompressionIntegrationSpec extends ArteryMultiNodeSpec(CompressionIntegrat } val maxTableVersions = 130 // so table version wraps around at least once - val maxDuplicateTables = 40 // max duplicate tables that will not fail the test - var tableVersionsSeen = 0 var lastTableVersion = 0 - var iteration = 0 - - while (tableVersionsSeen < maxTableVersions) { - iteration += 1 - if (iteration - maxTableVersions > maxDuplicateTables) { - throw new Error("Too much duplicate tables. Giving up on the test.") - } + for (iteration ← 1 to maxTableVersions) { val echoWrap = createAndIdentify(iteration) // create a different actor for every iteration // cause echo to become a heavy hitter (1 to messagesToExchange).foreach { i ⇒ echoWrap ! TestMessage("hello") } receiveN(messagesToExchange) // the replies - // on system A side - val a1 = wrapRefProbe.expectMsgType[Events.ReceivedActorRefCompressionTable](2.seconds) - val currentTableVersion = a1.table.version.toInt - - if (currentTableVersion != lastTableVersion) { // if we get a new table + receivedActorRefCompressionTableProbe.within(5.seconds) { + var currentTableVersion = -1 + // discard duplicates with awaitAssert until we receive next version + receivedActorRefCompressionTableProbe.awaitAssert { + val a1 = receivedActorRefCompressionTableProbe.expectMsgType[Events.ReceivedActorRefCompressionTable](2.seconds) + currentTableVersion = a1.table.version.toInt + // until we get next version, discard duplicates + currentTableVersion should !==(lastTableVersion) + } lastTableVersion = currentTableVersion - tableVersionsSeen += 1 + currentTableVersion should ===(iteration & 0x7F) } - currentTableVersion should ===(tableVersionsSeen & 0x7F) + } }