Merge pull request #23546 from akka/wip-23504-compression-patriknw
=art #23504 Keep more old decompression tables (3)
This commit is contained in:
commit
369763f065
2 changed files with 92 additions and 46 deletions
|
|
@ -0,0 +1,3 @@
|
|||
#23504 compression tables
|
||||
ProblemFilters.exclude[MissingClassProblem]("akka.remote.artery.compress.InboundCompression$State$")
|
||||
ProblemFilters.exclude[MissingClassProblem]("akka.remote.artery.compress.InboundCompression$State")
|
||||
|
|
@ -204,30 +204,69 @@ private[remote] final class InboundManifestCompression(
|
|||
*/
|
||||
private[remote] object InboundCompression {
|
||||
|
||||
object State {
|
||||
def empty[T] = State(
|
||||
oldTable = DecompressionTable.disabled[T],
|
||||
final val KeepOldTablesNumber = 3 // TODO could be configurable
|
||||
|
||||
object Tables {
|
||||
def empty[T] = Tables(
|
||||
oldTables = List(DecompressionTable.disabled[T]),
|
||||
activeTable = DecompressionTable.empty[T],
|
||||
nextTable = DecompressionTable.empty[T].copy(version = 1),
|
||||
advertisementInProgress = None)
|
||||
advertisementInProgress = None,
|
||||
keepOldTables = KeepOldTablesNumber)
|
||||
}
|
||||
|
||||
final case class State[T](
|
||||
oldTable: DecompressionTable[T],
|
||||
/**
|
||||
* Encapsulates the various compression tables that Inbound Compression uses.
|
||||
*
|
||||
* @param oldTables is guaranteed to always have at-least one and at-most [[keepOldTables]] elements.
|
||||
* It starts with containing only a single "disabled" table (versioned as `DecompressionTable.DisabledVersion`),
|
||||
* and from there on continiously accumulates at most [[keepOldTables]] recently used tables.
|
||||
*/
|
||||
final case class Tables[T](
|
||||
oldTables: List[DecompressionTable[T]],
|
||||
activeTable: DecompressionTable[T],
|
||||
nextTable: DecompressionTable[T],
|
||||
advertisementInProgress: Option[CompressionTable[T]]) {
|
||||
advertisementInProgress: Option[CompressionTable[T]],
|
||||
keepOldTables: Int) {
|
||||
|
||||
def startUsingNextTable(): State[T] = {
|
||||
def selectTable(version: Int): OptionVal[DecompressionTable[T]] = {
|
||||
if (activeTable.version == version) {
|
||||
if (ArterySettings.Compression.Debug) println(s"[compress] Found table [version: ${version}], was [ACTIVE]${activeTable}")
|
||||
OptionVal.Some(activeTable)
|
||||
} else {
|
||||
@tailrec def find(tables: List[DecompressionTable[T]]): OptionVal[DecompressionTable[T]] = {
|
||||
tables match {
|
||||
case Nil ⇒ OptionVal.None
|
||||
case t :: tail ⇒
|
||||
if (t.version == version) OptionVal.Some(t)
|
||||
else find(tail)
|
||||
}
|
||||
}
|
||||
val found = find(oldTables)
|
||||
|
||||
if (ArterySettings.Compression.Debug) {
|
||||
found match {
|
||||
case OptionVal.Some(t) ⇒
|
||||
println(s"[compress] Found table [version: ${version}], was [OLD][${t}], old tables: [${oldTables.map(_.version)}]")
|
||||
case OptionVal.None ⇒
|
||||
println(s"[compress] Did not find table [version: ${version}], old tables: [${oldTables.map(_.version)}], activeTable: ${activeTable}, nextTable: ${nextTable}")
|
||||
}
|
||||
}
|
||||
found
|
||||
}
|
||||
}
|
||||
|
||||
def startUsingNextTable(): Tables[T] = {
|
||||
def incrementTableVersion(version: Byte): Byte =
|
||||
if (version == 127) 0
|
||||
else (version + 1).toByte
|
||||
|
||||
State(
|
||||
oldTable = activeTable,
|
||||
Tables(
|
||||
oldTables = (activeTable :: oldTables).take(keepOldTables),
|
||||
activeTable = nextTable,
|
||||
nextTable = DecompressionTable.empty[T].copy(version = incrementTableVersion(nextTable.version)),
|
||||
advertisementInProgress = None)
|
||||
advertisementInProgress = None,
|
||||
keepOldTables = keepOldTables)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -246,7 +285,7 @@ private[remote] abstract class InboundCompression[T >: Null](
|
|||
inboundContext: InboundContext,
|
||||
val heavyHitters: TopHeavyHitters[T]) {
|
||||
|
||||
private[this] var state: InboundCompression.State[T] = InboundCompression.State.empty
|
||||
private[this] var tables: InboundCompression.Tables[T] = InboundCompression.Tables.empty
|
||||
|
||||
// We should not continue sending advertisements to an association that might be dead (not quarantined yet)
|
||||
@volatile private[this] var alive = true
|
||||
|
|
@ -270,44 +309,48 @@ private[remote] abstract class InboundCompression[T >: Null](
|
|||
*/
|
||||
@tailrec final def decompressInternal(incomingTableVersion: Byte, idx: Int, attemptCounter: Int): OptionVal[T] = {
|
||||
// effectively should never loop more than once, to avoid infinite recursion blow up eagerly
|
||||
if (attemptCounter > 2) throw new IllegalStateException(s"Unable to decompress $idx from table $incomingTableVersion. Internal state: ${state}")
|
||||
if (attemptCounter > 2) throw new IllegalStateException(s"Unable to decompress $idx from table $incomingTableVersion. Internal tables: $tables")
|
||||
|
||||
val current = state
|
||||
val oldVersion = current.oldTable.version
|
||||
val current = tables
|
||||
val activeVersion = current.activeTable.version
|
||||
def incomingVersionIsAdvertisementInProgress(incomingTableVersion: Byte): Boolean =
|
||||
current.advertisementInProgress.isDefined &&
|
||||
incomingTableVersion == current.advertisementInProgress.get.version
|
||||
|
||||
if (incomingTableVersion == DecompressionTable.DisabledVersion) OptionVal.None // no compression, bail out early
|
||||
else if (incomingTableVersion == activeVersion) {
|
||||
val value: T = current.activeTable.get(idx)
|
||||
if (value != null) OptionVal.Some[T](value)
|
||||
else throw new UnknownCompressedIdException(idx)
|
||||
} else if (incomingTableVersion == oldVersion) {
|
||||
// must handle one old table due to messages in flight during advertisement
|
||||
val value: T = current.oldTable.get(idx)
|
||||
if (value != null) OptionVal.Some[T](value)
|
||||
else throw new UnknownCompressedIdException(idx)
|
||||
} else if (current.advertisementInProgress.isDefined && incomingTableVersion == current.advertisementInProgress.get.version) {
|
||||
log.debug(
|
||||
"Received first value from originUid [{}] compressed using the advertised compression table, flipping to it (version: {})",
|
||||
originUid, current.nextTable.version)
|
||||
confirmAdvertisement(incomingTableVersion)
|
||||
decompressInternal(incomingTableVersion, idx, attemptCounter + 1) // recurse
|
||||
} else {
|
||||
// which means that incoming version was > nextTable.version, which likely that
|
||||
// it is using a table that was built for previous incarnation of this system
|
||||
log.warning(
|
||||
"Inbound message from originUid [{}] is using unknown compression table version. " +
|
||||
"It may have been sent with compression table built for previous incarnation of this system. " +
|
||||
"Versions activeTable: {}, nextTable: {}, incomingTable: {}",
|
||||
originUid, activeVersion, current.nextTable.version, incomingTableVersion)
|
||||
if (incomingTableVersion == DecompressionTable.DisabledVersion) {
|
||||
// no compression, bail out early
|
||||
OptionVal.None
|
||||
} else {
|
||||
current.selectTable(version = incomingTableVersion) match {
|
||||
case OptionVal.Some(selectedTable) ⇒
|
||||
val value: T = selectedTable.get(idx)
|
||||
if (value != null) OptionVal.Some[T](value)
|
||||
else throw new UnknownCompressedIdException(idx)
|
||||
|
||||
case _ if incomingVersionIsAdvertisementInProgress(incomingTableVersion) ⇒
|
||||
log.debug(
|
||||
"Received first value from originUid [{}] compressed using the advertised compression table, flipping to it (version: {})",
|
||||
originUid, current.nextTable.version)
|
||||
confirmAdvertisement(incomingTableVersion)
|
||||
decompressInternal(incomingTableVersion, idx, attemptCounter + 1) // recurse
|
||||
|
||||
case _ ⇒
|
||||
// which means that incoming version was > nextTable.version, which likely that
|
||||
// it is using a table that was built for previous incarnation of this system
|
||||
log.warning(
|
||||
"Inbound message from originUid [{}] is using unknown compression table version. " +
|
||||
"It may have been sent with compression table built for previous incarnation of this system. " +
|
||||
"Versions activeTable: {}, nextTable: {}, incomingTable: {}",
|
||||
originUid, activeVersion, current.nextTable.version, incomingTableVersion)
|
||||
OptionVal.None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
final def confirmAdvertisement(tableVersion: Byte): Unit = {
|
||||
state.advertisementInProgress match {
|
||||
tables.advertisementInProgress match {
|
||||
case Some(inProgress) if tableVersion == inProgress.version ⇒
|
||||
state = state.startUsingNextTable()
|
||||
tables = tables.startUsingNextTable()
|
||||
log.debug("Confirmed compression table version [{}] for originUid [{}]", tableVersion, originUid)
|
||||
case Some(inProgress) if tableVersion != inProgress.version ⇒
|
||||
log.debug(
|
||||
|
|
@ -347,16 +390,16 @@ private[remote] abstract class InboundCompression[T >: Null](
|
|||
* Triggers compression table advertisement. May be triggered by schedule or manually, i.e. for testing.
|
||||
*/
|
||||
private[remote] def runNextTableAdvertisement(): Unit = {
|
||||
if (ArterySettings.Compression.Debug) println(s"[compress] runNextTableAdvertisement, state = ${state}")
|
||||
state.advertisementInProgress match {
|
||||
if (ArterySettings.Compression.Debug) println(s"[compress] runNextTableAdvertisement, tables = $tables")
|
||||
tables.advertisementInProgress match {
|
||||
case None ⇒
|
||||
inboundContext.association(originUid) match {
|
||||
case OptionVal.Some(association) ⇒
|
||||
if (alive) {
|
||||
val table = prepareCompressionAdvertisement(state.nextTable.version)
|
||||
val table = prepareCompressionAdvertisement(tables.nextTable.version)
|
||||
// TODO expensive, check if building the other way wouldn't be faster?
|
||||
val nextState = state.copy(nextTable = table.invert, advertisementInProgress = Some(table))
|
||||
state = nextState
|
||||
val nextState = tables.copy(nextTable = table.invert, advertisementInProgress = Some(table))
|
||||
tables = nextState
|
||||
alive = false // will be set to true on first incoming message
|
||||
resendCount = 0
|
||||
advertiseCompressionTable(association, table)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue