diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala index a6030f71e1..7746658e89 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala @@ -54,6 +54,12 @@ object LatencySpec extends MultiNodeConfig { remote.artery { enabled = on advanced.idle-cpu-level=8 + + advanced.compression { + enabled = on + actor-refs.advertisement-interval = 2 second + manifests.advertisement-interval = 2 second + } } } """))) @@ -89,14 +95,17 @@ object LatencySpec extends MultiNodeConfig { def receive = { case bytes: Array[Byte] ⇒ - if (bytes.length != payloadSize) throw new IllegalArgumentException("Invalid message") - reporter.onMessage(1, payloadSize) - count += 1 - val d = System.nanoTime() - sendTimes.get(count - 1) - histogram.recordValue(d) - if (count == totalMessages) { - printTotal(testName, bytes.length, histogram) - context.stop(self) + // length 0 is used for warmup + if (bytes.length != 0) { + if (bytes.length != payloadSize) throw new IllegalArgumentException("Invalid message") + reporter.onMessage(1, payloadSize) + count += 1 + val d = System.nanoTime() - sendTimes.get(count - 1) + histogram.recordValue(d) + if (count == totalMessages) { + printTotal(testName, bytes.length, histogram) + context.stop(self) + } } } @@ -227,13 +236,22 @@ abstract class LatencySpec histogram.reset() val receiver = system.actorOf(receiverProps(rep, testSettings, totalMessages, sendTimes, histogram, plotProbe.ref)) - Source(1 to totalMessages) - .throttle(messageRate, 1.second, math.max(messageRate / 10, 1), ThrottleMode.Shaping) + // warmup for 3 seconds to init compression + val warmup = Source(1 to 30) + .throttle(10, 1.second, 10, ThrottleMode.Shaping) .runForeach { n ⇒ - sendTimes.set(n - 1, System.nanoTime()) - echo.tell(payload, receiver) + echo.tell(Array.emptyByteArray, receiver) } + warmup.foreach { _ ⇒ + Source(1 to totalMessages) + .throttle(messageRate, 1.second, math.max(messageRate / 10, 1), ThrottleMode.Shaping) + .runForeach { n ⇒ + sendTimes.set(n - 1, System.nanoTime()) + echo.tell(payload, receiver) + } + } + watch(receiver) expectTerminated(receiver, ((totalMessages / messageRate) + 10).seconds) val p = plotProbe.expectMsgType[LatencyPlots] diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala index 3dfee899bd..50eeb1035e 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala @@ -6,7 +6,6 @@ package akka.remote.artery import java.nio.ByteBuffer import java.util.concurrent.Executors import java.util.concurrent.TimeUnit.NANOSECONDS - import scala.concurrent.duration._ import akka.actor._ import akka.remote.RemoteActorRefProvider @@ -20,6 +19,8 @@ import akka.serialization.ByteBufferSerializer import akka.serialization.SerializerWithStringManifest import akka.testkit._ import com.typesafe.config.ConfigFactory +import akka.remote.artery.compress.CompressionProtocol.Events.ReceivedActorRefCompressionTable +import akka.remote.RARP object MaxThroughputSpec extends MultiNodeConfig { val first = role("first") @@ -32,7 +33,8 @@ object MaxThroughputSpec extends MultiNodeConfig { # for serious measurements you should increase the 
totalMessagesFactor (20) akka.test.MaxThroughputSpec.totalMessagesFactor = 1.0 akka { - loglevel = ERROR + loglevel = INFO + log-dead-letters = 1000000 # avoid TestEventListener loggers = ["akka.event.Logging$$DefaultLogger"] testconductor.barrier-timeout = ${barrierTimeout.toSeconds}s @@ -58,11 +60,9 @@ object MaxThroughputSpec extends MultiNodeConfig { #advanced.aeron-dir = "target/aeron" advanced.compression { - enabled = off - actor-refs { - enabled = on - advertisement-interval = 1 second - } + enabled = on + actor-refs.advertisement-interval = 2 second + manifests.advertisement-interval = 2 second } } } @@ -108,13 +108,39 @@ object MaxThroughputSpec extends MultiNodeConfig { var remaining = totalMessages var maxRoundTripMillis = 0L + context.system.eventStream.subscribe(self, classOf[ReceivedActorRefCompressionTable]) + + val compressionEnabled = + RARP(context.system).provider.transport.isInstanceOf[ArteryTransport] && + RARP(context.system).provider.remoteSettings.ArteryCompressionSettings.enabled + def receive = { case Run ⇒ - // first some warmup - sendBatch() - // then Start, which will echo back here - target ! Start + if (compressionEnabled) { + target ! payload + context.setReceiveTimeout(1.second) + context.become(waitingForCompression) + } else { + sendBatch() // first some warmup + target ! Start // then Start, which will echo back here + context.become(active) + } + } + def waitingForCompression: Receive = { + case ReceivedActorRefCompressionTable(_, table) ⇒ + if (table.map.contains(target)) { + sendBatch() // first some warmup + target ! Start // then Start, which will echo back here + context.setReceiveTimeout(Duration.Undefined) + context.become(active) + } else + target ! payload + case ReceiveTimeout ⇒ + target ! payload + } + + def active: Receive = { case Start ⇒ println(s"${self.path.name}: Starting benchmark of $totalMessages messages with burst size " + s"$burstSize and payload size $payloadSize") @@ -152,6 +178,8 @@ object MaxThroughputSpec extends MultiNodeConfig { s"$took ms to deliver $totalReceived messages") plotRef ! 
PlotResult().add(testName, throughput * payloadSize * testSettings.senderReceiverPairs / 1024 / 1024) context.stop(self) + + case c: ReceivedActorRefCompressionTable ⇒ } def sendBatch(): Unit = { @@ -280,8 +308,7 @@ abstract class MaxThroughputSpec totalMessages = adjustedTotalMessages(20000), burstSize = 200, // don't exceed the send queue capacity 200*5*3=3000 payloadSize = 100, - senderReceiverPairs = 5) - ) + senderReceiverPairs = 5)) def test(testSettings: TestSettings): Unit = { import testSettings._ diff --git a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala index 981b2b5bed..b09f9cbd01 100644 --- a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala @@ -71,11 +71,8 @@ private[akka] object MessageSerializer { } } - def deserializeForArtery(system: ExtendedActorSystem, originUid: Long, serialization: Serialization, headerBuilder: HeaderBuilder, - envelope: EnvelopeBuffer): AnyRef = { - serialization.deserializeByteBuffer( - envelope.byteBuffer, - headerBuilder.serializer, - headerBuilder.manifest(originUid)) // FIXME currently compression will not work for manifests + def deserializeForArtery(system: ExtendedActorSystem, originUid: Long, serialization: Serialization, + serializer: Int, classManifest: String, envelope: EnvelopeBuffer): AnyRef = { + serialization.deserializeByteBuffer(envelope.byteBuffer, serializer, classManifest) } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 42dcb5bad5..b94fbb38a1 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -290,8 +290,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R @volatile private[this] var aeron: Aeron = _ @volatile private[this] var aeronErrorLogTask: Cancellable = _ - // this is only used to allow triggering compression advertisements or state from tests - @volatile private[this] var activeCompressions = Set.empty[InboundCompressions] + @volatile private[this] var inboundCompressions: Option[InboundCompressions] = None override def localAddress: UniqueAddress = _localAddress override def defaultAddress: Address = localAddress.address @@ -517,6 +516,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R private def runInboundStreams(): Unit = { val noCompressions = NoInboundCompressions // TODO possibly enable on other channels too https://github.com/akka/akka/pull/20546/files#r68074082 val compressions = createInboundCompressions(this) + inboundCompressions = Some(compressions) runInboundControlStream(noCompressions) // TODO should understand compressions too runInboundOrdinaryMessagesStream(compressions) @@ -547,18 +547,27 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R controlSubject.attach(new ControlMessageObserver { override def notify(inboundEnvelope: InboundEnvelope): Unit = { + inboundEnvelope.message match { case m: CompressionMessage ⇒ + import CompressionProtocol._ m match { - case CompressionProtocol.ActorRefCompressionAdvertisement(from, table) ⇒ + case ActorRefCompressionAdvertisement(from, table) ⇒ log.debug("Incoming ActorRef compression advertisement from [{}], table: [{}]", from, table) - 
association(from.address).outboundCompression.applyActorRefCompressionTable(table)
-              system.eventStream.publish(CompressionProtocol.Events.ReceivedCompressionTable(from, table))
-
-            case CompressionProtocol.ClassManifestCompressionAdvertisement(from, table) ⇒
+              val a = association(from.address)
+              a.outboundCompression.applyActorRefCompressionTable(table)
+              a.sendControl(ActorRefCompressionAdvertisementAck(localAddress, table.version))
+              system.eventStream.publish(Events.ReceivedActorRefCompressionTable(from, table))
+            case ActorRefCompressionAdvertisementAck(from, tableVersion) ⇒
+              inboundCompressions.foreach(_.confirmActorRefCompressionAdvertisement(from.uid, tableVersion))
+            case ClassManifestCompressionAdvertisement(from, table) ⇒
               log.debug("Incoming Class Manifest compression advertisement from [{}], table: [{}]", from, table)
-              association(from.address).outboundCompression.applyClassManifestCompressionTable(table)
-              system.eventStream.publish(CompressionProtocol.Events.ReceivedCompressionTable(from, table))
+              val a = association(from.address)
+              a.outboundCompression.applyClassManifestCompressionTable(table)
+              a.sendControl(ClassManifestCompressionAdvertisementAck(localAddress, table.version))
+              system.eventStream.publish(Events.ReceivedClassManifestCompressionTable(from, table))
+            case ClassManifestCompressionAdvertisementAck(from, tableVersion) ⇒
+              inboundCompressions.foreach(_.confirmClassManifestCompressionAdvertisement(from.uid, tableVersion))
           }

         case Quarantined(from, to) if to == localAddress ⇒
@@ -768,11 +777,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
     Flow.fromGraph(new Encoder(localAddress, system, compression, outboundEnvelopePool, bufferPool))

   private def createInboundCompressions(inboundContext: InboundContext): InboundCompressions =
-    if (remoteSettings.ArteryCompressionSettings.enabled) {
-      val comp = new InboundCompressionsImpl(system, inboundContext)
-      activeCompressions += comp
-      comp
-    } else NoInboundCompressions
+    if (remoteSettings.ArteryCompressionSettings.enabled) new InboundCompressionsImpl(system, inboundContext)
+    else NoInboundCompressions

   def createEncoder(pool: EnvelopeBufferPool, compression: OutboundCompressions): Flow[OutboundEnvelope, EnvelopeBuffer, NotUsed] =
     Flow.fromGraph(new Encoder(localAddress, system, compression, outboundEnvelopePool, pool))
@@ -849,7 +855,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R

   /** INTERNAL API: for testing only. */
   private[remote] def triggerCompressionAdvertisements(actorRef: Boolean, manifest: Boolean) = {
-    activeCompressions.foreach {
+    inboundCompressions.foreach {
       case c: InboundCompressionsImpl if actorRef || manifest ⇒
         log.info("Triggering compression table advertisement for {}", c)
         if (actorRef) c.runNextActorRefAdvertisement()
diff --git a/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala b/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala
index d529c1d42e..0b4bfbf558 100644
--- a/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala
+++ b/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala
@@ -75,7 +75,7 @@ private[remote] object EnvelopeBuffer {
 private[remote] object HeaderBuilder {
   // We really only use the Header builder on one "side" or the other, thus in order to avoid having to split its impl
-  // we inject no-op compression's of the "other side".
+  // we inject no-op compressions of the "other side".
def in(compression: InboundCompressions): HeaderBuilder = new HeaderBuilderImpl(compression, NoOutboundCompressions)
   def out(compression: OutboundCompressions): HeaderBuilder = new HeaderBuilderImpl(NoInboundCompressions, compression)
@@ -193,7 +193,9 @@ private[remote] final class HeaderBuilderImpl(inboundCompression: InboundCompres

   def setRecipientActorRef(ref: ActorRef): Unit = {
     _recipientActorRefIdx = outboundCompression.compressActorRef(ref)
-    if (_recipientActorRefIdx == -1) _recipientActorRef = ref.path.toSerializationFormat
+    if (_recipientActorRefIdx == -1) {
+      _recipientActorRef = ref.path.toSerializationFormat
+    }
   }
   def recipientActorRef(originUid: Long): OptionVal[ActorRef] =
     if (_recipientActorRef eq null) inboundCompression.decompressActorRef(originUid, actorRefCompressionTableVersion, _recipientActorRefIdx)
diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala
index 426107b849..848369d2e4 100644
--- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala
+++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala
@@ -15,6 +15,7 @@ import akka.util.{ ByteString, OptionVal, PrettyByteString }
 import akka.actor.EmptyLocalActorRef
 import akka.remote.artery.compress.{ InboundCompressions, OutboundCompressions, OutboundCompressionsImpl }
 import akka.stream.stage.TimerGraphStageLogic
+import java.util.concurrent.TimeUnit

 /**
  * INTERNAL API
@@ -48,6 +49,13 @@ private[remote] class Encoder(
       val envelope = bufferPool.acquire()

       // FIXME: OMG race between setting the version, and using the table!!!!
+      // Incoming messages are concurrent with outgoing ones, and an incoming message may be a
+      // table advertisement, which swaps the table in Outgoing*Compression for the new one (n+1).
+      // The swap itself is atomic, but here we store the compression table version separately from
+      // actually using the table (to compress the refs / manifests etc.). So there is a slight race:
+      // if the table is swapped right after we set version n here, we then use the n+1 table to
+      // compress the values, which the receiving end will fail to read, since the encoding could be
+      // completely different and it picks the table based on the version Int.
+      // A solution would be to get the table once and use that same instance both to set the
+      // version and to compress the values.
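
To make the suggested fix concrete, here is a minimal, self-contained Scala sketch of the single-read approach (the CompressionTable/OutboundSide names are invented for illustration and are not Artery's actual API). Both the advertised version and the compressed id are derived from one read of the volatile table reference, so a concurrent swap cannot make them disagree:

    final case class CompressionTable[T](version: Int, map: Map[T, Int]) {
      def compress(value: T): Int = map.getOrElse(value, -1) // -1 = not compressed
    }

    final class OutboundSide[T] {
      @volatile private var active: CompressionTable[T] = CompressionTable(0, Map.empty)

      // invoked by advertisement handling; the reference swap is atomic
      def flipTable(next: CompressionTable[T]): Unit = active = next

      // single volatile read: version and id always come from the same table
      def encode(value: T): (Int, Int) = {
        val table = active
        (table.version, table.compress(value))
      }
    }
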
       headerBuilder setActorRefCompressionTableVersion compression.actorRefCompressionTableVersion
       headerBuilder setClassManifestCompressionTableVersion compression.classManifestCompressionTableVersion
@@ -113,6 +121,8 @@ private[remote] object Decoder {
     attemptsLeft: Int,
     recipientPath: String,
     inboundEnvelope: InboundEnvelope)
+
+  private object Tick
 }

 /**
@@ -125,6 +135,7 @@ private[remote] class Decoder(
   compression: InboundCompressions, // TODO has to do demuxing on remote address It would seem, as decoder does not yet know
   bufferPool: EnvelopeBufferPool,
   inEnvelopePool: ObjectPool[ReusableInboundEnvelope]) extends GraphStage[FlowShape[EnvelopeBuffer, InboundEnvelope]] {
+  import Decoder.Tick
   val in: Inlet[EnvelopeBuffer] = Inlet("Artery.Decoder.in")
   val out: Outlet[InboundEnvelope] = Outlet("Artery.Decoder.out")
   val shape: FlowShape[EnvelopeBuffer, InboundEnvelope] = FlowShape(in, out)
@@ -139,9 +150,22 @@ private[remote] class Decoder(
   private val retryResolveRemoteDeployedRecipientInterval = 50.millis
   private val retryResolveRemoteDeployedRecipientAttempts = 20

+  // adaptive sampling when rate > 1000 msg/s
+  private var messageCount = 0L
+  private val HeavyHitterMask = (1 << 8) - 1 // sample every 256th message
+  private var adaptiveSampling = false
+  private val adaptiveSamplingRateThreshold = 1000
+  private var tickTimestamp = System.nanoTime()
+  private var tickMessageCount = 0L
+
   override protected def logSource = classOf[Decoder]

+  override def preStart(): Unit = {
+    schedulePeriodically(Tick, 1.seconds)
+  }
+
   override def onPush(): Unit = {
+    messageCount += 1
     val envelope = grab(in)
     envelope.parseHeader(headerBuilder)
@@ -166,22 +190,40 @@ private[remote] class Decoder(
       OptionVal.None
     }

-    // --- hit refs and manifests for heavy-hitter counting
-    association match {
-      case OptionVal.Some(assoc) ⇒
-        val remoteAddress = assoc.remoteAddress
-        if (sender.isDefined) compression.hitActorRef(originUid, headerBuilder.actorRefCompressionTableVersion, remoteAddress, sender.get)
-        if (recipient.isDefined) compression.hitActorRef(originUid, headerBuilder.actorRefCompressionTableVersion, remoteAddress, recipient.get)
-        compression.hitClassManifest(originUid, headerBuilder.classManifestCompressionTableVersion, remoteAddress, headerBuilder.manifest(originUid))
-      case _ ⇒
-        // we don't want to record hits for compression while handshake is still in progress.
-        log.debug("Decoded message but unable to record hits for compression as no remoteAddress known. No association yet?")
+    val classManifest = headerBuilder.manifest(originUid)
+
+    if (!adaptiveSampling || (messageCount & HeavyHitterMask) == 0) {
+      // --- hit refs and manifests for heavy-hitter counting
+      association match {
+        case OptionVal.Some(assoc) ⇒
+          val remoteAddress = assoc.remoteAddress
+          sender match {
+            case OptionVal.Some(snd) ⇒
+              compression.hitActorRef(originUid, headerBuilder.actorRefCompressionTableVersion,
+                remoteAddress, snd, 1)
+            case OptionVal.None ⇒
+          }
+
+          recipient match {
+            case OptionVal.Some(rcp) ⇒
+              compression.hitActorRef(originUid, headerBuilder.actorRefCompressionTableVersion,
+                remoteAddress, rcp, 1)
+            case OptionVal.None ⇒
+          }
+
+          compression.hitClassManifest(originUid, headerBuilder.classManifestCompressionTableVersion,
+            remoteAddress, classManifest, 1)

+        case _ ⇒
+          // we don't want to record hits for compression while handshake is still in progress.
+          log.debug("Decoded message but unable to record hits for compression as no remoteAddress known. No association yet?")
+      }
+      // --- end of hit refs and manifests for heavy-hitter counting
     }
-    // --- end of hit refs and manifests for heavy-hitter counting
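
The sampling arithmetic used above, pulled out into a stand-alone runnable form (illustrative names, not part of the patch): a power-of-two mask turns "every 256th message" into a cheap bitwise test, and the once-per-second Tick derives a msg/s rate from nanosecond timestamps to decide whether sampling should be on.

    object AdaptiveSamplingDemo {
      val HeavyHitterMask: Long = (1 << 8) - 1 // 0xFF: (count & mask) == 0 once every 256 messages

      // mirrors the guard around the hit-recording block
      def shouldRecordHit(adaptiveSampling: Boolean, messageCount: Long): Boolean =
        !adaptiveSampling || (messageCount & HeavyHitterMask) == 0

      // rate in messages/second from a message-count delta and a nanosecond interval,
      // as computed in the Tick handler further below
      def messagesPerSecond(deltaCount: Long, deltaNanos: Long): Long =
        if (deltaNanos == 0) Long.MaxValue
        else deltaCount * java.util.concurrent.TimeUnit.SECONDS.toNanos(1) / deltaNanos
    }
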
+ log.debug("Decoded message but unable to record hits for compression as no remoteAddress known. No association yet?") + } + // --- end of hit refs and manifests for heavy-hitter counting } - // --- end of hit refs and manifests for heavy-hitter counting try { val deserializedMessage = MessageSerializer.deserializeForArtery( - system, originUid, serialization, headerBuilder, envelope) + system, originUid, serialization, headerBuilder.serializer, classManifest, envelope) val decoded = inEnvelopePool.acquire().init( recipient, @@ -203,7 +245,7 @@ private[remote] class Decoder( case NonFatal(e) ⇒ log.warning( "Failed to deserialize message with serializer id [{}] and manifest [{}]. {}", - headerBuilder.serializer, headerBuilder.manifest(originUid), e.getMessage) + headerBuilder.serializer, classManifest, e.getMessage) pull(in) } finally { bufferPool.release(envelope) @@ -225,6 +267,19 @@ private[remote] class Decoder( override protected def onTimer(timerKey: Any): Unit = { timerKey match { + case Tick ⇒ + val now = System.nanoTime() + val d = now - tickTimestamp + val oldAdaptiveSampling = adaptiveSampling + adaptiveSampling = (d == 0 || + (messageCount - tickMessageCount) * TimeUnit.SECONDS.toNanos(1) / d > adaptiveSamplingRateThreshold) + if (!oldAdaptiveSampling && adaptiveSampling) + log.info("Turning on adaptive sampling ({}nth message) of compression hit counting", HeavyHitterMask + 1) + else if (oldAdaptiveSampling && !adaptiveSampling) + log.info("Turning off adaptive sampling of compression hit counting") + tickMessageCount = messageCount + tickTimestamp = now + case RetryResolveRemoteDeployedRecipient(attemptsLeft, recipientPath, inboundEnvelope) ⇒ resolveRecipient(recipientPath) match { case OptionVal.None ⇒ diff --git a/akka-remote/src/main/scala/akka/remote/artery/NoLiteralCompression.scala b/akka-remote/src/main/scala/akka/remote/artery/NoLiteralCompression.scala index 03b1a4b6a6..63886945d5 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/NoLiteralCompression.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/NoLiteralCompression.scala @@ -13,15 +13,17 @@ import akka.util.OptionVal * Literarily, no compression! 
 */
 case object NoInboundCompressions extends InboundCompressions {
-  override def hitActorRef(originUid: Long, tableVersion: Int, remote: Address, ref: ActorRef): Unit = ()
+  override def hitActorRef(originUid: Long, tableVersion: Int, remote: Address, ref: ActorRef, n: Int): Unit = ()
   override def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef] =
     if (idx == -1) throw new IllegalArgumentException("Attempted decompression of illegal compression id: -1")
     else OptionVal.None
+  override def confirmActorRefCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit = ()

-  override def hitClassManifest(originUid: Long, tableVersion: Int, remote: Address, manifest: String): Unit = ()
+  override def hitClassManifest(originUid: Long, tableVersion: Int, remote: Address, manifest: String, n: Int): Unit = ()
   override def decompressClassManifest(originUid: Long, tableVersion: Int, idx: Int): OptionVal[String] =
     if (idx == -1) throw new IllegalArgumentException("Attempted decompression of illegal compression id: -1")
     else OptionVal.None
+  override def confirmClassManifestCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit = ()
 }

 /**
diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/ActualCompressions.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/ActualCompressions.scala
index 04eb467c6a..fdbc067d61 100644
--- a/akka-remote/src/main/scala/akka/remote/artery/compress/ActualCompressions.scala
+++ b/akka-remote/src/main/scala/akka/remote/artery/compress/ActualCompressions.scala
@@ -17,14 +17,14 @@ private[remote] final class OutboundCompressionsImpl(system: ActorSystem, remote
   private val actorRefsOut = new OutboundActorRefCompression(system, remoteAddress)
   private val classManifestsOut = new OutboundClassManifestCompression(system, remoteAddress)

-  // actor ref compression ---
+  // actor ref compression ---
   override def compressActorRef(ref: ActorRef): Int = actorRefsOut.compress(ref)
   override def actorRefCompressionTableVersion: Int = actorRefsOut.activeCompressionTableVersion
   override def applyActorRefCompressionTable(table: CompressionTable[ActorRef]): Unit = actorRefsOut.flipTable(table)

-  // class manifest compression ---
+  // class manifest compression ---
   override def compressClassManifest(manifest: String): Int = classManifestsOut.compress(manifest)
   override def classManifestCompressionTableVersion: Int = classManifestsOut.activeCompressionTableVersion
@@ -39,8 +39,7 @@ private[remote] final class OutboundCompressionsImpl(system: ActorSystem, remote
 */
private[remote] final class InboundCompressionsImpl(
  system: ActorSystem,
-  inboundContext: InboundContext
-) extends InboundCompressions {
+  inboundContext: InboundContext) extends InboundCompressions {

  private val settings = CompressionSettings(system)

@@ -65,22 +64,23 @@ private[remote] final class InboundCompressionsImpl(
  private def classManifestsIn(originUid: Long): InboundManifestCompression =
    _classManifestsIns.computeIfAbsent(originUid, createInboundManifestsForOrigin)

-  // actor ref compression ---
+  // actor ref compression ---

-  override def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef] = {
+  override def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef] =
    actorRefsIn(originUid).decompress(tableVersion, idx)
-  }

-  override def hitActorRef(originUid: Long, tableVersion: Int, address: Address, ref: ActorRef): Unit = {
-    actorRefsIn(originUid).increment(address, ref, 1L)
-  }
+  override def hitActorRef(originUid: Long, tableVersion: Int, address: Address, ref: ActorRef, n: Int): Unit =
+    actorRefsIn(originUid).increment(address, ref, n)
+  override def confirmActorRefCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit =
+    actorRefsIn(originUid).confirmAdvertisement(tableVersion)

-  // class manifest compression ---
+  // class manifest compression ---

   override def decompressClassManifest(originUid: Long, tableVersion: Int, idx: Int): OptionVal[String] =
     classManifestsIn(originUid).decompress(tableVersion, idx)
-  override def hitClassManifest(originUid: Long, tableVersion: Int, address: Address, manifest: String): Unit = {
-    classManifestsIn(originUid).increment(address, manifest, 1L)
-  }
+  override def hitClassManifest(originUid: Long, tableVersion: Int, address: Address, manifest: String, n: Int): Unit =
+    classManifestsIn(originUid).increment(address, manifest, n)
+  override def confirmClassManifestCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit =
+    classManifestsIn(originUid).confirmAdvertisement(tableVersion)

   // testing utilities ---
diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/AllCompressions.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/AllCompressions.scala
index c0729636ed..e90db8bd31 100644
--- a/akka-remote/src/main/scala/akka/remote/artery/compress/AllCompressions.scala
+++ b/akka-remote/src/main/scala/akka/remote/artery/compress/AllCompressions.scala
@@ -14,11 +14,13 @@ import akka.util.OptionVal
 * One per inbound message stream thus must demux by originUid to use the right tables.
 */
private[remote] trait InboundCompressions {
-  def hitActorRef(originUid: Long, tableVersion: Int, remote: Address, ref: ActorRef): Unit
+  def hitActorRef(originUid: Long, tableVersion: Int, remote: Address, ref: ActorRef, n: Int): Unit
   def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef]
+  def confirmActorRefCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit

-  def hitClassManifest(originUid: Long, tableVersion: Int, remote: Address, manifest: String): Unit
+  def hitClassManifest(originUid: Long, tableVersion: Int, remote: Address, manifest: String, n: Int): Unit
   def decompressClassManifest(originUid: Long, tableVersion: Int, idx: Int): OptionVal[String]
+  def confirmClassManifestCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit
 }
 /**
 * INTERNAL API
diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionProtocol.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionProtocol.scala
index 5cee5f77f3..6aeada84d5 100644
--- a/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionProtocol.scala
+++ b/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionProtocol.scala
@@ -9,7 +9,7 @@ import akka.actor.{ ActorRef, Address }
 import akka.remote.UniqueAddress
 import akka.remote.artery.ControlMessage

-// FIXME serialization
+// FIXME serialization
 /** INTERNAL API */
 object CompressionProtocol {
@@ -23,6 +23,16 @@ object CompressionProtocol {
   private[remote] final case class ActorRefCompressionAdvertisement(from: UniqueAddress, table: CompressionTable[ActorRef])
     extends ControlMessage with CompressionMessage

+  /**
+   * INTERNAL API
+   * Sent by the "sending" node after receiving [[ActorRefCompressionAdvertisement]].
+   * The advertisement is also confirmed by the first message using that table version,
+   * but we need a separate ack in case the sender is not using any of the refs in the advertised
+   * table.
+   */
+  private[remote] final case class ActorRefCompressionAdvertisementAck(from: UniqueAddress, tableVersion: Int)
+    extends ControlMessage with CompressionMessage
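
The confirmation rule this scaladoc describes (mirrored for class manifests below) can be modeled in a few lines. This is an illustrative stand-alone sketch, not Artery code: an advertised table version stays pending until either the explicit ack arrives or a message compressed with that version is decoded.

    final case class PendingAdvertisement(version: Int, confirmed: Boolean = false) {
      def onAck(ackedVersion: Int): PendingAdvertisement =
        if (ackedVersion == version) copy(confirmed = true) else this

      // decoding a message that already uses the advertised version also confirms it
      def onInboundMessage(messageTableVersion: Int): PendingAdvertisement =
        if (messageTableVersion == version) copy(confirmed = true) else this
    }
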

   /**
    * INTERNAL API
    * Sent by the "receiving" node after allocating a compression id to a given class manifest
    */
   private[remote] final case class ClassManifestCompressionAdvertisement(from: UniqueAddress, table: CompressionTable[String])
     extends ControlMessage with CompressionMessage

+  /**
+   * INTERNAL API
+   * Sent by the "sending" node after receiving [[ClassManifestCompressionAdvertisement]].
+   * The advertisement is also confirmed by the first message using that table version,
+   * but we need a separate ack in case the sender is not using any of the manifests in the
+   * advertised table.
+   */
+  private[remote] final case class ClassManifestCompressionAdvertisementAck(from: UniqueAddress, tableVersion: Int)
+    extends ControlMessage with CompressionMessage
+
   /** INTERNAL API */
   private[akka] object Events {
     /** INTERNAL API */
@@ -39,7 +59,10 @@ object CompressionProtocol {
     final case class HeavyHitterDetected(key: Any, id: Int, count: Long) extends Event

     /** INTERNAL API */
-    final case class ReceivedCompressionTable[T](from: UniqueAddress, table: CompressionTable[T]) extends Event
+    final case class ReceivedActorRefCompressionTable(from: UniqueAddress, table: CompressionTable[ActorRef]) extends Event
+
+    /** INTERNAL API */
+    final case class ReceivedClassManifestCompressionTable(from: UniqueAddress, table: CompressionTable[String]) extends Event
   }

diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/DecompressionTable.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/DecompressionTable.scala
index 4e5b74bf0b..e25ce85489 100644
--- a/akka-remote/src/main/scala/akka/remote/artery/compress/DecompressionTable.scala
+++ b/akka-remote/src/main/scala/akka/remote/artery/compress/DecompressionTable.scala
@@ -25,8 +25,7 @@ private[remote] final case class DecompressionTable[T](version: Int, table: Arra
     s"(version: $version, " + (
       if (length == 0) "[empty]"
-      else s"table: [${table.zipWithIndex.map({ case (t, i) ⇒ s"$i -> $t" }).mkString(",")}"
-    ) + "])"
+      else s"table: [${table.zipWithIndex.map({ case (t, i) ⇒ s"$i -> $t" }).mkString(",")}") + "])"
 }

 /** INTERNAL API */
diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala
index 6ec6863585..7fd5b5bfd4 100644
--- a/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala
+++ b/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala
@@ -8,8 +8,8 @@ import akka.actor.{ ActorRef, ActorSystem, Address }
 import akka.event.{ Logging, NoLogging }
 import akka.remote.artery.{ InboundContext, OutboundContext }
 import akka.util.{ OptionVal, PrettyDuration }
-
 import scala.concurrent.duration.{ Duration, FiniteDuration }
+import java.util.concurrent.atomic.AtomicReference

 /**
  * INTERNAL API
@@ -33,9 +33,9 @@ private[remote] final class InboundActorRefCompression(
     allocations foreach { case ref ⇒ increment(null, ref, 100000) }
   }

-  override def decompress(tableId: Long, idx: Int): OptionVal[ActorRef] =
+  override def decompress(tableVersion: Int, idx: Int): OptionVal[ActorRef] =
     if (idx == 0) OptionVal.Some(system.deadLetters)
-    else super.decompress(tableId, idx)
+    else super.decompress(tableVersion, idx)

   scheduleNextTableAdvertisement()
override protected def tableAdvertisementInterval = settings.actorRefs.advertisementInterval @@ -63,6 +63,34 @@ final class InboundManifestCompression( outboundContext.sendControl(CompressionProtocol.ClassManifestCompressionAdvertisement(inboundContext.localAddress, table)) } } +/** + * INTERNAL API + */ +private[remote] object InboundCompression { + + object State { + def empty[T] = State( + oldTable = DecompressionTable.empty[T].copy(version = -1), + activeTable = DecompressionTable.empty[T], + nextTable = DecompressionTable.empty[T].copy(version = 1), + advertisementInProgress = None) + } + + final case class State[T]( + oldTable: DecompressionTable[T], + activeTable: DecompressionTable[T], + nextTable: DecompressionTable[T], + advertisementInProgress: Option[CompressionTable[T]]) { + + def startUsingNextTable(): State[T] = + State( + oldTable = activeTable, + activeTable = nextTable, + nextTable = DecompressionTable.empty[T].copy(version = nextTable.version + 1), + advertisementInProgress = None) + } + +} /** * INTERNAL API @@ -77,19 +105,13 @@ private[remote] abstract class InboundCompression[T >: Null]( lazy val log = Logging(system, getClass.getSimpleName) - // TODO atomic / state machine? the InbouncCompression could even extend ActomicReference[State]! - // TODO NOTE: there exist edge cases around, we advertise table 1, accumulate table 2, the remote system has not used 2 yet, // yet we technically could already prepare table 3, then it starts using table 1 suddenly. Edge cases like that. // SOLUTION 1: We don't start building new tables until we've seen the previous one be used (move from new to active) // This is nice as it practically disables all the "build the table" work when the other side is not interested in using it. // SOLUTION 2: We end up dropping messages when old table comes in (we do that anyway) - // TODO have a marker that "advertised table XXX", so we don't generate a new-new one until the new one is in use? - - // 2 tables are used, one is "still in use", and the - @volatile private[this] var activeTable = DecompressionTable.empty[T] - @volatile private[this] var nextTable = DecompressionTable.empty[T].copy(version = 1) + private[this] val state: AtomicReference[InboundCompression.State[T]] = new AtomicReference(InboundCompression.State.empty) // TODO calibrate properly (h/w have direct relation to preciseness and max capacity) private[this] val cms = new CountMinSketch(100, 100, System.currentTimeMillis().toInt) @@ -104,30 +126,50 @@ private[remote] abstract class InboundCompression[T >: Null]( * @throws UnknownCompressedIdException if given id is not known, this may indicate a bug – such situation should not happen. 
 */
  // not tailrec because we allow special casing in sub-class, however recursion is always at most 1 level deep
-  def decompress(incomingTableVersion: Long, idx: Int): OptionVal[T] = {
-    val activeVersion = activeTable.version
+  def decompress(incomingTableVersion: Int, idx: Int): OptionVal[T] = {
+    val current = state.get
+    val oldVersion = current.oldTable.version
+    val activeVersion = current.activeTable.version
    if (incomingTableVersion == -1) OptionVal.None // no compression, bail out early
    else if (incomingTableVersion == activeVersion) {
-      val value: T = activeTable.get(idx)
+      val value: T = current.activeTable.get(idx)
+      if (value != null) OptionVal.Some[T](value)
+      else throw new UnknownCompressedIdException(idx)
+    } else if (incomingTableVersion == oldVersion) {
+      // must handle one old table due to messages in flight during advertisement
+      val value: T = current.oldTable.get(idx)
      if (value != null) OptionVal.Some[T](value)
      else throw new UnknownCompressedIdException(idx)
    } else if (incomingTableVersion < activeVersion) {
      log.warning("Received value compressed with old table: [{}], current table version is: [{}]", incomingTableVersion, activeVersion)
      OptionVal.None
-    } else if (incomingTableVersion == nextTable.version) {
-      advertisementInProgress = false
-      log.debug("Received first value compressed using the next prepared compression table, flipping to it (version: {})", nextTable.version)
-      startUsingNextTable()
+    } else if (incomingTableVersion == current.nextTable.version) {
+      log.debug(
+        "Received first value compressed using the next prepared compression table, flipping to it (version: {})",
+        current.nextTable.version)
+      confirmAdvertisement(incomingTableVersion)
      decompress(incomingTableVersion, idx) // recurse, activeTable will not be able to handle this
    } else {
      // which means that incoming version was > nextTable.version, which likely is a bug
      log.error(
        "Inbound message is using compression table version higher than the highest allocated table on this node. " +
          "This should not happen! State: activeTable: {}, nextTable: {}, incoming tableVersion: {}",
-        activeVersion, nextTable.version, incomingTableVersion)
+        activeVersion, current.nextTable.version, incomingTableVersion)
      OptionVal.None
    }
+  }
+
+  def confirmAdvertisement(tableVersion: Int): Unit = {
+    val current = state.get
+    current.advertisementInProgress match {
+      case Some(inProgress) if tableVersion == inProgress.version ⇒
+        if (state.compareAndSet(current, current.startUsingNextTable()))
+          log.debug("Confirmed compression table version {}", tableVersion)
+      case Some(inProgress) if tableVersion != inProgress.version ⇒
+        log.debug("Received confirmation for table version {}, but advertisement in progress is for version {}", tableVersion, inProgress.version)
+      case None ⇒ // already confirmed
+    }
  }
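
For orientation, here is the old/active/next rotation that the State class implements, as a stand-alone model (illustrative names, not the actual State class): the previous table is kept around because messages compressed with it may still be in flight while the newly advertised table awaits confirmation.

    final case class TableVersions(oldV: Int, activeV: Int, nextV: Int) {
      // applied when the advertised next table is confirmed (by ack or by first use)
      def rotate: TableVersions =
        TableVersions(oldV = activeV, activeV = nextV, nextV = nextV + 1)

      // decompression accepts the active version and, transiently, the previous one
      def canDecompress(incoming: Int): Boolean =
        incoming == activeV || incoming == oldV
    }
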
@@ -182,9 +224,6 @@ private[remote] abstract class InboundCompression[T >: Null](
    finally scheduleNextTableAdvertisement()
  }

-  // FIXME use AtomicBoolean instead?
-  @volatile private[this] var advertisementInProgress = false
-
  /**
   * Entry point to advertising a new compression table.
   *
   * It must be advertised to the other side so it can start using it in its outgoing compression.
   * Triggers compression table advertisement. May be triggered by schedule or manually, i.e. for testing.
   */
-  private[remote] def runNextTableAdvertisement() =
-    if (!advertisementInProgress)
-      inboundContext.association(originUid) match {
-        case OptionVal.Some(association) ⇒
-          advertisementInProgress = true
-          val table = prepareCompressionAdvertisement()
-          nextTable = table.invert // TODO expensive, check if building the other way wouldn't be faster?
-          advertiseCompressionTable(association, table)
+  private[remote] def runNextTableAdvertisement() = {
+    val current = state.get
+    current.advertisementInProgress match {
+      case None ⇒
+        inboundContext.association(originUid) match {
+          case OptionVal.Some(association) ⇒
+            val table = prepareCompressionAdvertisement(current.nextTable.version)
+            // TODO expensive, check if building the other way wouldn't be faster?
+            val nextState = current.copy(nextTable = table.invert, advertisementInProgress = Some(table))
+            if (state.compareAndSet(current, nextState))
+              advertiseCompressionTable(association, table)

-        case OptionVal.None ⇒
-          // otherwise it's too early, association not ready yet.
-          // so we don't build the table since we would not be able to send it anyway.
-          log.warning("No Association for originUid [{}] yet, unable to advertise compression table.", originUid)
-      }
+          case OptionVal.None ⇒
+            // otherwise it's too early, association not ready yet.
+            // so we don't build the table since we would not be able to send it anyway.
+            log.warning("No Association for originUid [{}] yet, unable to advertise compression table.", originUid)
+        }
+
+      case Some(inProgress) ⇒
+        // The ActorRefCompressionAdvertisement message is resent because it can be lost
+        log.debug("Advertisement in progress for version {}, resending", inProgress.version)
+        inboundContext.association(originUid) match {
+          case OptionVal.Some(association) ⇒
+            advertiseCompressionTable(association, inProgress) // resend
+          case OptionVal.None ⇒
+        }
+    }
+  }

  /**
   * Must be implemented by extending classes in order to send a [[akka.remote.artery.ControlMessage]]
   * to the remote system in order to advertise the compression table to it.
   */
  protected def advertiseCompressionTable(association: OutboundContext, table: CompressionTable[T]): Unit

-  /** Drop `activeTable` and start using the `nextTable` in its place.
*/ - private def startUsingNextTable(): Unit = { - log.debug("Swaping active decompression table to version {}.", nextTable.version) - activeTable = nextTable - nextTable = DecompressionTable.empty - // TODO we want to keep the currentTableVersion in State too, update here as well then - } - - private def prepareCompressionAdvertisement(): CompressionTable[T] = { + private def prepareCompressionAdvertisement(nextTableVersion: Int): CompressionTable[T] = { // TODO surely we can do better than that, optimise - CompressionTable(activeTable.version + 1, Map(heavyHitters.snapshot.filterNot(_ == null).zipWithIndex: _*)) + CompressionTable(nextTableVersion, Map(heavyHitters.snapshot.filterNot(_ == null).zipWithIndex: _*)) } override def toString = diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/OutboundActorRefCompression.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/OutboundActorRefCompression.scala index c70dd95a54..59756afe32 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/compress/OutboundActorRefCompression.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/OutboundActorRefCompression.scala @@ -61,7 +61,7 @@ private[remote] class OutboundCompressionTable[T](system: ActorSystem, remoteAdd else flipTable(activate) // retry else if (state.version == activate.version) - log.warning("Received duplicate compression table (version: {})! Ignoring it.", state.version) + log.debug("Received duplicate compression table (version: {})! Ignoring it.", state.version) else log.error("Received unexpected compression table with version nr [{}]! " + "Current version number is [{}].", activate.version, state.version) diff --git a/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala index cc013c3646..d9a3040a28 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala @@ -34,14 +34,16 @@ class EnvelopeBufferSpec extends AkkaSpec { override def applyActorRefCompressionTable(table: CompressionTable[ActorRef]): Unit = ??? // dynamic allocating not needed in these tests override def actorRefCompressionTableVersion: Int = 0 override def compressActorRef(ref: ActorRef): Int = refToIdx.getOrElse(ref, -1) - override def hitActorRef(originUid: Long, tableVersion: Int, remote: Address, ref: ActorRef): Unit = () + override def hitActorRef(originUid: Long, tableVersion: Int, remote: Address, ref: ActorRef, n: Int): Unit = () override def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef] = OptionVal(idxToRef(idx)) + override def confirmActorRefCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit = () override def applyClassManifestCompressionTable(table: CompressionTable[String]): Unit = ??? 
// dynamic allocating not needed in these tests override def classManifestCompressionTableVersion: Int = 0 override def compressClassManifest(manifest: String): Int = manifestToIdx.getOrElse(manifest, -1) - override def hitClassManifest(originUid: Long, tableVersion: Int, remote: Address, manifest: String): Unit = () + override def hitClassManifest(originUid: Long, tableVersion: Int, remote: Address, manifest: String, n: Int): Unit = () override def decompressClassManifest(originUid: Long, tableVersion: Int, idx: Int): OptionVal[String] = OptionVal(idxToManifest(idx)) + override def confirmClassManifestCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit = () } "EnvelopeBuffer" must { diff --git a/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionIntegrationSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionIntegrationSpec.scala index f312f118d1..9e3b602bd0 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionIntegrationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionIntegrationSpec.scala @@ -22,7 +22,7 @@ object CompressionIntegrationSpec { val commonConfig = ConfigFactory.parseString(s""" akka { loglevel = INFO - + actor.provider = "akka.remote.RemoteActorRefProvider" remote.artery.enabled = on remote.artery.advanced { @@ -32,7 +32,7 @@ object CompressionIntegrationSpec { remote.artery.hostname = localhost remote.artery.port = 0 remote.handshake-timeout = 10s - + remote.artery.advanced.compression { enabled = on actor-refs { @@ -40,7 +40,7 @@ object CompressionIntegrationSpec { advertisement-interval = 3 seconds } } - + } """) @@ -75,7 +75,7 @@ class CompressionIntegrationSpec extends AkkaSpec(CompressionIntegrationSpec.com // cause testActor-1 to become a heavy hitter (1 to messagesToExchange).foreach { i ⇒ voidSel ! "hello" } // does not reply, but a hot receiver should be advertised - val a1 = aProbe.expectMsgType[Events.ReceivedCompressionTable[ActorRef]](10.seconds) + val a1 = aProbe.expectMsgType[Events.ReceivedActorRefCompressionTable](10.seconds) info("System [A] received: " + a1) assertCompression[ActorRef](a1.table, 0, _ should ===(system.deadLetters)) assertCompression[ActorRef](a1.table, 1, _ should ===(testActor)) diff --git a/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionTestKit.scala b/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionTestKit.scala index 9872374252..7bdd8973e8 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionTestKit.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionTestKit.scala @@ -5,7 +5,7 @@ package akka.remote.artery.compress /* INTERNAL API */ -private[remote] trait CompressionTestKit { +private[akka] trait CompressionTestKit { def assertCompression[T](table: CompressionTable[T], id: Int, assertion: T ⇒ Unit): Unit = { table.map.find(_._2 == id) .orElse { throw new AssertionError(s"No key was compressed to the id [$id]! 
Table was: $table") } @@ -14,4 +14,4 @@ private[remote] trait CompressionTestKit { } /* INTERNAL API */ -private[remote] object CompressionTestKit extends CompressionTestKit +private[akka] object CompressionTestKit extends CompressionTestKit diff --git a/akka-remote/src/test/scala/akka/remote/artery/compress/HandshakeShouldDropCompressionTableSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/compress/HandshakeShouldDropCompressionTableSpec.scala index efb1fa4ef9..4ee33cb9a0 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/compress/HandshakeShouldDropCompressionTableSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/compress/HandshakeShouldDropCompressionTableSpec.scala @@ -11,7 +11,7 @@ import akka.util.Timeout import akka.pattern.ask import akka.remote.RARP import akka.remote.artery.ArteryTransport -import akka.remote.artery.compress.CompressionProtocol.Events.{ Event, ReceivedCompressionTable } +import akka.remote.artery.compress.CompressionProtocol.Events.{ Event, ReceivedActorRefCompressionTable } import com.typesafe.config.ConfigFactory import org.scalatest.BeforeAndAfter @@ -82,7 +82,7 @@ class HandshakeShouldDropCompressionTableSpec extends AkkaSpec(HandshakeShouldDr waitForEcho(this, s"hello-$messagesToExchange") systemBTransport.triggerCompressionAdvertisements(actorRef = true, manifest = false) - val a0 = aProbe.expectMsgType[ReceivedCompressionTable[ActorRef]](10.seconds) + val a0 = aProbe.expectMsgType[ReceivedActorRefCompressionTable](10.seconds) info("System [A] received: " + a0) a0.table.map.keySet should contain(testActor) @@ -91,7 +91,7 @@ class HandshakeShouldDropCompressionTableSpec extends AkkaSpec(HandshakeShouldDr waitForEcho(a1Probe, s"hello-$messagesToExchange") systemBTransport.triggerCompressionAdvertisements(actorRef = true, manifest = false) - val a1 = aProbe.expectMsgType[ReceivedCompressionTable[ActorRef]](10.seconds) + val a1 = aProbe.expectMsgType[ReceivedActorRefCompressionTable](10.seconds) info("System [A] received: " + a1) a1.table.map.keySet should contain(a1Probe.ref) @@ -113,7 +113,7 @@ class HandshakeShouldDropCompressionTableSpec extends AkkaSpec(HandshakeShouldDr waitForEcho(this, s"hello-$messagesToExchange", max = 10.seconds) systemBTransport.triggerCompressionAdvertisements(actorRef = true, manifest = false) - val a2 = aNewProbe.expectMsgType[ReceivedCompressionTable[ActorRef]](10.seconds) + val a2 = aNewProbe.expectMsgType[ReceivedActorRefCompressionTable](10.seconds) info("System [A] received: " + a2) a2.table.map.keySet should contain(testActor) @@ -122,7 +122,7 @@ class HandshakeShouldDropCompressionTableSpec extends AkkaSpec(HandshakeShouldDr waitForEcho(aNew2Probe, s"hello-$messagesToExchange") systemBTransport.triggerCompressionAdvertisements(actorRef = true, manifest = false) - val a3 = aNewProbe.expectMsgType[ReceivedCompressionTable[ActorRef]](10.seconds) + val a3 = aNewProbe.expectMsgType[ReceivedActorRefCompressionTable](10.seconds) info("Received second compression: " + a3) a3.table.map.keySet should contain(aNew2Probe.ref) } diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index cd7ac5e57b..87856727ef 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -82,7 +82,7 @@ object AkkaBuild extends Build { protobuf, remote, remoteTests, - samples, +// samples, slf4j, stream, streamTestkit,