attempt to stabilize the CompressionIntegrationSpec (#23987)
This commit is contained in:
parent
34f7b319c5
commit
3eed8e79d0
3 changed files with 54 additions and 31 deletions
|
|
@ -8,11 +8,7 @@ import akka.actor.ActorRef
|
||||||
import akka.remote.UniqueAddress
|
import akka.remote.UniqueAddress
|
||||||
import akka.remote.artery.ControlMessage
|
import akka.remote.artery.ControlMessage
|
||||||
|
|
||||||
// FIXME serialization
|
|
||||||
/** INTERNAL API */
|
/** INTERNAL API */
|
||||||
/**
|
|
||||||
* INTERNAL API
|
|
||||||
*/
|
|
||||||
private[remote] object CompressionProtocol {
|
private[remote] object CompressionProtocol {
|
||||||
|
|
||||||
/** INTERNAL API */
|
/** INTERNAL API */
|
||||||
|
|
|
||||||
|
|
@ -58,7 +58,7 @@ private[remote] final class InboundCompressionsImpl(
|
||||||
// TODO would be nice if we can cleanup the tombstones
|
// TODO would be nice if we can cleanup the tombstones
|
||||||
// FIXME we should be able to remove the tombstones easily now
|
// FIXME we should be able to remove the tombstones easily now
|
||||||
private[this] val _actorRefsIns = new Long2ObjectHashMap[Option[InboundActorRefCompression]]()
|
private[this] val _actorRefsIns = new Long2ObjectHashMap[Option[InboundActorRefCompression]]()
|
||||||
private[this] val _inboundActorRefsLog = Logging(system, classOf[InboundManifestCompression])
|
private[this] val _inboundActorRefsLog = Logging(system, classOf[InboundActorRefCompression])
|
||||||
private val createInboundActorRefsForOrigin = new LongFunction[Option[InboundActorRefCompression]] {
|
private val createInboundActorRefsForOrigin = new LongFunction[Option[InboundActorRefCompression]] {
|
||||||
override def apply(originUid: Long): Option[InboundActorRefCompression] = {
|
override def apply(originUid: Long): Option[InboundActorRefCompression] = {
|
||||||
val actorRefHitters = new TopHeavyHitters[ActorRef](settings.ActorRefs.Max)
|
val actorRefHitters = new TopHeavyHitters[ActorRef](settings.ActorRefs.Max)
|
||||||
|
|
@ -329,7 +329,8 @@ private[remote] abstract class InboundCompression[T >: Null](
|
||||||
|
|
||||||
case _ if incomingVersionIsAdvertisementInProgress(incomingTableVersion) ⇒
|
case _ if incomingVersionIsAdvertisementInProgress(incomingTableVersion) ⇒
|
||||||
log.debug(
|
log.debug(
|
||||||
"Received first value from originUid [{}] compressed using the advertised compression table, flipping to it (version: {})",
|
"Received first value from originUid [{}] compressed using the advertised compression table, " +
|
||||||
|
"flipping to it (version: {})",
|
||||||
originUid, current.nextTable.version)
|
originUid, current.nextTable.version)
|
||||||
confirmAdvertisement(incomingTableVersion)
|
confirmAdvertisement(incomingTableVersion)
|
||||||
decompressInternal(incomingTableVersion, idx, attemptCounter + 1) // recurse
|
decompressInternal(incomingTableVersion, idx, attemptCounter + 1) // recurse
|
||||||
|
|
@ -404,7 +405,7 @@ private[remote] abstract class InboundCompression[T >: Null](
|
||||||
resendCount = 0
|
resendCount = 0
|
||||||
advertiseCompressionTable(association, table)
|
advertiseCompressionTable(association, table)
|
||||||
} else
|
} else
|
||||||
log.debug("Inbound compression table for originUid [{}] not changed, no need to advertise same.", originUid)
|
log.debug("{} for originUid [{}] not changed, no need to advertise same.", Logging.simpleName(tables.activeTable), originUid)
|
||||||
|
|
||||||
case OptionVal.None ⇒
|
case OptionVal.None ⇒
|
||||||
// otherwise it's too early, association not ready yet.
|
// otherwise it's too early, association not ready yet.
|
||||||
|
|
@ -451,7 +452,7 @@ private[remote] abstract class InboundCompression[T >: Null](
|
||||||
}
|
}
|
||||||
|
|
||||||
override def toString =
|
override def toString =
|
||||||
s"""${getClass.getSimpleName}(countMinSketch: $cms, heavyHitters: $heavyHitters)"""
|
s"""${Logging.simpleName(getClass)}(countMinSketch: $cms, heavyHitters: $heavyHitters)"""
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -261,9 +261,13 @@ class CompressionIntegrationSpec extends ArteryMultiNodeSpec(CompressionIntegrat
|
||||||
}
|
}
|
||||||
|
|
||||||
"wrap around" in {
|
"wrap around" in {
|
||||||
val extraConfig = """
|
val extraConfig =
|
||||||
|
"""
|
||||||
|
akka.loglevel = INFO
|
||||||
|
|
||||||
akka.remote.artery.advanced.compression {
|
akka.remote.artery.advanced.compression {
|
||||||
actor-refs.advertisement-interval = 10 millis
|
actor-refs.advertisement-interval = 10 millis
|
||||||
|
manifests.advertisement-interval = 10 minutes
|
||||||
}
|
}
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
@ -280,35 +284,57 @@ class CompressionIntegrationSpec extends ArteryMultiNodeSpec(CompressionIntegrat
|
||||||
expectMsgType[ActorIdentity].ref.get
|
expectMsgType[ActorIdentity].ref.get
|
||||||
}
|
}
|
||||||
|
|
||||||
val maxTableVersions = 130 // so table version wraps around at least once
|
var seenTableVersions = List.empty[Int]
|
||||||
var lastVersion = 0
|
// iterate from 2, since our assertion wants the locally created actor to be included in the table
|
||||||
|
// which will only happen in the 2nd advertisement the earliest.
|
||||||
|
val upToNTablesAcceptedAfterWrap = 6
|
||||||
|
var remainingExpectedTableVersions = (Iterator.from(2).take(126) ++ Iterator.from(0).take(upToNTablesAcceptedAfterWrap + 1)).toList
|
||||||
|
|
||||||
|
// so table version wraps around at least once
|
||||||
var lastTable: CompressionTable[ActorRef] = null
|
var lastTable: CompressionTable[ActorRef] = null
|
||||||
var allRefs: List[ActorRef] = Nil
|
var allRefs: List[ActorRef] = Nil
|
||||||
|
|
||||||
for (iteration ← 1 to maxTableVersions) {
|
within(3.minutes) {
|
||||||
val echoWrap = createAndIdentify(iteration) // create a different actor for every iteration
|
var iteration = 0
|
||||||
allRefs ::= echoWrap
|
while (remainingExpectedTableVersions.nonEmpty) {
|
||||||
|
iteration += 1
|
||||||
|
val echoWrap = createAndIdentify(iteration) // create a different actor for every iteration
|
||||||
|
allRefs ::= echoWrap
|
||||||
|
|
||||||
// cause echo to become a heavy hitter
|
// cause echo to become a heavy hitter
|
||||||
(1 to messagesToExchange).foreach { i ⇒ echoWrap ! TestMessage("hello") }
|
(1 to messagesToExchange).foreach { i ⇒ echoWrap ! TestMessage("hello") }
|
||||||
receiveN(messagesToExchange) // the replies
|
receiveN(messagesToExchange) // the replies
|
||||||
|
|
||||||
// discard duplicates with awaitAssert until we receive next version
|
var currentTable: CompressionTable[ActorRef] = null
|
||||||
var currentTable: CompressionTable[ActorRef] = null
|
receivedActorRefCompressionTableProbe.awaitAssert({
|
||||||
receivedActorRefCompressionTableProbe.awaitAssert {
|
// discard duplicates with awaitAssert until we receive next version
|
||||||
currentTable =
|
val receivedActorRefCompressionTable =
|
||||||
receivedActorRefCompressionTableProbe.expectMsgType[Events.ReceivedActorRefCompressionTable](2.seconds).table
|
receivedActorRefCompressionTableProbe.expectMsgType[Events.ReceivedActorRefCompressionTable](10.seconds)
|
||||||
// Until we get a new version, discard duplicates or old advertisements.
|
|
||||||
// Please note that we might not get the advertisements in order
|
currentTable = receivedActorRefCompressionTable.table
|
||||||
allRefs.forall(ref ⇒ currentTable.dictionary.contains(ref)) should be(true)
|
seenTableVersions = currentTable.version :: seenTableVersions
|
||||||
|
}, max = 10.seconds)
|
||||||
|
|
||||||
|
// debugging: info("Seen versions: " + seenTableVersions)
|
||||||
|
lastTable = currentTable
|
||||||
|
|
||||||
|
// distance between delivered versions, must not be greater than 2
|
||||||
|
// - this allows for some "wiggle room" for redeliveries
|
||||||
|
// - this is linked with the number of old tables we keep around in the impl, see `keepOldTables`
|
||||||
|
(((currentTable.version - lastTable.version) & 127) <= 2) should be(true)
|
||||||
|
|
||||||
|
def removeFirst(l: List[Int], it: Int): List[Int] = l match {
|
||||||
|
case Nil ⇒ Nil
|
||||||
|
case `it` :: tail ⇒ tail
|
||||||
|
case other :: tail ⇒ other :: removeFirst(tail, it)
|
||||||
|
}
|
||||||
|
|
||||||
|
remainingExpectedTableVersions = removeFirst(remainingExpectedTableVersions, lastTable.version)
|
||||||
}
|
}
|
||||||
currentTable.version should !==(lastVersion)
|
|
||||||
lastTable = currentTable
|
|
||||||
(((currentTable.version - lastTable.version) & 0x7F) <= 2) should be(true)
|
|
||||||
lastVersion = lastTable.version
|
|
||||||
}
|
|
||||||
|
|
||||||
lastTable.version.toInt should be < (128)
|
remainingExpectedTableVersions should be('empty)
|
||||||
|
lastTable.version.toInt should be <= upToNTablesAcceptedAfterWrap // definitely, since we expected to wrap around and start from 0 again
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue