diff --git a/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala index 0ea81fdfb2..72bc68fd46 100644 --- a/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala @@ -12,7 +12,7 @@ import akka.serialization.{ BaseSerializer, ByteBufferSerializer, SerializationE import akka.stream.ActorMaterializer import akka.stream.ActorMaterializerSettings import akka.stream.scaladsl._ -import akka.util.{ ByteString, OptionVal } +import akka.util.OptionVal import com.typesafe.config.ConfigFactory import java.io.IOException import java.nio.ByteBuffer @@ -119,21 +119,18 @@ class CodecBenchmark { resolvedRef = actorOnSystemA.asInstanceOf[InternalActorRef] recipientStringB = remoteRefB.path.toSerializationFormatWithAddress(addressB) - val metadataContainer: ByteString = if (configType == RemoteInstrument) { - val map = MetadataMap[ByteString] - val instrument = new DummyRemoteInstrument() - val metadata = instrument.remoteMessageSent(remoteRefB, payload, actorOnSystemA) - map.set(instrument.identifier, metadata) - new MetadataMapRendering(map).render() + val remoteInstruments: RemoteInstruments = if (configType == RemoteInstrument) { + new RemoteInstruments(system.asInstanceOf[ExtendedActorSystem], system.log, Vector(new DummyRemoteInstrument())) } else null val envelope = new EnvelopeBuffer(envelopeTemplateBuffer) + val outboundEnvelope = OutboundEnvelope(OptionVal.None, payload, OptionVal.None) headerIn setVersion 1 headerIn setUid 42 headerIn setSenderActorRef actorOnSystemA headerIn setRecipientActorRef remoteRefB headerIn setManifest "" - headerIn setMetadataContainer metadataContainer - MessageSerializer.serializeForArtery(SerializationExtension(system), payload, headerIn, envelope) + headerIn setRemoteInstruments remoteInstruments + MessageSerializer.serializeForArtery(SerializationExtension(system), outboundEnvelope, headerIn, envelope) envelope.byteBuffer.flip() // Now build up the graphs @@ -290,23 +287,30 @@ object CodecBenchmark { // DummyRemoteInstrument that doesn't allocate unnecessary bytes during serialization/deserialization class DummyRemoteInstrument extends RemoteInstrument { - private val Metadata = ByteString("slevin".getBytes) + private val Metadata = "slevin".getBytes override def identifier: Byte = 7 // Lucky number slevin - override def remoteMessageSent(recipient: ActorRef, message: Object, sender: ActorRef): ByteString = - Metadata + override def remoteWriteMetadata(recipient: ActorRef, message: Object, sender: ActorRef, buffer: ByteBuffer): Unit = { + buffer.putInt(Metadata.length) + buffer.put(Metadata) + } - override def remoteMessageReceived(recipient: ActorRef, message: Object, sender: ActorRef, metadata: ByteString): Unit = { + override def remoteReadMetadata(recipient: ActorRef, message: Object, sender: ActorRef, buffer: ByteBuffer): Unit = { val length = Metadata.length + val metaLength = buffer.getInt @tailrec def compare(pos: Int): Boolean = { if (pos == length) true - else if (Metadata(pos) == metadata(pos)) compare(pos + 1) + else if (Metadata(pos) == buffer.get()) compare(pos + 1) else false } - if (metadata.length != length || !compare(0)) - throw new IOException(s"DummyInstrument deserialization error. Expected ${Metadata.toString} got ${metadata.toString}") + if (metaLength != length || !compare(0)) + throw new IOException(s"DummyInstrument deserialization error. Expected ${Metadata.toString}") } + + override def remoteMessageSent(recipient: ActorRef, message: Object, sender: ActorRef, size: Int, time: Long): Unit = () + + override def remoteMessageReceived(recipient: ActorRef, message: Object, sender: ActorRef, size: Int, time: Long): Unit = () } } diff --git a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala index f1422c7a1a..677df951ac 100644 --- a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala @@ -7,7 +7,7 @@ package akka.remote import akka.remote.WireFormats._ import akka.protobuf.ByteString import akka.actor.ExtendedActorSystem -import akka.remote.artery.{ EnvelopeBuffer, HeaderBuilder } +import akka.remote.artery.{ EnvelopeBuffer, HeaderBuilder, OutboundEnvelope } import akka.serialization.Serialization import akka.serialization.ByteBufferSerializer import akka.serialization.SerializationExtension @@ -63,7 +63,8 @@ private[akka] object MessageSerializer { } } - def serializeForArtery(serialization: Serialization, message: AnyRef, headerBuilder: HeaderBuilder, envelope: EnvelopeBuffer): Unit = { + def serializeForArtery(serialization: Serialization, outboundEnvelope: OutboundEnvelope, headerBuilder: HeaderBuilder, envelope: EnvelopeBuffer): Unit = { + val message = outboundEnvelope.message val serializer = serialization.findSerializerFor(message) headerBuilder setSerializer serializer.identifier @@ -76,11 +77,11 @@ private[akka] object MessageSerializer { serializer match { case ser: ByteBufferSerializer ⇒ headerBuilder setManifest manifest - envelope.writeHeader(headerBuilder) + envelope.writeHeader(headerBuilder, outboundEnvelope) ser.toBinary(message, envelope.byteBuffer) case _ ⇒ headerBuilder setManifest manifest - envelope.writeHeader(headerBuilder) + envelope.writeHeader(headerBuilder, outboundEnvelope) envelope.byteBuffer.put(serializer.toBinary(message)) } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index c92684c727..21470a31eb 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -984,11 +984,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R createFlightRecorderEventSink())) val messageDispatcherSink: Sink[InboundEnvelope, Future[Done]] = Sink.foreach[InboundEnvelope] { m ⇒ - val originAddress = m.association match { - case OptionVal.Some(a) ⇒ OptionVal.Some(a.remoteAddress) - case OptionVal.None ⇒ OptionVal.None - } - messageDispatcher.dispatch(m.recipient.get, m.message, m.sender, originAddress) + messageDispatcher.dispatch(m) m match { case r: ReusableInboundEnvelope ⇒ inboundEnvelopePool.release(r) case _ ⇒ diff --git a/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala b/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala index c2dcd094be..d59ad02c59 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala @@ -137,9 +137,6 @@ private[remote] sealed trait HeaderBuilder { def setUid(u: Long): Unit def uid: Long - /** Metadata SPI, internally multiple metadata sections can be represented. */ - def metadataContainer: ByteString - def setSenderActorRef(ref: ActorRef): Unit /** * Retrive the compressed ActorRef by the compressionId carried by this header. @@ -170,21 +167,19 @@ private[remote] sealed trait HeaderBuilder { */ def recipientActorRefPath: OptionVal[String] - def setMetadataContainer(container: ByteString): Unit - def clearMetadataContainer(): Unit - def setSerializer(serializer: Int): Unit def serializer: Int def setManifest(manifest: String): Unit def manifest(originUid: Long): OptionVal[String] + def setRemoteInstruments(instruments: RemoteInstruments): Unit + /** * Reset all fields that are related to an outbound message, * i.e. Encoder calls this as the first thing in onPush. */ def resetMessageFields(): Unit - } /** @@ -230,7 +225,7 @@ private[remote] final class HeaderBuilderImpl( var _manifest: String = null var _manifestIdx: Int = -1 - var _metadataContainer: ByteString = null + var _remoteInstruments: OptionVal[RemoteInstruments] = OptionVal.None override def resetMessageFields(): Unit = { _flags = 0 @@ -242,6 +237,8 @@ private[remote] final class HeaderBuilderImpl( _serializer = 0 _manifest = null _manifestIdx = -1 + + _remoteInstruments = OptionVal.None } override def setVersion(v: Byte) = _version = v @@ -342,17 +339,8 @@ private[remote] final class HeaderBuilderImpl( } } - /** Make sure to prefix the data with an Int-length */ - def setMetadataContainer(container: ByteString): Unit = { - setFlag(EnvelopeBuffer.MetadataPresentFlag, value = container != null) - _metadataContainer = container - } - /** Rendered metadata already contains int-length prefix, no need to add it manually */ - def metadataContainer: ByteString = - _metadataContainer - def clearMetadataContainer(): Unit = { - setFlag(EnvelopeBuffer.MetadataPresentFlag, value = false) - _metadataContainer = null + override def setRemoteInstruments(instruments: RemoteInstruments): Unit = { + _remoteInstruments = OptionVal(instruments) } override def toString = @@ -366,8 +354,7 @@ private[remote] final class HeaderBuilderImpl( "_recipientActorRefIdx:" + _recipientActorRefIdx + ", " + "_serializer:" + _serializer + ", " + "_manifest:" + _manifest + ", " + - "_manifestIdx:" + _manifestIdx + ", " + - "_metadataContainer:" + _metadataContainer + ")" + "_manifestIdx:" + _manifestIdx + ")" } @@ -381,7 +368,9 @@ private[remote] final class EnvelopeBuffer(val byteBuffer: ByteBuffer) { private var literalChars = Array.ofDim[Char](64) private var literalBytes = Array.ofDim[Byte](64) - def writeHeader(h: HeaderBuilder): Unit = { + def writeHeader(h: HeaderBuilder): Unit = writeHeader(h, null) + + def writeHeader(h: HeaderBuilder, oe: OutboundEnvelope): Unit = { val header = h.asInstanceOf[HeaderBuilderImpl] byteBuffer.clear() @@ -395,11 +384,16 @@ private[remote] final class EnvelopeBuffer(val byteBuffer: ByteBuffer) { byteBuffer.put(ActorRefCompressionTableVersionOffset, header.outboundActorRefCompression.version) byteBuffer.put(ClassManifestCompressionTableVersionOffset, header.outboundClassManifestCompression.version) + // maybe write some metadata + // after metadata is written (or not), buffer is at correct position to continue writing literals byteBuffer.position(MetadataContainerAndLiteralSectionOffset) - if (header.flag(MetadataPresentFlag)) { - // tag if we have metadata or not, as the layout next follows different patterns depending on that - header.metadataContainer.copyToBuffer(byteBuffer) - // after metadata is written, buffer is at correct position to continue writing literals (they "moved forward") + if (header._remoteInstruments.isDefined) { + header._remoteInstruments.get.serialize(OptionVal(oe), byteBuffer) + if (byteBuffer.position() != MetadataContainerAndLiteralSectionOffset) { + // we actually wrote some metadata so update the flag field to reflect that + header.setFlag(MetadataPresentFlag, true) + byteBuffer.put(FlagsOffset, header.flags) + } } // Serialize sender @@ -419,7 +413,6 @@ private[remote] final class EnvelopeBuffer(val byteBuffer: ByteBuffer) { byteBuffer.putInt(ClassManifestTagOffset, header._manifestIdx | TagTypeMask) else writeLiteral(ClassManifestTagOffset, header._manifest) - } def parseHeader(h: HeaderBuilder): Unit = { @@ -435,20 +428,11 @@ private[remote] final class EnvelopeBuffer(val byteBuffer: ByteBuffer) { header._inboundActorRefCompressionTableVersion = byteBuffer.get(ActorRefCompressionTableVersionOffset) header._inboundClassManifestCompressionTableVersion = byteBuffer.get(ClassManifestCompressionTableVersionOffset) + byteBuffer.position(MetadataContainerAndLiteralSectionOffset) if (header.flag(MetadataPresentFlag)) { - byteBuffer.position(MetadataContainerAndLiteralSectionOffset) + // metadata present, so we need to fast forward to the literals that start right after val totalMetadataLength = byteBuffer.getInt() - - ensureLiteralCharsLength(totalMetadataLength) - val bytes = literalBytes - - byteBuffer.get(bytes, 0, totalMetadataLength) - header._metadataContainer = ByteString(bytes).take(totalMetadataLength) - // the literals section starts here, right after the metadata has ended - // thus, no need to move position the buffer again - } else { - // No metadata present, we position the buffer on the place where literals start - byteBuffer.position(MetadataContainerAndLiteralSectionOffset) + byteBuffer.position(byteBuffer.position() + totalMetadataLength) } // Deserialize sender diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala index 7f676796cf..e7ec13dae7 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala @@ -64,9 +64,7 @@ private[remote] class Encoder( private val serialization = SerializationExtension(system) private val serializationInfo = Serialization.Information(localAddress, system) - private val instruments: Vector[RemoteInstrument] = RemoteInstruments.create(system) - // by being backed by an Array, this allows us to not allocate any wrapper type for the metadata (since we need its ID) - private val serializedMetadatas: MetadataMap[ByteString] = MetadataMap() // TODO: possibly can be optimised a more for the specific access pattern (during write) + private val instruments: RemoteInstruments = RemoteInstruments(system) private val changeActorRefCompressionCb = getAsyncCallback[(CompressionTable[ActorRef], Promise[Done])] { case (table, done) ⇒ @@ -120,8 +118,16 @@ private[remote] class Encoder( case OptionVal.Some(s) ⇒ headerBuilder setSenderActorRef s } - applyAndRenderRemoteMessageSentMetadata(instruments, outboundEnvelope, headerBuilder) - MessageSerializer.serializeForArtery(serialization, outboundEnvelope.message, headerBuilder, envelope) + val startTime: Long = if (instruments.timeSerialization) System.nanoTime else 0 + if (instruments.nonEmpty) + headerBuilder.setRemoteInstruments(instruments) + + MessageSerializer.serializeForArtery(serialization, outboundEnvelope, headerBuilder, envelope) + + if (instruments.nonEmpty) { + val time = if (instruments.timeSerialization) System.nanoTime - startTime else 0 + instruments.messageSent(outboundEnvelope, envelope.byteBuffer.position(), time) + } } finally Serialization.currentTransportInformation.value = oldValue envelope.byteBuffer.flip() @@ -163,36 +169,6 @@ private[remote] class Encoder( override def onPull(): Unit = pull(in) - /** - * Renders metadata into `headerBuilder`. - * - * Replace all AnyRef's that were passed along with the [[OutboundEnvelope]] into their [[ByteString]] representations, - * by calling `remoteMessageSent` of each enabled instrumentation. If `context` was attached in the envelope it is passed - * into the instrument, otherwise it receives an OptionVal.None as context, and may still decide to attach rendered - * metadata by returning it. - */ - private def applyAndRenderRemoteMessageSentMetadata(instruments: Vector[RemoteInstrument], envelope: OutboundEnvelope, headerBuilder: HeaderBuilder): Unit = { - if (instruments.nonEmpty) { - val n = instruments.length - - var i = 0 - while (i < n) { - val instrument = instruments(i) - val instrumentId = instrument.identifier - - val metadata = instrument.remoteMessageSent(envelope.recipient.orNull, envelope.message, envelope.sender.orNull) - if (metadata ne null) serializedMetadatas.set(instrumentId, metadata) - - i += 1 - } - } - - if (serializedMetadatas.nonEmpty) { - MetadataEnvelopeSerializer.serialize(serializedMetadatas, headerBuilder) - serializedMetadatas.clear() - } - } - /** * External call from ChangeOutboundCompression materialized value */ @@ -511,7 +487,7 @@ private[remote] class Deserializer( override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging { - private val instruments: Vector[RemoteInstrument] = RemoteInstruments.create(system) + private val instruments: RemoteInstruments = RemoteInstruments(system) private val serialization = SerializationExtension(system) override protected def logSource = classOf[Deserializer] @@ -520,13 +496,18 @@ private[remote] class Deserializer( val envelope = grab(in) try { + val startTime: Long = if (instruments.timeSerialization) System.nanoTime else 0 + val deserializedMessage = MessageSerializer.deserializeForArtery( system, envelope.originUid, serialization, envelope.serializer, envelope.classManifest, envelope.envelopeBuffer) val envelopeWithMessage = envelope.withMessage(deserializedMessage) - applyIncomingInstruments(envelopeWithMessage) - + if (instruments.nonEmpty) { + instruments.deserialize(envelopeWithMessage) + val time = if (instruments.timeSerialization) System.nanoTime - startTime else 0 + instruments.messageReceived(envelopeWithMessage, envelope.envelopeBuffer.byteBuffer.limit(), time) + } push(out, envelopeWithMessage) } catch { case NonFatal(e) ⇒ @@ -543,22 +524,6 @@ private[remote] class Deserializer( override def onPull(): Unit = pull(in) - private def applyIncomingInstruments(envelope: InboundEnvelope): Unit = { - if (envelope.flag(EnvelopeBuffer.MetadataPresentFlag)) { - val length = instruments.length - if (length == 0) { - // TODO do we need to parse, or can we do a fast forward if debug logging is not enabled? - val metaMetadataEnvelope = MetadataMapParsing.parse(envelope) - if (log.isDebugEnabled) - log.debug("Incoming message envelope contains metadata for instruments: {}, " + - "however no RemoteInstrument was registered in local system!", metaMetadataEnvelope.metadataMap.keysWithValues.mkString("[", ",", "]")) - } else { - // we avoid emitting a MetadataMap and instead directly apply the instruments onto the received metadata - MetadataMapParsing.applyAllRemoteMessageReceived(instruments, envelope) - } - } - } - setHandlers(in, out, this) } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/InboundEnvelope.scala b/akka-remote/src/main/scala/akka/remote/artery/InboundEnvelope.scala index cf2e8b3b54..02597c8eea 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/InboundEnvelope.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/InboundEnvelope.scala @@ -130,6 +130,11 @@ private[remote] final class ReusableInboundEnvelope extends InboundEnvelope { this } + def withEnvelopeBuffer(envelopeBuffer: EnvelopeBuffer): InboundEnvelope = { + _envelopeBuffer = envelopeBuffer + this + } + override def toString: String = s"InboundEnvelope($recipient, $message, $sender, $originUid, $association)" } diff --git a/akka-remote/src/main/scala/akka/remote/artery/MessageDispatcher.scala b/akka-remote/src/main/scala/akka/remote/artery/MessageDispatcher.scala index 33c4fc917b..523c6bdf21 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/MessageDispatcher.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/MessageDispatcher.scala @@ -29,15 +29,18 @@ private[remote] class MessageDispatcher( private val log = Logging.withMarker(system, getClass.getName) private val debugLogEnabled = log.isDebugEnabled - def dispatch( - recipient: InternalActorRef, - message: AnyRef, - senderOption: OptionVal[ActorRef], - originAddress: OptionVal[Address]): Unit = { - + def dispatch(inboundEnvelope: InboundEnvelope): Unit = { import provider.remoteSettings.Artery._ import Logging.messageClassName + val recipient = inboundEnvelope.recipient.get + val message = inboundEnvelope.message + val senderOption = inboundEnvelope.sender + val originAddress = inboundEnvelope.association match { + case OptionVal.Some(a) ⇒ OptionVal.Some(a.remoteAddress) + case OptionVal.None ⇒ OptionVal.None + } + val sender: ActorRef = senderOption.getOrElse(system.deadLetters) val originalReceiver = recipient.path diff --git a/akka-remote/src/main/scala/akka/remote/artery/MetadataEnvelopeSerializer.scala b/akka-remote/src/main/scala/akka/remote/artery/MetadataEnvelopeSerializer.scala deleted file mode 100644 index 4e2be61121..0000000000 --- a/akka-remote/src/main/scala/akka/remote/artery/MetadataEnvelopeSerializer.scala +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Copyright (C) 2016 Lightbend Inc. - */ - -package akka.remote.artery - -import java.nio.{ ByteBuffer, ByteOrder } - -import akka.actor.ExtendedActorSystem -import akka.serialization.Serialization -import akka.util.ByteString.ByteString1C -import akka.util.{ ByteString, ByteStringBuilder } - -/** - * INTERNAL API - */ -private[akka] object MetadataEnvelopeSerializer { - - private[akka] val EmptyRendered = { - implicit val _ByteOrder = ByteOrder.LITTLE_ENDIAN - - val bsb = new ByteStringBuilder - bsb.putInt(0) // 0 length - bsb.result - } - - // key/length of a metadata element are encoded within a single integer: - // supports keys in the range of <0-31> - final val EntryKeyMask = Integer.parseInt("1111 1000 0000 0000 0000 0000 0000 000".replace(" ", ""), 2) - def maskEntryKey(k: Byte): Int = (k.toInt << 26) & EntryKeyMask - def unmaskEntryKey(kv: Int): Byte = ((kv & EntryKeyMask) >> 26).toByte - - final val EntryLengthMask = ~EntryKeyMask - - def maskEntryLength(k: Int): Int = k & EntryLengthMask - def unmaskEntryLength(kv: Int): Int = kv & EntryLengthMask - - def muxEntryKeyLength(k: Byte, l: Int): Int = { - maskEntryKey(k) | maskEntryLength(l) - } - - def serialize(metadatas: MetadataMap[ByteString], headerBuilder: HeaderBuilder): Unit = { - if (metadatas.isEmpty) headerBuilder.clearMetadataContainer() - else { - val container = new MetadataMapRendering(metadatas) - headerBuilder.setMetadataContainer(container.render()) - } - } - - def deserialize(system: ExtendedActorSystem, originUid: Long, serialization: Serialization, - serializer: Int, classManifest: String, envelope: EnvelopeBuffer): AnyRef = { - serialization.deserializeByteBuffer(envelope.byteBuffer, serializer, classManifest) - } -} - -/** - * INTERNAL API - * - * The metadata section is stored as ByteString (prefixed with Int length field, - * the same way as any other literal), however the internal structure of it is as follows: - * - * {{{ - * Metadata entry: - * - * 0 1 2 3 - * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 - * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - * | Key | Metadata entry length | - * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - * | ... metadata entry ... | - * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - * }}} - * - */ -private[akka] final class MetadataMapRendering(val metadataMap: MetadataMap[ByteString]) extends AnyVal { - import MetadataEnvelopeSerializer._ - - def render(): ByteString = - if (metadataMap.isEmpty) { - // usually no-one will want to render an empty metadata section - it should not be there at all - EmptyRendered - } else { - implicit val _ByteOrder = ByteOrder.LITTLE_ENDIAN - - // TODO optimise this, we could just count along the way and then prefix with the length - val totalSize = 4 /* length int field */ + metadataMap.usedSlots * 4 /* metadata length */ + metadataMap.foldLeftValues(0)(_ + _.length) - val b = new ByteStringBuilder // TODO could we reuse one? - b.sizeHint(totalSize) - - b.putInt(totalSize - 4 /* don't count itself, the length prefix */ ) - // TODO: move through and then prepend length - metadataMap.foreach { (key: Byte, value: ByteString) ⇒ - // TODO try to remove allocation? Iterator otherwise, but that's also allocation - val kl = muxEntryKeyLength(key, value.length) - b.putInt(kl) - value match { - case c: ByteString1C ⇒ c.appendToBuilder(b) // uses putByteArrayUnsafe - case _ ⇒ b ++= value - } - } - b.result() - } -} - -/** INTERNAL API */ -private[akka] object MetadataMapParsing { - import MetadataEnvelopeSerializer._ - - /** Allocates an MetadataMap */ - def parse(envelope: InboundEnvelope): MetadataMapRendering = { - val buf = envelope.envelopeBuffer.byteBuffer - buf.position(EnvelopeBuffer.MetadataContainerAndLiteralSectionOffset) - parseRaw(buf) - } - - /** - * INTERNAL API, only for testing - * The buffer MUST be already at the right position where the Metadata container starts. - */ - private[akka] def parseRaw(buf: ByteBuffer) = { - buf.order(ByteOrder.LITTLE_ENDIAN) - val metadataContainerLength = buf.getInt() - val endOfMetadataPos = metadataContainerLength + buf.position() - val map = MetadataMap[ByteString]() - - while (buf.position() < endOfMetadataPos) { - val kl = buf.getInt() - val k = unmaskEntryKey(kl) // k - val l = unmaskEntryLength(kl) // l - - val arr = Array.ofDim[Byte](l) - buf.get(arr) - val metadata = ByteString1C(arr) // avoids copying again - map.set(k, metadata) - } - - new MetadataMapRendering(map) - } - - /** Implemented in a way to avoid allocations of any envelopes or arrays */ - def applyAllRemoteMessageReceived(instruments: Vector[RemoteInstrument], envelope: InboundEnvelope): Unit = { - val buf = envelope.envelopeBuffer.byteBuffer - buf.position(EnvelopeBuffer.MetadataContainerAndLiteralSectionOffset) - applyAllRemoteMessageReceivedRaw(instruments, envelope, buf) - } - - /** - * INTERNAL API, only for testing - * The buffer MUST be already at the right position where the Metadata container starts. - */ - private[akka] def applyAllRemoteMessageReceivedRaw(instruments: Vector[RemoteInstrument], envelope: InboundEnvelope, buf: ByteBuffer): Unit = { - buf.order(ByteOrder.LITTLE_ENDIAN) - - val metadataContainerLength = buf.getInt() - val endOfMetadataPos = metadataContainerLength + buf.position() - - while (buf.position() < endOfMetadataPos) { - val keyAndLength = buf.getInt() - val key = unmaskEntryKey(keyAndLength) - val length = unmaskEntryLength(keyAndLength) - - val arr = Array.ofDim[Byte](length) // TODO can be optimised to re-cycle this array instead - buf.get(arr) - val data = ByteString(arr) // bytes - - instruments.find(_.identifier == key) match { - case Some(instr) ⇒ instr.remoteMessageReceived(envelope.recipient.orNull, envelope.message, envelope.sender.orNull, data) - case _ ⇒ throw new Exception(s"No RemoteInstrument for id $key available!") - - } - } - } -} diff --git a/akka-remote/src/main/scala/akka/remote/artery/RemoteInstrument.scala b/akka-remote/src/main/scala/akka/remote/artery/RemoteInstrument.scala index 9da058947e..f7f8810659 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/RemoteInstrument.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/RemoteInstrument.scala @@ -5,7 +5,11 @@ package akka.remote.artery import akka.actor.{ ActorRef, ExtendedActorSystem } -import akka.util.{ ByteString, OptionVal } +import akka.event.{ Logging, LoggingAdapter } +import akka.util.OptionVal +import java.nio.ByteBuffer +import scala.annotation.tailrec +import scala.util.control.NonFatal /** * INTERNAL API @@ -16,48 +20,274 @@ import akka.util.{ ByteString, OptionVal } * Multiple instruments are automatically handled, however they MUST NOT overlap in their idenfitiers. * * Instances of `RemoteInstrument` are created from configuration. A new instance of RemoteInstrument - * will be created for each encoder and decoder. It's only called from the stage, so if it dosn't + * will be created for each encoder and decoder. It's only called from the stage, so if it doesn't * delegate to any shared instance it doesn't have to be thread-safe. */ abstract class RemoteInstrument { /** * Instrument identifier. * - * MUST be >=0 and <32. + * MUST be >=1 and <32. * - * Values between 0 and 7 are reserved for Akka internal use. + * Values between 1 and 7 are reserved for Akka internal use. */ def identifier: Byte /** - * Called right before putting the message onto the wire. - * Parameters MAY be `null` (except `message`)! - * - * @return `metadata` rendered to be serialized into the remove envelope, or `null` if no metadata should be attached + * Should the serialization be timed? Otherwise times are always 0. */ - def remoteMessageSent(recipient: ActorRef, message: Object, sender: ActorRef): ByteString + def serializationTimingEnabled: Boolean = false /** - * Called once a message (containing a metadata field designated for this instrument) has been deserialized from the wire. + * Called while serializing the message. + * Parameters MAY be `null` (except `message` and `buffer`)! */ - def remoteMessageReceived(recipient: ActorRef, message: Object, sender: ActorRef, metadata: ByteString): Unit + def remoteWriteMetadata(recipient: ActorRef, message: Object, sender: ActorRef, buffer: ByteBuffer): Unit + /** + * Called right before putting the message onto the wire. + * Parameters MAY be `null` (except `message` and `buffer`)! + * + * The `size` is the total serialized size in bytes of the complete message including akka specific headers and any + * `RemoteInstrument` metadata. + * If `serializationTimingEnabled` returns true, then `time` will be the total time it took to serialize all data + * in the message in nanoseconds, otherwise it is 0. + */ + def remoteMessageSent(recipient: ActorRef, message: Object, sender: ActorRef, size: Int, time: Long): Unit + + /** + * Called while deserializing the message once a message (containing a metadata field designated for this instrument) is found. + */ + def remoteReadMetadata(recipient: ActorRef, message: Object, sender: ActorRef, buffer: ByteBuffer): Unit + + /** + * Called when the message has been deserialized. + * + * The `size` is the total serialized size in bytes of the complete message including akka specific headers and any + * `RemoteInstrument` metadata. + * If `serializationTimingEnabled` returns true, then `time` will be the total time it took to deserialize all data + * in the message in nanoseconds, otherwise it is 0. + */ + def remoteMessageReceived(recipient: ActorRef, message: Object, sender: ActorRef, size: Int, time: Long): Unit } -object NoopRemoteInstrument extends RemoteInstrument { - override def identifier: Byte = - -1 +/** + * INTERNAL API + * + * The metadata section is stored as raw bytes (prefixed with an Int length field, + * the same way as any other literal), however the internal structure of it is as follows: + * + * {{{ + * Metadata entry: + * + * 0 1 2 3 + * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + * | Key | Metadata entry length | + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + * | ... metadata entry ... | + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + * }}} + * + */ +private[remote] final class RemoteInstruments( + private val system: ExtendedActorSystem, + private val log: LoggingAdapter, + _instruments: Vector[RemoteInstrument]) { + import RemoteInstruments._ - override def remoteMessageSent(recipient: ActorRef, message: Object, sender: ActorRef): ByteString = - null + def this(system: ExtendedActorSystem, log: LoggingAdapter) = this(system, log, RemoteInstruments.create(system, log)) + def this(system: ExtendedActorSystem) = this(system, Logging.getLogger(system, classOf[RemoteInstruments])) - override def remoteMessageReceived(recipient: ActorRef, message: Object, sender: ActorRef, metadata: ByteString): Unit = - () + // keep the remote instruments sorted by identifier to speed up deserialization + private val instruments: Vector[RemoteInstrument] = _instruments.sortBy(_.identifier) + // does any of the instruments want serialization timing? + private val serializationTimingEnabled = instruments.exists(_.serializationTimingEnabled) + + def serialize(outboundEnvelope: OptionVal[OutboundEnvelope], buffer: ByteBuffer): Unit = { + if (instruments.nonEmpty && outboundEnvelope.isDefined) { + val startPos = buffer.position() + val oe = outboundEnvelope.get + try { + buffer.putInt(0) + val dataPos = buffer.position() + var i = 0 + while (i < instruments.length) { + val rewindPos = buffer.position() + val instrument = instruments(i) + try { + serializeInstrument(instrument, oe, buffer) + } catch { + case NonFatal(t) ⇒ + log.debug( + "Skipping serialization of RemoteInstrument {} since it failed with {}", + instrument.identifier, t.getMessage) + buffer.position(rewindPos) + } + i += 1 + } + val endPos = buffer.position() + if (endPos == dataPos) { + // no instruments wrote anything so we need to rewind to start + buffer.position(startPos) + } else { + // some instruments wrote data, so write the total length + buffer.putInt(startPos, endPos - dataPos) + } + } catch { + case NonFatal(t) ⇒ + log.debug("Skipping serialization of all RemoteInstruments due to unhandled failure {}", t) + buffer.position(startPos) + } + } + } + + private def serializeInstrument(instrument: RemoteInstrument, outboundEnvelope: OutboundEnvelope, buffer: ByteBuffer): Unit = { + val startPos = buffer.position() + buffer.putInt(0) + val dataPos = buffer.position() + instrument.remoteWriteMetadata(outboundEnvelope.recipient.orNull, outboundEnvelope.message, outboundEnvelope.sender.orNull, buffer) + val endPos = buffer.position() + if (endPos == dataPos) { + // if the instrument didn't write anything, then rewind to the start + buffer.position(startPos) + } else { + // the instrument wrote something so we need to write the identifier and length + buffer.putInt(startPos, combineKeyLength(instrument.identifier, endPos - dataPos)) + } + } + + def deserialize(inboundEnvelope: InboundEnvelope): Unit = { + if (inboundEnvelope.flag(EnvelopeBuffer.MetadataPresentFlag)) { + inboundEnvelope.envelopeBuffer.byteBuffer.position(EnvelopeBuffer.MetadataContainerAndLiteralSectionOffset) + deserializeRaw(inboundEnvelope) + } + } + + def deserializeRaw(inboundEnvelope: InboundEnvelope): Unit = { + val buffer = inboundEnvelope.envelopeBuffer.byteBuffer + val length = buffer.getInt + val endPos = buffer.position() + length + try { + if (instruments.nonEmpty) { + var i = 0 + while (i < instruments.length && buffer.position() < endPos) { + val instrument = instruments(i) + val startPos = buffer.position() + val keyAndLength = buffer.getInt + val dataPos = buffer.position() + val key = getKey(keyAndLength) + val length = getLength(keyAndLength) + var nextPos = dataPos + length + val identifier = instrument.identifier + if (key == identifier) { + try { + deserializeInstrument(instrument, inboundEnvelope, buffer) + } catch { + case NonFatal(t) ⇒ + log.debug( + "Skipping deserialization of RemoteInstrument {} since it failed with {}", + instrument.identifier, t.getMessage) + } + i += 1 + } else if (key > identifier) { + // since instruments are sorted on both sides skip this local one and retry the serialized one + log.debug("Skipping local RemoteInstrument {} that has no matching data in the message", identifier) + nextPos = startPos + i += 1 + } else { + // since instruments are sorted on both sides skip the serialized one and retry the local one + log.debug("Skipping serialized data in message for RemoteInstrument {} that has no local match", key) + } + buffer.position(nextPos) + } + } else { + if (log.isDebugEnabled) log.debug( + "Skipping serialized data in message for RemoteInstrument(s) {} that has no local match", + remoteInstrumentIdIteratorRaw(buffer, endPos).mkString("[", ", ", "]")) + } + } catch { + case NonFatal(t) ⇒ + log.debug("Skipping further deserialization of remaining RemoteInstruments due to unhandled failure {}", t) + } finally { + buffer.position(endPos) + } + } + + private def deserializeInstrument(instrument: RemoteInstrument, inboundEnvelope: InboundEnvelope, buffer: ByteBuffer): Unit = { + instrument.remoteReadMetadata(inboundEnvelope.recipient.orNull, inboundEnvelope.message, inboundEnvelope.sender.orNull, buffer) + } + + def messageSent(outboundEnvelope: OutboundEnvelope, size: Int, time: Long): Unit = { + @tailrec def messageSent(pos: Int): Unit = { + if (pos < instruments.length) { + val instrument = instruments(pos) + try { + messageSentInstrument(instrument, outboundEnvelope, size, time) + } catch { + case NonFatal(t) ⇒ + log.debug("Message sent in RemoteInstrument {} failed with {}", instrument.identifier, t.getMessage) + } + messageSent(pos + 1) + } + } + messageSent(0) + } + + private def messageSentInstrument(instrument: RemoteInstrument, outboundEnvelope: OutboundEnvelope, size: Int, time: Long): Unit = { + instrument.remoteMessageSent(outboundEnvelope.recipient.orNull, outboundEnvelope.message, outboundEnvelope.sender.orNull, size, time) + } + + def messageReceived(inboundEnvelope: InboundEnvelope, size: Int, time: Long): Unit = { + @tailrec def messageRecieved(pos: Int): Unit = { + if (pos < instruments.length) { + val instrument = instruments(pos) + try { + messageReceivedInstrument(instrument, inboundEnvelope, size, time) + } catch { + case NonFatal(t) ⇒ + log.debug("Message received in RemoteInstrument {} failed with {}", instrument.identifier, t.getMessage) + } + messageRecieved(pos + 1) + } + } + messageRecieved(0) + } + + private def messageReceivedInstrument(instrument: RemoteInstrument, inboundEnvelope: InboundEnvelope, size: Int, time: Long): Unit = { + instrument.remoteMessageReceived(inboundEnvelope.recipient.orNull, inboundEnvelope.message, inboundEnvelope.sender.orNull, size, time) + } + + private def remoteInstrumentIdIteratorRaw(buffer: ByteBuffer, endPos: Int): Iterator[Int] = { + new Iterator[Int] { + override def hasNext: Boolean = buffer.position() < endPos + override def next(): Int = { + val keyAndLength = buffer.getInt + buffer.position(buffer.position() + getLength(keyAndLength)) + getKey(keyAndLength) + } + } + } + + def isEmpty: Boolean = instruments.isEmpty + def nonEmpty: Boolean = instruments.nonEmpty + def timeSerialization = serializationTimingEnabled } /** INTERNAL API */ private[remote] object RemoteInstruments { - def create(system: ExtendedActorSystem): Vector[RemoteInstrument] = { + + def apply(system: ExtendedActorSystem): RemoteInstruments = + new RemoteInstruments(system) + + // key/length of a metadata element are encoded within a single integer: + // supports keys in the range of <0-31> + private final val lengthMask: Int = ~(31 << 26) + def combineKeyLength(k: Byte, l: Int): Int = (k.toInt << 26) | (l & lengthMask) + def getKey(kl: Int): Byte = (kl >>> 26).toByte + def getLength(kl: Int): Int = kl & lengthMask + + def create(system: ExtendedActorSystem, log: LoggingAdapter): Vector[RemoteInstrument] = { val c = system.settings.config val path = "akka.remote.artery.advanced.instruments" import scala.collection.JavaConverters._ @@ -69,124 +299,3 @@ private[remote] object RemoteInstruments { }(collection.breakOut) } } - -/** - * INTERNAL API - * - * This datastructure is specialized for addressing directly into a known IDs slot. - * It is used when deserializing/serializing metadata and we know the ID we want to reach into. - * - * Mutable & NOT thread-safe. - * - * Fixed-size: 32-slots array-backed Map-like structure. - * Lazy: The backing array is allocated lazily only when needed, thus we pay no cost for the metadata array if - * the system is not using metadata. - * Life-cycle: Owned and linked to the lifecycle of an [[OutboundEnvelope]]. - * Re-cycled: Aimed to be re-cycled to produce the least possible GC-churn, by calling `clear()` when done with it. - * - * Reserved keys: The keys 0–7 are reserved for Akka internal purposes and future extensions. - */ -private[remote] final class MetadataMap[T >: Null] { - val capacity = 32 - - protected var backing: Array[T] = null // TODO re-think if a plain LinkedList wouldn't be fine here? - - private var _usedSlots = 0 - - def usedSlots = _usedSlots - - def apply(i: Int): OptionVal[T] = - if (backing == null) OptionVal.None - else OptionVal[T](backing(i)) - - def isEmpty = usedSlots == 0 - - def nonEmpty = !isEmpty - - def hasValueFor(i: Int) = nonEmpty && backing(i) != null - - // FIXME too specialized... - def foldLeftValues[A](zero: A)(f: (A, T) ⇒ A): A = { - var acc: A = zero - var hit = 0 - var i = 0 - while (i < capacity && hit < _usedSlots) { - val it = backing(i) - if (it != null) { - acc = f(acc, it) - hit += 1 - } - i += 1 - } - acc - } - - /** Heavy operation, only used for error logging */ - def keysWithValues: List[Int] = backing.zipWithIndex.filter({ case (t, id) ⇒ t != null }).map(_._2).toList - - def foreach(f: (Byte, T) ⇒ Unit) = { - var i = 0 - var hit = 0 - while (i < capacity && hit < _usedSlots) { - val t = backing(i) - if (t != null) { - f(i.toByte, t) - hit += 1 - } - i += 1 - } - } - - private def allocateIfNeeded(): Unit = - if (backing == null) backing = Array.ofDim[Object](capacity).asInstanceOf[Array[T]] - - /** - * Set a value at given index. - * Setting a null value removes the entry (the slot is marked as not used). - */ - def set(i: Int, t: T): Unit = - if (t == null) { - if (backing == null) () - else { - if (backing(i) != null) _usedSlots -= 1 // we're clearing a spot - backing(i) = null.asInstanceOf[T] - } - } else { - allocateIfNeeded() - - if (backing(i) == null) { - // was empty previously - _usedSlots += 1 - } else { - // replacing previous value, usedSlots remains unchanged - } - - backing(i) = t - } - - /** - * If the backing array was already allocated clears it, if it wasn't does nothing (no-op). - * This is so in order to pay no-cost when not using metadata - clearing then is instant. - */ - def clear() = - if (isEmpty) () - else { - var i = 0 - while (i < capacity) { - backing(i) = null.asInstanceOf[T] - i += 1 - } - _usedSlots = 0 - } - - override def toString() = - if (backing == null) s"MetadataMap()" - else s"MetadataMap(${backing.toList.mkString("[", ",", "]")})" -} - -/** - * INTERNAL API - */ -private[remote] object MetadataMap { - def apply[T >: Null]() = new MetadataMap[T] -} diff --git a/akka-remote/src/test/scala/akka/remote/artery/MetadataCarryingSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/MetadataCarryingSpec.scala index 7d953213d8..b6f7736f6f 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/MetadataCarryingSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/MetadataCarryingSpec.scala @@ -4,30 +4,23 @@ package akka.remote.artery import java.util.concurrent.atomic.AtomicReference - -import scala.concurrent.duration._ - -import akka.actor.ActorRef -import akka.actor.ActorSelectionMessage -import akka.actor.ActorSystem -import akka.actor.ExtendedActorSystem -import akka.actor.Extension -import akka.actor.ExtensionId -import akka.actor.ExtensionIdProvider -import akka.remote.artery.MetadataCarryingSpy.{ RemoteMessageReceived, RemoteMessageSent } +import akka.actor._ import akka.testkit.ImplicitSender -import akka.testkit.SocketUtil._ import akka.testkit.TestActors import akka.testkit.TestProbe import akka.util.ByteString +import java.nio.{ ByteBuffer, CharBuffer } +import java.nio.charset.Charset object MetadataCarryingSpy extends ExtensionId[MetadataCarryingSpy] with ExtensionIdProvider { override def get(system: ActorSystem): MetadataCarryingSpy = super.get(system) override def lookup = MetadataCarryingSpy override def createExtension(system: ExtendedActorSystem): MetadataCarryingSpy = new MetadataCarryingSpy - final case class RemoteMessageSent(recipient: ActorRef, message: Object, sender: ActorRef) - final case class RemoteMessageReceived(recipient: ActorRef, message: Object, sender: ActorRef, metadata: ByteString) + final case class RemoteMessageSent(recipient: ActorRef, message: Object, sender: ActorRef, size: Int, time: Long) + final case class RemoteMessageReceived(recipient: ActorRef, message: Object, sender: ActorRef, size: Int, time: Long) + final case class RemoteWriteMetadata(recipient: ActorRef, message: Object, sender: ActorRef) + final case class RemoteReadMetadata(recipient: ActorRef, message: Object, sender: ActorRef, metadata: String) } class MetadataCarryingSpy extends Extension { @@ -37,29 +30,66 @@ class MetadataCarryingSpy extends Extension { } class TestInstrument(system: ExtendedActorSystem) extends RemoteInstrument { + import akka.remote.artery.MetadataCarryingSpy._ + + private val charset = Charset.forName("UTF-8") + private val encoder = charset.newEncoder() + private val decoder = charset.newDecoder() override val identifier: Byte = 1 - override def remoteMessageSent(recipient: ActorRef, message: Object, sender: ActorRef): ByteString = + override def serializationTimingEnabled: Boolean = true + + override def remoteWriteMetadata(recipient: ActorRef, message: Object, sender: ActorRef, buffer: ByteBuffer): Unit = message match { case _: MetadataCarryingSpec.Ping | ActorSelectionMessage(_: MetadataCarryingSpec.Ping, _, _) ⇒ - val metadata = ByteString("!!!") - MetadataCarryingSpy(system).ref.foreach(_ ! RemoteMessageSent(recipient, message, sender)) - metadata // this data will be attached to the remote message + val metadata = "!!!" + buffer.putInt(metadata.length) + encoder.encode(CharBuffer.wrap(metadata), buffer, true) + encoder.flush(buffer) + encoder.reset() + MetadataCarryingSpy(system).ref.foreach(_ ! RemoteWriteMetadata(recipient, message, sender)) case _ ⇒ - null } - override def remoteMessageReceived(recipient: ActorRef, message: Object, sender: ActorRef, metadata: ByteString): Unit = + override def remoteReadMetadata(recipient: ActorRef, message: Object, sender: ActorRef, buffer: ByteBuffer): Unit = message match { case _: MetadataCarryingSpec.Ping | ActorSelectionMessage(_: MetadataCarryingSpec.Ping, _, _) ⇒ - MetadataCarryingSpy(system).ref.foreach(_ ! RemoteMessageReceived(recipient, message, sender, metadata)) + val size = buffer.getInt + val charBuffer = CharBuffer.allocate(size) + decoder.decode(buffer, charBuffer, false) + decoder.reset() + charBuffer.flip() + val metadata = charBuffer.toString + MetadataCarryingSpy(system).ref.foreach(_ ! RemoteReadMetadata(recipient, message, sender, metadata)) + case _ ⇒ + } + + override def remoteMessageSent(recipient: ActorRef, message: Object, sender: ActorRef, size: Int, time: Long): Unit = + message match { + case _: MetadataCarryingSpec.Ping | ActorSelectionMessage(_: MetadataCarryingSpec.Ping, _, _) ⇒ + MetadataCarryingSpy(system).ref.foreach(_ ! RemoteMessageSent(recipient, message, sender, size, time)) + case _ ⇒ + } + + override def remoteMessageReceived(recipient: ActorRef, message: Object, sender: ActorRef, size: Int, time: Long): Unit = + message match { + case _: MetadataCarryingSpec.Ping | ActorSelectionMessage(_: MetadataCarryingSpec.Ping, _, _) ⇒ + MetadataCarryingSpy(system).ref.foreach(_ ! RemoteMessageReceived(recipient, message, sender, size, time)) case _ ⇒ } } object MetadataCarryingSpec { final case class Ping(payload: ByteString = ByteString.empty) + + class ProxyActor(local: ActorRef, remotePath: ActorPath) extends Actor { + val remote = context.system.actorSelection(remotePath) + override def receive = { + case message if sender() == local ⇒ remote ! message + case message ⇒ local ! message + } + } } class MetadataCarryingSpec extends ArteryMultiNodeSpec( @@ -72,6 +102,7 @@ class MetadataCarryingSpec extends ArteryMultiNodeSpec( """) with ImplicitSender { import MetadataCarryingSpec._ + import MetadataCarryingSpy._ "Metadata" should { @@ -85,17 +116,30 @@ class MetadataCarryingSpec extends ArteryMultiNodeSpec( MetadataCarryingSpy(systemB).setProbe(instrumentProbeB.ref) systemB.actorOf(TestActors.echoActorProps, "reply") - systemA.actorSelection(rootActorPath(systemB) / "user" / "reply") ! Ping() + val proxyA = systemA.actorOf(Props(classOf[ProxyActor], testActor, rootActorPath(systemB) / "user" / "reply")) + proxyA ! Ping() expectMsgType[Ping] + val writeA = instrumentProbeA.expectMsgType[RemoteWriteMetadata] val sentA = instrumentProbeA.expectMsgType[RemoteMessageSent] + val readB = instrumentProbeB.expectMsgType[RemoteReadMetadata] val recvdB = instrumentProbeB.expectMsgType[RemoteMessageReceived] - recvdB.metadata should ===(ByteString("!!!")) + readB.metadata should ===("!!!") + sentA.size should be > 0 + sentA.time should be > 0L + recvdB.size should ===(sentA.size) + recvdB.time should be > 0L // for the reply + val writeB = instrumentProbeB.expectMsgType[RemoteWriteMetadata] val sentB = instrumentProbeB.expectMsgType[RemoteMessageSent] + val readA = instrumentProbeA.expectMsgType[RemoteReadMetadata] val recvdA = instrumentProbeA.expectMsgType[RemoteMessageReceived] - recvdA.metadata should ===(ByteString("!!!")) + readA.metadata should ===("!!!") + sentB.size should be > 0 + sentB.time should be > 0L + recvdA.size should ===(sentB.size) + recvdA.time should be > 0L } } diff --git a/akka-remote/src/test/scala/akka/remote/artery/MetadataContainerSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/MetadataContainerSpec.scala deleted file mode 100644 index a1a4907100..0000000000 --- a/akka-remote/src/test/scala/akka/remote/artery/MetadataContainerSpec.scala +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright (C) 2016 Lightbend Inc. - */ -package akka.remote.artery - -import akka.actor.ActorRef -import akka.testkit.{ AkkaSpec, TestProbe } -import akka.util.ByteString -import scala.concurrent.duration._ - -class MetadataContainerSpec extends AkkaSpec { - - "MetadataContainer" should { - "parse, given empty map" in { - val map = new MetadataMap[ByteString] - val container = new MetadataMapRendering(map) - - val rendered = container.render() - val back = MetadataMapParsing.parseRaw(rendered.asByteBuffer) - - map.toString() should ===(back.metadataMap.toString()) - } - "parse, given 1 allocated in map" in { - val map = new MetadataMap[ByteString] - val container = new MetadataMapRendering(map) - map.set(1, ByteString("!!!")) - - val rendered = container.render() - val back = MetadataMapParsing.parseRaw(rendered.asByteBuffer) - - map.toString() should ===(back.metadataMap.toString()) - } - - "apply, given 3 allocated in map" in { - val map = new MetadataMap[ByteString] - val container = new MetadataMapRendering(map) - map.set(1, ByteString("!!!")) - map.set(10, ByteString("??????")) - map.set(31, ByteString(".........")) - - val p = TestProbe() - - def testInstrument(id: Int): RemoteInstrument = { - new RemoteInstrument { - override def identifier: Byte = id.toByte - override def remoteMessageSent(recipient: ActorRef, message: Object, sender: ActorRef): ByteString = ??? - override def remoteMessageReceived(recipient: ActorRef, message: Object, sender: ActorRef, metadata: ByteString): Unit = - p.ref ! s"${identifier}-${metadata.utf8String}" - } - } - val instruments = Vector( - testInstrument(1), testInstrument(31), testInstrument(10) - ) - - val rendered = container.render() - - val mockEnvelope = new ReusableInboundEnvelope - MetadataMapParsing.applyAllRemoteMessageReceivedRaw(instruments, mockEnvelope, rendered.asByteBuffer) - - p.expectMsgAllOf("1-!!!", "10-??????", "31-.........") - p.expectNoMsg(100.millis) - } - - } -} diff --git a/akka-remote/src/test/scala/akka/remote/artery/MetadataMapSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/MetadataMapSpec.scala deleted file mode 100644 index af0b6c69f9..0000000000 --- a/akka-remote/src/test/scala/akka/remote/artery/MetadataMapSpec.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright (C) 2016 Lightbend Inc. - */ -package akka.remote.artery - -import akka.util.OptionVal -import org.scalatest.{ Matchers, WordSpec } - -class MetadataMapSpec extends WordSpec with Matchers { - - "MetadataMap" must { - "hasValueFor" in { - val a = MetadataMap[String]() - - a.hasValueFor(0) should ===(false) - a.set(0, "0") - a.hasValueFor(0) should ===(true) - a.hasValueFor(1) should ===(false) - - a.clear() - a.isEmpty should ===(true) - a.nonEmpty should ===(false) - a.hasValueFor(12) should ===(false) - a.hasValueFor(0) should ===(false) - a.set(0, "0") - a.hasValueFor(0) should ===(true) - } - "setting values" in { - val a = MetadataMap[String]() - - a(0) should ===(OptionVal.None) - a.usedSlots should ===(0) - a.set(0, "0") - a(0) should ===(OptionVal.Some("0")) - a.usedSlots should ===(1) - - a.set(0, "1") - a(0) should ===(OptionVal.Some("1")) - a.usedSlots should ===(1) - - a.set(0, null) - a(0) should ===(OptionVal.None) - a.usedSlots should ===(0) - } - } - -} diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteInstrumentsSerializationSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteInstrumentsSerializationSpec.scala new file mode 100644 index 0000000000..cc6db063d3 --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteInstrumentsSerializationSpec.scala @@ -0,0 +1,192 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import akka.actor.{ ActorRef, ActorSystem, ExtendedActorSystem, InternalActorRef } +import akka.event._ +import akka.testkit.TestEvent.Mute +import akka.testkit.{ AkkaSpec, EventFilter, TestEvent, TestProbe } +import akka.util.OptionVal +import java.nio.{ ByteBuffer, CharBuffer } +import java.nio.charset.Charset +import scala.concurrent.duration._ + +class RemoteInstrumentsSerializationSpec extends AkkaSpec("akka.loglevel = DEBUG") { + import RemoteInstrumentsSerializationSpec._ + + def remoteInstruments(instruments: RemoteInstrument*): RemoteInstruments = { + val vec = Vector(instruments: _*) + new RemoteInstruments(system.asInstanceOf[ExtendedActorSystem], system.log, vec) + } + + def ensureDebugLog[T](messages: String*)(f: ⇒ T): T = { + if (messages.isEmpty) + f + else + EventFilter.debug(message = messages.head, occurrences = 1) intercept { + ensureDebugLog(messages.tail: _*)(f) + } + } + + "RemoteInstruments" should { + "not write anything in the buffer if not deserializing" in { + val buffer = ByteBuffer.allocate(1024) + serialize(remoteInstruments(), buffer) + buffer.position() should be(0) + } + + "serialize and deserialize a single remote instrument" in { + val p = TestProbe() + val ri = remoteInstruments(testInstrument(1, "!")) + serializeDeserialize(ri, ri, p.ref, "foo") + p.expectMsgAllOf("foo-1-!") + p.expectNoMsg(100.millis) + } + + "serialize and deserialize multiple remote instruments in the correct order" in { + val p = TestProbe() + val ri = remoteInstruments(testInstrument(1, "!"), testInstrument(31, "???"), testInstrument(10, "..")) + serializeDeserialize(ri, ri, p.ref, "bar") + p.expectMsgAllOf("bar-1-!", "bar-10-..", "bar-31-???") + p.expectNoMsg(100.millis) + } + + "skip exitsing remote instruments not in the message" in { + ensureDebugLog( + "Skipping local RemoteInstrument 10 that has no matching data in the message") { + val p = TestProbe() + val instruments = Seq(testInstrument(7, "!"), testInstrument(10, ".."), testInstrument(21, "???")) + val riS = remoteInstruments(instruments(0), instruments(2)) + val riD = remoteInstruments(instruments: _*) + serializeDeserialize(riS, riD, p.ref, "baz") + p.expectMsgAllOf("baz-7-!", "baz-21-???") + p.expectNoMsg(100.millis) + } + } + + "skip remote instruments in the message that are not existing" in { + ensureDebugLog( + "Skipping serialized data in message for RemoteInstrument 11 that has no local match") { + val p = TestProbe() + val instruments = Seq(testInstrument(6, "!"), testInstrument(11, ".."), testInstrument(19, "???")) + val riS = remoteInstruments(instruments: _*) + val riD = remoteInstruments(instruments(0), instruments(2)) + serializeDeserialize(riS, riD, p.ref, "buz") + p.expectMsgAllOf("buz-6-!", "buz-19-???") + p.expectNoMsg(100.millis) + } + } + + "skip all remote instruments in the message if none are existing" in { + ensureDebugLog( + "Skipping serialized data in message for RemoteInstrument(s) [1, 10, 31] that has no local match") { + val p = TestProbe() + val instruments = Seq(testInstrument(1, "!"), testInstrument(10, ".."), testInstrument(31, "???")) + val riS = remoteInstruments(instruments: _*) + val riD = remoteInstruments() + serializeDeserialize(riS, riD, p.ref, "boz") + p.expectNoMsg(100.millis) + } + } + + "skip serializing remote instrument that fails" in { + ensureDebugLog( + "Skipping serialization of RemoteInstrument 7 since it failed with boom", + "Skipping local RemoteInstrument 7 that has no matching data in the message") { + val p = TestProbe() + val instruments = Seq( + testInstrument(7, "!", sentThrowable = boom), testInstrument(10, ".."), testInstrument(21, "???")) + val ri = remoteInstruments(instruments: _*) + serializeDeserialize(ri, ri, p.ref, "woot") + p.expectMsgAllOf("woot-10-..", "woot-21-???") + p.expectNoMsg(100.millis) + } + } + + "skip deserializing remote instrument that fails" in { + ensureDebugLog( + "Skipping deserialization of RemoteInstrument 7 since it failed with boom", + "Skipping deserialization of RemoteInstrument 21 since it failed with boom") { + val p = TestProbe() + val instruments = Seq( + testInstrument(7, "!", receiveThrowable = boom), testInstrument(10, ".."), + testInstrument(21, "???", receiveThrowable = boom)) + val ri = remoteInstruments(instruments: _*) + serializeDeserialize(ri, ri, p.ref, "waat") + p.expectMsgAllOf("waat-10-..") + p.expectNoMsg(100.millis) + } + } + } +} + +object RemoteInstrumentsSerializationSpec { + + class Filter(settings: ActorSystem.Settings, stream: EventStream) extends LoggingFilter { + stream.publish(Mute(EventFilter.debug())) + + override def isErrorEnabled(logClass: Class[_], logSource: String): Boolean = true + + override def isWarningEnabled(logClass: Class[_], logSource: String): Boolean = true + + override def isInfoEnabled(logClass: Class[_], logSource: String): Boolean = true + + override def isDebugEnabled(logClass: Class[_], logSource: String): Boolean = logSource == "DebugSource" + } + + def testInstrument(id: Int, metadata: String, sentThrowable: Throwable = null, receiveThrowable: Throwable = null): RemoteInstrument = { + new RemoteInstrument { + private val charset = Charset.forName("UTF-8") + private val encoder = charset.newEncoder() + private val decoder = charset.newDecoder() + + override def identifier: Byte = id.toByte + + override def remoteWriteMetadata(recipient: ActorRef, message: Object, sender: ActorRef, buffer: ByteBuffer): Unit = { + buffer.putInt(metadata.length) + if (sentThrowable ne null) throw sentThrowable + encoder.encode(CharBuffer.wrap(metadata), buffer, true) + encoder.flush(buffer) + encoder.reset() + } + + override def remoteReadMetadata(recipient: ActorRef, message: Object, sender: ActorRef, buffer: ByteBuffer): Unit = { + val size = buffer.getInt + if (receiveThrowable ne null) throw receiveThrowable + val charBuffer = CharBuffer.allocate(size) + decoder.decode(buffer, charBuffer, false) + decoder.reset() + charBuffer.flip() + val string = charBuffer.toString + recipient ! s"$message-$identifier-$string" + } + + override def remoteMessageSent(recipient: ActorRef, message: Object, sender: ActorRef, size: Int, time: Long): Unit = () + + override def remoteMessageReceived(recipient: ActorRef, message: Object, sender: ActorRef, size: Int, time: Long): Unit = () + } + } + + def serialize(ri: RemoteInstruments, buffer: ByteBuffer): Unit = { + val mockOutbound = new ReusableOutboundEnvelope() + ri.serialize(OptionVal(mockOutbound), buffer) + } + + def deserialize(ri: RemoteInstruments, buffer: ByteBuffer, recipient: ActorRef, message: AnyRef): Unit = { + val r = recipient.asInstanceOf[InternalActorRef] + val envelopeBuffer = new EnvelopeBuffer(buffer) + val mockInbound = + new ReusableInboundEnvelope().withEnvelopeBuffer(envelopeBuffer).withRecipient(r).withMessage(message) + ri.deserializeRaw(mockInbound) + } + + def serializeDeserialize(riS: RemoteInstruments, riD: RemoteInstruments, recipient: ActorRef, message: AnyRef): Unit = { + val buffer = ByteBuffer.allocate(1024) + serialize(riS, buffer) + buffer.flip() + deserialize(riD, buffer, recipient, message) + } + + val boom = new IllegalArgumentException("boom") +} \ No newline at end of file diff --git a/akka-remote/src/test/scala/akka/remote/artery/MetaMetadataSerializerSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteInstrumentsSpec.scala similarity index 50% rename from akka-remote/src/test/scala/akka/remote/artery/MetaMetadataSerializerSpec.scala rename to akka-remote/src/test/scala/akka/remote/artery/RemoteInstrumentsSpec.scala index 2652282994..e6ef103360 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/MetaMetadataSerializerSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteInstrumentsSpec.scala @@ -7,7 +7,7 @@ import org.scalacheck.{ Arbitrary, Gen } import org.scalatest.prop.Checkers import org.scalatest.{ Matchers, WordSpec } -class MetaMetadataSerializerSpec extends WordSpec with Matchers with Checkers { +class RemoteInstrumentsSpec extends WordSpec with Matchers with Checkers { case class KeyLen(k: Key, l: Len) { override def toString = s" key = ${k}, len = ${l}" @@ -22,48 +22,50 @@ class MetaMetadataSerializerSpec extends WordSpec with Matchers with Checkers { } yield KeyLen(key, len) } - "MetaMetadataSerializer" must { + "RemoteInstruments" must { - "perform roundtrip masking/unmasking of entry key+length" in { + "combine and decompose single key and length" in { val key: Byte = 17 val len = 812 - val kl = MetadataEnvelopeSerializer.muxEntryKeyLength(key, len) + val kl = RemoteInstruments.combineKeyLength(key, len) - val key2 = MetadataEnvelopeSerializer.unmaskEntryKey(kl) + val key2 = RemoteInstruments.getKey(kl) key2 should ===(key) - val len2 = MetadataEnvelopeSerializer.unmaskEntryLength(kl) + val len2 = RemoteInstruments.getLength(kl) len2 should ===(len) } - "perform key roundtrip using mask/unmask" in { + "combine and decompose key with 0 multiple times" in { check { (kl: KeyLen) ⇒ val k = kl.k - val masked = MetadataEnvelopeSerializer.maskEntryKey(k) - val uk = MetadataEnvelopeSerializer.unmaskEntryKey(masked) + val masked = RemoteInstruments.combineKeyLength(k, 0) + val uk = RemoteInstruments.getKey(masked) uk should ===(k) uk == k } } - "perform length roundtrip using mask/unmask" in { + + "combine and decompose length with 0 multiple times" in { check { (kl: KeyLen) ⇒ val l = kl.l - val masked = MetadataEnvelopeSerializer.maskEntryLength(l) - val ul = MetadataEnvelopeSerializer.unmaskEntryLength(masked) + val masked = RemoteInstruments.combineKeyLength(0, l) + val ul = RemoteInstruments.getLength(masked) ul should ===(l) ul == l } } - "perform muxed roundtrip using mask/unmask" in { + + "combine and decompose key and length multiple times" in { check { (kl: KeyLen) ⇒ val k = kl.k val l = kl.l - val masked = MetadataEnvelopeSerializer.muxEntryKeyLength(k, l) - val uk = MetadataEnvelopeSerializer.unmaskEntryKey(masked) + val masked = RemoteInstruments.combineKeyLength(k, l) + val uk = RemoteInstruments.getKey(masked) uk should ===(k) - val ul = MetadataEnvelopeSerializer.unmaskEntryLength(masked) + val ul = RemoteInstruments.getLength(masked) ul should ===(l) ul == l && uk == k }