diff --git a/akka-actor/src/main/scala/akka/util/PrettyDuration.scala b/akka-actor/src/main/scala/akka/util/PrettyDuration.scala index 9471151864..0f9694b8b9 100644 --- a/akka-actor/src/main/scala/akka/util/PrettyDuration.scala +++ b/akka-actor/src/main/scala/akka/util/PrettyDuration.scala @@ -7,7 +7,8 @@ import java.util.Locale import scala.concurrent.duration._ -object PrettyDuration { +/** INTERNAL API */ +private[akka] object PrettyDuration { /** * JAVA API diff --git a/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala index 9a1da92bd6..0454c0e1c1 100644 --- a/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala @@ -60,8 +60,8 @@ class CodecBenchmark { create = () ⇒ new ReusableInboundEnvelope, clear = inEnvelope ⇒ inEnvelope.asInstanceOf[ReusableInboundEnvelope].clear() ) - val compressionOut = NoOutboundCompression - val headerIn = HeaderBuilder.in(NoopInboundCompression) + val compressionOut = NoOutboundCompressions + val headerIn = HeaderBuilder.in(NoopInboundCompressions) val envelopeTemplateBuffer = ByteBuffer.allocate(ArteryTransport.MaximumFrameSize).order(ByteOrder.LITTLE_ENDIAN) val uniqueLocalAddress = UniqueAddress( @@ -103,12 +103,12 @@ class CodecBenchmark { recipientStringB = remoteRefB.path.toSerializationFormatWithAddress(addressB) val envelope = new EnvelopeBuffer(envelopeTemplateBuffer) - headerIn.version = 1 - headerIn.uid = 42 - headerIn.serializer = 4 - headerIn.senderActorRef = actorOnSystemA - headerIn.recipientActorRef = remoteRefB - headerIn.manifest = "" + headerIn setVersion 1 + headerIn setUid 42 + headerIn setSerializer 4 + headerIn setSenderActorRef actorOnSystemA + headerIn setRecipientActorRef remoteRefB + headerIn setManifest "" envelope.writeHeader(headerIn) envelope.byteBuffer.put(payload) envelope.byteBuffer.flip() @@ -169,7 +169,7 @@ class CodecBenchmark { val decoder: Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = Flow.fromGraph(new Decoder(inboundContext, system.asInstanceOf[ExtendedActorSystem], - resolveActorRefWithLocalAddress, NoopInboundCompression, envelopePool, inboundEnvelopePool)) + resolveActorRefWithLocalAddress, NoopInboundCompressions, envelopePool, inboundEnvelopePool)) Source.fromGraph(new BenchTestSourceSameElement(N, "elem")) .map { _ => @@ -210,7 +210,7 @@ class CodecBenchmark { val decoder: Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = Flow.fromGraph(new Decoder(inboundContext, system.asInstanceOf[ExtendedActorSystem], - resolveActorRefWithLocalAddress, NoopInboundCompression, envelopePool, inboundEnvelopePool)) + resolveActorRefWithLocalAddress, NoopInboundCompressions, envelopePool, inboundEnvelopePool)) Source.fromGraph(new BenchTestSourceSameElement(N, "elem")) .map(_ ⇒ Send(payload, OptionVal.None, remoteRefB, None)) diff --git a/akka-bench-jmh/src/main/scala/akka/remote/artery/compress/InvertCompressionTableBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/remote/artery/compress/InvertCompressionTableBenchmark.scala new file mode 100644 index 0000000000..bc6cf2eba1 --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/remote/artery/compress/InvertCompressionTableBenchmark.scala @@ -0,0 +1,27 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ + +package akka.remote.artery.compress + +import java.util.concurrent.ThreadLocalRandom + +import org.openjdk.jmh.annotations._ + +@Fork(1) +@State(Scope.Benchmark) +class InvertCompressionTableBenchmark { + + /* + TODO: Possibly specialise the inversion, it's not in hot path so not doing it for now + a.r.artery.compress.CompressionTableBenchmark.invert_comp_to_decomp_1024 N/A thrpt 20 5828.963 ± 281.631 ops/s + a.r.artery.compress.CompressionTableBenchmark.invert_comp_to_decomp_256 N/A thrpt 20 29040.889 ± 345.425 ops/s + */ + + def randomName = ThreadLocalRandom.current().nextInt(1000).toString + val compTable_256 = CompressionTable(2, Map(Vector.fill[String](256)(randomName).zipWithIndex: _*)) + val compTable_1024 = CompressionTable(3, Map(Vector.fill[String](1024)(randomName).zipWithIndex: _*)) + + @Benchmark def invert_comp_to_decomp_256 = compTable_256.invert + @Benchmark def invert_comp_to_decomp_1024 = compTable_1024.invert +} diff --git a/akka-bench-jmh/src/main/scala/akka/remote/compress/OutboundCompressionTableBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/remote/compress/OutboundCompressionTableBenchmark.scala deleted file mode 100644 index 7f7da57b01..0000000000 --- a/akka-bench-jmh/src/main/scala/akka/remote/compress/OutboundCompressionTableBenchmark.scala +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Copyright (C) 2016 Lightbend Inc. - */ -package akka.remote.compress - -import java.util.Random - -import akka.actor.{ ActorSystem, Address } -import akka.event.NoLogging -import akka.remote.artery.compress.{ OutboundCompressionTable, TopHeavyHitters } -import org.openjdk.jmh.annotations._ -import org.openjdk.jmh.infra.Blackhole - -@State(Scope.Benchmark) -@BenchmarkMode(Array(Mode.Throughput)) -@Fork(2) -class OutboundCompressionTableBenchmark { - - @Param(Array("512", "8192")) - var registered: Int = 0 - - implicit val system = ActorSystem("TestSystem") - - var outgoingCompression: OutboundCompressionTable[String] = _ - - val rand = new Random(1001021) - - var preallocatedNums: Array[Long] = _ - var preallocatedStrings: Array[String] = _ - - var i = 0 - - @Setup - def init(): Unit = { - preallocatedNums = Array.ofDim(registered) - preallocatedStrings = Array.ofDim(8192) - - outgoingCompression = new OutboundCompressionTable(system, Address("akka", "remote-system")) - - var i = 0 - while (i < registered) { - outgoingCompression.register(i.toString, i) - preallocatedNums(i) = rand.nextLong() - preallocatedStrings(i) = i.toString - i += 1 - } - } - - // @Benchmark - // @BenchmarkMode(Array(Mode.SingleShotTime)) - // def registerThenCompress(): Int = { - // outgoingCompression.register("new", i) - // outgoingCompression.compress("new") - // } - - @Benchmark - def compressKnown(): Int = - outgoingCompression.compress("1") - -} diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala index 19bf6c8e99..121a166dc6 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala @@ -6,7 +6,6 @@ package akka.remote.artery import java.nio.ByteBuffer import java.util.concurrent.Executors import java.util.concurrent.TimeUnit.NANOSECONDS - import scala.concurrent.duration._ import akka.actor._ import akka.remote.RemoteActorRefProvider @@ -55,6 +54,12 @@ object MaxThroughputSpec extends MultiNodeConfig { # See akka-remote-tests/src/test/resources/aeron.properties #advanced.embedded-media-driver = off #advanced.aeron-dir = "target/aeron" + + #advanced.compression { + # enabled = on + # actor-refs.enabled = on + # manifests.enabled = on + #} } } """))) diff --git a/akka-remote/src/main/java/akka/remote/artery/compress/CountMinSketch.java b/akka-remote/src/main/java/akka/remote/artery/compress/CountMinSketch.java index 577b8718c2..37b2baffad 100644 --- a/akka-remote/src/main/java/akka/remote/artery/compress/CountMinSketch.java +++ b/akka-remote/src/main/java/akka/remote/artery/compress/CountMinSketch.java @@ -198,14 +198,20 @@ public class CountMinSketch { // TODO replace with Scala's Murmur3, it's much faster private static class MurmurHash { + // FIXME: This overload isn't actually ever used public static int hash(Object o) { if (o == null) { return 0; } + if (o instanceof ActorRef) { // TODO possibly scary optimisation + // ActorRef hashcode is the ActorPath#uid, which is a random number assigned at its creation, + // thus no hashing happens here - the value is already cached. + // TODO it should be thought over if this preciseness (just a random number, and not hashing) is good enough here? + return o.hashCode(); + } if (o instanceof String) { return hash(((String) o).getBytes()); } - // TODO consider calling hashCode on ActorRef here directly? It is just a random number though so possibly not as evenly distributed...? if (o instanceof Long) { return hashLong((Long) o); } diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index d3425fbd81..21f1eae9a7 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -151,7 +151,11 @@ akka { # compression table once in a while), and this setting is only about the total number # of compressions within a single such table. # Must be a positive natural number. - max = 256 + max = 256 + + # interval between new table compression advertisements. + # this means the time during which we collect heavy-hitter data and then turn it into a compression table. + advertisement-interval = "1 minute" # TODO find good number as default, for benchmarks trigger immediately } manifests { enabled = off # TODO possibly remove on/off option once we have battle proven it? @@ -162,6 +166,10 @@ akka { # of compressions within a single such table. # Must be a positive natural number. max = 256 + + # interval between new table compression advertisements. + # this means the time during which we collect heavy-hitter data and then turn it into a compression table. + advertisement-interval = "1 minute" # TODO find good number as default, for benchmarks trigger immediately } } } diff --git a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala index a52fb00c50..981b2b5bed 100644 --- a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala @@ -52,7 +52,7 @@ private[akka] object MessageSerializer { def serializeForArtery(serialization: Serialization, message: AnyRef, headerBuilder: HeaderBuilder, envelope: EnvelopeBuffer): Unit = { val serializer = serialization.findSerializerFor(message) - headerBuilder.serializer = serializer.identifier + headerBuilder setSerializer serializer.identifier def manifest: String = serializer match { case ser: SerializerWithStringManifest ⇒ ser.manifest(message) @@ -61,21 +61,21 @@ private[akka] object MessageSerializer { serializer match { case ser: ByteBufferSerializer ⇒ - headerBuilder.manifest = manifest + headerBuilder setManifest manifest envelope.writeHeader(headerBuilder) ser.toBinary(message, envelope.byteBuffer) case _ ⇒ - headerBuilder.manifest = manifest + headerBuilder setManifest manifest envelope.writeHeader(headerBuilder) envelope.byteBuffer.put(serializer.toBinary(message)) } } - def deserializeForArtery(system: ExtendedActorSystem, serialization: Serialization, headerBuilder: HeaderBuilder, + def deserializeForArtery(system: ExtendedActorSystem, originUid: Long, serialization: Serialization, headerBuilder: HeaderBuilder, envelope: EnvelopeBuffer): AnyRef = { serialization.deserializeByteBuffer( envelope.byteBuffer, headerBuilder.serializer, - headerBuilder.manifest) + headerBuilder.manifest(originUid)) // FIXME currently compression will not work for manifests } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 3cb59d8e97..10417fdc2f 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -32,10 +32,9 @@ import akka.remote.ThisActorSystemQuarantinedEvent import akka.remote.UniqueAddress import akka.remote.artery.InboundControlJunction.ControlMessageObserver import akka.remote.artery.InboundControlJunction.ControlMessageSubject -import akka.remote.artery.OutboundControlJunction.OutboundControlIngress import akka.remote.transport.AkkaPduCodec import akka.remote.transport.AkkaPduProtobufCodec -import akka.remote.artery.compress.{ AdvertiseCompressionId, InboundCompressionImpl, CompressionProtocol } +import akka.remote.artery.compress.{ InboundCompressionsImpl, CompressionProtocol } import akka.stream.AbruptTerminationException import akka.stream.ActorMaterializer import akka.stream.KillSwitches @@ -203,7 +202,7 @@ private[akka] object AssociationState { incarnation = 1, uniqueRemoteAddressPromise = Promise(), quarantined = ImmutableLongMap.empty[QuarantinedTimestamp], - outboundCompression = NoOutboundCompression) + outboundCompression = NoOutboundCompressions) final case class QuarantinedTimestamp(nanoTime: Long) { override def toString: String = @@ -218,7 +217,7 @@ private[akka] final class AssociationState( val incarnation: Int, val uniqueRemoteAddressPromise: Promise[UniqueAddress], val quarantined: ImmutableLongMap[AssociationState.QuarantinedTimestamp], - val outboundCompression: OutboundCompression) { + val outboundCompression: OutboundCompressions) { import AssociationState.QuarantinedTimestamp @@ -244,7 +243,7 @@ private[akka] final class AssociationState( } } - def newIncarnation(remoteAddressPromise: Promise[UniqueAddress], compression: OutboundCompression): AssociationState = + def newIncarnation(remoteAddressPromise: Promise[UniqueAddress], compression: OutboundCompressions): AssociationState = new AssociationState(incarnation + 1, remoteAddressPromise, quarantined, compression) def newQuarantined(): AssociationState = @@ -254,7 +253,7 @@ private[akka] final class AssociationState( incarnation, uniqueRemoteAddressPromise, quarantined = quarantined.updated(a.uid, QuarantinedTimestamp(System.nanoTime())), - outboundCompression = NoOutboundCompression) // after quarantine no compression needed anymore, drop it + outboundCompression = NoOutboundCompressions) // after quarantine no compression needed anymore, drop it case _ ⇒ this } @@ -534,17 +533,17 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R } private def runInboundStreams(): Unit = { - val noCompression = new NoInboundCompression(system) // TODO possibly enable on other channels too https://github.com/akka/akka/pull/20546/files#r68074082 - val compression = createInboundCompressionTable(this) + val noCompressions = new NoInboundCompressions(system) // TODO possibly enable on other channels too https://github.com/akka/akka/pull/20546/files#r68074082 + val compressions = createInboundCompressions(this) - runInboundControlStream(noCompression) - runInboundOrdinaryMessagesStream(compression) + runInboundControlStream(noCompressions) + runInboundOrdinaryMessagesStream(compressions) if (largeMessageDestinationsEnabled) { runInboundLargeMessagesStream() } } - private def runInboundControlStream(compression: InboundCompression): Unit = { + private def runInboundControlStream(compression: InboundCompressions): Unit = { val (ctrl, completed) = if (remoteSettings.TestMode) { val (mgmt, (ctrl, completed)) = @@ -584,15 +583,15 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R inboundEnvelope.message match { case m: CompressionMessage ⇒ m match { - case CompressionProtocol.ActorRefCompressionAdvertisement(from, ref, id) ⇒ - log.debug("Incoming ActorRef compression advertisement from [{}], allocating: [{} => {}]", from, ref, id) - association(from.address).compression.allocateActorRefCompressionId(ref, id) - system.eventStream.publish(CompressionProtocol.Events.ReceivedCompressionAdvertisement(from, ref, id)) + case CompressionProtocol.ActorRefCompressionAdvertisement(from, table) ⇒ + log.debug("Incoming ActorRef compression advertisement from [{}], table: [{}]", from, table) + association(from.address).compression.applyActorRefCompressionTable(table) + system.eventStream.publish(CompressionProtocol.Events.ReceivedCompressionTable(from, table)) - case CompressionProtocol.ClassManifestCompressionAdvertisement(from, manifest, id) ⇒ - log.debug("Incoming Class Manifest compression advertisement from [{}], allocating: [{} => {}]", from, manifest, id) - association(from.address).compression.allocateClassManifestCompressionId(manifest, id) - system.eventStream.publish(CompressionProtocol.Events.ReceivedCompressionAdvertisement(from, manifest, id)) + case CompressionProtocol.ClassManifestCompressionAdvertisement(from, table) ⇒ + log.debug("Incoming Class Manifest compression advertisement from [{}], table: [{}]", from, table) + association(from.address).compression.applyClassManifestCompressionTable(table) + system.eventStream.publish(CompressionProtocol.Events.ReceivedCompressionTable(from, table)) } case _ ⇒ // not interested in non CompressionMessages } @@ -601,7 +600,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R attachStreamRestart("Inbound control stream", completed, () ⇒ runInboundControlStream(compression)) } - private def runInboundOrdinaryMessagesStream(compression: InboundCompression): Unit = { + private def runInboundOrdinaryMessagesStream(compression: InboundCompressions): Unit = { val completed = if (remoteSettings.TestMode) { val (mgmt, c) = aeronSource(ordinaryStreamId, envelopePool) @@ -622,7 +621,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R } private def runInboundLargeMessagesStream(): Unit = { - val compression = new NoInboundCompression(system) // no compression on large message stream for now + val compression = new NoInboundCompressions(system) // no compression on large message stream for now val completed = if (remoteSettings.TestMode) { @@ -738,7 +737,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R association(remoteAddress).quarantine(reason = "", uid.map(_.toLong)) } - def outbound(outboundContext: OutboundContext, compression: OutboundCompression): Sink[Send, Future[Done]] = { + def outbound(outboundContext: OutboundContext, compression: OutboundCompressions): Sink[Send, Future[Done]] = { Flow.fromGraph(killSwitch.flow[Send]) .via(new OutboundHandshake(system, outboundContext, handshakeTimeout, handshakeRetryInterval, injectHandshakeInterval)) .via(encoder(compression)) @@ -746,7 +745,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R envelopePool, giveUpSendAfter, flightRecorder.createEventSink()))(Keep.right) } - def outboundLarge(outboundContext: OutboundContext, compression: OutboundCompression): Sink[Send, Future[Done]] = { + def outboundLarge(outboundContext: OutboundContext, compression: OutboundCompressions): Sink[Send, Future[Done]] = { Flow.fromGraph(killSwitch.flow[Send]) .via(new OutboundHandshake(system, outboundContext, handshakeTimeout, handshakeRetryInterval, injectHandshakeInterval)) .via(createEncoder(largeEnvelopePool, compression)) @@ -754,7 +753,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R envelopePool, giveUpSendAfter, flightRecorder.createEventSink()))(Keep.right) } - def outboundControl(outboundContext: OutboundContext, compression: OutboundCompression): Sink[Send, (OutboundControlIngress, Future[Done])] = { + def outboundControl(outboundContext: OutboundContext, compression: OutboundCompressions): Sink[Send, (OutboundControlIngress, Future[Done])] = { Flow.fromGraph(killSwitch.flow[Send]) .via(new OutboundHandshake(system, outboundContext, handshakeTimeout, handshakeRetryInterval, injectHandshakeInterval)) .via(new SystemMessageDelivery(outboundContext, system.deadLetters, systemMessageResendInterval, @@ -767,17 +766,17 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R // FIXME we can also add scrubbing stage that would collapse sys msg acks/nacks and remove duplicate Quarantine messages } - def createEncoder(compression: OutboundCompression, bufferPool: EnvelopeBufferPool): Flow[Send, EnvelopeBuffer, NotUsed] = + def createEncoder(compression: OutboundCompressions, bufferPool: EnvelopeBufferPool): Flow[Send, EnvelopeBuffer, NotUsed] = Flow.fromGraph(new Encoder(localAddress, system, compression, bufferPool)) - private def createInboundCompressionTable(inboundContext: InboundContext): InboundCompression = - if (remoteSettings.ArteryCompressionSettings.enabled) new InboundCompressionImpl(system, inboundContext) - else new NoInboundCompression(system) + private def createInboundCompressions(inboundContext: InboundContext): InboundCompressions = + if (remoteSettings.ArteryCompressionSettings.enabled) new InboundCompressionsImpl(system, inboundContext) + else new NoInboundCompressions(system) - def createEncoder(pool: EnvelopeBufferPool, compression: OutboundCompression): Flow[Send, EnvelopeBuffer, NotUsed] = + def createEncoder(pool: EnvelopeBufferPool, compression: OutboundCompressions): Flow[Send, EnvelopeBuffer, NotUsed] = Flow.fromGraph(new Encoder(localAddress, system, compression, pool)) - def encoder(compression: OutboundCompression): Flow[Send, EnvelopeBuffer, NotUsed] = createEncoder(envelopePool, compression) + def encoder(compression: OutboundCompressions): Flow[Send, EnvelopeBuffer, NotUsed] = createEncoder(envelopePool, compression) def aeronSource(streamId: Int, pool: EnvelopeBufferPool): Source[EnvelopeBuffer, NotUsed] = Source.fromGraph(new AeronSource(inboundChannel, streamId, aeron, taskRunner, pool, @@ -788,14 +787,14 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R inboundEnvelopePool.release(m) } - def createDecoder(compression: InboundCompression, bufferPool: EnvelopeBufferPool): Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = { + def createDecoder(compression: InboundCompressions, bufferPool: EnvelopeBufferPool): Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = { val resolveActorRefWithLocalAddress: String ⇒ InternalActorRef = recipient ⇒ provider.resolveActorRefWithLocalAddress(recipient, localAddress.address) Flow.fromGraph(new Decoder(this, system, resolveActorRefWithLocalAddress, compression, bufferPool, inboundEnvelopePool)) } - def decoder(compression: InboundCompression): Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = + def decoder(compression: InboundCompressions): Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = createDecoder(compression, envelopePool) def inboundSink: Sink[InboundEnvelope, Future[Done]] = @@ -804,13 +803,13 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R .via(new InboundQuarantineCheck(this)) .toMat(messageDispatcherSink)(Keep.right) - def inboundFlow(compression: InboundCompression): Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = { + def inboundFlow(compression: InboundCompressions): Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = { Flow[EnvelopeBuffer] .via(killSwitch.flow) .via(decoder(compression)) } - def inboundLargeFlow(compression: InboundCompression): Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = { + def inboundLargeFlow(compression: InboundCompressions): Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = { Flow[EnvelopeBuffer] .via(killSwitch.flow) .via(createDecoder(compression, largeEnvelopePool)) diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index 4a73807ba8..523da7ff8f 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -9,8 +9,7 @@ import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicReference -import akka.remote.artery.compress.{ OutboundCompressionImpl, CompressionProtocol } - +import akka.remote.artery.compress.{ CompressionProtocol, CompressionTable, OutboundCompressionsImpl } import scala.annotation.tailrec import scala.concurrent.Future import scala.concurrent.Promise @@ -40,6 +39,7 @@ import org.agrona.concurrent.ManyToOneConcurrentArrayQueue import akka.util.OptionVal import akka.remote.QuarantinedEvent import akka.remote.DaemonMsgCreate +import akka.remote.artery.compress.CompressionProtocol._ /** * INTERNAL API @@ -82,7 +82,7 @@ private[remote] class Association( // start sending (enqueuing) to the Association immediate after construction. /** Accesses the currently active outbound compression. */ - def compression: OutboundCompression = associationState.outboundCompression + def compression: OutboundCompressions = associationState.outboundCompression def createQueue(capacity: Int): Queue[Send] = new ManyToOneConcurrentArrayQueue[Send](capacity) @@ -283,19 +283,19 @@ private[remote] class Association( private def runOutboundStreams(): Unit = { // TODO no compression for control / large streams currently - val disableCompression = NoOutboundCompression + val disableCompression = NoOutboundCompressions // it's important to materialize the outboundControl stream first, // so that outboundControlIngress is ready when stages for all streams start runOutboundControlStream(disableCompression) - runOutboundOrdinaryMessagesStream(CurrentAssociationStateOutboundCompressionProxy) + runOutboundOrdinaryMessagesStream(CurrentAssociationStateOutboundCompressionsProxy) if (largeMessageChannelEnabled) { runOutboundLargeMessagesStream(disableCompression) } } - private def runOutboundControlStream(compression: OutboundCompression): Unit = { + private def runOutboundControlStream(compression: OutboundCompressions): Unit = { // stage in the control stream may access the outboundControlIngress before returned here // using CountDownLatch to make sure that materialization is completed before accessing outboundControlIngress materializing = new CountDownLatch(1) @@ -340,7 +340,7 @@ private[remote] class Association( QueueWrapper(createQueue(capacity)) } - private def runOutboundOrdinaryMessagesStream(compression: OutboundCompression): Unit = { + private def runOutboundOrdinaryMessagesStream(compression: OutboundCompressions): Unit = { val wrapper = getOrCreateQueueWrapper(queue, queueSize) queue = wrapper // use new underlying queue immediately for restarts @@ -365,7 +365,7 @@ private[remote] class Association( attachStreamRestart("Outbound message stream", completed, _ ⇒ runOutboundOrdinaryMessagesStream(compression)) } - private def runOutboundLargeMessagesStream(compression: OutboundCompression): Unit = { + private def runOutboundLargeMessagesStream(compression: OutboundCompressions): Unit = { val wrapper = getOrCreateQueueWrapper(queue, largeQueueSize) largeQueue = wrapper // use new underlying queue immediately for restarts @@ -411,25 +411,25 @@ private[remote] class Association( } // TODO: Make sure that once other channels use Compression, each gets it's own - private def createOutboundCompressionTable(remoteAddress: Address): OutboundCompression = { + private def createOutboundCompressionTable(remoteAddress: Address): OutboundCompressions = { if (transport.provider.remoteSettings.ArteryCompressionSettings.enabled) { - val compression = new OutboundCompressionImpl(transport.system, remoteAddress) + val compression = new OutboundCompressionsImpl(transport.system, remoteAddress) // FIXME should use verion number of table instead of hashCode log.info("Creating Outbound compression table ({}) to [{}]", compression.hashCode, remoteAddress) compression - } else NoOutboundCompression + } else NoOutboundCompressions } - /* + /** * This proxy uses the current associationStates compression table, which is reset for a new incarnation. * This way the same outgoing stream will switch to using the new table without the need of restarting it. */ - object CurrentAssociationStateOutboundCompressionProxy extends OutboundCompression { - override final def allocateActorRefCompressionId(ref: ActorRef, id: Int): Unit = - associationState.outboundCompression.allocateActorRefCompressionId(ref, id) + private object CurrentAssociationStateOutboundCompressionsProxy extends OutboundCompressions { + override def applyActorRefCompressionTable(table: CompressionTable[ActorRef]): Unit = + associationState.outboundCompression.applyActorRefCompressionTable(table) - override final def allocateClassManifestCompressionId(manifest: String, id: Int): Unit = - associationState.outboundCompression.allocateClassManifestCompressionId(manifest, id) + override def applyClassManifestCompressionTable(table: CompressionTable[String]): Unit = + associationState.outboundCompression.applyClassManifestCompressionTable(table) override final def compressActorRef(ref: ActorRef): Int = associationState.outboundCompression.compressActorRef(ref) diff --git a/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala b/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala index ee32b9a187..d2c2f1883e 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala @@ -7,8 +7,9 @@ package akka.remote.artery import java.nio.charset.Charset import java.nio.{ ByteBuffer, ByteOrder } -import akka.actor.{ Address, ActorRef } -import akka.remote.artery.compress.{ NoopOutboundCompression, NoopInboundCompression } +import akka.actor.{ ActorRef, Address } +import akka.remote.artery.compress.CompressionProtocol._ +import akka.remote.artery.compress.{ CompressionTable, NoopInboundCompressions, NoopOutboundCompressions } import akka.serialization.Serialization import org.agrona.concurrent.{ ManyToManyConcurrentArrayQueue, UnsafeBuffer } import akka.util.{ OptionVal, Unsafe } @@ -56,8 +57,10 @@ private[remote] object EnvelopeBuffer { val SenderActorRefTagOffset = 16 // Int val RecipientActorRefTagOffset = 20 // Int val ClassManifestTagOffset = 24 // Int + val ActorRefCompressionTableVersionTagOffset = 28 // Int + val ClassManifestCompressionTableVersionTagOffset = 32 // Int - val LiteralsSectionOffset = 28 + val LiteralsSectionOffset = 36 val UsAscii = Charset.forName("US-ASCII") @@ -70,51 +73,70 @@ private[remote] object EnvelopeBuffer { /** * INTERNAL API * Decompress and cause compression advertisements. + * + * One per inbound message stream thus must demux by originUid to use the right tables. */ -private[remote] trait InboundCompression { - def hitActorRef(remote: Address, ref: ActorRef): Unit - def decompressActorRef(idx: Int): OptionVal[ActorRef] +private[remote] trait InboundCompressions { + def hitActorRef(originUid: Long, tableVersion: Int, remote: Address, ref: ActorRef): Unit + def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef] - def hitClassManifest(remote: Address, manifest: String): Unit - def decompressClassManifest(idx: Int): OptionVal[String] + def hitClassManifest(originUid: Long, tableVersion: Int, remote: Address, manifest: String): Unit + def decompressClassManifest(originUid: Long, tableVersion: Int, idx: Int): OptionVal[String] } /** * INTERNAL API * Compress outgoing data and handle compression advertisements to fill compression table. + * + * One per outgoing message stream. */ -private[remote] trait OutboundCompression { - def allocateActorRefCompressionId(ref: ActorRef, id: Int): Unit +private[remote] trait OutboundCompressions { + def applyActorRefCompressionTable(table: CompressionTable[ActorRef]): Unit def compressActorRef(ref: ActorRef): Int - def allocateClassManifestCompressionId(manifest: String, id: Int): Unit + def applyClassManifestCompressionTable(table: CompressionTable[String]): Unit def compressClassManifest(manifest: String): Int } -object HeaderBuilder { +/** INTERNAL API */ +private[remote] object HeaderBuilder { // We really only use the Header builder on one "side" or the other, thus in order to avoid having to split its impl // we inject no-op compression's of the "other side". - def in(compression: InboundCompression): HeaderBuilder = new HeaderBuilderImpl(compression, NoopOutboundCompression) - def out(compression: OutboundCompression): HeaderBuilder = new HeaderBuilderImpl(NoopInboundCompression, compression) + def in(compression: InboundCompressions): HeaderBuilder = new HeaderBuilderImpl(compression, NoopOutboundCompressions) + def out(compression: OutboundCompressions): HeaderBuilder = new HeaderBuilderImpl(NoopInboundCompressions, compression) /** INTERNAL API, FOR TESTING ONLY */ - private[remote] def bothWays(in: InboundCompression, out: OutboundCompression): HeaderBuilder = new HeaderBuilderImpl(in, out) + private[remote] def bothWays(in: InboundCompressions, out: OutboundCompressions): HeaderBuilder = new HeaderBuilderImpl(in, out) } /** * INTERNAL API */ sealed trait HeaderBuilder { - def version_=(v: Int): Unit + def setVersion(v: Int): Unit def version: Int - def uid_=(u: Long): Unit + def setActorRefCompressionTableVersion(v: Int): Unit + def actorRefCompressionTableVersion: Int + + def setClassManifestCompressionTableVersion(v: Int): Unit + def classManifestCompressionTableVersion: Int + + def setUid(u: Long): Unit def uid: Long - def senderActorRef_=(ref: ActorRef): Unit - def senderActorRef: OptionVal[ActorRef] - def senderActorRefPath: String + def setSenderActorRef(ref: ActorRef): Unit + /** + * Retrive the compressed ActorRef by the compressionId carried by this header. + * Returns `None` if ActorRef was not compressed, and then the literal [[senderActorRefPath]] should be used. + */ + def senderActorRef(originUid: Long): OptionVal[ActorRef] + /** + * Retrive the raw literal actor path, instead of using the compressed value. + * Returns `None` if ActorRef was compressed (!). To obtain the path in such case call [[senderActorRef]] and extract the path from it directly. + */ + def senderActorRefPath: OptionVal[String] def setNoSender(): Unit def isNoSender: Boolean @@ -122,25 +144,35 @@ sealed trait HeaderBuilder { def setNoRecipient(): Unit def isNoRecipient: Boolean - def recipientActorRef_=(ref: ActorRef): Unit - def recipientActorRef: OptionVal[ActorRef] - def recipientActorRefPath: String + def setRecipientActorRef(ref: ActorRef): Unit + /** + * Retrive the compressed ActorRef by the compressionId carried by this header. + * Returns `None` if ActorRef was not compressed, and then the literal [[recipientActorRefPath]] should be used. + */ + def recipientActorRef(originUid: Long): OptionVal[ActorRef] + /** + * Retrive the raw literal actor path, instead of using the compressed value. + * Returns `None` if ActorRef was compressed (!). To obtain the path in such case call [[recipientActorRefPath]] and extract the path from it directly. + */ + def recipientActorRefPath: OptionVal[String] - def serializer_=(serializer: Int): Unit + def setSerializer(serializer: Int): Unit def serializer: Int - def manifest_=(manifest: String): Unit - def manifest: String + def setManifest(manifest: String): Unit + def manifest(originUid: Long): String } /** * INTERNAL API */ -private[remote] final class HeaderBuilderImpl(inboundCompression: InboundCompression, outboundCompression: OutboundCompression) extends HeaderBuilder { - var version: Int = _ - var uid: Long = _ - +private[remote] final class HeaderBuilderImpl(inboundCompression: InboundCompressions, outboundCompression: OutboundCompressions) extends HeaderBuilder { // Fields only available for EnvelopeBuffer + var _version: Int = _ + var _uid: Long = _ + var _actorRefCompressionTableVersion: Int = -1 + var _classManifestCompressionTableVersion: Int = -1 + var _senderActorRef: String = null var _senderActorRefIdx: Int = -1 var _recipientActorRef: String = null @@ -150,25 +182,33 @@ private[remote] final class HeaderBuilderImpl(inboundCompression: InboundCompres var _manifest: String = null var _manifestIdx: Int = -1 - def senderActorRef_=(ref: ActorRef): Unit = { + override def setVersion(v: Int) = _version = v + override def version = _version + + override def setUid(uid: Long) = _uid = uid + override def uid: Long = _uid + + override def setActorRefCompressionTableVersion(v: Int): Unit = _actorRefCompressionTableVersion = v + override def actorRefCompressionTableVersion: Int = _actorRefCompressionTableVersion + + override def setClassManifestCompressionTableVersion(v: Int): Unit = _classManifestCompressionTableVersion = v + override def classManifestCompressionTableVersion: Int = _classManifestCompressionTableVersion + + override def setSenderActorRef(ref: ActorRef): Unit = { _senderActorRefIdx = outboundCompression.compressActorRef(ref) if (_senderActorRefIdx == -1) _senderActorRef = Serialization.serializedActorPath(ref) // includes local address from `currentTransportInformation` } - def setNoSender(): Unit = { + override def setNoSender(): Unit = { _senderActorRef = null _senderActorRefIdx = EnvelopeBuffer.DeadLettersCode } - def isNoSender: Boolean = + override def isNoSender: Boolean = (_senderActorRef eq null) && _senderActorRefIdx == EnvelopeBuffer.DeadLettersCode - def senderActorRef: OptionVal[ActorRef] = - if (_senderActorRef eq null) inboundCompression.decompressActorRef(_senderActorRefIdx) + override def senderActorRef(originUid: Long): OptionVal[ActorRef] = + if (_senderActorRef eq null) inboundCompression.decompressActorRef(originUid, actorRefCompressionTableVersion, _senderActorRefIdx) else OptionVal.None - def senderActorRefPath: String = - if (_senderActorRef ne null) _senderActorRef - else { - _senderActorRef = inboundCompression.decompressActorRef(_senderActorRefIdx).get.path.toSerializationFormat - _senderActorRef - } + def senderActorRefPath: OptionVal[String] = + OptionVal(_senderActorRef) def setNoRecipient(): Unit = { _recipientActorRef = null @@ -177,39 +217,48 @@ private[remote] final class HeaderBuilderImpl(inboundCompression: InboundCompres def isNoRecipient: Boolean = (_recipientActorRef eq null) && _recipientActorRefIdx == EnvelopeBuffer.DeadLettersCode - def recipientActorRef_=(ref: ActorRef): Unit = { + def setRecipientActorRef(ref: ActorRef): Unit = { _recipientActorRefIdx = outboundCompression.compressActorRef(ref) if (_recipientActorRefIdx == -1) _recipientActorRef = ref.path.toSerializationFormat } - def recipientActorRef: OptionVal[ActorRef] = - if (_recipientActorRef eq null) inboundCompression.decompressActorRef(_recipientActorRefIdx) + def recipientActorRef(originUid: Long): OptionVal[ActorRef] = + if (_recipientActorRef eq null) inboundCompression.decompressActorRef(originUid, actorRefCompressionTableVersion, _recipientActorRefIdx) else OptionVal.None - def recipientActorRefPath: String = - if (_recipientActorRef ne null) _recipientActorRef - else { - _recipientActorRef = inboundCompression.decompressActorRef(_recipientActorRefIdx).get.path.toSerializationFormat - _recipientActorRef - } + def recipientActorRefPath: OptionVal[String] = + OptionVal(_recipientActorRef) - override def serializer_=(serializer: Int): Unit = { + override def setSerializer(serializer: Int): Unit = { _serializer = serializer } override def serializer: Int = _serializer - override def manifest_=(manifest: String): Unit = { + override def setManifest(manifest: String): Unit = { _manifestIdx = outboundCompression.compressClassManifest(manifest) if (_manifestIdx == -1) _manifest = manifest } - override def manifest: String = { + override def manifest(originUid: Long): String = { if (_manifest ne null) _manifest else { - _manifest = inboundCompression.decompressClassManifest(_manifestIdx).get + _manifest = inboundCompression.decompressClassManifest(originUid, classManifestCompressionTableVersion, _manifestIdx).get _manifest } } - override def toString = s"HeaderBuilderImpl($version, $uid, ${_senderActorRef}, ${_senderActorRefIdx}, ${_recipientActorRef}, ${_recipientActorRefIdx}, ${_serializer}, ${_manifest}, ${_manifestIdx})" + override def toString = + "HeaderBuilderImpl(" + + version + ", " + + actorRefCompressionTableVersion + ", " + + classManifestCompressionTableVersion + ", " + + uid + ", " + + _senderActorRef + ", " + + _senderActorRefIdx + ", " + + _recipientActorRef + ", " + + _recipientActorRefIdx + ", " + + _serializer + ", " + + _manifest + ", " + + _manifestIdx + ")" + } /** @@ -231,6 +280,10 @@ private[remote] final class EnvelopeBuffer(val byteBuffer: ByteBuffer) { byteBuffer.putLong(header.uid) byteBuffer.putInt(header.serializer) + // compression table version numbers + byteBuffer.putInt(ActorRefCompressionTableVersionTagOffset, header._actorRefCompressionTableVersion | TagTypeMask) + byteBuffer.putInt(ClassManifestCompressionTableVersionTagOffset, header._classManifestCompressionTableVersion | TagTypeMask) + // Write compressable, variable-length parts always to the actual position of the buffer // Write tag values explicitly in their proper offset byteBuffer.position(LiteralsSectionOffset) @@ -258,9 +311,19 @@ private[remote] final class EnvelopeBuffer(val byteBuffer: ByteBuffer) { val header = h.asInstanceOf[HeaderBuilderImpl] // Read fixed length parts - header.version = byteBuffer.getInt - header.uid = byteBuffer.getLong - header.serializer = byteBuffer.getInt + header setVersion byteBuffer.getInt + header setUid byteBuffer.getLong + header setSerializer byteBuffer.getInt + + // compression table versions (stored in the Tag) + val refCompressionVersionTag = byteBuffer.getInt(ActorRefCompressionTableVersionTagOffset) + if ((refCompressionVersionTag & TagTypeMask) != 0) { + header setActorRefCompressionTableVersion refCompressionVersionTag & TagValueMask + } + val manifestCompressionVersionTag = byteBuffer.getInt(ClassManifestCompressionTableVersionTagOffset) + if ((manifestCompressionVersionTag & TagTypeMask) != 0) { + header setClassManifestCompressionTableVersion manifestCompressionVersionTag & TagValueMask + } // Read compressable, variable-length parts always from the actual position of the buffer // Read tag values explicitly from their proper offset diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala index 61ae1368df..49d4c6bdd8 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala @@ -23,7 +23,7 @@ import akka.stream.stage.TimerGraphStageLogic private[remote] class Encoder( uniqueLocalAddress: UniqueAddress, system: ActorSystem, - compression: OutboundCompression, + compression: OutboundCompressions, bufferPool: EnvelopeBufferPool) extends GraphStage[FlowShape[Send, EnvelopeBuffer]] { @@ -35,8 +35,8 @@ private[remote] class Encoder( new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging { private val headerBuilder = HeaderBuilder.out(compression) - headerBuilder.version = ArteryTransport.Version - headerBuilder.uid = uniqueLocalAddress.uid + headerBuilder setVersion ArteryTransport.Version + headerBuilder setUid uniqueLocalAddress.uid private val localAddress = uniqueLocalAddress.address private val serialization = SerializationExtension(system) private val serializationInfo = Serialization.Information(localAddress, system) @@ -48,7 +48,7 @@ private[remote] class Encoder( val envelope = bufferPool.acquire() // internally compression is applied by the builder: - headerBuilder.recipientActorRef = send.recipient + headerBuilder setRecipientActorRef send.recipient try { // avoiding currentTransportInformation.withValue due to thunk allocation @@ -58,7 +58,7 @@ private[remote] class Encoder( send.senderOption match { case OptionVal.None ⇒ headerBuilder.setNoSender() - case OptionVal.Some(s) ⇒ headerBuilder.senderActorRef = s + case OptionVal.Some(s) ⇒ headerBuilder setSenderActorRef s } MessageSerializer.serializeForArtery(serialization, send.message.asInstanceOf[AnyRef], headerBuilder, envelope) @@ -109,7 +109,7 @@ private[remote] class Decoder( inboundContext: InboundContext, system: ExtendedActorSystem, resolveActorRefWithLocalAddress: String ⇒ InternalActorRef, - compression: InboundCompression, + compression: InboundCompressions, // TODO has to do demuxing on remote address It would seem, as decoder does not yet know bufferPool: EnvelopeBufferPool, inEnvelopePool: ObjectPool[InboundEnvelope]) extends GraphStage[FlowShape[EnvelopeBuffer, InboundEnvelope]] { val in: Inlet[EnvelopeBuffer] = Inlet("Artery.Decoder.in") @@ -135,23 +135,29 @@ private[remote] class Decoder( val originUid = headerBuilder.uid val association = inboundContext.association(originUid) - val recipient: OptionVal[InternalActorRef] = headerBuilder.recipientActorRef match { - case OptionVal.Some(ref) ⇒ OptionVal(ref.asInstanceOf[InternalActorRef]) - case OptionVal.None ⇒ resolveRecipient(headerBuilder.recipientActorRefPath) + val recipient: OptionVal[InternalActorRef] = headerBuilder.recipientActorRef(originUid) match { + case OptionVal.Some(ref) ⇒ + OptionVal(ref.asInstanceOf[InternalActorRef]) + case OptionVal.None ⇒ + // `get` on Path is safe because it surely is not a compressed value here + resolveRecipient(headerBuilder.recipientActorRefPath.get) } - val sender: InternalActorRef = headerBuilder.senderActorRef match { - case OptionVal.Some(ref) ⇒ ref.asInstanceOf[InternalActorRef] - case OptionVal.None ⇒ resolveActorRefWithLocalAddress(headerBuilder.senderActorRefPath) + val sender: InternalActorRef = headerBuilder.senderActorRef(originUid) match { + case OptionVal.Some(ref) ⇒ + ref.asInstanceOf[InternalActorRef] + case OptionVal.None ⇒ + // `get` on Path is safe because it surely is not a compressed value here + resolveActorRefWithLocalAddress(headerBuilder.senderActorRefPath.get) } // --- hit refs and manifests for heavy-hitter counting association match { case OptionVal.Some(assoc) ⇒ val remoteAddress = assoc.remoteAddress - compression.hitActorRef(remoteAddress, sender) - if (recipient.isDefined) compression.hitActorRef(remoteAddress, recipient.get) - compression.hitClassManifest(remoteAddress, headerBuilder.manifest) + compression.hitActorRef(originUid, headerBuilder.actorRefCompressionTableVersion, remoteAddress, sender) + if (recipient.isDefined) compression.hitActorRef(originUid, headerBuilder.actorRefCompressionTableVersion, remoteAddress, recipient.get) + compression.hitClassManifest(originUid, headerBuilder.classManifestCompressionTableVersion, remoteAddress, headerBuilder.manifest(originUid)) case _ ⇒ // we don't want to record hits for compression while handshake is still in progress. log.debug("Decoded message but unable to record hits for compression as no remoteAddress known. No association yet?") @@ -160,7 +166,7 @@ private[remote] class Decoder( try { val deserializedMessage = MessageSerializer.deserializeForArtery( - system, serialization, headerBuilder, envelope) + system, originUid, serialization, headerBuilder, envelope) val decoded = inEnvelopePool.acquire() decoded.asInstanceOf[ReusableInboundEnvelope].init( @@ -176,14 +182,14 @@ private[remote] class Decoder( // recipient for the first message that is sent to it, best effort retry scheduleOnce(RetryResolveRemoteDeployedRecipient( retryResolveRemoteDeployedRecipientAttempts, - headerBuilder.recipientActorRefPath, decoded), retryResolveRemoteDeployedRecipientInterval) + headerBuilder.recipientActorRefPath.get, decoded), retryResolveRemoteDeployedRecipientInterval) // FIXME IS THIS SAFE? } else push(out, decoded) } catch { case NonFatal(e) ⇒ log.warning( "Failed to deserialize message with serializer id [{}] and manifest [{}]. {}", - headerBuilder.serializer, headerBuilder.manifest, e.getMessage) + headerBuilder.serializer, headerBuilder.manifest(originUid), e.getMessage) pull(in) } finally { bufferPool.release(envelope) diff --git a/akka-remote/src/main/scala/akka/remote/artery/NoLiteralCompression.scala b/akka-remote/src/main/scala/akka/remote/artery/NoLiteralCompression.scala index be70489b85..fe7502ea5e 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/NoLiteralCompression.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/NoLiteralCompression.scala @@ -3,7 +3,8 @@ */ package akka.remote.artery -import akka.actor.{ Address, InternalActorRef, ActorSystem, ActorRef } +import akka.actor.{ ActorRef, ActorSystem, Address, InternalActorRef } +import akka.remote.artery.compress.CompressionTable import akka.util.OptionVal /** @@ -11,15 +12,15 @@ import akka.util.OptionVal * * Literarily, no compression! */ -final class NoInboundCompression(system: ActorSystem) extends InboundCompression { - override def hitActorRef(address: Address, ref: ActorRef): Unit = () - override def decompressActorRef(idx: Int): OptionVal[ActorRef] = +final class NoInboundCompressions(system: ActorSystem) extends InboundCompressions { + override def hitActorRef(originUid: Long, tableVersion: Int, remote: Address, ref: ActorRef): Unit = () + override def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef] = if (idx == -1) throw new IllegalArgumentException("Attemted decompression of illegal compression id: -1") else if (idx == 0) OptionVal.Some(system.deadLetters) // special case deadLetters else OptionVal.None - override def hitClassManifest(address: Address, manifest: String): Unit = () - override def decompressClassManifest(idx: Int): OptionVal[String] = + override def hitClassManifest(originUid: Long, tableVersion: Int, remote: Address, manifest: String): Unit = () + override def decompressClassManifest(originUid: Long, tableVersion: Int, idx: Int): OptionVal[String] = if (idx == -1) throw new IllegalArgumentException("Attemted decompression of illegal compression id: -1") else OptionVal.None } @@ -29,10 +30,10 @@ final class NoInboundCompression(system: ActorSystem) extends InboundCompression * * Literarily, no compression! */ -object NoOutboundCompression extends OutboundCompression { - override def allocateActorRefCompressionId(ref: ActorRef, id: Int): Unit = () +object NoOutboundCompressions extends OutboundCompressions { + override def applyActorRefCompressionTable(table: CompressionTable[ActorRef]): Unit = () override def compressActorRef(ref: ActorRef): Int = -1 - override def allocateClassManifestCompressionId(manifest: String, id: Int): Unit = () + override def applyClassManifestCompressionTable(table: CompressionTable[String]): Unit = () override def compressClassManifest(manifest: String): Int = -1 } diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/ActualCompressionTables.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/ActualCompressionTables.scala deleted file mode 100644 index 9aebf26f62..0000000000 --- a/akka-remote/src/main/scala/akka/remote/artery/compress/ActualCompressionTables.scala +++ /dev/null @@ -1,102 +0,0 @@ -/** - * Copyright (C) 2016 Lightbend Inc. - */ - -package akka.remote.artery.compress - -import akka.actor.{ Address, ActorRef, ActorSystem } -import akka.remote.artery._ -import akka.remote.artery.compress.CompressionProtocol.Events -import akka.serialization.Serialization -import akka.stream.impl.ConstantFun -import akka.util.OptionVal - -/** INTERNAL API */ -private[remote] final class OutboundCompressionImpl(system: ActorSystem, remoteAddress: Address) extends OutboundCompression { - - private val settings = CompressionSettings(system) - - private val actorRefsOut = new OutboundActorRefCompressionTable(system, remoteAddress) - - private val classManifestsOut = new OutboundCompressionTable[String](system, remoteAddress) - - // actor ref compression --- - - override def allocateActorRefCompressionId(ref: ActorRef, id: Int): Unit = actorRefsOut.register(ref, id) - override def compressActorRef(ref: ActorRef): Int = actorRefsOut.compress(ref) - - // class manifest compression --- - - override def compressClassManifest(manifest: String): Int = classManifestsOut.compress(manifest) - override def allocateClassManifestCompressionId(manifest: String, id: Int): Unit = classManifestsOut.register(manifest, id) -} - -/** INTERNAL API */ -private[remote] final class InboundCompressionImpl( - system: ActorSystem, - inboundContext: InboundContext -) extends InboundCompression { - - private val settings = CompressionSettings(system) - private val log = system.log - - private val localAddress = inboundContext.localAddress - - // TODO maybe use inbound context to get remoteAddress instead? - val advertiseActorRef = new AdvertiseCompressionId[ActorRef] { - override def apply(remoteAddress: Address, ref: ActorRef, id: Int): Unit = { - - log.debug(s"Advertise ActorRef compression [$ref => $id] to [$remoteAddress]") - // TODO could use remote address via association lookup??? could be more lookups though - inboundContext.sendControl(remoteAddress, CompressionProtocol.ActorRefCompressionAdvertisement(inboundContext.localAddress, ref, id)) - } - } - val advertiseManifest = new AdvertiseCompressionId[String] { - override def apply(remoteAddress: Address, man: String, id: Int): Unit = { - log.error(s"Advertise ClassManifest compression [$man => $id] to [$remoteAddress]") - inboundContext.sendControl(remoteAddress, CompressionProtocol.ClassManifestCompressionAdvertisement(localAddress, man, id)) - } - } - - private val actorRefHitters = new TopHeavyHitters[ActorRef](settings.actorRefs.max) - private val actorRefsIn = new InboundActorRefCompressionTable(system, actorRefHitters, advertiseActorRef) - - private val manifestHitters = new TopHeavyHitters[String](settings.manifests.max) - private val classManifestsIn = new InboundCompressionTable[String](system, manifestHitters, ConstantFun.scalaIdentityFunction, advertiseManifest) - - // actor ref compression --- - - override def decompressActorRef(idx: Int): OptionVal[ActorRef] = { - val value = actorRefsIn.decompress(idx) - OptionVal.Some(value) - } - override def hitActorRef(address: Address, ref: ActorRef): Unit = { - actorRefsIn.increment(address, ref, 1L) - } - - // class manifest compression --- - - override def decompressClassManifest(idx: Int): OptionVal[String] = { - val value = classManifestsIn.decompress(idx) - OptionVal.Some(value) - } - override def hitClassManifest(address: Address, manifest: String): Unit = { - classManifestsIn.increment(address, manifest, 1L) - } -} - -object NoopInboundCompression extends InboundCompression { - override def hitActorRef(remote: Address, ref: ActorRef): Unit = () - override def decompressActorRef(idx: Int): OptionVal[ActorRef] = OptionVal.None - - override def hitClassManifest(remote: Address, manifest: String): Unit = () - override def decompressClassManifest(idx: Int): OptionVal[String] = OptionVal.None -} - -object NoopOutboundCompression extends OutboundCompression { - override def compressActorRef(ref: ActorRef): Int = -1 - override def allocateActorRefCompressionId(ref: ActorRef, id: Int): Unit = () - - override def compressClassManifest(manifest: String): Int = -1 - override def allocateClassManifestCompressionId(manifest: String, id: Int): Unit = () -} diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/ActualCompressions.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/ActualCompressions.scala new file mode 100644 index 0000000000..1580709a1b --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/ActualCompressions.scala @@ -0,0 +1,99 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ + +package akka.remote.artery.compress + +import java.util.function.LongFunction + +import akka.actor.{ ActorRef, ActorSystem, Address } +import akka.remote.artery._ +import akka.util.OptionVal +import akka.remote.artery.OutboundCompressions +import org.agrona.collections.Long2ObjectHashMap + +/** INTERNAL API */ +private[remote] final class OutboundCompressionsImpl(system: ActorSystem, remoteAddress: Address) extends OutboundCompressions { + + private val actorRefsOut = new OutboundActorRefCompression(system, remoteAddress) + private val classManifestsOut = new OutboundCompressionTable[String](system, remoteAddress) + + // actor ref compression --- + + override def compressActorRef(ref: ActorRef): Int = actorRefsOut.compress(ref) + override def applyActorRefCompressionTable(table: CompressionTable[ActorRef]): Unit = + actorRefsOut.flipTable(table) + + // class manifest compression --- + + override def compressClassManifest(manifest: String): Int = classManifestsOut.compress(manifest) + override def applyClassManifestCompressionTable(table: CompressionTable[String]): Unit = + classManifestsOut.flipTable(table) +} + +/** + * INTERNAL API + * + * One per incoming Aeron stream, actual compression tables are kept per-originUid and created on demand. + */ +private[remote] final class InboundCompressionsImpl( + system: ActorSystem, + inboundContext: InboundContext +) extends InboundCompressions { + + private val settings = CompressionSettings(system) + private val localAddress = inboundContext.localAddress + + // FIXME we also must remove the ones that won't be used anymore - when quarantine triggers + private[this] val _actorRefsIn = new Long2ObjectHashMap[InboundActorRefCompression]() + private val createInboundActorRefsForOrigin = new LongFunction[InboundActorRefCompression] { + override def apply(originUid: Long): InboundActorRefCompression = { + val actorRefHitters = new TopHeavyHitters[ActorRef](settings.actorRefs.max) + new InboundActorRefCompression(system, settings, originUid, inboundContext, actorRefHitters) + } + } + private def actorRefsIn(originUid: Long): InboundActorRefCompression = + _actorRefsIn.computeIfAbsent(originUid, createInboundActorRefsForOrigin) + + private[this] val _classManifestsIn = new Long2ObjectHashMap[InboundManifestCompression]() + private val createInboundManifestsForOrigin = new LongFunction[InboundManifestCompression] { + override def apply(originUid: Long): InboundManifestCompression = { + val manifestHitters = new TopHeavyHitters[String](settings.manifests.max) + new InboundManifestCompression(system, settings, originUid, inboundContext, manifestHitters) + } + } + private def classManifestsIn(originUid: Long): InboundManifestCompression = + _classManifestsIn.computeIfAbsent(originUid, createInboundManifestsForOrigin) + + // actor ref compression --- + + override def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef] = + actorRefsIn(originUid).decompress(tableVersion, idx) + override def hitActorRef(originUid: Long, tableVersion: Int, address: Address, ref: ActorRef): Unit = { + actorRefsIn(originUid).increment(address, ref, 1L) + } + + // class manifest compression --- + + override def decompressClassManifest(originUid: Long, tableVersion: Int, idx: Int): OptionVal[String] = + classManifestsIn(originUid).decompress(tableVersion, idx) + override def hitClassManifest(originUid: Long, tableVersion: Int, address: Address, manifest: String): Unit = { + classManifestsIn(originUid).increment(address, manifest, 1L) + } +} + +object NoopInboundCompressions extends InboundCompressions { + override def hitActorRef(originUid: Long, tableVersion: Int, remote: Address, ref: ActorRef): Unit = () + override def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef] = OptionVal.None + + override def hitClassManifest(originUid: Long, tableVersion: Int, remote: Address, manifest: String): Unit = () + override def decompressClassManifest(originUid: Long, tableVersion: Int, idx: Int): OptionVal[String] = OptionVal.None +} + +object NoopOutboundCompressions extends OutboundCompressions { + override def compressActorRef(ref: ActorRef): Int = -1 + override def applyActorRefCompressionTable(table: CompressionTable[ActorRef]): Unit = () + + override def compressClassManifest(manifest: String): Int = -1 + override def applyClassManifestCompressionTable(table: CompressionTable[String]): Unit = () +} diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionProtocol.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionProtocol.scala index 4dfe2763ce..5cee5f77f3 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionProtocol.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionProtocol.scala @@ -4,6 +4,7 @@ package akka.remote.artery.compress +import scala.language.existentials import akka.actor.{ ActorRef, Address } import akka.remote.UniqueAddress import akka.remote.artery.ControlMessage @@ -19,14 +20,14 @@ object CompressionProtocol { * INTERNAL API * Sent by the "receiving" node after allocating a compression id to a given [[akka.actor.ActorRef]] */ - private[remote] final case class ActorRefCompressionAdvertisement(from: UniqueAddress, ref: ActorRef, id: Int) + private[remote] final case class ActorRefCompressionAdvertisement(from: UniqueAddress, table: CompressionTable[ActorRef]) extends ControlMessage with CompressionMessage /** * INTERNAL API * Sent by the "receiving" node after allocating a compression id to a given class manifest */ - private[remote] final case class ClassManifestCompressionAdvertisement(from: UniqueAddress, manifest: String, id: Int) + private[remote] final case class ClassManifestCompressionAdvertisement(from: UniqueAddress, table: CompressionTable[String]) extends ControlMessage with CompressionMessage /** INTERNAL API */ @@ -38,7 +39,7 @@ object CompressionProtocol { final case class HeavyHitterDetected(key: Any, id: Int, count: Long) extends Event /** INTERNAL API */ - final case class ReceivedCompressionAdvertisement(from: UniqueAddress, key: Any, id: Int) extends Event + final case class ReceivedCompressionTable[T](from: UniqueAddress, table: CompressionTable[T]) extends Event } diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionSettings.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionSettings.scala index 2e27d244d5..1b7e1ca6ef 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionSettings.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionSettings.scala @@ -3,9 +3,13 @@ */ package akka.remote.artery.compress +import java.util.concurrent.TimeUnit + import akka.actor.ActorSystem import com.typesafe.config.Config +import scala.concurrent.duration._ + /** INTERNAL API */ private[akka] class CompressionSettings(_config: Config) { val enabled = _config.getBoolean("enabled") @@ -17,12 +21,14 @@ private[akka] class CompressionSettings(_config: Config) { private val c = _config.getConfig("actor-refs") val enabled = globalEnabled && c.getBoolean("enabled") + val advertisementInterval = c.getDuration("advertisement-interval", TimeUnit.MILLISECONDS).millis val max = c.getInt("max") } object manifests { private val c = _config.getConfig("manifests") val enabled = globalEnabled && c.getBoolean("enabled") + val advertisementInterval = c.getDuration("advertisement-interval", TimeUnit.MILLISECONDS).millis val max = c.getInt("max") } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionTable.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionTable.scala new file mode 100644 index 0000000000..6125a2b3ea --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionTable.scala @@ -0,0 +1,30 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ + +package akka.remote.artery.compress + +/** INTERNAL API: Versioned compression table to be advertised between systems */ +private[akka] final case class CompressionTable[T](version: Long, map: Map[T, Int]) { + + def invert: DecompressionTable[T] = + if (map.isEmpty) DecompressionTable.empty[T].copy(version = version) + else { + // TODO: these are some expensive sanity checks, about the numbers being consequitive, without gaps + // TODO: we can remove them, make them re-map (not needed I believe though) + val expectedGaplessSum = Integer.valueOf((map.size * (map.size + 1)) / 2) /* Dirichlet */ + require(map.values.min == 0, "Compression table should start allocating from 0, yet lowest allocated id was " + map.values.min) + require(map.values.sum + map.size == expectedGaplessSum, "Given compression map does not seem to be gap-less and starting from zero, " + + "which makes compressing it into an Array difficult, bailing out! Map was: " + map) + + val vals = map.toList.sortBy(_._2).iterator.map(_._1) + val dtab = Array.ofDim[Object](map.size).asInstanceOf[Array[T]] + vals.copyToArray(dtab) // TODO HEAVY, AVOID COPYING AND THE MAP ETC!!! + DecompressionTable[T](version, dtab) + } +} +/** INTERNAL API */ +private[remote] object CompressionTable { + private[this] val _empty = new CompressionTable[Any](0, Map.empty) + def empty[T] = _empty.asInstanceOf[CompressionTable[T]] +} diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/DecompressionTable.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/DecompressionTable.scala new file mode 100644 index 0000000000..2434112982 --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/DecompressionTable.scala @@ -0,0 +1,28 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ + +package akka.remote.artery.compress + +/** INTERNAL API */ +private[remote] final case class DecompressionTable[T](version: Long, table: Array[T]) { + def get(idx: Int): T = table(idx) + + def invert: CompressionTable[T] = + CompressionTable(version, Map(table.zipWithIndex: _*)) + + /** Writes complete table as String (heavy operation) */ + def toDebugString = + getClass.getName + + s"(version: $version, " + + ( + if (table.length == 0) "[empty]" + else s"table: [${table.zipWithIndex.map({ case (t, i) ⇒ s"$i -> $t" }).mkString(",")}" + ) + "])" +} + +/** INTERNAL API */ +private[remote] object DecompressionTable { + private[this] val _empty = DecompressionTable(0, Array.empty) + def empty[T] = _empty.asInstanceOf[DecompressionTable[T]] +} diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressionTable.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressionTable.scala deleted file mode 100644 index bae6264695..0000000000 --- a/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressionTable.scala +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Copyright (C) 2016 Lightbend Inc. - */ - -package akka.remote.artery.compress - -import akka.actor.{ Address, ActorRef, ActorSystem } -import akka.event.Logging - -final class InboundActorRefCompressionTable( - system: ActorSystem, - heavyHitters: TopHeavyHitters[ActorRef], - onNewHeavyHitterDetected: AdvertiseCompressionId[ActorRef] -) extends InboundCompressionTable[ActorRef](system, heavyHitters, _.path.toSerializationFormat, onNewHeavyHitterDetected) { - - preAllocate( - system.deadLetters - ) - - /* Since the table is empty here, anything we increment here becomes a heavy hitter immediately. */ - def preAllocate(allocations: ActorRef*): Unit = { - allocations foreach { case ref ⇒ increment(null, ref, 100000) } - } - - override def shouldAdvertiseCompressionId(idx: Int): Boolean = - idx > 0 // 0 is special => deadLetters - - override def decompress(idx: Int): ActorRef = - if (idx == 0) system.deadLetters - else super.decompress(idx) -} - -/** - * Handles counting and detecting of heavy-hitters and compressing them via a table lookup. - * Mutable and not thread-safe. - * - * Compression flow goes like: - * [1] on each message we add the actor path here - * [2] if it becomes a heavy hitter, we allocate an identifier for it and invoke the callback - * [3]> the callback for example then triggers an CompressionAdvertisement to the receiving side - */ -// TODO should the onHeavyHitter be inside HeavyHitters? -class InboundCompressionTable[T]( - system: ActorSystem, - heavyHitters: TopHeavyHitters[T], - convertKeyToString: T ⇒ String, - onNewHeavyHitterDetected: AdvertiseCompressionId[T]) { - require(heavyHitters != null, "heavyHitters must not be null") - - private val settings = CompressionSettings(system) - val log = Logging(system, "InboundCompressionTable") - - // TODO calibrate properly (h/w have direct relation to preciseness and max capacity) - private[this] val cms = new CountMinSketch(100, 100, System.currentTimeMillis().toInt) - - @volatile private[this] var compressionAllocations = Map.empty[Int, T] // TODO replace with a specialized LongMap - private[this] var currentCompressionId = InboundCompressionTable.CompressionAllocationCounterStart - - /** - * Decompress given identifier into original String representation. - * - * @throws UnknownCompressedIdException if given id is not known, this may indicate a bug – such situation should not happen. - */ - def decompress(idx: Int): T = { - if (settings.debug) log.debug(s"Decompress [{}] => {}", idx, compressionAllocations.get(idx)) - compressionAllocations.get(idx) match { - case Some(value) ⇒ value - case None ⇒ throw new UnknownCompressedIdException(idx) - } - } - - /** - * Add `n` occurance for the given key and call `heavyHittedDetected` if element has become a heavy hitter. - * Empty keys are omitted. - */ - // TODO not so happy about passing around address here, but in incoming there's no other earlier place to get it? - def increment(remoteAddress: Address, value: T, n: Long): Unit = { - val key = convertKeyToString(value) - if (shouldIgnore(key)) { - // ignore... - } else { - // val countBefore = cms.estimateCount(key) - val count = cms.addAndEstimateCount(key, n) - // log.warning(s"HIT: increment $key + $n => ($countBefore->) $count; (addAndCheckIfheavyHitterDetected(value, count) = ${addAndCheckIfheavyHitterDetected(value, count)}); (!wasCompressedPreviously(key) = ${!wasCompressedPreviously(key)})") - - // TODO optimise order of these, what is more expensive? (now the `previous` is, but if aprox datatype there it would be faster)... Needs pondering. - if (addAndCheckIfheavyHitterDetected(value, count) && !wasCompressedPreviously(key)) { - val idx = allocateCompressedId(value) - log.debug("Allocated compression id [" + idx + "] for [" + value + "], in association with [" + remoteAddress + "]") - if (shouldAdvertiseCompressionId(idx)) { // TODO change to "time based accumulate new table => advertise it" - // TODO guard with if - log.debug(s"Inbound: Heavy hitter detected: [{} => $idx], {} hits recorded for it (confidence: {}, relative error (eps) {}).\n" + - s"All allocations: ${compressionAllocations}", key, count, cms.getConfidence, cms.getRelativeError) - onNewHeavyHitterDetected(remoteAddress, value, idx) // would be used to signal via side-channel to OutboundCompression that we want to send a ActorRefCompressionAdvertisement - } - } - } - } - - /** Some compression IDs are special and known upfront by both sides, thus need not be advertised (e.g. deadLetters => 0) */ - def shouldAdvertiseCompressionId(idx: Int): Boolean = - true // TODO this will be different in the "advertise entire table mode", it will be "once table is big enough or much time passed" - - private def shouldIgnore(key: String) = { // TODO this is hacky, if we'd do this we trigger compression too early (before association exists, so control messages fail) - key match { - case null ⇒ true - case "" ⇒ true // empty class manifest for example - case _ ⇒ key.endsWith("/system/dummy") || key.endsWith("/") // TODO dummy likely shouldn't exist? can we remove it? - } - } - - // TODO this must be optimised, we really don't want to scan the entire key-set each time to make sure - private def wasCompressedPreviously(key: String): Boolean = - compressionAllocations.values.exists(_ == key) // TODO expensive, aprox or something sneakier? - - /** Mutates heavy hitters */ - private def addAndCheckIfheavyHitterDetected(value: T, count: Long): Boolean = { - heavyHitters.update(value, count) - } - - private def allocateCompressedId(value: T): Int = { - val idx = nextCompressionId() - compressionAllocations.get(idx) match { - case Some(previouslyCompressedValue) ⇒ - // should never really happen, but let's not assume that - throw new ExistingcompressedIdReuseAttemptException(idx, previouslyCompressedValue) - - case None ⇒ - // good, the idx is not used so we can allocate it - compressionAllocations = compressionAllocations.updated(idx, value) - idx - } - } - - private def nextCompressionId(): Int = { - val id = currentCompressionId - currentCompressionId += 1 - id - } - - override def toString = - s"""${getClass.getSimpleName}(countMinSketch: $cms, heavyHitters: $heavyHitters)""" - -} - -object InboundCompressionTable { - val CompressionAllocationCounterStart = 0 - // val CompressionAllocationCounterStart = 64L // we leave 64 slots (0 counts too) for pre-allocated Akka compressions -} - -final class ExistingcompressedIdReuseAttemptException(id: Long, value: Any) - extends RuntimeException( - s"Attempted to re-allocate compressedId [$id] which is still in use for compressing [$value]! " + - s"This should never happen and is likely an implementation bug.") - -final class UnknownCompressedIdException(id: Long) - extends RuntimeException( - s"Attempted de-compress unknown id [$id]! " + - s"This could happen if this node has started a new ActorSystem bound to the same address as previously, " + - s"and previous messages from a remote system were still in flight (using an old compression table). " + - s"The remote system is expected to drop the compression table and this system will advertise a new one.") diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala new file mode 100644 index 0000000000..5464d71f72 --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala @@ -0,0 +1,276 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ + +package akka.remote.artery.compress + +import java.util.concurrent.atomic.AtomicReference + +import akka.actor.{ ActorRef, ActorSystem, Address } +import akka.event.Logging +import akka.remote.artery.{ InboundContext, OutboundContext } +import akka.stream.impl.ConstantFun +import akka.util.{ OptionVal, PrettyDuration } + +import scala.concurrent.duration.{ Duration, FiniteDuration } + +/** + * INTERNAL API + * Dedicated per remote system inbound compression table. + * + * The outbound context is available by looking it up in the association. + * It can be used to advertise a compression table. + * If the association is not complete - we simply dont advertise the table, which is fine (handshake not yet complete). + */ +private[remote] final class InboundActorRefCompression( + system: ActorSystem, + settings: CompressionSettings, + originUid: Long, + inboundContext: InboundContext, + heavyHitters: TopHeavyHitters[ActorRef] +) extends InboundCompression[ActorRef](system, settings, originUid, inboundContext, heavyHitters, _.path.toSerializationFormat) { + + preAllocate(system.deadLetters) + + /* Since the table is empty here, anything we increment here becomes a heavy hitter immediately. */ + def preAllocate(allocations: ActorRef*): Unit = { + allocations foreach { case ref ⇒ increment(null, ref, 100000) } + } + + override def decompress(tableId: Long, idx: Int): OptionVal[ActorRef] = + if (idx == 0) OptionVal.Some(system.deadLetters) + else super.decompress(tableId, idx) + + scheduleNextTableAdvertisement() + override protected def tableAdvertisementInterval = settings.actorRefs.advertisementInterval + + def advertiseCompressionTable(association: OutboundContext, table: CompressionTable[ActorRef]): Unit = { + log.debug(s"Advertise ActorRef compression [$table] to [${association.remoteAddress}]") + association.sendControl(CompressionProtocol.ActorRefCompressionAdvertisement(inboundContext.localAddress, table)) + } +} + +final class InboundManifestCompression( + system: ActorSystem, + settings: CompressionSettings, + originUid: Long, + inboundContext: InboundContext, + heavyHitters: TopHeavyHitters[String] +) extends InboundCompression[String](system, settings, originUid, inboundContext, heavyHitters, ConstantFun.scalaIdentityFunction) { + + scheduleNextTableAdvertisement() + override protected def tableAdvertisementInterval = settings.manifests.advertisementInterval + + override def advertiseCompressionTable(association: OutboundContext, table: CompressionTable[String]): Unit = { + log.debug(s"Advertise ClassManifest compression [$table] to [${association.remoteAddress}]") + association.sendControl(CompressionProtocol.ClassManifestCompressionAdvertisement(inboundContext.localAddress, table)) + } +} + +/** + * INTERNAL API + * Handles counting and detecting of heavy-hitters and compressing them via a table lookup. + */ +private[remote] abstract class InboundCompression[T >: Null]( + val system: ActorSystem, + val settings: CompressionSettings, + originUid: Long, + inboundContext: InboundContext, + val heavyHitters: TopHeavyHitters[T], + convertKeyToString: T ⇒ String) { // TODO avoid converting to string, in order to use the ActorRef.hashCode! + + val log = Logging(system, "InboundCompressionTable") + + // TODO atomic / state machine? the InbouncCompression could even extend ActomicReference[State]! + + // TODO NOTE: there exist edge cases around, we advertise table 1, accumulate table 2, the remote system has not used 2 yet, + // yet we technically could already prepare table 3, then it starts using table 1 suddenly. Edge cases like that. + // SOLUTION 1: We don't start building new tables until we've seen the previous one be used (move from new to active) + // This is nice as it practically disables all the "build the table" work when the other side is not interested in using it. + // SOLUTION 2: We end up dropping messages when old table comes in (we do that anyway) + + // TODO have a marker that "advertised table XXX", so we don't generate a new-new one until the new one is in use? + + // 2 tables are used, one is "still in use", and the + @volatile private[this] var activeTable = DecompressionTable.empty[T] + @volatile private[this] var nextTable = DecompressionTable.empty[T] + + // TODO calibrate properly (h/w have direct relation to preciseness and max capacity) + private[this] val cms = new CountMinSketch(100, 100, System.currentTimeMillis().toInt) + + /* ==== COMPRESSION ==== */ + + /** + * Decompress given identifier into its original representation. + * Passed in tableIds must only ever be in not-decreasing order (as old tables are dropped), + * tableIds must not have gaps. If an "old" tableId is received the value will fail to be decompressed. + * + * @throws UnknownCompressedIdException if given id is not known, this may indicate a bug – such situation should not happen. + */ + // not tailrec because we allow special casing in sub-class, however recursion is always at most 1 level deep + def decompress(tableVersion: Long, idx: Int): OptionVal[T] = { + val activeVersion = activeTable.version // TODO move into state + + if (tableVersion == -1) OptionVal.None // no compression, bail out early + else if (tableVersion == activeVersion) { + val value: T = activeTable.get(idx) + if (settings.debug) log.debug(s"Decompress [{}] => {}", idx, value) + if (value != null) OptionVal.Some[T](value) + else throw new UnknownCompressedIdException(idx) + } else if (tableVersion < activeVersion) { + log.warning("Received value compressed with old table: [{}], current table version is: [{}]", tableVersion, activeVersion) + OptionVal.None + } else if (tableVersion == nextTable.version) { + flipTables() + decompress(tableVersion, idx) // recurse, activeTable will not be able to handle this + } else { + // which means that incoming version was > nextTable.version, which likely is a bug + log.error("Inbound message is using compression table version higher than the highest allocated table on this node. " + + "This should not happen! State: activeTable: {}, nextTable, incoming tableVersion: {}", activeVersion, nextTable, tableVersion) + OptionVal.None + } + + } + + /** + * Add `n` occurance for the given key and call `heavyHittedDetected` if element has become a heavy hitter. + * Empty keys are omitted. + */ + // TODO not so happy about passing around address here, but in incoming there's no other earlier place to get it? + def increment(remoteAddress: Address, value: T, n: Long): Unit = { + val key = convertKeyToString(value) + if (shouldIgnore(key)) { + // ignore... + } else { + val count = cms.addAndEstimateCount(key, n) + + // TODO optimise order of these, what is more expensive? + // TODO (now the `previous` is, but if aprox datatype there it would be faster)... Needs pondering. + val wasHeavyHitter = addAndCheckIfheavyHitterDetected(value, count) + if (wasHeavyHitter) + log.debug(s"Heavy hitter detected: {} [count: {}]", value, count) + // if (wasHeavyHitter && !wasCompressedPreviously(key)) { + // val idx = prepareCompressionAdvertisement() + // log.debug("Allocated compression id [" + idx + "] for [" + value + "], in association with [" + remoteAddress + "]") + // } + } + } + + private def shouldIgnore(key: String) = { // TODO this is hacky, if we'd do this we trigger compression too early (before association exists, so control messages fail) + key match { + case null ⇒ true + case "" ⇒ true // empty class manifest for example + case _ ⇒ key.endsWith("/system/dummy") || key.endsWith("/") // TODO dummy likely shouldn't exist? can we remove it? + } + } + + // TODO this must be optimised, we really don't want to scan the entire key-set each time to make sure + private def wasCompressedPreviously(key: String): Boolean = { + var i = 0 + val len = activeTable.table.length + while (i < len) { + if (activeTable.table(i) == key) return true + i += 1 + } + false + } + + /** Mutates heavy hitters */ + private def addAndCheckIfheavyHitterDetected(value: T, count: Long): Boolean = { + heavyHitters.update(value, count) + } + + /* ==== TABLE ADVERTISEMENT ==== */ + + protected def tableAdvertisementInterval: Duration + + /** + * INTERNAL / TESTING API + * Used for manually triggering when a compression table should be advertised. + * Note that most likely you'd want to set the advertisment-interval to `0` when using this. + * + * TODO: Technically this would be solvable by a "triggerable" scheduler. + */ + private[remote] def triggerNextTableAdvertisement(): Unit = // TODO expose and use in tests + runNextTableAdvertisement() + + def scheduleNextTableAdvertisement(): Unit = + tableAdvertisementInterval match { + case d: FiniteDuration ⇒ + try { + system.scheduler.scheduleOnce(d, ScheduledTableAdvertisementRunnable)(system.dispatcher) + log.debug("Scheduled {} advertisement in [{}] from now...", getClass.getSimpleName, PrettyDuration.format(tableAdvertisementInterval, includeNanos = false, 1)) + } catch { + case ex: IllegalStateException ⇒ + log.warning("Unable to schedule {} advertisement, " + + "likely system is shutting down. " + + "Reason: {}", getClass.getName, ex.getMessage) + } + case _ ⇒ // ignore... + } + + private val ScheduledTableAdvertisementRunnable = new Runnable { + override def run(): Unit = + try runNextTableAdvertisement() + finally scheduleNextTableAdvertisement() + } + + /** + * Entry point to advertising a new compression table. + * + * [1] First we must *hand the new table over to the Incoming compression side on this system*, + * so it will not be used by someone else before "we" know about it in the Decoder. + * [2] Then the table must be *advertised to the remote system*, and MAY start using it immediately + * + * It must be advertised to the other side so it can start using it in its outgoing compression. + * Triggers compression table advertisement. May be triggered by schedule or manually, i.e. for testing. + */ + def runNextTableAdvertisement() = { // TODO guard against re-entrancy? + inboundContext.association(originUid) match { + case OptionVal.Some(association) ⇒ + val table = prepareCompressionAdvertisement() + nextTable = table.invert // TODO expensive, check if building the other way wouldn't be faster? + advertiseCompressionTable(association, table) + + case OptionVal.None ⇒ + // otherwise it's too early, association not ready yet. + // so we don't build the table since we would not be able to send it anyway. + log.warning("No Association for originUid [{}] yet, unable to advertise compression table.", originUid) + } + } + + /** + * Must be implementeed by extending classes in order to send a [[akka.remote.artery.ControlMessage]] + * of apropriate type to the remote system in order to advertise the compression table to it. + */ + protected def advertiseCompressionTable(association: OutboundContext, table: CompressionTable[T]): Unit + + /** Drop `activeTable` and start using the `nextTable` in its place. */ + private def flipTables(): Unit = { + log.debug("Swaping active decompression table to version {}.", nextTable.version) + activeTable = nextTable + nextTable = DecompressionTable.empty + // TODO we want to keep the currentTableVersion in State too, update here as well then + } + + private def prepareCompressionAdvertisement(): CompressionTable[T] = { + // TODO surely we can do better than that, optimise + CompressionTable(activeTable.version + 1, Map(heavyHitters.snapshot.filterNot(_ == null).zipWithIndex: _*)) + } + + override def toString = + s"""${getClass.getSimpleName}(countMinSketch: $cms, heavyHitters: $heavyHitters)""" + +} + +final class ExistingcompressedIdReuseAttemptException(id: Long, value: Any) + extends RuntimeException( + s"Attempted to re-allocate compressedId [$id] which is still in use for compressing [$value]! " + + s"This should never happen and is likely an implementation bug.") + +final class UnknownCompressedIdException(id: Long) + extends RuntimeException( + s"Attempted de-compress unknown id [$id]! " + + s"This could happen if this node has started a new ActorSystem bound to the same address as previously, " + + s"and previous messages from a remote system were still in flight (using an old compression table). " + + s"The remote system is expected to drop the compression table and this system will advertise a new one.") diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/OutboundActorRefCompression.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/OutboundActorRefCompression.scala new file mode 100644 index 0000000000..03224b491c --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/OutboundActorRefCompression.scala @@ -0,0 +1,106 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ + +package akka.remote.artery.compress + +import java.util.concurrent.atomic.AtomicReference +import java.{ util ⇒ ju } + +import akka.actor.{ ActorRef, ActorSystem, Address } +import akka.event.Logging +import akka.remote.artery.compress.OutboundCompression.OutboundCompressionState + +import scala.annotation.tailrec + +/** INTERNAL API */ +private[remote] final class OutboundActorRefCompression(system: ActorSystem, remoteAddress: Address) + extends OutboundCompressionTable[ActorRef](system, remoteAddress) { + + flipTable(CompressionTable( + version = 0, + map = Map( + system.deadLetters → 0 + ) + )) +} + +/** + * INTERNAL API + * Base class for all outgoing compression. + * Encapsulates the compressedId registration and lookup. + */ +private[remote] class OutboundCompressionTable[T](system: ActorSystem, remoteAddress: Address) + extends AtomicReference[OutboundCompressionState[T]](OutboundCompressionState.initial) { // TODO could be instead via Unsafe + import OutboundCompression._ + + // TODO: The compression map may benefit from padding if we want multiple compressions to be running in parallel + + private[this] val log = Logging(system, "OutboundCompressionTable") + + /** + * Flips the currently used compression table to the new one (iff the new one has a version number higher than the currently used one). + */ + // (╯°□°)╯︵ ┻━┻ + @tailrec final def flipTable(activate: CompressionTable[T]): Unit = { + val state = get() + if (state.version < activate.version) // TODO or we could demand it to be strictly `currentVersion + 1` + if (compareAndSet(state, prepareState(activate))) + log.debug("Successfully flipped compression table to version {}, for ourgoing connection to {}", activate.version, remoteAddress) + else + flipTable(activate) // retry + else if (state.version == activate.version) + log.warning("Received duplicate compression table (version: {})! Ignoring it.", state.version) + else + log.error("Received unexpected compression table with version nr [{}]! " + + "Current version number is []") + + } + + // TODO this is crazy hot-path; optimised FastUtil-like Object->int hash map would perform better here (and avoid Integer) allocs + final def compress(value: T): Int = + get().table.getOrDefault(value, NotCompressedId) + + private final def prepareState(activate: CompressionTable[T]): OutboundCompressionState[T] = { + val size = activate.map.size + // load factor is `1` since we will never grow this table beyond the initial size, + // this way we can avoid any rehashing from happening. + val m = new ju.HashMap[T, Integer](size, 1.0f) // TODO could be replaced with primitive `int` specialized version + val it = activate.map.keysIterator + var i = 0 + while (it.hasNext) { + m.put(it.next(), i) // TODO boxing :< + i += 1 + } + OutboundCompressionState(activate.version, m) + } + + def toDebugString: String = { + s"""${Logging.simpleName(getClass)}( + | version: ${get.version} to [$remoteAddress] + | ${get.table} + |)""".stripMargin + } + + override def toString = { + val s = get + s"""${Logging.simpleName(getClass)}(to: $remoteAddress, version: ${s.version}, compressedEntries: ${s.table.size})""" + } + +} + +/** INTERNAL API */ +private[remote] object OutboundCompression { + // format: OFF + final val DeadLettersId = 0 + final val NotCompressedId = -1 + + // format: ON + + /** INTERNAL API */ + private[remote] final case class OutboundCompressionState[T](version: Long, table: ju.Map[T, Integer]) + private[remote] object OutboundCompressionState { + def initial[T] = OutboundCompressionState[T](-1, ju.Collections.emptyMap()) + } + +} diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/OutboundActorRefCompressionTable.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/OutboundActorRefCompressionTable.scala deleted file mode 100644 index 0b22effa2b..0000000000 --- a/akka-remote/src/main/scala/akka/remote/artery/compress/OutboundActorRefCompressionTable.scala +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Copyright (C) 2016 Lightbend Inc. - */ - -package akka.remote.artery.compress - -import akka.actor.{ Address, ActorRef, ActorSystem } -import akka.event.Logging - -import scala.annotation.tailrec - -final class OutboundActorRefCompressionTable(system: ActorSystem, remoteAddress: Address) - extends OutboundCompressionTable[ActorRef](system, remoteAddress) { - - preAllocate( - system.deadLetters → 0 - ) - - // if (system.toString.contains("systemB")) - // system.log.error(new Throwable, "new OutboundActorRefCompressionTable = " + this.hashCode()) - - def preAllocate(allocations: (ActorRef, Int)*): Unit = - allocations foreach { case (ref, id) ⇒ register(ref, id) } -} - -/** - * Base class for all outgoing compression. - * Encapsulates the compressedId registration and lookup. - * - * Not thread safe. - */ -class OutboundCompressionTable[T](system: ActorSystem, remoteAddress: Address) { - import OutboundCompressionTable._ - - private val settings = CompressionSettings(system) - - private val log = system.log - - // TODO can we specialize this? (tuning due here) - @volatile private[this] var backing = Map.empty[T, Int] // TODO could use unsafe to swap the map instead of volatile - - // mapping guarding - private[this] var compressionIdsAllocated = -1 - private[this] var aheadAllocatedCompressionIds = Set.empty[Int] - - def register(value: T, id: Int): Unit = { - backing.get(value) match { - case None if isNextCompressionId(id) ⇒ - log.debug("Outbound: Registering new compression from [{}] to [{}].", value, id) // TODO should be debug - addFastForwardCompressionIdsAllocatedCounter() - backing = backing.updated(value, id) - - if (settings.debug) log.debug("Outgoing: Updated compression table state: \n{}", toDebugString) // TODO debug - - case None ⇒ - // TODO could be wrong? since we can not guarantee alocations come in sequence? - if (compressionIdAlreadyAllocated(id)) - throw new AllocatedSameIdMultipleTimesException(id, backing.find(_._2 == id).get._1, value) - - aheadAllocatedCompressionIds += id - backing = backing.updated(value, id) - - case Some(existingId) ⇒ - throw new ConflictingCompressionException(value, id, existingId) - } - } - - def compressionIdAlreadyAllocated(id: Int): Boolean = - id <= compressionIdsAllocated || aheadAllocatedCompressionIds.contains(id) - - def compress(value: T): Int = { - backing.get(value) match { // TODO possibly optimise avoid the Option? Depends on used Map - case None ⇒ NotCompressedId - case Some(id) ⇒ id - } - } - - private def isNextCompressionId(id: Int): Boolean = - id == compressionIdsAllocated + 1 - - private def addFastForwardCompressionIdsAllocatedCounter(): Unit = { - @tailrec def fastForwardConsume(): Unit = { - val nextId = compressionIdsAllocated + 1 - if (aheadAllocatedCompressionIds.contains(nextId)) { - aheadAllocatedCompressionIds = aheadAllocatedCompressionIds.filterNot(_ == nextId) - compressionIdsAllocated += 1 - fastForwardConsume() - } else () - } - - compressionIdsAllocated += 1 - fastForwardConsume() - } - - def toDebugString: String = { - val pad = backing.keys.iterator.map(_.toString.length).max - s"""${Logging.simpleName(getClass)}( - | hashCode: ${this.hashCode()} to [$remoteAddress] - | compressionIdsAllocated: ${compressionIdsAllocated + 1}, - | aheadAllocatedCompressionIds: $aheadAllocatedCompressionIds) - | - | ${backing.map { case (k, v) ⇒ k.toString.padTo(pad, " ").mkString("") + " => " + v }.mkString("\n ")} - |)""".stripMargin - } - - override def toString = - s"""${Logging.simpleName(getClass)}(compressionIdsAllocated: ${compressionIdsAllocated + 1}, aheadAllocatedCompressionIds: $aheadAllocatedCompressionIds)""" -} -object OutboundCompressionTable { - // format: OFF - final val DeadLettersId = 0 - final val NotCompressedId = -1 - // format: ON -} - -final class ConflictingCompressionException(value: Any, id: Int, existingId: Int) - extends IllegalStateException( - s"Value [$value] was already given a compression id [$id], " + - s"yet new compressionId for it was given: $existingId. This could lead to inconsistencies!") - -final class AllocatedSameIdMultipleTimesException(id: Int, previousValue: Any, conflictingValue: Any) - extends IllegalStateException( - s"Attempted to allocate compression id [$id] second time, " + - s"was already bound to value [$previousValue], " + - s"tried to bind to [$conflictingValue]!") diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/TopHeavyHitters.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/TopHeavyHitters.scala index 70fb236702..4f6a9e0a15 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/compress/TopHeavyHitters.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/TopHeavyHitters.scala @@ -30,8 +30,13 @@ private[remote] final class TopHeavyHitters[T](val max: Int) { private[this] val items: Array[T] = Array.ofDim[Object](max).asInstanceOf[Array[T]] private[this] val weights: Array[Long] = Array.ofDim(max) - /** Slow operation, mostly exposed for testing and debugging purposes, avoid using in hot paths. */ - def itemsSnapshot: immutable.Seq[T] = Util.immutableSeq(items).filter(_ != null) + // TODO think if we could get away without copy + /** Returns copy(!) of items which are currently considered to be heavy hitters. */ + def snapshot: Array[T] = { + val snap = Array.ofDim(max).asInstanceOf[Array[T]] + System.arraycopy(items, 0, snap, 0, items.length) + snap + } def toDebugString = s"""TopHeavyHitters( diff --git a/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala index 1fea6b4629..3aa3c64773 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala @@ -3,14 +3,14 @@ package akka.remote.artery import java.nio.{ ByteBuffer, ByteOrder } import akka.actor._ -import akka.remote.artery.compress.CompressionTestUtils +import akka.remote.artery.compress.{ CompressionTable, CompressionTestUtils } import akka.testkit.AkkaSpec -import akka.util.{ OptionVal, ByteString } +import akka.util.{ ByteString, OptionVal } class EnvelopeBufferSpec extends AkkaSpec { import CompressionTestUtils._ - object TestCompressor extends InboundCompression with OutboundCompression { + object TestCompressor extends InboundCompressions with OutboundCompressions { val refToIdx: Map[ActorRef, Int] = Map( minimalRef("compressable0") → 0, minimalRef("compressable1") → 1, @@ -27,15 +27,15 @@ class EnvelopeBufferSpec extends AkkaSpec { "manifest1" → 1) val idxToManifest = manifestToIdx.map(_.swap) - override def allocateActorRefCompressionId(ref: ActorRef, id: Int): Unit = ??? // dynamic allocating not implemented here + override def applyActorRefCompressionTable(table: CompressionTable[ActorRef]): Unit = ??? // dynamic allocating not needed in these tests override def compressActorRef(ref: ActorRef): Int = refToIdx.getOrElse(ref, -1) - override def hitActorRef(address: Address, ref: ActorRef): Unit = () - override def decompressActorRef(idx: Int): OptionVal[ActorRef] = OptionVal.Some(idxToRef(idx)) + override def hitActorRef(originUid: Long, tableVersion: Int, remote: Address, ref: ActorRef): Unit = () + override def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef] = OptionVal(idxToRef(idx)) - override def allocateClassManifestCompressionId(manifest: String, id: Int): Unit = ??? // dynamic allocating not implemented here + override def applyClassManifestCompressionTable(table: CompressionTable[String]): Unit = ??? // dynamic allocating not needed in these tests override def compressClassManifest(manifest: String): Int = manifestToIdx.getOrElse(manifest, -1) - override def hitClassManifest(address: Address, manifest: String): Unit = () - override def decompressClassManifest(idx: Int) = OptionVal.Some(idxToManifest(idx)) + override def hitClassManifest(originUid: Long, tableVersion: Int, remote: Address, manifest: String): Unit = () + override def decompressClassManifest(originUid: Long, tableVersion: Int, idx: Int): OptionVal[String] = OptionVal(idxToManifest(idx)) } "EnvelopeBuffer" must { @@ -45,13 +45,18 @@ class EnvelopeBufferSpec extends AkkaSpec { val byteBuffer = ByteBuffer.allocate(1024).order(ByteOrder.LITTLE_ENDIAN) val envelope = new EnvelopeBuffer(byteBuffer) + val originUid = 1L + "be able to encode and decode headers with compressed literals" in { - headerIn.version = 1 - headerIn.uid = 42 - headerIn.serializer = 4 - headerIn.senderActorRef = minimalRef("compressable0") - headerIn.recipientActorRef = minimalRef("compressable1") - headerIn.manifest = "manifest1" + headerIn setVersion 1 + headerIn setUid 42 + headerIn setSerializer 4 + headerIn setActorRefCompressionTableVersion 0xCAFE + headerIn setClassManifestCompressionTableVersion 0xBABE + headerIn setRecipientActorRef minimalRef("compressable1") + headerIn setSenderActorRef minimalRef("compressable0") + + headerIn setManifest "manifest1" envelope.writeHeader(headerIn) envelope.byteBuffer.position() should ===(EnvelopeBuffer.LiteralsSectionOffset) // Fully compressed header @@ -61,25 +66,29 @@ class EnvelopeBufferSpec extends AkkaSpec { headerOut.version should ===(1) headerOut.uid should ===(42) + headerOut.actorRefCompressionTableVersion should ===(0xCAFE) + headerOut.classManifestCompressionTableVersion should ===(0xBABE) headerOut.serializer should ===(4) - headerOut.senderActorRefPath should ===("akka://EnvelopeBufferSpec/compressable0") - headerOut.recipientActorRefPath should ===("akka://EnvelopeBufferSpec/compressable1") - headerOut.manifest should ===("manifest1") + headerOut.senderActorRef(originUid).get.path.toSerializationFormat should ===("akka://EnvelopeBufferSpec/compressable0") + headerOut.senderActorRefPath should ===(OptionVal.None) + headerOut.recipientActorRef(originUid).get.path.toSerializationFormat should ===("akka://EnvelopeBufferSpec/compressable1") + headerOut.recipientActorRefPath should ===(OptionVal.None) + headerOut.manifest(originUid) should ===("manifest1") } "be able to encode and decode headers with uncompressed literals" in { - headerIn.version = 1 - headerIn.uid = 42 - headerIn.serializer = 4 - headerIn.senderActorRef = minimalRef("uncompressable0") - headerIn.recipientActorRef = minimalRef("uncompressable11") - headerIn.manifest = "uncompressable3333" + headerIn setVersion 1 + headerIn setUid 42 + headerIn setSerializer 4 + headerIn setSenderActorRef minimalRef("uncompressable0") + headerIn setRecipientActorRef minimalRef("uncompressable11") + headerIn setManifest "uncompressable3333" val expectedHeaderLength = EnvelopeBuffer.LiteralsSectionOffset + // Constant header part - 2 + headerIn.senderActorRefPath.length + // Length field + literal - 2 + headerIn.recipientActorRefPath.length + // Length field + literal - 2 + headerIn.manifest.length // Length field + literal + 2 + headerIn.senderActorRefPath.get.length + // Length field + literal + 2 + headerIn.recipientActorRefPath.get.length + // Length field + literal + 2 + headerIn.manifest(originUid).length // Length field + literal envelope.writeHeader(headerIn) envelope.byteBuffer.position() should ===(expectedHeaderLength) @@ -90,23 +99,25 @@ class EnvelopeBufferSpec extends AkkaSpec { headerOut.version should ===(1) headerOut.uid should ===(42) headerOut.serializer should ===(4) - headerOut.senderActorRefPath should ===("akka://EnvelopeBufferSpec/uncompressable0") - headerOut.recipientActorRefPath should ===("akka://EnvelopeBufferSpec/uncompressable11") - headerOut.manifest should ===("uncompressable3333") + headerOut.senderActorRefPath should ===(OptionVal.Some("akka://EnvelopeBufferSpec/uncompressable0")) + headerOut.senderActorRef(originUid) should ===(OptionVal.None) + headerOut.recipientActorRefPath should ===(OptionVal.Some("akka://EnvelopeBufferSpec/uncompressable11")) + headerOut.recipientActorRef(originUid) should ===(OptionVal.None) + headerOut.manifest(originUid) should ===("uncompressable3333") } "be able to encode and decode headers with mixed literals" in { - headerIn.version = 1 - headerIn.uid = 42 - headerIn.serializer = 4 - headerIn.senderActorRef = minimalRef("reallylongcompressablestring") - headerIn.recipientActorRef = minimalRef("uncompressable1") - headerIn.manifest = "manifest1" + headerIn setVersion 1 + headerIn setUid 42 + headerIn setSerializer 4 + headerIn setSenderActorRef minimalRef("reallylongcompressablestring") + headerIn setRecipientActorRef minimalRef("uncompressable1") + headerIn setManifest "manifest1" envelope.writeHeader(headerIn) envelope.byteBuffer.position() should ===( EnvelopeBuffer.LiteralsSectionOffset + - 2 + headerIn.recipientActorRefPath.length) + 2 + headerIn.recipientActorRefPath.get.length) envelope.byteBuffer.flip() envelope.parseHeader(headerOut) @@ -114,22 +125,24 @@ class EnvelopeBufferSpec extends AkkaSpec { headerOut.version should ===(1) headerOut.uid should ===(42) headerOut.serializer should ===(4) - headerOut.senderActorRefPath should ===("akka://EnvelopeBufferSpec/reallylongcompressablestring") - headerOut.recipientActorRefPath should ===("akka://EnvelopeBufferSpec/uncompressable1") - headerOut.manifest should ===("manifest1") + headerOut.senderActorRef(originUid).get.path.toSerializationFormat should ===("akka://EnvelopeBufferSpec/reallylongcompressablestring") + headerOut.senderActorRefPath should ===(OptionVal.None) + headerOut.recipientActorRefPath should ===(OptionVal.Some("akka://EnvelopeBufferSpec/uncompressable1")) + headerOut.recipientActorRef(originUid) should ===(OptionVal.None) + headerOut.manifest(originUid) should ===("manifest1") - headerIn.version = 3 - headerIn.uid = Long.MinValue - headerIn.serializer = -1 - headerIn.senderActorRef = minimalRef("uncompressable0") - headerIn.recipientActorRef = minimalRef("reallylongcompressablestring") - headerIn.manifest = "longlonglongliteralmanifest" + headerIn setVersion 3 + headerIn setUid Long.MinValue + headerIn setSerializer -1 + headerIn setSenderActorRef minimalRef("uncompressable0") + headerIn setRecipientActorRef minimalRef("reallylongcompressablestring") + headerIn setManifest "longlonglongliteralmanifest" envelope.writeHeader(headerIn) envelope.byteBuffer.position() should ===( EnvelopeBuffer.LiteralsSectionOffset + - 2 + headerIn.senderActorRefPath.length + - 2 + headerIn.manifest.length) + 2 + headerIn.senderActorRefPath.get.length + + 2 + headerIn.manifest(originUid).length) envelope.byteBuffer.flip() envelope.parseHeader(headerOut) @@ -137,20 +150,22 @@ class EnvelopeBufferSpec extends AkkaSpec { headerOut.version should ===(3) headerOut.uid should ===(Long.MinValue) headerOut.serializer should ===(-1) - headerOut.senderActorRefPath should ===("akka://EnvelopeBufferSpec/uncompressable0") - headerOut.recipientActorRefPath should ===("akka://EnvelopeBufferSpec/reallylongcompressablestring") - headerOut.manifest should ===("longlonglongliteralmanifest") + headerOut.senderActorRefPath should ===(OptionVal.Some("akka://EnvelopeBufferSpec/uncompressable0")) + headerOut.senderActorRef(originUid) should ===(OptionVal.None) + headerOut.recipientActorRef(originUid).get.path.toSerializationFormat should ===("akka://EnvelopeBufferSpec/reallylongcompressablestring") + headerOut.recipientActorRefPath should ===(OptionVal.None) + headerOut.manifest(originUid) should ===("longlonglongliteralmanifest") } "be able to encode and decode headers with mixed literals and payload" in { val payload = ByteString("Hello Artery!") - headerIn.version = 1 - headerIn.uid = 42 - headerIn.serializer = 4 - headerIn.senderActorRef = minimalRef("reallylongcompressablestring") - headerIn.recipientActorRef = minimalRef("uncompressable1") - headerIn.manifest = "manifest1" + headerIn setVersion 1 + headerIn setUid 42 + headerIn setSerializer 4 + headerIn setSenderActorRef minimalRef("reallylongcompressablestring") + headerIn setRecipientActorRef minimalRef("uncompressable1") + headerIn setManifest "manifest1" envelope.writeHeader(headerIn) envelope.byteBuffer.put(payload.toByteBuffer) @@ -161,9 +176,11 @@ class EnvelopeBufferSpec extends AkkaSpec { headerOut.version should ===(1) headerOut.uid should ===(42) headerOut.serializer should ===(4) - headerOut.senderActorRefPath should ===("akka://EnvelopeBufferSpec/reallylongcompressablestring") - headerOut.recipientActorRefPath should ===("akka://EnvelopeBufferSpec/uncompressable1") - headerOut.manifest should ===("manifest1") + headerOut.senderActorRef(originUid).get.path.toSerializationFormat should ===("akka://EnvelopeBufferSpec/reallylongcompressablestring") + headerOut.senderActorRefPath should ===(OptionVal.None) + headerOut.recipientActorRefPath should ===(OptionVal.Some("akka://EnvelopeBufferSpec/uncompressable1")) + headerOut.recipientActorRef(originUid) should ===(OptionVal.None) + headerOut.manifest(originUid) should ===("manifest1") ByteString.fromByteBuffer(envelope.byteBuffer) should ===(payload) } diff --git a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala index 0035043f87..1d378a6ea9 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala @@ -75,7 +75,7 @@ private[akka] class TestOutboundContext( _associationState.uniqueRemoteAddress.value match { case Some(Success(`peer`)) ⇒ // our value case _ ⇒ - _associationState = _associationState.newIncarnation(Promise.successful(peer), NoOutboundCompression) + _associationState = _associationState.newIncarnation(Promise.successful(peer), NoOutboundCompressions) } } diff --git a/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionIntegrationSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionIntegrationSpec.scala index 8d2c230ec8..f312f118d1 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionIntegrationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionIntegrationSpec.scala @@ -33,6 +33,14 @@ object CompressionIntegrationSpec { remote.artery.port = 0 remote.handshake-timeout = 10s + remote.artery.advanced.compression { + enabled = on + actor-refs { + enabled = on + advertisement-interval = 3 seconds + } + } + } """) @@ -67,13 +75,19 @@ class CompressionIntegrationSpec extends AkkaSpec(CompressionIntegrationSpec.com // cause testActor-1 to become a heavy hitter (1 to messagesToExchange).foreach { i ⇒ voidSel ! "hello" } // does not reply, but a hot receiver should be advertised - val a1 = aProbe.expectMsgType[Events.ReceivedCompressionAdvertisement](10.seconds) + val a1 = aProbe.expectMsgType[Events.ReceivedCompressionTable[ActorRef]](10.seconds) info("System [A] received: " + a1) - a1.id should ===(1) - a1.key.toString should include(testActor.path.name) + assertCompression[ActorRef](a1.table, 0, _ should ===(system.deadLetters)) + assertCompression[ActorRef](a1.table, 1, _ should ===(testActor)) } } + def assertCompression[T](table: CompressionTable[T], id: Int, assertion: T ⇒ Unit): Unit = { + table.map.find(_._2 == id) + .orElse { throw new AssertionError(s"No key was compressed to the id [$id]! Table was: $table") } + .foreach(i ⇒ assertion(i._1)) + } + def identify(_system: String, port: Int, name: String) = { val selection = system.actorSelection(s"artery://${_system}@localhost:$port/user/$name") diff --git a/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionTableSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionTableSpec.scala new file mode 100644 index 0000000000..27b61c03d2 --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionTableSpec.scala @@ -0,0 +1,36 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ + +package akka.remote.artery.compress + +import akka.testkit.AkkaSpec + +class CompressionTableSpec extends AkkaSpec { + + "CompressionTable" must { + "should invert" in { + val decomp = CompressionTable(1, Map("0" → 0, "1" → 1, "2" → 2, "3" → 3)).invert + decomp.table should ===(Array("0", "1", "2", "3")) + } + + "enforce to start allocating from 0th index" in { + val compressionTable = CompressionTable(1, Map("1" → 1, "3" → 3)) // missing 0 is a gap too + + val ex = intercept[IllegalArgumentException] { + compressionTable.invert + } + ex.getMessage should include("Compression table should start allocating from 0, yet lowest allocated id was 1") + } + + "should not allow having gaps in compression ids (inversion would fail)" in { + val compressionTable = CompressionTable(1, Map("0" → 0, "1" → 1, "3" → 3)) // missing 0 is a gap too + + val ex = intercept[IllegalArgumentException] { + compressionTable.invert + } + ex.getMessage should include("Given compression map does not seem to be gap-less") + } + } + +} diff --git a/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionTestKit.scala b/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionTestKit.scala new file mode 100644 index 0000000000..9872374252 --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionTestKit.scala @@ -0,0 +1,17 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ + +package akka.remote.artery.compress + +/* INTERNAL API */ +private[remote] trait CompressionTestKit { + def assertCompression[T](table: CompressionTable[T], id: Int, assertion: T ⇒ Unit): Unit = { + table.map.find(_._2 == id) + .orElse { throw new AssertionError(s"No key was compressed to the id [$id]! Table was: $table") } + .foreach(i ⇒ assertion(i._1)) + } +} + +/* INTERNAL API */ +private[remote] object CompressionTestKit extends CompressionTestKit diff --git a/akka-remote/src/test/scala/akka/remote/artery/compress/HandshakeShouldDropCompressionTableSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/compress/HandshakeShouldDropCompressionTableSpec.scala index 9549072906..433c2c5acc 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/compress/HandshakeShouldDropCompressionTableSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/compress/HandshakeShouldDropCompressionTableSpec.scala @@ -4,7 +4,7 @@ package akka.remote.artery.compress -import akka.actor.{ ActorIdentity, ActorSystem, Identify } +import akka.actor.{ ActorIdentity, ActorRef, ActorSystem, Identify } import akka.remote.artery.compress.CompressionProtocol.Events import akka.testkit._ import akka.util.Timeout @@ -25,14 +25,17 @@ object HandshakeShouldDropCompressionTableSpec { actor.provider = "akka.remote.RemoteActorRefProvider" remote.artery.enabled = on - remote.artery.advanced { - compression.enabled = on - compression.debug = on - } remote.artery.hostname = localhost remote.artery.port = 0 remote.handshake-timeout = 10s - + + remote.artery.advanced.compression { + enabled = on + actor-refs { + enabled = on + advertisement-interval = 3 seconds + } + } } """) @@ -42,7 +45,8 @@ object HandshakeShouldDropCompressionTableSpec { } class HandshakeShouldDropCompressionTableSpec extends AkkaSpec(HandshakeShouldDropCompressionTableSpec.commonConfig) - with ImplicitSender with BeforeAndAfter { + with ImplicitSender with BeforeAndAfter + with CompressionTestKit { import HandshakeShouldDropCompressionTableSpec._ implicit val t = Timeout(3.seconds) @@ -70,18 +74,16 @@ class HandshakeShouldDropCompressionTableSpec extends AkkaSpec(HandshakeShouldDr // cause testActor-1 to become a heavy hitter (1 to messagesToExchange).foreach { i ⇒ voidSel ! "hello" } // does not reply, but a hot receiver should be advertised // give it enough time to advertise first table - val a0 = aProbe.expectMsgType[Events.ReceivedCompressionAdvertisement](10.seconds) + val a0 = aProbe.expectMsgType[Events.ReceivedCompressionTable[ActorRef]](10.seconds) info("System [A] received: " + a0) - a0.id should ===(1) - a0.key.toString should include(testActor.path.name) + assertCompression[ActorRef](a0.table, 1, _.toString should include(testActor.path.name)) // cause a1Probe to become a heavy hitter (we want to not have it in the 2nd compression table later) (1 to messagesToExchange).foreach { i ⇒ voidSel.tell("hello", a1Probe.ref) } // does not reply, but a hot receiver should be advertised // give it enough time to advertise first table - val a1 = aProbe.expectMsgType[Events.ReceivedCompressionAdvertisement](10.seconds) + val a1 = aProbe.expectMsgType[Events.ReceivedCompressionTable[ActorRef]](10.seconds) info("System [A] received: " + a1) - a1.id should ===(2) - a1.key.toString should include(a1Probe.ref.path.name) + assertCompression[ActorRef](a1.table, 2, _.toString should include(a1Probe.ref.path.name)) log.warning("SHUTTING DOWN system {}...", systemB) shutdown(systemB) @@ -92,17 +94,15 @@ class HandshakeShouldDropCompressionTableSpec extends AkkaSpec(HandshakeShouldDr systemB.actorOf(TestActors.blackholeProps, "void") // start it again (1 to messagesToExchange).foreach { i ⇒ voidSel ! "hello" } // does not reply, but a hot receiver should be advertised // compression triggered again - val a2 = aProbe.expectMsgType[Events.ReceivedCompressionAdvertisement](10.seconds) + val a2 = aProbe.expectMsgType[Events.ReceivedCompressionTable[ActorRef]](10.seconds) info("System [A] received: " + a2) - a2.id should ===(1) - a2.key.toString should include(testActor.path.name) + assertCompression[ActorRef](a2.table, 1, _.toString should include(testActor.path.name)) (1 to messagesToExchange).foreach { i ⇒ voidSel.tell("hello", aNew2Probe.ref) } // does not reply, but a hot receiver should be advertised // compression triggered again - val a3 = aProbe.expectMsgType[Events.ReceivedCompressionAdvertisement](10.seconds) + val a3 = aProbe.expectMsgType[Events.ReceivedCompressionTable[ActorRef]](10.seconds) info("Received second compression: " + a3) - a3.id should ===(2) - a3.key.toString should include(aNew2Probe.ref.path.name) + assertCompression[ActorRef](a3.table, 2, _.toString should include(aNew2Probe.ref.path.name)) } } diff --git a/akka-remote/src/test/scala/akka/remote/artery/compress/HeavyHittersSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/compress/HeavyHittersSpec.scala index 3120140b93..647d81f318 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/compress/HeavyHittersSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/compress/HeavyHittersSpec.scala @@ -12,57 +12,57 @@ class HeavyHittersSpec extends WordSpecLike with Matchers { "should work" in { val hitters = new TopHeavyHitters[String](3) hitters.update("A", 10) shouldBe true - hitters.itemsSnapshot.toSet should ===(Set("A")) + hitters.snapshot.filter(_ ne null).toSet should ===(Set("A")) hitters.update("B", 20) shouldBe true - hitters.itemsSnapshot.toSet should ===(Set("A", "B")) + hitters.snapshot.filter(_ ne null).toSet should ===(Set("A", "B")) hitters.update("C", 1) shouldBe true - hitters.itemsSnapshot.toSet should ===(Set("A", "B", "C")) + hitters.snapshot.filter(_ ne null).toSet should ===(Set("A", "B", "C")) hitters.update("D", 100) shouldBe true - hitters.itemsSnapshot.toSet should ===(Set("A", "B", "D")) + hitters.snapshot.filter(_ ne null).toSet should ===(Set("A", "B", "D")) hitters.update("E", 200) shouldBe true - hitters.itemsSnapshot.toSet should ===(Set("B", "D", "E")) + hitters.snapshot.filter(_ ne null).toSet should ===(Set("B", "D", "E")) hitters.update("BB", 22) shouldBe true - hitters.itemsSnapshot.toSet should ===(Set("BB", "D", "E")) + hitters.snapshot.filter(_ ne null).toSet should ===(Set("BB", "D", "E")) hitters.update("a", 1) shouldBe false - hitters.itemsSnapshot.toSet should ===(Set("BB", "D", "E")) + hitters.snapshot.filter(_ ne null).toSet should ===(Set("BB", "D", "E")) } "correctly replace a hitter" in { val hitters = new TopHeavyHitters[String](3) hitters.update("A", 10) shouldBe true - hitters.itemsSnapshot.toSet should ===(Set("A")) + hitters.snapshot.filter(_ ne null).toSet should ===(Set("A")) hitters.update("A", 12) shouldBe false hitters.update("A", 22) shouldBe false - hitters.itemsSnapshot.toSet should ===(Set("A")) + hitters.snapshot.filter(_ ne null).toSet should ===(Set("A")) } "correctly drop least heavy hitter when more than N are inserted" in { val hitters = new TopHeavyHitters[String](3) hitters.update("A", 1) shouldBe true - hitters.itemsSnapshot.toSet should ===(Set("A")) + hitters.snapshot.filter(_ ne null).toSet should ===(Set("A")) hitters.update("B", 22) shouldBe true - hitters.itemsSnapshot.toSet should ===(Set("A", "B")) + hitters.snapshot.filter(_ ne null).toSet should ===(Set("A", "B")) hitters.update("C", 33) shouldBe true - hitters.itemsSnapshot.toSet should ===(Set("A", "B", "C")) + hitters.snapshot.filter(_ ne null).toSet should ===(Set("A", "B", "C")) hitters.lowestHitterWeight should ===(1) // first item which forces dropping least heavy hitter hitters.update("D", 100) shouldBe true - hitters.itemsSnapshot.toSet should ===(Set("B", "C", "D")) + hitters.snapshot.filter(_ ne null).toSet should ===(Set("B", "C", "D")) // second item which forces dropping least heavy hitter hitters.update("X", 999) shouldBe true - hitters.itemsSnapshot.toSet should ===(Set("X", "C", "D")) + hitters.snapshot.filter(_ ne null).toSet should ===(Set("X", "C", "D")) } "replace the right item even when hashCodes collide" in { @@ -73,19 +73,19 @@ class HeavyHittersSpec extends WordSpecLike with Matchers { val b1 = MockHashCode("B", 1) hitters.update(a1, 1) - hitters.itemsSnapshot.toSet should ===(Set(a1)) + hitters.snapshot.filter(_ ne null).toSet should ===(Set(a1)) hitters.lowestHitterWeight should ===(0) hitters.update(b1, 2) - hitters.itemsSnapshot.toSet should ===(Set(a1, b1)) + hitters.snapshot.filter(_ ne null).toSet should ===(Set(a1, b1)) hitters.lowestHitterWeight should ===(1) hitters.update(a1, 10) - hitters.itemsSnapshot.toSet should ===(Set(a1, b1)) + hitters.snapshot.filter(_ ne null).toSet should ===(Set(a1, b1)) hitters.lowestHitterWeight should ===(2) hitters.update(b1, 100) - hitters.itemsSnapshot.toSet should ===(Set(a1, b1)) + hitters.snapshot.filter(_ ne null).toSet should ===(Set(a1, b1)) hitters.lowestHitterWeight should ===(10) } diff --git a/akka-remote/src/test/scala/akka/remote/artery/compress/InboundCompressionTableSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/compress/InboundCompressionTableSpec.scala deleted file mode 100644 index 7b7d4688eb..0000000000 --- a/akka-remote/src/test/scala/akka/remote/artery/compress/InboundCompressionTableSpec.scala +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright (C) 2009-2016 Lightbend Inc. - */ - -package akka.remote.artery.compress - -import akka.actor.Address -import akka.stream.impl.ConstantFun -import akka.testkit.AkkaSpec - -class InboundCompressionTableSpec extends AkkaSpec { - - "InboundCompressionTable" must { - val NoChange: (String, Int) = null - - "invoke callback when compression triggered" in { - var p: (String, Int) = NoChange - val heavyHitters = new TopHeavyHitters[String](2) - val advertiseCompressionId = new AdvertiseCompressionId[String] { - override def apply(remoteAddress: Address, ref: String, id: Int): Unit = - p = ref → id - } - val table = new InboundCompressionTable[String](system, heavyHitters, ConstantFun.scalaIdentityFunction, advertiseCompressionId) - - table.increment(null, "A", 1L) - p should ===("A" → 0) - - table.increment(null, "B", 1L) - p should ===("B" → 1) - - p = NoChange - table.increment(null, "A", 1L) // again, yet was already compressed (A count == 2), thus no need to compress (call callback) again - p should ===(NoChange) // no change - - table.increment(null, "B", 1L) // again, yet was already compressed (B count == 2), thus no need to compress (call callback) again - p should ===(NoChange) // no change - - table.increment(null, "C", 1L) // max hitters = 2; [A=2, B=2] C=1 - p should ===(NoChange) // no change - - table.increment(null, "C", 1L) // max hitters = 2; [A=2, B=2] C=2 – causes compression of C! - p should ===(NoChange) // no change - table.increment(null, "C", 1L) // max hitters = 2; [..., C=3] – causes compression of C! - p should ===("C" → 2) // allocated - - p = NoChange - table.increment(null, "A", 1L) // again! - p should ===(NoChange) - - p = NoChange - table.increment(null, "B", 1L) // again! - p should ===(NoChange) - - // and again and again... won't be signalled again since already compressed - table.increment(null, "A", 1L) - table.increment(null, "A", 1L) - table.increment(null, "A", 1L) - p should ===(NoChange) - } - } - -} diff --git a/akka-remote/src/test/scala/akka/remote/artery/compress/OutboundCompressionSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/compress/OutboundCompressionSpec.scala new file mode 100644 index 0000000000..723a4c9a13 --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/artery/compress/OutboundCompressionSpec.scala @@ -0,0 +1,50 @@ +/* + * Copyright (C) 2009-2016 Lightbend Inc. + */ + +package akka.remote.artery.compress + +import akka.actor._ +import akka.testkit.AkkaSpec + +class OutboundCompressionSpec extends AkkaSpec { + import CompressionTestUtils._ + + val remoteAddress = Address("artery", "example", "localhost", 0) + + "OutboundCompression" must { + "not compress not-known values" in { + val table = new OutboundActorRefCompression(system, remoteAddress) + table.compress(minimalRef("banana")) should ===(-1) + } + } + + "OutboundActorRefCompression" must { + val alice = minimalRef("alice") + val bob = minimalRef("bob") + + "always compress /deadLetters" in { + val table = new OutboundActorRefCompression(system, remoteAddress) + table.compress(system.deadLetters) should ===(0) + } + + "not compress unknown actor ref" in { + val table = new OutboundActorRefCompression(system, remoteAddress) + table.compress(alice) should ===(-1) // not compressed + } + + "compress previously registered actor ref" in { + val compression = new OutboundActorRefCompression(system, remoteAddress) + val table = CompressionTable(1, Map(system.deadLetters → 0, alice → 1)) + compression.flipTable(table) + compression.compress(alice) should ===(1) // compressed + compression.compress(bob) should ===(-1) // not compressed + + val table2 = table.copy(2, map = table.map.updated(bob, 2)) + compression.flipTable(table2) + compression.compress(alice) should ===(1) // compressed + compression.compress(bob) should ===(2) // compressed + } + } + +} diff --git a/akka-remote/src/test/scala/akka/remote/artery/compress/OutboundCompressionTableSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/compress/OutboundCompressionTableSpec.scala deleted file mode 100644 index 5af8be2fbe..0000000000 --- a/akka-remote/src/test/scala/akka/remote/artery/compress/OutboundCompressionTableSpec.scala +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright (C) 2009-2016 Lightbend Inc. - */ - -package akka.remote.artery.compress - -import akka.actor._ -import akka.testkit.AkkaSpec - -class OutboundCompressionTableSpec extends AkkaSpec { - import CompressionTestUtils._ - - val remoteAddress = Address("artery", "example", "localhost", 0) - - "OutboundCompressionTable" must { - "not compress not-known values" in { - val table = new OutboundActorRefCompressionTable(system, remoteAddress) - table.compress(minimalRef("banana")) should ===(-1) - } - } - - "OutboundActorRefCompressionTable" must { - val alice = minimalRef("alice") - val bob = minimalRef("bob") - - "always compress /deadLetters" in { - val table = new OutboundActorRefCompressionTable(system, remoteAddress) - table.compress(system.deadLetters) should ===(0) - } - - "not compress unknown actor ref" in { - val table = new OutboundActorRefCompressionTable(system, remoteAddress) - table.compress(alice) should ===(-1) // not compressed - } - - "compress previously registered actor ref" in { - val table = new OutboundActorRefCompressionTable(system, remoteAddress) - table.register(alice, 1) - table.compress(alice) should ===(1) // compressed - - table.compress(bob) should ===(-1) // not compressed - } - - "fail if same id attempted to be registered twice" in { - val table = new OutboundActorRefCompressionTable(system, remoteAddress) - table.register(alice, 1) - val ex = intercept[AllocatedSameIdMultipleTimesException] { - table.register(bob, 1) - } - - ex.getMessage should include("Attempted to allocate compression id [1] second time, " + - "was already bound to value [Actor[akka://OutboundCompressionTableSpec/alice]], " + - "tried to bind to [Actor[akka://OutboundCompressionTableSpec/bob]]!") - } - - "survive compression ahead-allocation, and then fast forward allocated Ids counter when able to (compact storage)" in { - val table = new OutboundActorRefCompressionTable(system, remoteAddress) - table.register(alice, 1) - table.compressionIdAlreadyAllocated(1) should ===(true) - - table.register(bob, 3) // ahead allocated - table.compressionIdAlreadyAllocated(2) should ===(false) - table.compressionIdAlreadyAllocated(3) should ===(true) - - table.register(minimalRef("oogie-boogie"), 4) // ahead allocated (we're able to survive re-delivery of allocation messages) - table.compressionIdAlreadyAllocated(2) should ===(false) - table.compressionIdAlreadyAllocated(4) should ===(true) - - table.register(minimalRef("jack-skellington"), 2) // missing allocation was re-delivered, cause fast-forward - - table.compressionIdAlreadyAllocated(2) should ===(true) - - table.register(minimalRef("jack-sparrow"), 5) // immediate next, after fast-forward - } - - // FIXME "fast forward" concept will not exist once we use "advertise entire table", possibly remove mentions of that - // TODO cover more cases of holes in the redeliveries of advertisements - // TODO ^ to cover the fast forward logic a bit more - } - -}