=rem Streamline Artery encoding of metadata to avoid allocations

* Change RemoteInstrument API to work with ByteBuffers
* Remove the MetadataMap since serialization is done in buffer
This commit is contained in:
Björn Antonsson 2016-12-06 09:10:12 +01:00 committed by Björn Antonsson
parent 913c166f88
commit 8d05592e2e
14 changed files with 609 additions and 589 deletions

View file

@ -12,7 +12,7 @@ import akka.serialization.{ BaseSerializer, ByteBufferSerializer, SerializationE
import akka.stream.ActorMaterializer import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings import akka.stream.ActorMaterializerSettings
import akka.stream.scaladsl._ import akka.stream.scaladsl._
import akka.util.{ ByteString, OptionVal } import akka.util.OptionVal
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import java.io.IOException import java.io.IOException
import java.nio.ByteBuffer import java.nio.ByteBuffer
@ -119,21 +119,18 @@ class CodecBenchmark {
resolvedRef = actorOnSystemA.asInstanceOf[InternalActorRef] resolvedRef = actorOnSystemA.asInstanceOf[InternalActorRef]
recipientStringB = remoteRefB.path.toSerializationFormatWithAddress(addressB) recipientStringB = remoteRefB.path.toSerializationFormatWithAddress(addressB)
val metadataContainer: ByteString = if (configType == RemoteInstrument) { val remoteInstruments: RemoteInstruments = if (configType == RemoteInstrument) {
val map = MetadataMap[ByteString] new RemoteInstruments(system.asInstanceOf[ExtendedActorSystem], system.log, Vector(new DummyRemoteInstrument()))
val instrument = new DummyRemoteInstrument()
val metadata = instrument.remoteMessageSent(remoteRefB, payload, actorOnSystemA)
map.set(instrument.identifier, metadata)
new MetadataMapRendering(map).render()
} else null } else null
val envelope = new EnvelopeBuffer(envelopeTemplateBuffer) val envelope = new EnvelopeBuffer(envelopeTemplateBuffer)
val outboundEnvelope = OutboundEnvelope(OptionVal.None, payload, OptionVal.None)
headerIn setVersion 1 headerIn setVersion 1
headerIn setUid 42 headerIn setUid 42
headerIn setSenderActorRef actorOnSystemA headerIn setSenderActorRef actorOnSystemA
headerIn setRecipientActorRef remoteRefB headerIn setRecipientActorRef remoteRefB
headerIn setManifest "" headerIn setManifest ""
headerIn setMetadataContainer metadataContainer headerIn setRemoteInstruments remoteInstruments
MessageSerializer.serializeForArtery(SerializationExtension(system), payload, headerIn, envelope) MessageSerializer.serializeForArtery(SerializationExtension(system), outboundEnvelope, headerIn, envelope)
envelope.byteBuffer.flip() envelope.byteBuffer.flip()
// Now build up the graphs // Now build up the graphs
@ -290,23 +287,30 @@ object CodecBenchmark {
// DummyRemoteInstrument that doesn't allocate unnecessary bytes during serialization/deserialization // DummyRemoteInstrument that doesn't allocate unnecessary bytes during serialization/deserialization
class DummyRemoteInstrument extends RemoteInstrument { class DummyRemoteInstrument extends RemoteInstrument {
private val Metadata = ByteString("slevin".getBytes) private val Metadata = "slevin".getBytes
override def identifier: Byte = 7 // Lucky number slevin override def identifier: Byte = 7 // Lucky number slevin
override def remoteMessageSent(recipient: ActorRef, message: Object, sender: ActorRef): ByteString = override def remoteWriteMetadata(recipient: ActorRef, message: Object, sender: ActorRef, buffer: ByteBuffer): Unit = {
Metadata buffer.putInt(Metadata.length)
buffer.put(Metadata)
}
override def remoteMessageReceived(recipient: ActorRef, message: Object, sender: ActorRef, metadata: ByteString): Unit = { override def remoteReadMetadata(recipient: ActorRef, message: Object, sender: ActorRef, buffer: ByteBuffer): Unit = {
val length = Metadata.length val length = Metadata.length
val metaLength = buffer.getInt
@tailrec @tailrec
def compare(pos: Int): Boolean = { def compare(pos: Int): Boolean = {
if (pos == length) true if (pos == length) true
else if (Metadata(pos) == metadata(pos)) compare(pos + 1) else if (Metadata(pos) == buffer.get()) compare(pos + 1)
else false else false
} }
if (metadata.length != length || !compare(0)) if (metaLength != length || !compare(0))
throw new IOException(s"DummyInstrument deserialization error. Expected ${Metadata.toString} got ${metadata.toString}") throw new IOException(s"DummyInstrument deserialization error. Expected ${Metadata.toString}")
} }
override def remoteMessageSent(recipient: ActorRef, message: Object, sender: ActorRef, size: Int, time: Long): Unit = ()
override def remoteMessageReceived(recipient: ActorRef, message: Object, sender: ActorRef, size: Int, time: Long): Unit = ()
} }
} }

View file

@ -7,7 +7,7 @@ package akka.remote
import akka.remote.WireFormats._ import akka.remote.WireFormats._
import akka.protobuf.ByteString import akka.protobuf.ByteString
import akka.actor.ExtendedActorSystem import akka.actor.ExtendedActorSystem
import akka.remote.artery.{ EnvelopeBuffer, HeaderBuilder } import akka.remote.artery.{ EnvelopeBuffer, HeaderBuilder, OutboundEnvelope }
import akka.serialization.Serialization import akka.serialization.Serialization
import akka.serialization.ByteBufferSerializer import akka.serialization.ByteBufferSerializer
import akka.serialization.SerializationExtension import akka.serialization.SerializationExtension
@ -63,7 +63,8 @@ private[akka] object MessageSerializer {
} }
} }
def serializeForArtery(serialization: Serialization, message: AnyRef, headerBuilder: HeaderBuilder, envelope: EnvelopeBuffer): Unit = { def serializeForArtery(serialization: Serialization, outboundEnvelope: OutboundEnvelope, headerBuilder: HeaderBuilder, envelope: EnvelopeBuffer): Unit = {
val message = outboundEnvelope.message
val serializer = serialization.findSerializerFor(message) val serializer = serialization.findSerializerFor(message)
headerBuilder setSerializer serializer.identifier headerBuilder setSerializer serializer.identifier
@ -76,11 +77,11 @@ private[akka] object MessageSerializer {
serializer match { serializer match {
case ser: ByteBufferSerializer case ser: ByteBufferSerializer
headerBuilder setManifest manifest headerBuilder setManifest manifest
envelope.writeHeader(headerBuilder) envelope.writeHeader(headerBuilder, outboundEnvelope)
ser.toBinary(message, envelope.byteBuffer) ser.toBinary(message, envelope.byteBuffer)
case _ case _
headerBuilder setManifest manifest headerBuilder setManifest manifest
envelope.writeHeader(headerBuilder) envelope.writeHeader(headerBuilder, outboundEnvelope)
envelope.byteBuffer.put(serializer.toBinary(message)) envelope.byteBuffer.put(serializer.toBinary(message))
} }
} }

View file

@ -984,11 +984,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
createFlightRecorderEventSink())) createFlightRecorderEventSink()))
val messageDispatcherSink: Sink[InboundEnvelope, Future[Done]] = Sink.foreach[InboundEnvelope] { m val messageDispatcherSink: Sink[InboundEnvelope, Future[Done]] = Sink.foreach[InboundEnvelope] { m
val originAddress = m.association match { messageDispatcher.dispatch(m)
case OptionVal.Some(a) OptionVal.Some(a.remoteAddress)
case OptionVal.None OptionVal.None
}
messageDispatcher.dispatch(m.recipient.get, m.message, m.sender, originAddress)
m match { m match {
case r: ReusableInboundEnvelope inboundEnvelopePool.release(r) case r: ReusableInboundEnvelope inboundEnvelopePool.release(r)
case _ case _

View file

@ -137,9 +137,6 @@ private[remote] sealed trait HeaderBuilder {
def setUid(u: Long): Unit def setUid(u: Long): Unit
def uid: Long def uid: Long
/** Metadata SPI, internally multiple metadata sections can be represented. */
def metadataContainer: ByteString
def setSenderActorRef(ref: ActorRef): Unit def setSenderActorRef(ref: ActorRef): Unit
/** /**
* Retrive the compressed ActorRef by the compressionId carried by this header. * Retrive the compressed ActorRef by the compressionId carried by this header.
@ -170,21 +167,19 @@ private[remote] sealed trait HeaderBuilder {
*/ */
def recipientActorRefPath: OptionVal[String] def recipientActorRefPath: OptionVal[String]
def setMetadataContainer(container: ByteString): Unit
def clearMetadataContainer(): Unit
def setSerializer(serializer: Int): Unit def setSerializer(serializer: Int): Unit
def serializer: Int def serializer: Int
def setManifest(manifest: String): Unit def setManifest(manifest: String): Unit
def manifest(originUid: Long): OptionVal[String] def manifest(originUid: Long): OptionVal[String]
def setRemoteInstruments(instruments: RemoteInstruments): Unit
/** /**
* Reset all fields that are related to an outbound message, * Reset all fields that are related to an outbound message,
* i.e. Encoder calls this as the first thing in onPush. * i.e. Encoder calls this as the first thing in onPush.
*/ */
def resetMessageFields(): Unit def resetMessageFields(): Unit
} }
/** /**
@ -230,7 +225,7 @@ private[remote] final class HeaderBuilderImpl(
var _manifest: String = null var _manifest: String = null
var _manifestIdx: Int = -1 var _manifestIdx: Int = -1
var _metadataContainer: ByteString = null var _remoteInstruments: OptionVal[RemoteInstruments] = OptionVal.None
override def resetMessageFields(): Unit = { override def resetMessageFields(): Unit = {
_flags = 0 _flags = 0
@ -242,6 +237,8 @@ private[remote] final class HeaderBuilderImpl(
_serializer = 0 _serializer = 0
_manifest = null _manifest = null
_manifestIdx = -1 _manifestIdx = -1
_remoteInstruments = OptionVal.None
} }
override def setVersion(v: Byte) = _version = v override def setVersion(v: Byte) = _version = v
@ -342,17 +339,8 @@ private[remote] final class HeaderBuilderImpl(
} }
} }
/** Make sure to prefix the data with an Int-length */ override def setRemoteInstruments(instruments: RemoteInstruments): Unit = {
def setMetadataContainer(container: ByteString): Unit = { _remoteInstruments = OptionVal(instruments)
setFlag(EnvelopeBuffer.MetadataPresentFlag, value = container != null)
_metadataContainer = container
}
/** Rendered metadata already contains int-length prefix, no need to add it manually */
def metadataContainer: ByteString =
_metadataContainer
def clearMetadataContainer(): Unit = {
setFlag(EnvelopeBuffer.MetadataPresentFlag, value = false)
_metadataContainer = null
} }
override def toString = override def toString =
@ -366,8 +354,7 @@ private[remote] final class HeaderBuilderImpl(
"_recipientActorRefIdx:" + _recipientActorRefIdx + ", " + "_recipientActorRefIdx:" + _recipientActorRefIdx + ", " +
"_serializer:" + _serializer + ", " + "_serializer:" + _serializer + ", " +
"_manifest:" + _manifest + ", " + "_manifest:" + _manifest + ", " +
"_manifestIdx:" + _manifestIdx + ", " + "_manifestIdx:" + _manifestIdx + ")"
"_metadataContainer:" + _metadataContainer + ")"
} }
@ -381,7 +368,9 @@ private[remote] final class EnvelopeBuffer(val byteBuffer: ByteBuffer) {
private var literalChars = Array.ofDim[Char](64) private var literalChars = Array.ofDim[Char](64)
private var literalBytes = Array.ofDim[Byte](64) private var literalBytes = Array.ofDim[Byte](64)
def writeHeader(h: HeaderBuilder): Unit = { def writeHeader(h: HeaderBuilder): Unit = writeHeader(h, null)
def writeHeader(h: HeaderBuilder, oe: OutboundEnvelope): Unit = {
val header = h.asInstanceOf[HeaderBuilderImpl] val header = h.asInstanceOf[HeaderBuilderImpl]
byteBuffer.clear() byteBuffer.clear()
@ -395,11 +384,16 @@ private[remote] final class EnvelopeBuffer(val byteBuffer: ByteBuffer) {
byteBuffer.put(ActorRefCompressionTableVersionOffset, header.outboundActorRefCompression.version) byteBuffer.put(ActorRefCompressionTableVersionOffset, header.outboundActorRefCompression.version)
byteBuffer.put(ClassManifestCompressionTableVersionOffset, header.outboundClassManifestCompression.version) byteBuffer.put(ClassManifestCompressionTableVersionOffset, header.outboundClassManifestCompression.version)
// maybe write some metadata
// after metadata is written (or not), buffer is at correct position to continue writing literals
byteBuffer.position(MetadataContainerAndLiteralSectionOffset) byteBuffer.position(MetadataContainerAndLiteralSectionOffset)
if (header.flag(MetadataPresentFlag)) { if (header._remoteInstruments.isDefined) {
// tag if we have metadata or not, as the layout next follows different patterns depending on that header._remoteInstruments.get.serialize(OptionVal(oe), byteBuffer)
header.metadataContainer.copyToBuffer(byteBuffer) if (byteBuffer.position() != MetadataContainerAndLiteralSectionOffset) {
// after metadata is written, buffer is at correct position to continue writing literals (they "moved forward") // we actually wrote some metadata so update the flag field to reflect that
header.setFlag(MetadataPresentFlag, true)
byteBuffer.put(FlagsOffset, header.flags)
}
} }
// Serialize sender // Serialize sender
@ -419,7 +413,6 @@ private[remote] final class EnvelopeBuffer(val byteBuffer: ByteBuffer) {
byteBuffer.putInt(ClassManifestTagOffset, header._manifestIdx | TagTypeMask) byteBuffer.putInt(ClassManifestTagOffset, header._manifestIdx | TagTypeMask)
else else
writeLiteral(ClassManifestTagOffset, header._manifest) writeLiteral(ClassManifestTagOffset, header._manifest)
} }
def parseHeader(h: HeaderBuilder): Unit = { def parseHeader(h: HeaderBuilder): Unit = {
@ -435,20 +428,11 @@ private[remote] final class EnvelopeBuffer(val byteBuffer: ByteBuffer) {
header._inboundActorRefCompressionTableVersion = byteBuffer.get(ActorRefCompressionTableVersionOffset) header._inboundActorRefCompressionTableVersion = byteBuffer.get(ActorRefCompressionTableVersionOffset)
header._inboundClassManifestCompressionTableVersion = byteBuffer.get(ClassManifestCompressionTableVersionOffset) header._inboundClassManifestCompressionTableVersion = byteBuffer.get(ClassManifestCompressionTableVersionOffset)
byteBuffer.position(MetadataContainerAndLiteralSectionOffset)
if (header.flag(MetadataPresentFlag)) { if (header.flag(MetadataPresentFlag)) {
byteBuffer.position(MetadataContainerAndLiteralSectionOffset) // metadata present, so we need to fast forward to the literals that start right after
val totalMetadataLength = byteBuffer.getInt() val totalMetadataLength = byteBuffer.getInt()
byteBuffer.position(byteBuffer.position() + totalMetadataLength)
ensureLiteralCharsLength(totalMetadataLength)
val bytes = literalBytes
byteBuffer.get(bytes, 0, totalMetadataLength)
header._metadataContainer = ByteString(bytes).take(totalMetadataLength)
// the literals section starts here, right after the metadata has ended
// thus, no need to move position the buffer again
} else {
// No metadata present, we position the buffer on the place where literals start
byteBuffer.position(MetadataContainerAndLiteralSectionOffset)
} }
// Deserialize sender // Deserialize sender

View file

@ -64,9 +64,7 @@ private[remote] class Encoder(
private val serialization = SerializationExtension(system) private val serialization = SerializationExtension(system)
private val serializationInfo = Serialization.Information(localAddress, system) private val serializationInfo = Serialization.Information(localAddress, system)
private val instruments: Vector[RemoteInstrument] = RemoteInstruments.create(system) private val instruments: RemoteInstruments = RemoteInstruments(system)
// by being backed by an Array, this allows us to not allocate any wrapper type for the metadata (since we need its ID)
private val serializedMetadatas: MetadataMap[ByteString] = MetadataMap() // TODO: possibly can be optimised a more for the specific access pattern (during write)
private val changeActorRefCompressionCb = getAsyncCallback[(CompressionTable[ActorRef], Promise[Done])] { private val changeActorRefCompressionCb = getAsyncCallback[(CompressionTable[ActorRef], Promise[Done])] {
case (table, done) case (table, done)
@ -120,8 +118,16 @@ private[remote] class Encoder(
case OptionVal.Some(s) headerBuilder setSenderActorRef s case OptionVal.Some(s) headerBuilder setSenderActorRef s
} }
applyAndRenderRemoteMessageSentMetadata(instruments, outboundEnvelope, headerBuilder) val startTime: Long = if (instruments.timeSerialization) System.nanoTime else 0
MessageSerializer.serializeForArtery(serialization, outboundEnvelope.message, headerBuilder, envelope) if (instruments.nonEmpty)
headerBuilder.setRemoteInstruments(instruments)
MessageSerializer.serializeForArtery(serialization, outboundEnvelope, headerBuilder, envelope)
if (instruments.nonEmpty) {
val time = if (instruments.timeSerialization) System.nanoTime - startTime else 0
instruments.messageSent(outboundEnvelope, envelope.byteBuffer.position(), time)
}
} finally Serialization.currentTransportInformation.value = oldValue } finally Serialization.currentTransportInformation.value = oldValue
envelope.byteBuffer.flip() envelope.byteBuffer.flip()
@ -163,36 +169,6 @@ private[remote] class Encoder(
override def onPull(): Unit = pull(in) override def onPull(): Unit = pull(in)
/**
* Renders metadata into `headerBuilder`.
*
* Replace all AnyRef's that were passed along with the [[OutboundEnvelope]] into their [[ByteString]] representations,
* by calling `remoteMessageSent` of each enabled instrumentation. If `context` was attached in the envelope it is passed
* into the instrument, otherwise it receives an OptionVal.None as context, and may still decide to attach rendered
* metadata by returning it.
*/
private def applyAndRenderRemoteMessageSentMetadata(instruments: Vector[RemoteInstrument], envelope: OutboundEnvelope, headerBuilder: HeaderBuilder): Unit = {
if (instruments.nonEmpty) {
val n = instruments.length
var i = 0
while (i < n) {
val instrument = instruments(i)
val instrumentId = instrument.identifier
val metadata = instrument.remoteMessageSent(envelope.recipient.orNull, envelope.message, envelope.sender.orNull)
if (metadata ne null) serializedMetadatas.set(instrumentId, metadata)
i += 1
}
}
if (serializedMetadatas.nonEmpty) {
MetadataEnvelopeSerializer.serialize(serializedMetadatas, headerBuilder)
serializedMetadatas.clear()
}
}
/** /**
* External call from ChangeOutboundCompression materialized value * External call from ChangeOutboundCompression materialized value
*/ */
@ -511,7 +487,7 @@ private[remote] class Deserializer(
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging { new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging {
private val instruments: Vector[RemoteInstrument] = RemoteInstruments.create(system) private val instruments: RemoteInstruments = RemoteInstruments(system)
private val serialization = SerializationExtension(system) private val serialization = SerializationExtension(system)
override protected def logSource = classOf[Deserializer] override protected def logSource = classOf[Deserializer]
@ -520,13 +496,18 @@ private[remote] class Deserializer(
val envelope = grab(in) val envelope = grab(in)
try { try {
val startTime: Long = if (instruments.timeSerialization) System.nanoTime else 0
val deserializedMessage = MessageSerializer.deserializeForArtery( val deserializedMessage = MessageSerializer.deserializeForArtery(
system, envelope.originUid, serialization, envelope.serializer, envelope.classManifest, envelope.envelopeBuffer) system, envelope.originUid, serialization, envelope.serializer, envelope.classManifest, envelope.envelopeBuffer)
val envelopeWithMessage = envelope.withMessage(deserializedMessage) val envelopeWithMessage = envelope.withMessage(deserializedMessage)
applyIncomingInstruments(envelopeWithMessage) if (instruments.nonEmpty) {
instruments.deserialize(envelopeWithMessage)
val time = if (instruments.timeSerialization) System.nanoTime - startTime else 0
instruments.messageReceived(envelopeWithMessage, envelope.envelopeBuffer.byteBuffer.limit(), time)
}
push(out, envelopeWithMessage) push(out, envelopeWithMessage)
} catch { } catch {
case NonFatal(e) case NonFatal(e)
@ -543,22 +524,6 @@ private[remote] class Deserializer(
override def onPull(): Unit = pull(in) override def onPull(): Unit = pull(in)
private def applyIncomingInstruments(envelope: InboundEnvelope): Unit = {
if (envelope.flag(EnvelopeBuffer.MetadataPresentFlag)) {
val length = instruments.length
if (length == 0) {
// TODO do we need to parse, or can we do a fast forward if debug logging is not enabled?
val metaMetadataEnvelope = MetadataMapParsing.parse(envelope)
if (log.isDebugEnabled)
log.debug("Incoming message envelope contains metadata for instruments: {}, " +
"however no RemoteInstrument was registered in local system!", metaMetadataEnvelope.metadataMap.keysWithValues.mkString("[", ",", "]"))
} else {
// we avoid emitting a MetadataMap and instead directly apply the instruments onto the received metadata
MetadataMapParsing.applyAllRemoteMessageReceived(instruments, envelope)
}
}
}
setHandlers(in, out, this) setHandlers(in, out, this)
} }
} }

View file

@ -130,6 +130,11 @@ private[remote] final class ReusableInboundEnvelope extends InboundEnvelope {
this this
} }
def withEnvelopeBuffer(envelopeBuffer: EnvelopeBuffer): InboundEnvelope = {
_envelopeBuffer = envelopeBuffer
this
}
override def toString: String = override def toString: String =
s"InboundEnvelope($recipient, $message, $sender, $originUid, $association)" s"InboundEnvelope($recipient, $message, $sender, $originUid, $association)"
} }

View file

@ -29,15 +29,18 @@ private[remote] class MessageDispatcher(
private val log = Logging.withMarker(system, getClass.getName) private val log = Logging.withMarker(system, getClass.getName)
private val debugLogEnabled = log.isDebugEnabled private val debugLogEnabled = log.isDebugEnabled
def dispatch( def dispatch(inboundEnvelope: InboundEnvelope): Unit = {
recipient: InternalActorRef,
message: AnyRef,
senderOption: OptionVal[ActorRef],
originAddress: OptionVal[Address]): Unit = {
import provider.remoteSettings.Artery._ import provider.remoteSettings.Artery._
import Logging.messageClassName import Logging.messageClassName
val recipient = inboundEnvelope.recipient.get
val message = inboundEnvelope.message
val senderOption = inboundEnvelope.sender
val originAddress = inboundEnvelope.association match {
case OptionVal.Some(a) OptionVal.Some(a.remoteAddress)
case OptionVal.None OptionVal.None
}
val sender: ActorRef = senderOption.getOrElse(system.deadLetters) val sender: ActorRef = senderOption.getOrElse(system.deadLetters)
val originalReceiver = recipient.path val originalReceiver = recipient.path

View file

@ -1,173 +0,0 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import java.nio.{ ByteBuffer, ByteOrder }
import akka.actor.ExtendedActorSystem
import akka.serialization.Serialization
import akka.util.ByteString.ByteString1C
import akka.util.{ ByteString, ByteStringBuilder }
/**
* INTERNAL API
*/
private[akka] object MetadataEnvelopeSerializer {
private[akka] val EmptyRendered = {
implicit val _ByteOrder = ByteOrder.LITTLE_ENDIAN
val bsb = new ByteStringBuilder
bsb.putInt(0) // 0 length
bsb.result
}
// key/length of a metadata element are encoded within a single integer:
// supports keys in the range of <0-31>
final val EntryKeyMask = Integer.parseInt("1111 1000 0000 0000 0000 0000 0000 000".replace(" ", ""), 2)
def maskEntryKey(k: Byte): Int = (k.toInt << 26) & EntryKeyMask
def unmaskEntryKey(kv: Int): Byte = ((kv & EntryKeyMask) >> 26).toByte
final val EntryLengthMask = ~EntryKeyMask
def maskEntryLength(k: Int): Int = k & EntryLengthMask
def unmaskEntryLength(kv: Int): Int = kv & EntryLengthMask
def muxEntryKeyLength(k: Byte, l: Int): Int = {
maskEntryKey(k) | maskEntryLength(l)
}
def serialize(metadatas: MetadataMap[ByteString], headerBuilder: HeaderBuilder): Unit = {
if (metadatas.isEmpty) headerBuilder.clearMetadataContainer()
else {
val container = new MetadataMapRendering(metadatas)
headerBuilder.setMetadataContainer(container.render())
}
}
def deserialize(system: ExtendedActorSystem, originUid: Long, serialization: Serialization,
serializer: Int, classManifest: String, envelope: EnvelopeBuffer): AnyRef = {
serialization.deserializeByteBuffer(envelope.byteBuffer, serializer, classManifest)
}
}
/**
* INTERNAL API
*
* The metadata section is stored as ByteString (prefixed with Int length field,
* the same way as any other literal), however the internal structure of it is as follows:
*
* {{{
* Metadata entry:
*
* 0 1 2 3
* 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* | Key | Metadata entry length |
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* | ... metadata entry ... |
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* }}}
*
*/
private[akka] final class MetadataMapRendering(val metadataMap: MetadataMap[ByteString]) extends AnyVal {
import MetadataEnvelopeSerializer._
def render(): ByteString =
if (metadataMap.isEmpty) {
// usually no-one will want to render an empty metadata section - it should not be there at all
EmptyRendered
} else {
implicit val _ByteOrder = ByteOrder.LITTLE_ENDIAN
// TODO optimise this, we could just count along the way and then prefix with the length
val totalSize = 4 /* length int field */ + metadataMap.usedSlots * 4 /* metadata length */ + metadataMap.foldLeftValues(0)(_ + _.length)
val b = new ByteStringBuilder // TODO could we reuse one?
b.sizeHint(totalSize)
b.putInt(totalSize - 4 /* don't count itself, the length prefix */ )
// TODO: move through and then prepend length
metadataMap.foreach { (key: Byte, value: ByteString)
// TODO try to remove allocation? Iterator otherwise, but that's also allocation
val kl = muxEntryKeyLength(key, value.length)
b.putInt(kl)
value match {
case c: ByteString1C c.appendToBuilder(b) // uses putByteArrayUnsafe
case _ b ++= value
}
}
b.result()
}
}
/** INTERNAL API */
private[akka] object MetadataMapParsing {
import MetadataEnvelopeSerializer._
/** Allocates an MetadataMap */
def parse(envelope: InboundEnvelope): MetadataMapRendering = {
val buf = envelope.envelopeBuffer.byteBuffer
buf.position(EnvelopeBuffer.MetadataContainerAndLiteralSectionOffset)
parseRaw(buf)
}
/**
* INTERNAL API, only for testing
* The buffer MUST be already at the right position where the Metadata container starts.
*/
private[akka] def parseRaw(buf: ByteBuffer) = {
buf.order(ByteOrder.LITTLE_ENDIAN)
val metadataContainerLength = buf.getInt()
val endOfMetadataPos = metadataContainerLength + buf.position()
val map = MetadataMap[ByteString]()
while (buf.position() < endOfMetadataPos) {
val kl = buf.getInt()
val k = unmaskEntryKey(kl) // k
val l = unmaskEntryLength(kl) // l
val arr = Array.ofDim[Byte](l)
buf.get(arr)
val metadata = ByteString1C(arr) // avoids copying again
map.set(k, metadata)
}
new MetadataMapRendering(map)
}
/** Implemented in a way to avoid allocations of any envelopes or arrays */
def applyAllRemoteMessageReceived(instruments: Vector[RemoteInstrument], envelope: InboundEnvelope): Unit = {
val buf = envelope.envelopeBuffer.byteBuffer
buf.position(EnvelopeBuffer.MetadataContainerAndLiteralSectionOffset)
applyAllRemoteMessageReceivedRaw(instruments, envelope, buf)
}
/**
* INTERNAL API, only for testing
* The buffer MUST be already at the right position where the Metadata container starts.
*/
private[akka] def applyAllRemoteMessageReceivedRaw(instruments: Vector[RemoteInstrument], envelope: InboundEnvelope, buf: ByteBuffer): Unit = {
buf.order(ByteOrder.LITTLE_ENDIAN)
val metadataContainerLength = buf.getInt()
val endOfMetadataPos = metadataContainerLength + buf.position()
while (buf.position() < endOfMetadataPos) {
val keyAndLength = buf.getInt()
val key = unmaskEntryKey(keyAndLength)
val length = unmaskEntryLength(keyAndLength)
val arr = Array.ofDim[Byte](length) // TODO can be optimised to re-cycle this array instead
buf.get(arr)
val data = ByteString(arr) // bytes
instruments.find(_.identifier == key) match {
case Some(instr) instr.remoteMessageReceived(envelope.recipient.orNull, envelope.message, envelope.sender.orNull, data)
case _ throw new Exception(s"No RemoteInstrument for id $key available!")
}
}
}
}

View file

@ -5,7 +5,11 @@
package akka.remote.artery package akka.remote.artery
import akka.actor.{ ActorRef, ExtendedActorSystem } import akka.actor.{ ActorRef, ExtendedActorSystem }
import akka.util.{ ByteString, OptionVal } import akka.event.{ Logging, LoggingAdapter }
import akka.util.OptionVal
import java.nio.ByteBuffer
import scala.annotation.tailrec
import scala.util.control.NonFatal
/** /**
* INTERNAL API * INTERNAL API
@ -16,48 +20,274 @@ import akka.util.{ ByteString, OptionVal }
* Multiple instruments are automatically handled, however they MUST NOT overlap in their idenfitiers. * Multiple instruments are automatically handled, however they MUST NOT overlap in their idenfitiers.
* *
* Instances of `RemoteInstrument` are created from configuration. A new instance of RemoteInstrument * Instances of `RemoteInstrument` are created from configuration. A new instance of RemoteInstrument
* will be created for each encoder and decoder. It's only called from the stage, so if it dosn't * will be created for each encoder and decoder. It's only called from the stage, so if it doesn't
* delegate to any shared instance it doesn't have to be thread-safe. * delegate to any shared instance it doesn't have to be thread-safe.
*/ */
abstract class RemoteInstrument { abstract class RemoteInstrument {
/** /**
* Instrument identifier. * Instrument identifier.
* *
* MUST be >=0 and <32. * MUST be >=1 and <32.
* *
* Values between 0 and 7 are reserved for Akka internal use. * Values between 1 and 7 are reserved for Akka internal use.
*/ */
def identifier: Byte def identifier: Byte
/** /**
* Called right before putting the message onto the wire. * Should the serialization be timed? Otherwise times are always 0.
* Parameters MAY be `null` (except `message`)!
*
* @return `metadata` rendered to be serialized into the remove envelope, or `null` if no metadata should be attached
*/ */
def remoteMessageSent(recipient: ActorRef, message: Object, sender: ActorRef): ByteString def serializationTimingEnabled: Boolean = false
/** /**
* Called once a message (containing a metadata field designated for this instrument) has been deserialized from the wire. * Called while serializing the message.
* Parameters MAY be `null` (except `message` and `buffer`)!
*/ */
def remoteMessageReceived(recipient: ActorRef, message: Object, sender: ActorRef, metadata: ByteString): Unit def remoteWriteMetadata(recipient: ActorRef, message: Object, sender: ActorRef, buffer: ByteBuffer): Unit
/**
* Called right before putting the message onto the wire.
* Parameters MAY be `null` (except `message` and `buffer`)!
*
* The `size` is the total serialized size in bytes of the complete message including akka specific headers and any
* `RemoteInstrument` metadata.
* If `serializationTimingEnabled` returns true, then `time` will be the total time it took to serialize all data
* in the message in nanoseconds, otherwise it is 0.
*/
def remoteMessageSent(recipient: ActorRef, message: Object, sender: ActorRef, size: Int, time: Long): Unit
/**
* Called while deserializing the message once a message (containing a metadata field designated for this instrument) is found.
*/
def remoteReadMetadata(recipient: ActorRef, message: Object, sender: ActorRef, buffer: ByteBuffer): Unit
/**
* Called when the message has been deserialized.
*
* The `size` is the total serialized size in bytes of the complete message including akka specific headers and any
* `RemoteInstrument` metadata.
* If `serializationTimingEnabled` returns true, then `time` will be the total time it took to deserialize all data
* in the message in nanoseconds, otherwise it is 0.
*/
def remoteMessageReceived(recipient: ActorRef, message: Object, sender: ActorRef, size: Int, time: Long): Unit
} }
object NoopRemoteInstrument extends RemoteInstrument { /**
override def identifier: Byte = * INTERNAL API
-1 *
* The metadata section is stored as raw bytes (prefixed with an Int length field,
* the same way as any other literal), however the internal structure of it is as follows:
*
* {{{
* Metadata entry:
*
* 0 1 2 3
* 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* | Key | Metadata entry length |
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* | ... metadata entry ... |
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* }}}
*
*/
private[remote] final class RemoteInstruments(
private val system: ExtendedActorSystem,
private val log: LoggingAdapter,
_instruments: Vector[RemoteInstrument]) {
import RemoteInstruments._
override def remoteMessageSent(recipient: ActorRef, message: Object, sender: ActorRef): ByteString = def this(system: ExtendedActorSystem, log: LoggingAdapter) = this(system, log, RemoteInstruments.create(system, log))
null def this(system: ExtendedActorSystem) = this(system, Logging.getLogger(system, classOf[RemoteInstruments]))
override def remoteMessageReceived(recipient: ActorRef, message: Object, sender: ActorRef, metadata: ByteString): Unit = // keep the remote instruments sorted by identifier to speed up deserialization
() private val instruments: Vector[RemoteInstrument] = _instruments.sortBy(_.identifier)
// does any of the instruments want serialization timing?
private val serializationTimingEnabled = instruments.exists(_.serializationTimingEnabled)
def serialize(outboundEnvelope: OptionVal[OutboundEnvelope], buffer: ByteBuffer): Unit = {
if (instruments.nonEmpty && outboundEnvelope.isDefined) {
val startPos = buffer.position()
val oe = outboundEnvelope.get
try {
buffer.putInt(0)
val dataPos = buffer.position()
var i = 0
while (i < instruments.length) {
val rewindPos = buffer.position()
val instrument = instruments(i)
try {
serializeInstrument(instrument, oe, buffer)
} catch {
case NonFatal(t)
log.debug(
"Skipping serialization of RemoteInstrument {} since it failed with {}",
instrument.identifier, t.getMessage)
buffer.position(rewindPos)
}
i += 1
}
val endPos = buffer.position()
if (endPos == dataPos) {
// no instruments wrote anything so we need to rewind to start
buffer.position(startPos)
} else {
// some instruments wrote data, so write the total length
buffer.putInt(startPos, endPos - dataPos)
}
} catch {
case NonFatal(t)
log.debug("Skipping serialization of all RemoteInstruments due to unhandled failure {}", t)
buffer.position(startPos)
}
}
}
private def serializeInstrument(instrument: RemoteInstrument, outboundEnvelope: OutboundEnvelope, buffer: ByteBuffer): Unit = {
val startPos = buffer.position()
buffer.putInt(0)
val dataPos = buffer.position()
instrument.remoteWriteMetadata(outboundEnvelope.recipient.orNull, outboundEnvelope.message, outboundEnvelope.sender.orNull, buffer)
val endPos = buffer.position()
if (endPos == dataPos) {
// if the instrument didn't write anything, then rewind to the start
buffer.position(startPos)
} else {
// the instrument wrote something so we need to write the identifier and length
buffer.putInt(startPos, combineKeyLength(instrument.identifier, endPos - dataPos))
}
}
def deserialize(inboundEnvelope: InboundEnvelope): Unit = {
if (inboundEnvelope.flag(EnvelopeBuffer.MetadataPresentFlag)) {
inboundEnvelope.envelopeBuffer.byteBuffer.position(EnvelopeBuffer.MetadataContainerAndLiteralSectionOffset)
deserializeRaw(inboundEnvelope)
}
}
def deserializeRaw(inboundEnvelope: InboundEnvelope): Unit = {
val buffer = inboundEnvelope.envelopeBuffer.byteBuffer
val length = buffer.getInt
val endPos = buffer.position() + length
try {
if (instruments.nonEmpty) {
var i = 0
while (i < instruments.length && buffer.position() < endPos) {
val instrument = instruments(i)
val startPos = buffer.position()
val keyAndLength = buffer.getInt
val dataPos = buffer.position()
val key = getKey(keyAndLength)
val length = getLength(keyAndLength)
var nextPos = dataPos + length
val identifier = instrument.identifier
if (key == identifier) {
try {
deserializeInstrument(instrument, inboundEnvelope, buffer)
} catch {
case NonFatal(t)
log.debug(
"Skipping deserialization of RemoteInstrument {} since it failed with {}",
instrument.identifier, t.getMessage)
}
i += 1
} else if (key > identifier) {
// since instruments are sorted on both sides skip this local one and retry the serialized one
log.debug("Skipping local RemoteInstrument {} that has no matching data in the message", identifier)
nextPos = startPos
i += 1
} else {
// since instruments are sorted on both sides skip the serialized one and retry the local one
log.debug("Skipping serialized data in message for RemoteInstrument {} that has no local match", key)
}
buffer.position(nextPos)
}
} else {
if (log.isDebugEnabled) log.debug(
"Skipping serialized data in message for RemoteInstrument(s) {} that has no local match",
remoteInstrumentIdIteratorRaw(buffer, endPos).mkString("[", ", ", "]"))
}
} catch {
case NonFatal(t)
log.debug("Skipping further deserialization of remaining RemoteInstruments due to unhandled failure {}", t)
} finally {
buffer.position(endPos)
}
}
private def deserializeInstrument(instrument: RemoteInstrument, inboundEnvelope: InboundEnvelope, buffer: ByteBuffer): Unit = {
instrument.remoteReadMetadata(inboundEnvelope.recipient.orNull, inboundEnvelope.message, inboundEnvelope.sender.orNull, buffer)
}
def messageSent(outboundEnvelope: OutboundEnvelope, size: Int, time: Long): Unit = {
@tailrec def messageSent(pos: Int): Unit = {
if (pos < instruments.length) {
val instrument = instruments(pos)
try {
messageSentInstrument(instrument, outboundEnvelope, size, time)
} catch {
case NonFatal(t)
log.debug("Message sent in RemoteInstrument {} failed with {}", instrument.identifier, t.getMessage)
}
messageSent(pos + 1)
}
}
messageSent(0)
}
private def messageSentInstrument(instrument: RemoteInstrument, outboundEnvelope: OutboundEnvelope, size: Int, time: Long): Unit = {
instrument.remoteMessageSent(outboundEnvelope.recipient.orNull, outboundEnvelope.message, outboundEnvelope.sender.orNull, size, time)
}
def messageReceived(inboundEnvelope: InboundEnvelope, size: Int, time: Long): Unit = {
@tailrec def messageRecieved(pos: Int): Unit = {
if (pos < instruments.length) {
val instrument = instruments(pos)
try {
messageReceivedInstrument(instrument, inboundEnvelope, size, time)
} catch {
case NonFatal(t)
log.debug("Message received in RemoteInstrument {} failed with {}", instrument.identifier, t.getMessage)
}
messageRecieved(pos + 1)
}
}
messageRecieved(0)
}
private def messageReceivedInstrument(instrument: RemoteInstrument, inboundEnvelope: InboundEnvelope, size: Int, time: Long): Unit = {
instrument.remoteMessageReceived(inboundEnvelope.recipient.orNull, inboundEnvelope.message, inboundEnvelope.sender.orNull, size, time)
}
private def remoteInstrumentIdIteratorRaw(buffer: ByteBuffer, endPos: Int): Iterator[Int] = {
new Iterator[Int] {
override def hasNext: Boolean = buffer.position() < endPos
override def next(): Int = {
val keyAndLength = buffer.getInt
buffer.position(buffer.position() + getLength(keyAndLength))
getKey(keyAndLength)
}
}
}
def isEmpty: Boolean = instruments.isEmpty
def nonEmpty: Boolean = instruments.nonEmpty
def timeSerialization = serializationTimingEnabled
} }
/** INTERNAL API */ /** INTERNAL API */
private[remote] object RemoteInstruments { private[remote] object RemoteInstruments {
def create(system: ExtendedActorSystem): Vector[RemoteInstrument] = {
def apply(system: ExtendedActorSystem): RemoteInstruments =
new RemoteInstruments(system)
// key/length of a metadata element are encoded within a single integer:
// supports keys in the range of <0-31>
private final val lengthMask: Int = ~(31 << 26)
def combineKeyLength(k: Byte, l: Int): Int = (k.toInt << 26) | (l & lengthMask)
def getKey(kl: Int): Byte = (kl >>> 26).toByte
def getLength(kl: Int): Int = kl & lengthMask
def create(system: ExtendedActorSystem, log: LoggingAdapter): Vector[RemoteInstrument] = {
val c = system.settings.config val c = system.settings.config
val path = "akka.remote.artery.advanced.instruments" val path = "akka.remote.artery.advanced.instruments"
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
@ -69,124 +299,3 @@ private[remote] object RemoteInstruments {
}(collection.breakOut) }(collection.breakOut)
} }
} }
/**
* INTERNAL API
*
* This datastructure is specialized for addressing directly into a known IDs slot.
* It is used when deserializing/serializing metadata and we know the ID we want to reach into.
*
* Mutable & NOT thread-safe.
*
* Fixed-size: 32-slots array-backed Map-like structure.
* Lazy: The backing array is allocated lazily only when needed, thus we pay no cost for the metadata array if
* the system is not using metadata.
* Life-cycle: Owned and linked to the lifecycle of an [[OutboundEnvelope]].
* Re-cycled: Aimed to be re-cycled to produce the least possible GC-churn, by calling `clear()` when done with it.
*
* Reserved keys: The keys 07 are reserved for Akka internal purposes and future extensions.
*/
private[remote] final class MetadataMap[T >: Null] {
val capacity = 32
protected var backing: Array[T] = null // TODO re-think if a plain LinkedList wouldn't be fine here?
private var _usedSlots = 0
def usedSlots = _usedSlots
def apply(i: Int): OptionVal[T] =
if (backing == null) OptionVal.None
else OptionVal[T](backing(i))
def isEmpty = usedSlots == 0
def nonEmpty = !isEmpty
def hasValueFor(i: Int) = nonEmpty && backing(i) != null
// FIXME too specialized...
def foldLeftValues[A](zero: A)(f: (A, T) A): A = {
var acc: A = zero
var hit = 0
var i = 0
while (i < capacity && hit < _usedSlots) {
val it = backing(i)
if (it != null) {
acc = f(acc, it)
hit += 1
}
i += 1
}
acc
}
/** Heavy operation, only used for error logging */
def keysWithValues: List[Int] = backing.zipWithIndex.filter({ case (t, id) t != null }).map(_._2).toList
def foreach(f: (Byte, T) Unit) = {
var i = 0
var hit = 0
while (i < capacity && hit < _usedSlots) {
val t = backing(i)
if (t != null) {
f(i.toByte, t)
hit += 1
}
i += 1
}
}
private def allocateIfNeeded(): Unit =
if (backing == null) backing = Array.ofDim[Object](capacity).asInstanceOf[Array[T]]
/**
* Set a value at given index.
* Setting a null value removes the entry (the slot is marked as not used).
*/
def set(i: Int, t: T): Unit =
if (t == null) {
if (backing == null) ()
else {
if (backing(i) != null) _usedSlots -= 1 // we're clearing a spot
backing(i) = null.asInstanceOf[T]
}
} else {
allocateIfNeeded()
if (backing(i) == null) {
// was empty previously
_usedSlots += 1
} else {
// replacing previous value, usedSlots remains unchanged
}
backing(i) = t
}
/**
* If the backing array was already allocated clears it, if it wasn't does nothing (no-op).
* This is so in order to pay no-cost when not using metadata - clearing then is instant.
*/
def clear() =
if (isEmpty) ()
else {
var i = 0
while (i < capacity) {
backing(i) = null.asInstanceOf[T]
i += 1
}
_usedSlots = 0
}
override def toString() =
if (backing == null) s"MetadataMap(<empty>)"
else s"MetadataMap(${backing.toList.mkString("[", ",", "]")})"
}
/**
* INTERNAL API
*/
private[remote] object MetadataMap {
def apply[T >: Null]() = new MetadataMap[T]
}

View file

@ -4,30 +4,23 @@
package akka.remote.artery package akka.remote.artery
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicReference
import akka.actor._
import scala.concurrent.duration._
import akka.actor.ActorRef
import akka.actor.ActorSelectionMessage
import akka.actor.ActorSystem
import akka.actor.ExtendedActorSystem
import akka.actor.Extension
import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider
import akka.remote.artery.MetadataCarryingSpy.{ RemoteMessageReceived, RemoteMessageSent }
import akka.testkit.ImplicitSender import akka.testkit.ImplicitSender
import akka.testkit.SocketUtil._
import akka.testkit.TestActors import akka.testkit.TestActors
import akka.testkit.TestProbe import akka.testkit.TestProbe
import akka.util.ByteString import akka.util.ByteString
import java.nio.{ ByteBuffer, CharBuffer }
import java.nio.charset.Charset
object MetadataCarryingSpy extends ExtensionId[MetadataCarryingSpy] with ExtensionIdProvider { object MetadataCarryingSpy extends ExtensionId[MetadataCarryingSpy] with ExtensionIdProvider {
override def get(system: ActorSystem): MetadataCarryingSpy = super.get(system) override def get(system: ActorSystem): MetadataCarryingSpy = super.get(system)
override def lookup = MetadataCarryingSpy override def lookup = MetadataCarryingSpy
override def createExtension(system: ExtendedActorSystem): MetadataCarryingSpy = new MetadataCarryingSpy override def createExtension(system: ExtendedActorSystem): MetadataCarryingSpy = new MetadataCarryingSpy
final case class RemoteMessageSent(recipient: ActorRef, message: Object, sender: ActorRef) final case class RemoteMessageSent(recipient: ActorRef, message: Object, sender: ActorRef, size: Int, time: Long)
final case class RemoteMessageReceived(recipient: ActorRef, message: Object, sender: ActorRef, metadata: ByteString) final case class RemoteMessageReceived(recipient: ActorRef, message: Object, sender: ActorRef, size: Int, time: Long)
final case class RemoteWriteMetadata(recipient: ActorRef, message: Object, sender: ActorRef)
final case class RemoteReadMetadata(recipient: ActorRef, message: Object, sender: ActorRef, metadata: String)
} }
class MetadataCarryingSpy extends Extension { class MetadataCarryingSpy extends Extension {
@ -37,29 +30,66 @@ class MetadataCarryingSpy extends Extension {
} }
class TestInstrument(system: ExtendedActorSystem) extends RemoteInstrument { class TestInstrument(system: ExtendedActorSystem) extends RemoteInstrument {
import akka.remote.artery.MetadataCarryingSpy._
private val charset = Charset.forName("UTF-8")
private val encoder = charset.newEncoder()
private val decoder = charset.newDecoder()
override val identifier: Byte = 1 override val identifier: Byte = 1
override def remoteMessageSent(recipient: ActorRef, message: Object, sender: ActorRef): ByteString = override def serializationTimingEnabled: Boolean = true
override def remoteWriteMetadata(recipient: ActorRef, message: Object, sender: ActorRef, buffer: ByteBuffer): Unit =
message match { message match {
case _: MetadataCarryingSpec.Ping | ActorSelectionMessage(_: MetadataCarryingSpec.Ping, _, _) case _: MetadataCarryingSpec.Ping | ActorSelectionMessage(_: MetadataCarryingSpec.Ping, _, _)
val metadata = ByteString("!!!") val metadata = "!!!"
MetadataCarryingSpy(system).ref.foreach(_ ! RemoteMessageSent(recipient, message, sender)) buffer.putInt(metadata.length)
metadata // this data will be attached to the remote message encoder.encode(CharBuffer.wrap(metadata), buffer, true)
encoder.flush(buffer)
encoder.reset()
MetadataCarryingSpy(system).ref.foreach(_ ! RemoteWriteMetadata(recipient, message, sender))
case _ case _
null
} }
override def remoteMessageReceived(recipient: ActorRef, message: Object, sender: ActorRef, metadata: ByteString): Unit = override def remoteReadMetadata(recipient: ActorRef, message: Object, sender: ActorRef, buffer: ByteBuffer): Unit =
message match { message match {
case _: MetadataCarryingSpec.Ping | ActorSelectionMessage(_: MetadataCarryingSpec.Ping, _, _) case _: MetadataCarryingSpec.Ping | ActorSelectionMessage(_: MetadataCarryingSpec.Ping, _, _)
MetadataCarryingSpy(system).ref.foreach(_ ! RemoteMessageReceived(recipient, message, sender, metadata)) val size = buffer.getInt
val charBuffer = CharBuffer.allocate(size)
decoder.decode(buffer, charBuffer, false)
decoder.reset()
charBuffer.flip()
val metadata = charBuffer.toString
MetadataCarryingSpy(system).ref.foreach(_ ! RemoteReadMetadata(recipient, message, sender, metadata))
case _
}
override def remoteMessageSent(recipient: ActorRef, message: Object, sender: ActorRef, size: Int, time: Long): Unit =
message match {
case _: MetadataCarryingSpec.Ping | ActorSelectionMessage(_: MetadataCarryingSpec.Ping, _, _)
MetadataCarryingSpy(system).ref.foreach(_ ! RemoteMessageSent(recipient, message, sender, size, time))
case _
}
override def remoteMessageReceived(recipient: ActorRef, message: Object, sender: ActorRef, size: Int, time: Long): Unit =
message match {
case _: MetadataCarryingSpec.Ping | ActorSelectionMessage(_: MetadataCarryingSpec.Ping, _, _)
MetadataCarryingSpy(system).ref.foreach(_ ! RemoteMessageReceived(recipient, message, sender, size, time))
case _ case _
} }
} }
object MetadataCarryingSpec { object MetadataCarryingSpec {
final case class Ping(payload: ByteString = ByteString.empty) final case class Ping(payload: ByteString = ByteString.empty)
class ProxyActor(local: ActorRef, remotePath: ActorPath) extends Actor {
val remote = context.system.actorSelection(remotePath)
override def receive = {
case message if sender() == local remote ! message
case message local ! message
}
}
} }
class MetadataCarryingSpec extends ArteryMultiNodeSpec( class MetadataCarryingSpec extends ArteryMultiNodeSpec(
@ -72,6 +102,7 @@ class MetadataCarryingSpec extends ArteryMultiNodeSpec(
""") with ImplicitSender { """) with ImplicitSender {
import MetadataCarryingSpec._ import MetadataCarryingSpec._
import MetadataCarryingSpy._
"Metadata" should { "Metadata" should {
@ -85,17 +116,30 @@ class MetadataCarryingSpec extends ArteryMultiNodeSpec(
MetadataCarryingSpy(systemB).setProbe(instrumentProbeB.ref) MetadataCarryingSpy(systemB).setProbe(instrumentProbeB.ref)
systemB.actorOf(TestActors.echoActorProps, "reply") systemB.actorOf(TestActors.echoActorProps, "reply")
systemA.actorSelection(rootActorPath(systemB) / "user" / "reply") ! Ping() val proxyA = systemA.actorOf(Props(classOf[ProxyActor], testActor, rootActorPath(systemB) / "user" / "reply"))
proxyA ! Ping()
expectMsgType[Ping] expectMsgType[Ping]
val writeA = instrumentProbeA.expectMsgType[RemoteWriteMetadata]
val sentA = instrumentProbeA.expectMsgType[RemoteMessageSent] val sentA = instrumentProbeA.expectMsgType[RemoteMessageSent]
val readB = instrumentProbeB.expectMsgType[RemoteReadMetadata]
val recvdB = instrumentProbeB.expectMsgType[RemoteMessageReceived] val recvdB = instrumentProbeB.expectMsgType[RemoteMessageReceived]
recvdB.metadata should ===(ByteString("!!!")) readB.metadata should ===("!!!")
sentA.size should be > 0
sentA.time should be > 0L
recvdB.size should ===(sentA.size)
recvdB.time should be > 0L
// for the reply // for the reply
val writeB = instrumentProbeB.expectMsgType[RemoteWriteMetadata]
val sentB = instrumentProbeB.expectMsgType[RemoteMessageSent] val sentB = instrumentProbeB.expectMsgType[RemoteMessageSent]
val readA = instrumentProbeA.expectMsgType[RemoteReadMetadata]
val recvdA = instrumentProbeA.expectMsgType[RemoteMessageReceived] val recvdA = instrumentProbeA.expectMsgType[RemoteMessageReceived]
recvdA.metadata should ===(ByteString("!!!")) readA.metadata should ===("!!!")
sentB.size should be > 0
sentB.time should be > 0L
recvdA.size should ===(sentB.size)
recvdA.time should be > 0L
} }
} }

View file

@ -1,65 +0,0 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import akka.actor.ActorRef
import akka.testkit.{ AkkaSpec, TestProbe }
import akka.util.ByteString
import scala.concurrent.duration._
class MetadataContainerSpec extends AkkaSpec {
"MetadataContainer" should {
"parse, given empty map" in {
val map = new MetadataMap[ByteString]
val container = new MetadataMapRendering(map)
val rendered = container.render()
val back = MetadataMapParsing.parseRaw(rendered.asByteBuffer)
map.toString() should ===(back.metadataMap.toString())
}
"parse, given 1 allocated in map" in {
val map = new MetadataMap[ByteString]
val container = new MetadataMapRendering(map)
map.set(1, ByteString("!!!"))
val rendered = container.render()
val back = MetadataMapParsing.parseRaw(rendered.asByteBuffer)
map.toString() should ===(back.metadataMap.toString())
}
"apply, given 3 allocated in map" in {
val map = new MetadataMap[ByteString]
val container = new MetadataMapRendering(map)
map.set(1, ByteString("!!!"))
map.set(10, ByteString("??????"))
map.set(31, ByteString("........."))
val p = TestProbe()
def testInstrument(id: Int): RemoteInstrument = {
new RemoteInstrument {
override def identifier: Byte = id.toByte
override def remoteMessageSent(recipient: ActorRef, message: Object, sender: ActorRef): ByteString = ???
override def remoteMessageReceived(recipient: ActorRef, message: Object, sender: ActorRef, metadata: ByteString): Unit =
p.ref ! s"${identifier}-${metadata.utf8String}"
}
}
val instruments = Vector(
testInstrument(1), testInstrument(31), testInstrument(10)
)
val rendered = container.render()
val mockEnvelope = new ReusableInboundEnvelope
MetadataMapParsing.applyAllRemoteMessageReceivedRaw(instruments, mockEnvelope, rendered.asByteBuffer)
p.expectMsgAllOf("1-!!!", "10-??????", "31-.........")
p.expectNoMsg(100.millis)
}
}
}

View file

@ -1,47 +0,0 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import akka.util.OptionVal
import org.scalatest.{ Matchers, WordSpec }
class MetadataMapSpec extends WordSpec with Matchers {
"MetadataMap" must {
"hasValueFor" in {
val a = MetadataMap[String]()
a.hasValueFor(0) should ===(false)
a.set(0, "0")
a.hasValueFor(0) should ===(true)
a.hasValueFor(1) should ===(false)
a.clear()
a.isEmpty should ===(true)
a.nonEmpty should ===(false)
a.hasValueFor(12) should ===(false)
a.hasValueFor(0) should ===(false)
a.set(0, "0")
a.hasValueFor(0) should ===(true)
}
"setting values" in {
val a = MetadataMap[String]()
a(0) should ===(OptionVal.None)
a.usedSlots should ===(0)
a.set(0, "0")
a(0) should ===(OptionVal.Some("0"))
a.usedSlots should ===(1)
a.set(0, "1")
a(0) should ===(OptionVal.Some("1"))
a.usedSlots should ===(1)
a.set(0, null)
a(0) should ===(OptionVal.None)
a.usedSlots should ===(0)
}
}
}

View file

@ -0,0 +1,192 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import akka.actor.{ ActorRef, ActorSystem, ExtendedActorSystem, InternalActorRef }
import akka.event._
import akka.testkit.TestEvent.Mute
import akka.testkit.{ AkkaSpec, EventFilter, TestEvent, TestProbe }
import akka.util.OptionVal
import java.nio.{ ByteBuffer, CharBuffer }
import java.nio.charset.Charset
import scala.concurrent.duration._
class RemoteInstrumentsSerializationSpec extends AkkaSpec("akka.loglevel = DEBUG") {
import RemoteInstrumentsSerializationSpec._
def remoteInstruments(instruments: RemoteInstrument*): RemoteInstruments = {
val vec = Vector(instruments: _*)
new RemoteInstruments(system.asInstanceOf[ExtendedActorSystem], system.log, vec)
}
def ensureDebugLog[T](messages: String*)(f: T): T = {
if (messages.isEmpty)
f
else
EventFilter.debug(message = messages.head, occurrences = 1) intercept {
ensureDebugLog(messages.tail: _*)(f)
}
}
"RemoteInstruments" should {
"not write anything in the buffer if not deserializing" in {
val buffer = ByteBuffer.allocate(1024)
serialize(remoteInstruments(), buffer)
buffer.position() should be(0)
}
"serialize and deserialize a single remote instrument" in {
val p = TestProbe()
val ri = remoteInstruments(testInstrument(1, "!"))
serializeDeserialize(ri, ri, p.ref, "foo")
p.expectMsgAllOf("foo-1-!")
p.expectNoMsg(100.millis)
}
"serialize and deserialize multiple remote instruments in the correct order" in {
val p = TestProbe()
val ri = remoteInstruments(testInstrument(1, "!"), testInstrument(31, "???"), testInstrument(10, ".."))
serializeDeserialize(ri, ri, p.ref, "bar")
p.expectMsgAllOf("bar-1-!", "bar-10-..", "bar-31-???")
p.expectNoMsg(100.millis)
}
"skip exitsing remote instruments not in the message" in {
ensureDebugLog(
"Skipping local RemoteInstrument 10 that has no matching data in the message") {
val p = TestProbe()
val instruments = Seq(testInstrument(7, "!"), testInstrument(10, ".."), testInstrument(21, "???"))
val riS = remoteInstruments(instruments(0), instruments(2))
val riD = remoteInstruments(instruments: _*)
serializeDeserialize(riS, riD, p.ref, "baz")
p.expectMsgAllOf("baz-7-!", "baz-21-???")
p.expectNoMsg(100.millis)
}
}
"skip remote instruments in the message that are not existing" in {
ensureDebugLog(
"Skipping serialized data in message for RemoteInstrument 11 that has no local match") {
val p = TestProbe()
val instruments = Seq(testInstrument(6, "!"), testInstrument(11, ".."), testInstrument(19, "???"))
val riS = remoteInstruments(instruments: _*)
val riD = remoteInstruments(instruments(0), instruments(2))
serializeDeserialize(riS, riD, p.ref, "buz")
p.expectMsgAllOf("buz-6-!", "buz-19-???")
p.expectNoMsg(100.millis)
}
}
"skip all remote instruments in the message if none are existing" in {
ensureDebugLog(
"Skipping serialized data in message for RemoteInstrument(s) [1, 10, 31] that has no local match") {
val p = TestProbe()
val instruments = Seq(testInstrument(1, "!"), testInstrument(10, ".."), testInstrument(31, "???"))
val riS = remoteInstruments(instruments: _*)
val riD = remoteInstruments()
serializeDeserialize(riS, riD, p.ref, "boz")
p.expectNoMsg(100.millis)
}
}
"skip serializing remote instrument that fails" in {
ensureDebugLog(
"Skipping serialization of RemoteInstrument 7 since it failed with boom",
"Skipping local RemoteInstrument 7 that has no matching data in the message") {
val p = TestProbe()
val instruments = Seq(
testInstrument(7, "!", sentThrowable = boom), testInstrument(10, ".."), testInstrument(21, "???"))
val ri = remoteInstruments(instruments: _*)
serializeDeserialize(ri, ri, p.ref, "woot")
p.expectMsgAllOf("woot-10-..", "woot-21-???")
p.expectNoMsg(100.millis)
}
}
"skip deserializing remote instrument that fails" in {
ensureDebugLog(
"Skipping deserialization of RemoteInstrument 7 since it failed with boom",
"Skipping deserialization of RemoteInstrument 21 since it failed with boom") {
val p = TestProbe()
val instruments = Seq(
testInstrument(7, "!", receiveThrowable = boom), testInstrument(10, ".."),
testInstrument(21, "???", receiveThrowable = boom))
val ri = remoteInstruments(instruments: _*)
serializeDeserialize(ri, ri, p.ref, "waat")
p.expectMsgAllOf("waat-10-..")
p.expectNoMsg(100.millis)
}
}
}
}
object RemoteInstrumentsSerializationSpec {
class Filter(settings: ActorSystem.Settings, stream: EventStream) extends LoggingFilter {
stream.publish(Mute(EventFilter.debug()))
override def isErrorEnabled(logClass: Class[_], logSource: String): Boolean = true
override def isWarningEnabled(logClass: Class[_], logSource: String): Boolean = true
override def isInfoEnabled(logClass: Class[_], logSource: String): Boolean = true
override def isDebugEnabled(logClass: Class[_], logSource: String): Boolean = logSource == "DebugSource"
}
def testInstrument(id: Int, metadata: String, sentThrowable: Throwable = null, receiveThrowable: Throwable = null): RemoteInstrument = {
new RemoteInstrument {
private val charset = Charset.forName("UTF-8")
private val encoder = charset.newEncoder()
private val decoder = charset.newDecoder()
override def identifier: Byte = id.toByte
override def remoteWriteMetadata(recipient: ActorRef, message: Object, sender: ActorRef, buffer: ByteBuffer): Unit = {
buffer.putInt(metadata.length)
if (sentThrowable ne null) throw sentThrowable
encoder.encode(CharBuffer.wrap(metadata), buffer, true)
encoder.flush(buffer)
encoder.reset()
}
override def remoteReadMetadata(recipient: ActorRef, message: Object, sender: ActorRef, buffer: ByteBuffer): Unit = {
val size = buffer.getInt
if (receiveThrowable ne null) throw receiveThrowable
val charBuffer = CharBuffer.allocate(size)
decoder.decode(buffer, charBuffer, false)
decoder.reset()
charBuffer.flip()
val string = charBuffer.toString
recipient ! s"$message-$identifier-$string"
}
override def remoteMessageSent(recipient: ActorRef, message: Object, sender: ActorRef, size: Int, time: Long): Unit = ()
override def remoteMessageReceived(recipient: ActorRef, message: Object, sender: ActorRef, size: Int, time: Long): Unit = ()
}
}
def serialize(ri: RemoteInstruments, buffer: ByteBuffer): Unit = {
val mockOutbound = new ReusableOutboundEnvelope()
ri.serialize(OptionVal(mockOutbound), buffer)
}
def deserialize(ri: RemoteInstruments, buffer: ByteBuffer, recipient: ActorRef, message: AnyRef): Unit = {
val r = recipient.asInstanceOf[InternalActorRef]
val envelopeBuffer = new EnvelopeBuffer(buffer)
val mockInbound =
new ReusableInboundEnvelope().withEnvelopeBuffer(envelopeBuffer).withRecipient(r).withMessage(message)
ri.deserializeRaw(mockInbound)
}
def serializeDeserialize(riS: RemoteInstruments, riD: RemoteInstruments, recipient: ActorRef, message: AnyRef): Unit = {
val buffer = ByteBuffer.allocate(1024)
serialize(riS, buffer)
buffer.flip()
deserialize(riD, buffer, recipient, message)
}
val boom = new IllegalArgumentException("boom")
}

View file

@ -7,7 +7,7 @@ import org.scalacheck.{ Arbitrary, Gen }
import org.scalatest.prop.Checkers import org.scalatest.prop.Checkers
import org.scalatest.{ Matchers, WordSpec } import org.scalatest.{ Matchers, WordSpec }
class MetaMetadataSerializerSpec extends WordSpec with Matchers with Checkers { class RemoteInstrumentsSpec extends WordSpec with Matchers with Checkers {
case class KeyLen(k: Key, l: Len) { case class KeyLen(k: Key, l: Len) {
override def toString = s" key = ${k}, len = ${l}" override def toString = s" key = ${k}, len = ${l}"
@ -22,48 +22,50 @@ class MetaMetadataSerializerSpec extends WordSpec with Matchers with Checkers {
} yield KeyLen(key, len) } yield KeyLen(key, len)
} }
"MetaMetadataSerializer" must { "RemoteInstruments" must {
"perform roundtrip masking/unmasking of entry key+length" in { "combine and decompose single key and length" in {
val key: Byte = 17 val key: Byte = 17
val len = 812 val len = 812
val kl = MetadataEnvelopeSerializer.muxEntryKeyLength(key, len) val kl = RemoteInstruments.combineKeyLength(key, len)
val key2 = MetadataEnvelopeSerializer.unmaskEntryKey(kl) val key2 = RemoteInstruments.getKey(kl)
key2 should ===(key) key2 should ===(key)
val len2 = MetadataEnvelopeSerializer.unmaskEntryLength(kl) val len2 = RemoteInstruments.getLength(kl)
len2 should ===(len) len2 should ===(len)
} }
"perform key roundtrip using mask/unmask" in { "combine and decompose key with 0 multiple times" in {
check { (kl: KeyLen) check { (kl: KeyLen)
val k = kl.k val k = kl.k
val masked = MetadataEnvelopeSerializer.maskEntryKey(k) val masked = RemoteInstruments.combineKeyLength(k, 0)
val uk = MetadataEnvelopeSerializer.unmaskEntryKey(masked) val uk = RemoteInstruments.getKey(masked)
uk should ===(k) uk should ===(k)
uk == k uk == k
} }
} }
"perform length roundtrip using mask/unmask" in {
"combine and decompose length with 0 multiple times" in {
check { (kl: KeyLen) check { (kl: KeyLen)
val l = kl.l val l = kl.l
val masked = MetadataEnvelopeSerializer.maskEntryLength(l) val masked = RemoteInstruments.combineKeyLength(0, l)
val ul = MetadataEnvelopeSerializer.unmaskEntryLength(masked) val ul = RemoteInstruments.getLength(masked)
ul should ===(l) ul should ===(l)
ul == l ul == l
} }
} }
"perform muxed roundtrip using mask/unmask" in {
"combine and decompose key and length multiple times" in {
check { (kl: KeyLen) check { (kl: KeyLen)
val k = kl.k val k = kl.k
val l = kl.l val l = kl.l
val masked = MetadataEnvelopeSerializer.muxEntryKeyLength(k, l) val masked = RemoteInstruments.combineKeyLength(k, l)
val uk = MetadataEnvelopeSerializer.unmaskEntryKey(masked) val uk = RemoteInstruments.getKey(masked)
uk should ===(k) uk should ===(k)
val ul = MetadataEnvelopeSerializer.unmaskEntryLength(masked) val ul = RemoteInstruments.getLength(masked)
ul should ===(l) ul should ===(l)
ul == l && uk == k ul == l && uk == k
} }