From 0c0e3c5efd1b01d66d92a02d3ae2ecf3d52c57f2 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 24 Aug 2016 19:52:07 +0200 Subject: [PATCH] Refactoring of outbound compression, #21210 * outbound compression is now immutable, by simply using CompressionTable[ActorRef] and CompressionTable[String] * immutable outbound compression will make it possible to use them from multiple Encoder instances, when we add several lanes for parallel serialization * outbound compression tables not shared via AssociationState * the advertised tables are sent to the Encoder stage via async callback, no need to reference the tables in other places than the Encoder stage, no more races via shared mutable state * when outbound stream is started or restarted it can start out without compression, until next advertisement is received * ensure outbound compression is cleared before handshake is signaled complete --- .../akka/remote/artery/CodecBenchmark.scala | 13 +- .../compress/HeavyHittersBenchmark.scala | 2 +- .../akka/remote/artery/ArteryTransport.scala | 125 ++++++------ .../akka/remote/artery/Association.scala | 178 +++++++++--------- .../scala/akka/remote/artery/BufferPool.scala | 76 +++++--- .../scala/akka/remote/artery/Codecs.scala | 110 ++++++++--- .../scala/akka/remote/artery/Handshake.scala | 44 +++-- .../artery/compress/ActualCompressions.scala | 98 ---------- .../artery/compress/AllCompressions.scala | 39 ---- .../artery/compress/CompressionTable.scala | 9 + .../artery/compress/InboundCompressions.scala | 91 ++++++++- .../NoInboundCompressions.scala} | 21 +-- .../OutboundActorRefCompression.scala | 114 ----------- .../remote/artery/EnvelopeBufferSpec.scala | 30 ++- .../akka/remote/artery/TestContext.scala | 15 +- .../compress/OutboundCompressionSpec.scala | 29 +-- 16 files changed, 454 insertions(+), 540 deletions(-) delete mode 100644 akka-remote/src/main/scala/akka/remote/artery/compress/ActualCompressions.scala delete mode 100644 akka-remote/src/main/scala/akka/remote/artery/compress/AllCompressions.scala rename akka-remote/src/main/scala/akka/remote/artery/{NoLiteralCompression.scala => compress/NoInboundCompressions.scala} (51%) delete mode 100644 akka-remote/src/main/scala/akka/remote/artery/compress/OutboundActorRefCompression.scala diff --git a/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala index f08d5f87af..98974a245b 100644 --- a/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala @@ -29,6 +29,8 @@ import com.typesafe.config.ConfigFactory import org.openjdk.jmh.annotations._ import akka.util.OptionVal import akka.actor.Address +import scala.concurrent.Future +import akka.Done @State(Scope.Benchmark) @OutputTimeUnit(TimeUnit.MILLISECONDS) @@ -57,7 +59,6 @@ class CodecBenchmark { private val inboundEnvelopePool = ReusableInboundEnvelope.createObjectPool(capacity = 16) private val outboundEnvelopePool = ReusableOutboundEnvelope.createObjectPool(capacity = 16) - val compressionOut = NoOutboundCompressions val headerIn = HeaderBuilder.in(NoInboundCompressions) val envelopeTemplateBuffer = ByteBuffer.allocate(ArteryTransport.MaximumFrameSize).order(ByteOrder.LITTLE_ENDIAN) @@ -73,7 +74,7 @@ class CodecBenchmark { // the following methods are not used by in this test override def sendControl(to: Address, message: ControlMessage): Unit = ??? override def association(remoteAddress: Address): OutboundContext = ??? - override def completeHandshake(peer: UniqueAddress): Unit = ??? + override def completeHandshake(peer: UniqueAddress): Future[Done] = ??? } private var materializer: ActorMaterializer = _ @@ -136,8 +137,8 @@ class CodecBenchmark { val latch = new CountDownLatch(1) val N = 100000 - val encoder: Flow[OutboundEnvelope, EnvelopeBuffer, NotUsed] = - Flow.fromGraph(new Encoder(uniqueLocalAddress, system, compressionOut, outboundEnvelopePool, envelopePool)) + val encoder: Flow[OutboundEnvelope, EnvelopeBuffer, Encoder.ChangeOutboundCompression] = + Flow.fromGraph(new Encoder(uniqueLocalAddress, system, outboundEnvelopePool, envelopePool)) Source.fromGraph(new BenchTestSourceSameElement(N, "elem")) .map(msg ⇒ outboundEnvelopePool.acquire().init(OptionVal.None, payload, OptionVal.Some(remoteRefB))) @@ -193,8 +194,8 @@ class CodecBenchmark { val latch = new CountDownLatch(1) val N = 100000 - val encoder: Flow[OutboundEnvelope, EnvelopeBuffer, NotUsed] = - Flow.fromGraph(new Encoder(uniqueLocalAddress, system, compressionOut, outboundEnvelopePool, envelopePool)) + val encoder: Flow[OutboundEnvelope, EnvelopeBuffer, Encoder.ChangeOutboundCompression] = + Flow.fromGraph(new Encoder(uniqueLocalAddress, system, outboundEnvelopePool, envelopePool)) val localRecipient = resolvedRef.path.toSerializationFormatWithAddress(uniqueLocalAddress.address) val provider = RARP(system).provider diff --git a/akka-bench-jmh/src/main/scala/akka/remote/compress/HeavyHittersBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/remote/compress/HeavyHittersBenchmark.scala index 4297dbb171..b6e14f9465 100644 --- a/akka-bench-jmh/src/main/scala/akka/remote/compress/HeavyHittersBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/remote/compress/HeavyHittersBenchmark.scala @@ -49,7 +49,7 @@ class HeavyHittersBenchmark { @Param(Array("8192")) var n: Int = 0 - var topN: TopHeavyHitters[String] = _ + private var topN: TopHeavyHitters[String] = _ val rand = new Random(1001021) diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 6131cb2cc6..3c4c10d5ec 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -3,22 +3,30 @@ */ package akka.remote.artery +import java.io.File +import java.net.InetSocketAddress +import java.nio.channels.{ DatagramChannel, FileChannel } + import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicLong -import java.util.concurrent.TimeUnit.MICROSECONDS -import akka.remote.artery.compress.CompressionProtocol.CompressionMessage - +import scala.annotation.tailrec +import scala.collection.JavaConverters._ import scala.concurrent.Future import scala.concurrent.Promise import scala.concurrent.duration._ import scala.util.Failure import scala.util.Success import scala.util.Try +import scala.util.control.NonFatal + import akka.Done import akka.NotUsed import akka.actor._ +import akka.actor.Actor import akka.actor.Cancellable +import akka.actor.Props import akka.event.Logging import akka.event.LoggingAdapter import akka.remote.AddressUidExtension @@ -30,13 +38,17 @@ import akka.remote.RemoteTransport import akka.remote.RemotingLifecycleEvent import akka.remote.ThisActorSystemQuarantinedEvent import akka.remote.UniqueAddress +import akka.remote.artery.Encoder.ChangeOutboundCompression import akka.remote.artery.InboundControlJunction.ControlMessageObserver import akka.remote.artery.InboundControlJunction.ControlMessageSubject +import akka.remote.artery.OutboundControlJunction.OutboundControlIngress +import akka.remote.artery.compress._ +import akka.remote.artery.compress.CompressionProtocol.CompressionMessage import akka.remote.transport.AkkaPduCodec import akka.remote.transport.AkkaPduProtobufCodec -import akka.remote.artery.compress._ import akka.stream.AbruptTerminationException import akka.stream.ActorMaterializer +import akka.stream.ActorMaterializerSettings import akka.stream.KillSwitches import akka.stream.Materializer import akka.stream.SharedKillSwitch @@ -46,35 +58,19 @@ import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Source import akka.util.Helpers.ConfigOps import akka.util.Helpers.Requiring +import akka.util.OptionVal import akka.util.WildcardIndex import io.aeron.Aeron import io.aeron.AvailableImageHandler +import io.aeron.CncFileDescriptor import io.aeron.Image import io.aeron.UnavailableImageHandler import io.aeron.driver.MediaDriver +import io.aeron.driver.ThreadingMode import io.aeron.exceptions.ConductorServiceTimeoutException import org.agrona.ErrorHandler import org.agrona.IoUtil -import java.io.File -import java.net.InetSocketAddress -import java.nio.channels.{ DatagramChannel, FileChannel } - -import akka.remote.artery.OutboundControlJunction.OutboundControlIngress -import io.aeron.CncFileDescriptor -import java.util.concurrent.atomic.AtomicLong - -import scala.collection.JavaConverters._ -import akka.stream.ActorMaterializerSettings - -import scala.annotation.tailrec -import akka.util.OptionVal -import io.aeron.driver.ThreadingMode import org.agrona.concurrent.BackoffIdleStrategy -import org.agrona.concurrent.BusySpinIdleStrategy - -import scala.util.control.NonFatal -import akka.actor.Props -import akka.actor.Actor /** * INTERNAL API @@ -105,7 +101,7 @@ private[akka] trait InboundContext { */ def association(uid: Long): OptionVal[OutboundContext] - def completeHandshake(peer: UniqueAddress): Unit + def completeHandshake(peer: UniqueAddress): Future[Done] } @@ -117,8 +113,7 @@ private[akka] object AssociationState { new AssociationState( incarnation = 1, uniqueRemoteAddressPromise = Promise(), - quarantined = ImmutableLongMap.empty[QuarantinedTimestamp], - outboundCompressions = NoOutboundCompressions) + quarantined = ImmutableLongMap.empty[QuarantinedTimestamp]) final case class QuarantinedTimestamp(nanoTime: Long) { override def toString: String = @@ -130,10 +125,9 @@ private[akka] object AssociationState { * INTERNAL API */ private[akka] final class AssociationState( - val incarnation: Int, + val incarnation: Int, val uniqueRemoteAddressPromise: Promise[UniqueAddress], - val quarantined: ImmutableLongMap[AssociationState.QuarantinedTimestamp], - val outboundCompressions: OutboundCompressions) { + val quarantined: ImmutableLongMap[AssociationState.QuarantinedTimestamp]) { import AssociationState.QuarantinedTimestamp @@ -159,11 +153,8 @@ private[akka] final class AssociationState( } } - def withCompression(compression: OutboundCompressions) = - new AssociationState(incarnation, uniqueRemoteAddressPromise, quarantined, compression) - - def newIncarnation(remoteAddressPromise: Promise[UniqueAddress], compression: OutboundCompressions): AssociationState = - new AssociationState(incarnation + 1, remoteAddressPromise, quarantined, compression) + def newIncarnation(remoteAddressPromise: Promise[UniqueAddress]): AssociationState = + new AssociationState(incarnation + 1, remoteAddressPromise, quarantined) def newQuarantined(): AssociationState = uniqueRemoteAddressPromise.future.value match { @@ -171,8 +162,7 @@ private[akka] final class AssociationState( new AssociationState( incarnation, uniqueRemoteAddressPromise, - quarantined = quarantined.updated(a.uid, QuarantinedTimestamp(System.nanoTime())), - outboundCompressions = NoOutboundCompressions) // after quarantine no compression needed anymore, drop it + quarantined = quarantined.updated(a.uid, QuarantinedTimestamp(System.nanoTime()))) case _ ⇒ this } @@ -235,7 +225,7 @@ private[akka] trait OutboundContext { */ private[remote] object FlushOnShutdown { def props(done: Promise[Done], timeout: FiniteDuration, - inboundContext: InboundContext, associations: Set[Association]): Props = { + inboundContext: InboundContext, associations: Set[Association]): Props = { require(associations.nonEmpty) Props(new FlushOnShutdown(done, timeout, inboundContext, associations)) } @@ -247,7 +237,7 @@ private[remote] object FlushOnShutdown { * INTERNAL API */ private[remote] class FlushOnShutdown(done: Promise[Done], timeout: FiniteDuration, - inboundContext: InboundContext, associations: Set[Association]) extends Actor { + inboundContext: InboundContext, associations: Set[Association]) extends Actor { var remaining = associations.flatMap(_.associationState.uniqueRemoteAddressValue) @@ -330,7 +320,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R private val priorityMessageDestinations = WildcardIndex[NotUsed]() - // this comes from remoting so is semi-ok to be hardcoded here + // These destinations are not defined in configuration because it should not + // be possible to abuse the control channel .insert(Array("system", "remote-watcher"), NotUsed) // these belongs to cluster and should come from there .insert(Array("system", "cluster", "core", "daemon", "heartbeatSender"), NotUsed) @@ -579,17 +570,27 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R case ActorRefCompressionAdvertisement(from, table) ⇒ log.debug("Incoming ActorRef compression advertisement from [{}], table: [{}]", from, table) val a = association(from.address) - a.outboundCompression.applyActorRefCompressionTable(table) - a.sendControl(ActorRefCompressionAdvertisementAck(localAddress, table.version)) - system.eventStream.publish(Events.ReceivedActorRefCompressionTable(from, table)) + // make sure uid is same for active association + if (a.associationState.uniqueRemoteAddressValue().contains(from)) { + import system.dispatcher + a.changeActorRefCompression(table).foreach { _ ⇒ + a.sendControl(ActorRefCompressionAdvertisementAck(localAddress, table.version)) + system.eventStream.publish(Events.ReceivedActorRefCompressionTable(from, table)) + } + } case ActorRefCompressionAdvertisementAck(from, tableVersion) ⇒ inboundCompressions.foreach(_.confirmActorRefCompressionAdvertisement(from.uid, tableVersion)) case ClassManifestCompressionAdvertisement(from, table) ⇒ log.debug("Incoming Class Manifest compression advertisement from [{}], table: [{}]", from, table) val a = association(from.address) - a.outboundCompression.applyClassManifestCompressionTable(table) - a.sendControl(ClassManifestCompressionAdvertisementAck(localAddress, table.version)) - system.eventStream.publish(Events.ReceivedClassManifestCompressionTable(from, table)) + // make sure uid is same for active association + if (a.associationState.uniqueRemoteAddressValue().contains(from)) { + import system.dispatcher + a.changeClassManifestCompression(table).foreach { _ ⇒ + a.sendControl(ClassManifestCompressionAdvertisementAck(localAddress, table.version)) + system.eventStream.publish(Events.ReceivedClassManifestCompressionTable(from, table)) + } + } case ClassManifestCompressionAdvertisementAck(from, tableVersion) ⇒ inboundCompressions.foreach(_.confirmActorRefCompressionAdvertisement(from.uid, tableVersion)) } @@ -719,8 +720,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R if (testStages.isEmpty) Future.successful(false) else { - import scala.collection.JavaConverters._ import system.dispatcher + import scala.collection.JavaConverters._ val allTestStages = testStages.asScala.toVector ++ associationRegistry.allAssociations.flatMap(_.testStages) Future.sequence(allTestStages.map(_.send(cmd))).map(_ ⇒ true) } @@ -752,7 +753,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R override def association(uid: Long): OptionVal[Association] = associationRegistry.association(uid) - override def completeHandshake(peer: UniqueAddress): Unit = { + override def completeHandshake(peer: UniqueAddress): Future[Done] = { val a = associationRegistry.setUID(peer) a.completeHandshake(peer) } @@ -765,19 +766,22 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R association(remoteAddress).quarantine(reason = "", uid.map(_.toLong)) } - def outbound(outboundContext: OutboundContext, compression: OutboundCompressions): Sink[OutboundEnvelope, Future[Done]] = - createOutboundSink(ordinaryStreamId, outboundContext, compression, envelopePool) + def outbound(outboundContext: OutboundContext): Sink[OutboundEnvelope, (ChangeOutboundCompression, Future[Done])] = + createOutboundSink(ordinaryStreamId, outboundContext, envelopePool) - def outboundLarge(outboundContext: OutboundContext, compression: OutboundCompressions): Sink[OutboundEnvelope, Future[Done]] = - createOutboundSink(largeStreamId, outboundContext, compression, largeEnvelopePool) + def outboundLarge(outboundContext: OutboundContext): Sink[OutboundEnvelope, Future[Done]] = + createOutboundSink(largeStreamId, outboundContext, largeEnvelopePool) + .mapMaterializedValue { case (_, d) ⇒ d } + + private def createOutboundSink(streamId: Int, outboundContext: OutboundContext, + bufferPool: EnvelopeBufferPool): Sink[OutboundEnvelope, (ChangeOutboundCompression, Future[Done])] = { - private def createOutboundSink(streamId: Int, outboundContext: OutboundContext, compression: OutboundCompressions, bufferPool: EnvelopeBufferPool): Sink[OutboundEnvelope, Future[Done]] = { Flow.fromGraph(killSwitch.flow[OutboundEnvelope]) .via(new OutboundHandshake(system, outboundContext, outboundEnvelopePool, handshakeTimeout, handshakeRetryInterval, injectHandshakeInterval)) - .via(createEncoder(bufferPool, compression)) + .viaMat(createEncoder(bufferPool))(Keep.right) .toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), streamId, aeron, taskRunner, - envelopePool, giveUpSendAfter, createFlightRecorderEventSink()))(Keep.right) + envelopePool, giveUpSendAfter, createFlightRecorderEventSink()))(Keep.both) } /** @@ -794,25 +798,20 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R // FIXME we can also add scrubbing stage that would collapse sys msg acks/nacks and remove duplicate Quarantine messages } - def outboundControlPart2(outboundContext: OutboundContext, compression: OutboundCompressions): Sink[OutboundEnvelope, (OutboundControlIngress, Future[Done])] = { + def outboundControlPart2(outboundContext: OutboundContext): Sink[OutboundEnvelope, (OutboundControlIngress, Future[Done])] = { Flow[OutboundEnvelope] .viaMat(new OutboundControlJunction(outboundContext, outboundEnvelopePool))(Keep.right) - .via(encoder(compression)) + .via(createEncoder(envelopePool)) .toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), controlStreamId, aeron, taskRunner, envelopePool, Duration.Inf, createFlightRecorderEventSink()))(Keep.both) } - def createEncoder(compression: OutboundCompressions, bufferPool: EnvelopeBufferPool): Flow[OutboundEnvelope, EnvelopeBuffer, NotUsed] = - Flow.fromGraph(new Encoder(localAddress, system, compression, outboundEnvelopePool, bufferPool)) - private def createInboundCompressions(inboundContext: InboundContext): InboundCompressions = if (remoteSettings.ArteryCompressionSettings.enabled) new InboundCompressionsImpl(system, inboundContext) else NoInboundCompressions - def createEncoder(pool: EnvelopeBufferPool, compression: OutboundCompressions): Flow[OutboundEnvelope, EnvelopeBuffer, NotUsed] = - Flow.fromGraph(new Encoder(localAddress, system, compression, outboundEnvelopePool, pool)) - - def encoder(compression: OutboundCompressions): Flow[OutboundEnvelope, EnvelopeBuffer, NotUsed] = createEncoder(envelopePool, compression) + def createEncoder(pool: EnvelopeBufferPool): Flow[OutboundEnvelope, EnvelopeBuffer, ChangeOutboundCompression] = + Flow.fromGraph(new Encoder(localAddress, system, outboundEnvelopePool, pool)) def aeronSource(streamId: Int, pool: EnvelopeBufferPool): Source[EnvelopeBuffer, NotUsed] = Source.fromGraph(new AeronSource(inboundChannel, streamId, aeron, taskRunner, pool, diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index 6c3a29e908..82dc5bf01f 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -4,41 +4,43 @@ package akka.remote.artery import java.util.Queue -import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicReference -import akka.remote.artery.compress.{ CompressionProtocol, CompressionTable, OutboundCompressions, OutboundCompressionsImpl } - import scala.annotation.tailrec import scala.concurrent.Future import scala.concurrent.Promise import scala.concurrent.duration._ import scala.concurrent.duration.FiniteDuration -import scala.util.Success + import akka.{ Done, NotUsed } import akka.actor.ActorRef import akka.actor.ActorSelectionMessage import akka.actor.Address -import akka.actor.RootActorPath import akka.dispatch.sysmsg.SystemMessage import akka.event.Logging import akka.remote._ +import akka.remote.DaemonMsgCreate +import akka.remote.QuarantinedEvent import akka.remote.artery.AeronSink.GaveUpSendingException +import akka.remote.artery.Encoder.ChangeOutboundCompression +import akka.remote.artery.Encoder.ChangeOutboundCompressionFailed import akka.remote.artery.InboundControlJunction.ControlMessageSubject import akka.remote.artery.OutboundControlJunction.OutboundControlIngress import akka.remote.artery.OutboundHandshake.HandshakeTimeoutException import akka.remote.artery.SendQueue.ProducerApi import akka.remote.artery.SystemMessageDelivery.ClearSystemMessageDelivery +import akka.remote.artery.compress.CompressionProtocol._ +import akka.remote.artery.compress.CompressionTable import akka.stream.AbruptTerminationException import akka.stream.Materializer import akka.stream.scaladsl.Keep import akka.stream.scaladsl.Source import akka.util.{ Unsafe, WildcardIndex } -import org.agrona.concurrent.ManyToOneConcurrentArrayQueue import akka.util.OptionVal +import org.agrona.concurrent.ManyToOneConcurrentArrayQueue import akka.remote.artery.compress.CompressionProtocol._ /** @@ -82,9 +84,6 @@ private[remote] class Association( // the `SendQueue` after materialization. Using same underlying queue. This makes it possible to // start sending (enqueuing) to the Association immediate after construction. - /** Accesses the currently active outbound compression. */ - def outboundCompression: OutboundCompressions = associationState.outboundCompressions - def createQueue(capacity: Int): Queue[OutboundEnvelope] = new ManyToOneConcurrentArrayQueue[OutboundEnvelope](capacity) @@ -93,6 +92,25 @@ private[remote] class Association( @volatile private[this] var controlQueue: SendQueue.ProducerApi[OutboundEnvelope] = QueueWrapper(createQueue(controlQueueSize)) @volatile private[this] var _outboundControlIngress: OutboundControlIngress = _ @volatile private[this] var materializing = new CountDownLatch(1) + @volatile private[this] var changeOutboundCompression: Option[ChangeOutboundCompression] = None + + def changeActorRefCompression(table: CompressionTable[ActorRef]): Future[Done] = + changeOutboundCompression match { + case Some(c) ⇒ c.changeActorRefCompression(table) + case None ⇒ Future.failed(new ChangeOutboundCompressionFailed) + } + + def changeClassManifestCompression(table: CompressionTable[String]): Future[Done] = + changeOutboundCompression match { + case Some(c) ⇒ c.changeClassManifestCompression(table) + case None ⇒ Future.failed(new ChangeOutboundCompressionFailed) + } + + def clearCompression(): Future[Done] = + changeOutboundCompression match { + case Some(c) ⇒ c.clearCompression() + case None ⇒ Future.failed(new ChangeOutboundCompressionFailed) + } private val _testStages: CopyOnWriteArrayList[TestManagementApi] = new CopyOnWriteArrayList @@ -138,31 +156,41 @@ private[remote] class Association( def associationState: AssociationState = Unsafe.instance.getObjectVolatile(this, AbstractAssociation.sharedStateOffset).asInstanceOf[AssociationState] - def completeHandshake(peer: UniqueAddress): Unit = { + def completeHandshake(peer: UniqueAddress): Future[Done] = { require( remoteAddress == peer.address, s"wrong remote address in completeHandshake, got ${peer.address}, expected $remoteAddress") val current = associationState - current.uniqueRemoteAddressPromise.trySuccess(peer) + current.uniqueRemoteAddressValue() match { case Some(`peer`) ⇒ - // our value - if (current.outboundCompressions == NoOutboundCompressions) { - // enable outbound compression (here, since earlier we don't know the remote address) - swapState(current, current.withCompression(createOutboundCompressions(remoteAddress))) - } + // handshake already completed + Future.successful(Done) case _ ⇒ - val newState = current.newIncarnation(Promise.successful(peer), createOutboundCompressions(remoteAddress)) - if (swapState(current, newState)) { + // clear outbound compression, it's safe to do that several times if someone else + // completes handshake at same time, but it's important to clear it before + // we signal that the handshake is completed (uniqueRemoteAddressPromise.trySuccess) + import transport.system.dispatcher + clearCompression().map { _ ⇒ + current.uniqueRemoteAddressPromise.trySuccess(peer) current.uniqueRemoteAddressValue() match { - case Some(old) ⇒ - log.debug( - "Incarnation {} of association to [{}] with new UID [{}] (old UID [{}])", - newState.incarnation, peer.address, peer.uid, old.uid) - case None ⇒ - // Failed, nothing to do + case Some(`peer`) ⇒ + // our value + case _ ⇒ + val newState = current.newIncarnation(Promise.successful(peer)) + if (swapState(current, newState)) { + current.uniqueRemoteAddressValue() match { + case Some(old) ⇒ + log.debug( + "Incarnation {} of association to [{}] with new UID [{}] (old UID [{}])", + newState.incarnation, peer.address, peer.uid, old.uid) + case None ⇒ + // Failed, nothing to do + } + // if swap failed someone else completed before us, and that is fine + } } - // if swap failed someone else completed before us, and that is fine + Done } } } @@ -253,6 +281,8 @@ private[remote] class Association( log.warning( "Association to [{}] with UID [{}] is irrecoverably failed. Quarantining address. {}", remoteAddress, u, reason) + // clear outbound compression + clearCompression() // FIXME when we complete the switch to Long UID we must use Long here also, issue #20644 transport.eventPublisher.notifyListeners(QuarantinedEvent(remoteAddress, u.toInt)) // end delivery of system messages to that incarnation after this point @@ -291,20 +321,16 @@ private[remote] class Association( } private def runOutboundStreams(): Unit = { - // TODO no compression for control / large streams currently - val disableCompression = NoOutboundCompressions - // it's important to materialize the outboundControl stream first, // so that outboundControlIngress is ready when stages for all streams start - runOutboundControlStream(disableCompression) - runOutboundOrdinaryMessagesStream(CurrentAssociationStateOutboundCompressionsProxy) + runOutboundControlStream() + runOutboundOrdinaryMessagesStream() - if (transport.largeMessageChannelEnabled) { - runOutboundLargeMessagesStream(disableCompression) - } + if (transport.largeMessageChannelEnabled) + runOutboundLargeMessagesStream() } - private def runOutboundControlStream(compression: OutboundCompressions): Unit = { + private def runOutboundControlStream(): Unit = { // stage in the control stream may access the outboundControlIngress before returned here // using CountDownLatch to make sure that materialization is completed before accessing outboundControlIngress materializing = new CountDownLatch(1) @@ -318,14 +344,14 @@ private[remote] class Association( Source.fromGraph(new SendQueue[OutboundEnvelope]) .via(transport.outboundControlPart1(this)) .viaMat(transport.outboundTestFlow(this))(Keep.both) - .toMat(transport.outboundControlPart2(this, compression))(Keep.both) + .toMat(transport.outboundControlPart2(this))(Keep.both) .run()(materializer) _testStages.add(mgmt) (queueValue, (control, completed)) } else { Source.fromGraph(new SendQueue[OutboundEnvelope]) .via(transport.outboundControlPart1(this)) - .toMat(transport.outboundControlPart2(this, compression))(Keep.both) + .toMat(transport.outboundControlPart2(this))(Keep.both) .run()(materializer) } @@ -335,7 +361,7 @@ private[remote] class Association( _outboundControlIngress = control materializing.countDown() attachStreamRestart("Outbound control stream", completed, cause ⇒ { - runOutboundControlStream(compression) + runOutboundControlStream() cause match { case _: HandshakeTimeoutException ⇒ // ok, quarantine not possible without UID case _ ⇒ quarantine("Outbound control stream restarted") @@ -351,32 +377,33 @@ private[remote] class Association( QueueWrapper(createQueue(capacity)) } - private def runOutboundOrdinaryMessagesStream(compression: OutboundCompressions): Unit = { + private def runOutboundOrdinaryMessagesStream(): Unit = { val wrapper = getOrCreateQueueWrapper(queue, queueSize) queue = wrapper // use new underlying queue immediately for restarts - val (queueValue, completed) = + val (queueValue, (changeCompression, completed)) = if (transport.remoteSettings.TestMode) { val ((queueValue, mgmt), completed) = Source.fromGraph(new SendQueue[OutboundEnvelope]) .viaMat(transport.outboundTestFlow(this))(Keep.both) - .toMat(transport.outbound(this, compression))(Keep.both) + .toMat(transport.outbound(this))(Keep.both) .run()(materializer) _testStages.add(mgmt) (queueValue, completed) } else { Source.fromGraph(new SendQueue[OutboundEnvelope]) - .toMat(transport.outbound(this, compression))(Keep.both) + .toMat(transport.outbound(this))(Keep.both) .run()(materializer) } queueValue.inject(wrapper.queue) // replace with the materialized value, still same underlying queue queue = queueValue + changeOutboundCompression = Some(changeCompression) - attachStreamRestart("Outbound message stream", completed, _ ⇒ runOutboundOrdinaryMessagesStream(compression)) + attachStreamRestart("Outbound message stream", completed, _ ⇒ runOutboundOrdinaryMessagesStream()) } - private def runOutboundLargeMessagesStream(compression: OutboundCompressions): Unit = { + private def runOutboundLargeMessagesStream(): Unit = { val wrapper = getOrCreateQueueWrapper(largeQueue, largeQueueSize) largeQueue = wrapper // use new underlying queue immediately for restarts @@ -384,20 +411,20 @@ private[remote] class Association( if (transport.remoteSettings.TestMode) { val ((queueValue, mgmt), completed) = Source.fromGraph(new SendQueue[OutboundEnvelope]) .viaMat(transport.outboundTestFlow(this))(Keep.both) - .toMat(transport.outboundLarge(this, compression))(Keep.both) + .toMat(transport.outboundLarge(this))(Keep.both) .run()(materializer) _testStages.add(mgmt) (queueValue, completed) } else { Source.fromGraph(new SendQueue[OutboundEnvelope]) - .toMat(transport.outboundLarge(this, compression))(Keep.both) + .toMat(transport.outboundLarge(this))(Keep.both) .run()(materializer) } queueValue.inject(wrapper.queue) // replace with the materialized value, still same underlying queue largeQueue = queueValue - attachStreamRestart("Outbound large message stream", completed, _ ⇒ runOutboundLargeMessagesStream(compression)) + attachStreamRestart("Outbound large message stream", completed, _ ⇒ runOutboundLargeMessagesStream()) } private def attachStreamRestart(streamName: String, streamCompleted: Future[Done], restart: Throwable ⇒ Unit): Unit = { @@ -421,41 +448,6 @@ private[remote] class Association( } } - // TODO: Make sure that once other channels use Compression, each gets it's own - private def createOutboundCompressions(remoteAddress: Address): OutboundCompressions = { - if (transport.provider.remoteSettings.ArteryCompressionSettings.enabled) { - val compression = new OutboundCompressionsImpl(transport.system, remoteAddress) - log.debug("Creating Outbound compression table to [{}]", remoteAddress) - compression - } else NoOutboundCompressions - } - - /** - * This proxy uses the current associationStates compression table, which is reset for a new incarnation. - * This way the same outgoing stream will switch to using the new table without the need of restarting it. - */ - private object CurrentAssociationStateOutboundCompressionsProxy extends OutboundCompressions { - - override def actorRefCompressionTableVersion: Int = - associationState.outboundCompressions.actorRefCompressionTableVersion - override def applyActorRefCompressionTable(table: CompressionTable[ActorRef]): Unit = { - associationState.outboundCompressions.applyActorRefCompressionTable(table) - } - override final def compressActorRef(ref: ActorRef): Int = { - associationState.outboundCompressions.compressActorRef(ref) - } - - override def classManifestCompressionTableVersion: Int = - associationState.outboundCompressions.classManifestCompressionTableVersion - override def applyClassManifestCompressionTable(table: CompressionTable[String]): Unit = - associationState.outboundCompressions.applyClassManifestCompressionTable(table) - override final def compressClassManifest(manifest: String): Int = - associationState.outboundCompressions.compressClassManifest(manifest) - - override def toString = - s"${Logging.simpleName(getClass)}(current delegate: ${associationState.outboundCompressions})" - } - override def toString: String = s"Association($localAddress -> $remoteAddress with $associationState)" @@ -489,17 +481,23 @@ private[remote] class AssociationRegistry(createAssociation: Address ⇒ Associa @tailrec final def setUID(peer: UniqueAddress): Association = { val currentMap = associationsByUid.get val a = association(peer.address) - // make sure we don't overwrite same UID with different association + currentMap.get(peer.uid) match { - case OptionVal.Some(previous) if (previous ne a) ⇒ - throw new IllegalArgumentException(s"UID collision old [$previous] new [$a]") - case _ ⇒ // ok + case OptionVal.Some(previous) ⇒ + if (previous eq a) + // associationsByUid Map already contains the right association + a + else + // make sure we don't overwrite same UID with different association + throw new IllegalArgumentException(s"UID collision old [$previous] new [$a]") + case _ ⇒ + // update associationsByUid Map with the uid -> assocation + val newMap = currentMap.updated(peer.uid, a) + if (associationsByUid.compareAndSet(currentMap, newMap)) + a + else + setUID(peer) // lost CAS, retry } - val newMap = currentMap.updated(peer.uid, a) - if (associationsByUid.compareAndSet(currentMap, newMap)) - a - else - setUID(peer) // lost CAS, retry } def allAssociations: Set[Association] = diff --git a/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala b/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala index 0b4bfbf558..dc07002d35 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala @@ -9,12 +9,13 @@ import java.nio.{ ByteBuffer, ByteOrder } import akka.actor.{ ActorRef, Address } import akka.remote.artery.compress.CompressionProtocol._ -import akka.remote.artery.compress.{ CompressionTable, InboundCompressions, OutboundCompressions } +import akka.remote.artery.compress.{ CompressionTable, InboundCompressions } import akka.serialization.Serialization import org.agrona.concurrent.{ ManyToManyConcurrentArrayQueue, UnsafeBuffer } import akka.util.{ OptionVal, Unsafe } import scala.util.control.NonFatal +import akka.remote.artery.compress.NoInboundCompressions /** * INTERNAL API @@ -77,25 +78,34 @@ private[remote] object HeaderBuilder { // We really only use the Header builder on one "side" or the other, thus in order to avoid having to split its impl // we inject no-op compression's of the "other side". - def in(compression: InboundCompressions): HeaderBuilder = new HeaderBuilderImpl(compression, NoOutboundCompressions) - def out(compression: OutboundCompressions): HeaderBuilder = new HeaderBuilderImpl(NoInboundCompressions, compression) + def in(compression: InboundCompressions): HeaderBuilder = + new HeaderBuilderImpl(compression, CompressionTable.empty[ActorRef], CompressionTable.empty[String]) + def out(): HeaderBuilder = + new HeaderBuilderImpl(NoInboundCompressions, CompressionTable.empty[ActorRef], CompressionTable.empty[String]) /** INTERNAL API, FOR TESTING ONLY */ - private[remote] def bothWays(in: InboundCompressions, out: OutboundCompressions): HeaderBuilder = new HeaderBuilderImpl(in, out) + private[remote] def bothWays( + in: InboundCompressions, + outboundActorRefCompression: CompressionTable[ActorRef], + outboundClassManifestCompression: CompressionTable[String]): HeaderBuilder = + new HeaderBuilderImpl(in, outboundActorRefCompression, outboundClassManifestCompression) } /** * INTERNAL API */ -sealed trait HeaderBuilder { +private[remote] sealed trait HeaderBuilder { def setVersion(v: Int): Unit def version: Int - def setActorRefCompressionTableVersion(v: Int): Unit - def actorRefCompressionTableVersion: Int + def inboundActorRefCompressionTableVersion: Int + def inboundClassManifestCompressionTableVersion: Int - def setClassManifestCompressionTableVersion(v: Int): Unit - def classManifestCompressionTableVersion: Int + def outboundActorRefCompression: CompressionTable[ActorRef] + def setOutboundActorRefCompression(table: CompressionTable[ActorRef]): Unit + + def outboundClassManifestCompression: CompressionTable[String] + def setOutboundClassManifestCompression(table: CompressionTable[String]): Unit def setUid(u: Long): Unit def uid: Long @@ -140,12 +150,15 @@ sealed trait HeaderBuilder { /** * INTERNAL API */ -private[remote] final class HeaderBuilderImpl(inboundCompression: InboundCompressions, outboundCompression: OutboundCompressions) extends HeaderBuilder { +private[remote] final class HeaderBuilderImpl( + inboundCompression: InboundCompressions, + var _outboundActorRefCompression: CompressionTable[ActorRef], + var _outboundClassManifestCompression: CompressionTable[String]) extends HeaderBuilder { // Fields only available for EnvelopeBuffer var _version: Int = _ var _uid: Long = _ - var _actorRefCompressionTableVersion: Int = 0 - var _classManifestCompressionTableVersion: Int = 0 + var _inboundActorRefCompressionTableVersion: Int = 0 + var _inboundClassManifestCompressionTableVersion: Int = 0 var _senderActorRef: String = null var _senderActorRefIdx: Int = -1 @@ -162,14 +175,19 @@ private[remote] final class HeaderBuilderImpl(inboundCompression: InboundCompres override def setUid(uid: Long) = _uid = uid override def uid: Long = _uid - override def setActorRefCompressionTableVersion(v: Int): Unit = _actorRefCompressionTableVersion = v - override def actorRefCompressionTableVersion: Int = _actorRefCompressionTableVersion + override def inboundActorRefCompressionTableVersion: Int = _inboundActorRefCompressionTableVersion + override def inboundClassManifestCompressionTableVersion: Int = _inboundClassManifestCompressionTableVersion - override def setClassManifestCompressionTableVersion(v: Int): Unit = _classManifestCompressionTableVersion = v - override def classManifestCompressionTableVersion: Int = _classManifestCompressionTableVersion + def setOutboundActorRefCompression(table: CompressionTable[ActorRef]): Unit = + _outboundActorRefCompression = table + override def outboundActorRefCompression: CompressionTable[ActorRef] = _outboundActorRefCompression + + def setOutboundClassManifestCompression(table: CompressionTable[String]): Unit = + _outboundClassManifestCompression = table + def outboundClassManifestCompression: CompressionTable[String] = _outboundClassManifestCompression override def setSenderActorRef(ref: ActorRef): Unit = { - _senderActorRefIdx = outboundCompression.compressActorRef(ref) + _senderActorRefIdx = outboundActorRefCompression.compress(ref) if (_senderActorRefIdx == -1) _senderActorRef = Serialization.serializedActorPath(ref) // includes local address from `currentTransportInformation` } override def setNoSender(): Unit = { @@ -179,7 +197,8 @@ private[remote] final class HeaderBuilderImpl(inboundCompression: InboundCompres override def isNoSender: Boolean = (_senderActorRef eq null) && _senderActorRefIdx == EnvelopeBuffer.DeadLettersCode override def senderActorRef(originUid: Long): OptionVal[ActorRef] = - if (_senderActorRef eq null) inboundCompression.decompressActorRef(originUid, actorRefCompressionTableVersion, _senderActorRefIdx) + if (_senderActorRef eq null) + inboundCompression.decompressActorRef(originUid, inboundActorRefCompressionTableVersion, _senderActorRefIdx) else OptionVal.None def senderActorRefPath: OptionVal[String] = OptionVal(_senderActorRef) @@ -192,13 +211,14 @@ private[remote] final class HeaderBuilderImpl(inboundCompression: InboundCompres (_recipientActorRef eq null) && _recipientActorRefIdx == EnvelopeBuffer.DeadLettersCode def setRecipientActorRef(ref: ActorRef): Unit = { - _recipientActorRefIdx = outboundCompression.compressActorRef(ref) + _recipientActorRefIdx = outboundActorRefCompression.compress(ref) if (_recipientActorRefIdx == -1) { _recipientActorRef = ref.path.toSerializationFormat } } def recipientActorRef(originUid: Long): OptionVal[ActorRef] = - if (_recipientActorRef eq null) inboundCompression.decompressActorRef(originUid, actorRefCompressionTableVersion, _recipientActorRefIdx) + if (_recipientActorRef eq null) + inboundCompression.decompressActorRef(originUid, inboundActorRefCompressionTableVersion, _recipientActorRefIdx) else OptionVal.None def recipientActorRefPath: OptionVal[String] = OptionVal(_recipientActorRef) @@ -210,13 +230,15 @@ private[remote] final class HeaderBuilderImpl(inboundCompression: InboundCompres _serializer override def setManifest(manifest: String): Unit = { - _manifestIdx = outboundCompression.compressClassManifest(manifest) + _manifestIdx = outboundClassManifestCompression.compress(manifest) if (_manifestIdx == -1) _manifest = manifest } override def manifest(originUid: Long): String = { if (_manifest ne null) _manifest else { - _manifest = inboundCompression.decompressClassManifest(originUid, classManifestCompressionTableVersion, _manifestIdx).get + _manifest = inboundCompression.decompressClassManifest( + originUid, + inboundClassManifestCompressionTableVersion, _manifestIdx).get _manifest } } @@ -224,8 +246,6 @@ private[remote] final class HeaderBuilderImpl(inboundCompression: InboundCompres override def toString = "HeaderBuilderImpl(" + "version:" + version + ", " + - "actorRefCompressionTableVersion:" + actorRefCompressionTableVersion + ", " + - "classManifestCompressionTableVersion:" + classManifestCompressionTableVersion + ", " + "uid:" + uid + ", " + "_senderActorRef:" + _senderActorRef + ", " + "_senderActorRefIdx:" + _senderActorRefIdx + ", " + @@ -257,8 +277,8 @@ private[remote] final class EnvelopeBuffer(val byteBuffer: ByteBuffer) { byteBuffer.putInt(header.serializer) // compression table version numbers - byteBuffer.putInt(ActorRefCompressionTableVersionTagOffset, header._actorRefCompressionTableVersion | TagTypeMask) - byteBuffer.putInt(ClassManifestCompressionTableVersionTagOffset, header._classManifestCompressionTableVersion | TagTypeMask) + byteBuffer.putInt(ActorRefCompressionTableVersionTagOffset, header.outboundActorRefCompression.version | TagTypeMask) + byteBuffer.putInt(ClassManifestCompressionTableVersionTagOffset, header.outboundClassManifestCompression.version | TagTypeMask) // Write compressable, variable-length parts always to the actual position of the buffer // Write tag values explicitly in their proper offset @@ -294,11 +314,11 @@ private[remote] final class EnvelopeBuffer(val byteBuffer: ByteBuffer) { // compression table versions (stored in the Tag) val refCompressionVersionTag = byteBuffer.getInt(ActorRefCompressionTableVersionTagOffset) if ((refCompressionVersionTag & TagTypeMask) != 0) { - header setActorRefCompressionTableVersion refCompressionVersionTag & TagValueMask + header._inboundActorRefCompressionTableVersion = refCompressionVersionTag & TagValueMask } val manifestCompressionVersionTag = byteBuffer.getInt(ClassManifestCompressionTableVersionTagOffset) if ((manifestCompressionVersionTag & TagTypeMask) != 0) { - header setClassManifestCompressionTableVersion manifestCompressionVersionTag & TagValueMask + header._inboundClassManifestCompressionTableVersion = manifestCompressionVersionTag & TagValueMask } // Read compressable, variable-length parts always from the actual position of the buffer diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala index 6c9a1149dc..5c2243d97b 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala @@ -13,9 +13,28 @@ import akka.stream._ import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } import akka.util.{ ByteString, OptionVal, PrettyByteString } import akka.actor.EmptyLocalActorRef -import akka.remote.artery.compress.{ InboundCompressions, OutboundCompressions, OutboundCompressionsImpl } +import akka.remote.artery.compress.InboundCompressions import akka.stream.stage.TimerGraphStageLogic import java.util.concurrent.TimeUnit +import scala.concurrent.Future +import akka.remote.artery.compress.CompressionTable +import akka.Done +import akka.stream.stage.GraphStageWithMaterializedValue +import scala.concurrent.Promise + +/** + * INTERNAL API + */ +private[remote] object Encoder { + private[remote] trait ChangeOutboundCompression { + def changeActorRefCompression(table: CompressionTable[ActorRef]): Future[Done] + def changeClassManifestCompression(table: CompressionTable[String]): Future[Done] + def clearCompression(): Future[Done] + } + + private[remote] class ChangeOutboundCompressionFailed extends RuntimeException( + "Change of outbound compression table failed (will be retried), because materialization did not complete yet") +} /** * INTERNAL API @@ -23,42 +42,49 @@ import java.util.concurrent.TimeUnit private[remote] class Encoder( uniqueLocalAddress: UniqueAddress, system: ActorSystem, - compression: OutboundCompressions, outboundEnvelopePool: ObjectPool[ReusableOutboundEnvelope], bufferPool: EnvelopeBufferPool) - extends GraphStage[FlowShape[OutboundEnvelope, EnvelopeBuffer]] { + extends GraphStageWithMaterializedValue[FlowShape[OutboundEnvelope, EnvelopeBuffer], Encoder.ChangeOutboundCompression] { + import Encoder._ val in: Inlet[OutboundEnvelope] = Inlet("Artery.Encoder.in") val out: Outlet[EnvelopeBuffer] = Outlet("Artery.Encoder.out") val shape: FlowShape[OutboundEnvelope, EnvelopeBuffer] = FlowShape(in, out) - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging { + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, ChangeOutboundCompression) = { + val logic = new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging with ChangeOutboundCompression { - private val headerBuilder = HeaderBuilder.out(compression) + private val headerBuilder = HeaderBuilder.out() headerBuilder setVersion ArteryTransport.Version headerBuilder setUid uniqueLocalAddress.uid private val localAddress = uniqueLocalAddress.address private val serialization = SerializationExtension(system) private val serializationInfo = Serialization.Information(localAddress, system) + private val changeActorRefCompressionCb = getAsyncCallback[(CompressionTable[ActorRef], Promise[Done])] { + case (table, done) ⇒ + headerBuilder.setOutboundActorRefCompression(table) + done.success(Done) + } + + private val changeClassManifsetCompressionCb = getAsyncCallback[(CompressionTable[String], Promise[Done])] { + case (table, done) ⇒ + headerBuilder.setOutboundClassManifestCompression(table) + done.success(Done) + } + + private val clearCompressionCb = getAsyncCallback[Promise[Done]] { done ⇒ + headerBuilder.setOutboundActorRefCompression(CompressionTable.empty[ActorRef]) + headerBuilder.setOutboundClassManifestCompression(CompressionTable.empty[String]) + done.success(Done) + } + override protected def logSource = classOf[Encoder] override def onPush(): Unit = { val outboundEnvelope = grab(in) val envelope = bufferPool.acquire() - // FIXME: OMG race between setting the version, and using the table!!!! - // incoming messages are concurrent to outgoing ones - // incoming message may be table advertisement - // which swaps the table in Outgoing*Compression for the new one (n+1) - // by itself it does so atomically, - // race: however here we store the compression table version separately from actually using it (storing the refs / manifests etc). - // so there is a slight race IF the table is swapped right between us setting the version n here [then the table being swapped to n+1] and then we use the n+1 version to compressions the compressions (which the receiving end will fail to read, since the encoding could be completely different, and it picks the table based on the version Int). - // A solution would be to getTable => use it to set and compress things - headerBuilder setActorRefCompressionTableVersion compression.actorRefCompressionTableVersion - headerBuilder setClassManifestCompressionTableVersion compression.classManifestCompressionTableVersion - // internally compression is applied by the builder: outboundEnvelope.recipient match { case OptionVal.Some(r) ⇒ headerBuilder setRecipientActorRef r @@ -109,8 +135,49 @@ private[remote] class Encoder( override def onPull(): Unit = pull(in) + /** + * External call from ChangeOutboundCompression materialized value + */ + override def changeActorRefCompression(table: CompressionTable[ActorRef]): Future[Done] = { + val done = Promise[Done]() + try changeActorRefCompressionCb.invoke((table, done)) catch { + // This is a harmless failure, it will be retried on next advertisement or handshake attempt. + // It will only occur when callback is invoked before preStart. That is highly unlikely to + // happen since advertisement is not done immediately and handshake involves network roundtrip. + case NonFatal(_) ⇒ done.tryFailure(new ChangeOutboundCompressionFailed) + } + done.future + } + + /** + * External call from ChangeOutboundCompression materialized value + */ + override def changeClassManifestCompression(table: CompressionTable[String]): Future[Done] = { + val done = Promise[Done]() + try changeClassManifsetCompressionCb.invoke((table, done)) catch { + // in case materialization not completed yet + case NonFatal(_) ⇒ done.tryFailure(new ChangeOutboundCompressionFailed) + } + done.future + } + + /** + * External call from ChangeOutboundCompression materialized value + */ + override def clearCompression(): Future[Done] = { + val done = Promise[Done]() + try clearCompressionCb.invoke(done) catch { + // in case materialization not completed yet + case NonFatal(_) ⇒ done.tryFailure(new ChangeOutboundCompressionFailed) + } + done.future + } + setHandlers(in, out, this) } + + (logic, logic) + } } /** @@ -198,20 +265,17 @@ private[remote] class Decoder( val remoteAddress = assoc.remoteAddress sender match { case OptionVal.Some(snd) ⇒ - compression.hitActorRef(originUid, headerBuilder.actorRefCompressionTableVersion, - remoteAddress, snd, 1) + compression.hitActorRef(originUid, remoteAddress, snd, 1) case OptionVal.None ⇒ } recipient match { case OptionVal.Some(rcp) ⇒ - compression.hitActorRef(originUid, headerBuilder.actorRefCompressionTableVersion, - remoteAddress, rcp, 1) + compression.hitActorRef(originUid, remoteAddress, rcp, 1) case OptionVal.None ⇒ } - compression.hitClassManifest(originUid, headerBuilder.classManifestCompressionTableVersion, - remoteAddress, classManifest, 1) + compression.hitClassManifest(originUid, remoteAddress, classManifest, 1) case _ ⇒ // we don't want to record hits for compression while handshake is still in progress. diff --git a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala index a7c8420377..5f4a50a3b4 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala @@ -18,6 +18,8 @@ import akka.stream.stage.InHandler import akka.stream.stage.OutHandler import akka.stream.stage.TimerGraphStageLogic import akka.util.OptionVal +import akka.Done +import scala.concurrent.Future /** * INTERNAL API @@ -177,8 +179,9 @@ private[akka] class InboundHandshake(inboundContext: InboundContext, inControlSt env.message match { case HandshakeReq(from) ⇒ onHandshakeReq(from) case HandshakeRsp(from) ⇒ - inboundContext.completeHandshake(from) - pull(in) + after(inboundContext.completeHandshake(from)) { + pull(in) + } case _ ⇒ onMessage(env) } @@ -197,21 +200,32 @@ private[akka] class InboundHandshake(inboundContext: InboundContext, inControlSt }) private def onHandshakeReq(from: UniqueAddress): Unit = { - inboundContext.completeHandshake(from) - inboundContext.sendControl(from.address, HandshakeRsp(inboundContext.localAddress)) - pull(in) + after(inboundContext.completeHandshake(from)) { + inboundContext.sendControl(from.address, HandshakeRsp(inboundContext.localAddress)) + pull(in) + } + } + + private def after(first: Future[Done])(thenInside: ⇒ Unit): Unit = { + first.value match { + case Some(_) ⇒ + // This in the normal case (all but the first). The future will be completed + // because handshake was already completed. Note that we send those HandshakeReq + // periodically. + thenInside + case None ⇒ + implicit val ec = materializer.executionContext + first.onComplete { _ ⇒ + getAsyncCallback[Done](_ ⇒ thenInside).invoke(Done) + } + } + } private def onMessage(env: InboundEnvelope): Unit = { if (isKnownOrigin(env)) push(out, env) else { - // FIXME remove, only debug - log.warning( - s"Dropping message [{}] from unknown system with UID [{}]. " + - "This system with UID [{}] was probably restarted. " + - "Messages will be accepted when new handshake has been completed.", - env.message.getClass.getName, inboundContext.localAddress.uid, env.originUid) if (log.isDebugEnabled) log.debug( s"Dropping message [{}] from unknown system with UID [{}]. " + @@ -222,8 +236,12 @@ private[akka] class InboundHandshake(inboundContext: InboundContext, inControlSt } } - private def isKnownOrigin(env: InboundEnvelope): Boolean = - env.association.isDefined + private def isKnownOrigin(env: InboundEnvelope): Boolean = { + // the association is passed in the envelope from the Decoder stage to avoid + // additional lookup. The second OR case is because if we didn't use fusing it + // would be possible that it was not found by Decoder (handshake not completed yet) + env.association.isDefined || inboundContext.association(env.originUid).isDefined + } // OutHandler override def onPull(): Unit = pull(in) diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/ActualCompressions.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/ActualCompressions.scala deleted file mode 100644 index fdbc067d61..0000000000 --- a/akka-remote/src/main/scala/akka/remote/artery/compress/ActualCompressions.scala +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Copyright (C) 2016 Lightbend Inc. - */ - -package akka.remote.artery.compress - -import java.util.function.LongFunction - -import akka.actor.{ ActorRef, ActorSystem, Address } -import akka.remote.artery._ -import akka.util.OptionVal -import org.agrona.collections.Long2ObjectHashMap - -/** INTERNAL API */ -private[remote] final class OutboundCompressionsImpl(system: ActorSystem, remoteAddress: Address) extends OutboundCompressions { - - private val actorRefsOut = new OutboundActorRefCompression(system, remoteAddress) - private val classManifestsOut = new OutboundClassManifestCompression(system, remoteAddress) - - // actor ref compression --- - - override def compressActorRef(ref: ActorRef): Int = actorRefsOut.compress(ref) - override def actorRefCompressionTableVersion: Int = actorRefsOut.activeCompressionTableVersion - override def applyActorRefCompressionTable(table: CompressionTable[ActorRef]): Unit = - actorRefsOut.flipTable(table) - - // class manifest compression --- - - override def compressClassManifest(manifest: String): Int = classManifestsOut.compress(manifest) - override def classManifestCompressionTableVersion: Int = classManifestsOut.activeCompressionTableVersion - override def applyClassManifestCompressionTable(table: CompressionTable[String]): Unit = - classManifestsOut.flipTable(table) -} - -/** - * INTERNAL API - * - * One per incoming Aeron stream, actual compression tables are kept per-originUid and created on demand. - */ -private[remote] final class InboundCompressionsImpl( - system: ActorSystem, - inboundContext: InboundContext) extends InboundCompressions { - - private val settings = CompressionSettings(system) - - // FIXME we also must remove the ones that won't be used anymore - when quarantine triggers - private[this] val _actorRefsIns = new Long2ObjectHashMap[InboundActorRefCompression]() - private val createInboundActorRefsForOrigin = new LongFunction[InboundActorRefCompression] { - override def apply(originUid: Long): InboundActorRefCompression = { - val actorRefHitters = new TopHeavyHitters[ActorRef](settings.actorRefs.max) - new InboundActorRefCompression(system, settings, originUid, inboundContext, actorRefHitters) - } - } - private def actorRefsIn(originUid: Long): InboundActorRefCompression = - _actorRefsIns.computeIfAbsent(originUid, createInboundActorRefsForOrigin) - - private[this] val _classManifestsIns = new Long2ObjectHashMap[InboundManifestCompression]() - private val createInboundManifestsForOrigin = new LongFunction[InboundManifestCompression] { - override def apply(originUid: Long): InboundManifestCompression = { - val manifestHitters = new TopHeavyHitters[String](settings.manifests.max) - new InboundManifestCompression(system, settings, originUid, inboundContext, manifestHitters) - } - } - private def classManifestsIn(originUid: Long): InboundManifestCompression = - _classManifestsIns.computeIfAbsent(originUid, createInboundManifestsForOrigin) - - // actor ref compression --- - - override def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef] = - actorRefsIn(originUid).decompress(tableVersion, idx) - override def hitActorRef(originUid: Long, tableVersion: Int, address: Address, ref: ActorRef, n: Int): Unit = - actorRefsIn(originUid).increment(address, ref, n) - override def confirmActorRefCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit = - actorRefsIn(originUid).confirmAdvertisement(tableVersion) - - // class manifest compression --- - - override def decompressClassManifest(originUid: Long, tableVersion: Int, idx: Int): OptionVal[String] = - classManifestsIn(originUid).decompress(tableVersion, idx) - override def hitClassManifest(originUid: Long, tableVersion: Int, address: Address, manifest: String, n: Int): Unit = - classManifestsIn(originUid).increment(address, manifest, n) - override def confirmClassManifestCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit = - actorRefsIn(originUid).confirmAdvertisement(tableVersion) - - // testing utilities --- - - /** INTERNAL API: for testing only */ - private[remote] def runNextActorRefAdvertisement() = { - import scala.collection.JavaConverters._ - _actorRefsIns.values().asScala.foreach { inbound ⇒ inbound.runNextTableAdvertisement() } - } - - /** INTERNAL API: for testing only */ - private[remote] def runNextClassManifestAdvertisement() = { - import scala.collection.JavaConverters._ - _classManifestsIns.values().asScala.foreach { inbound ⇒ inbound.runNextTableAdvertisement() } - } -} diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/AllCompressions.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/AllCompressions.scala deleted file mode 100644 index e90db8bd31..0000000000 --- a/akka-remote/src/main/scala/akka/remote/artery/compress/AllCompressions.scala +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright (C) 2016 Lightbend Inc. - */ - -package akka.remote.artery.compress - -import akka.actor.{ ActorRef, Address } -import akka.util.OptionVal - -/** - * INTERNAL API - * Decompress and cause compression advertisements. - * - * One per inbound message stream thus must demux by originUid to use the right tables. - */ -private[remote] trait InboundCompressions { - def hitActorRef(originUid: Long, tableVersion: Int, remote: Address, ref: ActorRef, n: Int): Unit - def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef] - def confirmActorRefCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit - - def hitClassManifest(originUid: Long, tableVersion: Int, remote: Address, manifest: String, n: Int): Unit - def decompressClassManifest(originUid: Long, tableVersion: Int, idx: Int): OptionVal[String] - def confirmClassManifestCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit -} -/** - * INTERNAL API - * Compress outgoing data and handle compression advertisements to fill compression table. - * - * One per outgoing message stream. - */ -private[remote] trait OutboundCompressions { - def applyActorRefCompressionTable(table: CompressionTable[ActorRef]): Unit - def actorRefCompressionTableVersion: Int - def compressActorRef(ref: ActorRef): Int - - def applyClassManifestCompressionTable(table: CompressionTable[String]): Unit - def classManifestCompressionTableVersion: Int - def compressClassManifest(manifest: String): Int -} diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionTable.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionTable.scala index dfc612868b..d07866e239 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionTable.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionTable.scala @@ -6,6 +6,13 @@ package akka.remote.artery.compress /** INTERNAL API: Versioned compression table to be advertised between systems */ private[akka] final case class CompressionTable[T](version: Int, map: Map[T, Int]) { + import CompressionTable.NotCompressedId + + def compress(value: T): Int = + map.get(value) match { + case Some(id) ⇒ id + case None ⇒ NotCompressedId + } def invert: DecompressionTable[T] = if (map.isEmpty) DecompressionTable.empty[T].copy(version = version) @@ -25,6 +32,8 @@ private[akka] final case class CompressionTable[T](version: Int, map: Map[T, Int } /** INTERNAL API */ private[remote] object CompressionTable { + final val NotCompressedId = -1 + private[this] val _empty = new CompressionTable[Any](0, Map.empty) def empty[T] = _empty.asInstanceOf[CompressionTable[T]] } diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala index 7fd5b5bfd4..5be9354a32 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala @@ -4,12 +4,97 @@ package akka.remote.artery.compress +import java.util.concurrent.atomic.AtomicReference +import java.util.function.LongFunction + +import scala.concurrent.duration.{ Duration, FiniteDuration } + import akka.actor.{ ActorRef, ActorSystem, Address } import akka.event.{ Logging, NoLogging } import akka.remote.artery.{ InboundContext, OutboundContext } import akka.util.{ OptionVal, PrettyDuration } -import scala.concurrent.duration.{ Duration, FiniteDuration } -import java.util.concurrent.atomic.AtomicReference +import org.agrona.collections.Long2ObjectHashMap + +/** + * INTERNAL API + * Decompress and cause compression advertisements. + * + * One per inbound message stream thus must demux by originUid to use the right tables. + */ +private[remote] trait InboundCompressions { + def hitActorRef(originUid: Long, remote: Address, ref: ActorRef, n: Int): Unit + def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef] + def confirmActorRefCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit + + def hitClassManifest(originUid: Long, remote: Address, manifest: String, n: Int): Unit + def decompressClassManifest(originUid: Long, tableVersion: Int, idx: Int): OptionVal[String] + def confirmClassManifestCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit +} + +/** + * INTERNAL API + * + * One per incoming Aeron stream, actual compression tables are kept per-originUid and created on demand. + */ +private[remote] final class InboundCompressionsImpl( + system: ActorSystem, + inboundContext: InboundContext) extends InboundCompressions { + + private val settings = CompressionSettings(system) + + // FIXME we also must remove the ones that won't be used anymore - when quarantine triggers + private[this] val _actorRefsIns = new Long2ObjectHashMap[InboundActorRefCompression]() + private val createInboundActorRefsForOrigin = new LongFunction[InboundActorRefCompression] { + override def apply(originUid: Long): InboundActorRefCompression = { + val actorRefHitters = new TopHeavyHitters[ActorRef](settings.actorRefs.max) + new InboundActorRefCompression(system, settings, originUid, inboundContext, actorRefHitters) + } + } + private def actorRefsIn(originUid: Long): InboundActorRefCompression = + _actorRefsIns.computeIfAbsent(originUid, createInboundActorRefsForOrigin) + + private[this] val _classManifestsIns = new Long2ObjectHashMap[InboundManifestCompression]() + private val createInboundManifestsForOrigin = new LongFunction[InboundManifestCompression] { + override def apply(originUid: Long): InboundManifestCompression = { + val manifestHitters = new TopHeavyHitters[String](settings.manifests.max) + new InboundManifestCompression(system, settings, originUid, inboundContext, manifestHitters) + } + } + private def classManifestsIn(originUid: Long): InboundManifestCompression = + _classManifestsIns.computeIfAbsent(originUid, createInboundManifestsForOrigin) + + // actor ref compression --- + + override def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef] = + actorRefsIn(originUid).decompress(tableVersion, idx) + override def hitActorRef(originUid: Long, address: Address, ref: ActorRef, n: Int): Unit = + actorRefsIn(originUid).increment(address, ref, n) + override def confirmActorRefCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit = + actorRefsIn(originUid).confirmAdvertisement(tableVersion) + + // class manifest compression --- + + override def decompressClassManifest(originUid: Long, tableVersion: Int, idx: Int): OptionVal[String] = + classManifestsIn(originUid).decompress(tableVersion, idx) + override def hitClassManifest(originUid: Long, address: Address, manifest: String, n: Int): Unit = + classManifestsIn(originUid).increment(address, manifest, n) + override def confirmClassManifestCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit = + actorRefsIn(originUid).confirmAdvertisement(tableVersion) + + // testing utilities --- + + /** INTERNAL API: for testing only */ + private[remote] def runNextActorRefAdvertisement() = { + import scala.collection.JavaConverters._ + _actorRefsIns.values().asScala.foreach { inbound ⇒ inbound.runNextTableAdvertisement() } + } + + /** INTERNAL API: for testing only */ + private[remote] def runNextClassManifestAdvertisement() = { + import scala.collection.JavaConverters._ + _classManifestsIns.values().asScala.foreach { inbound ⇒ inbound.runNextTableAdvertisement() } + } +} /** * INTERNAL API @@ -105,7 +190,7 @@ private[remote] abstract class InboundCompression[T >: Null]( lazy val log = Logging(system, getClass.getSimpleName) - // TODO NOTE: there exist edge cases around, we advertise table 1, accumulate table 2, the remote system has not used 2 yet, + // FIXME NOTE: there exist edge cases around, we advertise table 1, accumulate table 2, the remote system has not used 2 yet, // yet we technically could already prepare table 3, then it starts using table 1 suddenly. Edge cases like that. // SOLUTION 1: We don't start building new tables until we've seen the previous one be used (move from new to active) // This is nice as it practically disables all the "build the table" work when the other side is not interested in using it. diff --git a/akka-remote/src/main/scala/akka/remote/artery/NoLiteralCompression.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/NoInboundCompressions.scala similarity index 51% rename from akka-remote/src/main/scala/akka/remote/artery/NoLiteralCompression.scala rename to akka-remote/src/main/scala/akka/remote/artery/compress/NoInboundCompressions.scala index 63886945d5..7219cb0cfa 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/NoLiteralCompression.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/NoInboundCompressions.scala @@ -1,10 +1,9 @@ /** * Copyright (C) 2016 Lightbend Inc. */ -package akka.remote.artery +package akka.remote.artery.compress import akka.actor.{ ActorRef, Address } -import akka.remote.artery.compress.{ CompressionTable, InboundCompressions, OutboundCompressions } import akka.util.OptionVal /** @@ -13,30 +12,16 @@ import akka.util.OptionVal * Literarily, no compression! */ case object NoInboundCompressions extends InboundCompressions { - override def hitActorRef(originUid: Long, tableVersion: Int, remote: Address, ref: ActorRef, n: Int): Unit = () + override def hitActorRef(originUid: Long, remote: Address, ref: ActorRef, n: Int): Unit = () override def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef] = if (idx == -1) throw new IllegalArgumentException("Attemted decompression of illegal compression id: -1") else OptionVal.None override def confirmActorRefCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit = () - override def hitClassManifest(originUid: Long, tableVersion: Int, remote: Address, manifest: String, n: Int): Unit = () + override def hitClassManifest(originUid: Long, remote: Address, manifest: String, n: Int): Unit = () override def decompressClassManifest(originUid: Long, tableVersion: Int, idx: Int): OptionVal[String] = if (idx == -1) throw new IllegalArgumentException("Attemted decompression of illegal compression id: -1") else OptionVal.None override def confirmClassManifestCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit = () } -/** - * INTERNAL API - * - * Literarily, no compression! - */ -case object NoOutboundCompressions extends OutboundCompressions { - override def applyActorRefCompressionTable(table: CompressionTable[ActorRef]): Unit = () - override def actorRefCompressionTableVersion: Int = 0 - override def compressActorRef(ref: ActorRef): Int = -1 - - override def applyClassManifestCompressionTable(table: CompressionTable[String]): Unit = () - override def classManifestCompressionTableVersion: Int = 0 - override def compressClassManifest(manifest: String): Int = -1 -} diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/OutboundActorRefCompression.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/OutboundActorRefCompression.scala deleted file mode 100644 index 59756afe32..0000000000 --- a/akka-remote/src/main/scala/akka/remote/artery/compress/OutboundActorRefCompression.scala +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Copyright (C) 2016 Lightbend Inc. - */ - -package akka.remote.artery.compress - -import java.util.concurrent.atomic.AtomicReference -import java.{ util ⇒ ju } - -import akka.actor.{ ActorRef, ActorSystem, Address } -import akka.event.{ Logging, LoggingAdapter } -import akka.remote.artery.compress.OutboundCompression.OutboundCompressionState - -import scala.annotation.tailrec - -/** INTERNAL API */ -private[remote] final class OutboundActorRefCompression(system: ActorSystem, remoteAddress: Address) - extends OutboundCompressionTable[ActorRef](system, remoteAddress) { - - flipTable(CompressionTable( - version = 0, - map = Map( - system.deadLetters → 0))) -} - -/** INTERNAL API */ -private[remote] final class OutboundClassManifestCompression(system: ActorSystem, remoteAddress: Address) - extends OutboundCompressionTable[String](system, remoteAddress) { - - flipTable(CompressionTable(version = 0, Map.empty)) -} - -/** - * INTERNAL API - * Base class for all outgoing compression. - * Encapsulates the compressedId registration and lookup. - */ -private[remote] class OutboundCompressionTable[T](system: ActorSystem, remoteAddress: Address) - extends AtomicReference[OutboundCompressionState[T]](OutboundCompressionState.initial) { // TODO could be instead via Unsafe - import OutboundCompression._ - - // TODO: The compression map may benefit from padding if we want multiple compressions to be running in parallel - - protected val log: LoggingAdapter = Logging(system, Logging.simpleName(getClass)) - - // TODO this exposes us to a race between setting the Version and USING the table...? - def activeCompressionTableVersion = { - val version = get.version - version - } - - /** - * Flips the currently used compression table to the new one (iff the new one has a version number higher than the currently used one). - */ - // (╯°□°)╯︵ ┻━┻ - @tailrec final def flipTable(activate: CompressionTable[T]): Unit = { - val state = get() - if (activate.version > state.version) // TODO this should handle roll-over as we move to Byte - if (compareAndSet(state, prepareState(activate))) - log.debug(s"Successfully flipped compression table versions {}=>{}, for outgoing to [{}]", state.version, activate.version, remoteAddress) - else - flipTable(activate) // retry - else if (state.version == activate.version) - log.debug("Received duplicate compression table (version: {})! Ignoring it.", state.version) - else - log.error("Received unexpected compression table with version nr [{}]! " + - "Current version number is [{}].", activate.version, state.version) - - } - - // TODO this is crazy hot-path; optimised FastUtil-like Object->int hash map would perform better here (and avoid Integer) allocs - final def compress(value: T): Int = - get().table.getOrDefault(value, NotCompressedId) - - private final def prepareState(activate: CompressionTable[T]): OutboundCompressionState[T] = { - val size = activate.map.size - // load factor is `1` since we will never grow this table beyond the initial size, - // this way we can avoid any rehashing from happening. - val m = new ju.HashMap[T, Integer](size, 1.0f) // TODO could be replaced with primitive `int` specialized version - activate.map.foreach { - case (key, value) ⇒ m.put(key, value) // TODO boxing :< - } - OutboundCompressionState(activate.version, m) - } - - def toDebugString: String = { - s"""${Logging.simpleName(getClass)}( - | version: ${get.version} to [$remoteAddress] - | ${get.table} - |)""".stripMargin - } - - override def toString = { - val s = get - s"""${Logging.simpleName(getClass)}(to: $remoteAddress, version: ${s.version}, compressedEntries: ${s.table.size})""" - } - -} - -/** INTERNAL API */ -private[remote] object OutboundCompression { - // format: OFF - final val DeadLettersId = 0 - final val NotCompressedId = -1 - - // format: ON - - /** INTERNAL API */ - private[remote] final case class OutboundCompressionState[T](version: Int, table: ju.Map[T, Integer]) - private[remote] object OutboundCompressionState { - def initial[T] = OutboundCompressionState[T](-1, ju.Collections.emptyMap()) - } - -} diff --git a/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala index d9a3040a28..644b321491 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala @@ -7,14 +7,14 @@ package akka.remote.artery import java.nio.{ ByteBuffer, ByteOrder } import akka.actor._ -import akka.remote.artery.compress.{ CompressionTable, CompressionTestUtils, InboundCompressions, OutboundCompressions } +import akka.remote.artery.compress.{ CompressionTable, CompressionTestUtils, InboundCompressions } import akka.testkit.AkkaSpec import akka.util.{ ByteString, OptionVal } class EnvelopeBufferSpec extends AkkaSpec { import CompressionTestUtils._ - object TestCompressor extends InboundCompressions with OutboundCompressions { + object TestCompressor extends InboundCompressions { val refToIdx: Map[ActorRef, Int] = Map( minimalRef("compressable0") → 0, minimalRef("compressable1") → 1, @@ -31,24 +31,24 @@ class EnvelopeBufferSpec extends AkkaSpec { "manifest1" → 1) val idxToManifest = manifestToIdx.map(_.swap) - override def applyActorRefCompressionTable(table: CompressionTable[ActorRef]): Unit = ??? // dynamic allocating not needed in these tests - override def actorRefCompressionTableVersion: Int = 0 - override def compressActorRef(ref: ActorRef): Int = refToIdx.getOrElse(ref, -1) - override def hitActorRef(originUid: Long, tableVersion: Int, remote: Address, ref: ActorRef, n: Int): Unit = () + val outboundActorRefTable: CompressionTable[ActorRef] = + CompressionTable(version = 0xCAFE, refToIdx) + + val outboundClassManifestTable: CompressionTable[String] = + CompressionTable(version = 0xBABE, manifestToIdx) + + override def hitActorRef(originUid: Long, remote: Address, ref: ActorRef, n: Int): Unit = () override def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef] = OptionVal(idxToRef(idx)) override def confirmActorRefCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit = () - override def applyClassManifestCompressionTable(table: CompressionTable[String]): Unit = ??? // dynamic allocating not needed in these tests - override def classManifestCompressionTableVersion: Int = 0 - override def compressClassManifest(manifest: String): Int = manifestToIdx.getOrElse(manifest, -1) - override def hitClassManifest(originUid: Long, tableVersion: Int, remote: Address, manifest: String, n: Int): Unit = () + override def hitClassManifest(originUid: Long, remote: Address, manifest: String, n: Int): Unit = () override def decompressClassManifest(originUid: Long, tableVersion: Int, idx: Int): OptionVal[String] = OptionVal(idxToManifest(idx)) override def confirmClassManifestCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit = () } "EnvelopeBuffer" must { - val headerIn = HeaderBuilder.bothWays(TestCompressor, TestCompressor) - val headerOut = HeaderBuilder.bothWays(TestCompressor, TestCompressor) + val headerIn = HeaderBuilder.bothWays(TestCompressor, TestCompressor.outboundActorRefTable, TestCompressor.outboundClassManifestTable) + val headerOut = HeaderBuilder.bothWays(TestCompressor, TestCompressor.outboundActorRefTable, TestCompressor.outboundClassManifestTable) val byteBuffer = ByteBuffer.allocate(1024).order(ByteOrder.LITTLE_ENDIAN) val envelope = new EnvelopeBuffer(byteBuffer) @@ -59,8 +59,6 @@ class EnvelopeBufferSpec extends AkkaSpec { headerIn setVersion 1 headerIn setUid 42 headerIn setSerializer 4 - headerIn setActorRefCompressionTableVersion 0xCAFE - headerIn setClassManifestCompressionTableVersion 0xBABE headerIn setRecipientActorRef minimalRef("compressable1") headerIn setSenderActorRef minimalRef("compressable0") @@ -74,8 +72,8 @@ class EnvelopeBufferSpec extends AkkaSpec { headerOut.version should ===(1) headerOut.uid should ===(42) - headerOut.actorRefCompressionTableVersion should ===(0xCAFE) - headerOut.classManifestCompressionTableVersion should ===(0xBABE) + headerOut.inboundActorRefCompressionTableVersion should ===(0xCAFE) + headerOut.inboundClassManifestCompressionTableVersion should ===(0xBABE) headerOut.serializer should ===(4) headerOut.senderActorRef(originUid).get.path.toSerializationFormat should ===("akka://EnvelopeBufferSpec/compressable0") headerOut.senderActorRefPath should ===(OptionVal.None) diff --git a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala index 41c12280f8..04233b14ef 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala @@ -18,6 +18,7 @@ import akka.remote.artery.InboundControlJunction.ControlMessageObserver import akka.remote.artery.InboundControlJunction.ControlMessageSubject import akka.util.OptionVal import akka.actor.InternalActorRef +import akka.dispatch.ExecutionContexts private[akka] class TestInboundContext( override val localAddress: UniqueAddress, @@ -47,10 +48,13 @@ private[akka] class TestInboundContext( override def association(uid: Long): OptionVal[OutboundContext] = OptionVal(associationsByUid.get(uid)) - override def completeHandshake(peer: UniqueAddress): Unit = { + override def completeHandshake(peer: UniqueAddress): Future[Done] = { val a = association(peer.address).asInstanceOf[TestOutboundContext] - a.completeHandshake(peer) - associationsByUid.put(peer.uid, a) + val done = a.completeHandshake(peer) + done.foreach { _ ⇒ + associationsByUid.put(peer.uid, a) + }(ExecutionContexts.sameThreadExecutionContext) + done } protected def createAssociation(remoteAddress: Address): TestOutboundContext = @@ -70,13 +74,14 @@ private[akka] class TestOutboundContext( _associationState } - def completeHandshake(peer: UniqueAddress): Unit = synchronized { + def completeHandshake(peer: UniqueAddress): Future[Done] = synchronized { _associationState.uniqueRemoteAddressPromise.trySuccess(peer) _associationState.uniqueRemoteAddress.value match { case Some(Success(`peer`)) ⇒ // our value case _ ⇒ - _associationState = _associationState.newIncarnation(Promise.successful(peer), NoOutboundCompressions) + _associationState = _associationState.newIncarnation(Promise.successful(peer)) } + Future.successful(Done) } override def quarantine(reason: String): Unit = synchronized { diff --git a/akka-remote/src/test/scala/akka/remote/artery/compress/OutboundCompressionSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/compress/OutboundCompressionSpec.scala index 723a4c9a13..ac04610e9b 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/compress/OutboundCompressionSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/compress/OutboundCompressionSpec.scala @@ -10,40 +10,23 @@ import akka.testkit.AkkaSpec class OutboundCompressionSpec extends AkkaSpec { import CompressionTestUtils._ - val remoteAddress = Address("artery", "example", "localhost", 0) - - "OutboundCompression" must { - "not compress not-known values" in { - val table = new OutboundActorRefCompression(system, remoteAddress) - table.compress(minimalRef("banana")) should ===(-1) - } - } - - "OutboundActorRefCompression" must { + "Outbound ActorRef compression" must { val alice = minimalRef("alice") val bob = minimalRef("bob") - "always compress /deadLetters" in { - val table = new OutboundActorRefCompression(system, remoteAddress) - table.compress(system.deadLetters) should ===(0) - } - "not compress unknown actor ref" in { - val table = new OutboundActorRefCompression(system, remoteAddress) + val table = CompressionTable.empty[ActorRef] table.compress(alice) should ===(-1) // not compressed } "compress previously registered actor ref" in { - val compression = new OutboundActorRefCompression(system, remoteAddress) val table = CompressionTable(1, Map(system.deadLetters → 0, alice → 1)) - compression.flipTable(table) - compression.compress(alice) should ===(1) // compressed - compression.compress(bob) should ===(-1) // not compressed + table.compress(alice) should ===(1) // compressed + table.compress(bob) should ===(-1) // not compressed val table2 = table.copy(2, map = table.map.updated(bob, 2)) - compression.flipTable(table2) - compression.compress(alice) should ===(1) // compressed - compression.compress(bob) should ===(2) // compressed + table2.compress(alice) should ===(1) // compressed + table2.compress(bob) should ===(2) // compressed } }