diff --git a/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala index e7e4233284..f08d5f87af 100644 --- a/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala @@ -58,7 +58,7 @@ class CodecBenchmark { private val outboundEnvelopePool = ReusableOutboundEnvelope.createObjectPool(capacity = 16) val compressionOut = NoOutboundCompressions - val headerIn = HeaderBuilder.in(NoopInboundCompressions) + val headerIn = HeaderBuilder.in(NoInboundCompressions) val envelopeTemplateBuffer = ByteBuffer.allocate(ArteryTransport.MaximumFrameSize).order(ByteOrder.LITTLE_ENDIAN) val uniqueLocalAddress = UniqueAddress( @@ -166,7 +166,7 @@ class CodecBenchmark { val decoder: Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = Flow.fromGraph(new Decoder(inboundContext, system.asInstanceOf[ExtendedActorSystem], - resolveActorRefWithLocalAddress, NoopInboundCompressions, envelopePool, inboundEnvelopePool)) + resolveActorRefWithLocalAddress, NoInboundCompressions, envelopePool, inboundEnvelopePool)) Source.fromGraph(new BenchTestSourceSameElement(N, "elem")) .map { _ => @@ -207,7 +207,7 @@ class CodecBenchmark { val decoder: Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = Flow.fromGraph(new Decoder(inboundContext, system.asInstanceOf[ExtendedActorSystem], - resolveActorRefWithLocalAddress, NoopInboundCompressions, envelopePool, inboundEnvelopePool)) + resolveActorRefWithLocalAddress, NoInboundCompressions, envelopePool, inboundEnvelopePool)) Source.fromGraph(new BenchTestSourceSameElement(N, "elem")) .map(msg ⇒ outboundEnvelopePool.acquire().init(OptionVal.None, payload, OptionVal.Some(remoteRefB))) diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala index 2bdedd2681..3dfee899bd 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala @@ -6,9 +6,11 @@ package akka.remote.artery import java.nio.ByteBuffer import java.util.concurrent.Executors import java.util.concurrent.TimeUnit.NANOSECONDS + import scala.concurrent.duration._ import akka.actor._ import akka.remote.RemoteActorRefProvider +import akka.remote.artery.compress.CompressionSettings import akka.remote.testconductor.RoleName import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec @@ -54,12 +56,14 @@ object MaxThroughputSpec extends MultiNodeConfig { # See akka-remote-tests/src/test/resources/aeron.properties #advanced.embedded-media-driver = off #advanced.aeron-dir = "target/aeron" - - #advanced.compression { - # enabled = on - # actor-refs.enabled = on - # manifests.enabled = on - #} + + advanced.compression { + enabled = off + actor-refs { + enabled = on + advertisement-interval = 1 second + } + } } } """))) @@ -137,11 +141,14 @@ object MaxThroughputSpec extends MultiNodeConfig { println( s"=== MaxThroughput ${self.path.name}: " + f"throughput ${throughput * testSettings.senderReceiverPairs}%,.0f msg/s, " + - f"${throughput * payloadSize * testSettings.senderReceiverPairs}%,.0f bytes/s, " + + f"${throughput * payloadSize * testSettings.senderReceiverPairs}%,.0f bytes/s (payload), " + + f"${throughput * totalSize(context.system) * testSettings.senderReceiverPairs}%,.0f bytes/s (total" + + (if (CompressionSettings(context.system).enabled) ",compression" else "") + "), " + s"dropped ${totalMessages - totalReceived}, " + s"max round-trip $maxRoundTripMillis ms, " + s"burst size $burstSize, " + s"payload size $payloadSize, " + + s"total size ${totalSize(context.system)}, " + s"$took ms to deliver $totalReceived messages") plotRef ! PlotResult().add(testName, throughput * payloadSize * testSettings.senderReceiverPairs / 1024 / 1024) context.stop(self) @@ -171,7 +178,10 @@ object MaxThroughputSpec extends MultiNodeConfig { totalMessages: Long, burstSize: Int, payloadSize: Int, - senderReceiverPairs: Int) + senderReceiverPairs: Int) { + // data based on measurement + def totalSize(system: ActorSystem) = payloadSize + (if (CompressionSettings(system).enabled) 38 else 110) + } class TestSerializer(val system: ExtendedActorSystem) extends SerializerWithStringManifest with ByteBufferSerializer { @@ -270,7 +280,8 @@ abstract class MaxThroughputSpec totalMessages = adjustedTotalMessages(20000), burstSize = 200, // don't exceed the send queue capacity 200*5*3=3000 payloadSize = 100, - senderReceiverPairs = 5)) + senderReceiverPairs = 5) + ) def test(testSettings: TestSettings): Unit = { import testSettings._ diff --git a/akka-remote/src/main/java/akka/remote/artery/compress/CountMinSketch.java b/akka-remote/src/main/java/akka/remote/artery/compress/CountMinSketch.java index 37b2baffad..6041cb8c78 100644 --- a/akka-remote/src/main/java/akka/remote/artery/compress/CountMinSketch.java +++ b/akka-remote/src/main/java/akka/remote/artery/compress/CountMinSketch.java @@ -4,15 +4,15 @@ package akka.remote.artery.compress; -import akka.actor.Actor; import akka.actor.ActorRef; -import java.io.UnsupportedEncodingException; import java.util.Random; /** * INTERNAL API: Count-Min Sketch datastructure. - * + * + * Not thread-safe. + * * An Improved Data Stream Summary: The Count-Min Sketch and its Applications * https://web.archive.org/web/20060907232042/http://www.eecs.harvard.edu/~michaelm/CS222/countmin.pdf * @@ -31,14 +31,19 @@ public class CountMinSketch { private double eps; private double confidence; + private int[] recyclableCMSHashBuckets; + + public CountMinSketch(int depth, int width, int seed) { this.depth = depth; this.width = width; this.eps = 2.0 / width; this.confidence = 1 - 1 / Math.pow(2, depth); + recyclableCMSHashBuckets = preallocateHashBucketsArray(depth); initTablesWith(depth, width, seed); } + @SuppressWarnings("unused") public CountMinSketch(double epsOfTotalCount, double confidence, int seed) { // 2/w = eps ; w = 2/eps // 1/2^depth <= 1-confidence ; depth >= -log2 (1-confidence) @@ -46,10 +51,12 @@ public class CountMinSketch { this.confidence = confidence; this.width = (int) Math.ceil(2 / epsOfTotalCount); this.depth = (int) Math.ceil(-Math.log(1 - confidence) / Math.log(2)); + recyclableCMSHashBuckets = preallocateHashBucketsArray(depth); initTablesWith(depth, width, seed); } - CountMinSketch(int depth, int width, int size, long[] hashA, long[][] table) { + @SuppressWarnings("unused") + public CountMinSketch(int depth, int width, int size, long[] hashA, long[][] table) { this.depth = depth; this.width = width; this.eps = 2.0 / width; @@ -57,6 +64,7 @@ public class CountMinSketch { this.hashA = hashA; this.table = table; this.size = size; + recyclableCMSHashBuckets = preallocateHashBucketsArray(depth); } private void initTablesWith(int depth, int width, int seed) { @@ -75,11 +83,11 @@ public class CountMinSketch { } /** Referred to as {@code epsilon} in the whitepaper */ - public double getRelativeError() { + public double relativeError() { return eps; } - public double getConfidence() { + public double confidence() { return confidence; } @@ -108,7 +116,7 @@ public class CountMinSketch { size += count; } - public void add(String item, long count) { + public void addObject(Object item, long count) { if (count < 0) { // Actually for negative increments we'll need to use the median // instead of minimum, and accuracy will suffer somewhat. @@ -116,19 +124,18 @@ public class CountMinSketch { // parameter to constructor. throw new IllegalArgumentException("Negative increments not implemented"); } - // TODO we could reuse the arrays - final int[] buckets = MurmurHash.hashBuckets(item, depth, width); // TODO replace with Scala's Murmur3, it's much faster + MurmurHash.hashBuckets(item, recyclableCMSHashBuckets, width); for (int i = 0; i < depth; ++i) { - table[i][buckets[i]] += count; + table[i][recyclableCMSHashBuckets[i]] += count; } size += count; } - + /** * Similar to {@code add}, however we reuse the fact that the hask buckets have to be calculated for {@code add} * already, and a separate {@code estimateCount} operation would have to calculate them again, so we do it all in one go. */ - public long addAndEstimateCount(String item, long count) { + public long addObjectAndEstimateCount(Object item, long count) { if (count < 0) { // Actually for negative increments we'll need to use the median // instead of minimum, and accuracy will suffer somewhat. @@ -136,14 +143,14 @@ public class CountMinSketch { // parameter to constructor. throw new IllegalArgumentException("Negative increments not implemented"); } - final int[] buckets = MurmurHash.hashBuckets(item, depth, width); + MurmurHash.hashBuckets(item, recyclableCMSHashBuckets, width); for (int i = 0; i < depth; ++i) { - table[i][buckets[i]] += count; + table[i][recyclableCMSHashBuckets[i]] += count; } size += count; - return estimateCount(buckets); + return estimateCount(recyclableCMSHashBuckets); } - + public long size() { return size; } @@ -160,15 +167,6 @@ public class CountMinSketch { return res; } - /** - * The estimate is correct within {@code 'epsilon' * (total item count)}, - * with probability {@code confidence}. - */ - public long estimateCount(String item) { - int[] buckets = MurmurHash.hashBuckets(item, depth, width); - return estimateCount(buckets); - } - /** * The estimate is correct within {@code 'epsilon' * (total item count)}, * with probability {@code confidence}. @@ -198,7 +196,6 @@ public class CountMinSketch { // TODO replace with Scala's Murmur3, it's much faster private static class MurmurHash { - // FIXME: This overload isn't actually ever used public static int hash(Object o) { if (o == null) { return 0; @@ -288,10 +285,14 @@ public class CountMinSketch { } public static int hashLong(long data) { + return hashLong(data, 0); + } + public static int hashLong(long data, int seed) { int m = 0x5bd1e995; int r = 24; - int h = 0; + int h = seed; + // int h = seed ^ length; int k = (int) data * m; k ^= k >>> r; @@ -314,28 +315,41 @@ public class CountMinSketch { // http://www.eecs.harvard.edu/~kirsch/pubs/bbbf/esa06.pdf // does prove to work in actual tests, and is obviously faster // than performing further iterations of murmur. - public static int[] hashBuckets(String key, int hashCount, int max) { - byte[] b; - try { - b = key.getBytes("UTF-16");// TODO Use the Unsafe trick @patriknw used to access the backing array directly -- via Endre - } catch (UnsupportedEncodingException e) { - throw new RuntimeException(e); - } - return hashBuckets(b, hashCount, max); - } - - static int[] hashBuckets(byte[] b, int hashCount, int max) { - // TODO we could reuse the arrays - int[] result = new int[hashCount]; - int hash1 = hash(b, b.length, 0); - int hash2 = hash(b, b.length, hash1); - for (int i = 0; i < hashCount; i++) { - result[i] = Math.abs((hash1 + i * hash2) % max); - } - return result; +// public static int[] hashBuckets(String key, int hashCount, int max) { +// byte[] b; +// try { +// b = key.getBytes("UTF-16");// TODO Use the Unsafe trick @patriknw used to access the backing array directly -- via Endre +// } catch (UnsupportedEncodingException e) { +// throw new RuntimeException(e); +// } +// return hashBuckets(b, hashCount, max); +// } + +// static int[] hashBuckets(byte[] b, int hashCount, int max) { +// // TODO we could reuse the arrays +// int[] result = preallocateHashBucketsArray(hashCount); +// int hash1 = hash(b, b.length, 0); +// int hash2 = hash(b, b.length, hash1); +// for (int i = 0; i < hashCount; i++) { +// result[i] = Math.abs((hash1 + i * hash2) % max); +// } +// return result; +// } + + /** Mutates passed in {@code hashBuckets} */ + static void hashBuckets(Object item, int[] hashBuckets, int max) { + int hash1 = hash(item); // specialized hash for ActorRef and Strings + int hash2 = hashLong(hash1, hash1); + final int depth = hashBuckets.length; + for (int i = 0; i < depth; i++) + hashBuckets[i] = Math.abs((hash1 + i * hash2) % max); } } + public int[] preallocateHashBucketsArray(int depth) { + return new int[depth]; + } + @Override public String toString() { return "CountMinSketch{" + diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 5daec306a9..42dcb5bad5 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -5,9 +5,9 @@ package akka.remote.artery import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.TimeUnit + import akka.remote.artery.compress.CompressionProtocol.CompressionMessage -import scala.collection.JavaConverters._ import scala.concurrent.Future import scala.concurrent.Promise import scala.concurrent.duration._ @@ -33,7 +33,7 @@ import akka.remote.artery.InboundControlJunction.ControlMessageObserver import akka.remote.artery.InboundControlJunction.ControlMessageSubject import akka.remote.transport.AkkaPduCodec import akka.remote.transport.AkkaPduProtobufCodec -import akka.remote.artery.compress.{ InboundCompressionsImpl, CompressionProtocol } +import akka.remote.artery.compress._ import akka.stream.AbruptTerminationException import akka.stream.ActorMaterializer import akka.stream.KillSwitches @@ -57,16 +57,20 @@ import org.agrona.IoUtil import java.io.File import java.net.InetSocketAddress import java.nio.channels.{ DatagramChannel, FileChannel } + import akka.remote.artery.OutboundControlJunction.OutboundControlIngress import io.aeron.CncFileDescriptor import java.util.concurrent.atomic.AtomicLong + import scala.collection.JavaConverters._ import akka.stream.ActorMaterializerSettings + import scala.annotation.tailrec import akka.util.OptionVal import io.aeron.driver.ThreadingMode import org.agrona.concurrent.BackoffIdleStrategy import org.agrona.concurrent.BusySpinIdleStrategy + import scala.util.control.NonFatal import akka.actor.Props import akka.actor.Actor @@ -113,7 +117,7 @@ private[akka] object AssociationState { incarnation = 1, uniqueRemoteAddressPromise = Promise(), quarantined = ImmutableLongMap.empty[QuarantinedTimestamp], - outboundCompression = NoOutboundCompressions) + outboundCompressions = NoOutboundCompressions) final case class QuarantinedTimestamp(nanoTime: Long) { override def toString: String = @@ -128,7 +132,7 @@ private[akka] final class AssociationState( val incarnation: Int, val uniqueRemoteAddressPromise: Promise[UniqueAddress], val quarantined: ImmutableLongMap[AssociationState.QuarantinedTimestamp], - val outboundCompression: OutboundCompressions) { + val outboundCompressions: OutboundCompressions) { import AssociationState.QuarantinedTimestamp @@ -154,6 +158,9 @@ private[akka] final class AssociationState( } } + def withCompression(compression: OutboundCompressions) = + new AssociationState(incarnation, uniqueRemoteAddressPromise, quarantined, compression) + def newIncarnation(remoteAddressPromise: Promise[UniqueAddress], compression: OutboundCompressions): AssociationState = new AssociationState(incarnation + 1, remoteAddressPromise, quarantined, compression) @@ -164,7 +171,7 @@ private[akka] final class AssociationState( incarnation, uniqueRemoteAddressPromise, quarantined = quarantined.updated(a.uid, QuarantinedTimestamp(System.nanoTime())), - outboundCompression = NoOutboundCompressions) // after quarantine no compression needed anymore, drop it + outboundCompressions = NoOutboundCompressions) // after quarantine no compression needed anymore, drop it case _ ⇒ this } @@ -283,6 +290,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R @volatile private[this] var aeron: Aeron = _ @volatile private[this] var aeronErrorLogTask: Cancellable = _ + // this is only used to allow triggering compression advertisements or state from tests + @volatile private[this] var activeCompressions = Set.empty[InboundCompressions] + override def localAddress: UniqueAddress = _localAddress override def defaultAddress: Address = localAddress.address override def addresses: Set[Address] = _addresses @@ -416,7 +426,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R .threadingMode(ThreadingMode.DEDICATED) .conductorIdleStrategy(new BackoffIdleStrategy(1, 1, 1, 1)) .receiverIdleStrategy(new BusySpinIdleStrategy) - .senderIdleStrategy(new BusySpinIdleStrategy); + .senderIdleStrategy(new BusySpinIdleStrategy) } else if (remoteSettings.IdleCpuLevel == 1) { driverContext .threadingMode(ThreadingMode.SHARED) @@ -505,10 +515,10 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R } private def runInboundStreams(): Unit = { - val noCompressions = new NoInboundCompressions(system) // TODO possibly enable on other channels too https://github.com/akka/akka/pull/20546/files#r68074082 + val noCompressions = NoInboundCompressions // TODO possibly enable on other channels too https://github.com/akka/akka/pull/20546/files#r68074082 val compressions = createInboundCompressions(this) - runInboundControlStream(noCompressions) + runInboundControlStream(noCompressions) // TODO should understand compressions too runInboundOrdinaryMessagesStream(compressions) if (largeMessageDestinationsEnabled) { runInboundLargeMessagesStream() @@ -542,12 +552,12 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R m match { case CompressionProtocol.ActorRefCompressionAdvertisement(from, table) ⇒ log.debug("Incoming ActorRef compression advertisement from [{}], table: [{}]", from, table) - association(from.address).compression.applyActorRefCompressionTable(table) + association(from.address).outboundCompression.applyActorRefCompressionTable(table) system.eventStream.publish(CompressionProtocol.Events.ReceivedCompressionTable(from, table)) case CompressionProtocol.ClassManifestCompressionAdvertisement(from, table) ⇒ log.debug("Incoming Class Manifest compression advertisement from [{}], table: [{}]", from, table) - association(from.address).compression.applyClassManifestCompressionTable(table) + association(from.address).outboundCompression.applyClassManifestCompressionTable(table) system.eventStream.publish(CompressionProtocol.Events.ReceivedCompressionTable(from, table)) } @@ -592,12 +602,12 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R } private def runInboundLargeMessagesStream(): Unit = { - val compression = new NoInboundCompressions(system) // no compression on large message stream for now + val disableCompression = NoInboundCompressions // no compression on large message stream for now val completed = if (remoteSettings.TestMode) { val (mgmt, c) = aeronSource(largeStreamId, largeEnvelopePool) - .via(inboundLargeFlow(compression)) + .via(inboundLargeFlow(disableCompression)) .viaMat(inboundTestFlow)(Keep.right) .toMat(inboundSink)(Keep.both) .run()(materializer) @@ -605,7 +615,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R c } else { aeronSource(largeStreamId, largeEnvelopePool) - .via(inboundLargeFlow(compression)) + .via(inboundLargeFlow(disableCompression)) .toMat(inboundSink)(Keep.right) .run()(materializer) } @@ -758,8 +768,11 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R Flow.fromGraph(new Encoder(localAddress, system, compression, outboundEnvelopePool, bufferPool)) private def createInboundCompressions(inboundContext: InboundContext): InboundCompressions = - if (remoteSettings.ArteryCompressionSettings.enabled) new InboundCompressionsImpl(system, inboundContext) - else new NoInboundCompressions(system) + if (remoteSettings.ArteryCompressionSettings.enabled) { + val comp = new InboundCompressionsImpl(system, inboundContext) + activeCompressions += comp + comp + } else NoInboundCompressions def createEncoder(pool: EnvelopeBufferPool, compression: OutboundCompressions): Flow[OutboundEnvelope, EnvelopeBuffer, NotUsed] = Flow.fromGraph(new Encoder(localAddress, system, compression, outboundEnvelopePool, pool)) @@ -779,8 +792,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R } def createDecoder(compression: InboundCompressions, bufferPool: EnvelopeBufferPool): Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = { - val resolveActorRefWithLocalAddress: String ⇒ InternalActorRef = + val resolveActorRefWithLocalAddress: String ⇒ InternalActorRef = { recipient ⇒ provider.resolveActorRefWithLocalAddress(recipient, localAddress.address) + } Flow.fromGraph(new Decoder(this, system, resolveActorRefWithLocalAddress, compression, bufferPool, inboundEnvelopePool)) } @@ -833,6 +847,17 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R def outboundTestFlow(association: Association): Flow[OutboundEnvelope, OutboundEnvelope, TestManagementApi] = Flow.fromGraph(new OutboundTestStage(association)) + /** INTERNAL API: for testing only. */ + private[remote] def triggerCompressionAdvertisements(actorRef: Boolean, manifest: Boolean) = { + activeCompressions.foreach { + case c: InboundCompressionsImpl if actorRef || manifest ⇒ + log.info("Triggering compression table advertisement for {}", c) + if (actorRef) c.runNextActorRefAdvertisement() + if (manifest) c.runNextClassManifestAdvertisement() + case _ ⇒ + } + } + } /** diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index affceabd77..fdbfca9747 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -9,7 +9,9 @@ import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicReference -import akka.remote.artery.compress.{ CompressionProtocol, CompressionTable, OutboundCompressionsImpl } + +import akka.remote.artery.compress.{ CompressionProtocol, CompressionTable, OutboundCompressions, OutboundCompressionsImpl } + import scala.annotation.tailrec import scala.concurrent.Future import scala.concurrent.Promise @@ -82,7 +84,7 @@ private[remote] class Association( // start sending (enqueuing) to the Association immediate after construction. /** Accesses the currently active outbound compression. */ - def compression: OutboundCompressions = associationState.outboundCompression + def outboundCompression: OutboundCompressions = associationState.outboundCompressions def createQueue(capacity: Int): Queue[OutboundEnvelope] = new ManyToOneConcurrentArrayQueue[OutboundEnvelope](capacity) @@ -144,9 +146,14 @@ private[remote] class Association( val current = associationState current.uniqueRemoteAddressPromise.trySuccess(peer) current.uniqueRemoteAddressValue() match { - case Some(`peer`) ⇒ // our value + case Some(`peer`) ⇒ + // our value + if (current.outboundCompressions == NoOutboundCompressions) { + // enable outbound compression (here, since earlier we don't know the remote address) + swapState(current, current.withCompression(createOutboundCompressions(remoteAddress))) + } case _ ⇒ - val newState = current.newIncarnation(Promise.successful(peer), createOutboundCompressionTable(remoteAddress)) + val newState = current.newIncarnation(Promise.successful(peer), createOutboundCompressions(remoteAddress)) if (swapState(current, newState)) { current.uniqueRemoteAddressValue() match { case Some(old) ⇒ @@ -407,11 +414,10 @@ private[remote] class Association( } // TODO: Make sure that once other channels use Compression, each gets it's own - private def createOutboundCompressionTable(remoteAddress: Address): OutboundCompressions = { + private def createOutboundCompressions(remoteAddress: Address): OutboundCompressions = { if (transport.provider.remoteSettings.ArteryCompressionSettings.enabled) { val compression = new OutboundCompressionsImpl(transport.system, remoteAddress) - // FIXME should use verion number of table instead of hashCode - log.info("Creating Outbound compression table ({}) to [{}]", compression.hashCode, remoteAddress) + log.debug("Creating Outbound compression table to [{}]", remoteAddress) compression } else NoOutboundCompressions } @@ -421,17 +427,25 @@ private[remote] class Association( * This way the same outgoing stream will switch to using the new table without the need of restarting it. */ private object CurrentAssociationStateOutboundCompressionsProxy extends OutboundCompressions { - override def applyActorRefCompressionTable(table: CompressionTable[ActorRef]): Unit = - associationState.outboundCompression.applyActorRefCompressionTable(table) + override def actorRefCompressionTableVersion: Int = + associationState.outboundCompressions.actorRefCompressionTableVersion + override def applyActorRefCompressionTable(table: CompressionTable[ActorRef]): Unit = { + associationState.outboundCompressions.applyActorRefCompressionTable(table) + } + override final def compressActorRef(ref: ActorRef): Int = { + associationState.outboundCompressions.compressActorRef(ref) + } + + override def classManifestCompressionTableVersion: Int = + associationState.outboundCompressions.classManifestCompressionTableVersion override def applyClassManifestCompressionTable(table: CompressionTable[String]): Unit = - associationState.outboundCompression.applyClassManifestCompressionTable(table) - - override final def compressActorRef(ref: ActorRef): Int = - associationState.outboundCompression.compressActorRef(ref) - + associationState.outboundCompressions.applyClassManifestCompressionTable(table) override final def compressClassManifest(manifest: String): Int = - associationState.outboundCompression.compressClassManifest(manifest) + associationState.outboundCompressions.compressClassManifest(manifest) + + override def toString = + s"${Logging.simpleName(getClass)}(current delegate: ${associationState.outboundCompressions})" } override def toString: String = diff --git a/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala b/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala index 77b865c4b4..d529c1d42e 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala @@ -9,7 +9,7 @@ import java.nio.{ ByteBuffer, ByteOrder } import akka.actor.{ ActorRef, Address } import akka.remote.artery.compress.CompressionProtocol._ -import akka.remote.artery.compress.{ CompressionTable, NoopInboundCompressions, NoopOutboundCompressions } +import akka.remote.artery.compress.{ CompressionTable, InboundCompressions, OutboundCompressions } import akka.serialization.Serialization import org.agrona.concurrent.{ ManyToManyConcurrentArrayQueue, UnsafeBuffer } import akka.util.{ OptionVal, Unsafe } @@ -54,6 +54,7 @@ private[remote] object EnvelopeBuffer { val VersionOffset = 0 // Int val UidOffset = 4 // Long val SerializerOffset = 12 // Int + val SenderActorRefTagOffset = 16 // Int val RecipientActorRefTagOffset = 20 // Int val ClassManifestTagOffset = 24 // Int @@ -70,41 +71,14 @@ private[remote] object EnvelopeBuffer { val StringValueFieldOffset = Unsafe.instance.objectFieldOffset(classOf[String].getDeclaredField("value")) } -/** - * INTERNAL API - * Decompress and cause compression advertisements. - * - * One per inbound message stream thus must demux by originUid to use the right tables. - */ -private[remote] trait InboundCompressions { - def hitActorRef(originUid: Long, tableVersion: Int, remote: Address, ref: ActorRef): Unit - def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef] - - def hitClassManifest(originUid: Long, tableVersion: Int, remote: Address, manifest: String): Unit - def decompressClassManifest(originUid: Long, tableVersion: Int, idx: Int): OptionVal[String] -} -/** - * INTERNAL API - * Compress outgoing data and handle compression advertisements to fill compression table. - * - * One per outgoing message stream. - */ -private[remote] trait OutboundCompressions { - def applyActorRefCompressionTable(table: CompressionTable[ActorRef]): Unit - def compressActorRef(ref: ActorRef): Int - - def applyClassManifestCompressionTable(table: CompressionTable[String]): Unit - def compressClassManifest(manifest: String): Int -} - /** INTERNAL API */ private[remote] object HeaderBuilder { // We really only use the Header builder on one "side" or the other, thus in order to avoid having to split its impl // we inject no-op compression's of the "other side". - def in(compression: InboundCompressions): HeaderBuilder = new HeaderBuilderImpl(compression, NoopOutboundCompressions) - def out(compression: OutboundCompressions): HeaderBuilder = new HeaderBuilderImpl(NoopInboundCompressions, compression) + def in(compression: InboundCompressions): HeaderBuilder = new HeaderBuilderImpl(compression, NoOutboundCompressions) + def out(compression: OutboundCompressions): HeaderBuilder = new HeaderBuilderImpl(NoInboundCompressions, compression) /** INTERNAL API, FOR TESTING ONLY */ private[remote] def bothWays(in: InboundCompressions, out: OutboundCompressions): HeaderBuilder = new HeaderBuilderImpl(in, out) @@ -170,8 +144,8 @@ private[remote] final class HeaderBuilderImpl(inboundCompression: InboundCompres // Fields only available for EnvelopeBuffer var _version: Int = _ var _uid: Long = _ - var _actorRefCompressionTableVersion: Int = -1 - var _classManifestCompressionTableVersion: Int = -1 + var _actorRefCompressionTableVersion: Int = 0 + var _classManifestCompressionTableVersion: Int = 0 var _senderActorRef: String = null var _senderActorRefIdx: Int = -1 @@ -247,17 +221,17 @@ private[remote] final class HeaderBuilderImpl(inboundCompression: InboundCompres override def toString = "HeaderBuilderImpl(" + - version + ", " + - actorRefCompressionTableVersion + ", " + - classManifestCompressionTableVersion + ", " + - uid + ", " + - _senderActorRef + ", " + - _senderActorRefIdx + ", " + - _recipientActorRef + ", " + - _recipientActorRefIdx + ", " + - _serializer + ", " + - _manifest + ", " + - _manifestIdx + ")" + "version:" + version + ", " + + "actorRefCompressionTableVersion:" + actorRefCompressionTableVersion + ", " + + "classManifestCompressionTableVersion:" + classManifestCompressionTableVersion + ", " + + "uid:" + uid + ", " + + "_senderActorRef:" + _senderActorRef + ", " + + "_senderActorRefIdx:" + _senderActorRefIdx + ", " + + "_recipientActorRef:" + _recipientActorRef + ", " + + "_recipientActorRefIdx:" + _recipientActorRefIdx + ", " + + "_serializer:" + _serializer + ", " + + "_manifest:" + _manifest + ", " + + "_manifestIdx:" + _manifestIdx + ")" } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala index 8e0d5d70ef..426107b849 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala @@ -1,4 +1,3 @@ - /** * Copyright (C) 2016 Lightbend Inc. */ @@ -12,8 +11,9 @@ import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope import akka.serialization.{ Serialization, SerializationExtension } import akka.stream._ import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } -import akka.util.{ ByteString, OptionVal } +import akka.util.{ ByteString, OptionVal, PrettyByteString } import akka.actor.EmptyLocalActorRef +import akka.remote.artery.compress.{ InboundCompressions, OutboundCompressions, OutboundCompressionsImpl } import akka.stream.stage.TimerGraphStageLogic /** @@ -47,6 +47,10 @@ private[remote] class Encoder( val outboundEnvelope = grab(in) val envelope = bufferPool.acquire() + // FIXME: OMG race between setting the version, and using the table!!!! + headerBuilder setActorRefCompressionTableVersion compression.actorRefCompressionTableVersion + headerBuilder setClassManifestCompressionTableVersion compression.classManifestCompressionTableVersion + // internally compression is applied by the builder: outboundEnvelope.recipient match { case OptionVal.Some(r) ⇒ headerBuilder setRecipientActorRef r @@ -147,24 +151,26 @@ private[remote] class Decoder( val recipient: OptionVal[InternalActorRef] = headerBuilder.recipientActorRef(originUid) match { case OptionVal.Some(ref) ⇒ OptionVal(ref.asInstanceOf[InternalActorRef]) - case OptionVal.None ⇒ - // `get` on Path is safe because it surely is not a compressed value here + case OptionVal.None if headerBuilder.recipientActorRefPath.isDefined ⇒ resolveRecipient(headerBuilder.recipientActorRefPath.get) + case _ ⇒ + OptionVal.None } - val sender: InternalActorRef = headerBuilder.senderActorRef(originUid) match { + val sender: OptionVal[InternalActorRef] = headerBuilder.senderActorRef(originUid) match { case OptionVal.Some(ref) ⇒ - ref.asInstanceOf[InternalActorRef] - case OptionVal.None ⇒ - // `get` on Path is safe because it surely is not a compressed value here - resolveActorRefWithLocalAddress(headerBuilder.senderActorRefPath.get) + OptionVal(ref.asInstanceOf[InternalActorRef]) + case OptionVal.None if headerBuilder.senderActorRefPath.isDefined ⇒ + OptionVal(resolveActorRefWithLocalAddress(headerBuilder.senderActorRefPath.get)) + case _ ⇒ + OptionVal.None } // --- hit refs and manifests for heavy-hitter counting association match { case OptionVal.Some(assoc) ⇒ val remoteAddress = assoc.remoteAddress - compression.hitActorRef(originUid, headerBuilder.actorRefCompressionTableVersion, remoteAddress, sender) + if (sender.isDefined) compression.hitActorRef(originUid, headerBuilder.actorRefCompressionTableVersion, remoteAddress, sender.get) if (recipient.isDefined) compression.hitActorRef(originUid, headerBuilder.actorRefCompressionTableVersion, remoteAddress, recipient.get) compression.hitClassManifest(originUid, headerBuilder.classManifestCompressionTableVersion, remoteAddress, headerBuilder.manifest(originUid)) case _ ⇒ @@ -181,7 +187,7 @@ private[remote] class Decoder( recipient, localAddress, // FIXME: Is this needed anymore? What should we do here? deserializedMessage, - OptionVal.Some(sender), // FIXME: No need for an option, decode simply to deadLetters instead + sender, // FIXME: No need for an option, decode simply to deadLetters instead originUid, association) diff --git a/akka-remote/src/main/scala/akka/remote/artery/NoLiteralCompression.scala b/akka-remote/src/main/scala/akka/remote/artery/NoLiteralCompression.scala index fe7502ea5e..03b1a4b6a6 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/NoLiteralCompression.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/NoLiteralCompression.scala @@ -3,8 +3,8 @@ */ package akka.remote.artery -import akka.actor.{ ActorRef, ActorSystem, Address, InternalActorRef } -import akka.remote.artery.compress.CompressionTable +import akka.actor.{ ActorRef, Address } +import akka.remote.artery.compress.{ CompressionTable, InboundCompressions, OutboundCompressions } import akka.util.OptionVal /** @@ -12,11 +12,10 @@ import akka.util.OptionVal * * Literarily, no compression! */ -final class NoInboundCompressions(system: ActorSystem) extends InboundCompressions { +case object NoInboundCompressions extends InboundCompressions { override def hitActorRef(originUid: Long, tableVersion: Int, remote: Address, ref: ActorRef): Unit = () override def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef] = if (idx == -1) throw new IllegalArgumentException("Attemted decompression of illegal compression id: -1") - else if (idx == 0) OptionVal.Some(system.deadLetters) // special case deadLetters else OptionVal.None override def hitClassManifest(originUid: Long, tableVersion: Int, remote: Address, manifest: String): Unit = () @@ -30,10 +29,12 @@ final class NoInboundCompressions(system: ActorSystem) extends InboundCompressio * * Literarily, no compression! */ -object NoOutboundCompressions extends OutboundCompressions { +case object NoOutboundCompressions extends OutboundCompressions { override def applyActorRefCompressionTable(table: CompressionTable[ActorRef]): Unit = () + override def actorRefCompressionTableVersion: Int = 0 override def compressActorRef(ref: ActorRef): Int = -1 override def applyClassManifestCompressionTable(table: CompressionTable[String]): Unit = () + override def classManifestCompressionTableVersion: Int = 0 override def compressClassManifest(manifest: String): Int = -1 } diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/ActualCompressions.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/ActualCompressions.scala index 1580709a1b..04eb467c6a 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/compress/ActualCompressions.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/ActualCompressions.scala @@ -9,24 +9,25 @@ import java.util.function.LongFunction import akka.actor.{ ActorRef, ActorSystem, Address } import akka.remote.artery._ import akka.util.OptionVal -import akka.remote.artery.OutboundCompressions import org.agrona.collections.Long2ObjectHashMap /** INTERNAL API */ private[remote] final class OutboundCompressionsImpl(system: ActorSystem, remoteAddress: Address) extends OutboundCompressions { private val actorRefsOut = new OutboundActorRefCompression(system, remoteAddress) - private val classManifestsOut = new OutboundCompressionTable[String](system, remoteAddress) + private val classManifestsOut = new OutboundClassManifestCompression(system, remoteAddress) // actor ref compression --- override def compressActorRef(ref: ActorRef): Int = actorRefsOut.compress(ref) + override def actorRefCompressionTableVersion: Int = actorRefsOut.activeCompressionTableVersion override def applyActorRefCompressionTable(table: CompressionTable[ActorRef]): Unit = actorRefsOut.flipTable(table) // class manifest compression --- override def compressClassManifest(manifest: String): Int = classManifestsOut.compress(manifest) + override def classManifestCompressionTableVersion: Int = classManifestsOut.activeCompressionTableVersion override def applyClassManifestCompressionTable(table: CompressionTable[String]): Unit = classManifestsOut.flipTable(table) } @@ -42,10 +43,9 @@ private[remote] final class InboundCompressionsImpl( ) extends InboundCompressions { private val settings = CompressionSettings(system) - private val localAddress = inboundContext.localAddress // FIXME we also must remove the ones that won't be used anymore - when quarantine triggers - private[this] val _actorRefsIn = new Long2ObjectHashMap[InboundActorRefCompression]() + private[this] val _actorRefsIns = new Long2ObjectHashMap[InboundActorRefCompression]() private val createInboundActorRefsForOrigin = new LongFunction[InboundActorRefCompression] { override def apply(originUid: Long): InboundActorRefCompression = { val actorRefHitters = new TopHeavyHitters[ActorRef](settings.actorRefs.max) @@ -53,9 +53,9 @@ private[remote] final class InboundCompressionsImpl( } } private def actorRefsIn(originUid: Long): InboundActorRefCompression = - _actorRefsIn.computeIfAbsent(originUid, createInboundActorRefsForOrigin) + _actorRefsIns.computeIfAbsent(originUid, createInboundActorRefsForOrigin) - private[this] val _classManifestsIn = new Long2ObjectHashMap[InboundManifestCompression]() + private[this] val _classManifestsIns = new Long2ObjectHashMap[InboundManifestCompression]() private val createInboundManifestsForOrigin = new LongFunction[InboundManifestCompression] { override def apply(originUid: Long): InboundManifestCompression = { val manifestHitters = new TopHeavyHitters[String](settings.manifests.max) @@ -63,12 +63,13 @@ private[remote] final class InboundCompressionsImpl( } } private def classManifestsIn(originUid: Long): InboundManifestCompression = - _classManifestsIn.computeIfAbsent(originUid, createInboundManifestsForOrigin) + _classManifestsIns.computeIfAbsent(originUid, createInboundManifestsForOrigin) // actor ref compression --- - override def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef] = + override def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef] = { actorRefsIn(originUid).decompress(tableVersion, idx) + } override def hitActorRef(originUid: Long, tableVersion: Int, address: Address, ref: ActorRef): Unit = { actorRefsIn(originUid).increment(address, ref, 1L) } @@ -80,20 +81,18 @@ private[remote] final class InboundCompressionsImpl( override def hitClassManifest(originUid: Long, tableVersion: Int, address: Address, manifest: String): Unit = { classManifestsIn(originUid).increment(address, manifest, 1L) } -} - -object NoopInboundCompressions extends InboundCompressions { - override def hitActorRef(originUid: Long, tableVersion: Int, remote: Address, ref: ActorRef): Unit = () - override def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef] = OptionVal.None - - override def hitClassManifest(originUid: Long, tableVersion: Int, remote: Address, manifest: String): Unit = () - override def decompressClassManifest(originUid: Long, tableVersion: Int, idx: Int): OptionVal[String] = OptionVal.None -} - -object NoopOutboundCompressions extends OutboundCompressions { - override def compressActorRef(ref: ActorRef): Int = -1 - override def applyActorRefCompressionTable(table: CompressionTable[ActorRef]): Unit = () - - override def compressClassManifest(manifest: String): Int = -1 - override def applyClassManifestCompressionTable(table: CompressionTable[String]): Unit = () + + // testing utilities --- + + /** INTERNAL API: for testing only */ + private[remote] def runNextActorRefAdvertisement() = { + import scala.collection.JavaConverters._ + _actorRefsIns.values().asScala.foreach { inbound ⇒ inbound.runNextTableAdvertisement() } + } + + /** INTERNAL API: for testing only */ + private[remote] def runNextClassManifestAdvertisement() = { + import scala.collection.JavaConverters._ + _classManifestsIns.values().asScala.foreach { inbound ⇒ inbound.runNextTableAdvertisement() } + } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/AdvertiseCompressionId.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/AdvertiseCompressionId.scala deleted file mode 100644 index 92ef0a5840..0000000000 --- a/akka-remote/src/main/scala/akka/remote/artery/compress/AdvertiseCompressionId.scala +++ /dev/null @@ -1,11 +0,0 @@ -/* - * Copyright (C) 2016 Lightbend Inc. - */ -package akka.remote.artery.compress - -import akka.actor.Address - -/** Callback invoked when a compression id allocation should be advertised to the remote actor system. */ -trait AdvertiseCompressionId[T] { - def apply(remoteAddress: Address, ref: T, id: Int): Unit -} diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/AllCompressions.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/AllCompressions.scala new file mode 100644 index 0000000000..c0729636ed --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/AllCompressions.scala @@ -0,0 +1,37 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ + +package akka.remote.artery.compress + +import akka.actor.{ ActorRef, Address } +import akka.util.OptionVal + +/** + * INTERNAL API + * Decompress and cause compression advertisements. + * + * One per inbound message stream thus must demux by originUid to use the right tables. + */ +private[remote] trait InboundCompressions { + def hitActorRef(originUid: Long, tableVersion: Int, remote: Address, ref: ActorRef): Unit + def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef] + + def hitClassManifest(originUid: Long, tableVersion: Int, remote: Address, manifest: String): Unit + def decompressClassManifest(originUid: Long, tableVersion: Int, idx: Int): OptionVal[String] +} +/** + * INTERNAL API + * Compress outgoing data and handle compression advertisements to fill compression table. + * + * One per outgoing message stream. + */ +private[remote] trait OutboundCompressions { + def applyActorRefCompressionTable(table: CompressionTable[ActorRef]): Unit + def actorRefCompressionTableVersion: Int + def compressActorRef(ref: ActorRef): Int + + def applyClassManifestCompressionTable(table: CompressionTable[String]): Unit + def classManifestCompressionTableVersion: Int + def compressClassManifest(manifest: String): Int +} diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionTable.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionTable.scala index 6125a2b3ea..dfc612868b 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionTable.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionTable.scala @@ -5,7 +5,7 @@ package akka.remote.artery.compress /** INTERNAL API: Versioned compression table to be advertised between systems */ -private[akka] final case class CompressionTable[T](version: Long, map: Map[T, Int]) { +private[akka] final case class CompressionTable[T](version: Int, map: Map[T, Int]) { def invert: DecompressionTable[T] = if (map.isEmpty) DecompressionTable.empty[T].copy(version = version) diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/DecompressionTable.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/DecompressionTable.scala index 2434112982..4e5b74bf0b 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/compress/DecompressionTable.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/DecompressionTable.scala @@ -5,8 +5,16 @@ package akka.remote.artery.compress /** INTERNAL API */ -private[remote] final case class DecompressionTable[T](version: Long, table: Array[T]) { - def get(idx: Int): T = table(idx) +private[remote] final case class DecompressionTable[T](version: Int, table: Array[T]) { + // TODO version maybe better as Long? // OR implement roll-over + private[this] val length = table.length + + def get(idx: Int): T = { + if (idx >= length) + throw new IllegalArgumentException(s"Attempted decompression of unknown id: [$idx]! " + + s"Only $length ids allocated in table version [$version].") + table(idx) + } def invert: CompressionTable[T] = CompressionTable(version, Map(table.zipWithIndex: _*)) @@ -16,7 +24,7 @@ private[remote] final case class DecompressionTable[T](version: Long, table: Arr getClass.getName + s"(version: $version, " + ( - if (table.length == 0) "[empty]" + if (length == 0) "[empty]" else s"table: [${table.zipWithIndex.map({ case (t, i) ⇒ s"$i -> $t" }).mkString(",")}" ) + "])" } diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala index 75b7ab0f08..6ec6863585 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala @@ -4,12 +4,9 @@ package akka.remote.artery.compress -import java.util.concurrent.atomic.AtomicReference - import akka.actor.{ ActorRef, ActorSystem, Address } -import akka.event.Logging +import akka.event.{ Logging, NoLogging } import akka.remote.artery.{ InboundContext, OutboundContext } -import akka.stream.impl.ConstantFun import akka.util.{ OptionVal, PrettyDuration } import scala.concurrent.duration.{ Duration, FiniteDuration } @@ -27,7 +24,7 @@ private[remote] final class InboundActorRefCompression( settings: CompressionSettings, originUid: Long, inboundContext: InboundContext, - heavyHitters: TopHeavyHitters[ActorRef]) extends InboundCompression[ActorRef](system, settings, originUid, inboundContext, heavyHitters, _.path.toSerializationFormat) { + heavyHitters: TopHeavyHitters[ActorRef]) extends InboundCompression[ActorRef](system, settings, originUid, inboundContext, heavyHitters) { preAllocate(system.deadLetters) @@ -43,9 +40,9 @@ private[remote] final class InboundActorRefCompression( scheduleNextTableAdvertisement() override protected def tableAdvertisementInterval = settings.actorRefs.advertisementInterval - def advertiseCompressionTable(association: OutboundContext, table: CompressionTable[ActorRef]): Unit = { - log.debug(s"Advertise ActorRef compression [$table] to [${association.remoteAddress}]") - association.sendControl(CompressionProtocol.ActorRefCompressionAdvertisement(inboundContext.localAddress, table)) + override def advertiseCompressionTable(outboundContext: OutboundContext, table: CompressionTable[ActorRef]): Unit = { + log.debug(s"Advertise ActorRef compression [$table], from [${inboundContext.localAddress}] to [${outboundContext.remoteAddress}]") + outboundContext.sendControl(CompressionProtocol.ActorRefCompressionAdvertisement(inboundContext.localAddress, table)) } } @@ -54,14 +51,16 @@ final class InboundManifestCompression( settings: CompressionSettings, originUid: Long, inboundContext: InboundContext, - heavyHitters: TopHeavyHitters[String]) extends InboundCompression[String](system, settings, originUid, inboundContext, heavyHitters, ConstantFun.scalaIdentityFunction) { + heavyHitters: TopHeavyHitters[String]) extends InboundCompression[String](system, settings, originUid, inboundContext, heavyHitters) { scheduleNextTableAdvertisement() override protected def tableAdvertisementInterval = settings.manifests.advertisementInterval - override def advertiseCompressionTable(association: OutboundContext, table: CompressionTable[String]): Unit = { - log.debug(s"Advertise ClassManifest compression [$table] to [${association.remoteAddress}]") - association.sendControl(CompressionProtocol.ClassManifestCompressionAdvertisement(inboundContext.localAddress, table)) + override lazy val log = NoLogging + + override def advertiseCompressionTable(outboundContext: OutboundContext, table: CompressionTable[String]): Unit = { + log.debug(s"Advertise ClassManifest compression [$table] to [${outboundContext.remoteAddress}]") + outboundContext.sendControl(CompressionProtocol.ClassManifestCompressionAdvertisement(inboundContext.localAddress, table)) } } @@ -70,14 +69,13 @@ final class InboundManifestCompression( * Handles counting and detecting of heavy-hitters and compressing them via a table lookup. */ private[remote] abstract class InboundCompression[T >: Null]( - val system: ActorSystem, - val settings: CompressionSettings, - originUid: Long, - inboundContext: InboundContext, - val heavyHitters: TopHeavyHitters[T], - convertKeyToString: T ⇒ String) { // TODO avoid converting to string, in order to use the ActorRef.hashCode! + val system: ActorSystem, + val settings: CompressionSettings, + originUid: Long, + inboundContext: InboundContext, + val heavyHitters: TopHeavyHitters[T]) { - val log = Logging(system, "InboundCompressionTable") + lazy val log = Logging(system, getClass.getSimpleName) // TODO atomic / state machine? the InbouncCompression could even extend ActomicReference[State]! @@ -91,7 +89,7 @@ private[remote] abstract class InboundCompression[T >: Null]( // 2 tables are used, one is "still in use", and the @volatile private[this] var activeTable = DecompressionTable.empty[T] - @volatile private[this] var nextTable = DecompressionTable.empty[T] + @volatile private[this] var nextTable = DecompressionTable.empty[T].copy(version = 1) // TODO calibrate properly (h/w have direct relation to preciseness and max capacity) private[this] val cms = new CountMinSketch(100, 100, System.currentTimeMillis().toInt) @@ -106,25 +104,28 @@ private[remote] abstract class InboundCompression[T >: Null]( * @throws UnknownCompressedIdException if given id is not known, this may indicate a bug – such situation should not happen. */ // not tailrec because we allow special casing in sub-class, however recursion is always at most 1 level deep - def decompress(tableVersion: Long, idx: Int): OptionVal[T] = { - val activeVersion = activeTable.version // TODO move into state + def decompress(incomingTableVersion: Long, idx: Int): OptionVal[T] = { + val activeVersion = activeTable.version - if (tableVersion == -1) OptionVal.None // no compression, bail out early - else if (tableVersion == activeVersion) { + if (incomingTableVersion == -1) OptionVal.None // no compression, bail out early + else if (incomingTableVersion == activeVersion) { val value: T = activeTable.get(idx) - if (settings.debug) log.debug(s"Decompress [{}] => {}", idx, value) if (value != null) OptionVal.Some[T](value) else throw new UnknownCompressedIdException(idx) - } else if (tableVersion < activeVersion) { - log.warning("Received value compressed with old table: [{}], current table version is: [{}]", tableVersion, activeVersion) + } else if (incomingTableVersion < activeVersion) { + log.warning("Received value compressed with old table: [{}], current table version is: [{}]", incomingTableVersion, activeVersion) OptionVal.None - } else if (tableVersion == nextTable.version) { - flipTables() - decompress(tableVersion, idx) // recurse, activeTable will not be able to handle this + } else if (incomingTableVersion == nextTable.version) { + advertisementInProgress = false + log.debug("Received first value compressed using the next prepared compression table, flipping to it (version: {})", nextTable.version) + startUsingNextTable() + decompress(incomingTableVersion, idx) // recurse, activeTable will not be able to handle this } else { // which means that incoming version was > nextTable.version, which likely is a bug - log.error("Inbound message is using compression table version higher than the highest allocated table on this node. " + - "This should not happen! State: activeTable: {}, nextTable, incoming tableVersion: {}", activeVersion, nextTable, tableVersion) + log.error( + "Inbound message is using compression table version higher than the highest allocated table on this node. " + + "This should not happen! State: activeTable: {}, nextTable: {}, incoming tableVersion: {}", + activeVersion, nextTable.version, incomingTableVersion) OptionVal.None } @@ -136,41 +137,11 @@ private[remote] abstract class InboundCompression[T >: Null]( */ // TODO not so happy about passing around address here, but in incoming there's no other earlier place to get it? def increment(remoteAddress: Address, value: T, n: Long): Unit = { - val key = convertKeyToString(value) - if (shouldIgnore(key)) { - // ignore... - } else { - val count = cms.addAndEstimateCount(key, n) + val count = cms.addObjectAndEstimateCount(value, n) - // TODO optimise order of these, what is more expensive? - // TODO (now the `previous` is, but if aprox datatype there it would be faster)... Needs pondering. - val wasHeavyHitter = addAndCheckIfheavyHitterDetected(value, count) - if (wasHeavyHitter) - log.debug(s"Heavy hitter detected: {} [count: {}]", value, count) - // if (wasHeavyHitter && !wasCompressedPreviously(key)) { - // val idx = prepareCompressionAdvertisement() - // log.debug("Allocated compression id [" + idx + "] for [" + value + "], in association with [" + remoteAddress + "]") - // } - } - } - - private def shouldIgnore(key: String) = { // TODO this is hacky, if we'd do this we trigger compression too early (before association exists, so control messages fail) - key match { - case null ⇒ true - case "" ⇒ true // empty class manifest for example - case _ ⇒ key.endsWith("/") - } - } - - // TODO this must be optimised, we really don't want to scan the entire key-set each time to make sure - private def wasCompressedPreviously(key: String): Boolean = { - var i = 0 - val len = activeTable.table.length - while (i < len) { - if (activeTable.table(i) == key) return true - i += 1 - } - false + // TODO optimise order of these, what is more expensive? + // TODO (now the `previous` is, but if aprox datatype there it would be faster)... Needs pondering. + addAndCheckIfheavyHitterDetected(value, count) } /** Mutates heavy hitters */ @@ -186,10 +157,8 @@ private[remote] abstract class InboundCompression[T >: Null]( * INTERNAL / TESTING API * Used for manually triggering when a compression table should be advertised. * Note that most likely you'd want to set the advertisment-interval to `0` when using this. - * - * TODO: Technically this would be solvable by a "triggerable" scheduler. */ - private[remote] def triggerNextTableAdvertisement(): Unit = // TODO expose and use in tests + private[remote] def triggerNextTableAdvertisement(): Unit = // TODO use this in tests for triggering runNextTableAdvertisement() def scheduleNextTableAdvertisement(): Unit = @@ -200,9 +169,9 @@ private[remote] abstract class InboundCompression[T >: Null]( log.debug("Scheduled {} advertisement in [{}] from now...", getClass.getSimpleName, PrettyDuration.format(tableAdvertisementInterval, includeNanos = false, 1)) } catch { case ex: IllegalStateException ⇒ - log.warning("Unable to schedule {} advertisement, " + - "likely system is shutting down. " + - "Reason: {}", getClass.getName, ex.getMessage) + // this is usually harmless + log.debug("Unable to schedule {} advertisement, " + + "likely system is shutting down. Reason: {}", getClass.getName, ex.getMessage) } case _ ⇒ // ignore... } @@ -213,6 +182,9 @@ private[remote] abstract class InboundCompression[T >: Null]( finally scheduleNextTableAdvertisement() } + // FIXME use AtomicBoolean instead? + @volatile private[this] var advertisementInProgress = false + /** * Entry point to advertising a new compression table. * @@ -223,19 +195,20 @@ private[remote] abstract class InboundCompression[T >: Null]( * It must be advertised to the other side so it can start using it in its outgoing compression. * Triggers compression table advertisement. May be triggered by schedule or manually, i.e. for testing. */ - def runNextTableAdvertisement() = { // TODO guard against re-entrancy? - inboundContext.association(originUid) match { - case OptionVal.Some(association) ⇒ - val table = prepareCompressionAdvertisement() - nextTable = table.invert // TODO expensive, check if building the other way wouldn't be faster? - advertiseCompressionTable(association, table) + private[remote] def runNextTableAdvertisement() = + if (!advertisementInProgress) + inboundContext.association(originUid) match { + case OptionVal.Some(association) ⇒ + advertisementInProgress = true + val table = prepareCompressionAdvertisement() + nextTable = table.invert // TODO expensive, check if building the other way wouldn't be faster? + advertiseCompressionTable(association, table) - case OptionVal.None ⇒ - // otherwise it's too early, association not ready yet. - // so we don't build the table since we would not be able to send it anyway. - log.warning("No Association for originUid [{}] yet, unable to advertise compression table.", originUid) - } - } + case OptionVal.None ⇒ + // otherwise it's too early, association not ready yet. + // so we don't build the table since we would not be able to send it anyway. + log.warning("No Association for originUid [{}] yet, unable to advertise compression table.", originUid) + } /** * Must be implementeed by extending classes in order to send a [[akka.remote.artery.ControlMessage]] @@ -244,7 +217,7 @@ private[remote] abstract class InboundCompression[T >: Null]( protected def advertiseCompressionTable(association: OutboundContext, table: CompressionTable[T]): Unit /** Drop `activeTable` and start using the `nextTable` in its place. */ - private def flipTables(): Unit = { + private def startUsingNextTable(): Unit = { log.debug("Swaping active decompression table to version {}.", nextTable.version) activeTable = nextTable nextTable = DecompressionTable.empty @@ -261,11 +234,6 @@ private[remote] abstract class InboundCompression[T >: Null]( } -final class ExistingcompressedIdReuseAttemptException(id: Long, value: Any) - extends RuntimeException( - s"Attempted to re-allocate compressedId [$id] which is still in use for compressing [$value]! " + - s"This should never happen and is likely an implementation bug.") - final class UnknownCompressedIdException(id: Long) extends RuntimeException( s"Attempted de-compress unknown id [$id]! " + diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/OutboundActorRefCompression.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/OutboundActorRefCompression.scala index 03224b491c..c70dd95a54 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/compress/OutboundActorRefCompression.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/OutboundActorRefCompression.scala @@ -8,7 +8,7 @@ import java.util.concurrent.atomic.AtomicReference import java.{ util ⇒ ju } import akka.actor.{ ActorRef, ActorSystem, Address } -import akka.event.Logging +import akka.event.{ Logging, LoggingAdapter } import akka.remote.artery.compress.OutboundCompression.OutboundCompressionState import scala.annotation.tailrec @@ -20,9 +20,14 @@ private[remote] final class OutboundActorRefCompression(system: ActorSystem, rem flipTable(CompressionTable( version = 0, map = Map( - system.deadLetters → 0 - ) - )) + system.deadLetters → 0))) +} + +/** INTERNAL API */ +private[remote] final class OutboundClassManifestCompression(system: ActorSystem, remoteAddress: Address) + extends OutboundCompressionTable[String](system, remoteAddress) { + + flipTable(CompressionTable(version = 0, Map.empty)) } /** @@ -34,9 +39,15 @@ private[remote] class OutboundCompressionTable[T](system: ActorSystem, remoteAdd extends AtomicReference[OutboundCompressionState[T]](OutboundCompressionState.initial) { // TODO could be instead via Unsafe import OutboundCompression._ - // TODO: The compression map may benefit from padding if we want multiple compressions to be running in parallel + // TODO: The compression map may benefit from padding if we want multiple compressions to be running in parallel - private[this] val log = Logging(system, "OutboundCompressionTable") + protected val log: LoggingAdapter = Logging(system, Logging.simpleName(getClass)) + + // TODO this exposes us to a race between setting the Version and USING the table...? + def activeCompressionTableVersion = { + val version = get.version + version + } /** * Flips the currently used compression table to the new one (iff the new one has a version number higher than the currently used one). @@ -44,16 +55,16 @@ private[remote] class OutboundCompressionTable[T](system: ActorSystem, remoteAdd // (╯°□°)╯︵ ┻━┻ @tailrec final def flipTable(activate: CompressionTable[T]): Unit = { val state = get() - if (state.version < activate.version) // TODO or we could demand it to be strictly `currentVersion + 1` + if (activate.version > state.version) // TODO this should handle roll-over as we move to Byte if (compareAndSet(state, prepareState(activate))) - log.debug("Successfully flipped compression table to version {}, for ourgoing connection to {}", activate.version, remoteAddress) + log.debug(s"Successfully flipped compression table versions {}=>{}, for outgoing to [{}]", state.version, activate.version, remoteAddress) else flipTable(activate) // retry else if (state.version == activate.version) log.warning("Received duplicate compression table (version: {})! Ignoring it.", state.version) else log.error("Received unexpected compression table with version nr [{}]! " + - "Current version number is []") + "Current version number is [{}].", activate.version, state.version) } @@ -66,11 +77,8 @@ private[remote] class OutboundCompressionTable[T](system: ActorSystem, remoteAdd // load factor is `1` since we will never grow this table beyond the initial size, // this way we can avoid any rehashing from happening. val m = new ju.HashMap[T, Integer](size, 1.0f) // TODO could be replaced with primitive `int` specialized version - val it = activate.map.keysIterator - var i = 0 - while (it.hasNext) { - m.put(it.next(), i) // TODO boxing :< - i += 1 + activate.map.foreach { + case (key, value) ⇒ m.put(key, value) // TODO boxing :< } OutboundCompressionState(activate.version, m) } @@ -98,7 +106,7 @@ private[remote] object OutboundCompression { // format: ON /** INTERNAL API */ - private[remote] final case class OutboundCompressionState[T](version: Long, table: ju.Map[T, Integer]) + private[remote] final case class OutboundCompressionState[T](version: Int, table: ju.Map[T, Integer]) private[remote] object OutboundCompressionState { def initial[T] = OutboundCompressionState[T](-1, ju.Collections.emptyMap()) } diff --git a/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala index 3aa3c64773..cc013c3646 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala @@ -1,9 +1,13 @@ +/* + * Copyright (C) 2009-2016 Lightbend Inc. + */ + package akka.remote.artery import java.nio.{ ByteBuffer, ByteOrder } import akka.actor._ -import akka.remote.artery.compress.{ CompressionTable, CompressionTestUtils } +import akka.remote.artery.compress.{ CompressionTable, CompressionTestUtils, InboundCompressions, OutboundCompressions } import akka.testkit.AkkaSpec import akka.util.{ ByteString, OptionVal } @@ -28,11 +32,13 @@ class EnvelopeBufferSpec extends AkkaSpec { val idxToManifest = manifestToIdx.map(_.swap) override def applyActorRefCompressionTable(table: CompressionTable[ActorRef]): Unit = ??? // dynamic allocating not needed in these tests + override def actorRefCompressionTableVersion: Int = 0 override def compressActorRef(ref: ActorRef): Int = refToIdx.getOrElse(ref, -1) override def hitActorRef(originUid: Long, tableVersion: Int, remote: Address, ref: ActorRef): Unit = () override def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef] = OptionVal(idxToRef(idx)) override def applyClassManifestCompressionTable(table: CompressionTable[String]): Unit = ??? // dynamic allocating not needed in these tests + override def classManifestCompressionTableVersion: Int = 0 override def compressClassManifest(manifest: String): Int = manifestToIdx.getOrElse(manifest, -1) override def hitClassManifest(originUid: Long, tableVersion: Int, remote: Address, manifest: String): Unit = () override def decompressClassManifest(originUid: Long, tableVersion: Int, idx: Int): OptionVal[String] = OptionVal(idxToManifest(idx)) diff --git a/akka-remote/src/test/scala/akka/remote/artery/FlightRecorderSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/FlightRecorderSpec.scala index e99219bad7..3dd3030fb6 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/FlightRecorderSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/FlightRecorderSpec.scala @@ -1,3 +1,7 @@ +/* + * Copyright (C) 2009-2016 Lightbend Inc. + */ + package akka.remote.artery import java.io.{ File, IOException, RandomAccessFile } diff --git a/akka-remote/src/test/scala/akka/remote/artery/compress/HandshakeShouldDropCompressionTableSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/compress/HandshakeShouldDropCompressionTableSpec.scala index 673f70cbcc..efb1fa4ef9 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/compress/HandshakeShouldDropCompressionTableSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/compress/HandshakeShouldDropCompressionTableSpec.scala @@ -9,6 +9,9 @@ import akka.remote.artery.compress.CompressionProtocol.Events import akka.testkit._ import akka.util.Timeout import akka.pattern.ask +import akka.remote.RARP +import akka.remote.artery.ArteryTransport +import akka.remote.artery.compress.CompressionProtocol.Events.{ Event, ReceivedCompressionTable } import com.typesafe.config.ConfigFactory import org.scalatest.BeforeAndAfter @@ -33,7 +36,8 @@ object HandshakeShouldDropCompressionTableSpec { enabled = on actor-refs { enabled = on - advertisement-interval = 3 seconds + # we'll trigger advertisement manually + advertisement-interval = 10 hours } } } @@ -57,59 +61,79 @@ class HandshakeShouldDropCompressionTableSpec extends AkkaSpec(HandshakeShouldDr } "Outgoing compression table" must { + // FIXME this is failing, we must rethink how tables are identified and updated "be dropped on system restart" in { val messagesToExchange = 10 + val systemATransport = RARP(system).provider.transport.asInstanceOf[ArteryTransport] + def systemBTransport = RARP(systemB).provider.transport.asInstanceOf[ArteryTransport] // listen for compression table events val aProbe = TestProbe() val a1Probe = TestProbe() val b1Probe = TestProbe()(systemB) - system.eventStream.subscribe(aProbe.ref, classOf[CompressionProtocol.Events.Event]) - systemB.eventStream.subscribe(b1Probe.ref, classOf[CompressionProtocol.Events.Event]) + system.eventStream.subscribe(aProbe.ref, classOf[Event]) + systemB.eventStream.subscribe(b1Probe.ref, classOf[Event]) - def voidSel = system.actorSelection(s"artery://systemB@localhost:$portB/user/void") - systemB.actorOf(TestActors.blackholeProps, "void") + def echoSel = system.actorSelection(s"artery://systemB@localhost:$portB/user/echo") + systemB.actorOf(TestActors.echoActorProps, "echo") // cause testActor-1 to become a heavy hitter - (1 to messagesToExchange).foreach { i ⇒ voidSel ! "hello" } // does not reply, but a hot receiver should be advertised - // give it enough time to advertise first table - val a0 = aProbe.expectMsgType[Events.ReceivedCompressionTable[ActorRef]](10.seconds) + (1 to messagesToExchange).foreach { i ⇒ echoSel ! s"hello-$i" } // does not reply, but a hot receiver should be advertised + waitForEcho(this, s"hello-$messagesToExchange") + systemBTransport.triggerCompressionAdvertisements(actorRef = true, manifest = false) + + val a0 = aProbe.expectMsgType[ReceivedCompressionTable[ActorRef]](10.seconds) info("System [A] received: " + a0) - assertCompression[ActorRef](a0.table, 1, _.toString should include(testActor.path.name)) + a0.table.map.keySet should contain(testActor) // cause a1Probe to become a heavy hitter (we want to not have it in the 2nd compression table later) - (1 to messagesToExchange).foreach { i ⇒ voidSel.tell("hello", a1Probe.ref) } // does not reply, but a hot receiver should be advertised - // give it enough time to advertise first table - val a1 = aProbe.expectMsgType[Events.ReceivedCompressionTable[ActorRef]](10.seconds) + (1 to messagesToExchange).foreach { i ⇒ echoSel.tell(s"hello-$i", a1Probe.ref) } + waitForEcho(a1Probe, s"hello-$messagesToExchange") + systemBTransport.triggerCompressionAdvertisements(actorRef = true, manifest = false) + + val a1 = aProbe.expectMsgType[ReceivedCompressionTable[ActorRef]](10.seconds) info("System [A] received: " + a1) - assertCompression[ActorRef](a1.table, 2, _.toString should include(a1Probe.ref.path.name)) + a1.table.map.keySet should contain(a1Probe.ref) log.warning("SHUTTING DOWN system {}...", systemB) shutdown(systemB) systemB = ActorSystem("systemB", configB) - Thread.sleep(5000) + Thread.sleep(1000) log.warning("SYSTEM READY {}...", systemB) val aNewProbe = TestProbe() - system.eventStream.subscribe(aNewProbe.ref, classOf[CompressionProtocol.Events.Event]) + system.eventStream.subscribe(aNewProbe.ref, classOf[Event]) - systemB.actorOf(TestActors.blackholeProps, "void") // start it again - (1 to messagesToExchange).foreach { i ⇒ voidSel ! "hello" } // does not reply, but a hot receiver should be advertised - // compression triggered again - val a2 = aNewProbe.expectMsgType[Events.ReceivedCompressionTable[ActorRef]](10.seconds) + systemB.actorOf(TestActors.echoActorProps, "echo") // start it again + (1 to 5) foreach { _ ⇒ + // since some messages may end up being lost + (1 to messagesToExchange).foreach { i ⇒ echoSel ! s"hello-$i" } // does not reply, but a hot receiver should be advertised + Thread.sleep(100) + } + waitForEcho(this, s"hello-$messagesToExchange", max = 10.seconds) + systemBTransport.triggerCompressionAdvertisements(actorRef = true, manifest = false) + + val a2 = aNewProbe.expectMsgType[ReceivedCompressionTable[ActorRef]](10.seconds) info("System [A] received: " + a2) - assertCompression[ActorRef](a2.table, 1, _.toString should include(testActor.path.name)) + a2.table.map.keySet should contain(testActor) val aNew2Probe = TestProbe() - (1 to messagesToExchange).foreach { i ⇒ voidSel.tell("hello", aNew2Probe.ref) } // does not reply, but a hot receiver should be advertised - // compression triggered again - val a3 = aNewProbe.expectMsgType[Events.ReceivedCompressionTable[ActorRef]](10.seconds) - info("Received second compression: " + a3) - assertCompression[ActorRef](a3.table, 2, _.toString should include(aNew2Probe.ref.path.name)) - } + (1 to messagesToExchange).foreach { i ⇒ echoSel.tell(s"hello-$i", aNew2Probe.ref) } // does not reply, but a hot receiver should be advertised + waitForEcho(aNew2Probe, s"hello-$messagesToExchange") + systemBTransport.triggerCompressionAdvertisements(actorRef = true, manifest = false) + val a3 = aNewProbe.expectMsgType[ReceivedCompressionTable[ActorRef]](10.seconds) + info("Received second compression: " + a3) + a3.table.map.keySet should contain(aNew2Probe.ref) + } } + def waitForEcho(probe: TestKit, m: String, max: Duration = 3.seconds): Any = + probe.fishForMessage(max = max, hint = s"waiting for '$m'") { + case `m` ⇒ true + case x ⇒ false + } + def identify(_system: String, port: Int, name: String) = { val selection = system.actorSelection(s"artery://${_system}@localhost:$port/user/$name")