diff --git a/akka-remote/src/main/mima-filters/2.5.4.backwards.excludes b/akka-remote/src/main/mima-filters/2.5.4.backwards.excludes new file mode 100644 index 0000000000..465b305f2b --- /dev/null +++ b/akka-remote/src/main/mima-filters/2.5.4.backwards.excludes @@ -0,0 +1,3 @@ +#23504 compression tables +ProblemFilters.exclude[MissingClassProblem]("akka.remote.artery.compress.InboundCompression$State$") +ProblemFilters.exclude[MissingClassProblem]("akka.remote.artery.compress.InboundCompression$State") diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala index 2be4123c2a..3a9073bf3e 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala @@ -204,30 +204,69 @@ private[remote] final class InboundManifestCompression( */ private[remote] object InboundCompression { - object State { - def empty[T] = State( - oldTable = DecompressionTable.disabled[T], + final val KeepOldTablesNumber = 3 // TODO could be configurable + + object Tables { + def empty[T] = Tables( + oldTables = List(DecompressionTable.disabled[T]), activeTable = DecompressionTable.empty[T], nextTable = DecompressionTable.empty[T].copy(version = 1), - advertisementInProgress = None) + advertisementInProgress = None, + keepOldTables = KeepOldTablesNumber) } - final case class State[T]( - oldTable: DecompressionTable[T], + /** + * Encapsulates the various compression tables that Inbound Compression uses. + * + * @param oldTables is guaranteed to always have at-least one and at-most [[keepOldTables]] elements. + * It starts with containing only a single "disabled" table (versioned as `DecompressionTable.DisabledVersion`), + * and from there on continiously accumulates at most [[keepOldTables]] recently used tables. + */ + final case class Tables[T]( + oldTables: List[DecompressionTable[T]], activeTable: DecompressionTable[T], nextTable: DecompressionTable[T], - advertisementInProgress: Option[CompressionTable[T]]) { + advertisementInProgress: Option[CompressionTable[T]], + keepOldTables: Int) { - def startUsingNextTable(): State[T] = { + def selectTable(version: Int): OptionVal[DecompressionTable[T]] = { + if (activeTable.version == version) { + if (ArterySettings.Compression.Debug) println(s"[compress] Found table [version: ${version}], was [ACTIVE]${activeTable}") + OptionVal.Some(activeTable) + } else { + @tailrec def find(tables: List[DecompressionTable[T]]): OptionVal[DecompressionTable[T]] = { + tables match { + case Nil ⇒ OptionVal.None + case t :: tail ⇒ + if (t.version == version) OptionVal.Some(t) + else find(tail) + } + } + val found = find(oldTables) + + if (ArterySettings.Compression.Debug) { + found match { + case OptionVal.Some(t) ⇒ + println(s"[compress] Found table [version: ${version}], was [OLD][${t}], old tables: [${oldTables.map(_.version)}]") + case OptionVal.None ⇒ + println(s"[compress] Did not find table [version: ${version}], old tables: [${oldTables.map(_.version)}], activeTable: ${activeTable}, nextTable: ${nextTable}") + } + } + found + } + } + + def startUsingNextTable(): Tables[T] = { def incrementTableVersion(version: Byte): Byte = if (version == 127) 0 else (version + 1).toByte - State( - oldTable = activeTable, + Tables( + oldTables = (activeTable :: oldTables).take(keepOldTables), activeTable = nextTable, nextTable = DecompressionTable.empty[T].copy(version = incrementTableVersion(nextTable.version)), - advertisementInProgress = None) + advertisementInProgress = None, + keepOldTables = keepOldTables) } } @@ -246,7 +285,7 @@ private[remote] abstract class InboundCompression[T >: Null]( inboundContext: InboundContext, val heavyHitters: TopHeavyHitters[T]) { - private[this] var state: InboundCompression.State[T] = InboundCompression.State.empty + private[this] var tables: InboundCompression.Tables[T] = InboundCompression.Tables.empty // We should not continue sending advertisements to an association that might be dead (not quarantined yet) @volatile private[this] var alive = true @@ -270,44 +309,48 @@ private[remote] abstract class InboundCompression[T >: Null]( */ @tailrec final def decompressInternal(incomingTableVersion: Byte, idx: Int, attemptCounter: Int): OptionVal[T] = { // effectively should never loop more than once, to avoid infinite recursion blow up eagerly - if (attemptCounter > 2) throw new IllegalStateException(s"Unable to decompress $idx from table $incomingTableVersion. Internal state: ${state}") + if (attemptCounter > 2) throw new IllegalStateException(s"Unable to decompress $idx from table $incomingTableVersion. Internal tables: $tables") - val current = state - val oldVersion = current.oldTable.version + val current = tables val activeVersion = current.activeTable.version + def incomingVersionIsAdvertisementInProgress(incomingTableVersion: Byte): Boolean = + current.advertisementInProgress.isDefined && + incomingTableVersion == current.advertisementInProgress.get.version - if (incomingTableVersion == DecompressionTable.DisabledVersion) OptionVal.None // no compression, bail out early - else if (incomingTableVersion == activeVersion) { - val value: T = current.activeTable.get(idx) - if (value != null) OptionVal.Some[T](value) - else throw new UnknownCompressedIdException(idx) - } else if (incomingTableVersion == oldVersion) { - // must handle one old table due to messages in flight during advertisement - val value: T = current.oldTable.get(idx) - if (value != null) OptionVal.Some[T](value) - else throw new UnknownCompressedIdException(idx) - } else if (current.advertisementInProgress.isDefined && incomingTableVersion == current.advertisementInProgress.get.version) { - log.debug( - "Received first value from originUid [{}] compressed using the advertised compression table, flipping to it (version: {})", - originUid, current.nextTable.version) - confirmAdvertisement(incomingTableVersion) - decompressInternal(incomingTableVersion, idx, attemptCounter + 1) // recurse - } else { - // which means that incoming version was > nextTable.version, which likely that - // it is using a table that was built for previous incarnation of this system - log.warning( - "Inbound message from originUid [{}] is using unknown compression table version. " + - "It may have been sent with compression table built for previous incarnation of this system. " + - "Versions activeTable: {}, nextTable: {}, incomingTable: {}", - originUid, activeVersion, current.nextTable.version, incomingTableVersion) + if (incomingTableVersion == DecompressionTable.DisabledVersion) { + // no compression, bail out early OptionVal.None + } else { + current.selectTable(version = incomingTableVersion) match { + case OptionVal.Some(selectedTable) ⇒ + val value: T = selectedTable.get(idx) + if (value != null) OptionVal.Some[T](value) + else throw new UnknownCompressedIdException(idx) + + case _ if incomingVersionIsAdvertisementInProgress(incomingTableVersion) ⇒ + log.debug( + "Received first value from originUid [{}] compressed using the advertised compression table, flipping to it (version: {})", + originUid, current.nextTable.version) + confirmAdvertisement(incomingTableVersion) + decompressInternal(incomingTableVersion, idx, attemptCounter + 1) // recurse + + case _ ⇒ + // which means that incoming version was > nextTable.version, which likely that + // it is using a table that was built for previous incarnation of this system + log.warning( + "Inbound message from originUid [{}] is using unknown compression table version. " + + "It may have been sent with compression table built for previous incarnation of this system. " + + "Versions activeTable: {}, nextTable: {}, incomingTable: {}", + originUid, activeVersion, current.nextTable.version, incomingTableVersion) + OptionVal.None + } } } final def confirmAdvertisement(tableVersion: Byte): Unit = { - state.advertisementInProgress match { + tables.advertisementInProgress match { case Some(inProgress) if tableVersion == inProgress.version ⇒ - state = state.startUsingNextTable() + tables = tables.startUsingNextTable() log.debug("Confirmed compression table version [{}] for originUid [{}]", tableVersion, originUid) case Some(inProgress) if tableVersion != inProgress.version ⇒ log.debug( @@ -347,16 +390,16 @@ private[remote] abstract class InboundCompression[T >: Null]( * Triggers compression table advertisement. May be triggered by schedule or manually, i.e. for testing. */ private[remote] def runNextTableAdvertisement(): Unit = { - if (ArterySettings.Compression.Debug) println(s"[compress] runNextTableAdvertisement, state = ${state}") - state.advertisementInProgress match { + if (ArterySettings.Compression.Debug) println(s"[compress] runNextTableAdvertisement, tables = $tables") + tables.advertisementInProgress match { case None ⇒ inboundContext.association(originUid) match { case OptionVal.Some(association) ⇒ if (alive) { - val table = prepareCompressionAdvertisement(state.nextTable.version) + val table = prepareCompressionAdvertisement(tables.nextTable.version) // TODO expensive, check if building the other way wouldn't be faster? - val nextState = state.copy(nextTable = table.invert, advertisementInProgress = Some(table)) - state = nextState + val nextState = tables.copy(nextTable = table.invert, advertisementInProgress = Some(table)) + tables = nextState alive = false // will be set to true on first incoming message resendCount = 0 advertiseCompressionTable(association, table)