+art #20455 HeavyHitters, CountMinSketch => ActorRef Compression
* +art #20455 HeavyHitters and CountMinSketch prepared for Compression * +art #20455 compression tables and integration specs
This commit is contained in:
parent
d0adc01a7c
commit
e818887bb2
44 changed files with 2124 additions and 314 deletions
|
|
@ -1,17 +1,19 @@
|
|||
|
||||
/**
|
||||
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package akka.remote.artery
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import scala.util.control.NonFatal
|
||||
import akka.actor.{ ActorRef, InternalActorRef }
|
||||
import akka.actor.ActorSystem
|
||||
import akka.actor.ExtendedActorSystem
|
||||
import akka.actor._
|
||||
import akka.remote.{ MessageSerializer, OversizedPayloadException, UniqueAddress }
|
||||
import akka.remote.EndpointManager.Send
|
||||
import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope
|
||||
import akka.serialization.{ Serialization, SerializationExtension }
|
||||
import akka.stream._
|
||||
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
|
||||
import akka.util.OptionVal
|
||||
import akka.util.{ ByteString, OptionVal }
|
||||
import akka.actor.EmptyLocalActorRef
|
||||
import akka.stream.stage.TimerGraphStageLogic
|
||||
|
||||
|
|
@ -21,7 +23,7 @@ import akka.stream.stage.TimerGraphStageLogic
|
|||
private[remote] class Encoder(
|
||||
uniqueLocalAddress: UniqueAddress,
|
||||
system: ActorSystem,
|
||||
compressionTable: LiteralCompressionTable,
|
||||
compression: OutboundCompression,
|
||||
bufferPool: EnvelopeBufferPool)
|
||||
extends GraphStage[FlowShape[Send, EnvelopeBuffer]] {
|
||||
|
||||
|
|
@ -32,58 +34,35 @@ private[remote] class Encoder(
|
|||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
||||
new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging {
|
||||
|
||||
private val headerBuilder = HeaderBuilder(compressionTable)
|
||||
private val headerBuilder = HeaderBuilder.out(compression)
|
||||
headerBuilder.version = ArteryTransport.Version
|
||||
headerBuilder.uid = uniqueLocalAddress.uid
|
||||
private val localAddress = uniqueLocalAddress.address
|
||||
private val serialization = SerializationExtension(system)
|
||||
private val serializationInfo = Serialization.Information(localAddress, system)
|
||||
|
||||
private val senderCache = new java.util.HashMap[ActorRef, String]
|
||||
private var recipientCache = new java.util.HashMap[ActorRef, String]
|
||||
|
||||
override protected def logSource = classOf[Encoder]
|
||||
|
||||
override def onPush(): Unit = {
|
||||
val send = grab(in)
|
||||
val envelope = bufferPool.acquire()
|
||||
|
||||
val recipientStr = recipientCache.get(send.recipient) match {
|
||||
case null ⇒
|
||||
val s = send.recipient.path.toSerializationFormat
|
||||
// FIXME this cache will be replaced by compression table
|
||||
if (recipientCache.size() >= 1000)
|
||||
recipientCache.clear()
|
||||
recipientCache.put(send.recipient, s)
|
||||
s
|
||||
case s ⇒ s
|
||||
}
|
||||
headerBuilder.recipientActorRef = recipientStr
|
||||
|
||||
send.senderOption match {
|
||||
case OptionVal.None ⇒ headerBuilder.setNoSender()
|
||||
case OptionVal.Some(sender) ⇒
|
||||
val senderStr = senderCache.get(sender) match {
|
||||
case null ⇒
|
||||
val s = sender.path.toSerializationFormatWithAddress(localAddress)
|
||||
// FIXME we might need an efficient LRU cache, or replaced by compression table
|
||||
if (senderCache.size() >= 1000)
|
||||
senderCache.clear()
|
||||
senderCache.put(sender, s)
|
||||
s
|
||||
case s ⇒ s
|
||||
}
|
||||
headerBuilder.senderActorRef = senderStr
|
||||
}
|
||||
// internally compression is applied by the builder:
|
||||
headerBuilder.recipientActorRef = send.recipient
|
||||
|
||||
try {
|
||||
// avoiding currentTransportInformation.withValue due to thunk allocation
|
||||
val oldValue = Serialization.currentTransportInformation.value
|
||||
try {
|
||||
Serialization.currentTransportInformation.value = serializationInfo
|
||||
|
||||
send.senderOption match {
|
||||
case OptionVal.None ⇒ headerBuilder.setNoSender()
|
||||
case OptionVal.Some(s) ⇒ headerBuilder.senderActorRef = s
|
||||
}
|
||||
|
||||
MessageSerializer.serializeForArtery(serialization, send.message.asInstanceOf[AnyRef], headerBuilder, envelope)
|
||||
} finally
|
||||
Serialization.currentTransportInformation.value = oldValue
|
||||
} finally Serialization.currentTransportInformation.value = oldValue
|
||||
|
||||
envelope.byteBuffer.flip()
|
||||
push(out, envelope)
|
||||
|
|
@ -130,7 +109,7 @@ private[remote] class Decoder(
|
|||
inboundContext: InboundContext,
|
||||
system: ExtendedActorSystem,
|
||||
resolveActorRefWithLocalAddress: String ⇒ InternalActorRef,
|
||||
compressionTable: LiteralCompressionTable,
|
||||
compression: InboundCompression,
|
||||
bufferPool: EnvelopeBufferPool,
|
||||
inEnvelopePool: ObjectPool[InboundEnvelope]) extends GraphStage[FlowShape[EnvelopeBuffer, InboundEnvelope]] {
|
||||
val in: Inlet[EnvelopeBuffer] = Inlet("Artery.Decoder.in")
|
||||
|
|
@ -141,12 +120,9 @@ private[remote] class Decoder(
|
|||
new TimerGraphStageLogic(shape) with InHandler with OutHandler with StageLogging {
|
||||
import Decoder.RetryResolveRemoteDeployedRecipient
|
||||
private val localAddress = inboundContext.localAddress.address
|
||||
private val headerBuilder = HeaderBuilder(compressionTable)
|
||||
private val headerBuilder = HeaderBuilder.in(compression)
|
||||
private val serialization = SerializationExtension(system)
|
||||
|
||||
private val recipientCache = new java.util.HashMap[String, InternalActorRef]
|
||||
private val senderCache = new java.util.HashMap[String, ActorRef]
|
||||
|
||||
private val retryResolveRemoteDeployedRecipientInterval = 50.millis
|
||||
private val retryResolveRemoteDeployedRecipientAttempts = 20
|
||||
|
||||
|
|
@ -156,35 +132,32 @@ private[remote] class Decoder(
|
|||
val envelope = grab(in)
|
||||
envelope.parseHeader(headerBuilder)
|
||||
|
||||
// FIXME: Instead of using Strings, the headerBuilder should automatically return cached ActorRef instances
|
||||
// in case of compression is enabled
|
||||
// FIXME: Is localAddress really needed?
|
||||
|
||||
val sender =
|
||||
if (headerBuilder.isNoSender)
|
||||
OptionVal.None
|
||||
else {
|
||||
senderCache.get(headerBuilder.senderActorRef) match {
|
||||
case null ⇒
|
||||
val ref = resolveActorRefWithLocalAddress(headerBuilder.senderActorRef)
|
||||
// FIXME this cache will be replaced by compression table
|
||||
if (senderCache.size() >= 1000)
|
||||
senderCache.clear()
|
||||
senderCache.put(headerBuilder.senderActorRef, ref)
|
||||
OptionVal(ref)
|
||||
case ref ⇒ OptionVal(ref)
|
||||
}
|
||||
}
|
||||
|
||||
val recipient =
|
||||
if (headerBuilder.isNoRecipient)
|
||||
OptionVal.None
|
||||
else
|
||||
resolveRecipient(headerBuilder.recipientActorRef)
|
||||
|
||||
val originUid = headerBuilder.uid
|
||||
val association = inboundContext.association(originUid)
|
||||
|
||||
val recipient: OptionVal[InternalActorRef] = headerBuilder.recipientActorRef match {
|
||||
case OptionVal.Some(ref) ⇒ OptionVal(ref.asInstanceOf[InternalActorRef])
|
||||
case OptionVal.None ⇒ resolveRecipient(headerBuilder.recipientActorRefPath)
|
||||
}
|
||||
|
||||
val sender: InternalActorRef = headerBuilder.senderActorRef match {
|
||||
case OptionVal.Some(ref) ⇒ ref.asInstanceOf[InternalActorRef]
|
||||
case OptionVal.None ⇒ resolveActorRefWithLocalAddress(headerBuilder.senderActorRefPath)
|
||||
}
|
||||
|
||||
// --- hit refs and manifests for heavy-hitter counting
|
||||
association match {
|
||||
case OptionVal.Some(assoc) ⇒
|
||||
val remoteAddress = assoc.remoteAddress
|
||||
compression.hitActorRef(remoteAddress, sender)
|
||||
if (recipient.isDefined) compression.hitActorRef(remoteAddress, recipient.get)
|
||||
compression.hitClassManifest(remoteAddress, headerBuilder.manifest)
|
||||
case _ ⇒
|
||||
// we don't want to record hits for compression while handshake is still in progress.
|
||||
log.debug("Decoded message but unable to record hits for compression as no remoteAddress known. No association yet?")
|
||||
}
|
||||
// --- end of hit refs and manifests for heavy-hitter counting
|
||||
|
||||
try {
|
||||
val deserializedMessage = MessageSerializer.deserializeForArtery(
|
||||
system, serialization, headerBuilder, envelope)
|
||||
|
|
@ -194,7 +167,7 @@ private[remote] class Decoder(
|
|||
recipient,
|
||||
localAddress, // FIXME: Is this needed anymore? What should we do here?
|
||||
deserializedMessage,
|
||||
sender,
|
||||
OptionVal.Some(sender), // FIXME: No need for an option, decode simply to deadLetters instead
|
||||
originUid,
|
||||
association)
|
||||
|
||||
|
|
@ -203,7 +176,7 @@ private[remote] class Decoder(
|
|||
// recipient for the first message that is sent to it, best effort retry
|
||||
scheduleOnce(RetryResolveRemoteDeployedRecipient(
|
||||
retryResolveRemoteDeployedRecipientAttempts,
|
||||
headerBuilder.recipientActorRef, decoded), retryResolveRemoteDeployedRecipientInterval)
|
||||
headerBuilder.recipientActorRefPath, decoded), retryResolveRemoteDeployedRecipientInterval)
|
||||
} else
|
||||
push(out, decoded)
|
||||
} catch {
|
||||
|
|
@ -218,28 +191,12 @@ private[remote] class Decoder(
|
|||
}
|
||||
|
||||
private def resolveRecipient(path: String): OptionVal[InternalActorRef] = {
|
||||
recipientCache.get(path) match {
|
||||
case null ⇒
|
||||
def addToCache(resolved: InternalActorRef): Unit = {
|
||||
// FIXME we might need an efficient LRU cache, or replaced by compression table
|
||||
if (recipientCache.size() >= 1000)
|
||||
recipientCache.clear()
|
||||
recipientCache.put(path, resolved)
|
||||
}
|
||||
|
||||
resolveActorRefWithLocalAddress(path) match {
|
||||
case empty: EmptyLocalActorRef ⇒
|
||||
val pathElements = empty.path.elements
|
||||
if (pathElements.nonEmpty && pathElements.head == "remote")
|
||||
OptionVal.None
|
||||
else {
|
||||
addToCache(empty)
|
||||
OptionVal(empty)
|
||||
}
|
||||
case ref ⇒
|
||||
addToCache(ref)
|
||||
OptionVal(ref)
|
||||
}
|
||||
resolveActorRefWithLocalAddress(path) match {
|
||||
case empty: EmptyLocalActorRef ⇒
|
||||
val pathElements = empty.path.elements
|
||||
// FIXME remote deployment corner case, please fix @patriknw (see also below, in onTimer)
|
||||
if (pathElements.nonEmpty && pathElements.head == "remote") OptionVal.None
|
||||
else OptionVal(empty)
|
||||
case ref ⇒ OptionVal(ref)
|
||||
}
|
||||
}
|
||||
|
|
@ -254,11 +211,10 @@ private[remote] class Decoder(
|
|||
if (attemptsLeft > 0)
|
||||
scheduleOnce(RetryResolveRemoteDeployedRecipient(
|
||||
attemptsLeft - 1,
|
||||
headerBuilder.recipientActorRef, inboundEnvelope), retryResolveRemoteDeployedRecipientInterval)
|
||||
recipientPath, inboundEnvelope), retryResolveRemoteDeployedRecipientInterval)
|
||||
else {
|
||||
val recipient = resolveActorRefWithLocalAddress(recipientPath)
|
||||
// only retry for the first message
|
||||
recipientCache.put(recipientPath, recipient)
|
||||
// FIXME only retry for the first message, need to keep them in a cache
|
||||
push(out, inboundEnvelope.withRecipient(recipient))
|
||||
}
|
||||
case OptionVal.Some(recipient) ⇒
|
||||
|
|
@ -270,3 +226,4 @@ private[remote] class Decoder(
|
|||
setHandlers(in, out, this)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue