diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionProtocol.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionProtocol.scala index 5ed0ee2975..b157b9c45e 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionProtocol.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionProtocol.scala @@ -8,11 +8,7 @@ import akka.actor.ActorRef import akka.remote.UniqueAddress import akka.remote.artery.ControlMessage -// FIXME serialization /** INTERNAL API */ -/** - * INTERNAL API - */ private[remote] object CompressionProtocol { /** INTERNAL API */ diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala index 3a9073bf3e..80cc0b5dc2 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala @@ -58,7 +58,7 @@ private[remote] final class InboundCompressionsImpl( // TODO would be nice if we can cleanup the tombstones // FIXME we should be able to remove the tombstones easily now private[this] val _actorRefsIns = new Long2ObjectHashMap[Option[InboundActorRefCompression]]() - private[this] val _inboundActorRefsLog = Logging(system, classOf[InboundManifestCompression]) + private[this] val _inboundActorRefsLog = Logging(system, classOf[InboundActorRefCompression]) private val createInboundActorRefsForOrigin = new LongFunction[Option[InboundActorRefCompression]] { override def apply(originUid: Long): Option[InboundActorRefCompression] = { val actorRefHitters = new TopHeavyHitters[ActorRef](settings.ActorRefs.Max) @@ -329,7 +329,8 @@ private[remote] abstract class InboundCompression[T >: Null]( case _ if incomingVersionIsAdvertisementInProgress(incomingTableVersion) ⇒ log.debug( - "Received first value from originUid [{}] compressed using the advertised compression table, flipping to it (version: {})", + "Received first value from originUid [{}] compressed using the advertised compression table, " + + "flipping to it (version: {})", originUid, current.nextTable.version) confirmAdvertisement(incomingTableVersion) decompressInternal(incomingTableVersion, idx, attemptCounter + 1) // recurse @@ -404,7 +405,7 @@ private[remote] abstract class InboundCompression[T >: Null]( resendCount = 0 advertiseCompressionTable(association, table) } else - log.debug("Inbound compression table for originUid [{}] not changed, no need to advertise same.", originUid) + log.debug("{} for originUid [{}] not changed, no need to advertise same.", Logging.simpleName(tables.activeTable), originUid) case OptionVal.None ⇒ // otherwise it's too early, association not ready yet. @@ -451,7 +452,7 @@ private[remote] abstract class InboundCompression[T >: Null]( } override def toString = - s"""${getClass.getSimpleName}(countMinSketch: $cms, heavyHitters: $heavyHitters)""" + s"""${Logging.simpleName(getClass)}(countMinSketch: $cms, heavyHitters: $heavyHitters)""" } diff --git a/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionIntegrationSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionIntegrationSpec.scala index c14a6b48fc..87eaa2df2a 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionIntegrationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionIntegrationSpec.scala @@ -261,9 +261,13 @@ class CompressionIntegrationSpec extends ArteryMultiNodeSpec(CompressionIntegrat } "wrap around" in { - val extraConfig = """ + val extraConfig = + """ + akka.loglevel = INFO + akka.remote.artery.advanced.compression { actor-refs.advertisement-interval = 10 millis + manifests.advertisement-interval = 10 minutes } """ @@ -280,35 +284,57 @@ class CompressionIntegrationSpec extends ArteryMultiNodeSpec(CompressionIntegrat expectMsgType[ActorIdentity].ref.get } - val maxTableVersions = 130 // so table version wraps around at least once - var lastVersion = 0 + var seenTableVersions = List.empty[Int] + // iterate from 2, since our assertion wants the locally created actor to be included in the table + // which will only happen in the 2nd advertisement the earliest. + val upToNTablesAcceptedAfterWrap = 6 + var remainingExpectedTableVersions = (Iterator.from(2).take(126) ++ Iterator.from(0).take(upToNTablesAcceptedAfterWrap + 1)).toList + + // so table version wraps around at least once var lastTable: CompressionTable[ActorRef] = null var allRefs: List[ActorRef] = Nil - for (iteration ← 1 to maxTableVersions) { - val echoWrap = createAndIdentify(iteration) // create a different actor for every iteration - allRefs ::= echoWrap + within(3.minutes) { + var iteration = 0 + while (remainingExpectedTableVersions.nonEmpty) { + iteration += 1 + val echoWrap = createAndIdentify(iteration) // create a different actor for every iteration + allRefs ::= echoWrap - // cause echo to become a heavy hitter - (1 to messagesToExchange).foreach { i ⇒ echoWrap ! TestMessage("hello") } - receiveN(messagesToExchange) // the replies + // cause echo to become a heavy hitter + (1 to messagesToExchange).foreach { i ⇒ echoWrap ! TestMessage("hello") } + receiveN(messagesToExchange) // the replies - // discard duplicates with awaitAssert until we receive next version - var currentTable: CompressionTable[ActorRef] = null - receivedActorRefCompressionTableProbe.awaitAssert { - currentTable = - receivedActorRefCompressionTableProbe.expectMsgType[Events.ReceivedActorRefCompressionTable](2.seconds).table - // Until we get a new version, discard duplicates or old advertisements. - // Please note that we might not get the advertisements in order - allRefs.forall(ref ⇒ currentTable.dictionary.contains(ref)) should be(true) + var currentTable: CompressionTable[ActorRef] = null + receivedActorRefCompressionTableProbe.awaitAssert({ + // discard duplicates with awaitAssert until we receive next version + val receivedActorRefCompressionTable = + receivedActorRefCompressionTableProbe.expectMsgType[Events.ReceivedActorRefCompressionTable](10.seconds) + + currentTable = receivedActorRefCompressionTable.table + seenTableVersions = currentTable.version :: seenTableVersions + }, max = 10.seconds) + + // debugging: info("Seen versions: " + seenTableVersions) + lastTable = currentTable + + // distance between delivered versions, must not be greater than 2 + // - this allows for some "wiggle room" for redeliveries + // - this is linked with the number of old tables we keep around in the impl, see `keepOldTables` + (((currentTable.version - lastTable.version) & 127) <= 2) should be(true) + + def removeFirst(l: List[Int], it: Int): List[Int] = l match { + case Nil ⇒ Nil + case `it` :: tail ⇒ tail + case other :: tail ⇒ other :: removeFirst(tail, it) + } + + remainingExpectedTableVersions = removeFirst(remainingExpectedTableVersions, lastTable.version) } - currentTable.version should !==(lastVersion) - lastTable = currentTable - (((currentTable.version - lastTable.version) & 0x7F) <= 2) should be(true) - lastVersion = lastTable.version - } - lastTable.version.toInt should be < (128) + remainingExpectedTableVersions should be('empty) + lastTable.version.toInt should be <= upToNTablesAcceptedAfterWrap // definitely, since we expected to wrap around and start from 0 again + } } }