From ba1cf38e53343111afb295f7436033c9b30f564f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Thu, 29 Sep 2016 13:52:48 +0200 Subject: [PATCH] Realign artery header data (#21581) * Minor cleanup in version calculation * Table versions before tag-fields * Single handling block of unknown compression table version #21580 * Compression table versions use -1 only as special number #21448 * Align to 4 byte boundaries in header --- .../akka/remote/artery/ArterySettings.scala | 2 +- .../scala/akka/remote/artery/BufferPool.scala | 51 +++++++------------ .../artery/compress/CompressionTable.scala | 6 ++- .../artery/compress/DecompressionTable.scala | 10 +++- .../artery/compress/InboundCompressions.scala | 20 +++----- .../remote/artery/EnvelopeBufferSpec.scala | 8 +-- .../compress/CompressionIntegrationSpec.scala | 7 +-- 7 files changed, 45 insertions(+), 59 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala b/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala index fe94f28c8a..23fee6d16c 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala @@ -170,7 +170,7 @@ private[akka] object ArterySettings { } object Compression { // Compile time constants - final val Debug = false // unlocks additional very verbose debug logging of compression events (on DEBUG log level) + final val Debug = false // unlocks additional very verbose debug logging of compression events (to stdout) } def getHostname(key: String, config: Config) = config.getString(key) match { diff --git a/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala b/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala index cc9aeab3f9..c2dcd094be 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala @@ -76,19 +76,19 @@ private[remote] object EnvelopeBuffer { val VersionOffset = 0 // Byte val FlagsOffset = 1 // Byte - // 2 bytes free // TODO re-align values to not have this empty space + val ActorRefCompressionTableVersionOffset = 2 // Byte + val ClassManifestCompressionTableVersionOffset = 3 // Byte + val UidOffset = 4 // Long val SerializerOffset = 12 // Int val SenderActorRefTagOffset = 16 // Int val RecipientActorRefTagOffset = 20 // Int val ClassManifestTagOffset = 24 // Int - val ActorRefCompressionTableVersionTagOffset = 28 // Int // TODO handle roll-over and move to Short - val ClassManifestCompressionTableVersionTagOffset = 32 // Int // TODO handle roll-over and move to Short // EITHER metadata followed by literals directly OR literals directly in this spot. // Mode depends on the `MetadataPresentFlag`. - val MetadataContainerAndLiteralSectionOffset = 36 // Int + val MetadataContainerAndLiteralSectionOffset = 28 // Int val UsAscii = Charset.forName("US-ASCII") @@ -386,29 +386,20 @@ private[remote] final class EnvelopeBuffer(val byteBuffer: ByteBuffer) { byteBuffer.clear() // Write fixed length parts - byteBuffer.put(header.version) - byteBuffer.put(header.flags) - // 1 empty byte slot // TODO avoid having these empty slots - // 1 empty byte slot - byteBuffer.position(UidOffset) // skips the above 2 empty slots - byteBuffer.putLong(header.uid) - byteBuffer.putInt(header.serializer) + byteBuffer.put(VersionOffset, header.version) + byteBuffer.put(FlagsOffset, header.flags) + byteBuffer.putLong(UidOffset, header.uid) + byteBuffer.putInt(SerializerOffset, header.serializer) // compression table version numbers - byteBuffer.putInt(ActorRefCompressionTableVersionTagOffset, header.outboundActorRefCompression.version | TagTypeMask) - byteBuffer.putInt(ClassManifestCompressionTableVersionTagOffset, header.outboundClassManifestCompression.version | TagTypeMask) - byteBuffer.putInt(SenderActorRefTagOffset, header._senderActorRefIdx | TagTypeMask) + byteBuffer.put(ActorRefCompressionTableVersionOffset, header.outboundActorRefCompression.version) + byteBuffer.put(ClassManifestCompressionTableVersionOffset, header.outboundClassManifestCompression.version) + byteBuffer.position(MetadataContainerAndLiteralSectionOffset) if (header.flag(MetadataPresentFlag)) { // tag if we have metadata or not, as the layout next follows different patterns depending on that - byteBuffer.position(MetadataContainerAndLiteralSectionOffset) - header.metadataContainer.copyToBuffer(byteBuffer) // after metadata is written, buffer is at correct position to continue writing literals (they "moved forward") - } else { - // Write compressable, variable-length parts always to the actual position of the buffer - // Write tag values explicitly in their proper offset - byteBuffer.position(MetadataContainerAndLiteralSectionOffset) } // Serialize sender @@ -435,22 +426,14 @@ private[remote] final class EnvelopeBuffer(val byteBuffer: ByteBuffer) { val header = h.asInstanceOf[HeaderBuilderImpl] // Read fixed length parts - header setVersion byteBuffer.get() - header setFlags byteBuffer.get() - byteBuffer.get() // skip 1 byte - byteBuffer.get() // skip 1 byte - header setUid byteBuffer.getLong - header setSerializer byteBuffer.getInt + header.setVersion(byteBuffer.get(VersionOffset)) + header.setFlags(byteBuffer.get(FlagsOffset)) + header.setUid(byteBuffer.getLong(UidOffset)) + header.setSerializer(byteBuffer.getInt(SerializerOffset)) // compression table versions (stored in the Tag) - val refCompressionVersionTag = byteBuffer.getInt(ActorRefCompressionTableVersionTagOffset) - if ((refCompressionVersionTag & TagTypeMask) != 0) { - header._inboundActorRefCompressionTableVersion = (refCompressionVersionTag & TagValueMask).byteValue - } - val manifestCompressionVersionTag = byteBuffer.getInt(ClassManifestCompressionTableVersionTagOffset) - if ((manifestCompressionVersionTag & TagTypeMask) != 0) { - header._inboundClassManifestCompressionTableVersion = (manifestCompressionVersionTag & TagValueMask).byteValue - } + header._inboundActorRefCompressionTableVersion = byteBuffer.get(ActorRefCompressionTableVersionOffset) + header._inboundClassManifestCompressionTableVersion = byteBuffer.get(ClassManifestCompressionTableVersionOffset) if (header.flag(MetadataPresentFlag)) { byteBuffer.position(MetadataContainerAndLiteralSectionOffset) diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionTable.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionTable.scala index 06da2785e3..88444bb7f5 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionTable.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionTable.scala @@ -7,7 +7,11 @@ package akka.remote.artery.compress import java.util import java.util.Comparator -/** INTERNAL API: Versioned compression table to be advertised between systems */ +/** + * INTERNAL API: Versioned compression table to be advertised between systems + * + * @param version Either -1 for disabled or a version between 0 and 127 + */ private[remote] final case class CompressionTable[T](originUid: Long, version: Byte, dictionary: Map[T, Int]) { import CompressionTable.NotCompressedId diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/DecompressionTable.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/DecompressionTable.scala index dfc6734f35..efb02181f1 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/compress/DecompressionTable.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/DecompressionTable.scala @@ -4,7 +4,11 @@ package akka.remote.artery.compress -/** INTERNAL API */ +/** + * INTERNAL API + * + * @param version Either -1 for disabled or a version between 0 and 127 + */ private[remote] final case class DecompressionTable[T](originUid: Long, version: Byte, table: Array[T]) { private[this] val length = table.length @@ -27,6 +31,10 @@ private[remote] final case class DecompressionTable[T](originUid: Long, version: /** INTERNAL API */ private[remote] object DecompressionTable { + + val DisabledVersion: Byte = -1 + private[this] val _empty = DecompressionTable(0, 0, Array.empty) def empty[T] = _empty.asInstanceOf[DecompressionTable[T]] + def disabled[T] = empty[T].copy(version = DisabledVersion) } diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala index 9762fb8834..8e326371de 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala @@ -234,7 +234,7 @@ private[remote] object InboundCompression { object State { def empty[T] = State( - oldTable = DecompressionTable.empty[T].copy(version = -1), + oldTable = DecompressionTable.disabled[T], activeTable = DecompressionTable.empty[T], nextTable = DecompressionTable.empty[T].copy(version = 1), advertisementInProgress = None) @@ -247,13 +247,14 @@ private[remote] object InboundCompression { advertisementInProgress: Option[CompressionTable[T]]) { def startUsingNextTable(): State[T] = { - // wrap around to positive values - val nextVersion = (nextTable.version + 1) & 0x7F + def incrementTableVersion(version: Byte): Byte = + if (version == 127) 0 + else (version + 1).toByte + State( oldTable = activeTable, activeTable = nextTable, - // skip 0 when wrapped around - nextTable = DecompressionTable.empty[T].copy(version = (if (nextVersion == 0) 1 else nextVersion).byteValue), + nextTable = DecompressionTable.empty[T].copy(version = incrementTableVersion(nextTable.version)), advertisementInProgress = None) } } @@ -317,7 +318,7 @@ private[remote] abstract class InboundCompression[T >: Null]( val oldVersion = current.oldTable.version val activeVersion = current.activeTable.version - if (incomingTableVersion == -1) OptionVal.None // no compression, bail out early + if (incomingTableVersion == DecompressionTable.DisabledVersion) OptionVal.None // no compression, bail out early else if (incomingTableVersion == activeVersion) { val value: T = current.activeTable.get(idx) if (value != null) OptionVal.Some[T](value) @@ -327,11 +328,6 @@ private[remote] abstract class InboundCompression[T >: Null]( val value: T = current.oldTable.get(idx) if (value != null) OptionVal.Some[T](value) else throw new UnknownCompressedIdException(idx) - } else if (incomingTableVersion < activeVersion) { - log.debug( - "Received value from originUid [{}] compressed with old table: [{}], current table version is: [{}]", - originUid, incomingTableVersion, activeVersion) - OptionVal.None } else if (current.advertisementInProgress.isDefined && incomingTableVersion == current.advertisementInProgress.get.version) { log.debug( "Received first value from originUid [{}] compressed using the advertised compression table, flipping to it (version: {})", @@ -343,7 +339,7 @@ private[remote] abstract class InboundCompression[T >: Null]( // it is using a table that was built for previous incarnation of this system log.warning( "Inbound message from originUid [{}] is using unknown compression table version. " + - "It was probably sent with compression table built for previous incarnation of this system. " + + "It may have been sent with compression table built for previous incarnation of this system. " + "Versions activeTable: {}, nextTable: {}, incomingTable: {}", originUid, activeVersion, current.nextTable.version, incomingTableVersion) OptionVal.None diff --git a/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala index b2c2be8b6b..bb1865762a 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala @@ -33,10 +33,10 @@ class EnvelopeBufferSpec extends AkkaSpec { val idxToManifest = manifestToIdx.map(_.swap) val outboundActorRefTable: CompressionTable[ActorRef] = - CompressionTable(17L, version = 0xCA.byteValue, refToIdx) + CompressionTable(17L, version = 28.toByte, refToIdx) val outboundClassManifestTable: CompressionTable[String] = - CompressionTable(17L, version = 0xBA.byteValue, manifestToIdx) + CompressionTable(17L, version = 35.toByte, manifestToIdx) override def hitActorRef(originUid: Long, remote: Address, ref: ActorRef, n: Int): Unit = () override def decompressActorRef(originUid: Long, tableVersion: Byte, idx: Int): OptionVal[ActorRef] = OptionVal(idxToRef(idx)) @@ -78,8 +78,8 @@ class EnvelopeBufferSpec extends AkkaSpec { headerOut.version should ===(1) headerOut.uid should ===(42) - headerOut.inboundActorRefCompressionTableVersion should ===(0xCA.byteValue) - headerOut.inboundClassManifestCompressionTableVersion should ===(0xBA.byteValue) + headerOut.inboundActorRefCompressionTableVersion should ===(28.toByte) + headerOut.inboundClassManifestCompressionTableVersion should ===(35.toByte) headerOut.serializer should ===(4) headerOut.senderActorRef(originUid).get.path.toSerializationFormat should ===("akka://EnvelopeBufferSpec/compressable0") headerOut.senderActorRefPath should ===(OptionVal.None) diff --git a/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionIntegrationSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionIntegrationSpec.scala index 497677317c..655b8f7401 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionIntegrationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionIntegrationSpec.scala @@ -281,7 +281,6 @@ class CompressionIntegrationSpec extends ArteryMultiNodeSpec(CompressionIntegrat val maxDuplicateTables = 40 // max duplicate tables that will not fail the test var tableVersionsSeen = 0 var lastTableVersion = 0 - var wrapAroundCount = 0 var iteration = 0 while (tableVersionsSeen < maxTableVersions) { @@ -303,12 +302,8 @@ class CompressionIntegrationSpec extends ArteryMultiNodeSpec(CompressionIntegrat if (currentTableVersion != lastTableVersion) { // if we get a new table lastTableVersion = currentTableVersion tableVersionsSeen += 1 - - if ((tableVersionsSeen & 0x7F) == 0) { - wrapAroundCount += 1 - } } - currentTableVersion should ===((tableVersionsSeen & 0x7F) + wrapAroundCount) + currentTableVersion should ===(tableVersionsSeen & 0x7F) } }