diff --git a/.gitignore b/.gitignore index 8e716eb2d4..cb0c517fff 100755 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,6 @@ *# +*.log +*.orig *.iml *.ipr *.iws diff --git a/akka-actor/src/main/scala/akka/io/DirectByteBufferPool.scala b/akka-actor/src/main/scala/akka/io/DirectByteBufferPool.scala index fdde031dda..3f68e35ba9 100644 --- a/akka-actor/src/main/scala/akka/io/DirectByteBufferPool.scala +++ b/akka-actor/src/main/scala/akka/io/DirectByteBufferPool.scala @@ -83,10 +83,10 @@ private[akka] class DirectByteBufferPool(defaultBufferSize: Int, maxPoolEntries: */ private final def tryCleanDirectByteBuffer(toBeDestroyed: ByteBuffer): Unit = try { if (toBeDestroyed.isDirect) { - val cleanerMethod = toBeDestroyed.getClass().getMethod("cleaner") + val cleanerMethod = toBeDestroyed.getClass.getMethod("cleaner") cleanerMethod.setAccessible(true) val cleaner = cleanerMethod.invoke(toBeDestroyed) - val cleanMethod = cleaner.getClass().getMethod("clean") + val cleanMethod = cleaner.getClass.getMethod("clean") cleanMethod.setAccessible(true) cleanMethod.invoke(cleaner) } diff --git a/akka-actor/src/main/scala/akka/routing/MurmurHash.scala b/akka-actor/src/main/scala/akka/routing/MurmurHash.scala index fc67613a5d..f4fb81edb5 100644 --- a/akka-actor/src/main/scala/akka/routing/MurmurHash.scala +++ b/akka-actor/src/main/scala/akka/routing/MurmurHash.scala @@ -68,11 +68,11 @@ object MurmurHash { /** * Incorporates a new value into an existing hash. * - * @param hash the prior hash value - * @param value the new value to incorporate - * @param magicA a magic integer from the stream - * @param magicB a magic integer from a different stream - * @return the updated hash value + * @param hash the prior hash value + * @param value the new value to incorporate + * @param magicA a magic integer from the stream + * @param magicB a magic integer from a different stream + * @return the updated hash value */ def extendHash(hash: Int, value: Int, magicA: Int, magicB: Int): Int = (hash ^ rotl(value * magicA, 11) * magicB) * 3 + visibleMixer diff --git a/akka-actor/src/main/scala/akka/util/PrettyByteString.scala b/akka-actor/src/main/scala/akka/util/PrettyByteString.scala new file mode 100644 index 0000000000..85b6a38005 --- /dev/null +++ b/akka-actor/src/main/scala/akka/util/PrettyByteString.scala @@ -0,0 +1,44 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.util + +/** + * INTERNAL API: ByteString pretty printer, based on Johannes Rudolph's implementation from: + * https://github.com/jrudolph/akka/commit/c889dddf37c8635c365a79a391eb18a709f36773#diff-947cbf07996eeb823cb9850cc2e81126R19 + */ +private[akka] object PrettyByteString { + private val indentDepth = 2 + private val indent = " " * (indentDepth + 1) + + implicit class asPretty(bs: ByteString) { + def prettyPrint(maxBytes: Int = 16 * 5): String = formatBytes(bs, maxBytes).mkString("\n") + } + + def formatBytes(bs: ByteString, maxBytes: Int = 16 * 5): Iterator[String] = { + def asHex(b: Byte): String = b formatted "%02X" + def asASCII(b: Byte): Char = + if (b >= 0x20 && b < 0x7f) b.toChar + else '.'
+ + def formatLine(bs: ByteString): String = { + val data = bs.toSeq + val hex = data.map(asHex).mkString(" ") + val ascii = data.map(asASCII).mkString + f"$indent%s $hex%-48s | $ascii" + } + def formatBytes(bs: ByteString): String = + bs.grouped(16).map(formatLine).mkString("\n") + + val prefix = s"${indent}ByteString(${bs.size} bytes)" + + if (bs.size <= maxBytes) Iterator(prefix + "\n", formatBytes(bs)) + else + Iterator( + s"$prefix first + last $maxBytes:\n", + formatBytes(bs.take(maxBytes)), + s"\n$indent ... [${bs.size - maxBytes} bytes omitted] ...\n", + formatBytes(bs.takeRight(maxBytes))) + } + +} diff --git a/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala index b02ca87672..9a1da92bd6 100644 --- a/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala @@ -3,18 +3,21 @@ */ package akka.remote.artery +import java.io.File import java.nio.ByteBuffer import java.nio.ByteOrder +import java.nio.channels.FileChannel import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit + +import akka.remote.artery.compress._ +import akka.stream.impl.ConstantFun +import org.openjdk.jmh.annotations.Scope + import scala.concurrent.Await import scala.concurrent.duration._ import akka.NotUsed -import akka.actor.ActorSystem -import akka.actor.ExtendedActorSystem -import akka.actor.InternalActorRef -import akka.actor.Props -import akka.actor.RootActorPath +import akka.actor._ import akka.remote.AddressUidExtension import akka.remote.EndpointManager.Send import akka.remote.RARP @@ -56,8 +59,9 @@ class CodecBenchmark { 16, create = () ⇒ new ReusableInboundEnvelope, clear = inEnvelope ⇒ inEnvelope.asInstanceOf[ReusableInboundEnvelope].clear() ) - val compression = new Compression(system) - val headerIn = HeaderBuilder(compression) + + val compressionOut = NoOutboundCompression + val headerIn = HeaderBuilder.in(NoopInboundCompression) val envelopeTemplateBuffer = ByteBuffer.allocate(ArteryTransport.MaximumFrameSize).order(ByteOrder.LITTLE_ENDIAN) val uniqueLocalAddress = UniqueAddress( @@ -102,8 +106,8 @@ class CodecBenchmark { headerIn.version = 1 headerIn.uid = 42 headerIn.serializer = 4 - headerIn.senderActorRef = senderStringA - headerIn.recipientActorRef = recipientStringB + headerIn.senderActorRef = actorOnSystemA + headerIn.recipientActorRef = remoteRefB headerIn.manifest = "" envelope.writeHeader(headerIn) envelope.byteBuffer.put(payload) @@ -136,7 +140,7 @@ class CodecBenchmark { val N = 100000 val encoder: Flow[Send, EnvelopeBuffer, NotUsed] = - Flow.fromGraph(new Encoder(uniqueLocalAddress, system, compression, envelopePool)) + Flow.fromGraph(new Encoder(uniqueLocalAddress, system, compressionOut, envelopePool)) Source.fromGraph(new BenchTestSourceSameElement(N, "elem")) .map(_ ⇒ Send(payload, OptionVal.None, remoteRefB, None)) @@ -165,7 +169,7 @@ class CodecBenchmark { val decoder: Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = Flow.fromGraph(new Decoder(inboundContext, system.asInstanceOf[ExtendedActorSystem], - resolveActorRefWithLocalAddress, compression, envelopePool, inboundEnvelopePool)) + resolveActorRefWithLocalAddress, NoopInboundCompression, envelopePool, inboundEnvelopePool)) Source.fromGraph(new BenchTestSourceSameElement(N, "elem")) .map { _ => @@ -193,7 +197,7 @@ class CodecBenchmark { val N = 100000 val encoder: Flow[Send, EnvelopeBuffer, NotUsed] = - Flow.fromGraph(new 
Encoder(uniqueLocalAddress, system, compression, envelopePool)) + Flow.fromGraph(new Encoder(uniqueLocalAddress, system, compressionOut, envelopePool)) val localRecipient = resolvedRef.path.toSerializationFormatWithAddress(uniqueLocalAddress.address) val provider = RARP(system).provider @@ -206,7 +210,7 @@ class CodecBenchmark { val decoder: Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = Flow.fromGraph(new Decoder(inboundContext, system.asInstanceOf[ExtendedActorSystem], - resolveActorRefWithLocalAddress, compression, envelopePool, inboundEnvelopePool)) + resolveActorRefWithLocalAddress, NoopInboundCompression, envelopePool, inboundEnvelopePool)) Source.fromGraph(new BenchTestSourceSameElement(N, "elem")) .map(_ ⇒ Send(payload, OptionVal.None, remoteRefB, None)) diff --git a/akka-bench-jmh/src/main/scala/akka/remote/artery/FlightRecorderBench.scala b/akka-bench-jmh/src/main/scala/akka/remote/artery/FlightRecorderBench.scala index 9b286f34c0..be6cdfcb67 100644 --- a/akka-bench-jmh/src/main/scala/akka/remote/artery/FlightRecorderBench.scala +++ b/akka-bench-jmh/src/main/scala/akka/remote/artery/FlightRecorderBench.scala @@ -6,8 +6,9 @@ package akka.remote.artery import java.io.File import java.nio.channels.FileChannel import java.nio.file.StandardOpenOption -import java.util.concurrent.CountDownLatch +import java.util.concurrent.{ CountDownLatch, TimeUnit } import java.util.concurrent.TimeUnit + import org.openjdk.jmh.annotations.{ OperationsPerInvocation, _ } @State(Scope.Benchmark) diff --git a/akka-bench-jmh/src/main/scala/akka/remote/compress/HeavyHittersBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/remote/compress/HeavyHittersBenchmark.scala new file mode 100644 index 0000000000..4297dbb171 --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/remote/compress/HeavyHittersBenchmark.scala @@ -0,0 +1,110 @@ +/** + * Copyright (C) 2016 Lightbend Inc. 
+ */ +package akka.remote.compress + +import java.util.Random + +import akka.remote.artery.compress.TopHeavyHitters +import org.openjdk.jmh.annotations._ +import org.openjdk.jmh.infra.Blackhole + +/** + * On Macbook pro: + * [info] Benchmark (n) Mode Cnt Score Error Units + * [info] HeavyHittersBenchmark.updateHitter 8192 thrpt 40 357 405.512 ± 3329.008 ops/s + * [info] HeavyHittersBenchmark.updateNotHitter 8192 thrpt 40 259 032 711.743 ± 7199514.142 ops/s + * [info] HeavyHittersBenchmark.updateRandomHitter 8192 thrpt 40 2 105 102.088 ± 18214.624 ops/s + * + * === + * on our benchmarking box: + * ubuntu@ip-172-31-43-199:~/akka-ktoso$ lscpu + * Architecture: x86_64 + * CPU op-mode(s): 32-bit, 64-bit + * Byte Order: Little Endian + * CPU(s): 2 + * Thread(s) per core: 2 + * CPU MHz: 2494.068 + * Hypervisor vendor: Xen + * Virtualization type: full + * L1d cache: 32K + * L1i cache: 32K + * L2 cache: 256K + * L3 cache: 25600K + * + * ubuntu@ip-172-31-43-199:~/akka-ktoso$ cpuid | grep nm + * (simple synth) = Intel Core i9-4000 / Xeon E5-1600/E5-2600 v2 (Ivy Bridge-EP C1/M1/S1), 22nm + * + * [info] Benchmark (n) Mode Cnt Score Error Units + * [info] HeavyHittersBenchmark.updateHitter 8192 thrpt 40 309 512.584 ± 153.248 ops/s + * [info] HeavyHittersBenchmark.updateNotHitter 8192 thrpt 40 248 170 545.577 ± 1244986.765 ops/s + * [info] HeavyHittersBenchmark.updateRandomHitter 8192 thrpt 40 1 207 521.674 ± 912.676 ops/s + */ +@State(Scope.Benchmark) +@BenchmarkMode(Array(Mode.Throughput)) +@Fork(2) +class HeavyHittersBenchmark { + + // @Param(Array("512", "8192")) + @Param(Array("8192")) + var n: Int = 0 + + var topN: TopHeavyHitters[String] = _ + + val rand = new Random(1001021) + + val preallocatedNums: Array[Long] = Array.ofDim(8192) + val preallocatedStrings: Array[String] = Array.ofDim(8192) + + @Setup + def init(): Unit = { + topN = new TopHeavyHitters(n) + var i = 0 + while (i < n) { + topN.update(i.toString, i) + preallocatedNums(i) = rand.nextLong() + preallocatedStrings(i) = i.toString + i += 1 + } + } + + @Benchmark + @OperationsPerInvocation(8192) + def updateNotHitter(blackhole: Blackhole): Unit = { + var i = 0 + while (i < 8192) { + blackhole.consume(topN.update("NOT", 1)) // definitely not a heavy hitter + i += 1 + } + } + + @Benchmark + @OperationsPerInvocation(8192) + def updateExistingHitter(blackhole: Blackhole): Unit = { + var i = 0 + while (i < 8192) { + blackhole.consume(topN.update("HEAVY_HITTER", Long.MaxValue)) // definitely a heavy hitter + i += 1 + } + } + + @Benchmark + def updateNewHitter(blackhole: Blackhole): Unit = { + var i = 0 + while (i < 8192) { + blackhole.consume(topN.update(preallocatedStrings(i), Long.MaxValue)) + i += 1 + } + } + + @Benchmark + @OperationsPerInvocation(8192) + def updateRandomHitter(blackhole: Blackhole): Unit = { + var i = 0 + while (i < 8192) { + blackhole.consume(topN.update(preallocatedStrings(i), preallocatedNums(i))) // maybe a heavy hitter + i += 1 + } + } + +} diff --git a/akka-bench-jmh/src/main/scala/akka/remote/compress/OutboundCompressionTableBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/remote/compress/OutboundCompressionTableBenchmark.scala new file mode 100644 index 0000000000..7f7da57b01 --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/remote/compress/OutboundCompressionTableBenchmark.scala @@ -0,0 +1,60 @@ +/** + * Copyright (C) 2016 Lightbend Inc. 
+ */ +package akka.remote.compress + +import java.util.Random + +import akka.actor.{ ActorSystem, Address } +import akka.event.NoLogging +import akka.remote.artery.compress.{ OutboundCompressionTable, TopHeavyHitters } +import org.openjdk.jmh.annotations._ +import org.openjdk.jmh.infra.Blackhole + +@State(Scope.Benchmark) +@BenchmarkMode(Array(Mode.Throughput)) +@Fork(2) +class OutboundCompressionTableBenchmark { + + @Param(Array("512", "8192")) + var registered: Int = 0 + + implicit val system = ActorSystem("TestSystem") + + var outgoingCompression: OutboundCompressionTable[String] = _ + + val rand = new Random(1001021) + + var preallocatedNums: Array[Long] = _ + var preallocatedStrings: Array[String] = _ + + var i = 0 + + @Setup + def init(): Unit = { + preallocatedNums = Array.ofDim(registered) + preallocatedStrings = Array.ofDim(8192) + + outgoingCompression = new OutboundCompressionTable(system, Address("akka", "remote-system")) + + var i = 0 + while (i < registered) { + outgoingCompression.register(i.toString, i) + preallocatedNums(i) = rand.nextLong() + preallocatedStrings(i) = i.toString + i += 1 + } + } + + // @Benchmark + // @BenchmarkMode(Array(Mode.SingleShotTime)) + // def registerThenCompress(): Int = { + // outgoingCompression.register("new", i) + // outgoingCompression.compress("new") + // } + + @Benchmark + def compressKnown(): Int = + outgoingCompression.compress("1") + +} diff --git a/akka-bench-jmh/src/main/scala/akka/stream/FlowMapBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/FlowMapBenchmark.scala index 44f97b3375..ea00cd9fc9 100644 --- a/akka-bench-jmh/src/main/scala/akka/stream/FlowMapBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/stream/FlowMapBenchmark.scala @@ -18,7 +18,7 @@ import scala.concurrent.Await import scala.concurrent.duration._ @State(Scope.Benchmark) -@OutputTimeUnit(TimeUnit.MILLISECONDS) +@OutputTimeUnit(TimeUnit.SECONDS) @BenchmarkMode(Array(Mode.Throughput)) class FlowMapBenchmark { diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolGateway.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolGateway.scala index a3573c9c1d..21e01a84b3 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolGateway.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolGateway.scala @@ -89,4 +89,4 @@ private[http] object PoolGateway { private[this] val uniqueGatewayId = new AtomicLong(0) def newUniqueGatewayIdentifier = UniqueGateway(uniqueGatewayId.incrementAndGet()) -} \ No newline at end of file +} diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala index 65f966091d..9e081ff638 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala @@ -6,7 +6,6 @@ package akka.remote.artery import java.nio.ByteBuffer import java.util.concurrent.Executors import java.util.concurrent.TimeUnit.NANOSECONDS - import scala.concurrent.duration._ import akka.actor._ diff --git a/akka-remote/src/main/java/akka/remote/artery/compress/CountMinSketch.java b/akka-remote/src/main/java/akka/remote/artery/compress/CountMinSketch.java new file mode 100644 index 0000000000..577b8718c2 --- /dev/null +++ b/akka-remote/src/main/java/akka/remote/artery/compress/CountMinSketch.java @@ -0,0 +1,342 @@ +/* + * Copyright (C) 2009-2016 Lightbend Inc. 
+ */ + +package akka.remote.artery.compress; + +import akka.actor.Actor; +import akka.actor.ActorRef; + +import java.io.UnsupportedEncodingException; +import java.util.Random; + +/** + * INTERNAL API: Count-Min Sketch data structure. + * + * An Improved Data Stream Summary: The Count-Min Sketch and its Applications + * https://web.archive.org/web/20060907232042/http://www.eecs.harvard.edu/~michaelm/CS222/countmin.pdf + * + * This implementation is mostly taken and adjusted from the Apache V2 licensed project `stream-lib`, located here: + * https://github.com/clearspring/stream-lib/blob/master/src/main/java/com/clearspring/analytics/stream/frequency/CountMinSketch.java + */ +public class CountMinSketch { + + public static final long PRIME_MODULUS = (1L << 31) - 1; + + private int depth; + private int width; + private long[][] table; + private long[] hashA; + private long size; + private double eps; + private double confidence; + + public CountMinSketch(int depth, int width, int seed) { + this.depth = depth; + this.width = width; + this.eps = 2.0 / width; + this.confidence = 1 - 1 / Math.pow(2, depth); + initTablesWith(depth, width, seed); + } + + public CountMinSketch(double epsOfTotalCount, double confidence, int seed) { + // 2/w = eps ; w = 2/eps + // 1/2^depth <= 1-confidence ; depth >= -log2 (1-confidence) + this.eps = epsOfTotalCount; + this.confidence = confidence; + this.width = (int) Math.ceil(2 / epsOfTotalCount); + this.depth = (int) Math.ceil(-Math.log(1 - confidence) / Math.log(2)); + initTablesWith(depth, width, seed); + } + + CountMinSketch(int depth, int width, int size, long[] hashA, long[][] table) { + this.depth = depth; + this.width = width; + this.eps = 2.0 / width; + this.confidence = 1 - 1 / Math.pow(2, depth); + this.hashA = hashA; + this.table = table; + this.size = size; + } + + private void initTablesWith(int depth, int width, int seed) { + this.table = new long[depth][width]; + this.hashA = new long[depth]; + Random r = new Random(seed); + // We're using linear hash functions + // of the form (a*x+b) mod p. + // a,b are chosen independently for each hash function. + // However we can set b = 0 as all it does is shift the results + // without compromising their uniformity or independence with + // the other hashes. + for (int i = 0; i < depth; ++i) { + hashA[i] = r.nextInt(Integer.MAX_VALUE); + } + } + + /** Referred to as {@code epsilon} in the whitepaper */ + public double getRelativeError() { + return eps; + } + + public double getConfidence() { + return confidence; + } + + private int hash(long item, int i) { + long hash = hashA[i] * item; + // A super fast way of computing x mod 2^p-1 + // See http://www.cs.princeton.edu/courses/archive/fall09/cos521/Handouts/universalclasses.pdf + // page 149, right after Proposition 7. + hash += hash >> 32; + hash &= PRIME_MODULUS; + // Doing "%" after (int) conversion is ~2x faster than %'ing longs. + return ((int) hash) % width; + } + + public void add(long item, long count) { + if (count < 0) { + // Actually for negative increments we'll need to use the median + // instead of minimum, and accuracy will suffer somewhat. + // Probably makes sense to add an "allow negative increments" + // parameter to constructor.
+ throw new IllegalArgumentException("Negative increments not implemented"); + } + for (int i = 0; i < depth; ++i) { + table[i][hash(item, i)] += count; + } + size += count; + } + + public void add(String item, long count) { + if (count < 0) { + // Actually for negative increments we'll need to use the median + // instead of minimum, and accuracy will suffer somewhat. + // Probably makes sense to add an "allow negative increments" + // parameter to constructor. + throw new IllegalArgumentException("Negative increments not implemented"); + } + // TODO we could reuse the arrays + final int[] buckets = MurmurHash.hashBuckets(item, depth, width); // TODO replace with Scala's Murmur3, it's much faster + for (int i = 0; i < depth; ++i) { + table[i][buckets[i]] += count; + } + size += count; + } + + /** + * Similar to {@code add}, however we reuse the fact that the hash buckets have to be calculated for {@code add} + * already, and a separate {@code estimateCount} operation would have to calculate them again, so we do it all in one go. + */ + public long addAndEstimateCount(String item, long count) { + if (count < 0) { + // Actually for negative increments we'll need to use the median + // instead of minimum, and accuracy will suffer somewhat. + // Probably makes sense to add an "allow negative increments" + // parameter to constructor. + throw new IllegalArgumentException("Negative increments not implemented"); + } + final int[] buckets = MurmurHash.hashBuckets(item, depth, width); + for (int i = 0; i < depth; ++i) { + table[i][buckets[i]] += count; + } + size += count; + return estimateCount(buckets); + } + + public long size() { + return size; + } + + /** + * The estimate is correct within {@code 'epsilon' * (total item count)}, + * with probability {@code confidence}. + */ + public long estimateCount(long item) { + long res = Long.MAX_VALUE; + for (int i = 0; i < depth; ++i) { + res = Math.min(res, table[i][hash(item, i)]); + } + return res; + } + + /** + * The estimate is correct within {@code 'epsilon' * (total item count)}, + * with probability {@code confidence}. + */ + public long estimateCount(String item) { + int[] buckets = MurmurHash.hashBuckets(item, depth, width); + return estimateCount(buckets); + } + + /** + * The estimate is correct within {@code 'epsilon' * (total item count)}, + * with probability {@code confidence}. + * + * @param buckets the "indexes" of buckets from which we want to calculate the count + */ + private long estimateCount(int[] buckets) { + long res = Long.MAX_VALUE; + for (int i = 0; i < depth; ++i) { + res = Math.min(res, table[i][buckets[i]]); + } + return res; + } + + /** + * This is copied from https://github.com/addthis/stream-lib/blob/master/src/main/java/com/clearspring/analytics/hash/MurmurHash.java + * Which is Apache V2 licensed. + *
+ * This is a very fast, non-cryptographic hash suitable for general hash-based + * lookup. See http://murmurhash.googlepages.com/ for more details. + * + * The C version of MurmurHash 2.0 found at that site was ported to Java by + * Andrzej Bialecki (ab at getopt org).
+ */ + // TODO replace with Scala's Murmur3, it's much faster + private static class MurmurHash { + + public static int hash(Object o) { + if (o == null) { + return 0; + } + if (o instanceof String) { + return hash(((String) o).getBytes()); + } + // TODO consider calling hashCode on ActorRef here directly? It is just a random number though so possibly not as evenly distributed...? + if (o instanceof Long) { + return hashLong((Long) o); + } + if (o instanceof Integer) { + return hashLong((Integer) o); + } + if (o instanceof Double) { + return hashLong(Double.doubleToRawLongBits((Double) o)); + } + if (o instanceof Float) { + return hashLong(Float.floatToRawIntBits((Float) o)); + } + if (o instanceof byte[]) { + return hash((byte[]) o); + } + return hash(o.toString()); + } + + public static int hash(byte[] data) { + return hash(data, data.length, -1); + } + + public static int hash(byte[] data, int seed) { + return hash(data, data.length, seed); + } + + public static int hash(byte[] data, int length, int seed) { + int m = 0x5bd1e995; + int r = 24; + + int h = seed ^ length; + + int len_4 = length >> 2; + + for (int i = 0; i < len_4; i++) { + int i_4 = i << 2; + int k = data[i_4 + 3]; + k = k << 8; + k = k | (data[i_4 + 2] & 0xff); + k = k << 8; + k = k | (data[i_4 + 1] & 0xff); + k = k << 8; + k = k | (data[i_4 + 0] & 0xff); + k *= m; + k ^= k >>> r; + k *= m; + h *= m; + h ^= k; + } + + // avoid calculating modulo + int len_m = len_4 << 2; + int left = length - len_m; + + if (left != 0) { + if (left >= 3) { + h ^= (int) data[length - 3] << 16; + } + if (left >= 2) { + h ^= (int) data[length - 2] << 8; + } + if (left >= 1) { + h ^= (int) data[length - 1]; + } + + h *= m; + } + + h ^= h >>> 13; + h *= m; + h ^= h >>> 15; + + return h; + } + + public static int hashLong(long data) { + int m = 0x5bd1e995; + int r = 24; + + int h = 0; + + int k = (int) data * m; + k ^= k >>> r; + h ^= k * m; + + k = (int) (data >> 32) * m; + k ^= k >>> r; + h *= m; + h ^= k * m; + + h ^= h >>> 13; + h *= m; + h ^= h >>> 15; + + return h; + } + + // Murmur is faster than an SHA-based approach and provides as-good collision + // resistance. The combinatorial generation approach described in + // http://www.eecs.harvard.edu/~kirsch/pubs/bbbf/esa06.pdf + // does prove to work in actual tests, and is obviously faster + // than performing further iterations of murmur. 
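
The `hashBuckets` method that follows implements the Kirsch–Mitzenmacher combination just described: two base hashes stand in for `hashCount` independent ones. A minimal, self-contained Scala sketch of the same scheme (illustrative only, not part of this patch):

```scala
// Kirsch–Mitzenmacher double hashing: derive k bucket indexes in [0, max)
// from just two base hashes, instead of computing k independent hashes.
def buckets(hash1: Int, hash2: Int, k: Int, max: Int): Array[Int] =
  Array.tabulate(k)(i => math.abs((hash1 + i * hash2) % max))
```
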
+ public static int[] hashBuckets(String key, int hashCount, int max) { + byte[] b; + try { + b = key.getBytes("UTF-16");// TODO Use the Unsafe trick @patriknw used to access the backing array directly -- via Endre + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + return hashBuckets(b, hashCount, max); + } + + static int[] hashBuckets(byte[] b, int hashCount, int max) { + // TODO we could reuse the arrays + int[] result = new int[hashCount]; + int hash1 = hash(b, b.length, 0); + int hash2 = hash(b, b.length, hash1); + for (int i = 0; i < hashCount; i++) { + result[i] = Math.abs((hash1 + i * hash2) % max); + } + return result; + } + } + + @Override + public String toString() { + return "CountMinSketch{" + + "confidence=" + confidence + + ", size=" + size + + ", depth=" + depth + + ", width=" + width + + '}'; + } +} diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 7ed83e7c06..d3425fbd81 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -110,8 +110,8 @@ akka { # For enabling testing features, such as blackhole in akka-remote-testkit. test-mode = off - # Settings for the materializer that is used for the remote streams. - materializer = ${akka.stream.materializer} + # Settings for the materializer that is used for the remote streams. + materializer = ${akka.stream.materializer} materializer { dispatcher = "akka.remote.default-remote-dispatcher" } @@ -134,7 +134,38 @@ akka { # Level 1 strongly prefer low CPU consumption over low latency. # Level 10 strongly prefer low latency over low CPU consumption. idle-cpu-level = 5 + + # compression of common strings in remoting messages, like actor destinations, serializers etc + compression { + # global flag to disable all compression + enabled = off + + # unlocks additional very verbose debug logging of compression events (on DEBUG log level) + debug = off + + actor-refs { + enabled = off # TODO possibly remove on/off option once we have battle proven it? + + # Max number of compressed actor-refs + # Note that compression tables are "rolling" (i.e. a new table replaces the old + # compression table once in a while), and this setting is only about the total number + # of compressions within a single such table. + # Must be a positive natural number. + max = 256 + } + manifests { + enabled = off # TODO possibly remove on/off option once we have battle proven it? + + # Max number of compressed manifests + # Note that compression tables are "rolling" (i.e. a new table replaces the old + # compression table once in a while), and this setting is only about the total number + # of compressions within a single such table. + # Must be a positive natural number. 
+ max = 256 + } + } } + } ### General settings diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 70bf13e572..9561ac1d55 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -217,7 +217,7 @@ private[akka] class RemoteActorRefProvider( } protected def createRemoteDeploymentWatcher(system: ActorSystemImpl): ActorRef = - system.systemActorOf(remoteSettings.configureDispatcher(Props[RemoteDeploymentWatcher]), "remote-deployment-watcher") + system.systemActorOf(remoteSettings.configureDispatcher(Props[RemoteDeploymentWatcher]()), "remote-deployment-watcher") def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath, systemService: Boolean, deploy: Option[Deploy], lookupDeploy: Boolean, async: Boolean): InternalActorRef = @@ -361,10 +361,10 @@ private[akka] class RemoteActorRefProvider( private[akka] def resolveActorRefWithLocalAddress(path: String, localAddress: Address): InternalActorRef = { path match { case ActorPathExtractor(address, elems) ⇒ - if (hasAddress(address)) local.resolveActorRef(rootGuardian, elems) + if (hasAddress(address)) + local.resolveActorRef(rootGuardian, elems) else try { - new RemoteActorRef(transport, localAddress, RootActorPath(address) / elems, - Nobody, props = None, deploy = None) + new RemoteActorRef(transport, localAddress, RootActorPath(address) / elems, Nobody, props = None, deploy = None) } catch { case NonFatal(e) ⇒ log.warning("Error while resolving ActorRef [{}] due to [{}]", path, e.getMessage) @@ -479,7 +479,6 @@ private[akka] class RemoteActorRef private[akka] ( s"Wrong protocol of [${path}], expected [${t.localAddress.address.protocol}]") case _ ⇒ } - @volatile private[remote] var cachedAssociation: artery.Association = null // used by artery to direct messages to a separate stream for large messages diff --git a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala index 9493dcb7df..77fd6b18cb 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala @@ -3,6 +3,7 @@ */ package akka.remote +import akka.remote.artery.compress.CompressionSettings import com.typesafe.config.Config import scala.concurrent.duration._ import akka.util.Timeout @@ -33,6 +34,8 @@ final class RemoteSettings(val config: Config) { val IdleCpuLevel: Int = getInt("akka.remote.artery.advanced.idle-cpu-level").requiring(level ⇒ 1 <= level && level <= 10, "idle-cpu-level must be between 1 and 10") + val ArteryCompressionSettings = CompressionSettings(getConfig("akka.remote.artery.advanced.compression")) + val LogReceive: Boolean = getBoolean("akka.remote.log-received-messages") val LogSend: Boolean = getBoolean("akka.remote.log-sent-messages") diff --git a/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala b/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala index 93f1dbbe23..629ddf5b66 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala @@ -43,7 +43,7 @@ object AeronSource { class MessageHandler(pool: EnvelopeBufferPool) { def reset(): Unit = messageReceived = null - var messageReceived: EnvelopeBuffer = null + private[remote] var messageReceived: EnvelopeBuffer = null // 
private to avoid scalac warning about exposing EnvelopeBuffer val fragmentsHandler = new Fragments(data ⇒ messageReceived = data, pool) } diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index c166e170dd..3cb59d8e97 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -3,12 +3,10 @@ */ package akka.remote.artery -import java.io.File -import java.net.InetSocketAddress -import java.nio.channels.DatagramChannel import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicLong +import akka.remote.artery.compress.CompressionProtocol.CompressionMessage + import scala.collection.JavaConverters._ import scala.concurrent.Future import scala.concurrent.Promise @@ -18,11 +16,8 @@ import scala.util.Success import scala.util.Try import akka.Done import akka.NotUsed -import akka.actor.ActorRef -import akka.actor.Address +import akka.actor._ import akka.actor.Cancellable -import akka.actor.ExtendedActorSystem -import akka.actor.InternalActorRef import akka.event.Logging import akka.event.LoggingAdapter import akka.remote.AddressUidExtension @@ -40,9 +35,9 @@ import akka.remote.artery.InboundControlJunction.ControlMessageSubject import akka.remote.artery.OutboundControlJunction.OutboundControlIngress import akka.remote.transport.AkkaPduCodec import akka.remote.transport.AkkaPduProtobufCodec +import akka.remote.artery.compress.{ AdvertiseCompressionId, InboundCompressionImpl, CompressionProtocol } import akka.stream.AbruptTerminationException import akka.stream.ActorMaterializer -import akka.stream.ActorMaterializerSettings import akka.stream.KillSwitches import akka.stream.Materializer import akka.stream.SharedKillSwitch @@ -55,7 +50,6 @@ import akka.util.Helpers.Requiring import akka.util.WildcardTree import io.aeron.Aeron import io.aeron.AvailableImageHandler -import io.aeron.CncFileDescriptor import io.aeron.Image import io.aeron.UnavailableImageHandler import io.aeron.driver.MediaDriver @@ -68,7 +62,6 @@ import java.nio.channels.{ DatagramChannel, FileChannel } import akka.remote.artery.OutboundControlJunction.OutboundControlIngress import io.aeron.CncFileDescriptor import java.util.concurrent.atomic.AtomicLong -import akka.actor.Cancellable import scala.collection.JavaConverters._ import akka.stream.ActorMaterializerSettings import scala.annotation.tailrec @@ -206,8 +199,11 @@ private[akka] trait InboundContext { */ private[akka] object AssociationState { def apply(): AssociationState = - new AssociationState(incarnation = 1, uniqueRemoteAddressPromise = Promise(), - quarantined = ImmutableLongMap.empty[QuarantinedTimestamp]) + new AssociationState( + incarnation = 1, + uniqueRemoteAddressPromise = Promise(), + quarantined = ImmutableLongMap.empty[QuarantinedTimestamp], + outboundCompression = NoOutboundCompression) final case class QuarantinedTimestamp(nanoTime: Long) { override def toString: String = @@ -221,7 +217,8 @@ private[akka] object AssociationState { private[akka] final class AssociationState( val incarnation: Int, val uniqueRemoteAddressPromise: Promise[UniqueAddress], - val quarantined: ImmutableLongMap[AssociationState.QuarantinedTimestamp]) { + val quarantined: ImmutableLongMap[AssociationState.QuarantinedTimestamp], + val outboundCompression: OutboundCompression) { import AssociationState.QuarantinedTimestamp @@ 
-247,14 +244,17 @@ private[akka] final class AssociationState( } } - def newIncarnation(remoteAddressPromise: Promise[UniqueAddress]): AssociationState = - new AssociationState(incarnation + 1, remoteAddressPromise, quarantined) + def newIncarnation(remoteAddressPromise: Promise[UniqueAddress], compression: OutboundCompression): AssociationState = + new AssociationState(incarnation + 1, remoteAddressPromise, quarantined, compression) def newQuarantined(): AssociationState = uniqueRemoteAddressPromise.future.value match { case Some(Success(a)) ⇒ - new AssociationState(incarnation, uniqueRemoteAddressPromise, - quarantined = quarantined.updated(a.uid, QuarantinedTimestamp(System.nanoTime()))) + new AssociationState( + incarnation, + uniqueRemoteAddressPromise, + quarantined = quarantined.updated(a.uid, QuarantinedTimestamp(System.nanoTime())), + outboundCompression = NoOutboundCompression) // after quarantine no compression needed anymore, drop it case _ ⇒ this } @@ -384,10 +384,6 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R // !!! WARNING !!! This is *NOT* thread safe, private val topLevelFREvents = flightRecorder.createEventSink() - // FIXME: Compression table must be owned by each channel instead - // of having a global one - val compression = new Compression(system) - private val associationRegistry = new AssociationRegistry( remoteAddress ⇒ new Association(this, materializer, remoteAddress, controlSubject, largeMessageDestinations)) @@ -538,19 +534,22 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R } private def runInboundStreams(): Unit = { - runInboundControlStream() - runInboundOrdinaryMessagesStream() + val noCompression = new NoInboundCompression(system) // TODO possibly enable on other channels too https://github.com/akka/akka/pull/20546/files#r68074082 + val compression = createInboundCompressionTable(this) + + runInboundControlStream(noCompression) + runInboundOrdinaryMessagesStream(compression) if (largeMessageDestinationsEnabled) { runInboundLargeMessagesStream() } } - private def runInboundControlStream(): Unit = { + private def runInboundControlStream(compression: InboundCompression): Unit = { val (ctrl, completed) = if (remoteSettings.TestMode) { val (mgmt, (ctrl, completed)) = aeronSource(controlStreamId, envelopePool) - .via(inboundFlow) + .via(inboundFlow(compression)) .viaMat(inboundTestFlow)(Keep.right) .toMat(inboundControlSink)(Keep.both) .run()(materializer) @@ -558,26 +557,13 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R (ctrl, completed) } else { aeronSource(controlStreamId, envelopePool) - .via(inboundFlow) + .via(inboundFlow(compression)) .toMat(inboundControlSink)(Keep.right) .run()(materializer) } controlSubject = ctrl - controlSubject.attach(new ControlMessageObserver { - override def notify(inboundEnvelope: InboundEnvelope): Unit = { - inboundEnvelope.message match { - case Quarantined(from, to) if to == localAddress ⇒ - val lifecycleEvent = ThisActorSystemQuarantinedEvent(localAddress.address, from.address) - publishLifecycleEvent(lifecycleEvent) - // quarantine the other system from here - association(from.address).quarantine(lifecycleEvent.toString, Some(from.uid)) - case _ ⇒ // not interesting - } - } - }) - // ordinary messages stream controlSubject.attach(new ControlMessageObserver { override def notify(inboundEnvelope: InboundEnvelope): Unit = { @@ -592,14 +578,34 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, 
_provider: R } }) - attachStreamRestart("Inbound control stream", completed, () ⇒ runInboundControlStream()) + // compression messages + controlSubject.attach(new ControlMessageObserver { + override def notify(inboundEnvelope: InboundEnvelope): Unit = + inboundEnvelope.message match { + case m: CompressionMessage ⇒ + m match { + case CompressionProtocol.ActorRefCompressionAdvertisement(from, ref, id) ⇒ + log.debug("Incoming ActorRef compression advertisement from [{}], allocating: [{} => {}]", from, ref, id) + association(from.address).compression.allocateActorRefCompressionId(ref, id) + system.eventStream.publish(CompressionProtocol.Events.ReceivedCompressionAdvertisement(from, ref, id)) + + case CompressionProtocol.ClassManifestCompressionAdvertisement(from, manifest, id) ⇒ + log.debug("Incoming Class Manifest compression advertisement from [{}], allocating: [{} => {}]", from, manifest, id) + association(from.address).compression.allocateClassManifestCompressionId(manifest, id) + system.eventStream.publish(CompressionProtocol.Events.ReceivedCompressionAdvertisement(from, manifest, id)) + } + case _ ⇒ // not interested in non CompressionMessages + } + }) + + attachStreamRestart("Inbound control stream", completed, () ⇒ runInboundControlStream(compression)) } - private def runInboundOrdinaryMessagesStream(): Unit = { + private def runInboundOrdinaryMessagesStream(compression: InboundCompression): Unit = { val completed = if (remoteSettings.TestMode) { val (mgmt, c) = aeronSource(ordinaryStreamId, envelopePool) - .via(inboundFlow) + .via(inboundFlow(compression)) .viaMat(inboundTestFlow)(Keep.right) .toMat(inboundSink)(Keep.both) .run()(materializer) @@ -607,19 +613,21 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R c } else { aeronSource(ordinaryStreamId, envelopePool) - .via(inboundFlow) + .via(inboundFlow(compression)) .toMat(inboundSink)(Keep.right) .run()(materializer) } - attachStreamRestart("Inbound message stream", completed, () ⇒ runInboundOrdinaryMessagesStream()) + attachStreamRestart("Inbound message stream", completed, () ⇒ runInboundOrdinaryMessagesStream(compression)) } private def runInboundLargeMessagesStream(): Unit = { + val compression = new NoInboundCompression(system) // no compression on large message stream for now + val completed = if (remoteSettings.TestMode) { val (mgmt, c) = aeronSource(largeStreamId, largeEnvelopePool) - .via(inboundLargeFlow) + .via(inboundLargeFlow(compression)) .viaMat(inboundTestFlow)(Keep.right) .toMat(inboundSink)(Keep.both) .run()(materializer) @@ -627,7 +635,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R c } else { aeronSource(largeStreamId, largeEnvelopePool) - .via(inboundLargeFlow) + .via(inboundLargeFlow(compression)) .toMat(inboundSink)(Keep.right) .run()(materializer) } @@ -678,7 +686,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R Future.successful(Done) } - private[remote] def isShutdown(): Boolean = _shutdown + private[remote] def isShutdown: Boolean = _shutdown override def managementCommand(cmd: Any): Future[Boolean] = { if (testStages.isEmpty) @@ -709,8 +717,10 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R a.send(message, sender, recipient) } - override def association(remoteAddress: Address): Association = + override def association(remoteAddress: Address): Association = { + require(remoteAddress != localAddress.address, "Attempted association with self address!")
associationRegistry.association(remoteAddress) + } override def association(uid: Long): OptionVal[Association] = associationRegistry.association(uid) @@ -728,39 +738,46 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R association(remoteAddress).quarantine(reason = "", uid.map(_.toLong)) } - def outbound(outboundContext: OutboundContext): Sink[Send, Future[Done]] = { + def outbound(outboundContext: OutboundContext, compression: OutboundCompression): Sink[Send, Future[Done]] = { Flow.fromGraph(killSwitch.flow[Send]) - .via(new OutboundHandshake(outboundContext, handshakeTimeout, handshakeRetryInterval, injectHandshakeInterval)) - .via(encoder) + .via(new OutboundHandshake(system, outboundContext, handshakeTimeout, handshakeRetryInterval, injectHandshakeInterval)) + .via(encoder(compression)) .toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), ordinaryStreamId, aeron, taskRunner, envelopePool, giveUpSendAfter, flightRecorder.createEventSink()))(Keep.right) } - def outboundLarge(outboundContext: OutboundContext): Sink[Send, Future[Done]] = { + def outboundLarge(outboundContext: OutboundContext, compression: OutboundCompression): Sink[Send, Future[Done]] = { Flow.fromGraph(killSwitch.flow[Send]) - .via(new OutboundHandshake(outboundContext, handshakeTimeout, handshakeRetryInterval, injectHandshakeInterval)) - .via(createEncoder(largeEnvelopePool)) + .via(new OutboundHandshake(system, outboundContext, handshakeTimeout, handshakeRetryInterval, injectHandshakeInterval)) + .via(createEncoder(largeEnvelopePool, compression)) .toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), largeStreamId, aeron, taskRunner, envelopePool, giveUpSendAfter, flightRecorder.createEventSink()))(Keep.right) } - def outboundControl(outboundContext: OutboundContext): Sink[Send, (OutboundControlIngress, Future[Done])] = { + def outboundControl(outboundContext: OutboundContext, compression: OutboundCompression): Sink[Send, (OutboundControlIngress, Future[Done])] = { Flow.fromGraph(killSwitch.flow[Send]) - .via(new OutboundHandshake(outboundContext, handshakeTimeout, handshakeRetryInterval, injectHandshakeInterval)) + .via(new OutboundHandshake(system, outboundContext, handshakeTimeout, handshakeRetryInterval, injectHandshakeInterval)) .via(new SystemMessageDelivery(outboundContext, system.deadLetters, systemMessageResendInterval, remoteSettings.SysMsgBufferSize)) .viaMat(new OutboundControlJunction(outboundContext))(Keep.right) - .via(encoder) + .via(encoder(compression)) .toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), controlStreamId, aeron, taskRunner, envelopePool, Duration.Inf, flightRecorder.createEventSink()))(Keep.both) // FIXME we can also add scrubbing stage that would collapse sys msg acks/nacks and remove duplicate Quarantine messages } - def createEncoder(bufferPool: EnvelopeBufferPool): Flow[Send, EnvelopeBuffer, NotUsed] = + def createEncoder(compression: OutboundCompression, bufferPool: EnvelopeBufferPool): Flow[Send, EnvelopeBuffer, NotUsed] = Flow.fromGraph(new Encoder(localAddress, system, compression, bufferPool)) - def encoder: Flow[Send, EnvelopeBuffer, NotUsed] = createEncoder(envelopePool) + private def createInboundCompressionTable(inboundContext: InboundContext): InboundCompression = + if (remoteSettings.ArteryCompressionSettings.enabled) new InboundCompressionImpl(system, inboundContext) + else new NoInboundCompression(system) + + def createEncoder(pool: EnvelopeBufferPool, compression: OutboundCompression): 
Flow[Send, EnvelopeBuffer, NotUsed] = + Flow.fromGraph(new Encoder(localAddress, system, compression, pool)) + + def encoder(compression: OutboundCompression): Flow[Send, EnvelopeBuffer, NotUsed] = createEncoder(envelopePool, compression) def aeronSource(streamId: Int, pool: EnvelopeBufferPool): Source[EnvelopeBuffer, NotUsed] = Source.fromGraph(new AeronSource(inboundChannel, streamId, aeron, taskRunner, pool, @@ -771,14 +788,15 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R inboundEnvelopePool.release(m) } - def createDecoder(bufferPool: EnvelopeBufferPool): Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = { + def createDecoder(compression: InboundCompression, bufferPool: EnvelopeBufferPool): Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = { val resolveActorRefWithLocalAddress: String ⇒ InternalActorRef = recipient ⇒ provider.resolveActorRefWithLocalAddress(recipient, localAddress.address) Flow.fromGraph(new Decoder(this, system, resolveActorRefWithLocalAddress, compression, bufferPool, inboundEnvelopePool)) } - def decoder: Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = createDecoder(envelopePool) + def decoder(compression: InboundCompression): Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = + createDecoder(compression, envelopePool) def inboundSink: Sink[InboundEnvelope, Future[Done]] = Flow[InboundEnvelope] @@ -786,16 +804,16 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R .via(new InboundQuarantineCheck(this)) .toMat(messageDispatcherSink)(Keep.right) - def inboundFlow: Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = { + def inboundFlow(compression: InboundCompression): Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = { Flow[EnvelopeBuffer] .via(killSwitch.flow) - .via(decoder) + .via(decoder(compression)) } - def inboundLargeFlow: Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = { + def inboundLargeFlow(compression: InboundCompression): Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = { Flow[EnvelopeBuffer] .via(killSwitch.flow) - .via(createDecoder(largeEnvelopePool)) + .via(createDecoder(compression, largeEnvelopePool)) } def inboundControlSink: Sink[InboundEnvelope, (ControlMessageSubject, Future[Done])] = { diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index c6d27f4c85..4a73807ba8 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -9,6 +9,8 @@ import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicReference +import akka.remote.artery.compress.{ OutboundCompressionImpl, CompressionProtocol } + import scala.annotation.tailrec import scala.concurrent.Future import scala.concurrent.Promise @@ -79,6 +81,9 @@ private[remote] class Association( // the `SendQueue` after materialization. Using same underlying queue. This makes it possible to // start sending (enqueuing) to the Association immediate after construction. + /** Accesses the currently active outbound compression. 
*/ + def compression: OutboundCompression = associationState.outboundCompression + def createQueue(capacity: Int): Queue[Send] = new ManyToOneConcurrentArrayQueue[Send](capacity) @@ -135,13 +140,13 @@ private[remote] class Association( def completeHandshake(peer: UniqueAddress): Unit = { require( remoteAddress == peer.address, - s"wrong remote address in completeHandshake, got ${peer.address}, expected ${remoteAddress}") + s"wrong remote address in completeHandshake, got ${peer.address}, expected $remoteAddress") val current = associationState current.uniqueRemoteAddressPromise.trySuccess(peer) current.uniqueRemoteAddressValue() match { case Some(`peer`) ⇒ // our value case _ ⇒ - val newState = current.newIncarnation(Promise.successful(peer)) + val newState = current.newIncarnation(Promise.successful(peer), createOutboundCompressionTable(remoteAddress)) if (swapState(current, newState)) { current.uniqueRemoteAddressValue() match { case Some(old) ⇒ @@ -215,7 +220,10 @@ private[remote] class Association( // FIXME we should be able to Send without a recipient ActorRef override val dummyRecipient: RemoteActorRef = - transport.provider.resolveActorRef(RootActorPath(remoteAddress) / "system" / "dummy").asInstanceOf[RemoteActorRef] + try transport.provider.resolveActorRef(RootActorPath(remoteAddress) / "system" / "dummy").asInstanceOf[RemoteActorRef] + catch { + case ex: Exception ⇒ throw new Exception("Bad dummy recipient! RemoteAddress: " + remoteAddress, ex) + } // OutboundContext override def quarantine(reason: String): Unit = { @@ -270,18 +278,24 @@ private[remote] class Association( def associate(): Unit = { if (!controlQueue.isInstanceOf[QueueWrapper]) throw new IllegalStateException("associate() must only be called once") + runOutboundStreams() + } + + private def runOutboundStreams(): Unit = { + // TODO no compression for control / large streams currently + val disableCompression = NoOutboundCompression // it's important to materialize the outboundControl stream first, // so that outboundControlIngress is ready when stages for all streams start - runOutboundControlStream() - runOutboundOrdinaryMessagesStream() + runOutboundControlStream(disableCompression) + runOutboundOrdinaryMessagesStream(CurrentAssociationStateOutboundCompressionProxy) if (largeMessageChannelEnabled) { - runOutboundLargeMessagesStream() + runOutboundLargeMessagesStream(disableCompression) } } - private def runOutboundControlStream(): Unit = { + private def runOutboundControlStream(compression: OutboundCompression): Unit = { // stage in the control stream may access the outboundControlIngress before returned here // using CountDownLatch to make sure that materialization is completed before accessing outboundControlIngress materializing = new CountDownLatch(1) @@ -294,13 +308,13 @@ private[remote] class Association( val ((queueValue, mgmt), (control, completed)) = Source.fromGraph(new SendQueue[Send]) .viaMat(transport.outboundTestFlow(this))(Keep.both) - .toMat(transport.outboundControl(this))(Keep.both) + .toMat(transport.outboundControl(this, compression))(Keep.both) .run()(materializer) _testStages.add(mgmt) (queueValue, (control, completed)) } else { Source.fromGraph(new SendQueue[Send]) - .toMat(transport.outboundControl(this))(Keep.both) + .toMat(transport.outboundControl(this, compression))(Keep.both) .run()(materializer) } @@ -310,7 +324,7 @@ private[remote] class Association( _outboundControlIngress = control materializing.countDown() attachStreamRestart("Outbound control stream", completed, cause ⇒ { - 
runOutboundControlStream() + runOutboundControlStream(compression) cause match { case _: HandshakeTimeoutException ⇒ // ok, quarantine not possible without UID case _ ⇒ quarantine("Outbound control stream restarted") @@ -326,7 +340,7 @@ private[remote] class Association( QueueWrapper(createQueue(capacity)) } - private def runOutboundOrdinaryMessagesStream(): Unit = { + private def runOutboundOrdinaryMessagesStream(compression: OutboundCompression): Unit = { val wrapper = getOrCreateQueueWrapper(queue, queueSize) queue = wrapper // use new underlying queue immediately for restarts @@ -334,13 +348,13 @@ if (transport.remoteSettings.TestMode) { val ((queueValue, mgmt), completed) = Source.fromGraph(new SendQueue[Send]) .viaMat(transport.outboundTestFlow(this))(Keep.both) - .toMat(transport.outbound(this))(Keep.both) + .toMat(transport.outbound(this, compression))(Keep.both) .run()(materializer) _testStages.add(mgmt) (queueValue, completed) } else { Source.fromGraph(new SendQueue[Send]) - .toMat(transport.outbound(this))(Keep.both) + .toMat(transport.outbound(this, compression))(Keep.both) .run()(materializer) } @@ -348,10 +362,10 @@ // replace with the materialized value, still same underlying queue queue = queueValue - attachStreamRestart("Outbound message stream", completed, _ ⇒ runOutboundOrdinaryMessagesStream()) + attachStreamRestart("Outbound message stream", completed, _ ⇒ runOutboundOrdinaryMessagesStream(compression)) } - private def runOutboundLargeMessagesStream(): Unit = { + private def runOutboundLargeMessagesStream(compression: OutboundCompression): Unit = { val wrapper = getOrCreateQueueWrapper(queue, largeQueueSize) largeQueue = wrapper // use new underlying queue immediately for restarts @@ -359,20 +373,20 @@ if (transport.remoteSettings.TestMode) { val ((queueValue, mgmt), completed) = Source.fromGraph(new SendQueue[Send]) .viaMat(transport.outboundTestFlow(this))(Keep.both) - .toMat(transport.outboundLarge(this))(Keep.both) + .toMat(transport.outboundLarge(this, compression))(Keep.both) .run()(materializer) _testStages.add(mgmt) (queueValue, completed) } else { Source.fromGraph(new SendQueue[Send]) - .toMat(transport.outboundLarge(this))(Keep.both) + .toMat(transport.outboundLarge(this, compression))(Keep.both) .run()(materializer) } queueValue.inject(wrapper.queue) // replace with the materialized value, still same underlying queue largeQueue = queueValue - attachStreamRestart("Outbound large message stream", completed, _ ⇒ runOutboundLargeMessagesStream()) + attachStreamRestart("Outbound large message stream", completed, _ ⇒ runOutboundLargeMessagesStream(compression)) } private def attachStreamRestart(streamName: String, streamCompleted: Future[Done], restart: Throwable ⇒ Unit): Unit = { @@ -396,7 +410,35 @@ private[remote] class Association( } } - override def toString(): String = + // TODO: Make sure that once other channels use Compression, each gets its own + private def createOutboundCompressionTable(remoteAddress: Address): OutboundCompression = { + if (transport.provider.remoteSettings.ArteryCompressionSettings.enabled) { + val compression = new OutboundCompressionImpl(transport.system, remoteAddress) + // FIXME should use version number of table instead of hashCode + log.info("Creating Outbound compression table ({}) to [{}]", compression.hashCode, remoteAddress) + compression + } else NoOutboundCompression + } + + /* + * This proxy uses the current
associationState's compression table, which is reset for a new incarnation. + * This way the same outgoing stream will switch to using the new table without the need of restarting it. + */ + object CurrentAssociationStateOutboundCompressionProxy extends OutboundCompression { + override final def allocateActorRefCompressionId(ref: ActorRef, id: Int): Unit = + associationState.outboundCompression.allocateActorRefCompressionId(ref, id) + + override final def allocateClassManifestCompressionId(manifest: String, id: Int): Unit = + associationState.outboundCompression.allocateClassManifestCompressionId(manifest, id) + + override final def compressActorRef(ref: ActorRef): Int = + associationState.outboundCompression.compressActorRef(ref) + + override final def compressClassManifest(manifest: String): Int = + associationState.outboundCompression.compressClassManifest(manifest) + } + + override def toString: String = s"Association($localAddress -> $remoteAddress with $associationState)" } diff --git a/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala b/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala index b649fd71ae..ee32b9a187 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala @@ -4,13 +4,14 @@ package akka.remote.artery -import java.lang.reflect.Field import java.nio.charset.Charset import java.nio.{ ByteBuffer, ByteOrder } +import akka.actor.{ Address, ActorRef } +import akka.remote.artery.compress.{ NoopOutboundCompression, NoopInboundCompression } +import akka.serialization.Serialization import org.agrona.concurrent.{ ManyToManyConcurrentArrayQueue, UnsafeBuffer } -import sun.misc.Cleaner -import akka.util.Unsafe +import akka.util.{ OptionVal, Unsafe } import scala.util.control.NonFatal @@ -68,19 +69,37 @@ private[remote] object EnvelopeBuffer { /** * INTERNAL API + * Decompress and trigger compression advertisements. */ -private[remote] trait LiteralCompressionTable { +private[remote] trait InboundCompression { + def hitActorRef(remote: Address, ref: ActorRef): Unit + def decompressActorRef(idx: Int): OptionVal[ActorRef] - def compressActorRef(ref: String): Int - def decompressActorRef(idx: Int): String + def hitClassManifest(remote: Address, manifest: String): Unit + def decompressClassManifest(idx: Int): OptionVal[String] +} +/** + * INTERNAL API + * Compress outgoing data and handle compression advertisements to fill compression table. + */ +private[remote] trait OutboundCompression { + def allocateActorRefCompressionId(ref: ActorRef, id: Int): Unit + def compressActorRef(ref: ActorRef): Int + def allocateClassManifestCompressionId(manifest: String, id: Int): Unit def compressClassManifest(manifest: String): Int - def decompressClassManifest(idx: Int): String - } object HeaderBuilder { - def apply(compressionTable: LiteralCompressionTable): HeaderBuilder = new HeaderBuilderImpl(compressionTable) + + // We really only use the Header builder on one "side" or the other, thus in order to avoid having to split its impl + // we inject no-op compressions of the "other side".
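
This mirrors how `CodecBenchmark` above builds a decode-only header builder. A hedged usage sketch of the two factories defined just below, using the `NoopInboundCompression`/`NoopOutboundCompression` objects this patch introduces (illustrative, not part of the patch):

```scala
import akka.remote.artery.HeaderBuilder
import akka.remote.artery.compress.{ NoopInboundCompression, NoopOutboundCompression }

// Decoding side: only inbound decompression is real, the outbound half is a no-op.
val headerIn = HeaderBuilder.in(NoopInboundCompression)

// Encoding side: only outbound compression is real, the inbound half is a no-op.
val headerOut = HeaderBuilder.out(NoopOutboundCompression)
```
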
+ + def in(compression: InboundCompression): HeaderBuilder = new HeaderBuilderImpl(compression, NoopOutboundCompression) + def out(compression: OutboundCompression): HeaderBuilder = new HeaderBuilderImpl(NoopInboundCompression, compression) + + /** INTERNAL API, FOR TESTING ONLY */ + private[remote] def bothWays(in: InboundCompression, out: OutboundCompression): HeaderBuilder = new HeaderBuilderImpl(in, out) } /** @@ -93,8 +112,9 @@ sealed trait HeaderBuilder { def uid_=(u: Long): Unit def uid: Long - def senderActorRef_=(ref: String): Unit - def senderActorRef: String + def senderActorRef_=(ref: ActorRef): Unit + def senderActorRef: OptionVal[ActorRef] + def senderActorRefPath: String def setNoSender(): Unit def isNoSender: Boolean @@ -102,8 +122,9 @@ sealed trait HeaderBuilder { def setNoRecipient(): Unit def isNoRecipient: Boolean - def recipientActorRef_=(ref: String): Unit - def recipientActorRef: String + def recipientActorRef_=(ref: ActorRef): Unit + def recipientActorRef: OptionVal[ActorRef] + def recipientActorRefPath: String def serializer_=(serializer: Int): Unit def serializer: Int @@ -115,7 +136,7 @@ sealed trait HeaderBuilder { /** * INTERNAL API */ -private[remote] final class HeaderBuilderImpl(val compressionTable: LiteralCompressionTable) extends HeaderBuilder { +private[remote] final class HeaderBuilderImpl(inboundCompression: InboundCompression, outboundCompression: OutboundCompression) extends HeaderBuilder { var version: Int = _ var uid: Long = _ @@ -129,64 +150,61 @@ private[remote] final class HeaderBuilderImpl(val compressionTable: LiteralCompr var _manifest: String = null var _manifestIdx: Int = -1 - def senderActorRef_=(ref: String): Unit = { - _senderActorRef = ref - _senderActorRefIdx = compressionTable.compressActorRef(ref) + def senderActorRef_=(ref: ActorRef): Unit = { + _senderActorRefIdx = outboundCompression.compressActorRef(ref) + if (_senderActorRefIdx == -1) _senderActorRef = Serialization.serializedActorPath(ref) // includes local address from `currentTransportInformation` } - def setNoSender(): Unit = { _senderActorRef = null _senderActorRefIdx = EnvelopeBuffer.DeadLettersCode } - def isNoSender: Boolean = (_senderActorRef eq null) && _senderActorRefIdx == EnvelopeBuffer.DeadLettersCode - - def senderActorRef: String = { + def senderActorRef: OptionVal[ActorRef] = + if (_senderActorRef eq null) inboundCompression.decompressActorRef(_senderActorRefIdx) + else OptionVal.None + def senderActorRefPath: String = if (_senderActorRef ne null) _senderActorRef else { - _senderActorRef = compressionTable.decompressActorRef(_senderActorRefIdx) + _senderActorRef = inboundCompression.decompressActorRef(_senderActorRefIdx).get.path.toSerializationFormat _senderActorRef } - } def setNoRecipient(): Unit = { _recipientActorRef = null _recipientActorRefIdx = EnvelopeBuffer.DeadLettersCode } - def isNoRecipient: Boolean = (_recipientActorRef eq null) && _recipientActorRefIdx == EnvelopeBuffer.DeadLettersCode - def recipientActorRef_=(ref: String): Unit = { - _recipientActorRef = ref - _recipientActorRefIdx = compressionTable.compressActorRef(ref) + def recipientActorRef_=(ref: ActorRef): Unit = { + _recipientActorRefIdx = outboundCompression.compressActorRef(ref) + if (_recipientActorRefIdx == -1) _recipientActorRef = ref.path.toSerializationFormat } - - def recipientActorRef: String = { + def recipientActorRef: OptionVal[ActorRef] = + if (_recipientActorRef eq null) inboundCompression.decompressActorRef(_recipientActorRefIdx) + else OptionVal.None + def 
recipientActorRefPath: String = if (_recipientActorRef ne null) _recipientActorRef else { - _recipientActorRef = compressionTable.decompressActorRef(_recipientActorRefIdx) + _recipientActorRef = inboundCompression.decompressActorRef(_recipientActorRefIdx).get.path.toSerializationFormat _recipientActorRef } - } override def serializer_=(serializer: Int): Unit = { _serializer = serializer } - override def serializer: Int = _serializer override def manifest_=(manifest: String): Unit = { - _manifest = manifest - _manifestIdx = compressionTable.compressClassManifest(manifest) + _manifestIdx = outboundCompression.compressClassManifest(manifest) + if (_manifestIdx == -1) _manifest = manifest } - override def manifest: String = { if (_manifest ne null) _manifest else { - _manifest = compressionTable.decompressClassManifest(_manifestIdx) + _manifest = inboundCompression.decompressClassManifest(_manifestIdx).get _manifest } } @@ -341,10 +359,10 @@ private[remote] final class EnvelopeBuffer(val byteBuffer: ByteBuffer) { */ def tryCleanDirectByteBuffer(): Unit = try { if (byteBuffer.isDirect) { - val cleanerMethod = byteBuffer.getClass().getMethod("cleaner") + val cleanerMethod = byteBuffer.getClass.getMethod("cleaner") cleanerMethod.setAccessible(true) val cleaner = cleanerMethod.invoke(byteBuffer) - val cleanMethod = cleaner.getClass().getMethod("clean") + val cleanMethod = cleaner.getClass.getMethod("clean") cleanMethod.setAccessible(true) cleanMethod.invoke(cleaner) } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala index 0104daa779..61ae1368df 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala @@ -1,17 +1,19 @@ + +/** + * Copyright (C) 2016 Lightbend Inc. 
+ */ package akka.remote.artery import scala.concurrent.duration._ import scala.util.control.NonFatal -import akka.actor.{ ActorRef, InternalActorRef } -import akka.actor.ActorSystem -import akka.actor.ExtendedActorSystem +import akka.actor._ import akka.remote.{ MessageSerializer, OversizedPayloadException, UniqueAddress } import akka.remote.EndpointManager.Send import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope import akka.serialization.{ Serialization, SerializationExtension } import akka.stream._ import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } -import akka.util.OptionVal +import akka.util.{ ByteString, OptionVal } import akka.actor.EmptyLocalActorRef import akka.stream.stage.TimerGraphStageLogic @@ -21,7 +23,7 @@ import akka.stream.stage.TimerGraphStageLogic private[remote] class Encoder( uniqueLocalAddress: UniqueAddress, system: ActorSystem, - compressionTable: LiteralCompressionTable, + compression: OutboundCompression, bufferPool: EnvelopeBufferPool) extends GraphStage[FlowShape[Send, EnvelopeBuffer]] { @@ -32,58 +34,35 @@ private[remote] class Encoder( override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging { - private val headerBuilder = HeaderBuilder(compressionTable) + private val headerBuilder = HeaderBuilder.out(compression) headerBuilder.version = ArteryTransport.Version headerBuilder.uid = uniqueLocalAddress.uid private val localAddress = uniqueLocalAddress.address private val serialization = SerializationExtension(system) private val serializationInfo = Serialization.Information(localAddress, system) - private val senderCache = new java.util.HashMap[ActorRef, String] - private var recipientCache = new java.util.HashMap[ActorRef, String] - override protected def logSource = classOf[Encoder] override def onPush(): Unit = { val send = grab(in) val envelope = bufferPool.acquire() - val recipientStr = recipientCache.get(send.recipient) match { - case null ⇒ - val s = send.recipient.path.toSerializationFormat - // FIXME this cache will be replaced by compression table - if (recipientCache.size() >= 1000) - recipientCache.clear() - recipientCache.put(send.recipient, s) - s - case s ⇒ s - } - headerBuilder.recipientActorRef = recipientStr - - send.senderOption match { - case OptionVal.None ⇒ headerBuilder.setNoSender() - case OptionVal.Some(sender) ⇒ - val senderStr = senderCache.get(sender) match { - case null ⇒ - val s = sender.path.toSerializationFormatWithAddress(localAddress) - // FIXME we might need an efficient LRU cache, or replaced by compression table - if (senderCache.size() >= 1000) - senderCache.clear() - senderCache.put(sender, s) - s - case s ⇒ s - } - headerBuilder.senderActorRef = senderStr - } + // internally compression is applied by the builder: + headerBuilder.recipientActorRef = send.recipient try { // avoiding currentTransportInformation.withValue due to thunk allocation val oldValue = Serialization.currentTransportInformation.value try { Serialization.currentTransportInformation.value = serializationInfo + + send.senderOption match { + case OptionVal.None ⇒ headerBuilder.setNoSender() + case OptionVal.Some(s) ⇒ headerBuilder.senderActorRef = s + } + MessageSerializer.serializeForArtery(serialization, send.message.asInstanceOf[AnyRef], headerBuilder, envelope) - } finally - Serialization.currentTransportInformation.value = oldValue + } finally Serialization.currentTransportInformation.value = oldValue 
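+          // Editor's note: the manual save/restore above is behaviourally equivalent to
+          // `Serialization.currentTransportInformation.withValue(serializationInfo) { ... }`;
+          // it is spelled out by hand because `withValue` takes a by-name block and would
+          // allocate a thunk for every outgoing message.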
envelope.byteBuffer.flip() push(out, envelope) @@ -130,7 +109,7 @@ private[remote] class Decoder( inboundContext: InboundContext, system: ExtendedActorSystem, resolveActorRefWithLocalAddress: String ⇒ InternalActorRef, - compressionTable: LiteralCompressionTable, + compression: InboundCompression, bufferPool: EnvelopeBufferPool, inEnvelopePool: ObjectPool[InboundEnvelope]) extends GraphStage[FlowShape[EnvelopeBuffer, InboundEnvelope]] { val in: Inlet[EnvelopeBuffer] = Inlet("Artery.Decoder.in") @@ -141,12 +120,9 @@ private[remote] class Decoder( new TimerGraphStageLogic(shape) with InHandler with OutHandler with StageLogging { import Decoder.RetryResolveRemoteDeployedRecipient private val localAddress = inboundContext.localAddress.address - private val headerBuilder = HeaderBuilder(compressionTable) + private val headerBuilder = HeaderBuilder.in(compression) private val serialization = SerializationExtension(system) - private val recipientCache = new java.util.HashMap[String, InternalActorRef] - private val senderCache = new java.util.HashMap[String, ActorRef] - private val retryResolveRemoteDeployedRecipientInterval = 50.millis private val retryResolveRemoteDeployedRecipientAttempts = 20 @@ -156,35 +132,32 @@ private[remote] class Decoder( val envelope = grab(in) envelope.parseHeader(headerBuilder) - // FIXME: Instead of using Strings, the headerBuilder should automatically return cached ActorRef instances - // in case of compression is enabled - // FIXME: Is localAddress really needed? - - val sender = - if (headerBuilder.isNoSender) - OptionVal.None - else { - senderCache.get(headerBuilder.senderActorRef) match { - case null ⇒ - val ref = resolveActorRefWithLocalAddress(headerBuilder.senderActorRef) - // FIXME this cache will be replaced by compression table - if (senderCache.size() >= 1000) - senderCache.clear() - senderCache.put(headerBuilder.senderActorRef, ref) - OptionVal(ref) - case ref ⇒ OptionVal(ref) - } - } - - val recipient = - if (headerBuilder.isNoRecipient) - OptionVal.None - else - resolveRecipient(headerBuilder.recipientActorRef) - val originUid = headerBuilder.uid val association = inboundContext.association(originUid) + val recipient: OptionVal[InternalActorRef] = headerBuilder.recipientActorRef match { + case OptionVal.Some(ref) ⇒ OptionVal(ref.asInstanceOf[InternalActorRef]) + case OptionVal.None ⇒ resolveRecipient(headerBuilder.recipientActorRefPath) + } + + val sender: InternalActorRef = headerBuilder.senderActorRef match { + case OptionVal.Some(ref) ⇒ ref.asInstanceOf[InternalActorRef] + case OptionVal.None ⇒ resolveActorRefWithLocalAddress(headerBuilder.senderActorRefPath) + } + + // --- hit refs and manifests for heavy-hitter counting + association match { + case OptionVal.Some(assoc) ⇒ + val remoteAddress = assoc.remoteAddress + compression.hitActorRef(remoteAddress, sender) + if (recipient.isDefined) compression.hitActorRef(remoteAddress, recipient.get) + compression.hitClassManifest(remoteAddress, headerBuilder.manifest) + case _ ⇒ + // we don't want to record hits for compression while handshake is still in progress. + log.debug("Decoded message but unable to record hits for compression as no remoteAddress known. No association yet?") + } + // --- end of hit refs and manifests for heavy-hitter counting + try { val deserializedMessage = MessageSerializer.deserializeForArtery( system, serialization, headerBuilder, envelope) @@ -194,7 +167,7 @@ private[remote] class Decoder( recipient, localAddress, // FIXME: Is this needed anymore? What should we do here? 
deserializedMessage, - sender, + OptionVal.Some(sender), // FIXME: No need for an option, decode simply to deadLetters instead originUid, association) @@ -203,7 +176,7 @@ private[remote] class Decoder( // recipient for the first message that is sent to it, best effort retry scheduleOnce(RetryResolveRemoteDeployedRecipient( retryResolveRemoteDeployedRecipientAttempts, - headerBuilder.recipientActorRef, decoded), retryResolveRemoteDeployedRecipientInterval) + headerBuilder.recipientActorRefPath, decoded), retryResolveRemoteDeployedRecipientInterval) } else push(out, decoded) } catch { @@ -218,28 +191,12 @@ private[remote] class Decoder( } private def resolveRecipient(path: String): OptionVal[InternalActorRef] = { - recipientCache.get(path) match { - case null ⇒ - def addToCache(resolved: InternalActorRef): Unit = { - // FIXME we might need an efficient LRU cache, or replaced by compression table - if (recipientCache.size() >= 1000) - recipientCache.clear() - recipientCache.put(path, resolved) - } - - resolveActorRefWithLocalAddress(path) match { - case empty: EmptyLocalActorRef ⇒ - val pathElements = empty.path.elements - if (pathElements.nonEmpty && pathElements.head == "remote") - OptionVal.None - else { - addToCache(empty) - OptionVal(empty) - } - case ref ⇒ - addToCache(ref) - OptionVal(ref) - } + resolveActorRefWithLocalAddress(path) match { + case empty: EmptyLocalActorRef ⇒ + val pathElements = empty.path.elements + // FIXME remote deployment corner case, please fix @patriknw (see also below, in onTimer) + if (pathElements.nonEmpty && pathElements.head == "remote") OptionVal.None + else OptionVal(empty) case ref ⇒ OptionVal(ref) } } @@ -254,11 +211,10 @@ private[remote] class Decoder( if (attemptsLeft > 0) scheduleOnce(RetryResolveRemoteDeployedRecipient( attemptsLeft - 1, - headerBuilder.recipientActorRef, inboundEnvelope), retryResolveRemoteDeployedRecipientInterval) + recipientPath, inboundEnvelope), retryResolveRemoteDeployedRecipientInterval) else { val recipient = resolveActorRefWithLocalAddress(recipientPath) - // only retry for the first message - recipientCache.put(recipientPath, recipient) + // FIXME only retry for the first message, need to keep them in a cache push(out, inboundEnvelope.withRecipient(recipient)) } case OptionVal.Some(recipient) ⇒ @@ -270,3 +226,4 @@ private[remote] class Decoder( setHandlers(in, out, this) } } + diff --git a/akka-remote/src/main/scala/akka/remote/artery/Compression.scala b/akka-remote/src/main/scala/akka/remote/artery/Compression.scala deleted file mode 100644 index 50206833a1..0000000000 --- a/akka-remote/src/main/scala/akka/remote/artery/Compression.scala +++ /dev/null @@ -1,20 +0,0 @@ -package akka.remote.artery - -import akka.actor.ActorSystem - -/** - * INTERNAL API - */ -// FIXME: Dummy compression table, needs to be replaced by the real deal -// Currently disables all compression -private[remote] class Compression(system: ActorSystem) extends LiteralCompressionTable { - // FIXME: Of course it is foolish to store this as String, but this is a stub - val deadLettersString = system.deadLetters.path.toSerializationFormat - - override def compressActorRef(ref: String): Int = -1 - override def decompressActorRef(idx: Int): String = ??? - - override def compressClassManifest(manifest: String): Int = -1 - override def decompressClassManifest(idx: Int): String = ??? 
-
-}
diff --git a/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderEvents.scala b/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderEvents.scala
index 1f61bb2063..93d034622c 100644
--- a/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderEvents.scala
+++ b/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderEvents.scala
@@ -35,4 +35,10 @@ object FlightRecorderEvents {
   val AeronSource_Received = 23
   val AeronSource_DelegateToTaskRunner = 24

+  // Compression events
+  val Compression_CompressedActorRef = 25
+  val Compression_AllocatedActorRefCompressionId = 26
+  val Compression_CompressedManifest = 27
+  val Compression_AllocatedManifestCompressionId = 28
+
 }
diff --git a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala
index 22b633208f..8a66226d6f 100644
--- a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala
+++ b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala
@@ -3,6 +3,8 @@
  */
 package akka.remote.artery

+import akka.actor.ActorSystem
+
 import scala.concurrent.duration._
 import scala.util.control.NoStackTrace
 import akka.remote.EndpointManager.Send
@@ -47,8 +49,10 @@ private[akka] object OutboundHandshake {
 /**
  * INTERNAL API
  */
-private[akka] class OutboundHandshake(outboundContext: OutboundContext, timeout: FiniteDuration,
-  retryInterval: FiniteDuration, injectHandshakeInterval: FiniteDuration)
+private[akka] class OutboundHandshake(
+  system: ActorSystem,
+  outboundContext: OutboundContext, timeout: FiniteDuration,
+  retryInterval: FiniteDuration, injectHandshakeInterval: FiniteDuration)
   extends GraphStage[FlowShape[Send, Send]] {

   val in: Inlet[Send] = Inlet("OutboundHandshake.in")
diff --git a/akka-remote/src/main/scala/akka/remote/artery/NoLiteralCompression.scala b/akka-remote/src/main/scala/akka/remote/artery/NoLiteralCompression.scala
new file mode 100644
index 0000000000..be70489b85
--- /dev/null
+++ b/akka-remote/src/main/scala/akka/remote/artery/NoLiteralCompression.scala
@@ -0,0 +1,38 @@
+/**
+ * Copyright (C) 2016 Lightbend Inc.
+ */
+package akka.remote.artery
+
+import akka.actor.{ Address, InternalActorRef, ActorSystem, ActorRef }
+import akka.util.OptionVal
+
+/**
+ * INTERNAL API
+ *
+ * Literally, no compression!
+ */
+final class NoInboundCompression(system: ActorSystem) extends InboundCompression {
+  override def hitActorRef(address: Address, ref: ActorRef): Unit = ()
+  override def decompressActorRef(idx: Int): OptionVal[ActorRef] =
+    if (idx == -1) throw new IllegalArgumentException("Attempted decompression of illegal compression id: -1")
+    else if (idx == 0) OptionVal.Some(system.deadLetters) // special case deadLetters
+    else OptionVal.None
+
+  override def hitClassManifest(address: Address, manifest: String): Unit = ()
+  override def decompressClassManifest(idx: Int): OptionVal[String] =
+    if (idx == -1) throw new IllegalArgumentException("Attempted decompression of illegal compression id: -1")
+    else OptionVal.None
+}
+
+/**
+ * INTERNAL API
+ *
+ * Literally, no compression!
+ */ +object NoOutboundCompression extends OutboundCompression { + override def allocateActorRefCompressionId(ref: ActorRef, id: Int): Unit = () + override def compressActorRef(ref: ActorRef): Int = -1 + + override def allocateClassManifestCompressionId(manifest: String, id: Int): Unit = () + override def compressClassManifest(manifest: String): Int = -1 +} diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/ActualCompressionTables.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/ActualCompressionTables.scala new file mode 100644 index 0000000000..9aebf26f62 --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/ActualCompressionTables.scala @@ -0,0 +1,102 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ + +package akka.remote.artery.compress + +import akka.actor.{ Address, ActorRef, ActorSystem } +import akka.remote.artery._ +import akka.remote.artery.compress.CompressionProtocol.Events +import akka.serialization.Serialization +import akka.stream.impl.ConstantFun +import akka.util.OptionVal + +/** INTERNAL API */ +private[remote] final class OutboundCompressionImpl(system: ActorSystem, remoteAddress: Address) extends OutboundCompression { + + private val settings = CompressionSettings(system) + + private val actorRefsOut = new OutboundActorRefCompressionTable(system, remoteAddress) + + private val classManifestsOut = new OutboundCompressionTable[String](system, remoteAddress) + + // actor ref compression --- + + override def allocateActorRefCompressionId(ref: ActorRef, id: Int): Unit = actorRefsOut.register(ref, id) + override def compressActorRef(ref: ActorRef): Int = actorRefsOut.compress(ref) + + // class manifest compression --- + + override def compressClassManifest(manifest: String): Int = classManifestsOut.compress(manifest) + override def allocateClassManifestCompressionId(manifest: String, id: Int): Unit = classManifestsOut.register(manifest, id) +} + +/** INTERNAL API */ +private[remote] final class InboundCompressionImpl( + system: ActorSystem, + inboundContext: InboundContext +) extends InboundCompression { + + private val settings = CompressionSettings(system) + private val log = system.log + + private val localAddress = inboundContext.localAddress + + // TODO maybe use inbound context to get remoteAddress instead? + val advertiseActorRef = new AdvertiseCompressionId[ActorRef] { + override def apply(remoteAddress: Address, ref: ActorRef, id: Int): Unit = { + + log.debug(s"Advertise ActorRef compression [$ref => $id] to [$remoteAddress]") + // TODO could use remote address via association lookup??? 
could be more lookups though
+      inboundContext.sendControl(remoteAddress, CompressionProtocol.ActorRefCompressionAdvertisement(inboundContext.localAddress, ref, id))
+    }
+  }
+  val advertiseManifest = new AdvertiseCompressionId[String] {
+    override def apply(remoteAddress: Address, man: String, id: Int): Unit = {
+      log.debug(s"Advertise ClassManifest compression [$man => $id] to [$remoteAddress]")
+      inboundContext.sendControl(remoteAddress, CompressionProtocol.ClassManifestCompressionAdvertisement(localAddress, man, id))
+    }
+  }
+
+  private val actorRefHitters = new TopHeavyHitters[ActorRef](settings.actorRefs.max)
+  private val actorRefsIn = new InboundActorRefCompressionTable(system, actorRefHitters, advertiseActorRef)
+
+  private val manifestHitters = new TopHeavyHitters[String](settings.manifests.max)
+  private val classManifestsIn = new InboundCompressionTable[String](system, manifestHitters, ConstantFun.scalaIdentityFunction, advertiseManifest)
+
+  // actor ref compression ---
+
+  override def decompressActorRef(idx: Int): OptionVal[ActorRef] = {
+    val value = actorRefsIn.decompress(idx)
+    OptionVal.Some(value)
+  }
+  override def hitActorRef(address: Address, ref: ActorRef): Unit = {
+    actorRefsIn.increment(address, ref, 1L)
+  }
+
+  // class manifest compression ---
+
+  override def decompressClassManifest(idx: Int): OptionVal[String] = {
+    val value = classManifestsIn.decompress(idx)
+    OptionVal.Some(value)
+  }
+  override def hitClassManifest(address: Address, manifest: String): Unit = {
+    classManifestsIn.increment(address, manifest, 1L)
+  }
+}
+
+object NoopInboundCompression extends InboundCompression {
+  override def hitActorRef(remote: Address, ref: ActorRef): Unit = ()
+  override def decompressActorRef(idx: Int): OptionVal[ActorRef] = OptionVal.None
+
+  override def hitClassManifest(remote: Address, manifest: String): Unit = ()
+  override def decompressClassManifest(idx: Int): OptionVal[String] = OptionVal.None
+}
+
+object NoopOutboundCompression extends OutboundCompression {
+  override def compressActorRef(ref: ActorRef): Int = -1
+  override def allocateActorRefCompressionId(ref: ActorRef, id: Int): Unit = ()
+
+  override def compressClassManifest(manifest: String): Int = -1
+  override def allocateClassManifestCompressionId(manifest: String, id: Int): Unit = ()
+}
diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/AdvertiseCompressionId.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/AdvertiseCompressionId.scala
new file mode 100644
index 0000000000..92ef0a5840
--- /dev/null
+++ b/akka-remote/src/main/scala/akka/remote/artery/compress/AdvertiseCompressionId.scala
@@ -0,0 +1,11 @@
+/*
+ * Copyright (C) 2016 Lightbend Inc.
+ */
+package akka.remote.artery.compress
+
+import akka.actor.Address
+
+/** Callback invoked when a compression id allocation should be advertised to the remote actor system. */
+trait AdvertiseCompressionId[T] {
+  def apply(remoteAddress: Address, ref: T, id: Int): Unit
+}
diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionProtocol.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionProtocol.scala
new file mode 100644
index 0000000000..4dfe2763ce
--- /dev/null
+++ b/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionProtocol.scala
@@ -0,0 +1,45 @@
+/*
+ * Copyright (C) 2016 Lightbend Inc.
+ */ + +package akka.remote.artery.compress + +import akka.actor.{ ActorRef, Address } +import akka.remote.UniqueAddress +import akka.remote.artery.ControlMessage + +// FIXME serialization +/** INTERNAL API */ +object CompressionProtocol { + + /** INTERNAL API */ + sealed trait CompressionMessage + + /** + * INTERNAL API + * Sent by the "receiving" node after allocating a compression id to a given [[akka.actor.ActorRef]] + */ + private[remote] final case class ActorRefCompressionAdvertisement(from: UniqueAddress, ref: ActorRef, id: Int) + extends ControlMessage with CompressionMessage + + /** + * INTERNAL API + * Sent by the "receiving" node after allocating a compression id to a given class manifest + */ + private[remote] final case class ClassManifestCompressionAdvertisement(from: UniqueAddress, manifest: String, id: Int) + extends ControlMessage with CompressionMessage + + /** INTERNAL API */ + private[akka] object Events { + /** INTERNAL API */ + private[akka] sealed trait Event + + /** INTERNAL API */ + final case class HeavyHitterDetected(key: Any, id: Int, count: Long) extends Event + + /** INTERNAL API */ + final case class ReceivedCompressionAdvertisement(from: UniqueAddress, key: Any, id: Int) extends Event + + } + +} diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionSettings.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionSettings.scala new file mode 100644 index 0000000000..2e27d244d5 --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionSettings.scala @@ -0,0 +1,36 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery.compress + +import akka.actor.ActorSystem +import com.typesafe.config.Config + +/** INTERNAL API */ +private[akka] class CompressionSettings(_config: Config) { + val enabled = _config.getBoolean("enabled") + @inline private def globalEnabled = enabled + + val debug = _config.getBoolean("debug") + + object actorRefs { + private val c = _config.getConfig("actor-refs") + + val enabled = globalEnabled && c.getBoolean("enabled") + val max = c.getInt("max") + } + object manifests { + private val c = _config.getConfig("manifests") + + val enabled = globalEnabled && c.getBoolean("enabled") + val max = c.getInt("max") + } +} + +/** INTERNAL API */ +private[akka] object CompressionSettings { // TODO make it an extension + def apply(config: Config): CompressionSettings = new CompressionSettings(config) + def apply(system: ActorSystem): CompressionSettings = + new CompressionSettings( + system.settings.config.getConfig("akka.remote.artery.advanced.compression")) +} diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressionTable.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressionTable.scala new file mode 100644 index 0000000000..bae6264695 --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressionTable.scala @@ -0,0 +1,161 @@ +/* + * Copyright (C) 2016 Lightbend Inc. 
+ */
+
+package akka.remote.artery.compress
+
+import akka.actor.{ Address, ActorRef, ActorSystem }
+import akka.event.Logging
+
+final class InboundActorRefCompressionTable(
+  system: ActorSystem,
+  heavyHitters: TopHeavyHitters[ActorRef],
+  onNewHeavyHitterDetected: AdvertiseCompressionId[ActorRef]
+) extends InboundCompressionTable[ActorRef](system, heavyHitters, _.path.toSerializationFormat, onNewHeavyHitterDetected) {
+
+  preAllocate(
+    system.deadLetters
+  )
+
+  /* Since the table is empty here, anything we increment here becomes a heavy hitter immediately. */
+  def preAllocate(allocations: ActorRef*): Unit = {
+    allocations foreach { case ref ⇒ increment(null, ref, 100000) }
+  }
+
+  override def shouldAdvertiseCompressionId(idx: Int): Boolean =
+    idx > 0 // 0 is special => deadLetters
+
+  override def decompress(idx: Int): ActorRef =
+    if (idx == 0) system.deadLetters
+    else super.decompress(idx)
+}
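+
+// Editor's sketch (illustrative only; all names here are hypothetical): the
+// counting/advertisement flow implemented below, reduced to plain Scala.
+// Counting is exact (a Map) where the real table uses a CountMinSketch, and
+// `advertise` stands in for the control-channel advertisement callback.
+object InboundCompressionFlowSketch {
+  final case class State(
+    counts: Map[String, Long] = Map.empty, // [1] per-key hit counts
+    table:  Map[Int, String]  = Map.empty, // allocated compression ids
+    nextId: Int               = 0)
+
+  def hit(s: State, key: String, advertise: (String, Int) ⇒ Unit, heavyAt: Long = 3): State = {
+    val count = s.counts.getOrElse(key, 0L) + 1
+    val counted = s.copy(counts = s.counts.updated(key, count))
+    if (count >= heavyAt && !counted.table.valuesIterator.contains(key)) { // [2] became a heavy hitter
+      val id = counted.nextId
+      advertise(key, id) // [3] advertise the allocation to the sending side
+      counted.copy(table = counted.table.updated(id, key), nextId = id + 1)
+    } else counted
+  }
+}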
+
+/**
+ * Handles counting and detecting of heavy-hitters and compressing them via a table lookup.
+ * Mutable and not thread-safe.
+ *
+ * Compression flow goes like:
+ * [1] on each message we add the actor path here
+ * [2] if it becomes a heavy hitter, we allocate an identifier for it and invoke the callback
+ * [3] the callback for example then triggers a CompressionAdvertisement to the receiving side
+ */
+// TODO should the onHeavyHitter be inside HeavyHitters?
+class InboundCompressionTable[T](
+  system: ActorSystem,
+  heavyHitters: TopHeavyHitters[T],
+  convertKeyToString: T ⇒ String,
+  onNewHeavyHitterDetected: AdvertiseCompressionId[T]) {
+  require(heavyHitters != null, "heavyHitters must not be null")
+
+  private val settings = CompressionSettings(system)
+  val log = Logging(system, "InboundCompressionTable")
+
+  // TODO calibrate properly (h/w have direct relation to preciseness and max capacity)
+  private[this] val cms = new CountMinSketch(100, 100, System.currentTimeMillis().toInt)
+
+  @volatile private[this] var compressionAllocations = Map.empty[Int, T] // TODO replace with a specialized LongMap
+  private[this] var currentCompressionId = InboundCompressionTable.CompressionAllocationCounterStart
+
+  /**
+   * Decompress the given identifier into its original representation.
+   *
+   * @throws UnknownCompressedIdException if the given id is not known; this may indicate a bug – such a situation should not happen.
+   */
+  def decompress(idx: Int): T = {
+    if (settings.debug) log.debug("Decompress [{}] => {}", idx, compressionAllocations.get(idx))
+    compressionAllocations.get(idx) match {
+      case Some(value) ⇒ value
+      case None        ⇒ throw new UnknownCompressedIdException(idx)
+    }
+  }
+
+  /**
+   * Add `n` occurrences of the given key and invoke `onNewHeavyHitterDetected` if the element has become a heavy hitter.
+   * Empty keys are omitted.
+   */
+  // TODO not so happy about passing around address here, but in incoming there's no other earlier place to get it?
+  def increment(remoteAddress: Address, value: T, n: Long): Unit = {
+    val key = convertKeyToString(value)
+    if (shouldIgnore(key)) {
+      // ignore...
+    } else {
+      val count = cms.addAndEstimateCount(key, n)
+
+      // TODO optimise order of these, what is more expensive? (now the `previous` is, but if approx. datatype there it would be faster)... Needs pondering.
+      if (addAndCheckIfHeavyHitterDetected(value, count) && !wasCompressedPreviously(key)) {
+        val idx = allocateCompressedId(value)
+        log.debug("Allocated compression id [" + idx + "] for [" + value + "], in association with [" + remoteAddress + "]")
+        if (shouldAdvertiseCompressionId(idx)) { // TODO change to "time based accumulate new table => advertise it"
+          // TODO guard with if
+          log.debug(s"Inbound: Heavy hitter detected: [{} => $idx], {} hits recorded for it (confidence: {}, relative error (eps) {}).\n" +
+            s"All allocations: ${compressionAllocations}", key, count, cms.getConfidence, cms.getRelativeError)
+          onNewHeavyHitterDetected(remoteAddress, value, idx) // would be used to signal via side-channel to OutboundCompression that we want to send a ActorRefCompressionAdvertisement
+        }
+      }
+    }
+  }
+
+  /** Some compression IDs are special and known upfront by both sides, thus need not be advertised (e.g. deadLetters => 0) */
+  def shouldAdvertiseCompressionId(idx: Int): Boolean =
+    true // TODO this will be different in the "advertise entire table mode", it will be "once table is big enough or much time passed"
+
+  private def shouldIgnore(key: String) = { // TODO this is hacky; if we didn't ignore these we would trigger compression too early (before the association exists, so control messages would fail)
+    key match {
+      case null ⇒ true
+      case ""   ⇒ true // empty class manifest for example
+      case _    ⇒ key.endsWith("/system/dummy") || key.endsWith("/") // TODO dummy likely shouldn't exist? can we remove it?
+    }
+  }
+
+  // TODO this must be optimised, we really don't want to scan the entire key-set each time to make sure
+  private def wasCompressedPreviously(key: String): Boolean =
+    compressionAllocations.values.exists(convertKeyToString(_) == key) // TODO expensive, approx or something sneakier?
+
+  /** Mutates heavy hitters */
+  private def addAndCheckIfHeavyHitterDetected(value: T, count: Long): Boolean = {
+    heavyHitters.update(value, count)
+  }
+
+  private def allocateCompressedId(value: T): Int = {
+    val idx = nextCompressionId()
+    compressionAllocations.get(idx) match {
+      case Some(previouslyCompressedValue) ⇒
+        // should never really happen, but let's not assume that
+        throw new ExistingCompressedIdReuseAttemptException(idx, previouslyCompressedValue)
+
+      case None ⇒
+        // good, the idx is not used so we can allocate it
+        compressionAllocations = compressionAllocations.updated(idx, value)
+        idx
+    }
+  }
+
+  private def nextCompressionId(): Int = {
+    val id = currentCompressionId
+    currentCompressionId += 1
+    id
+  }
+
+  override def toString =
+    s"""${getClass.getSimpleName}(countMinSketch: $cms, heavyHitters: $heavyHitters)"""
+
+}
+
+object InboundCompressionTable {
+  val CompressionAllocationCounterStart = 0
+  // val CompressionAllocationCounterStart = 64L // we leave 64 slots (0 counts too) for pre-allocated Akka compressions
+}
+
+final class ExistingCompressedIdReuseAttemptException(id: Long, value: Any)
+  extends RuntimeException(
+    s"Attempted to re-allocate compressedId [$id] which is still in use for compressing [$value]! " +
+      s"This should never happen and is likely an implementation bug.")
+
+final class UnknownCompressedIdException(id: Long)
+  extends RuntimeException(
+    s"Attempted to decompress unknown id [$id]! " +
+      s"This could happen if this node has started a new ActorSystem bound to the same address as previously, " +
+      s"and previous messages from a remote system were still in flight (using an old compression table). 
" + + s"The remote system is expected to drop the compression table and this system will advertise a new one.") diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/OutboundActorRefCompressionTable.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/OutboundActorRefCompressionTable.scala new file mode 100644 index 0000000000..0b22effa2b --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/OutboundActorRefCompressionTable.scala @@ -0,0 +1,125 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ + +package akka.remote.artery.compress + +import akka.actor.{ Address, ActorRef, ActorSystem } +import akka.event.Logging + +import scala.annotation.tailrec + +final class OutboundActorRefCompressionTable(system: ActorSystem, remoteAddress: Address) + extends OutboundCompressionTable[ActorRef](system, remoteAddress) { + + preAllocate( + system.deadLetters → 0 + ) + + // if (system.toString.contains("systemB")) + // system.log.error(new Throwable, "new OutboundActorRefCompressionTable = " + this.hashCode()) + + def preAllocate(allocations: (ActorRef, Int)*): Unit = + allocations foreach { case (ref, id) ⇒ register(ref, id) } +} + +/** + * Base class for all outgoing compression. + * Encapsulates the compressedId registration and lookup. + * + * Not thread safe. + */ +class OutboundCompressionTable[T](system: ActorSystem, remoteAddress: Address) { + import OutboundCompressionTable._ + + private val settings = CompressionSettings(system) + + private val log = system.log + + // TODO can we specialize this? (tuning due here) + @volatile private[this] var backing = Map.empty[T, Int] // TODO could use unsafe to swap the map instead of volatile + + // mapping guarding + private[this] var compressionIdsAllocated = -1 + private[this] var aheadAllocatedCompressionIds = Set.empty[Int] + + def register(value: T, id: Int): Unit = { + backing.get(value) match { + case None if isNextCompressionId(id) ⇒ + log.debug("Outbound: Registering new compression from [{}] to [{}].", value, id) // TODO should be debug + addFastForwardCompressionIdsAllocatedCounter() + backing = backing.updated(value, id) + + if (settings.debug) log.debug("Outgoing: Updated compression table state: \n{}", toDebugString) // TODO debug + + case None ⇒ + // TODO could be wrong? since we can not guarantee alocations come in sequence? + if (compressionIdAlreadyAllocated(id)) + throw new AllocatedSameIdMultipleTimesException(id, backing.find(_._2 == id).get._1, value) + + aheadAllocatedCompressionIds += id + backing = backing.updated(value, id) + + case Some(existingId) ⇒ + throw new ConflictingCompressionException(value, id, existingId) + } + } + + def compressionIdAlreadyAllocated(id: Int): Boolean = + id <= compressionIdsAllocated || aheadAllocatedCompressionIds.contains(id) + + def compress(value: T): Int = { + backing.get(value) match { // TODO possibly optimise avoid the Option? 
+
+/**
+ * Base class for all outgoing compression.
+ * Encapsulates the compressedId registration and lookup.
+ *
+ * Not thread safe.
+ */
+class OutboundCompressionTable[T](system: ActorSystem, remoteAddress: Address) {
+  import OutboundCompressionTable._
+
+  private val settings = CompressionSettings(system)
+
+  private val log = system.log
+
+  // TODO can we specialize this? (tuning due here)
+  @volatile private[this] var backing = Map.empty[T, Int] // TODO could use unsafe to swap the map instead of volatile
+
+  // compression id bookkeeping
+  private[this] var compressionIdsAllocated = -1
+  private[this] var aheadAllocatedCompressionIds = Set.empty[Int]
+
+  def register(value: T, id: Int): Unit = {
+    backing.get(value) match {
+      case None if isNextCompressionId(id) ⇒
+        log.debug("Outbound: Registering new compression from [{}] to [{}].", value, id)
+        addFastForwardCompressionIdsAllocatedCounter()
+        backing = backing.updated(value, id)
+
+        if (settings.debug) log.debug("Outgoing: Updated compression table state: \n{}", toDebugString)
+
+      case None ⇒
+        // TODO could be wrong? since we cannot guarantee allocations come in sequence?
+        if (compressionIdAlreadyAllocated(id))
+          throw new AllocatedSameIdMultipleTimesException(id, backing.find(_._2 == id).get._1, value)
+
+        aheadAllocatedCompressionIds += id
+        backing = backing.updated(value, id)
+
+      case Some(existingId) ⇒
+        throw new ConflictingCompressionException(value, id, existingId)
+    }
+  }
+
+  def compressionIdAlreadyAllocated(id: Int): Boolean =
+    id <= compressionIdsAllocated || aheadAllocatedCompressionIds.contains(id)
+
+  def compress(value: T): Int = {
+    backing.get(value) match { // TODO possibly optimise to avoid the Option? Depends on used Map
+      case None     ⇒ NotCompressedId
+      case Some(id) ⇒ id
+    }
+  }
+
+  private def isNextCompressionId(id: Int): Boolean =
+    id == compressionIdsAllocated + 1
+
+  private def addFastForwardCompressionIdsAllocatedCounter(): Unit = {
+    @tailrec def fastForwardConsume(): Unit = {
+      val nextId = compressionIdsAllocated + 1
+      if (aheadAllocatedCompressionIds.contains(nextId)) {
+        aheadAllocatedCompressionIds = aheadAllocatedCompressionIds.filterNot(_ == nextId)
+        compressionIdsAllocated += 1
+        fastForwardConsume()
+      } else ()
+    }
+
+    compressionIdsAllocated += 1
+    fastForwardConsume()
+  }
+
+  def toDebugString: String = {
+    val pad = backing.keys.iterator.map(_.toString.length).max
+    s"""${Logging.simpleName(getClass)}(
+       |  hashCode: ${this.hashCode()} to [$remoteAddress]
+       |  compressionIdsAllocated: ${compressionIdsAllocated + 1},
+       |  aheadAllocatedCompressionIds: $aheadAllocatedCompressionIds)
+       |
+       |  ${backing.map { case (k, v) ⇒ k.toString.padTo(pad, ' ') + " => " + v }.mkString("\n  ")}
+       |)""".stripMargin
+  }
+
+  override def toString =
+    s"""${Logging.simpleName(getClass)}(compressionIdsAllocated: ${compressionIdsAllocated + 1}, aheadAllocatedCompressionIds: $aheadAllocatedCompressionIds)"""
+}
+object OutboundCompressionTable {
+  // format: OFF
+  final val DeadLettersId   =  0
+  final val NotCompressedId = -1
+  // format: ON
+}
+
+final class ConflictingCompressionException(value: Any, id: Int, existingId: Int)
+  extends IllegalStateException(
+    s"Value [$value] was already given compression id [$existingId], " +
+      s"yet a new compression id [$id] was allocated for it. This could lead to inconsistencies!")
+
+final class AllocatedSameIdMultipleTimesException(id: Int, previousValue: Any, conflictingValue: Any)
+  extends IllegalStateException(
+    s"Attempted to allocate compression id [$id] a second time, " +
+      s"was already bound to value [$previousValue], " +
+      s"tried to bind to [$conflictingValue]!")
diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/TopHeavyHitters.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/TopHeavyHitters.scala
new file mode 100644
index 0000000000..70fb236702
--- /dev/null
+++ b/akka-remote/src/main/scala/akka/remote/artery/compress/TopHeavyHitters.scala
@@ -0,0 +1,166 @@
+/*
+ * Copyright (C) 2016 Lightbend Inc.
+ */
+package akka.remote.artery.compress
+
+import java.util
+import java.util.Objects
+
+import akka.japi.Util
+
+import scala.annotation.{ switch, tailrec }
+import scala.collection.immutable
+
+/**
+ * INTERNAL API
+ *
+ * Mutable, open-addressing with linear-probing (though a naive one which in theory could get pathological), heavily optimised "top N heavy hitters" data-structure.
+ *
+ * Keeps a number of specific heavy hitters around in memory.
+ *
+ * See also Section 5.2 of http://dimacs.rutgers.edu/~graham/pubs/papers/cm-full.pdf
+ * for a discussion about the assumptions made and guarantees about the Heavy Hitters made in this model.
+ * We assume the Cash Register model in which there are only additions, which simplifies HH detection significantly.
+ */
+private[remote] final class TopHeavyHitters[T](val max: Int) {
+  import TopHeavyHitters._
+  private[this] var _lowestHitterIdx: Int = 0
+
+  private[this] val hashes: Array[Int] = Array.ofDim(max)
+  private[this] val items: Array[T] = Array.ofDim[Object](max).asInstanceOf[Array[T]]
+  private[this] val weights: Array[Long] = Array.ofDim(max)
+
+  /** Slow operation, mostly exposed for testing and debugging purposes, avoid using in hot paths. 
*/
+  def itemsSnapshot: immutable.Seq[T] = Util.immutableSeq(items).filter(_ != null)
+
+  def toDebugString =
+    s"""TopHeavyHitters(
+       |  max: $max,
+       |  lowestHitterIdx: $lowestHitterIdx (weight: $lowestHitterWeight)
+       |
+       |  hashes:  ${hashes.toList.mkString("[", ", ", "]")}
+       |  weights: ${weights.toList.mkString("[", ", ", "]")}
+       |  items:   ${items.toList.mkString("[", ", ", "]")}
+       |)""".stripMargin
+
+  /**
+   * Attempt to add an item to the heavy hitters set; if it does not fit in the top yet,
+   * it will be dropped and the method will return `false`.
+   *
+   * @return `true` if the added item has become a heavy hitter.
+   */
+  // TODO possibly can be optimised further? (there is a benchmark)
+  def update(item: T, count: Long): Boolean =
+    isHeavy(count) && { // O(1) terminate execution ASAP if known to not be a heavy hitter anyway
+      val hashCode = new HashCodeVal(item.hashCode()) // avoid re-calculating hashCode
+      (findHashIdx(0, hashCode): @switch) match { // worst case O(n), can't really bin-search here since indexes are kept in sync with the other arrays
+        case -1 ⇒
+          // not previously heavy hitter
+          insertKnownNewHeavy(hashCode, item, count) // O(1) + rarely O(n) if needs to update lowest hitter
+
+        case potentialIndexGuess ⇒
+          // the found index could be one of many which hash to the same value (we're using open-addressing),
+          // so it is only used as hint for the replace call. If the value matches, we're good, if not we need to search from here onwards.
+          val actualIdx = findItemIdx(potentialIndexGuess, hashCode, item)
+
+          if (actualIdx == -1) insertKnownNewHeavy(hashCode, item, count) // O(1) + O(n), we simply replace the current lowest heavy hitter
+          else replaceExistingHeavyHitter(actualIdx, hashCode, item, count) // usually O(1), worst case O(n) if we need to scan due to hash conflicts
+      }
+    }
+
+  def isHeavy(count: Long): Boolean =
+    count > lowestHitterWeight
+
+  @tailrec private def findItemIdx(searchFromIndex: Int, hashCode: HashCodeVal, o: T): Int =
+    if (searchFromIndex == -1) -1
+    else if (Objects.equals(items(searchFromIndex), o)) searchFromIndex
+    else findItemIdx(findHashIdx(searchFromIndex + 1, hashCode), hashCode, o)

+  /**
+   * Replace existing heavy hitter – give it a new `count` value.
+   * If it was the lowest heavy hitter we update the `_lowestHitterIdx` as well, otherwise there is no need to.
+   *
+   * @return `false` to indicate "no, this insertion did not make this item a new heavy hitter" if the update was successful,
+   *         otherwise might throw [[NoSuchElementException]] if the `item` actually was not found
+   */
+  @tailrec private def replaceExistingHeavyHitter(foundHashIndex: Int, hashCode: HashCodeVal, item: T, count: Long): Boolean =
+    if (foundHashIndex == -1) throw new NoSuchElementException(s"Item $item is not present in HeavyHitters, can not replace it!")
+    else if (Objects.equals(items(foundHashIndex), item)) {
+      putCount(foundHashIndex, count) // we don't need to change `hashCode` or `item`, those remain the same
+      if (foundHashIndex == lowestHitterIdx) updateLowestHitterIdx() // need to update the lowestHitter since we just bumped its count
+      false // not a "new" heavy hitter, since we only replaced it (so it was signaled as new once before)
+    } else replaceExistingHeavyHitter(findHashIdx(foundHashIndex + 1, hashCode), hashCode, item, count) // recurse
+
+  private def findHashIdx(searchFromIndex: Int, hashCode: HashCodeVal): Int =
+    findEqIndex(hashes, searchFromIndex, hashCode.get)
+
+  /**
+   * Puts the item and additional information into the index of the current lowest hitter.
+   *
+   * @return `true`, since the item was inserted and by definition became a new heavy hitter
+   */
+  private def insertKnownNewHeavy(hashCode: HashCodeVal, item: T, count: Long): Boolean = {
+    put(_lowestHitterIdx, hashCode, item, count)
+    updateLowestHitterIdx()
+    true
+  }
+
+  /**
+   * Only update the count for a given index, e.g. if value and hashCode remained the same.
+   */
+  private def putCount(idx: Int, count: Long): Unit =
+    weights(idx) = count
+
+  private def put(idx: Int, hashCode: HashCodeVal, item: T, count: Long): Unit = {
+    hashes(idx) = hashCode.get
+    items(idx) = item
+    weights(idx) = count
+  }
+
+  /** Perform a scan for the lowest hitter (by weight). */
+  private def updateLowestHitterIdx(): Int = {
+    _lowestHitterIdx = findIndexOfMinimum(weights)
+    _lowestHitterIdx
+  }
+
+  /** Weight of lowest heavy hitter, if a new inserted item has a weight greater than this it is a heavy hitter. */
+  def lowestHitterWeight: Long =
+    weights(_lowestHitterIdx)
+
+  // do not expose that we're array based
+  private def lowestHitterIdx: Int =
+    _lowestHitterIdx
+
+  private def findEqIndex(hashes: Array[Int], searchFromIndex: Int, hashCode: Int): Int = {
+    var i: Int = searchFromIndex
+    while (i < hashes.length) {
+      if (hashes(i) == hashCode) return i
+      i += 1
+    }
+    -1
+  }
+
+  private def findIndexOfMinimum(weights: Array[Long]): Int = {
+    var minIdx: Int = -1
+    var min: Long = Long.MaxValue
+    var i: Int = 0
+    while (i < weights.length) {
+      if (weights(i) < min) {
+        min = weights(i)
+        minIdx = i
+      }
+      i += 1
+    }
+    minIdx
+  }
+
+  override def toString =
+    s"${getClass.getSimpleName}(max:$max)"
+}
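+
+// Editor's sketch (illustrative and simplified; all names are hypothetical): the
+// contract of `update` above, expressed with a plain Map instead of open-addressed
+// arrays. An item enters the top-N only if its count beats the current minimum,
+// and `true` is returned only the first time an item becomes a hitter.
+object TopHeavyHittersSketch {
+  def update(top: Map[String, Long], max: Int, item: String, count: Long): (Map[String, Long], Boolean) =
+    if (top.contains(item)) (top.updated(item, count), false) // existing hitter: bump its count
+    else if (top.size < max) (top.updated(item, count), true) // free slot (real impl: weight must beat 0)
+    else {
+      val (minItem, minCount) = top.minBy(_._2)
+      if (count > minCount) (top - minItem + (item → count), true) // evict the current minimum
+      else (top, false)
+    }
+}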
+
+object TopHeavyHitters {
+  /** Value class to avoid mixing up count and hashCode in APIs. */
+  private[compress] final class HashCodeVal(val get: Int) extends AnyVal {
+    def isEmpty = false
+  }
+}
diff --git a/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala
index 0184920d37..1fea6b4629 100644
--- a/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala
+++ b/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala
@@ -2,17 +2,20 @@ package akka.remote.artery

 import java.nio.{ ByteBuffer, ByteOrder }

+import akka.actor._
+import akka.remote.artery.compress.CompressionTestUtils
 import akka.testkit.AkkaSpec
-import akka.util.ByteString
+import akka.util.{ OptionVal, ByteString }

 class EnvelopeBufferSpec extends AkkaSpec {
+  import CompressionTestUtils._

-  object TestCompressor extends LiteralCompressionTable {
-    val refToIdx = Map(
-      "compressable0" → 0,
-      "compressable1" → 1,
-      "reallylongcompressablestring" → 2)
-    val idxToRef = refToIdx.map(_.swap)
+  object TestCompressor extends InboundCompression with OutboundCompression {
+    val refToIdx: Map[ActorRef, Int] = Map(
+      minimalRef("compressable0") → 0,
+      minimalRef("compressable1") → 1,
+      minimalRef("reallylongcompressablestring") → 2)
+    val idxToRef: Map[Int, ActorRef] = refToIdx.map(_.swap)

     val serializerToIdx = Map(
       "serializer0" → 0,
@@ -24,15 +27,20 @@ class EnvelopeBufferSpec extends AkkaSpec {
       "manifest1" → 1)
     val idxToManifest = manifestToIdx.map(_.swap)

-    override def compressActorRef(ref: String): Int = refToIdx.getOrElse(ref, -1)
-    override def decompressActorRef(idx: Int): String = idxToRef(idx)
+    override def allocateActorRefCompressionId(ref: ActorRef, id: Int): Unit = ??? // dynamic allocation not implemented here
+    override def compressActorRef(ref: ActorRef): Int = refToIdx.getOrElse(ref, -1)
+    override def hitActorRef(address: Address, ref: ActorRef): Unit = ()
+    override def decompressActorRef(idx: Int): OptionVal[ActorRef] = OptionVal.Some(idxToRef(idx))
+
+    override def allocateClassManifestCompressionId(manifest: String, id: Int): Unit = ??? 
// dynamic allocation not implemented here
     override def compressClassManifest(manifest: String): Int = manifestToIdx.getOrElse(manifest, -1)
-    override def decompressClassManifest(idx: Int): String = idxToManifest(idx)
+    override def hitClassManifest(address: Address, manifest: String): Unit = ()
+    override def decompressClassManifest(idx: Int) = OptionVal.Some(idxToManifest(idx))
   }

   "EnvelopeBuffer" must {
-    val headerIn = HeaderBuilder(TestCompressor)
-    val headerOut = HeaderBuilder(TestCompressor)
+    val headerIn = HeaderBuilder.bothWays(TestCompressor, TestCompressor)
+    val headerOut = HeaderBuilder.bothWays(TestCompressor, TestCompressor)

     val byteBuffer = ByteBuffer.allocate(1024).order(ByteOrder.LITTLE_ENDIAN)
     val envelope = new EnvelopeBuffer(byteBuffer)
@@ -41,8 +49,8 @@ class EnvelopeBufferSpec extends AkkaSpec {
       headerIn.version = 1
       headerIn.uid = 42
       headerIn.serializer = 4
-      headerIn.senderActorRef = "compressable0"
-      headerIn.recipientActorRef = "compressable1"
+      headerIn.senderActorRef = minimalRef("compressable0")
+      headerIn.recipientActorRef = minimalRef("compressable1")
       headerIn.manifest = "manifest1"

       envelope.writeHeader(headerIn)
@@ -54,8 +62,8 @@ class EnvelopeBufferSpec extends AkkaSpec {
       headerOut.version should ===(1)
       headerOut.uid should ===(42)
       headerOut.serializer should ===(4)
-      headerOut.senderActorRef should ===("compressable0")
-      headerOut.recipientActorRef should ===("compressable1")
+      headerOut.senderActorRefPath should ===("akka://EnvelopeBufferSpec/compressable0")
+      headerOut.recipientActorRefPath should ===("akka://EnvelopeBufferSpec/compressable1")
       headerOut.manifest should ===("manifest1")
     }

@@ -63,14 +71,14 @@ class EnvelopeBufferSpec extends AkkaSpec {
       headerIn.version = 1
       headerIn.uid = 42
       headerIn.serializer = 4
-      headerIn.senderActorRef = "uncompressable0"
-      headerIn.recipientActorRef = "uncompressable11"
+      headerIn.senderActorRef = minimalRef("uncompressable0")
+      headerIn.recipientActorRef = minimalRef("uncompressable11")
       headerIn.manifest = "uncompressable3333"

       val expectedHeaderLength =
         EnvelopeBuffer.LiteralsSectionOffset + // Constant header part
-          2 + headerIn.senderActorRef.length + // Length field + literal
-          2 + headerIn.recipientActorRef.length + // Length field + literal
+          2 + headerIn.senderActorRefPath.length + // Length field + literal
+          2 + headerIn.recipientActorRefPath.length + // Length field + literal
           2 + headerIn.manifest.length // Length field + literal

       envelope.writeHeader(headerIn)
@@ -82,8 +90,8 @@ class EnvelopeBufferSpec extends AkkaSpec {
       headerOut.version should ===(1)
       headerOut.uid should ===(42)
       headerOut.serializer should ===(4)
-      headerOut.senderActorRef should ===("uncompressable0")
-      headerOut.recipientActorRef should ===("uncompressable11")
+      headerOut.senderActorRefPath should ===("akka://EnvelopeBufferSpec/uncompressable0")
+      headerOut.recipientActorRefPath should ===("akka://EnvelopeBufferSpec/uncompressable11")
       headerOut.manifest should ===("uncompressable3333")
     }

@@ -91,14 +99,14 @@ class EnvelopeBufferSpec extends AkkaSpec {
       headerIn.version = 1
       headerIn.uid = 42
       headerIn.serializer = 4
-      headerIn.senderActorRef = "reallylongcompressablestring"
-      headerIn.recipientActorRef = "uncompressable1"
+      headerIn.senderActorRef = minimalRef("reallylongcompressablestring")
+      headerIn.recipientActorRef = minimalRef("uncompressable1")
       headerIn.manifest = "manifest1"

       envelope.writeHeader(headerIn)
       envelope.byteBuffer.position() should ===(
         EnvelopeBuffer.LiteralsSectionOffset +
-          2 + headerIn.recipientActorRef.length)
+          2 + 
headerIn.recipientActorRefPath.length) envelope.byteBuffer.flip() envelope.parseHeader(headerOut) @@ -106,21 +114,21 @@ class EnvelopeBufferSpec extends AkkaSpec { headerOut.version should ===(1) headerOut.uid should ===(42) headerOut.serializer should ===(4) - headerOut.senderActorRef should ===("reallylongcompressablestring") - headerOut.recipientActorRef should ===("uncompressable1") + headerOut.senderActorRefPath should ===("akka://EnvelopeBufferSpec/reallylongcompressablestring") + headerOut.recipientActorRefPath should ===("akka://EnvelopeBufferSpec/uncompressable1") headerOut.manifest should ===("manifest1") headerIn.version = 3 headerIn.uid = Long.MinValue headerIn.serializer = -1 - headerIn.senderActorRef = "uncompressable0" - headerIn.recipientActorRef = "reallylongcompressablestring" + headerIn.senderActorRef = minimalRef("uncompressable0") + headerIn.recipientActorRef = minimalRef("reallylongcompressablestring") headerIn.manifest = "longlonglongliteralmanifest" envelope.writeHeader(headerIn) envelope.byteBuffer.position() should ===( EnvelopeBuffer.LiteralsSectionOffset + - 2 + headerIn.senderActorRef.length + + 2 + headerIn.senderActorRefPath.length + 2 + headerIn.manifest.length) envelope.byteBuffer.flip() @@ -129,8 +137,8 @@ class EnvelopeBufferSpec extends AkkaSpec { headerOut.version should ===(3) headerOut.uid should ===(Long.MinValue) headerOut.serializer should ===(-1) - headerOut.senderActorRef should ===("uncompressable0") - headerOut.recipientActorRef should ===("reallylongcompressablestring") + headerOut.senderActorRefPath should ===("akka://EnvelopeBufferSpec/uncompressable0") + headerOut.recipientActorRefPath should ===("akka://EnvelopeBufferSpec/reallylongcompressablestring") headerOut.manifest should ===("longlonglongliteralmanifest") } @@ -140,8 +148,8 @@ class EnvelopeBufferSpec extends AkkaSpec { headerIn.version = 1 headerIn.uid = 42 headerIn.serializer = 4 - headerIn.senderActorRef = "reallylongcompressablestring" - headerIn.recipientActorRef = "uncompressable1" + headerIn.senderActorRef = minimalRef("reallylongcompressablestring") + headerIn.recipientActorRef = minimalRef("uncompressable1") headerIn.manifest = "manifest1" envelope.writeHeader(headerIn) @@ -153,8 +161,8 @@ class EnvelopeBufferSpec extends AkkaSpec { headerOut.version should ===(1) headerOut.uid should ===(42) headerOut.serializer should ===(4) - headerOut.senderActorRef should ===("reallylongcompressablestring") - headerOut.recipientActorRef should ===("uncompressable1") + headerOut.senderActorRefPath should ===("akka://EnvelopeBufferSpec/reallylongcompressablestring") + headerOut.recipientActorRefPath should ===("akka://EnvelopeBufferSpec/uncompressable1") headerOut.manifest should ===("manifest1") ByteString.fromByteBuffer(envelope.byteBuffer) should ===(payload) diff --git a/akka-remote/src/test/scala/akka/remote/artery/HandshakeRetrySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/HandshakeRetrySpec.scala index 44c79967b3..16a7605a17 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/HandshakeRetrySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/HandshakeRetrySpec.scala @@ -5,7 +5,7 @@ package akka.remote.artery import scala.concurrent.duration._ -import akka.actor.{ ActorIdentity, ActorSystem, Identify } +import akka.actor._ import akka.testkit.{ AkkaSpec, ImplicitSender } import akka.testkit.SocketUtil import akka.testkit.TestActors @@ -20,9 +20,11 @@ object HandshakeRetrySpec { akka { actor.provider = remote remote.artery.enabled = on + 
remote.artery.advanced.compression.enabled = on
       remote.artery.hostname = localhost
       remote.artery.port = 0
       remote.handshake-timeout = 10s
+
     }
  """)
diff --git a/akka-remote/src/test/scala/akka/remote/artery/OutboundHandshakeSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/OutboundHandshakeSpec.scala
index 1b3adef27d..3dd7227cb6 100644
--- a/akka-remote/src/test/scala/akka/remote/artery/OutboundHandshakeSpec.scala
+++ b/akka-remote/src/test/scala/akka/remote/artery/OutboundHandshakeSpec.scala
@@ -39,7 +39,7 @@ class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender {
     val destination = null.asInstanceOf[RemoteActorRef] // not used
     TestSource.probe[String]
       .map(msg ⇒ Send(msg, OptionVal.None, destination, None))
-      .via(new OutboundHandshake(outboundContext, timeout, retryInterval, injectHandshakeInterval))
+      .via(new OutboundHandshake(system, outboundContext, timeout, retryInterval, injectHandshakeInterval))
       .map { case Send(msg, _, _, _) ⇒ msg }
       .toMat(TestSink.probe[Any])(Keep.both)
       .run()
diff --git a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala
index 34371d7673..0035043f87 100644
--- a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala
+++ b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala
@@ -75,7 +75,7 @@ private[akka] class TestOutboundContext(
       _associationState.uniqueRemoteAddress.value match {
         case Some(Success(`peer`)) ⇒ // our value
         case _ ⇒
-          _associationState = _associationState.newIncarnation(Promise.successful(peer))
+          _associationState = _associationState.newIncarnation(Promise.successful(peer), NoOutboundCompression)
       }
   }

@@ -96,7 +96,7 @@ private[akka] class TestOutboundContext(

 private[akka] class TestControlMessageSubject extends ControlMessageSubject {

-  private var observers = new CopyOnWriteArrayList[ControlMessageObserver]
+  private val observers = new CopyOnWriteArrayList[ControlMessageObserver]

   override def attach(observer: ControlMessageObserver): Future[Done] = {
     observers.add(observer)
diff --git a/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionIntegrationSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionIntegrationSpec.scala
new file mode 100644
index 0000000000..8d2c230ec8
--- /dev/null
+++ b/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionIntegrationSpec.scala
@@ -0,0 +1,94 @@
+/*
+ * Copyright (C) 2016 Lightbend Inc.
+ */ + +package akka.remote.artery.compress + +import akka.actor._ +import akka.remote.artery.compress.CompressionProtocol.Events +import akka.testkit._ +import akka.util.Timeout +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfter +import akka.pattern.ask + +import scala.concurrent.Await +import scala.concurrent.duration._ + +object CompressionIntegrationSpec { + // need the port before systems are started + val portB = SocketUtil.temporaryServerAddress("localhost", udp = true).getPort + + val commonConfig = ConfigFactory.parseString(s""" + akka { + loglevel = INFO + + actor.provider = "akka.remote.RemoteActorRefProvider" + remote.artery.enabled = on + remote.artery.advanced { + compression.enabled = on + compression.debug = on + } + remote.artery.hostname = localhost + remote.artery.port = 0 + remote.handshake-timeout = 10s + + } + """) + + val configB = ConfigFactory.parseString(s"akka.remote.artery.port = $portB") + .withFallback(commonConfig) +} + +class CompressionIntegrationSpec extends AkkaSpec(CompressionIntegrationSpec.commonConfig) + with ImplicitSender with BeforeAndAfter { + import CompressionIntegrationSpec._ + + implicit val t = Timeout(3.seconds) + var systemB: ActorSystem = null + + before { + systemB = ActorSystem("systemB", configB) + } + + "Outgoing compression table" must { + "compress chatty actor" in { + val messagesToExchange = 10 + + // listen for compression table events + val aProbe = TestProbe() + val b1Probe = TestProbe() + system.eventStream.subscribe(aProbe.ref, classOf[CompressionProtocol.Events.Event]) + systemB.eventStream.subscribe(b1Probe.ref, classOf[CompressionProtocol.Events.Event]) + + def voidSel = system.actorSelection(s"artery://systemB@localhost:$portB/user/void") + systemB.actorOf(TestActors.blackholeProps, "void") + + // cause testActor-1 to become a heavy hitter + (1 to messagesToExchange).foreach { i ⇒ voidSel ! "hello" } // does not reply, but a hot receiver should be advertised + + val a1 = aProbe.expectMsgType[Events.ReceivedCompressionAdvertisement](10.seconds) + info("System [A] received: " + a1) + a1.id should ===(1) + a1.key.toString should include(testActor.path.name) + } + } + + def identify(_system: String, port: Int, name: String) = { + val selection = + system.actorSelection(s"artery://${_system}@localhost:$port/user/$name") + val ActorIdentity(1, ref) = Await.result(selection ? Identify(1), 3.seconds) + ref.get + } + + after { + shutdownAllActorSystems() + } + + override def afterTermination(): Unit = + shutdownAllActorSystems() + + private def shutdownAllActorSystems(): Unit = { + if (systemB != null) shutdown(systemB) + } +} diff --git a/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionTestUtils.scala b/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionTestUtils.scala new file mode 100644 index 0000000000..6214859564 --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionTestUtils.scala @@ -0,0 +1,17 @@ +/* + * Copyright (C) 2009-2016 Lightbend Inc. 
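+ *
+ * Builds the smallest possible ActorRef (a MinimalActorRef with a real
+ * provider and a root-anchored path), so the compression specs can exercise
+ * actor-ref compression without starting full-blown actors.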
+ */ + +package akka.remote.artery.compress + +import akka.actor._ + +object CompressionTestUtils { + + def minimalRef(name: String)(implicit system: ActorSystem): ActorRef = + new MinimalActorRef { + override def provider: ActorRefProvider = system.asInstanceOf[ActorSystemImpl].provider + override def path: ActorPath = RootActorPath(provider.getDefaultAddress) / name + } + +} diff --git a/akka-remote/src/test/scala/akka/remote/artery/compress/HandshakeShouldDropCompressionTableSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/compress/HandshakeShouldDropCompressionTableSpec.scala new file mode 100644 index 0000000000..9549072906 --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/artery/compress/HandshakeShouldDropCompressionTableSpec.scala @@ -0,0 +1,128 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ + +package akka.remote.artery.compress + +import akka.actor.{ ActorIdentity, ActorSystem, Identify } +import akka.remote.artery.compress.CompressionProtocol.Events +import akka.testkit._ +import akka.util.Timeout +import akka.pattern.ask +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfter + +import scala.concurrent.Await +import scala.concurrent.duration._ + +object HandshakeShouldDropCompressionTableSpec { + // need the port before systemB is started + val portB = SocketUtil.temporaryServerAddress("localhost", udp = true).getPort + + val commonConfig = ConfigFactory.parseString(s""" + akka { + loglevel = INFO + + actor.provider = "akka.remote.RemoteActorRefProvider" + remote.artery.enabled = on + remote.artery.advanced { + compression.enabled = on + compression.debug = on + } + remote.artery.hostname = localhost + remote.artery.port = 0 + remote.handshake-timeout = 10s + + } + """) + + val configB = ConfigFactory.parseString(s"akka.remote.artery.port = $portB") + .withFallback(commonConfig) + +} + +class HandshakeShouldDropCompressionTableSpec extends AkkaSpec(HandshakeShouldDropCompressionTableSpec.commonConfig) + with ImplicitSender with BeforeAndAfter { + import HandshakeShouldDropCompressionTableSpec._ + + implicit val t = Timeout(3.seconds) + var systemB: ActorSystem = null + + before { + systemB = ActorSystem("systemB", configB) + } + + "Outgoing compression table" must { + "be dropped on system restart" in { + val messagesToExchange = 10 + + // listen for compression table events + val aProbe = TestProbe() + val a1Probe = TestProbe() + val aNew2Probe = TestProbe() + val b1Probe = TestProbe() + system.eventStream.subscribe(aProbe.ref, classOf[CompressionProtocol.Events.Event]) + systemB.eventStream.subscribe(b1Probe.ref, classOf[CompressionProtocol.Events.Event]) + + def voidSel = system.actorSelection(s"artery://systemB@localhost:$portB/user/void") + systemB.actorOf(TestActors.blackholeProps, "void") + + // cause testActor-1 to become a heavy hitter + (1 to messagesToExchange).foreach { i ⇒ voidSel ! 
"hello" } // does not reply, but a hot receiver should be advertised + // give it enough time to advertise first table + val a0 = aProbe.expectMsgType[Events.ReceivedCompressionAdvertisement](10.seconds) + info("System [A] received: " + a0) + a0.id should ===(1) + a0.key.toString should include(testActor.path.name) + + // cause a1Probe to become a heavy hitter (we want to not have it in the 2nd compression table later) + (1 to messagesToExchange).foreach { i ⇒ voidSel.tell("hello", a1Probe.ref) } // does not reply, but a hot receiver should be advertised + // give it enough time to advertise first table + val a1 = aProbe.expectMsgType[Events.ReceivedCompressionAdvertisement](10.seconds) + info("System [A] received: " + a1) + a1.id should ===(2) + a1.key.toString should include(a1Probe.ref.path.name) + + log.warning("SHUTTING DOWN system {}...", systemB) + shutdown(systemB) + systemB = ActorSystem("systemB", configB) + Thread.sleep(5000) + log.warning("SYSTEM READY {}...", systemB) + + systemB.actorOf(TestActors.blackholeProps, "void") // start it again + (1 to messagesToExchange).foreach { i ⇒ voidSel ! "hello" } // does not reply, but a hot receiver should be advertised + // compression triggered again + val a2 = aProbe.expectMsgType[Events.ReceivedCompressionAdvertisement](10.seconds) + info("System [A] received: " + a2) + a2.id should ===(1) + a2.key.toString should include(testActor.path.name) + + (1 to messagesToExchange).foreach { i ⇒ voidSel.tell("hello", aNew2Probe.ref) } // does not reply, but a hot receiver should be advertised + // compression triggered again + val a3 = aProbe.expectMsgType[Events.ReceivedCompressionAdvertisement](10.seconds) + info("Received second compression: " + a3) + a3.id should ===(2) + a3.key.toString should include(aNew2Probe.ref.path.name) + } + + } + + def identify(_system: String, port: Int, name: String) = { + val selection = + system.actorSelection(s"artery://${_system}@localhost:$port/user/$name") + val ActorIdentity(1, ref) = Await.result(selection ? Identify(1), 3.seconds) + ref.get + } + + after { + shutdownAllActorSystems() + } + + override def afterTermination(): Unit = + shutdownAllActorSystems() + + private def shutdownAllActorSystems(): Unit = { + if (systemB != null) shutdown(systemB) + } +} + diff --git a/akka-remote/src/test/scala/akka/remote/artery/compress/HeavyHittersSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/compress/HeavyHittersSpec.scala new file mode 100644 index 0000000000..3120140b93 --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/artery/compress/HeavyHittersSpec.scala @@ -0,0 +1,101 @@ +/* + * Copyright (C) 2016 Lightbend Inc. 
+ */
+
+package akka.remote.artery.compress
+
+import org.scalatest.{ Matchers, WordSpecLike }
+
+class HeavyHittersSpec extends WordSpecLike with Matchers {
+
+  "TopHeavyHitters" must {
+    "track the heaviest entries" in {
+      val hitters = new TopHeavyHitters[String](3)
+      hitters.update("A", 10) shouldBe true
+      hitters.itemsSnapshot.toSet should ===(Set("A"))
+
+      hitters.update("B", 20) shouldBe true
+      hitters.itemsSnapshot.toSet should ===(Set("A", "B"))
+
+      hitters.update("C", 1) shouldBe true
+      hitters.itemsSnapshot.toSet should ===(Set("A", "B", "C"))
+
+      hitters.update("D", 100) shouldBe true
+      hitters.itemsSnapshot.toSet should ===(Set("A", "B", "D"))
+
+      hitters.update("E", 200) shouldBe true
+      hitters.itemsSnapshot.toSet should ===(Set("B", "D", "E"))
+
+      hitters.update("BB", 22) shouldBe true
+      hitters.itemsSnapshot.toSet should ===(Set("BB", "D", "E"))
+
+      hitters.update("a", 1) shouldBe false
+      hitters.itemsSnapshot.toSet should ===(Set("BB", "D", "E"))
+    }
+
+    "correctly replace a hitter" in {
+      val hitters = new TopHeavyHitters[String](3)
+      hitters.update("A", 10) shouldBe true
+      hitters.itemsSnapshot.toSet should ===(Set("A"))
+
+      hitters.update("A", 12) shouldBe false
+      hitters.update("A", 22) shouldBe false
+      hitters.itemsSnapshot.toSet should ===(Set("A"))
+    }
+
+    "correctly drop the least heavy hitter when more than N are inserted" in {
+      val hitters = new TopHeavyHitters[String](3)
+
+      hitters.update("A", 1) shouldBe true
+      hitters.itemsSnapshot.toSet should ===(Set("A"))
+
+      hitters.update("B", 22) shouldBe true
+      hitters.itemsSnapshot.toSet should ===(Set("A", "B"))
+
+      hitters.update("C", 33) shouldBe true
+      hitters.itemsSnapshot.toSet should ===(Set("A", "B", "C"))
+      hitters.lowestHitterWeight should ===(1)
+
+      // first item which forces dropping the least heavy hitter
+      hitters.update("D", 100) shouldBe true
+      hitters.itemsSnapshot.toSet should ===(Set("B", "C", "D"))
+
+      // second item which forces dropping the least heavy hitter
+      hitters.update("X", 999) shouldBe true
+      hitters.itemsSnapshot.toSet should ===(Set("X", "C", "D"))
+    }
+
+    "replace the right item even when hashCodes collide" in {
+      case class MockHashCode(override val toString: String, override val hashCode: Int)
+      val hitters = new TopHeavyHitters[MockHashCode](2)
+
+      val a1 = MockHashCode("A", 1)
+      val b1 = MockHashCode("B", 1)
+
+      hitters.update(a1, 1)
+      hitters.itemsSnapshot.toSet should ===(Set(a1))
+      hitters.lowestHitterWeight should ===(0)
+
+      hitters.update(b1, 2)
+      hitters.itemsSnapshot.toSet should ===(Set(a1, b1))
+      hitters.lowestHitterWeight should ===(1)
+
+      hitters.update(a1, 10)
+      hitters.itemsSnapshot.toSet should ===(Set(a1, b1))
+      hitters.lowestHitterWeight should ===(2)
+
+      hitters.update(b1, 100)
+      hitters.itemsSnapshot.toSet should ===(Set(a1, b1))
+      hitters.lowestHitterWeight should ===(10)
+    }
+
+    "behave when something drops from being a hitter and comes back" in {
+      val hitters = new TopHeavyHitters[String](2)
+      hitters.update("A", 1) should ===(true)
+      hitters.update("B", 2) should ===(true)
+      hitters.update("C", 3) should ===(true) // A was dropped now
+      hitters.update("A", 10) should ===(true) // TODO this is technically unexpected, we have already compressed A...
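+      // "A" was evicted by "C" above, so from the hitters' point of view it is
+      // a brand new entry: re-admission reports true even though a compression
+      // id may already have been handed out for it (see the TODO above).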
+    }
+
+  }
+}
diff --git a/akka-remote/src/test/scala/akka/remote/artery/compress/InboundCompressionTableSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/compress/InboundCompressionTableSpec.scala
new file mode 100644
index 0000000000..7b7d4688eb
--- /dev/null
+++ b/akka-remote/src/test/scala/akka/remote/artery/compress/InboundCompressionTableSpec.scala
@@ -0,0 +1,62 @@
+/*
+ * Copyright (C) 2009-2016 Lightbend Inc.
+ */
+
+package akka.remote.artery.compress
+
+import akka.actor.Address
+import akka.stream.impl.ConstantFun
+import akka.testkit.AkkaSpec
+
+class InboundCompressionTableSpec extends AkkaSpec {
+
+  "InboundCompressionTable" must {
+    val NoChange: (String, Int) = null
+
+    "invoke callback when compression triggered" in {
+      var p: (String, Int) = NoChange
+      val heavyHitters = new TopHeavyHitters[String](2)
+      val advertiseCompressionId = new AdvertiseCompressionId[String] {
+        override def apply(remoteAddress: Address, ref: String, id: Int): Unit =
+          p = ref → id
+      }
+      val table = new InboundCompressionTable[String](system, heavyHitters, ConstantFun.scalaIdentityFunction, advertiseCompressionId)
+
+      table.increment(null, "A", 1L)
+      p should ===("A" → 0)
+
+      table.increment(null, "B", 1L)
+      p should ===("B" → 1)
+
+      p = NoChange
+      table.increment(null, "A", 1L) // again, yet was already compressed (A count == 2), thus no need to compress (call callback) again
+      p should ===(NoChange) // no change
+
+      table.increment(null, "B", 1L) // again, yet was already compressed (B count == 2), thus no need to compress (call callback) again
+      p should ===(NoChange) // no change
+
+      table.increment(null, "C", 1L) // max hitters = 2; [A=2, B=2] C=1
+      p should ===(NoChange) // no change
+
+      table.increment(null, "C", 1L) // max hitters = 2; [A=2, B=2] C=2 – only ties the hitters, does not displace one yet
+      p should ===(NoChange) // no change
+      table.increment(null, "C", 1L) // max hitters = 2; [..., C=3] – causes compression of C!
+      p should ===("C" → 2) // allocated
+
+      p = NoChange
+      table.increment(null, "A", 1L) // again!
+      p should ===(NoChange)
+
+      p = NoChange
+      table.increment(null, "B", 1L) // again!
+      p should ===(NoChange)
+
+      // and again and again... won't be signalled again since already compressed
+      table.increment(null, "A", 1L)
+      table.increment(null, "A", 1L)
+      table.increment(null, "A", 1L)
+      p should ===(NoChange)
+    }
+  }
+
+}
diff --git a/akka-remote/src/test/scala/akka/remote/artery/compress/OutboundCompressionTableSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/compress/OutboundCompressionTableSpec.scala
new file mode 100644
index 0000000000..5af8be2fbe
--- /dev/null
+++ b/akka-remote/src/test/scala/akka/remote/artery/compress/OutboundCompressionTableSpec.scala
@@ -0,0 +1,81 @@
+/*
+ * Copyright (C) 2009-2016 Lightbend Inc.
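+ *
+ * The outbound table is the passive half of the protocol: compress(ref)
+ * returns the id that a prior register(ref, id) bound, or -1 when the ref was
+ * never advertised. Ids are assigned by the remote system's inbound table and
+ * may arrive out of order, hence the ahead-of-time allocation cases below.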
+ */
+
+package akka.remote.artery.compress
+
+import akka.actor._
+import akka.testkit.AkkaSpec
+
+class OutboundCompressionTableSpec extends AkkaSpec {
+  import CompressionTestUtils._
+
+  val remoteAddress = Address("artery", "example", "localhost", 0)
+
+  "OutboundCompressionTable" must {
+    "not compress unknown values" in {
+      val table = new OutboundActorRefCompressionTable(system, remoteAddress)
+      table.compress(minimalRef("banana")) should ===(-1)
+    }
+  }
+
+  "OutboundActorRefCompressionTable" must {
+    val alice = minimalRef("alice")
+    val bob = minimalRef("bob")
+
+    "always compress /deadLetters" in {
+      val table = new OutboundActorRefCompressionTable(system, remoteAddress)
+      table.compress(system.deadLetters) should ===(0)
+    }
+
+    "not compress unknown actor ref" in {
+      val table = new OutboundActorRefCompressionTable(system, remoteAddress)
+      table.compress(alice) should ===(-1) // not compressed
+    }
+
+    "compress previously registered actor ref" in {
+      val table = new OutboundActorRefCompressionTable(system, remoteAddress)
+      table.register(alice, 1)
+      table.compress(alice) should ===(1) // compressed
+
+      table.compress(bob) should ===(-1) // not compressed
+    }
+
+    "fail if the same id is registered twice" in {
+      val table = new OutboundActorRefCompressionTable(system, remoteAddress)
+      table.register(alice, 1)
+      val ex = intercept[AllocatedSameIdMultipleTimesException] {
+        table.register(bob, 1)
+      }
+
+      ex.getMessage should include("Attempted to allocate compression id [1] second time, " +
+        "was already bound to value [Actor[akka://OutboundCompressionTableSpec/alice]], " +
+        "tried to bind to [Actor[akka://OutboundCompressionTableSpec/bob]]!")
+    }
+
+    "survive ahead-of-time id allocation, and then fast-forward the allocated-ids counter when able to (compact storage)" in {
+      val table = new OutboundActorRefCompressionTable(system, remoteAddress)
+      table.register(alice, 1)
+      table.compressionIdAlreadyAllocated(1) should ===(true)
+
+      table.register(bob, 3) // ahead allocated
+      table.compressionIdAlreadyAllocated(2) should ===(false)
+      table.compressionIdAlreadyAllocated(3) should ===(true)
+
+      table.register(minimalRef("oogie-boogie"), 4) // ahead allocated (we're able to survive re-delivery of allocation messages)
+      table.compressionIdAlreadyAllocated(2) should ===(false)
+      table.compressionIdAlreadyAllocated(4) should ===(true)
+
+      table.register(minimalRef("jack-skellington"), 2) // missing allocation was re-delivered, causes fast-forward
+
+      table.compressionIdAlreadyAllocated(2) should ===(true)
+
+      table.register(minimalRef("jack-sparrow"), 5) // immediate next, after fast-forward
+    }
+
+    // FIXME "fast forward" concept will not exist once we use "advertise entire table", possibly remove mentions of that
+    // TODO cover more cases of holes in the redeliveries of advertisements
+    // TODO ^ to cover the fast forward logic a bit more
+  }
+
+}
diff --git a/akka-stream/src/main/scala/akka/stream/impl/ConstantFun.scala b/akka-stream/src/main/scala/akka/stream/impl/ConstantFun.scala
index 54a8f93bab..9d96c802a3 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/ConstantFun.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/ConstantFun.scala
@@ -34,4 +34,5 @@ private[akka] object ConstantFun {
 
   val none = (_: Any) ⇒ None
   val two2none = (_: Any, _: Any) ⇒ None
+
 }
diff --git a/akka-testkit/src/main/scala/akka/testkit/TestActors.scala b/akka-testkit/src/main/scala/akka/testkit/TestActors.scala
index 7603a3e735..e78b933428 100644
--- a/akka-testkit/src/main/scala/akka/testkit/TestActors.scala
+++ b/akka-testkit/src/main/scala/akka/testkit/TestActors.scala
@@ -19,6 +19,15 @@ object TestActors {
     }
   }
 
+  /**
+   * BlackholeActor ignores every incoming message; it's like a black hole.
+   */
+  class BlackholeActor extends Actor {
+    override def receive = {
+      case _ ⇒ // ignore...
+    }
+  }
+
   /**
    * ForwardActor forwards all messages as-is to specified ActorRef.
    *
@@ -31,6 +40,7 @@ object TestActors {
   }
 
   val echoActorProps = Props[EchoActor]()
+  val blackholeProps = Props[BlackholeActor]()
 
   def forwardActorProps(ref: ActorRef) = Props(classOf[ForwardActor], ref)
 }
diff --git a/project/plugins.sbt b/project/plugins.sbt
index e863d3bbdb..a8d01a6513 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -25,7 +25,9 @@ addSbtPlugin("com.eed3si9n" % "sbt-unidoc" % "0.3.3")
 
 addSbtPlugin("com.thoughtworks.sbt-api-mappings" % "sbt-api-mappings" % "0.2.2")
 
-addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.2.3")
+addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.2.6")
+
+addSbtPlugin("pl.project13.sbt" % "sbt-jol" % "0.1.1")
 
 addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.0.0-RC1")
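Reader's note: HeavyHittersSpec and InboundCompressionTableSpec above pin down the contract of a bounded top-K tracker without showing one. As a reading aid, a deliberately naive sketch that satisfies the asserted behaviour might look like the following (NaiveTopHeavyHitters is a hypothetical name; the real Artery structure is not an immutable Map):

// Hypothetical reading aid, not Artery's TopHeavyHitters: an immutable-Map
// based top-K tracker matching the behaviour the specs above assert.
final class NaiveTopHeavyHitters[T](max: Int) {
  private var entries = Map.empty[T, Long] // value -> current weight

  /** True when `value` newly enters (or re-enters) the top-`max` set. */
  def update(value: T, weight: Long): Boolean =
    entries.get(value) match {
      case Some(_) => // already a hitter: just record the new weight
        entries = entries.updated(value, weight)
        false
      case None if entries.size < max => // free slot: admit directly
        entries = entries.updated(value, weight)
        true
      case None => // full: evict the lightest hitter, but only on strict >
        val (lightest, lightestWeight) = entries.minBy(_._2)
        if (weight > lightestWeight) {
          entries = entries - lightest + (value -> weight)
          true
        } else false
    }

  def itemsSnapshot: Set[T] = entries.keySet

  // empty slots count as weight 0, matching the specs' expectations above
  def lowestHitterWeight: Long =
    if (entries.size < max) 0L else entries.values.min
}

The strict greater-than comparison is what makes ties non-displacing: in InboundCompressionTableSpec, "C" reaching the same count as the current hitters changes nothing, and only the increment that pushes it past the lightest hitter triggers an id allocation.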