diff --git a/akka-actor/src/main/scala/akka/util/ByteString.scala b/akka-actor/src/main/scala/akka/util/ByteString.scala index d2600320d8..9cb952efad 100644 --- a/akka-actor/src/main/scala/akka/util/ByteString.scala +++ b/akka-actor/src/main/scala/akka/util/ByteString.scala @@ -178,11 +178,15 @@ object ByteString { val copyLength = Math.min(buffer.remaining, offset + length) if (copyLength > 0) { buffer.put(bytes, offset, copyLength) - drop(copyLength) } copyLength } + /** INTERNAL API: Specialized for internal use, appending ByteString1C to a ByteStringBuilder. */ + private[akka] def appendToBuilder(buffer: ByteStringBuilder) = { + buffer.putByteArrayUnsafe(bytes) + } + } /** INTERNAL API: ByteString backed by exactly one array, with start / end markers */ diff --git a/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala index be9f3efd90..0e6927c705 100644 --- a/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala @@ -140,7 +140,7 @@ class CodecBenchmark { val N = 100000 val encoder: Flow[OutboundEnvelope, EnvelopeBuffer, Encoder.ChangeOutboundCompression] = - Flow.fromGraph(new Encoder(uniqueLocalAddress, system, outboundEnvelopePool, envelopePool)) + Flow.fromGraph(new Encoder(uniqueLocalAddress, system.asInstanceOf[ExtendedActorSystem], outboundEnvelopePool, envelopePool)) Source.fromGraph(new BenchTestSourceSameElement(N, "elem")) .map(msg ⇒ outboundEnvelopePool.acquire().init(OptionVal.None, payload, OptionVal.Some(remoteRefB))) @@ -197,7 +197,7 @@ class CodecBenchmark { val N = 100000 val encoder: Flow[OutboundEnvelope, EnvelopeBuffer, Encoder.ChangeOutboundCompression] = - Flow.fromGraph(new Encoder(uniqueLocalAddress, system, outboundEnvelopePool, envelopePool)) + Flow.fromGraph(new Encoder(uniqueLocalAddress, system.asInstanceOf[ExtendedActorSystem], outboundEnvelopePool, envelopePool)) val localRecipient = resolvedRef.path.toSerializationFormatWithAddress(uniqueLocalAddress.address) val provider = RARP(system).provider diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 9b8c83ce5d..3d6a596822 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -320,6 +320,17 @@ akka { advertisement-interval = 1 minute # TODO find good number as default, for benchmarks trigger immediately } } + + # List of fully qualified class names of remote instruments which should + # be initialized and used for monitoring of remote messages. + # The class must extend akka.remote.artery.RemoteInstrument and + # have a public constructor with empty parameters or one ExtendedActorSystem + # parameter. + # A new instance of RemoteInstrument will be created for each encoder and decoder. + # It's only called from the stage, so if it dosn't delegate to any shared instance + # it doesn't have to be thread-safe. + # Refer to `akka.remote.artery.RemoteInstrument` for more information. + instruments = ${?akka.remote.artery.advanced.instruments} [] } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 9f7368a73c..b4b98c34ca 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -995,7 +995,7 @@ private[remote] object ArteryTransport { val ProtocolName = "akka" - val Version = 0 + val Version: Byte = 0 class AeronTerminated(e: Throwable) extends RuntimeException(e) diff --git a/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala b/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala index 8871c14eda..1786f527c9 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala @@ -13,9 +13,12 @@ import akka.remote.artery.compress.CompressionProtocol._ import akka.remote.artery.compress.{ CompressionTable, InboundCompressions } import akka.serialization.Serialization import org.agrona.concurrent.{ ManyToManyConcurrentArrayQueue, UnsafeBuffer } -import akka.util.{ OptionVal, Unsafe } +import akka.util.{ ByteString, CompactByteString, OptionVal, Unsafe } import akka.remote.artery.compress.NoInboundCompressions +import akka.util.ByteString.ByteString1C + +import scala.annotation.tailrec /** * INTERNAL API @@ -44,6 +47,19 @@ private[remote] class EnvelopeBufferPool(maximumPayload: Int, maximumBuffers: In } +/** INTERNAL API */ +private[remote] final class ByteFlag(val mask: Byte) extends AnyVal { + def isEnabled(byteFlags: Byte): Boolean = (byteFlags.toInt & mask) != 0 + override def toString = s"ByteFlag(${ByteFlag.binaryLeftPad(mask)})" +} +object ByteFlag { + def binaryLeftPad(byte: Byte): String = { + val string = Integer.toBinaryString(byte) + val pad = "0" * (8 - string.length) // leftPad + pad + string + } +} + /** * INTERNAL API */ @@ -52,17 +68,24 @@ private[remote] object EnvelopeBuffer { val TagTypeMask = 0xFF000000 val TagValueMask = 0x0000FFFF - val VersionOffset = 0 // Int + // Flags (1 byte allocated for them) + val MetadataPresentFlag = new ByteFlag(0x1) + + val VersionOffset = 0 // Byte + val FlagsOffset = 1 // Byte + // 2 bytes free // TODO re-align values to not have this empty space val UidOffset = 4 // Long val SerializerOffset = 12 // Int val SenderActorRefTagOffset = 16 // Int val RecipientActorRefTagOffset = 20 // Int val ClassManifestTagOffset = 24 // Int - val ActorRefCompressionTableVersionTagOffset = 28 // Int - val ClassManifestCompressionTableVersionTagOffset = 32 // Int + val ActorRefCompressionTableVersionTagOffset = 28 // Int // TODO handle roll-over and move to Short + val ClassManifestCompressionTableVersionTagOffset = 32 // Int // TODO handle roll-over and move to Short - val LiteralsSectionOffset = 36 + // EITHER metadata followed by literals directly OR literals directly in this spot. + // Mode depends on the `MetadataPresentFlag`. + val MetadataContainerAndLiteralSectionOffset = 36 // Int val UsAscii = Charset.forName("US-ASCII") @@ -89,8 +112,13 @@ private[remote] object HeaderBuilder { * INTERNAL API */ private[remote] sealed trait HeaderBuilder { - def setVersion(v: Int): Unit - def version: Int + def setVersion(v: Byte): Unit + def version: Byte + + def setFlags(v: Byte): Unit + def flags: Byte + def flag(byteFlag: ByteFlag): Boolean + def setFlag(byteFlag: ByteFlag, value: Boolean): Unit def inboundActorRefCompressionTableVersion: Int def inboundClassManifestCompressionTableVersion: Int @@ -104,6 +132,9 @@ private[remote] sealed trait HeaderBuilder { def setUid(u: Long): Unit def uid: Long + /** Metadata SPI, internally multiple metadata sections can be represented. */ + def metadataContainer: ByteString + def setSenderActorRef(ref: ActorRef): Unit /** * Retrive the compressed ActorRef by the compressionId carried by this header. @@ -134,6 +165,9 @@ private[remote] sealed trait HeaderBuilder { */ def recipientActorRefPath: OptionVal[String] + def setMetadataContainer(container: ByteString): Unit + def clearMetadataContainer(): Unit + def setSerializer(serializer: Int): Unit def serializer: Int @@ -167,7 +201,8 @@ private[remote] final class HeaderBuilderImpl( private[this] val toSerializationFormat: SerializationFormatCache = new SerializationFormatCache // Fields only available for EnvelopeBuffer - var _version: Int = _ + var _version: Byte = _ + var _flags: Byte = _ var _uid: Long = _ var _inboundActorRefCompressionTableVersion: Int = 0 var _inboundClassManifestCompressionTableVersion: Int = 0 @@ -181,9 +216,18 @@ private[remote] final class HeaderBuilderImpl( var _manifest: String = null var _manifestIdx: Int = -1 - override def setVersion(v: Int) = _version = v + var _metadataContainer: ByteString = null + + override def setVersion(v: Byte) = _version = v override def version = _version + override def setFlags(v: Byte) = _flags = v + override def flags = _flags + override def flag(byteFlag: ByteFlag): Boolean = (_flags.toInt & byteFlag.mask) != 0 + override def setFlag(byteFlag: ByteFlag, value: Boolean): Unit = + if (value) _flags = (flags | byteFlag.mask).toByte + else _flags = (flags & ~byteFlag.mask).toByte + override def setUid(uid: Long) = _uid = uid override def uid: Long = _uid @@ -257,9 +301,23 @@ private[remote] final class HeaderBuilderImpl( } } + /** Make sure to prefix the data with an Int-length */ + def setMetadataContainer(container: ByteString): Unit = { + setFlag(EnvelopeBuffer.MetadataPresentFlag, value = container != null) + _metadataContainer = container + } + /** Rendered metadata already contains int-length prefix, no need to add it manually */ + def metadataContainer: ByteString = + _metadataContainer + def clearMetadataContainer(): Unit = { + setFlag(EnvelopeBuffer.MetadataPresentFlag, value = false) + _metadataContainer = null + } + override def toString = "HeaderBuilderImpl(" + "version:" + version + ", " + + "flags:" + ByteFlag.binaryLeftPad(flags) + ", " + "uid:" + uid + ", " + "_senderActorRef:" + _senderActorRef + ", " + "_senderActorRefIdx:" + _senderActorRefIdx + ", " + @@ -267,7 +325,8 @@ private[remote] final class HeaderBuilderImpl( "_recipientActorRefIdx:" + _recipientActorRefIdx + ", " + "_serializer:" + _serializer + ", " + "_manifest:" + _manifest + ", " + - "_manifestIdx:" + _manifestIdx + ")" + "_manifestIdx:" + _manifestIdx + ", " + + "_metadataContainer:" + _metadataContainer + ")" } @@ -286,17 +345,30 @@ private[remote] final class EnvelopeBuffer(val byteBuffer: ByteBuffer) { byteBuffer.clear() // Write fixed length parts - byteBuffer.putInt(header.version) + byteBuffer.put(header.version) + byteBuffer.put(header.flags) + // 1 empty byte slot // TODO avoid having these empty slots + // 1 empty byte slot + byteBuffer.position(UidOffset) // skips the above 2 empty slots byteBuffer.putLong(header.uid) byteBuffer.putInt(header.serializer) // compression table version numbers byteBuffer.putInt(ActorRefCompressionTableVersionTagOffset, header.outboundActorRefCompression.version | TagTypeMask) byteBuffer.putInt(ClassManifestCompressionTableVersionTagOffset, header.outboundClassManifestCompression.version | TagTypeMask) + byteBuffer.putInt(SenderActorRefTagOffset, header._senderActorRefIdx | TagTypeMask) - // Write compressable, variable-length parts always to the actual position of the buffer - // Write tag values explicitly in their proper offset - byteBuffer.position(LiteralsSectionOffset) + if (header.flag(MetadataPresentFlag)) { + // tag if we have metadata or not, as the layout next follows different patterns depending on that + byteBuffer.position(MetadataContainerAndLiteralSectionOffset) + + header.metadataContainer.copyToBuffer(byteBuffer) + // after metadata is written, buffer is at correct position to continue writing literals (they "moved forward") + } else { + // Write compressable, variable-length parts always to the actual position of the buffer + // Write tag values explicitly in their proper offset + byteBuffer.position(MetadataContainerAndLiteralSectionOffset) + } // Serialize sender if (header._senderActorRefIdx != -1) @@ -315,13 +387,17 @@ private[remote] final class EnvelopeBuffer(val byteBuffer: ByteBuffer) { byteBuffer.putInt(ClassManifestTagOffset, header._manifestIdx | TagTypeMask) else writeLiteral(ClassManifestTagOffset, header._manifest) + } def parseHeader(h: HeaderBuilder): Unit = { val header = h.asInstanceOf[HeaderBuilderImpl] // Read fixed length parts - header setVersion byteBuffer.getInt + header setVersion byteBuffer.get() + header setFlags byteBuffer.get() + byteBuffer.get() // skip 1 byte + byteBuffer.get() // skip 1 byte header setUid byteBuffer.getLong header setSerializer byteBuffer.getInt @@ -335,9 +411,21 @@ private[remote] final class EnvelopeBuffer(val byteBuffer: ByteBuffer) { header._inboundClassManifestCompressionTableVersion = manifestCompressionVersionTag & TagValueMask } - // Read compressable, variable-length parts always from the actual position of the buffer - // Read tag values explicitly from their proper offset - byteBuffer.position(LiteralsSectionOffset) + if (header.flag(MetadataPresentFlag)) { + byteBuffer.position(MetadataContainerAndLiteralSectionOffset) + val totalMetadataLength = byteBuffer.getInt() + + ensureLiteralCharsLength(totalMetadataLength) + val bytes = literalBytes + + byteBuffer.get(bytes, 0, totalMetadataLength) + header._metadataContainer = ByteString(bytes).take(totalMetadataLength) + // the literals section starts here, right after the metadata has ended + // thus, no need to move position the buffer again + } else { + // No metadata present, we position the buffer on the place where literals start + byteBuffer.position(MetadataContainerAndLiteralSectionOffset) + } // Deserialize sender val senderTag = byteBuffer.getInt(SenderActorRefTagOffset) @@ -372,8 +460,7 @@ private[remote] final class EnvelopeBuffer(val byteBuffer: ByteBuffer) { private def readLiteral(): String = { val length = byteBuffer.getShort - if (length == 0) - "" + if (length == 0) "" else { ensureLiteralCharsLength(length) val chars = literalChars @@ -399,7 +486,7 @@ private[remote] final class EnvelopeBuffer(val byteBuffer: ByteBuffer) { if (length == 0) { byteBuffer.putShort(0) } else { - byteBuffer.putShort(literal.length.toShort) + byteBuffer.putShort(length.toShort) ensureLiteralCharsLength(length) val bytes = literalBytes val chars = Unsafe.instance.getObject(literal, StringValueFieldOffset).asInstanceOf[Array[Char]] diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala index e2ee25c58e..2ef13b9b3f 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala @@ -23,7 +23,8 @@ import akka.Done import akka.stream.stage.GraphStageWithMaterializedValue import scala.concurrent.Promise -import java.util.concurrent.atomic.AtomicInteger + +import scala.annotation.switch /** * INTERNAL API @@ -45,7 +46,7 @@ private[remote] object Encoder { */ private[remote] class Encoder( uniqueLocalAddress: UniqueAddress, - system: ActorSystem, + system: ExtendedActorSystem, outboundEnvelopePool: ObjectPool[ReusableOutboundEnvelope], bufferPool: EnvelopeBufferPool) extends GraphStageWithMaterializedValue[FlowShape[OutboundEnvelope, EnvelopeBuffer], Encoder.ChangeOutboundCompression] { @@ -65,6 +66,10 @@ private[remote] class Encoder( private val serialization = SerializationExtension(system) private val serializationInfo = Serialization.Information(localAddress, system) + private val instruments: Vector[RemoteInstrument] = RemoteInstruments.create(system) + // by being backed by an Array, this allows us to not allocate any wrapper type for the metadata (since we need its ID) + private val serializedMetadatas: MetadataMap[ByteString] = MetadataMap() // TODO: possibly can be optimised a more for the specific access pattern (during write) + private val changeActorRefCompressionCb = getAsyncCallback[(CompressionTable[ActorRef], Promise[Done])] { case (table, done) ⇒ headerBuilder.setOutboundActorRefCompression(table) @@ -106,6 +111,7 @@ private[remote] class Encoder( case OptionVal.Some(s) ⇒ headerBuilder setSenderActorRef s } + applyAndRenderRemoteMessageSentMetadata(instruments, outboundEnvelope, headerBuilder) MessageSerializer.serializeForArtery(serialization, outboundEnvelope.message, headerBuilder, envelope) } finally Serialization.currentTransportInformation.value = oldValue @@ -131,14 +137,43 @@ private[remote] class Encoder( } finally { outboundEnvelope match { case r: ReusableOutboundEnvelope ⇒ outboundEnvelopePool.release(r) - case _ ⇒ + case _ ⇒ // no need to release it } } - } override def onPull(): Unit = pull(in) + /** + * Renders metadata into `headerBuilder`. + * + * Replace all AnyRef's that were passed along with the [[OutboundEnvelope]] into their [[ByteString]] representations, + * by calling `remoteMessageSent` of each enabled instrumentation. If `context` was attached in the envelope it is passed + * into the instrument, otherwise it receives an OptionVal.None as context, and may still decide to attach rendered + * metadata by returning it. + */ + private def applyAndRenderRemoteMessageSentMetadata(instruments: Vector[RemoteInstrument], envelope: OutboundEnvelope, headerBuilder: HeaderBuilder): Unit = { + if (instruments.nonEmpty) { + val n = instruments.length + + var i = 0 + while (i < n) { + val instrument = instruments(i) + val instrumentId = instrument.identifier + + val metadata = instrument.remoteMessageSent(envelope.recipient.orNull, envelope.message, envelope.sender.orNull) + if (metadata ne null) serializedMetadatas.set(instrumentId, metadata) + + i += 1 + } + } + + if (serializedMetadatas.nonEmpty) { + MetadataEnvelopeSerializer.serialize(serializedMetadatas, headerBuilder) + serializedMetadatas.clear() + } + } + /** * External call from ChangeOutboundCompression materialized value */ @@ -319,6 +354,7 @@ private[remote] class Decoder( originUid, headerBuilder.serializer, classManifest, + headerBuilder.flags, envelope, association) @@ -421,6 +457,7 @@ private[remote] class Deserializer( override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging { + private val instruments: Vector[RemoteInstrument] = RemoteInstruments.create(system) private val serialization = SerializationExtension(system) override protected def logSource = classOf[Deserializer] @@ -432,7 +469,11 @@ private[remote] class Deserializer( val deserializedMessage = MessageSerializer.deserializeForArtery( system, envelope.originUid, serialization, envelope.serializer, envelope.classManifest, envelope.envelopeBuffer) - push(out, envelope.withMessage(deserializedMessage)) + val envelopeWithMessage = envelope.withMessage(deserializedMessage) + + applyIncomingInstruments(envelopeWithMessage) + + push(out, envelopeWithMessage) } catch { case NonFatal(e) ⇒ log.warning( @@ -448,6 +489,22 @@ private[remote] class Deserializer( override def onPull(): Unit = pull(in) + private def applyIncomingInstruments(envelope: InboundEnvelope): Unit = { + if (envelope.flag(EnvelopeBuffer.MetadataPresentFlag)) { + val length = instruments.length + if (length == 0) { + // TODO do we need to parse, or can we do a fast forward if debug logging is not enabled? + val metaMetadataEnvelope = MetadataMapParsing.parse(envelope) + if (log.isDebugEnabled) + log.debug("Incoming message envelope contains metadata for instruments: {}, " + + "however no RemoteInstrument was registered in local system!", metaMetadataEnvelope.metadataMap.keysWithValues.mkString("[", ",", "]")) + } else { + // we avoid emitting a MetadataMap and instead directly apply the instruments onto the received metadata + MetadataMapParsing.applyAllRemoteMessageReceived(instruments, envelope) + } + } + } + setHandlers(in, out, this) } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/InboundEnvelope.scala b/akka-remote/src/main/scala/akka/remote/artery/InboundEnvelope.scala index bd15681ef4..c017d73107 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/InboundEnvelope.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/InboundEnvelope.scala @@ -23,7 +23,7 @@ private[remote] object InboundEnvelope { originUid: Long, association: OptionVal[OutboundContext]): InboundEnvelope = { val env = new ReusableInboundEnvelope - env.init(recipient, recipientAddress, sender, originUid, -1, "", null, association) + env.init(recipient, recipientAddress, sender, originUid, -1, "", 0, null, association) .withMessage(message) } @@ -44,6 +44,9 @@ private[remote] trait InboundEnvelope { def message: AnyRef def envelopeBuffer: EnvelopeBuffer + def flags: Byte + def flag(byteFlag: ByteFlag): Boolean + def withMessage(message: AnyRef): InboundEnvelope def releaseEnvelopeBuffer(): InboundEnvelope @@ -71,6 +74,7 @@ private[akka] final class ReusableInboundEnvelope extends InboundEnvelope { private var _association: OptionVal[OutboundContext] = OptionVal.None private var _serializer: Int = -1 private var _classManifest: String = null + private var _flags: Byte = 0 private var _message: AnyRef = null private var _envelopeBuffer: EnvelopeBuffer = null @@ -84,6 +88,9 @@ private[akka] final class ReusableInboundEnvelope extends InboundEnvelope { override def message: AnyRef = _message override def envelopeBuffer: EnvelopeBuffer = _envelopeBuffer + override def flags: Byte = _flags + override def flag(byteFlag: ByteFlag): Boolean = byteFlag.isEnabled(_flags) + override def withMessage(message: AnyRef): InboundEnvelope = { _message = message this @@ -115,6 +122,7 @@ private[akka] final class ReusableInboundEnvelope extends InboundEnvelope { originUid: Long, serializer: Int, classManifest: String, + flags: Byte, envelopeBuffer: EnvelopeBuffer, association: OptionVal[OutboundContext]): InboundEnvelope = { _recipient = recipient @@ -123,6 +131,7 @@ private[akka] final class ReusableInboundEnvelope extends InboundEnvelope { _originUid = originUid _serializer = serializer _classManifest = classManifest + _flags = flags _envelopeBuffer = envelopeBuffer _association = association this diff --git a/akka-remote/src/main/scala/akka/remote/artery/MetadataEnvelopeSerializer.scala b/akka-remote/src/main/scala/akka/remote/artery/MetadataEnvelopeSerializer.scala new file mode 100644 index 0000000000..4e2be61121 --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/artery/MetadataEnvelopeSerializer.scala @@ -0,0 +1,173 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ + +package akka.remote.artery + +import java.nio.{ ByteBuffer, ByteOrder } + +import akka.actor.ExtendedActorSystem +import akka.serialization.Serialization +import akka.util.ByteString.ByteString1C +import akka.util.{ ByteString, ByteStringBuilder } + +/** + * INTERNAL API + */ +private[akka] object MetadataEnvelopeSerializer { + + private[akka] val EmptyRendered = { + implicit val _ByteOrder = ByteOrder.LITTLE_ENDIAN + + val bsb = new ByteStringBuilder + bsb.putInt(0) // 0 length + bsb.result + } + + // key/length of a metadata element are encoded within a single integer: + // supports keys in the range of <0-31> + final val EntryKeyMask = Integer.parseInt("1111 1000 0000 0000 0000 0000 0000 000".replace(" ", ""), 2) + def maskEntryKey(k: Byte): Int = (k.toInt << 26) & EntryKeyMask + def unmaskEntryKey(kv: Int): Byte = ((kv & EntryKeyMask) >> 26).toByte + + final val EntryLengthMask = ~EntryKeyMask + + def maskEntryLength(k: Int): Int = k & EntryLengthMask + def unmaskEntryLength(kv: Int): Int = kv & EntryLengthMask + + def muxEntryKeyLength(k: Byte, l: Int): Int = { + maskEntryKey(k) | maskEntryLength(l) + } + + def serialize(metadatas: MetadataMap[ByteString], headerBuilder: HeaderBuilder): Unit = { + if (metadatas.isEmpty) headerBuilder.clearMetadataContainer() + else { + val container = new MetadataMapRendering(metadatas) + headerBuilder.setMetadataContainer(container.render()) + } + } + + def deserialize(system: ExtendedActorSystem, originUid: Long, serialization: Serialization, + serializer: Int, classManifest: String, envelope: EnvelopeBuffer): AnyRef = { + serialization.deserializeByteBuffer(envelope.byteBuffer, serializer, classManifest) + } +} + +/** + * INTERNAL API + * + * The metadata section is stored as ByteString (prefixed with Int length field, + * the same way as any other literal), however the internal structure of it is as follows: + * + * {{{ + * Metadata entry: + * + * 0 1 2 3 + * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + * | Key | Metadata entry length | + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + * | ... metadata entry ... | + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + * }}} + * + */ +private[akka] final class MetadataMapRendering(val metadataMap: MetadataMap[ByteString]) extends AnyVal { + import MetadataEnvelopeSerializer._ + + def render(): ByteString = + if (metadataMap.isEmpty) { + // usually no-one will want to render an empty metadata section - it should not be there at all + EmptyRendered + } else { + implicit val _ByteOrder = ByteOrder.LITTLE_ENDIAN + + // TODO optimise this, we could just count along the way and then prefix with the length + val totalSize = 4 /* length int field */ + metadataMap.usedSlots * 4 /* metadata length */ + metadataMap.foldLeftValues(0)(_ + _.length) + val b = new ByteStringBuilder // TODO could we reuse one? + b.sizeHint(totalSize) + + b.putInt(totalSize - 4 /* don't count itself, the length prefix */ ) + // TODO: move through and then prepend length + metadataMap.foreach { (key: Byte, value: ByteString) ⇒ + // TODO try to remove allocation? Iterator otherwise, but that's also allocation + val kl = muxEntryKeyLength(key, value.length) + b.putInt(kl) + value match { + case c: ByteString1C ⇒ c.appendToBuilder(b) // uses putByteArrayUnsafe + case _ ⇒ b ++= value + } + } + b.result() + } +} + +/** INTERNAL API */ +private[akka] object MetadataMapParsing { + import MetadataEnvelopeSerializer._ + + /** Allocates an MetadataMap */ + def parse(envelope: InboundEnvelope): MetadataMapRendering = { + val buf = envelope.envelopeBuffer.byteBuffer + buf.position(EnvelopeBuffer.MetadataContainerAndLiteralSectionOffset) + parseRaw(buf) + } + + /** + * INTERNAL API, only for testing + * The buffer MUST be already at the right position where the Metadata container starts. + */ + private[akka] def parseRaw(buf: ByteBuffer) = { + buf.order(ByteOrder.LITTLE_ENDIAN) + val metadataContainerLength = buf.getInt() + val endOfMetadataPos = metadataContainerLength + buf.position() + val map = MetadataMap[ByteString]() + + while (buf.position() < endOfMetadataPos) { + val kl = buf.getInt() + val k = unmaskEntryKey(kl) // k + val l = unmaskEntryLength(kl) // l + + val arr = Array.ofDim[Byte](l) + buf.get(arr) + val metadata = ByteString1C(arr) // avoids copying again + map.set(k, metadata) + } + + new MetadataMapRendering(map) + } + + /** Implemented in a way to avoid allocations of any envelopes or arrays */ + def applyAllRemoteMessageReceived(instruments: Vector[RemoteInstrument], envelope: InboundEnvelope): Unit = { + val buf = envelope.envelopeBuffer.byteBuffer + buf.position(EnvelopeBuffer.MetadataContainerAndLiteralSectionOffset) + applyAllRemoteMessageReceivedRaw(instruments, envelope, buf) + } + + /** + * INTERNAL API, only for testing + * The buffer MUST be already at the right position where the Metadata container starts. + */ + private[akka] def applyAllRemoteMessageReceivedRaw(instruments: Vector[RemoteInstrument], envelope: InboundEnvelope, buf: ByteBuffer): Unit = { + buf.order(ByteOrder.LITTLE_ENDIAN) + + val metadataContainerLength = buf.getInt() + val endOfMetadataPos = metadataContainerLength + buf.position() + + while (buf.position() < endOfMetadataPos) { + val keyAndLength = buf.getInt() + val key = unmaskEntryKey(keyAndLength) + val length = unmaskEntryLength(keyAndLength) + + val arr = Array.ofDim[Byte](length) // TODO can be optimised to re-cycle this array instead + buf.get(arr) + val data = ByteString(arr) // bytes + + instruments.find(_.identifier == key) match { + case Some(instr) ⇒ instr.remoteMessageReceived(envelope.recipient.orNull, envelope.message, envelope.sender.orNull, data) + case _ ⇒ throw new Exception(s"No RemoteInstrument for id $key available!") + + } + } + } +} diff --git a/akka-remote/src/main/scala/akka/remote/artery/OutboundEnvelope.scala b/akka-remote/src/main/scala/akka/remote/artery/OutboundEnvelope.scala index 6a1e9ba84c..3625f612dd 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/OutboundEnvelope.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/OutboundEnvelope.scala @@ -3,11 +3,9 @@ */ package akka.remote.artery -import akka.actor.InternalActorRef -import akka.util.OptionVal -import akka.actor.Address import akka.actor.ActorRef import akka.remote.RemoteActorRef +import akka.util.OptionVal /** * INTERNAL API diff --git a/akka-remote/src/main/scala/akka/remote/artery/RemoteInstrument.scala b/akka-remote/src/main/scala/akka/remote/artery/RemoteInstrument.scala new file mode 100644 index 0000000000..e9a8764e35 --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/artery/RemoteInstrument.scala @@ -0,0 +1,189 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ + +package akka.remote.artery + +import akka.actor.{ ActorRef, ExtendedActorSystem } +import akka.util.{ ByteString, OptionVal } + +/** + * INTERNAL API + * + * Part of the monitoring SPI which allows attaching metadata to outbound remote messages, + * and reading in metadata from incoming messages. + * + * Multiple instruments are automatically handled, however they MUST NOT overlap in their idenfitiers. + * + * Instances of `RemoteInstrument` are created from configuration. A new instance of RemoteInstrument + * will be created for each encoder and decoder. It's only called from the stage, so if it dosn't + * delegate to any shared instance it doesn't have to be thread-safe. + */ +abstract class RemoteInstrument { + /** + * Instrument identifier. + * + * MUST be >=0 and <32. + * + * Values between 0 and 7 are reserved for Akka internal use. + */ + def identifier: Byte + + /** + * Called right before putting the message onto the wire. + * Parameters MAY be `null` (except `message`)! + * + * @return `metadata` rendered to be serialized into the remove envelope, or `null` if no metadata should be attached + */ + def remoteMessageSent(recipient: ActorRef, message: Object, sender: ActorRef): ByteString + + /** + * Called once a message (containing a metadata field designated for this instrument) has been deserialized from the wire. + */ + def remoteMessageReceived(recipient: ActorRef, message: Object, sender: ActorRef, metadata: ByteString): Unit + +} + +object NoopRemoteInstrument extends RemoteInstrument { + override def identifier: Byte = + -1 + + override def remoteMessageSent(recipient: ActorRef, message: Object, sender: ActorRef): ByteString = + null + + override def remoteMessageReceived(recipient: ActorRef, message: Object, sender: ActorRef, metadata: ByteString): Unit = + () +} + +/** INTERNAL API */ +private[remote] object RemoteInstruments { + def create(system: ExtendedActorSystem): Vector[RemoteInstrument] = { + val c = system.settings.config + val path = "akka.remote.artery.advanced.instruments" + import scala.collection.JavaConverters._ + c.getStringList(path).asScala.map { fqcn ⇒ + system + .dynamicAccess.createInstanceFor[RemoteInstrument](fqcn, Nil) + .orElse(system.dynamicAccess.createInstanceFor[RemoteInstrument](fqcn, List(classOf[ExtendedActorSystem] → system))) + .get + }(collection.breakOut) + } +} + +/** + * INTERNAL API + * + * This datastructure is specialized for addressing directly into a known IDs slot. + * It is used when deserializing/serializing metadata and we know the ID we want to reach into. + * + * Mutable & NOT thread-safe. + * + * Fixed-size: 32-slots array-backed Map-like structure. + * Lazy: The backing array is allocated lazily only when needed, thus we pay no cost for the metadata array if + * the system is not using metadata. + * Life-cycle: Owned and linked to the lifecycle of an [[OutboundEnvelope]]. + * Re-cycled: Aimed to be re-cycled to produce the least possible GC-churn, by calling `clear()` when done with it. + * + * Reserved keys: The keys 0–7 are reserved for Akka internal purposes and future extensions. + */ +private[remote] final class MetadataMap[T >: Null] { + val capacity = 32 + + protected var backing: Array[T] = null // TODO re-think if a plain LinkedList wouldn't be fine here? + + private var _usedSlots = 0 + + def usedSlots = _usedSlots + + def apply(i: Int): OptionVal[T] = + if (backing == null) OptionVal.None + else OptionVal[T](backing(i)) + + def isEmpty = usedSlots == 0 + + def nonEmpty = !isEmpty + + def hasValueFor(i: Int) = nonEmpty && backing(i) != null + + // FIXME too specialized... + def foldLeftValues[A](zero: A)(f: (A, T) ⇒ A): A = { + var acc: A = zero + var hit = 0 + var i = 0 + while (i < capacity && hit < _usedSlots) { + val it = backing(i) + if (it != null) { + acc = f(acc, it) + hit += 1 + } + i += 1 + } + acc + } + + /** Heavy operation, only used for error logging */ + def keysWithValues: List[Int] = backing.zipWithIndex.filter({ case (t, id) ⇒ t != null }).map(_._2).toList + + def foreach(f: (Byte, T) ⇒ Unit) = { + var i = 0 + var hit = 0 + while (i < capacity && hit < _usedSlots) { + val t = backing(i) + if (t != null) { + f(i.toByte, t) + hit += 1 + } + i += 1 + } + } + + private def allocateIfNeeded(): Unit = + if (backing == null) backing = Array.ofDim[Object](capacity).asInstanceOf[Array[T]] + + /** + * Set a value at given index. + * Setting a null value removes the entry (the slot is marked as not used). + */ + def set(i: Int, t: T): Unit = + if (t == null) { + if (backing == null) () + else { + if (backing(i) != null) _usedSlots -= 1 // we're clearing a spot + backing(i) = null.asInstanceOf[T] + } + } else { + allocateIfNeeded() + + if (backing(i) == null) { + // was empty previously + _usedSlots += 1 + } else { + // replacing previous value, usedSlots remains unchanged + } + + backing(i) = t + } + + /** + * If the backing array was already allocated clears it, if it wasn't does nothing (no-op). + * This is so in order to pay no-cost when not using metadata - clearing then is instant. + */ + def clear() = + if (isEmpty) () + else { + var i = 0 + while (i < capacity) { + backing(i) = null.asInstanceOf[T] + i += 1 + } + _usedSlots = 0 + } + + override def toString() = + if (backing == null) s"MetadataMap()" + else s"MetadataMap(${backing.toList.mkString("[", ",", "]")})" +} + +object MetadataMap { + def apply[T >: Null]() = new MetadataMap[T] +} diff --git a/akka-remote/src/main/scala/akka/remote/artery/TaskRunner.scala b/akka-remote/src/main/scala/akka/remote/artery/TaskRunner.scala index 922672bf6b..e3f093e0f6 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/TaskRunner.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/TaskRunner.scala @@ -3,16 +3,16 @@ */ package akka.remote.artery -import java.util.concurrent.TimeUnit.{MICROSECONDS, MILLISECONDS} +import java.util.concurrent.TimeUnit.{ MICROSECONDS, MILLISECONDS } import akka.Done import akka.actor.ExtendedActorSystem -import akka.dispatch.{AbstractNodeQueue, MonitorableThreadFactory} +import akka.dispatch.{ AbstractNodeQueue, MonitorableThreadFactory } import akka.event.Logging -import org.agrona.concurrent.{BackoffIdleStrategy, BusySpinIdleStrategy, IdleStrategy, SleepingIdleStrategy} +import org.agrona.concurrent.{ BackoffIdleStrategy, BusySpinIdleStrategy, IdleStrategy, SleepingIdleStrategy } import scala.annotation.tailrec -import scala.concurrent.{Future, Promise} +import scala.concurrent.{ Future, Promise } import scala.reflect.ClassTag import scala.util.control.NonFatal diff --git a/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala index 1bebe48629..c6f85a2e98 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala @@ -69,7 +69,7 @@ class EnvelopeBufferSpec extends AkkaSpec { headerIn setManifest "manifest1" envelope.writeHeader(headerIn) - envelope.byteBuffer.position() should ===(EnvelopeBuffer.LiteralsSectionOffset) // Fully compressed header + envelope.byteBuffer.position() should ===(EnvelopeBuffer.MetadataContainerAndLiteralSectionOffset) // Fully compressed header envelope.byteBuffer.flip() envelope.parseHeader(headerOut) @@ -98,7 +98,7 @@ class EnvelopeBufferSpec extends AkkaSpec { headerIn setManifest "uncompressable3333" val expectedHeaderLength = - EnvelopeBuffer.LiteralsSectionOffset + // Constant header part + EnvelopeBuffer.MetadataContainerAndLiteralSectionOffset + // Constant header part 2 + lengthOfSerializedActorRefPath(senderRef) + // Length field + literal 2 + lengthOfSerializedActorRefPath(recipientRef) + // Length field + literal 2 + "uncompressable3333".length // Length field + literal @@ -131,7 +131,7 @@ class EnvelopeBufferSpec extends AkkaSpec { envelope.writeHeader(headerIn) envelope.byteBuffer.position() should ===( - EnvelopeBuffer.LiteralsSectionOffset + + EnvelopeBuffer.MetadataContainerAndLiteralSectionOffset + 2 + lengthOfSerializedActorRefPath(recipientRef)) envelope.byteBuffer.flip() @@ -157,7 +157,7 @@ class EnvelopeBufferSpec extends AkkaSpec { envelope.writeHeader(headerIn) envelope.byteBuffer.position() should ===( - EnvelopeBuffer.LiteralsSectionOffset + + EnvelopeBuffer.MetadataContainerAndLiteralSectionOffset + 2 + lengthOfSerializedActorRefPath(senderRef) + 2 + "longlonglongliteralmanifest".length) diff --git a/akka-remote/src/test/scala/akka/remote/artery/MetaMetadataSerializerSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/MetaMetadataSerializerSpec.scala new file mode 100644 index 0000000000..2652282994 --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/artery/MetaMetadataSerializerSpec.scala @@ -0,0 +1,73 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import org.scalacheck.{ Arbitrary, Gen } +import org.scalatest.prop.Checkers +import org.scalatest.{ Matchers, WordSpec } + +class MetaMetadataSerializerSpec extends WordSpec with Matchers with Checkers { + + case class KeyLen(k: Key, l: Len) { + override def toString = s" key = ${k}, len = ${l}" + } + type Key = Byte + type Len = Int + + implicit val arbitraryKeyLength: Arbitrary[KeyLen] = Arbitrary { + for { + key ← Gen.chooseNum(0.toByte, 31.toByte) + len ← Gen.chooseNum(1, 1024) + } yield KeyLen(key, len) + } + + "MetaMetadataSerializer" must { + + "perform roundtrip masking/unmasking of entry key+length" in { + val key: Byte = 17 + val len = 812 + val kl = MetadataEnvelopeSerializer.muxEntryKeyLength(key, len) + + val key2 = MetadataEnvelopeSerializer.unmaskEntryKey(kl) + key2 should ===(key) + val len2 = MetadataEnvelopeSerializer.unmaskEntryLength(kl) + len2 should ===(len) + } + + "perform key roundtrip using mask/unmask" in { + check { (kl: KeyLen) ⇒ + val k = kl.k + + val masked = MetadataEnvelopeSerializer.maskEntryKey(k) + val uk = MetadataEnvelopeSerializer.unmaskEntryKey(masked) + uk should ===(k) + uk == k + } + } + "perform length roundtrip using mask/unmask" in { + check { (kl: KeyLen) ⇒ + val l = kl.l + + val masked = MetadataEnvelopeSerializer.maskEntryLength(l) + val ul = MetadataEnvelopeSerializer.unmaskEntryLength(masked) + ul should ===(l) + ul == l + } + } + "perform muxed roundtrip using mask/unmask" in { + check { (kl: KeyLen) ⇒ + val k = kl.k + val l = kl.l + + val masked = MetadataEnvelopeSerializer.muxEntryKeyLength(k, l) + val uk = MetadataEnvelopeSerializer.unmaskEntryKey(masked) + uk should ===(k) + val ul = MetadataEnvelopeSerializer.unmaskEntryLength(masked) + ul should ===(l) + ul == l && uk == k + } + } + + } +} diff --git a/akka-remote/src/test/scala/akka/remote/artery/MetadataCarryingSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/MetadataCarryingSpec.scala new file mode 100644 index 0000000000..7d953213d8 --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/artery/MetadataCarryingSpec.scala @@ -0,0 +1,102 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import java.util.concurrent.atomic.AtomicReference + +import scala.concurrent.duration._ + +import akka.actor.ActorRef +import akka.actor.ActorSelectionMessage +import akka.actor.ActorSystem +import akka.actor.ExtendedActorSystem +import akka.actor.Extension +import akka.actor.ExtensionId +import akka.actor.ExtensionIdProvider +import akka.remote.artery.MetadataCarryingSpy.{ RemoteMessageReceived, RemoteMessageSent } +import akka.testkit.ImplicitSender +import akka.testkit.SocketUtil._ +import akka.testkit.TestActors +import akka.testkit.TestProbe +import akka.util.ByteString + +object MetadataCarryingSpy extends ExtensionId[MetadataCarryingSpy] with ExtensionIdProvider { + override def get(system: ActorSystem): MetadataCarryingSpy = super.get(system) + override def lookup = MetadataCarryingSpy + override def createExtension(system: ExtendedActorSystem): MetadataCarryingSpy = new MetadataCarryingSpy + + final case class RemoteMessageSent(recipient: ActorRef, message: Object, sender: ActorRef) + final case class RemoteMessageReceived(recipient: ActorRef, message: Object, sender: ActorRef, metadata: ByteString) +} + +class MetadataCarryingSpy extends Extension { + def ref: Option[ActorRef] = Option(_ref.get()) + def setProbe(bs: ActorRef): Unit = _ref.set(bs) + private[this] val _ref = new AtomicReference[ActorRef]() +} + +class TestInstrument(system: ExtendedActorSystem) extends RemoteInstrument { + + override val identifier: Byte = 1 + + override def remoteMessageSent(recipient: ActorRef, message: Object, sender: ActorRef): ByteString = + message match { + case _: MetadataCarryingSpec.Ping | ActorSelectionMessage(_: MetadataCarryingSpec.Ping, _, _) ⇒ + val metadata = ByteString("!!!") + MetadataCarryingSpy(system).ref.foreach(_ ! RemoteMessageSent(recipient, message, sender)) + metadata // this data will be attached to the remote message + case _ ⇒ + null + } + + override def remoteMessageReceived(recipient: ActorRef, message: Object, sender: ActorRef, metadata: ByteString): Unit = + message match { + case _: MetadataCarryingSpec.Ping | ActorSelectionMessage(_: MetadataCarryingSpec.Ping, _, _) ⇒ + MetadataCarryingSpy(system).ref.foreach(_ ! RemoteMessageReceived(recipient, message, sender, metadata)) + case _ ⇒ + } +} + +object MetadataCarryingSpec { + final case class Ping(payload: ByteString = ByteString.empty) +} + +class MetadataCarryingSpec extends ArteryMultiNodeSpec( + """ + akka { + remote.artery.advanced { + instruments = [ "akka.remote.artery.TestInstrument" ] + } + } + """) with ImplicitSender { + + import MetadataCarryingSpec._ + + "Metadata" should { + + "be included in remote messages" in { + val systemA = localSystem + val systemB = newRemoteSystem(name = Some("systemB")) + + val instrumentProbeA = TestProbe()(systemA) + MetadataCarryingSpy(systemA).setProbe(instrumentProbeA.ref) + val instrumentProbeB = TestProbe()(systemB) + MetadataCarryingSpy(systemB).setProbe(instrumentProbeB.ref) + + systemB.actorOf(TestActors.echoActorProps, "reply") + systemA.actorSelection(rootActorPath(systemB) / "user" / "reply") ! Ping() + expectMsgType[Ping] + + val sentA = instrumentProbeA.expectMsgType[RemoteMessageSent] + val recvdB = instrumentProbeB.expectMsgType[RemoteMessageReceived] + recvdB.metadata should ===(ByteString("!!!")) + + // for the reply + val sentB = instrumentProbeB.expectMsgType[RemoteMessageSent] + val recvdA = instrumentProbeA.expectMsgType[RemoteMessageReceived] + recvdA.metadata should ===(ByteString("!!!")) + } + } + +} diff --git a/akka-remote/src/test/scala/akka/remote/artery/MetadataContainerSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/MetadataContainerSpec.scala new file mode 100644 index 0000000000..a1a4907100 --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/artery/MetadataContainerSpec.scala @@ -0,0 +1,65 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import akka.actor.ActorRef +import akka.testkit.{ AkkaSpec, TestProbe } +import akka.util.ByteString +import scala.concurrent.duration._ + +class MetadataContainerSpec extends AkkaSpec { + + "MetadataContainer" should { + "parse, given empty map" in { + val map = new MetadataMap[ByteString] + val container = new MetadataMapRendering(map) + + val rendered = container.render() + val back = MetadataMapParsing.parseRaw(rendered.asByteBuffer) + + map.toString() should ===(back.metadataMap.toString()) + } + "parse, given 1 allocated in map" in { + val map = new MetadataMap[ByteString] + val container = new MetadataMapRendering(map) + map.set(1, ByteString("!!!")) + + val rendered = container.render() + val back = MetadataMapParsing.parseRaw(rendered.asByteBuffer) + + map.toString() should ===(back.metadataMap.toString()) + } + + "apply, given 3 allocated in map" in { + val map = new MetadataMap[ByteString] + val container = new MetadataMapRendering(map) + map.set(1, ByteString("!!!")) + map.set(10, ByteString("??????")) + map.set(31, ByteString(".........")) + + val p = TestProbe() + + def testInstrument(id: Int): RemoteInstrument = { + new RemoteInstrument { + override def identifier: Byte = id.toByte + override def remoteMessageSent(recipient: ActorRef, message: Object, sender: ActorRef): ByteString = ??? + override def remoteMessageReceived(recipient: ActorRef, message: Object, sender: ActorRef, metadata: ByteString): Unit = + p.ref ! s"${identifier}-${metadata.utf8String}" + } + } + val instruments = Vector( + testInstrument(1), testInstrument(31), testInstrument(10) + ) + + val rendered = container.render() + + val mockEnvelope = new ReusableInboundEnvelope + MetadataMapParsing.applyAllRemoteMessageReceivedRaw(instruments, mockEnvelope, rendered.asByteBuffer) + + p.expectMsgAllOf("1-!!!", "10-??????", "31-.........") + p.expectNoMsg(100.millis) + } + + } +} diff --git a/akka-remote/src/test/scala/akka/remote/artery/MetadataMapSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/MetadataMapSpec.scala new file mode 100644 index 0000000000..af0b6c69f9 --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/artery/MetadataMapSpec.scala @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import akka.util.OptionVal +import org.scalatest.{ Matchers, WordSpec } + +class MetadataMapSpec extends WordSpec with Matchers { + + "MetadataMap" must { + "hasValueFor" in { + val a = MetadataMap[String]() + + a.hasValueFor(0) should ===(false) + a.set(0, "0") + a.hasValueFor(0) should ===(true) + a.hasValueFor(1) should ===(false) + + a.clear() + a.isEmpty should ===(true) + a.nonEmpty should ===(false) + a.hasValueFor(12) should ===(false) + a.hasValueFor(0) should ===(false) + a.set(0, "0") + a.hasValueFor(0) should ===(true) + } + "setting values" in { + val a = MetadataMap[String]() + + a(0) should ===(OptionVal.None) + a.usedSlots should ===(0) + a.set(0, "0") + a(0) should ===(OptionVal.Some("0")) + a.usedSlots should ===(1) + + a.set(0, "1") + a(0) should ===(OptionVal.Some("1")) + a.usedSlots should ===(1) + + a.set(0, null) + a(0) should ===(OptionVal.None) + a.usedSlots should ===(0) + } + } + +}