adaptive sampling of hit counting

* when rate exceeds 1000 msg/s adaptive sampling of the
  heavy hitters tracking is enabled by sampling every 256th message
* also fixed some bugs related to advertise in progress

* update InboundCompression state atomically

* enable compression in LatencySpec
This commit is contained in:
Patrik Nordwall 2016-07-04 15:59:44 +02:00
parent d1015c1dc6
commit d2657a5969
18 changed files with 320 additions and 142 deletions

View file

@ -54,6 +54,12 @@ object LatencySpec extends MultiNodeConfig {
remote.artery {
enabled = on
advanced.idle-cpu-level=8
advanced.compression {
enabled = on
actor-refs.advertisement-interval = 2 second
manifests.advertisement-interval = 2 second
}
}
}
""")))
@ -89,6 +95,8 @@ object LatencySpec extends MultiNodeConfig {
def receive = {
case bytes: Array[Byte]
// length 0 is used for warmup
if (bytes.length != 0) {
if (bytes.length != payloadSize) throw new IllegalArgumentException("Invalid message")
reporter.onMessage(1, payloadSize)
count += 1
@ -99,6 +107,7 @@ object LatencySpec extends MultiNodeConfig {
context.stop(self)
}
}
}
def printTotal(testName: String, payloadSize: Long, histogram: Histogram): Unit = {
import scala.collection.JavaConverters._
@ -227,12 +236,21 @@ abstract class LatencySpec
histogram.reset()
val receiver = system.actorOf(receiverProps(rep, testSettings, totalMessages, sendTimes, histogram, plotProbe.ref))
// warmup for 3 seconds to init compression
val warmup = Source(1 to 30)
.throttle(10, 1.second, 10, ThrottleMode.Shaping)
.runForeach { n
echo.tell(Array.emptyByteArray, receiver)
}
warmup.foreach { _
Source(1 to totalMessages)
.throttle(messageRate, 1.second, math.max(messageRate / 10, 1), ThrottleMode.Shaping)
.runForeach { n
sendTimes.set(n - 1, System.nanoTime())
echo.tell(payload, receiver)
}
}
watch(receiver)
expectTerminated(receiver, ((totalMessages / messageRate) + 10).seconds)

View file

@ -6,7 +6,6 @@ package akka.remote.artery
import java.nio.ByteBuffer
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit.NANOSECONDS
import scala.concurrent.duration._
import akka.actor._
import akka.remote.RemoteActorRefProvider
@ -20,6 +19,8 @@ import akka.serialization.ByteBufferSerializer
import akka.serialization.SerializerWithStringManifest
import akka.testkit._
import com.typesafe.config.ConfigFactory
import akka.remote.artery.compress.CompressionProtocol.Events.ReceivedActorRefCompressionTable
import akka.remote.RARP
object MaxThroughputSpec extends MultiNodeConfig {
val first = role("first")
@ -32,7 +33,8 @@ object MaxThroughputSpec extends MultiNodeConfig {
# for serious measurements you should increase the totalMessagesFactor (20)
akka.test.MaxThroughputSpec.totalMessagesFactor = 1.0
akka {
loglevel = ERROR
loglevel = INFO
log-dead-letters = 1000000
# avoid TestEventListener
loggers = ["akka.event.Logging$$DefaultLogger"]
testconductor.barrier-timeout = ${barrierTimeout.toSeconds}s
@ -58,11 +60,9 @@ object MaxThroughputSpec extends MultiNodeConfig {
#advanced.aeron-dir = "target/aeron"
advanced.compression {
enabled = off
actor-refs {
enabled = on
advertisement-interval = 1 second
}
actor-refs.advertisement-interval = 2 second
manifests.advertisement-interval = 2 second
}
}
}
@ -108,13 +108,39 @@ object MaxThroughputSpec extends MultiNodeConfig {
var remaining = totalMessages
var maxRoundTripMillis = 0L
context.system.eventStream.subscribe(self, classOf[ReceivedActorRefCompressionTable])
val compressionEnabled =
RARP(context.system).provider.transport.isInstanceOf[ArteryTransport] &&
RARP(context.system).provider.remoteSettings.ArteryCompressionSettings.enabled
def receive = {
case Run
// first some warmup
sendBatch()
// then Start, which will echo back here
target ! Start
if (compressionEnabled) {
target ! payload
context.setReceiveTimeout(1.second)
context.become(waitingForCompression)
} else {
sendBatch() // first some warmup
target ! Start // then Start, which will echo back here
context.become(active)
}
}
def waitingForCompression: Receive = {
case ReceivedActorRefCompressionTable(_, table)
if (table.map.contains(target)) {
sendBatch() // first some warmup
target ! Start // then Start, which will echo back here
context.setReceiveTimeout(Duration.Undefined)
context.become(active)
} else
target ! payload
case ReceiveTimeout
target ! payload
}
def active: Receive = {
case Start
println(s"${self.path.name}: Starting benchmark of $totalMessages messages with burst size " +
s"$burstSize and payload size $payloadSize")
@ -152,6 +178,8 @@ object MaxThroughputSpec extends MultiNodeConfig {
s"$took ms to deliver $totalReceived messages")
plotRef ! PlotResult().add(testName, throughput * payloadSize * testSettings.senderReceiverPairs / 1024 / 1024)
context.stop(self)
case c: ReceivedActorRefCompressionTable
}
def sendBatch(): Unit = {
@ -280,8 +308,7 @@ abstract class MaxThroughputSpec
totalMessages = adjustedTotalMessages(20000),
burstSize = 200, // don't exceed the send queue capacity 200*5*3=3000
payloadSize = 100,
senderReceiverPairs = 5)
)
senderReceiverPairs = 5))
def test(testSettings: TestSettings): Unit = {
import testSettings._

View file

@ -71,11 +71,8 @@ private[akka] object MessageSerializer {
}
}
def deserializeForArtery(system: ExtendedActorSystem, originUid: Long, serialization: Serialization, headerBuilder: HeaderBuilder,
envelope: EnvelopeBuffer): AnyRef = {
serialization.deserializeByteBuffer(
envelope.byteBuffer,
headerBuilder.serializer,
headerBuilder.manifest(originUid)) // FIXME currently compression will not work for manifests
def deserializeForArtery(system: ExtendedActorSystem, originUid: Long, serialization: Serialization,
serializer: Int, classManifest: String, envelope: EnvelopeBuffer): AnyRef = {
serialization.deserializeByteBuffer(envelope.byteBuffer, serializer, classManifest)
}
}

View file

@ -290,8 +290,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
@volatile private[this] var aeron: Aeron = _
@volatile private[this] var aeronErrorLogTask: Cancellable = _
// this is only used to allow triggering compression advertisements or state from tests
@volatile private[this] var activeCompressions = Set.empty[InboundCompressions]
@volatile private[this] var inboundCompressions: Option[InboundCompressions] = None
override def localAddress: UniqueAddress = _localAddress
override def defaultAddress: Address = localAddress.address
@ -517,6 +516,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
private def runInboundStreams(): Unit = {
val noCompressions = NoInboundCompressions // TODO possibly enable on other channels too https://github.com/akka/akka/pull/20546/files#r68074082
val compressions = createInboundCompressions(this)
inboundCompressions = Some(compressions)
runInboundControlStream(noCompressions) // TODO should understand compressions too
runInboundOrdinaryMessagesStream(compressions)
@ -547,18 +547,27 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
controlSubject.attach(new ControlMessageObserver {
override def notify(inboundEnvelope: InboundEnvelope): Unit = {
inboundEnvelope.message match {
case m: CompressionMessage
import CompressionProtocol._
m match {
case CompressionProtocol.ActorRefCompressionAdvertisement(from, table)
case ActorRefCompressionAdvertisement(from, table)
log.debug("Incoming ActorRef compression advertisement from [{}], table: [{}]", from, table)
association(from.address).outboundCompression.applyActorRefCompressionTable(table)
system.eventStream.publish(CompressionProtocol.Events.ReceivedCompressionTable(from, table))
case CompressionProtocol.ClassManifestCompressionAdvertisement(from, table)
val a = association(from.address)
a.outboundCompression.applyActorRefCompressionTable(table)
a.sendControl(ActorRefCompressionAdvertisementAck(localAddress, table.version))
system.eventStream.publish(Events.ReceivedActorRefCompressionTable(from, table))
case ActorRefCompressionAdvertisementAck(from, tableVersion)
inboundCompressions.foreach(_.confirmActorRefCompressionAdvertisement(from.uid, tableVersion))
case ClassManifestCompressionAdvertisement(from, table)
log.debug("Incoming Class Manifest compression advertisement from [{}], table: [{}]", from, table)
association(from.address).outboundCompression.applyClassManifestCompressionTable(table)
system.eventStream.publish(CompressionProtocol.Events.ReceivedCompressionTable(from, table))
val a = association(from.address)
a.outboundCompression.applyClassManifestCompressionTable(table)
a.sendControl(ClassManifestCompressionAdvertisementAck(localAddress, table.version))
system.eventStream.publish(Events.ReceivedClassManifestCompressionTable(from, table))
case ClassManifestCompressionAdvertisementAck(from, tableVersion)
inboundCompressions.foreach(_.confirmActorRefCompressionAdvertisement(from.uid, tableVersion))
}
case Quarantined(from, to) if to == localAddress
@ -768,11 +777,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
Flow.fromGraph(new Encoder(localAddress, system, compression, outboundEnvelopePool, bufferPool))
private def createInboundCompressions(inboundContext: InboundContext): InboundCompressions =
if (remoteSettings.ArteryCompressionSettings.enabled) {
val comp = new InboundCompressionsImpl(system, inboundContext)
activeCompressions += comp
comp
} else NoInboundCompressions
if (remoteSettings.ArteryCompressionSettings.enabled) new InboundCompressionsImpl(system, inboundContext)
else NoInboundCompressions
def createEncoder(pool: EnvelopeBufferPool, compression: OutboundCompressions): Flow[OutboundEnvelope, EnvelopeBuffer, NotUsed] =
Flow.fromGraph(new Encoder(localAddress, system, compression, outboundEnvelopePool, pool))
@ -849,7 +855,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
/** INTERNAL API: for testing only. */
private[remote] def triggerCompressionAdvertisements(actorRef: Boolean, manifest: Boolean) = {
activeCompressions.foreach {
inboundCompressions.foreach {
case c: InboundCompressionsImpl if actorRef || manifest
log.info("Triggering compression table advertisement for {}", c)
if (actorRef) c.runNextActorRefAdvertisement()

View file

@ -193,7 +193,9 @@ private[remote] final class HeaderBuilderImpl(inboundCompression: InboundCompres
def setRecipientActorRef(ref: ActorRef): Unit = {
_recipientActorRefIdx = outboundCompression.compressActorRef(ref)
if (_recipientActorRefIdx == -1) _recipientActorRef = ref.path.toSerializationFormat
if (_recipientActorRefIdx == -1) {
_recipientActorRef = ref.path.toSerializationFormat
}
}
def recipientActorRef(originUid: Long): OptionVal[ActorRef] =
if (_recipientActorRef eq null) inboundCompression.decompressActorRef(originUid, actorRefCompressionTableVersion, _recipientActorRefIdx)

View file

@ -15,6 +15,7 @@ import akka.util.{ ByteString, OptionVal, PrettyByteString }
import akka.actor.EmptyLocalActorRef
import akka.remote.artery.compress.{ InboundCompressions, OutboundCompressions, OutboundCompressionsImpl }
import akka.stream.stage.TimerGraphStageLogic
import java.util.concurrent.TimeUnit
/**
* INTERNAL API
@ -48,6 +49,13 @@ private[remote] class Encoder(
val envelope = bufferPool.acquire()
// FIXME: OMG race between setting the version, and using the table!!!!
// incoming messages are concurrent to outgoing ones
// incoming message may be table advertisement
// which swaps the table in Outgoing*Compression for the new one (n+1)
// by itself it does so atomically,
// race: however here we store the compression table version separately from actually using it (storing the refs / manifests etc).
// so there is a slight race IF the table is swapped right between us setting the version n here [then the table being swapped to n+1] and then we use the n+1 version to compressions the compressions (which the receiving end will fail to read, since the encoding could be completely different, and it picks the table based on the version Int).
// A solution would be to getTable => use it to set and compress things
headerBuilder setActorRefCompressionTableVersion compression.actorRefCompressionTableVersion
headerBuilder setClassManifestCompressionTableVersion compression.classManifestCompressionTableVersion
@ -113,6 +121,8 @@ private[remote] object Decoder {
attemptsLeft: Int,
recipientPath: String,
inboundEnvelope: InboundEnvelope)
private object Tick
}
/**
@ -125,6 +135,7 @@ private[remote] class Decoder(
compression: InboundCompressions, // TODO has to do demuxing on remote address It would seem, as decoder does not yet know
bufferPool: EnvelopeBufferPool,
inEnvelopePool: ObjectPool[ReusableInboundEnvelope]) extends GraphStage[FlowShape[EnvelopeBuffer, InboundEnvelope]] {
import Decoder.Tick
val in: Inlet[EnvelopeBuffer] = Inlet("Artery.Decoder.in")
val out: Outlet[InboundEnvelope] = Outlet("Artery.Decoder.out")
val shape: FlowShape[EnvelopeBuffer, InboundEnvelope] = FlowShape(in, out)
@ -139,9 +150,22 @@ private[remote] class Decoder(
private val retryResolveRemoteDeployedRecipientInterval = 50.millis
private val retryResolveRemoteDeployedRecipientAttempts = 20
// adaptive sampling when rate > 1000 msg/s
private var messageCount = 0L
private val HeavyHitterMask = (1 << 8) - 1 // sample every 256nth message
private var adaptiveSampling = false
private val adaptiveSamplingRateThreshold = 1000
private var tickTimestamp = System.nanoTime()
private var tickMessageCount = 0L
override protected def logSource = classOf[Decoder]
override def preStart(): Unit = {
schedulePeriodically(Tick, 1.seconds)
}
override def onPush(): Unit = {
messageCount += 1
val envelope = grab(in)
envelope.parseHeader(headerBuilder)
@ -166,22 +190,40 @@ private[remote] class Decoder(
OptionVal.None
}
val classManifest = headerBuilder.manifest(originUid)
if (!adaptiveSampling || (messageCount & HeavyHitterMask) == 0) {
// --- hit refs and manifests for heavy-hitter counting
association match {
case OptionVal.Some(assoc)
val remoteAddress = assoc.remoteAddress
if (sender.isDefined) compression.hitActorRef(originUid, headerBuilder.actorRefCompressionTableVersion, remoteAddress, sender.get)
if (recipient.isDefined) compression.hitActorRef(originUid, headerBuilder.actorRefCompressionTableVersion, remoteAddress, recipient.get)
compression.hitClassManifest(originUid, headerBuilder.classManifestCompressionTableVersion, remoteAddress, headerBuilder.manifest(originUid))
sender match {
case OptionVal.Some(snd)
compression.hitActorRef(originUid, headerBuilder.actorRefCompressionTableVersion,
remoteAddress, snd, 1)
case OptionVal.None
}
recipient match {
case OptionVal.Some(rcp)
compression.hitActorRef(originUid, headerBuilder.actorRefCompressionTableVersion,
remoteAddress, rcp, 1)
case OptionVal.None
}
compression.hitClassManifest(originUid, headerBuilder.classManifestCompressionTableVersion,
remoteAddress, classManifest, 1)
case _
// we don't want to record hits for compression while handshake is still in progress.
log.debug("Decoded message but unable to record hits for compression as no remoteAddress known. No association yet?")
}
// --- end of hit refs and manifests for heavy-hitter counting
}
try {
val deserializedMessage = MessageSerializer.deserializeForArtery(
system, originUid, serialization, headerBuilder, envelope)
system, originUid, serialization, headerBuilder.serializer, classManifest, envelope)
val decoded = inEnvelopePool.acquire().init(
recipient,
@ -203,7 +245,7 @@ private[remote] class Decoder(
case NonFatal(e)
log.warning(
"Failed to deserialize message with serializer id [{}] and manifest [{}]. {}",
headerBuilder.serializer, headerBuilder.manifest(originUid), e.getMessage)
headerBuilder.serializer, classManifest, e.getMessage)
pull(in)
} finally {
bufferPool.release(envelope)
@ -225,6 +267,19 @@ private[remote] class Decoder(
override protected def onTimer(timerKey: Any): Unit = {
timerKey match {
case Tick
val now = System.nanoTime()
val d = now - tickTimestamp
val oldAdaptiveSampling = adaptiveSampling
adaptiveSampling = (d == 0 ||
(messageCount - tickMessageCount) * TimeUnit.SECONDS.toNanos(1) / d > adaptiveSamplingRateThreshold)
if (!oldAdaptiveSampling && adaptiveSampling)
log.info("Turning on adaptive sampling ({}nth message) of compression hit counting", HeavyHitterMask + 1)
else if (oldAdaptiveSampling && !adaptiveSampling)
log.info("Turning off adaptive sampling of compression hit counting")
tickMessageCount = messageCount
tickTimestamp = now
case RetryResolveRemoteDeployedRecipient(attemptsLeft, recipientPath, inboundEnvelope)
resolveRecipient(recipientPath) match {
case OptionVal.None

View file

@ -13,15 +13,17 @@ import akka.util.OptionVal
* Literarily, no compression!
*/
case object NoInboundCompressions extends InboundCompressions {
override def hitActorRef(originUid: Long, tableVersion: Int, remote: Address, ref: ActorRef): Unit = ()
override def hitActorRef(originUid: Long, tableVersion: Int, remote: Address, ref: ActorRef, n: Int): Unit = ()
override def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef] =
if (idx == -1) throw new IllegalArgumentException("Attemted decompression of illegal compression id: -1")
else OptionVal.None
override def confirmActorRefCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit = ()
override def hitClassManifest(originUid: Long, tableVersion: Int, remote: Address, manifest: String): Unit = ()
override def hitClassManifest(originUid: Long, tableVersion: Int, remote: Address, manifest: String, n: Int): Unit = ()
override def decompressClassManifest(originUid: Long, tableVersion: Int, idx: Int): OptionVal[String] =
if (idx == -1) throw new IllegalArgumentException("Attemted decompression of illegal compression id: -1")
else OptionVal.None
override def confirmClassManifestCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit = ()
}
/**

View file

@ -39,8 +39,7 @@ private[remote] final class OutboundCompressionsImpl(system: ActorSystem, remote
*/
private[remote] final class InboundCompressionsImpl(
system: ActorSystem,
inboundContext: InboundContext
) extends InboundCompressions {
inboundContext: InboundContext) extends InboundCompressions {
private val settings = CompressionSettings(system)
@ -67,20 +66,21 @@ private[remote] final class InboundCompressionsImpl(
// actor ref compression ---
override def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef] = {
override def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef] =
actorRefsIn(originUid).decompress(tableVersion, idx)
}
override def hitActorRef(originUid: Long, tableVersion: Int, address: Address, ref: ActorRef): Unit = {
actorRefsIn(originUid).increment(address, ref, 1L)
}
override def hitActorRef(originUid: Long, tableVersion: Int, address: Address, ref: ActorRef, n: Int): Unit =
actorRefsIn(originUid).increment(address, ref, n)
override def confirmActorRefCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit =
actorRefsIn(originUid).confirmAdvertisement(tableVersion)
// class manifest compression ---
override def decompressClassManifest(originUid: Long, tableVersion: Int, idx: Int): OptionVal[String] =
classManifestsIn(originUid).decompress(tableVersion, idx)
override def hitClassManifest(originUid: Long, tableVersion: Int, address: Address, manifest: String): Unit = {
classManifestsIn(originUid).increment(address, manifest, 1L)
}
override def hitClassManifest(originUid: Long, tableVersion: Int, address: Address, manifest: String, n: Int): Unit =
classManifestsIn(originUid).increment(address, manifest, n)
override def confirmClassManifestCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit =
actorRefsIn(originUid).confirmAdvertisement(tableVersion)
// testing utilities ---

View file

@ -14,11 +14,13 @@ import akka.util.OptionVal
* One per inbound message stream thus must demux by originUid to use the right tables.
*/
private[remote] trait InboundCompressions {
def hitActorRef(originUid: Long, tableVersion: Int, remote: Address, ref: ActorRef): Unit
def hitActorRef(originUid: Long, tableVersion: Int, remote: Address, ref: ActorRef, n: Int): Unit
def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef]
def confirmActorRefCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit
def hitClassManifest(originUid: Long, tableVersion: Int, remote: Address, manifest: String): Unit
def hitClassManifest(originUid: Long, tableVersion: Int, remote: Address, manifest: String, n: Int): Unit
def decompressClassManifest(originUid: Long, tableVersion: Int, idx: Int): OptionVal[String]
def confirmClassManifestCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit
}
/**
* INTERNAL API

View file

@ -23,6 +23,16 @@ object CompressionProtocol {
private[remote] final case class ActorRefCompressionAdvertisement(from: UniqueAddress, table: CompressionTable[ActorRef])
extends ControlMessage with CompressionMessage
/**
* INTERNAL API
* Sent by the "sending" node after receiving [[ActorRefCompressionAdvertisement]]
* The advertisement is also confirmed by the first message using that table version,
* but we need separate ack in case the sender is not using any of the refs in the advertised
* table.
*/
private[remote] final case class ActorRefCompressionAdvertisementAck(from: UniqueAddress, tableVersion: Int)
extends ControlMessage with CompressionMessage
/**
* INTERNAL API
* Sent by the "receiving" node after allocating a compression id to a given class manifest
@ -30,6 +40,16 @@ object CompressionProtocol {
private[remote] final case class ClassManifestCompressionAdvertisement(from: UniqueAddress, table: CompressionTable[String])
extends ControlMessage with CompressionMessage
/**
* INTERNAL API
* Sent by the "sending" node after receiving [[ClassManifestCompressionAdvertisement]]
* The advertisement is also confirmed by the first message using that table version,
* but we need separate ack in case the sender is not using any of the refs in the advertised
* table.
*/
private[remote] final case class ClassManifestCompressionAdvertisementAck(from: UniqueAddress, tableVersion: Int)
extends ControlMessage with CompressionMessage
/** INTERNAL API */
private[akka] object Events {
/** INTERNAL API */
@ -39,7 +59,10 @@ object CompressionProtocol {
final case class HeavyHitterDetected(key: Any, id: Int, count: Long) extends Event
/** INTERNAL API */
final case class ReceivedCompressionTable[T](from: UniqueAddress, table: CompressionTable[T]) extends Event
final case class ReceivedActorRefCompressionTable(from: UniqueAddress, table: CompressionTable[ActorRef]) extends Event
/** INTERNAL API */
final case class ReceivedClassManifestCompressionTable(from: UniqueAddress, table: CompressionTable[String]) extends Event
}

View file

@ -25,8 +25,7 @@ private[remote] final case class DecompressionTable[T](version: Int, table: Arra
s"(version: $version, " +
(
if (length == 0) "[empty]"
else s"table: [${table.zipWithIndex.map({ case (t, i) s"$i -> $t" }).mkString(",")}"
) + "])"
else s"table: [${table.zipWithIndex.map({ case (t, i) s"$i -> $t" }).mkString(",")}") + "])"
}
/** INTERNAL API */

View file

@ -8,8 +8,8 @@ import akka.actor.{ ActorRef, ActorSystem, Address }
import akka.event.{ Logging, NoLogging }
import akka.remote.artery.{ InboundContext, OutboundContext }
import akka.util.{ OptionVal, PrettyDuration }
import scala.concurrent.duration.{ Duration, FiniteDuration }
import java.util.concurrent.atomic.AtomicReference
/**
* INTERNAL API
@ -33,9 +33,9 @@ private[remote] final class InboundActorRefCompression(
allocations foreach { case ref increment(null, ref, 100000) }
}
override def decompress(tableId: Long, idx: Int): OptionVal[ActorRef] =
override def decompress(tableVersion: Int, idx: Int): OptionVal[ActorRef] =
if (idx == 0) OptionVal.Some(system.deadLetters)
else super.decompress(tableId, idx)
else super.decompress(tableVersion, idx)
scheduleNextTableAdvertisement()
override protected def tableAdvertisementInterval = settings.actorRefs.advertisementInterval
@ -63,6 +63,34 @@ final class InboundManifestCompression(
outboundContext.sendControl(CompressionProtocol.ClassManifestCompressionAdvertisement(inboundContext.localAddress, table))
}
}
/**
* INTERNAL API
*/
private[remote] object InboundCompression {
object State {
def empty[T] = State(
oldTable = DecompressionTable.empty[T].copy(version = -1),
activeTable = DecompressionTable.empty[T],
nextTable = DecompressionTable.empty[T].copy(version = 1),
advertisementInProgress = None)
}
final case class State[T](
oldTable: DecompressionTable[T],
activeTable: DecompressionTable[T],
nextTable: DecompressionTable[T],
advertisementInProgress: Option[CompressionTable[T]]) {
def startUsingNextTable(): State[T] =
State(
oldTable = activeTable,
activeTable = nextTable,
nextTable = DecompressionTable.empty[T].copy(version = nextTable.version + 1),
advertisementInProgress = None)
}
}
/**
* INTERNAL API
@ -77,19 +105,13 @@ private[remote] abstract class InboundCompression[T >: Null](
lazy val log = Logging(system, getClass.getSimpleName)
// TODO atomic / state machine? the InbouncCompression could even extend ActomicReference[State]!
// TODO NOTE: there exist edge cases around, we advertise table 1, accumulate table 2, the remote system has not used 2 yet,
// yet we technically could already prepare table 3, then it starts using table 1 suddenly. Edge cases like that.
// SOLUTION 1: We don't start building new tables until we've seen the previous one be used (move from new to active)
// This is nice as it practically disables all the "build the table" work when the other side is not interested in using it.
// SOLUTION 2: We end up dropping messages when old table comes in (we do that anyway)
// TODO have a marker that "advertised table XXX", so we don't generate a new-new one until the new one is in use?
// 2 tables are used, one is "still in use", and the
@volatile private[this] var activeTable = DecompressionTable.empty[T]
@volatile private[this] var nextTable = DecompressionTable.empty[T].copy(version = 1)
private[this] val state: AtomicReference[InboundCompression.State[T]] = new AtomicReference(InboundCompression.State.empty)
// TODO calibrate properly (h/w have direct relation to preciseness and max capacity)
private[this] val cms = new CountMinSketch(100, 100, System.currentTimeMillis().toInt)
@ -104,30 +126,50 @@ private[remote] abstract class InboundCompression[T >: Null](
* @throws UnknownCompressedIdException if given id is not known, this may indicate a bug such situation should not happen.
*/
// not tailrec because we allow special casing in sub-class, however recursion is always at most 1 level deep
def decompress(incomingTableVersion: Long, idx: Int): OptionVal[T] = {
val activeVersion = activeTable.version
def decompress(incomingTableVersion: Int, idx: Int): OptionVal[T] = {
val current = state.get
val oldVersion = current.oldTable.version
val activeVersion = current.activeTable.version
if (incomingTableVersion == -1) OptionVal.None // no compression, bail out early
else if (incomingTableVersion == activeVersion) {
val value: T = activeTable.get(idx)
val value: T = current.activeTable.get(idx)
if (value != null) OptionVal.Some[T](value)
else throw new UnknownCompressedIdException(idx)
} else if (incomingTableVersion == oldVersion) {
// must handle one old table due to messages in flight during advertisement
val value: T = current.oldTable.get(idx)
if (value != null) OptionVal.Some[T](value)
else throw new UnknownCompressedIdException(idx)
} else if (incomingTableVersion < activeVersion) {
log.warning("Received value compressed with old table: [{}], current table version is: [{}]", incomingTableVersion, activeVersion)
OptionVal.None
} else if (incomingTableVersion == nextTable.version) {
advertisementInProgress = false
log.debug("Received first value compressed using the next prepared compression table, flipping to it (version: {})", nextTable.version)
startUsingNextTable()
} else if (incomingTableVersion == current.nextTable.version) {
log.debug(
"Received first value compressed using the next prepared compression table, flipping to it (version: {})",
current.nextTable.version)
confirmAdvertisement(incomingTableVersion)
decompress(incomingTableVersion, idx) // recurse, activeTable will not be able to handle this
} else {
// which means that incoming version was > nextTable.version, which likely is a bug
log.error(
"Inbound message is using compression table version higher than the highest allocated table on this node. " +
"This should not happen! State: activeTable: {}, nextTable: {}, incoming tableVersion: {}",
activeVersion, nextTable.version, incomingTableVersion)
activeVersion, current.nextTable.version, incomingTableVersion)
OptionVal.None
}
}
def confirmAdvertisement(tableVersion: Int): Unit = {
val current = state.get
current.advertisementInProgress match {
case Some(inProgress) if tableVersion == inProgress.version
if (state.compareAndSet(current, current.startUsingNextTable()))
log.debug("Confirmed compression table version {}", tableVersion)
case Some(inProgress) if tableVersion != inProgress.version
log.debug("Confirmed compression table version {} but in progress {}", tableVersion, inProgress.version)
case None // already confirmed
}
}
@ -182,9 +224,6 @@ private[remote] abstract class InboundCompression[T >: Null](
finally scheduleNextTableAdvertisement()
}
// FIXME use AtomicBoolean instead?
@volatile private[this] var advertisementInProgress = false
/**
* Entry point to advertising a new compression table.
*
@ -195,13 +234,16 @@ private[remote] abstract class InboundCompression[T >: Null](
* It must be advertised to the other side so it can start using it in its outgoing compression.
* Triggers compression table advertisement. May be triggered by schedule or manually, i.e. for testing.
*/
private[remote] def runNextTableAdvertisement() =
if (!advertisementInProgress)
private[remote] def runNextTableAdvertisement() = {
val current = state.get
current.advertisementInProgress match {
case None
inboundContext.association(originUid) match {
case OptionVal.Some(association)
advertisementInProgress = true
val table = prepareCompressionAdvertisement()
nextTable = table.invert // TODO expensive, check if building the other way wouldn't be faster?
val table = prepareCompressionAdvertisement(current.nextTable.version)
// TODO expensive, check if building the other way wouldn't be faster?
val nextState = current.copy(nextTable = table.invert, advertisementInProgress = Some(table))
if (state.compareAndSet(current, nextState))
advertiseCompressionTable(association, table)
case OptionVal.None
@ -210,23 +252,26 @@ private[remote] abstract class InboundCompression[T >: Null](
log.warning("No Association for originUid [{}] yet, unable to advertise compression table.", originUid)
}
case Some(inProgress)
// The ActorRefCompressionAdvertisement message is resent because it can be lost
log.debug("Advertisment in progress for version {}, resending", inProgress.version)
inboundContext.association(originUid) match {
case OptionVal.Some(association)
advertiseCompressionTable(association, inProgress) // resend
case OptionVal.None
}
}
}
/**
* Must be implementeed by extending classes in order to send a [[akka.remote.artery.ControlMessage]]
* of apropriate type to the remote system in order to advertise the compression table to it.
*/
protected def advertiseCompressionTable(association: OutboundContext, table: CompressionTable[T]): Unit
/** Drop `activeTable` and start using the `nextTable` in its place. */
private def startUsingNextTable(): Unit = {
log.debug("Swaping active decompression table to version {}.", nextTable.version)
activeTable = nextTable
nextTable = DecompressionTable.empty
// TODO we want to keep the currentTableVersion in State too, update here as well then
}
private def prepareCompressionAdvertisement(): CompressionTable[T] = {
private def prepareCompressionAdvertisement(nextTableVersion: Int): CompressionTable[T] = {
// TODO surely we can do better than that, optimise
CompressionTable(activeTable.version + 1, Map(heavyHitters.snapshot.filterNot(_ == null).zipWithIndex: _*))
CompressionTable(nextTableVersion, Map(heavyHitters.snapshot.filterNot(_ == null).zipWithIndex: _*))
}
override def toString =

View file

@ -61,7 +61,7 @@ private[remote] class OutboundCompressionTable[T](system: ActorSystem, remoteAdd
else
flipTable(activate) // retry
else if (state.version == activate.version)
log.warning("Received duplicate compression table (version: {})! Ignoring it.", state.version)
log.debug("Received duplicate compression table (version: {})! Ignoring it.", state.version)
else
log.error("Received unexpected compression table with version nr [{}]! " +
"Current version number is [{}].", activate.version, state.version)

View file

@ -34,14 +34,16 @@ class EnvelopeBufferSpec extends AkkaSpec {
override def applyActorRefCompressionTable(table: CompressionTable[ActorRef]): Unit = ??? // dynamic allocating not needed in these tests
override def actorRefCompressionTableVersion: Int = 0
override def compressActorRef(ref: ActorRef): Int = refToIdx.getOrElse(ref, -1)
override def hitActorRef(originUid: Long, tableVersion: Int, remote: Address, ref: ActorRef): Unit = ()
override def hitActorRef(originUid: Long, tableVersion: Int, remote: Address, ref: ActorRef, n: Int): Unit = ()
override def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef] = OptionVal(idxToRef(idx))
override def confirmActorRefCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit = ()
override def applyClassManifestCompressionTable(table: CompressionTable[String]): Unit = ??? // dynamic allocating not needed in these tests
override def classManifestCompressionTableVersion: Int = 0
override def compressClassManifest(manifest: String): Int = manifestToIdx.getOrElse(manifest, -1)
override def hitClassManifest(originUid: Long, tableVersion: Int, remote: Address, manifest: String): Unit = ()
override def hitClassManifest(originUid: Long, tableVersion: Int, remote: Address, manifest: String, n: Int): Unit = ()
override def decompressClassManifest(originUid: Long, tableVersion: Int, idx: Int): OptionVal[String] = OptionVal(idxToManifest(idx))
override def confirmClassManifestCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit = ()
}
"EnvelopeBuffer" must {

View file

@ -75,7 +75,7 @@ class CompressionIntegrationSpec extends AkkaSpec(CompressionIntegrationSpec.com
// cause testActor-1 to become a heavy hitter
(1 to messagesToExchange).foreach { i voidSel ! "hello" } // does not reply, but a hot receiver should be advertised
val a1 = aProbe.expectMsgType[Events.ReceivedCompressionTable[ActorRef]](10.seconds)
val a1 = aProbe.expectMsgType[Events.ReceivedActorRefCompressionTable](10.seconds)
info("System [A] received: " + a1)
assertCompression[ActorRef](a1.table, 0, _ should ===(system.deadLetters))
assertCompression[ActorRef](a1.table, 1, _ should ===(testActor))

View file

@ -5,7 +5,7 @@
package akka.remote.artery.compress
/* INTERNAL API */
private[remote] trait CompressionTestKit {
private[akka] trait CompressionTestKit {
def assertCompression[T](table: CompressionTable[T], id: Int, assertion: T Unit): Unit = {
table.map.find(_._2 == id)
.orElse { throw new AssertionError(s"No key was compressed to the id [$id]! Table was: $table") }
@ -14,4 +14,4 @@ private[remote] trait CompressionTestKit {
}
/* INTERNAL API */
private[remote] object CompressionTestKit extends CompressionTestKit
private[akka] object CompressionTestKit extends CompressionTestKit

View file

@ -11,7 +11,7 @@ import akka.util.Timeout
import akka.pattern.ask
import akka.remote.RARP
import akka.remote.artery.ArteryTransport
import akka.remote.artery.compress.CompressionProtocol.Events.{ Event, ReceivedCompressionTable }
import akka.remote.artery.compress.CompressionProtocol.Events.{ Event, ReceivedActorRefCompressionTable }
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfter
@ -82,7 +82,7 @@ class HandshakeShouldDropCompressionTableSpec extends AkkaSpec(HandshakeShouldDr
waitForEcho(this, s"hello-$messagesToExchange")
systemBTransport.triggerCompressionAdvertisements(actorRef = true, manifest = false)
val a0 = aProbe.expectMsgType[ReceivedCompressionTable[ActorRef]](10.seconds)
val a0 = aProbe.expectMsgType[ReceivedActorRefCompressionTable](10.seconds)
info("System [A] received: " + a0)
a0.table.map.keySet should contain(testActor)
@ -91,7 +91,7 @@ class HandshakeShouldDropCompressionTableSpec extends AkkaSpec(HandshakeShouldDr
waitForEcho(a1Probe, s"hello-$messagesToExchange")
systemBTransport.triggerCompressionAdvertisements(actorRef = true, manifest = false)
val a1 = aProbe.expectMsgType[ReceivedCompressionTable[ActorRef]](10.seconds)
val a1 = aProbe.expectMsgType[ReceivedActorRefCompressionTable](10.seconds)
info("System [A] received: " + a1)
a1.table.map.keySet should contain(a1Probe.ref)
@ -113,7 +113,7 @@ class HandshakeShouldDropCompressionTableSpec extends AkkaSpec(HandshakeShouldDr
waitForEcho(this, s"hello-$messagesToExchange", max = 10.seconds)
systemBTransport.triggerCompressionAdvertisements(actorRef = true, manifest = false)
val a2 = aNewProbe.expectMsgType[ReceivedCompressionTable[ActorRef]](10.seconds)
val a2 = aNewProbe.expectMsgType[ReceivedActorRefCompressionTable](10.seconds)
info("System [A] received: " + a2)
a2.table.map.keySet should contain(testActor)
@ -122,7 +122,7 @@ class HandshakeShouldDropCompressionTableSpec extends AkkaSpec(HandshakeShouldDr
waitForEcho(aNew2Probe, s"hello-$messagesToExchange")
systemBTransport.triggerCompressionAdvertisements(actorRef = true, manifest = false)
val a3 = aNewProbe.expectMsgType[ReceivedCompressionTable[ActorRef]](10.seconds)
val a3 = aNewProbe.expectMsgType[ReceivedActorRefCompressionTable](10.seconds)
info("Received second compression: " + a3)
a3.table.map.keySet should contain(aNew2Probe.ref)
}

View file

@ -82,7 +82,7 @@ object AkkaBuild extends Build {
protobuf,
remote,
remoteTests,
samples,
// samples,
slf4j,
stream,
streamTestkit,