diff --git a/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala index f08d5f87af..98974a245b 100644 --- a/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala @@ -29,6 +29,8 @@ import com.typesafe.config.ConfigFactory import org.openjdk.jmh.annotations._ import akka.util.OptionVal import akka.actor.Address +import scala.concurrent.Future +import akka.Done @State(Scope.Benchmark) @OutputTimeUnit(TimeUnit.MILLISECONDS) @@ -57,7 +59,6 @@ class CodecBenchmark { private val inboundEnvelopePool = ReusableInboundEnvelope.createObjectPool(capacity = 16) private val outboundEnvelopePool = ReusableOutboundEnvelope.createObjectPool(capacity = 16) - val compressionOut = NoOutboundCompressions val headerIn = HeaderBuilder.in(NoInboundCompressions) val envelopeTemplateBuffer = ByteBuffer.allocate(ArteryTransport.MaximumFrameSize).order(ByteOrder.LITTLE_ENDIAN) @@ -73,7 +74,7 @@ class CodecBenchmark { // the following methods are not used by in this test override def sendControl(to: Address, message: ControlMessage): Unit = ??? override def association(remoteAddress: Address): OutboundContext = ??? - override def completeHandshake(peer: UniqueAddress): Unit = ??? + override def completeHandshake(peer: UniqueAddress): Future[Done] = ??? } private var materializer: ActorMaterializer = _ @@ -136,8 +137,8 @@ class CodecBenchmark { val latch = new CountDownLatch(1) val N = 100000 - val encoder: Flow[OutboundEnvelope, EnvelopeBuffer, NotUsed] = - Flow.fromGraph(new Encoder(uniqueLocalAddress, system, compressionOut, outboundEnvelopePool, envelopePool)) + val encoder: Flow[OutboundEnvelope, EnvelopeBuffer, Encoder.ChangeOutboundCompression] = + Flow.fromGraph(new Encoder(uniqueLocalAddress, system, outboundEnvelopePool, envelopePool)) Source.fromGraph(new BenchTestSourceSameElement(N, "elem")) .map(msg ⇒ outboundEnvelopePool.acquire().init(OptionVal.None, payload, OptionVal.Some(remoteRefB))) @@ -193,8 +194,8 @@ class CodecBenchmark { val latch = new CountDownLatch(1) val N = 100000 - val encoder: Flow[OutboundEnvelope, EnvelopeBuffer, NotUsed] = - Flow.fromGraph(new Encoder(uniqueLocalAddress, system, compressionOut, outboundEnvelopePool, envelopePool)) + val encoder: Flow[OutboundEnvelope, EnvelopeBuffer, Encoder.ChangeOutboundCompression] = + Flow.fromGraph(new Encoder(uniqueLocalAddress, system, outboundEnvelopePool, envelopePool)) val localRecipient = resolvedRef.path.toSerializationFormatWithAddress(uniqueLocalAddress.address) val provider = RARP(system).provider diff --git a/akka-bench-jmh/src/main/scala/akka/remote/compress/HeavyHittersBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/remote/compress/HeavyHittersBenchmark.scala index 4297dbb171..b6e14f9465 100644 --- a/akka-bench-jmh/src/main/scala/akka/remote/compress/HeavyHittersBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/remote/compress/HeavyHittersBenchmark.scala @@ -49,7 +49,7 @@ class HeavyHittersBenchmark { @Param(Array("8192")) var n: Int = 0 - var topN: TopHeavyHitters[String] = _ + private var topN: TopHeavyHitters[String] = _ val rand = new Random(1001021) diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 6131cb2cc6..3c4c10d5ec 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -3,22 +3,30 @@ */ package akka.remote.artery +import java.io.File +import java.net.InetSocketAddress +import java.nio.channels.{ DatagramChannel, FileChannel } + import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicLong -import java.util.concurrent.TimeUnit.MICROSECONDS -import akka.remote.artery.compress.CompressionProtocol.CompressionMessage - +import scala.annotation.tailrec +import scala.collection.JavaConverters._ import scala.concurrent.Future import scala.concurrent.Promise import scala.concurrent.duration._ import scala.util.Failure import scala.util.Success import scala.util.Try +import scala.util.control.NonFatal + import akka.Done import akka.NotUsed import akka.actor._ +import akka.actor.Actor import akka.actor.Cancellable +import akka.actor.Props import akka.event.Logging import akka.event.LoggingAdapter import akka.remote.AddressUidExtension @@ -30,13 +38,17 @@ import akka.remote.RemoteTransport import akka.remote.RemotingLifecycleEvent import akka.remote.ThisActorSystemQuarantinedEvent import akka.remote.UniqueAddress +import akka.remote.artery.Encoder.ChangeOutboundCompression import akka.remote.artery.InboundControlJunction.ControlMessageObserver import akka.remote.artery.InboundControlJunction.ControlMessageSubject +import akka.remote.artery.OutboundControlJunction.OutboundControlIngress +import akka.remote.artery.compress._ +import akka.remote.artery.compress.CompressionProtocol.CompressionMessage import akka.remote.transport.AkkaPduCodec import akka.remote.transport.AkkaPduProtobufCodec -import akka.remote.artery.compress._ import akka.stream.AbruptTerminationException import akka.stream.ActorMaterializer +import akka.stream.ActorMaterializerSettings import akka.stream.KillSwitches import akka.stream.Materializer import akka.stream.SharedKillSwitch @@ -46,35 +58,19 @@ import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Source import akka.util.Helpers.ConfigOps import akka.util.Helpers.Requiring +import akka.util.OptionVal import akka.util.WildcardIndex import io.aeron.Aeron import io.aeron.AvailableImageHandler +import io.aeron.CncFileDescriptor import io.aeron.Image import io.aeron.UnavailableImageHandler import io.aeron.driver.MediaDriver +import io.aeron.driver.ThreadingMode import io.aeron.exceptions.ConductorServiceTimeoutException import org.agrona.ErrorHandler import org.agrona.IoUtil -import java.io.File -import java.net.InetSocketAddress -import java.nio.channels.{ DatagramChannel, FileChannel } - -import akka.remote.artery.OutboundControlJunction.OutboundControlIngress -import io.aeron.CncFileDescriptor -import java.util.concurrent.atomic.AtomicLong - -import scala.collection.JavaConverters._ -import akka.stream.ActorMaterializerSettings - -import scala.annotation.tailrec -import akka.util.OptionVal -import io.aeron.driver.ThreadingMode import org.agrona.concurrent.BackoffIdleStrategy -import org.agrona.concurrent.BusySpinIdleStrategy - -import scala.util.control.NonFatal -import akka.actor.Props -import akka.actor.Actor /** * INTERNAL API @@ -105,7 +101,7 @@ private[akka] trait InboundContext { */ def association(uid: Long): OptionVal[OutboundContext] - def completeHandshake(peer: UniqueAddress): Unit + def completeHandshake(peer: UniqueAddress): Future[Done] } @@ -117,8 +113,7 @@ private[akka] object AssociationState { new AssociationState( incarnation = 1, uniqueRemoteAddressPromise = Promise(), - quarantined = ImmutableLongMap.empty[QuarantinedTimestamp], - outboundCompressions = NoOutboundCompressions) + quarantined = ImmutableLongMap.empty[QuarantinedTimestamp]) final case class QuarantinedTimestamp(nanoTime: Long) { override def toString: String = @@ -130,10 +125,9 @@ private[akka] object AssociationState { * INTERNAL API */ private[akka] final class AssociationState( - val incarnation: Int, + val incarnation: Int, val uniqueRemoteAddressPromise: Promise[UniqueAddress], - val quarantined: ImmutableLongMap[AssociationState.QuarantinedTimestamp], - val outboundCompressions: OutboundCompressions) { + val quarantined: ImmutableLongMap[AssociationState.QuarantinedTimestamp]) { import AssociationState.QuarantinedTimestamp @@ -159,11 +153,8 @@ private[akka] final class AssociationState( } } - def withCompression(compression: OutboundCompressions) = - new AssociationState(incarnation, uniqueRemoteAddressPromise, quarantined, compression) - - def newIncarnation(remoteAddressPromise: Promise[UniqueAddress], compression: OutboundCompressions): AssociationState = - new AssociationState(incarnation + 1, remoteAddressPromise, quarantined, compression) + def newIncarnation(remoteAddressPromise: Promise[UniqueAddress]): AssociationState = + new AssociationState(incarnation + 1, remoteAddressPromise, quarantined) def newQuarantined(): AssociationState = uniqueRemoteAddressPromise.future.value match { @@ -171,8 +162,7 @@ private[akka] final class AssociationState( new AssociationState( incarnation, uniqueRemoteAddressPromise, - quarantined = quarantined.updated(a.uid, QuarantinedTimestamp(System.nanoTime())), - outboundCompressions = NoOutboundCompressions) // after quarantine no compression needed anymore, drop it + quarantined = quarantined.updated(a.uid, QuarantinedTimestamp(System.nanoTime()))) case _ ⇒ this } @@ -235,7 +225,7 @@ private[akka] trait OutboundContext { */ private[remote] object FlushOnShutdown { def props(done: Promise[Done], timeout: FiniteDuration, - inboundContext: InboundContext, associations: Set[Association]): Props = { + inboundContext: InboundContext, associations: Set[Association]): Props = { require(associations.nonEmpty) Props(new FlushOnShutdown(done, timeout, inboundContext, associations)) } @@ -247,7 +237,7 @@ private[remote] object FlushOnShutdown { * INTERNAL API */ private[remote] class FlushOnShutdown(done: Promise[Done], timeout: FiniteDuration, - inboundContext: InboundContext, associations: Set[Association]) extends Actor { + inboundContext: InboundContext, associations: Set[Association]) extends Actor { var remaining = associations.flatMap(_.associationState.uniqueRemoteAddressValue) @@ -330,7 +320,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R private val priorityMessageDestinations = WildcardIndex[NotUsed]() - // this comes from remoting so is semi-ok to be hardcoded here + // These destinations are not defined in configuration because it should not + // be possible to abuse the control channel .insert(Array("system", "remote-watcher"), NotUsed) // these belongs to cluster and should come from there .insert(Array("system", "cluster", "core", "daemon", "heartbeatSender"), NotUsed) @@ -579,17 +570,27 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R case ActorRefCompressionAdvertisement(from, table) ⇒ log.debug("Incoming ActorRef compression advertisement from [{}], table: [{}]", from, table) val a = association(from.address) - a.outboundCompression.applyActorRefCompressionTable(table) - a.sendControl(ActorRefCompressionAdvertisementAck(localAddress, table.version)) - system.eventStream.publish(Events.ReceivedActorRefCompressionTable(from, table)) + // make sure uid is same for active association + if (a.associationState.uniqueRemoteAddressValue().contains(from)) { + import system.dispatcher + a.changeActorRefCompression(table).foreach { _ ⇒ + a.sendControl(ActorRefCompressionAdvertisementAck(localAddress, table.version)) + system.eventStream.publish(Events.ReceivedActorRefCompressionTable(from, table)) + } + } case ActorRefCompressionAdvertisementAck(from, tableVersion) ⇒ inboundCompressions.foreach(_.confirmActorRefCompressionAdvertisement(from.uid, tableVersion)) case ClassManifestCompressionAdvertisement(from, table) ⇒ log.debug("Incoming Class Manifest compression advertisement from [{}], table: [{}]", from, table) val a = association(from.address) - a.outboundCompression.applyClassManifestCompressionTable(table) - a.sendControl(ClassManifestCompressionAdvertisementAck(localAddress, table.version)) - system.eventStream.publish(Events.ReceivedClassManifestCompressionTable(from, table)) + // make sure uid is same for active association + if (a.associationState.uniqueRemoteAddressValue().contains(from)) { + import system.dispatcher + a.changeClassManifestCompression(table).foreach { _ ⇒ + a.sendControl(ClassManifestCompressionAdvertisementAck(localAddress, table.version)) + system.eventStream.publish(Events.ReceivedClassManifestCompressionTable(from, table)) + } + } case ClassManifestCompressionAdvertisementAck(from, tableVersion) ⇒ inboundCompressions.foreach(_.confirmActorRefCompressionAdvertisement(from.uid, tableVersion)) } @@ -719,8 +720,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R if (testStages.isEmpty) Future.successful(false) else { - import scala.collection.JavaConverters._ import system.dispatcher + import scala.collection.JavaConverters._ val allTestStages = testStages.asScala.toVector ++ associationRegistry.allAssociations.flatMap(_.testStages) Future.sequence(allTestStages.map(_.send(cmd))).map(_ ⇒ true) } @@ -752,7 +753,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R override def association(uid: Long): OptionVal[Association] = associationRegistry.association(uid) - override def completeHandshake(peer: UniqueAddress): Unit = { + override def completeHandshake(peer: UniqueAddress): Future[Done] = { val a = associationRegistry.setUID(peer) a.completeHandshake(peer) } @@ -765,19 +766,22 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R association(remoteAddress).quarantine(reason = "", uid.map(_.toLong)) } - def outbound(outboundContext: OutboundContext, compression: OutboundCompressions): Sink[OutboundEnvelope, Future[Done]] = - createOutboundSink(ordinaryStreamId, outboundContext, compression, envelopePool) + def outbound(outboundContext: OutboundContext): Sink[OutboundEnvelope, (ChangeOutboundCompression, Future[Done])] = + createOutboundSink(ordinaryStreamId, outboundContext, envelopePool) - def outboundLarge(outboundContext: OutboundContext, compression: OutboundCompressions): Sink[OutboundEnvelope, Future[Done]] = - createOutboundSink(largeStreamId, outboundContext, compression, largeEnvelopePool) + def outboundLarge(outboundContext: OutboundContext): Sink[OutboundEnvelope, Future[Done]] = + createOutboundSink(largeStreamId, outboundContext, largeEnvelopePool) + .mapMaterializedValue { case (_, d) ⇒ d } + + private def createOutboundSink(streamId: Int, outboundContext: OutboundContext, + bufferPool: EnvelopeBufferPool): Sink[OutboundEnvelope, (ChangeOutboundCompression, Future[Done])] = { - private def createOutboundSink(streamId: Int, outboundContext: OutboundContext, compression: OutboundCompressions, bufferPool: EnvelopeBufferPool): Sink[OutboundEnvelope, Future[Done]] = { Flow.fromGraph(killSwitch.flow[OutboundEnvelope]) .via(new OutboundHandshake(system, outboundContext, outboundEnvelopePool, handshakeTimeout, handshakeRetryInterval, injectHandshakeInterval)) - .via(createEncoder(bufferPool, compression)) + .viaMat(createEncoder(bufferPool))(Keep.right) .toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), streamId, aeron, taskRunner, - envelopePool, giveUpSendAfter, createFlightRecorderEventSink()))(Keep.right) + envelopePool, giveUpSendAfter, createFlightRecorderEventSink()))(Keep.both) } /** @@ -794,25 +798,20 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R // FIXME we can also add scrubbing stage that would collapse sys msg acks/nacks and remove duplicate Quarantine messages } - def outboundControlPart2(outboundContext: OutboundContext, compression: OutboundCompressions): Sink[OutboundEnvelope, (OutboundControlIngress, Future[Done])] = { + def outboundControlPart2(outboundContext: OutboundContext): Sink[OutboundEnvelope, (OutboundControlIngress, Future[Done])] = { Flow[OutboundEnvelope] .viaMat(new OutboundControlJunction(outboundContext, outboundEnvelopePool))(Keep.right) - .via(encoder(compression)) + .via(createEncoder(envelopePool)) .toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), controlStreamId, aeron, taskRunner, envelopePool, Duration.Inf, createFlightRecorderEventSink()))(Keep.both) } - def createEncoder(compression: OutboundCompressions, bufferPool: EnvelopeBufferPool): Flow[OutboundEnvelope, EnvelopeBuffer, NotUsed] = - Flow.fromGraph(new Encoder(localAddress, system, compression, outboundEnvelopePool, bufferPool)) - private def createInboundCompressions(inboundContext: InboundContext): InboundCompressions = if (remoteSettings.ArteryCompressionSettings.enabled) new InboundCompressionsImpl(system, inboundContext) else NoInboundCompressions - def createEncoder(pool: EnvelopeBufferPool, compression: OutboundCompressions): Flow[OutboundEnvelope, EnvelopeBuffer, NotUsed] = - Flow.fromGraph(new Encoder(localAddress, system, compression, outboundEnvelopePool, pool)) - - def encoder(compression: OutboundCompressions): Flow[OutboundEnvelope, EnvelopeBuffer, NotUsed] = createEncoder(envelopePool, compression) + def createEncoder(pool: EnvelopeBufferPool): Flow[OutboundEnvelope, EnvelopeBuffer, ChangeOutboundCompression] = + Flow.fromGraph(new Encoder(localAddress, system, outboundEnvelopePool, pool)) def aeronSource(streamId: Int, pool: EnvelopeBufferPool): Source[EnvelopeBuffer, NotUsed] = Source.fromGraph(new AeronSource(inboundChannel, streamId, aeron, taskRunner, pool, diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index 6c3a29e908..82dc5bf01f 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -4,41 +4,43 @@ package akka.remote.artery import java.util.Queue -import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicReference -import akka.remote.artery.compress.{ CompressionProtocol, CompressionTable, OutboundCompressions, OutboundCompressionsImpl } - import scala.annotation.tailrec import scala.concurrent.Future import scala.concurrent.Promise import scala.concurrent.duration._ import scala.concurrent.duration.FiniteDuration -import scala.util.Success + import akka.{ Done, NotUsed } import akka.actor.ActorRef import akka.actor.ActorSelectionMessage import akka.actor.Address -import akka.actor.RootActorPath import akka.dispatch.sysmsg.SystemMessage import akka.event.Logging import akka.remote._ +import akka.remote.DaemonMsgCreate +import akka.remote.QuarantinedEvent import akka.remote.artery.AeronSink.GaveUpSendingException +import akka.remote.artery.Encoder.ChangeOutboundCompression +import akka.remote.artery.Encoder.ChangeOutboundCompressionFailed import akka.remote.artery.InboundControlJunction.ControlMessageSubject import akka.remote.artery.OutboundControlJunction.OutboundControlIngress import akka.remote.artery.OutboundHandshake.HandshakeTimeoutException import akka.remote.artery.SendQueue.ProducerApi import akka.remote.artery.SystemMessageDelivery.ClearSystemMessageDelivery +import akka.remote.artery.compress.CompressionProtocol._ +import akka.remote.artery.compress.CompressionTable import akka.stream.AbruptTerminationException import akka.stream.Materializer import akka.stream.scaladsl.Keep import akka.stream.scaladsl.Source import akka.util.{ Unsafe, WildcardIndex } -import org.agrona.concurrent.ManyToOneConcurrentArrayQueue import akka.util.OptionVal +import org.agrona.concurrent.ManyToOneConcurrentArrayQueue import akka.remote.artery.compress.CompressionProtocol._ /** @@ -82,9 +84,6 @@ private[remote] class Association( // the `SendQueue` after materialization. Using same underlying queue. This makes it possible to // start sending (enqueuing) to the Association immediate after construction. - /** Accesses the currently active outbound compression. */ - def outboundCompression: OutboundCompressions = associationState.outboundCompressions - def createQueue(capacity: Int): Queue[OutboundEnvelope] = new ManyToOneConcurrentArrayQueue[OutboundEnvelope](capacity) @@ -93,6 +92,25 @@ private[remote] class Association( @volatile private[this] var controlQueue: SendQueue.ProducerApi[OutboundEnvelope] = QueueWrapper(createQueue(controlQueueSize)) @volatile private[this] var _outboundControlIngress: OutboundControlIngress = _ @volatile private[this] var materializing = new CountDownLatch(1) + @volatile private[this] var changeOutboundCompression: Option[ChangeOutboundCompression] = None + + def changeActorRefCompression(table: CompressionTable[ActorRef]): Future[Done] = + changeOutboundCompression match { + case Some(c) ⇒ c.changeActorRefCompression(table) + case None ⇒ Future.failed(new ChangeOutboundCompressionFailed) + } + + def changeClassManifestCompression(table: CompressionTable[String]): Future[Done] = + changeOutboundCompression match { + case Some(c) ⇒ c.changeClassManifestCompression(table) + case None ⇒ Future.failed(new ChangeOutboundCompressionFailed) + } + + def clearCompression(): Future[Done] = + changeOutboundCompression match { + case Some(c) ⇒ c.clearCompression() + case None ⇒ Future.failed(new ChangeOutboundCompressionFailed) + } private val _testStages: CopyOnWriteArrayList[TestManagementApi] = new CopyOnWriteArrayList @@ -138,31 +156,41 @@ private[remote] class Association( def associationState: AssociationState = Unsafe.instance.getObjectVolatile(this, AbstractAssociation.sharedStateOffset).asInstanceOf[AssociationState] - def completeHandshake(peer: UniqueAddress): Unit = { + def completeHandshake(peer: UniqueAddress): Future[Done] = { require( remoteAddress == peer.address, s"wrong remote address in completeHandshake, got ${peer.address}, expected $remoteAddress") val current = associationState - current.uniqueRemoteAddressPromise.trySuccess(peer) + current.uniqueRemoteAddressValue() match { case Some(`peer`) ⇒ - // our value - if (current.outboundCompressions == NoOutboundCompressions) { - // enable outbound compression (here, since earlier we don't know the remote address) - swapState(current, current.withCompression(createOutboundCompressions(remoteAddress))) - } + // handshake already completed + Future.successful(Done) case _ ⇒ - val newState = current.newIncarnation(Promise.successful(peer), createOutboundCompressions(remoteAddress)) - if (swapState(current, newState)) { + // clear outbound compression, it's safe to do that several times if someone else + // completes handshake at same time, but it's important to clear it before + // we signal that the handshake is completed (uniqueRemoteAddressPromise.trySuccess) + import transport.system.dispatcher + clearCompression().map { _ ⇒ + current.uniqueRemoteAddressPromise.trySuccess(peer) current.uniqueRemoteAddressValue() match { - case Some(old) ⇒ - log.debug( - "Incarnation {} of association to [{}] with new UID [{}] (old UID [{}])", - newState.incarnation, peer.address, peer.uid, old.uid) - case None ⇒ - // Failed, nothing to do + case Some(`peer`) ⇒ + // our value + case _ ⇒ + val newState = current.newIncarnation(Promise.successful(peer)) + if (swapState(current, newState)) { + current.uniqueRemoteAddressValue() match { + case Some(old) ⇒ + log.debug( + "Incarnation {} of association to [{}] with new UID [{}] (old UID [{}])", + newState.incarnation, peer.address, peer.uid, old.uid) + case None ⇒ + // Failed, nothing to do + } + // if swap failed someone else completed before us, and that is fine + } } - // if swap failed someone else completed before us, and that is fine + Done } } } @@ -253,6 +281,8 @@ private[remote] class Association( log.warning( "Association to [{}] with UID [{}] is irrecoverably failed. Quarantining address. {}", remoteAddress, u, reason) + // clear outbound compression + clearCompression() // FIXME when we complete the switch to Long UID we must use Long here also, issue #20644 transport.eventPublisher.notifyListeners(QuarantinedEvent(remoteAddress, u.toInt)) // end delivery of system messages to that incarnation after this point @@ -291,20 +321,16 @@ private[remote] class Association( } private def runOutboundStreams(): Unit = { - // TODO no compression for control / large streams currently - val disableCompression = NoOutboundCompressions - // it's important to materialize the outboundControl stream first, // so that outboundControlIngress is ready when stages for all streams start - runOutboundControlStream(disableCompression) - runOutboundOrdinaryMessagesStream(CurrentAssociationStateOutboundCompressionsProxy) + runOutboundControlStream() + runOutboundOrdinaryMessagesStream() - if (transport.largeMessageChannelEnabled) { - runOutboundLargeMessagesStream(disableCompression) - } + if (transport.largeMessageChannelEnabled) + runOutboundLargeMessagesStream() } - private def runOutboundControlStream(compression: OutboundCompressions): Unit = { + private def runOutboundControlStream(): Unit = { // stage in the control stream may access the outboundControlIngress before returned here // using CountDownLatch to make sure that materialization is completed before accessing outboundControlIngress materializing = new CountDownLatch(1) @@ -318,14 +344,14 @@ private[remote] class Association( Source.fromGraph(new SendQueue[OutboundEnvelope]) .via(transport.outboundControlPart1(this)) .viaMat(transport.outboundTestFlow(this))(Keep.both) - .toMat(transport.outboundControlPart2(this, compression))(Keep.both) + .toMat(transport.outboundControlPart2(this))(Keep.both) .run()(materializer) _testStages.add(mgmt) (queueValue, (control, completed)) } else { Source.fromGraph(new SendQueue[OutboundEnvelope]) .via(transport.outboundControlPart1(this)) - .toMat(transport.outboundControlPart2(this, compression))(Keep.both) + .toMat(transport.outboundControlPart2(this))(Keep.both) .run()(materializer) } @@ -335,7 +361,7 @@ private[remote] class Association( _outboundControlIngress = control materializing.countDown() attachStreamRestart("Outbound control stream", completed, cause ⇒ { - runOutboundControlStream(compression) + runOutboundControlStream() cause match { case _: HandshakeTimeoutException ⇒ // ok, quarantine not possible without UID case _ ⇒ quarantine("Outbound control stream restarted") @@ -351,32 +377,33 @@ private[remote] class Association( QueueWrapper(createQueue(capacity)) } - private def runOutboundOrdinaryMessagesStream(compression: OutboundCompressions): Unit = { + private def runOutboundOrdinaryMessagesStream(): Unit = { val wrapper = getOrCreateQueueWrapper(queue, queueSize) queue = wrapper // use new underlying queue immediately for restarts - val (queueValue, completed) = + val (queueValue, (changeCompression, completed)) = if (transport.remoteSettings.TestMode) { val ((queueValue, mgmt), completed) = Source.fromGraph(new SendQueue[OutboundEnvelope]) .viaMat(transport.outboundTestFlow(this))(Keep.both) - .toMat(transport.outbound(this, compression))(Keep.both) + .toMat(transport.outbound(this))(Keep.both) .run()(materializer) _testStages.add(mgmt) (queueValue, completed) } else { Source.fromGraph(new SendQueue[OutboundEnvelope]) - .toMat(transport.outbound(this, compression))(Keep.both) + .toMat(transport.outbound(this))(Keep.both) .run()(materializer) } queueValue.inject(wrapper.queue) // replace with the materialized value, still same underlying queue queue = queueValue + changeOutboundCompression = Some(changeCompression) - attachStreamRestart("Outbound message stream", completed, _ ⇒ runOutboundOrdinaryMessagesStream(compression)) + attachStreamRestart("Outbound message stream", completed, _ ⇒ runOutboundOrdinaryMessagesStream()) } - private def runOutboundLargeMessagesStream(compression: OutboundCompressions): Unit = { + private def runOutboundLargeMessagesStream(): Unit = { val wrapper = getOrCreateQueueWrapper(largeQueue, largeQueueSize) largeQueue = wrapper // use new underlying queue immediately for restarts @@ -384,20 +411,20 @@ private[remote] class Association( if (transport.remoteSettings.TestMode) { val ((queueValue, mgmt), completed) = Source.fromGraph(new SendQueue[OutboundEnvelope]) .viaMat(transport.outboundTestFlow(this))(Keep.both) - .toMat(transport.outboundLarge(this, compression))(Keep.both) + .toMat(transport.outboundLarge(this))(Keep.both) .run()(materializer) _testStages.add(mgmt) (queueValue, completed) } else { Source.fromGraph(new SendQueue[OutboundEnvelope]) - .toMat(transport.outboundLarge(this, compression))(Keep.both) + .toMat(transport.outboundLarge(this))(Keep.both) .run()(materializer) } queueValue.inject(wrapper.queue) // replace with the materialized value, still same underlying queue largeQueue = queueValue - attachStreamRestart("Outbound large message stream", completed, _ ⇒ runOutboundLargeMessagesStream(compression)) + attachStreamRestart("Outbound large message stream", completed, _ ⇒ runOutboundLargeMessagesStream()) } private def attachStreamRestart(streamName: String, streamCompleted: Future[Done], restart: Throwable ⇒ Unit): Unit = { @@ -421,41 +448,6 @@ private[remote] class Association( } } - // TODO: Make sure that once other channels use Compression, each gets it's own - private def createOutboundCompressions(remoteAddress: Address): OutboundCompressions = { - if (transport.provider.remoteSettings.ArteryCompressionSettings.enabled) { - val compression = new OutboundCompressionsImpl(transport.system, remoteAddress) - log.debug("Creating Outbound compression table to [{}]", remoteAddress) - compression - } else NoOutboundCompressions - } - - /** - * This proxy uses the current associationStates compression table, which is reset for a new incarnation. - * This way the same outgoing stream will switch to using the new table without the need of restarting it. - */ - private object CurrentAssociationStateOutboundCompressionsProxy extends OutboundCompressions { - - override def actorRefCompressionTableVersion: Int = - associationState.outboundCompressions.actorRefCompressionTableVersion - override def applyActorRefCompressionTable(table: CompressionTable[ActorRef]): Unit = { - associationState.outboundCompressions.applyActorRefCompressionTable(table) - } - override final def compressActorRef(ref: ActorRef): Int = { - associationState.outboundCompressions.compressActorRef(ref) - } - - override def classManifestCompressionTableVersion: Int = - associationState.outboundCompressions.classManifestCompressionTableVersion - override def applyClassManifestCompressionTable(table: CompressionTable[String]): Unit = - associationState.outboundCompressions.applyClassManifestCompressionTable(table) - override final def compressClassManifest(manifest: String): Int = - associationState.outboundCompressions.compressClassManifest(manifest) - - override def toString = - s"${Logging.simpleName(getClass)}(current delegate: ${associationState.outboundCompressions})" - } - override def toString: String = s"Association($localAddress -> $remoteAddress with $associationState)" @@ -489,17 +481,23 @@ private[remote] class AssociationRegistry(createAssociation: Address ⇒ Associa @tailrec final def setUID(peer: UniqueAddress): Association = { val currentMap = associationsByUid.get val a = association(peer.address) - // make sure we don't overwrite same UID with different association + currentMap.get(peer.uid) match { - case OptionVal.Some(previous) if (previous ne a) ⇒ - throw new IllegalArgumentException(s"UID collision old [$previous] new [$a]") - case _ ⇒ // ok + case OptionVal.Some(previous) ⇒ + if (previous eq a) + // associationsByUid Map already contains the right association + a + else + // make sure we don't overwrite same UID with different association + throw new IllegalArgumentException(s"UID collision old [$previous] new [$a]") + case _ ⇒ + // update associationsByUid Map with the uid -> assocation + val newMap = currentMap.updated(peer.uid, a) + if (associationsByUid.compareAndSet(currentMap, newMap)) + a + else + setUID(peer) // lost CAS, retry } - val newMap = currentMap.updated(peer.uid, a) - if (associationsByUid.compareAndSet(currentMap, newMap)) - a - else - setUID(peer) // lost CAS, retry } def allAssociations: Set[Association] = diff --git a/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala b/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala index 0b4bfbf558..dc07002d35 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala @@ -9,12 +9,13 @@ import java.nio.{ ByteBuffer, ByteOrder } import akka.actor.{ ActorRef, Address } import akka.remote.artery.compress.CompressionProtocol._ -import akka.remote.artery.compress.{ CompressionTable, InboundCompressions, OutboundCompressions } +import akka.remote.artery.compress.{ CompressionTable, InboundCompressions } import akka.serialization.Serialization import org.agrona.concurrent.{ ManyToManyConcurrentArrayQueue, UnsafeBuffer } import akka.util.{ OptionVal, Unsafe } import scala.util.control.NonFatal +import akka.remote.artery.compress.NoInboundCompressions /** * INTERNAL API @@ -77,25 +78,34 @@ private[remote] object HeaderBuilder { // We really only use the Header builder on one "side" or the other, thus in order to avoid having to split its impl // we inject no-op compression's of the "other side". - def in(compression: InboundCompressions): HeaderBuilder = new HeaderBuilderImpl(compression, NoOutboundCompressions) - def out(compression: OutboundCompressions): HeaderBuilder = new HeaderBuilderImpl(NoInboundCompressions, compression) + def in(compression: InboundCompressions): HeaderBuilder = + new HeaderBuilderImpl(compression, CompressionTable.empty[ActorRef], CompressionTable.empty[String]) + def out(): HeaderBuilder = + new HeaderBuilderImpl(NoInboundCompressions, CompressionTable.empty[ActorRef], CompressionTable.empty[String]) /** INTERNAL API, FOR TESTING ONLY */ - private[remote] def bothWays(in: InboundCompressions, out: OutboundCompressions): HeaderBuilder = new HeaderBuilderImpl(in, out) + private[remote] def bothWays( + in: InboundCompressions, + outboundActorRefCompression: CompressionTable[ActorRef], + outboundClassManifestCompression: CompressionTable[String]): HeaderBuilder = + new HeaderBuilderImpl(in, outboundActorRefCompression, outboundClassManifestCompression) } /** * INTERNAL API */ -sealed trait HeaderBuilder { +private[remote] sealed trait HeaderBuilder { def setVersion(v: Int): Unit def version: Int - def setActorRefCompressionTableVersion(v: Int): Unit - def actorRefCompressionTableVersion: Int + def inboundActorRefCompressionTableVersion: Int + def inboundClassManifestCompressionTableVersion: Int - def setClassManifestCompressionTableVersion(v: Int): Unit - def classManifestCompressionTableVersion: Int + def outboundActorRefCompression: CompressionTable[ActorRef] + def setOutboundActorRefCompression(table: CompressionTable[ActorRef]): Unit + + def outboundClassManifestCompression: CompressionTable[String] + def setOutboundClassManifestCompression(table: CompressionTable[String]): Unit def setUid(u: Long): Unit def uid: Long @@ -140,12 +150,15 @@ sealed trait HeaderBuilder { /** * INTERNAL API */ -private[remote] final class HeaderBuilderImpl(inboundCompression: InboundCompressions, outboundCompression: OutboundCompressions) extends HeaderBuilder { +private[remote] final class HeaderBuilderImpl( + inboundCompression: InboundCompressions, + var _outboundActorRefCompression: CompressionTable[ActorRef], + var _outboundClassManifestCompression: CompressionTable[String]) extends HeaderBuilder { // Fields only available for EnvelopeBuffer var _version: Int = _ var _uid: Long = _ - var _actorRefCompressionTableVersion: Int = 0 - var _classManifestCompressionTableVersion: Int = 0 + var _inboundActorRefCompressionTableVersion: Int = 0 + var _inboundClassManifestCompressionTableVersion: Int = 0 var _senderActorRef: String = null var _senderActorRefIdx: Int = -1 @@ -162,14 +175,19 @@ private[remote] final class HeaderBuilderImpl(inboundCompression: InboundCompres override def setUid(uid: Long) = _uid = uid override def uid: Long = _uid - override def setActorRefCompressionTableVersion(v: Int): Unit = _actorRefCompressionTableVersion = v - override def actorRefCompressionTableVersion: Int = _actorRefCompressionTableVersion + override def inboundActorRefCompressionTableVersion: Int = _inboundActorRefCompressionTableVersion + override def inboundClassManifestCompressionTableVersion: Int = _inboundClassManifestCompressionTableVersion - override def setClassManifestCompressionTableVersion(v: Int): Unit = _classManifestCompressionTableVersion = v - override def classManifestCompressionTableVersion: Int = _classManifestCompressionTableVersion + def setOutboundActorRefCompression(table: CompressionTable[ActorRef]): Unit = + _outboundActorRefCompression = table + override def outboundActorRefCompression: CompressionTable[ActorRef] = _outboundActorRefCompression + + def setOutboundClassManifestCompression(table: CompressionTable[String]): Unit = + _outboundClassManifestCompression = table + def outboundClassManifestCompression: CompressionTable[String] = _outboundClassManifestCompression override def setSenderActorRef(ref: ActorRef): Unit = { - _senderActorRefIdx = outboundCompression.compressActorRef(ref) + _senderActorRefIdx = outboundActorRefCompression.compress(ref) if (_senderActorRefIdx == -1) _senderActorRef = Serialization.serializedActorPath(ref) // includes local address from `currentTransportInformation` } override def setNoSender(): Unit = { @@ -179,7 +197,8 @@ private[remote] final class HeaderBuilderImpl(inboundCompression: InboundCompres override def isNoSender: Boolean = (_senderActorRef eq null) && _senderActorRefIdx == EnvelopeBuffer.DeadLettersCode override def senderActorRef(originUid: Long): OptionVal[ActorRef] = - if (_senderActorRef eq null) inboundCompression.decompressActorRef(originUid, actorRefCompressionTableVersion, _senderActorRefIdx) + if (_senderActorRef eq null) + inboundCompression.decompressActorRef(originUid, inboundActorRefCompressionTableVersion, _senderActorRefIdx) else OptionVal.None def senderActorRefPath: OptionVal[String] = OptionVal(_senderActorRef) @@ -192,13 +211,14 @@ private[remote] final class HeaderBuilderImpl(inboundCompression: InboundCompres (_recipientActorRef eq null) && _recipientActorRefIdx == EnvelopeBuffer.DeadLettersCode def setRecipientActorRef(ref: ActorRef): Unit = { - _recipientActorRefIdx = outboundCompression.compressActorRef(ref) + _recipientActorRefIdx = outboundActorRefCompression.compress(ref) if (_recipientActorRefIdx == -1) { _recipientActorRef = ref.path.toSerializationFormat } } def recipientActorRef(originUid: Long): OptionVal[ActorRef] = - if (_recipientActorRef eq null) inboundCompression.decompressActorRef(originUid, actorRefCompressionTableVersion, _recipientActorRefIdx) + if (_recipientActorRef eq null) + inboundCompression.decompressActorRef(originUid, inboundActorRefCompressionTableVersion, _recipientActorRefIdx) else OptionVal.None def recipientActorRefPath: OptionVal[String] = OptionVal(_recipientActorRef) @@ -210,13 +230,15 @@ private[remote] final class HeaderBuilderImpl(inboundCompression: InboundCompres _serializer override def setManifest(manifest: String): Unit = { - _manifestIdx = outboundCompression.compressClassManifest(manifest) + _manifestIdx = outboundClassManifestCompression.compress(manifest) if (_manifestIdx == -1) _manifest = manifest } override def manifest(originUid: Long): String = { if (_manifest ne null) _manifest else { - _manifest = inboundCompression.decompressClassManifest(originUid, classManifestCompressionTableVersion, _manifestIdx).get + _manifest = inboundCompression.decompressClassManifest( + originUid, + inboundClassManifestCompressionTableVersion, _manifestIdx).get _manifest } } @@ -224,8 +246,6 @@ private[remote] final class HeaderBuilderImpl(inboundCompression: InboundCompres override def toString = "HeaderBuilderImpl(" + "version:" + version + ", " + - "actorRefCompressionTableVersion:" + actorRefCompressionTableVersion + ", " + - "classManifestCompressionTableVersion:" + classManifestCompressionTableVersion + ", " + "uid:" + uid + ", " + "_senderActorRef:" + _senderActorRef + ", " + "_senderActorRefIdx:" + _senderActorRefIdx + ", " + @@ -257,8 +277,8 @@ private[remote] final class EnvelopeBuffer(val byteBuffer: ByteBuffer) { byteBuffer.putInt(header.serializer) // compression table version numbers - byteBuffer.putInt(ActorRefCompressionTableVersionTagOffset, header._actorRefCompressionTableVersion | TagTypeMask) - byteBuffer.putInt(ClassManifestCompressionTableVersionTagOffset, header._classManifestCompressionTableVersion | TagTypeMask) + byteBuffer.putInt(ActorRefCompressionTableVersionTagOffset, header.outboundActorRefCompression.version | TagTypeMask) + byteBuffer.putInt(ClassManifestCompressionTableVersionTagOffset, header.outboundClassManifestCompression.version | TagTypeMask) // Write compressable, variable-length parts always to the actual position of the buffer // Write tag values explicitly in their proper offset @@ -294,11 +314,11 @@ private[remote] final class EnvelopeBuffer(val byteBuffer: ByteBuffer) { // compression table versions (stored in the Tag) val refCompressionVersionTag = byteBuffer.getInt(ActorRefCompressionTableVersionTagOffset) if ((refCompressionVersionTag & TagTypeMask) != 0) { - header setActorRefCompressionTableVersion refCompressionVersionTag & TagValueMask + header._inboundActorRefCompressionTableVersion = refCompressionVersionTag & TagValueMask } val manifestCompressionVersionTag = byteBuffer.getInt(ClassManifestCompressionTableVersionTagOffset) if ((manifestCompressionVersionTag & TagTypeMask) != 0) { - header setClassManifestCompressionTableVersion manifestCompressionVersionTag & TagValueMask + header._inboundClassManifestCompressionTableVersion = manifestCompressionVersionTag & TagValueMask } // Read compressable, variable-length parts always from the actual position of the buffer diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala index 6c9a1149dc..5c2243d97b 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala @@ -13,9 +13,28 @@ import akka.stream._ import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } import akka.util.{ ByteString, OptionVal, PrettyByteString } import akka.actor.EmptyLocalActorRef -import akka.remote.artery.compress.{ InboundCompressions, OutboundCompressions, OutboundCompressionsImpl } +import akka.remote.artery.compress.InboundCompressions import akka.stream.stage.TimerGraphStageLogic import java.util.concurrent.TimeUnit +import scala.concurrent.Future +import akka.remote.artery.compress.CompressionTable +import akka.Done +import akka.stream.stage.GraphStageWithMaterializedValue +import scala.concurrent.Promise + +/** + * INTERNAL API + */ +private[remote] object Encoder { + private[remote] trait ChangeOutboundCompression { + def changeActorRefCompression(table: CompressionTable[ActorRef]): Future[Done] + def changeClassManifestCompression(table: CompressionTable[String]): Future[Done] + def clearCompression(): Future[Done] + } + + private[remote] class ChangeOutboundCompressionFailed extends RuntimeException( + "Change of outbound compression table failed (will be retried), because materialization did not complete yet") +} /** * INTERNAL API @@ -23,42 +42,49 @@ import java.util.concurrent.TimeUnit private[remote] class Encoder( uniqueLocalAddress: UniqueAddress, system: ActorSystem, - compression: OutboundCompressions, outboundEnvelopePool: ObjectPool[ReusableOutboundEnvelope], bufferPool: EnvelopeBufferPool) - extends GraphStage[FlowShape[OutboundEnvelope, EnvelopeBuffer]] { + extends GraphStageWithMaterializedValue[FlowShape[OutboundEnvelope, EnvelopeBuffer], Encoder.ChangeOutboundCompression] { + import Encoder._ val in: Inlet[OutboundEnvelope] = Inlet("Artery.Encoder.in") val out: Outlet[EnvelopeBuffer] = Outlet("Artery.Encoder.out") val shape: FlowShape[OutboundEnvelope, EnvelopeBuffer] = FlowShape(in, out) - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging { + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, ChangeOutboundCompression) = { + val logic = new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging with ChangeOutboundCompression { - private val headerBuilder = HeaderBuilder.out(compression) + private val headerBuilder = HeaderBuilder.out() headerBuilder setVersion ArteryTransport.Version headerBuilder setUid uniqueLocalAddress.uid private val localAddress = uniqueLocalAddress.address private val serialization = SerializationExtension(system) private val serializationInfo = Serialization.Information(localAddress, system) + private val changeActorRefCompressionCb = getAsyncCallback[(CompressionTable[ActorRef], Promise[Done])] { + case (table, done) ⇒ + headerBuilder.setOutboundActorRefCompression(table) + done.success(Done) + } + + private val changeClassManifsetCompressionCb = getAsyncCallback[(CompressionTable[String], Promise[Done])] { + case (table, done) ⇒ + headerBuilder.setOutboundClassManifestCompression(table) + done.success(Done) + } + + private val clearCompressionCb = getAsyncCallback[Promise[Done]] { done ⇒ + headerBuilder.setOutboundActorRefCompression(CompressionTable.empty[ActorRef]) + headerBuilder.setOutboundClassManifestCompression(CompressionTable.empty[String]) + done.success(Done) + } + override protected def logSource = classOf[Encoder] override def onPush(): Unit = { val outboundEnvelope = grab(in) val envelope = bufferPool.acquire() - // FIXME: OMG race between setting the version, and using the table!!!! - // incoming messages are concurrent to outgoing ones - // incoming message may be table advertisement - // which swaps the table in Outgoing*Compression for the new one (n+1) - // by itself it does so atomically, - // race: however here we store the compression table version separately from actually using it (storing the refs / manifests etc). - // so there is a slight race IF the table is swapped right between us setting the version n here [then the table being swapped to n+1] and then we use the n+1 version to compressions the compressions (which the receiving end will fail to read, since the encoding could be completely different, and it picks the table based on the version Int). - // A solution would be to getTable => use it to set and compress things - headerBuilder setActorRefCompressionTableVersion compression.actorRefCompressionTableVersion - headerBuilder setClassManifestCompressionTableVersion compression.classManifestCompressionTableVersion - // internally compression is applied by the builder: outboundEnvelope.recipient match { case OptionVal.Some(r) ⇒ headerBuilder setRecipientActorRef r @@ -109,8 +135,49 @@ private[remote] class Encoder( override def onPull(): Unit = pull(in) + /** + * External call from ChangeOutboundCompression materialized value + */ + override def changeActorRefCompression(table: CompressionTable[ActorRef]): Future[Done] = { + val done = Promise[Done]() + try changeActorRefCompressionCb.invoke((table, done)) catch { + // This is a harmless failure, it will be retried on next advertisement or handshake attempt. + // It will only occur when callback is invoked before preStart. That is highly unlikely to + // happen since advertisement is not done immediately and handshake involves network roundtrip. + case NonFatal(_) ⇒ done.tryFailure(new ChangeOutboundCompressionFailed) + } + done.future + } + + /** + * External call from ChangeOutboundCompression materialized value + */ + override def changeClassManifestCompression(table: CompressionTable[String]): Future[Done] = { + val done = Promise[Done]() + try changeClassManifsetCompressionCb.invoke((table, done)) catch { + // in case materialization not completed yet + case NonFatal(_) ⇒ done.tryFailure(new ChangeOutboundCompressionFailed) + } + done.future + } + + /** + * External call from ChangeOutboundCompression materialized value + */ + override def clearCompression(): Future[Done] = { + val done = Promise[Done]() + try clearCompressionCb.invoke(done) catch { + // in case materialization not completed yet + case NonFatal(_) ⇒ done.tryFailure(new ChangeOutboundCompressionFailed) + } + done.future + } + setHandlers(in, out, this) } + + (logic, logic) + } } /** @@ -198,20 +265,17 @@ private[remote] class Decoder( val remoteAddress = assoc.remoteAddress sender match { case OptionVal.Some(snd) ⇒ - compression.hitActorRef(originUid, headerBuilder.actorRefCompressionTableVersion, - remoteAddress, snd, 1) + compression.hitActorRef(originUid, remoteAddress, snd, 1) case OptionVal.None ⇒ } recipient match { case OptionVal.Some(rcp) ⇒ - compression.hitActorRef(originUid, headerBuilder.actorRefCompressionTableVersion, - remoteAddress, rcp, 1) + compression.hitActorRef(originUid, remoteAddress, rcp, 1) case OptionVal.None ⇒ } - compression.hitClassManifest(originUid, headerBuilder.classManifestCompressionTableVersion, - remoteAddress, classManifest, 1) + compression.hitClassManifest(originUid, remoteAddress, classManifest, 1) case _ ⇒ // we don't want to record hits for compression while handshake is still in progress. diff --git a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala index a7c8420377..5f4a50a3b4 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala @@ -18,6 +18,8 @@ import akka.stream.stage.InHandler import akka.stream.stage.OutHandler import akka.stream.stage.TimerGraphStageLogic import akka.util.OptionVal +import akka.Done +import scala.concurrent.Future /** * INTERNAL API @@ -177,8 +179,9 @@ private[akka] class InboundHandshake(inboundContext: InboundContext, inControlSt env.message match { case HandshakeReq(from) ⇒ onHandshakeReq(from) case HandshakeRsp(from) ⇒ - inboundContext.completeHandshake(from) - pull(in) + after(inboundContext.completeHandshake(from)) { + pull(in) + } case _ ⇒ onMessage(env) } @@ -197,21 +200,32 @@ private[akka] class InboundHandshake(inboundContext: InboundContext, inControlSt }) private def onHandshakeReq(from: UniqueAddress): Unit = { - inboundContext.completeHandshake(from) - inboundContext.sendControl(from.address, HandshakeRsp(inboundContext.localAddress)) - pull(in) + after(inboundContext.completeHandshake(from)) { + inboundContext.sendControl(from.address, HandshakeRsp(inboundContext.localAddress)) + pull(in) + } + } + + private def after(first: Future[Done])(thenInside: ⇒ Unit): Unit = { + first.value match { + case Some(_) ⇒ + // This in the normal case (all but the first). The future will be completed + // because handshake was already completed. Note that we send those HandshakeReq + // periodically. + thenInside + case None ⇒ + implicit val ec = materializer.executionContext + first.onComplete { _ ⇒ + getAsyncCallback[Done](_ ⇒ thenInside).invoke(Done) + } + } + } private def onMessage(env: InboundEnvelope): Unit = { if (isKnownOrigin(env)) push(out, env) else { - // FIXME remove, only debug - log.warning( - s"Dropping message [{}] from unknown system with UID [{}]. " + - "This system with UID [{}] was probably restarted. " + - "Messages will be accepted when new handshake has been completed.", - env.message.getClass.getName, inboundContext.localAddress.uid, env.originUid) if (log.isDebugEnabled) log.debug( s"Dropping message [{}] from unknown system with UID [{}]. " + @@ -222,8 +236,12 @@ private[akka] class InboundHandshake(inboundContext: InboundContext, inControlSt } } - private def isKnownOrigin(env: InboundEnvelope): Boolean = - env.association.isDefined + private def isKnownOrigin(env: InboundEnvelope): Boolean = { + // the association is passed in the envelope from the Decoder stage to avoid + // additional lookup. The second OR case is because if we didn't use fusing it + // would be possible that it was not found by Decoder (handshake not completed yet) + env.association.isDefined || inboundContext.association(env.originUid).isDefined + } // OutHandler override def onPull(): Unit = pull(in) diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/ActualCompressions.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/ActualCompressions.scala deleted file mode 100644 index fdbc067d61..0000000000 --- a/akka-remote/src/main/scala/akka/remote/artery/compress/ActualCompressions.scala +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Copyright (C) 2016 Lightbend Inc. - */ - -package akka.remote.artery.compress - -import java.util.function.LongFunction - -import akka.actor.{ ActorRef, ActorSystem, Address } -import akka.remote.artery._ -import akka.util.OptionVal -import org.agrona.collections.Long2ObjectHashMap - -/** INTERNAL API */ -private[remote] final class OutboundCompressionsImpl(system: ActorSystem, remoteAddress: Address) extends OutboundCompressions { - - private val actorRefsOut = new OutboundActorRefCompression(system, remoteAddress) - private val classManifestsOut = new OutboundClassManifestCompression(system, remoteAddress) - - // actor ref compression --- - - override def compressActorRef(ref: ActorRef): Int = actorRefsOut.compress(ref) - override def actorRefCompressionTableVersion: Int = actorRefsOut.activeCompressionTableVersion - override def applyActorRefCompressionTable(table: CompressionTable[ActorRef]): Unit = - actorRefsOut.flipTable(table) - - // class manifest compression --- - - override def compressClassManifest(manifest: String): Int = classManifestsOut.compress(manifest) - override def classManifestCompressionTableVersion: Int = classManifestsOut.activeCompressionTableVersion - override def applyClassManifestCompressionTable(table: CompressionTable[String]): Unit = - classManifestsOut.flipTable(table) -} - -/** - * INTERNAL API - * - * One per incoming Aeron stream, actual compression tables are kept per-originUid and created on demand. - */ -private[remote] final class InboundCompressionsImpl( - system: ActorSystem, - inboundContext: InboundContext) extends InboundCompressions { - - private val settings = CompressionSettings(system) - - // FIXME we also must remove the ones that won't be used anymore - when quarantine triggers - private[this] val _actorRefsIns = new Long2ObjectHashMap[InboundActorRefCompression]() - private val createInboundActorRefsForOrigin = new LongFunction[InboundActorRefCompression] { - override def apply(originUid: Long): InboundActorRefCompression = { - val actorRefHitters = new TopHeavyHitters[ActorRef](settings.actorRefs.max) - new InboundActorRefCompression(system, settings, originUid, inboundContext, actorRefHitters) - } - } - private def actorRefsIn(originUid: Long): InboundActorRefCompression = - _actorRefsIns.computeIfAbsent(originUid, createInboundActorRefsForOrigin) - - private[this] val _classManifestsIns = new Long2ObjectHashMap[InboundManifestCompression]() - private val createInboundManifestsForOrigin = new LongFunction[InboundManifestCompression] { - override def apply(originUid: Long): InboundManifestCompression = { - val manifestHitters = new TopHeavyHitters[String](settings.manifests.max) - new InboundManifestCompression(system, settings, originUid, inboundContext, manifestHitters) - } - } - private def classManifestsIn(originUid: Long): InboundManifestCompression = - _classManifestsIns.computeIfAbsent(originUid, createInboundManifestsForOrigin) - - // actor ref compression --- - - override def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef] = - actorRefsIn(originUid).decompress(tableVersion, idx) - override def hitActorRef(originUid: Long, tableVersion: Int, address: Address, ref: ActorRef, n: Int): Unit = - actorRefsIn(originUid).increment(address, ref, n) - override def confirmActorRefCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit = - actorRefsIn(originUid).confirmAdvertisement(tableVersion) - - // class manifest compression --- - - override def decompressClassManifest(originUid: Long, tableVersion: Int, idx: Int): OptionVal[String] = - classManifestsIn(originUid).decompress(tableVersion, idx) - override def hitClassManifest(originUid: Long, tableVersion: Int, address: Address, manifest: String, n: Int): Unit = - classManifestsIn(originUid).increment(address, manifest, n) - override def confirmClassManifestCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit = - actorRefsIn(originUid).confirmAdvertisement(tableVersion) - - // testing utilities --- - - /** INTERNAL API: for testing only */ - private[remote] def runNextActorRefAdvertisement() = { - import scala.collection.JavaConverters._ - _actorRefsIns.values().asScala.foreach { inbound ⇒ inbound.runNextTableAdvertisement() } - } - - /** INTERNAL API: for testing only */ - private[remote] def runNextClassManifestAdvertisement() = { - import scala.collection.JavaConverters._ - _classManifestsIns.values().asScala.foreach { inbound ⇒ inbound.runNextTableAdvertisement() } - } -} diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/AllCompressions.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/AllCompressions.scala deleted file mode 100644 index e90db8bd31..0000000000 --- a/akka-remote/src/main/scala/akka/remote/artery/compress/AllCompressions.scala +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright (C) 2016 Lightbend Inc. - */ - -package akka.remote.artery.compress - -import akka.actor.{ ActorRef, Address } -import akka.util.OptionVal - -/** - * INTERNAL API - * Decompress and cause compression advertisements. - * - * One per inbound message stream thus must demux by originUid to use the right tables. - */ -private[remote] trait InboundCompressions { - def hitActorRef(originUid: Long, tableVersion: Int, remote: Address, ref: ActorRef, n: Int): Unit - def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef] - def confirmActorRefCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit - - def hitClassManifest(originUid: Long, tableVersion: Int, remote: Address, manifest: String, n: Int): Unit - def decompressClassManifest(originUid: Long, tableVersion: Int, idx: Int): OptionVal[String] - def confirmClassManifestCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit -} -/** - * INTERNAL API - * Compress outgoing data and handle compression advertisements to fill compression table. - * - * One per outgoing message stream. - */ -private[remote] trait OutboundCompressions { - def applyActorRefCompressionTable(table: CompressionTable[ActorRef]): Unit - def actorRefCompressionTableVersion: Int - def compressActorRef(ref: ActorRef): Int - - def applyClassManifestCompressionTable(table: CompressionTable[String]): Unit - def classManifestCompressionTableVersion: Int - def compressClassManifest(manifest: String): Int -} diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionTable.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionTable.scala index dfc612868b..d07866e239 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionTable.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionTable.scala @@ -6,6 +6,13 @@ package akka.remote.artery.compress /** INTERNAL API: Versioned compression table to be advertised between systems */ private[akka] final case class CompressionTable[T](version: Int, map: Map[T, Int]) { + import CompressionTable.NotCompressedId + + def compress(value: T): Int = + map.get(value) match { + case Some(id) ⇒ id + case None ⇒ NotCompressedId + } def invert: DecompressionTable[T] = if (map.isEmpty) DecompressionTable.empty[T].copy(version = version) @@ -25,6 +32,8 @@ private[akka] final case class CompressionTable[T](version: Int, map: Map[T, Int } /** INTERNAL API */ private[remote] object CompressionTable { + final val NotCompressedId = -1 + private[this] val _empty = new CompressionTable[Any](0, Map.empty) def empty[T] = _empty.asInstanceOf[CompressionTable[T]] } diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala index 7fd5b5bfd4..5be9354a32 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala @@ -4,12 +4,97 @@ package akka.remote.artery.compress +import java.util.concurrent.atomic.AtomicReference +import java.util.function.LongFunction + +import scala.concurrent.duration.{ Duration, FiniteDuration } + import akka.actor.{ ActorRef, ActorSystem, Address } import akka.event.{ Logging, NoLogging } import akka.remote.artery.{ InboundContext, OutboundContext } import akka.util.{ OptionVal, PrettyDuration } -import scala.concurrent.duration.{ Duration, FiniteDuration } -import java.util.concurrent.atomic.AtomicReference +import org.agrona.collections.Long2ObjectHashMap + +/** + * INTERNAL API + * Decompress and cause compression advertisements. + * + * One per inbound message stream thus must demux by originUid to use the right tables. + */ +private[remote] trait InboundCompressions { + def hitActorRef(originUid: Long, remote: Address, ref: ActorRef, n: Int): Unit + def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef] + def confirmActorRefCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit + + def hitClassManifest(originUid: Long, remote: Address, manifest: String, n: Int): Unit + def decompressClassManifest(originUid: Long, tableVersion: Int, idx: Int): OptionVal[String] + def confirmClassManifestCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit +} + +/** + * INTERNAL API + * + * One per incoming Aeron stream, actual compression tables are kept per-originUid and created on demand. + */ +private[remote] final class InboundCompressionsImpl( + system: ActorSystem, + inboundContext: InboundContext) extends InboundCompressions { + + private val settings = CompressionSettings(system) + + // FIXME we also must remove the ones that won't be used anymore - when quarantine triggers + private[this] val _actorRefsIns = new Long2ObjectHashMap[InboundActorRefCompression]() + private val createInboundActorRefsForOrigin = new LongFunction[InboundActorRefCompression] { + override def apply(originUid: Long): InboundActorRefCompression = { + val actorRefHitters = new TopHeavyHitters[ActorRef](settings.actorRefs.max) + new InboundActorRefCompression(system, settings, originUid, inboundContext, actorRefHitters) + } + } + private def actorRefsIn(originUid: Long): InboundActorRefCompression = + _actorRefsIns.computeIfAbsent(originUid, createInboundActorRefsForOrigin) + + private[this] val _classManifestsIns = new Long2ObjectHashMap[InboundManifestCompression]() + private val createInboundManifestsForOrigin = new LongFunction[InboundManifestCompression] { + override def apply(originUid: Long): InboundManifestCompression = { + val manifestHitters = new TopHeavyHitters[String](settings.manifests.max) + new InboundManifestCompression(system, settings, originUid, inboundContext, manifestHitters) + } + } + private def classManifestsIn(originUid: Long): InboundManifestCompression = + _classManifestsIns.computeIfAbsent(originUid, createInboundManifestsForOrigin) + + // actor ref compression --- + + override def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef] = + actorRefsIn(originUid).decompress(tableVersion, idx) + override def hitActorRef(originUid: Long, address: Address, ref: ActorRef, n: Int): Unit = + actorRefsIn(originUid).increment(address, ref, n) + override def confirmActorRefCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit = + actorRefsIn(originUid).confirmAdvertisement(tableVersion) + + // class manifest compression --- + + override def decompressClassManifest(originUid: Long, tableVersion: Int, idx: Int): OptionVal[String] = + classManifestsIn(originUid).decompress(tableVersion, idx) + override def hitClassManifest(originUid: Long, address: Address, manifest: String, n: Int): Unit = + classManifestsIn(originUid).increment(address, manifest, n) + override def confirmClassManifestCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit = + actorRefsIn(originUid).confirmAdvertisement(tableVersion) + + // testing utilities --- + + /** INTERNAL API: for testing only */ + private[remote] def runNextActorRefAdvertisement() = { + import scala.collection.JavaConverters._ + _actorRefsIns.values().asScala.foreach { inbound ⇒ inbound.runNextTableAdvertisement() } + } + + /** INTERNAL API: for testing only */ + private[remote] def runNextClassManifestAdvertisement() = { + import scala.collection.JavaConverters._ + _classManifestsIns.values().asScala.foreach { inbound ⇒ inbound.runNextTableAdvertisement() } + } +} /** * INTERNAL API @@ -105,7 +190,7 @@ private[remote] abstract class InboundCompression[T >: Null]( lazy val log = Logging(system, getClass.getSimpleName) - // TODO NOTE: there exist edge cases around, we advertise table 1, accumulate table 2, the remote system has not used 2 yet, + // FIXME NOTE: there exist edge cases around, we advertise table 1, accumulate table 2, the remote system has not used 2 yet, // yet we technically could already prepare table 3, then it starts using table 1 suddenly. Edge cases like that. // SOLUTION 1: We don't start building new tables until we've seen the previous one be used (move from new to active) // This is nice as it practically disables all the "build the table" work when the other side is not interested in using it. diff --git a/akka-remote/src/main/scala/akka/remote/artery/NoLiteralCompression.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/NoInboundCompressions.scala similarity index 51% rename from akka-remote/src/main/scala/akka/remote/artery/NoLiteralCompression.scala rename to akka-remote/src/main/scala/akka/remote/artery/compress/NoInboundCompressions.scala index 63886945d5..7219cb0cfa 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/NoLiteralCompression.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/NoInboundCompressions.scala @@ -1,10 +1,9 @@ /** * Copyright (C) 2016 Lightbend Inc. */ -package akka.remote.artery +package akka.remote.artery.compress import akka.actor.{ ActorRef, Address } -import akka.remote.artery.compress.{ CompressionTable, InboundCompressions, OutboundCompressions } import akka.util.OptionVal /** @@ -13,30 +12,16 @@ import akka.util.OptionVal * Literarily, no compression! */ case object NoInboundCompressions extends InboundCompressions { - override def hitActorRef(originUid: Long, tableVersion: Int, remote: Address, ref: ActorRef, n: Int): Unit = () + override def hitActorRef(originUid: Long, remote: Address, ref: ActorRef, n: Int): Unit = () override def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef] = if (idx == -1) throw new IllegalArgumentException("Attemted decompression of illegal compression id: -1") else OptionVal.None override def confirmActorRefCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit = () - override def hitClassManifest(originUid: Long, tableVersion: Int, remote: Address, manifest: String, n: Int): Unit = () + override def hitClassManifest(originUid: Long, remote: Address, manifest: String, n: Int): Unit = () override def decompressClassManifest(originUid: Long, tableVersion: Int, idx: Int): OptionVal[String] = if (idx == -1) throw new IllegalArgumentException("Attemted decompression of illegal compression id: -1") else OptionVal.None override def confirmClassManifestCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit = () } -/** - * INTERNAL API - * - * Literarily, no compression! - */ -case object NoOutboundCompressions extends OutboundCompressions { - override def applyActorRefCompressionTable(table: CompressionTable[ActorRef]): Unit = () - override def actorRefCompressionTableVersion: Int = 0 - override def compressActorRef(ref: ActorRef): Int = -1 - - override def applyClassManifestCompressionTable(table: CompressionTable[String]): Unit = () - override def classManifestCompressionTableVersion: Int = 0 - override def compressClassManifest(manifest: String): Int = -1 -} diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/OutboundActorRefCompression.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/OutboundActorRefCompression.scala deleted file mode 100644 index 59756afe32..0000000000 --- a/akka-remote/src/main/scala/akka/remote/artery/compress/OutboundActorRefCompression.scala +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Copyright (C) 2016 Lightbend Inc. - */ - -package akka.remote.artery.compress - -import java.util.concurrent.atomic.AtomicReference -import java.{ util ⇒ ju } - -import akka.actor.{ ActorRef, ActorSystem, Address } -import akka.event.{ Logging, LoggingAdapter } -import akka.remote.artery.compress.OutboundCompression.OutboundCompressionState - -import scala.annotation.tailrec - -/** INTERNAL API */ -private[remote] final class OutboundActorRefCompression(system: ActorSystem, remoteAddress: Address) - extends OutboundCompressionTable[ActorRef](system, remoteAddress) { - - flipTable(CompressionTable( - version = 0, - map = Map( - system.deadLetters → 0))) -} - -/** INTERNAL API */ -private[remote] final class OutboundClassManifestCompression(system: ActorSystem, remoteAddress: Address) - extends OutboundCompressionTable[String](system, remoteAddress) { - - flipTable(CompressionTable(version = 0, Map.empty)) -} - -/** - * INTERNAL API - * Base class for all outgoing compression. - * Encapsulates the compressedId registration and lookup. - */ -private[remote] class OutboundCompressionTable[T](system: ActorSystem, remoteAddress: Address) - extends AtomicReference[OutboundCompressionState[T]](OutboundCompressionState.initial) { // TODO could be instead via Unsafe - import OutboundCompression._ - - // TODO: The compression map may benefit from padding if we want multiple compressions to be running in parallel - - protected val log: LoggingAdapter = Logging(system, Logging.simpleName(getClass)) - - // TODO this exposes us to a race between setting the Version and USING the table...? - def activeCompressionTableVersion = { - val version = get.version - version - } - - /** - * Flips the currently used compression table to the new one (iff the new one has a version number higher than the currently used one). - */ - // (╯°□°)╯︵ ┻━┻ - @tailrec final def flipTable(activate: CompressionTable[T]): Unit = { - val state = get() - if (activate.version > state.version) // TODO this should handle roll-over as we move to Byte - if (compareAndSet(state, prepareState(activate))) - log.debug(s"Successfully flipped compression table versions {}=>{}, for outgoing to [{}]", state.version, activate.version, remoteAddress) - else - flipTable(activate) // retry - else if (state.version == activate.version) - log.debug("Received duplicate compression table (version: {})! Ignoring it.", state.version) - else - log.error("Received unexpected compression table with version nr [{}]! " + - "Current version number is [{}].", activate.version, state.version) - - } - - // TODO this is crazy hot-path; optimised FastUtil-like Object->int hash map would perform better here (and avoid Integer) allocs - final def compress(value: T): Int = - get().table.getOrDefault(value, NotCompressedId) - - private final def prepareState(activate: CompressionTable[T]): OutboundCompressionState[T] = { - val size = activate.map.size - // load factor is `1` since we will never grow this table beyond the initial size, - // this way we can avoid any rehashing from happening. - val m = new ju.HashMap[T, Integer](size, 1.0f) // TODO could be replaced with primitive `int` specialized version - activate.map.foreach { - case (key, value) ⇒ m.put(key, value) // TODO boxing :< - } - OutboundCompressionState(activate.version, m) - } - - def toDebugString: String = { - s"""${Logging.simpleName(getClass)}( - | version: ${get.version} to [$remoteAddress] - | ${get.table} - |)""".stripMargin - } - - override def toString = { - val s = get - s"""${Logging.simpleName(getClass)}(to: $remoteAddress, version: ${s.version}, compressedEntries: ${s.table.size})""" - } - -} - -/** INTERNAL API */ -private[remote] object OutboundCompression { - // format: OFF - final val DeadLettersId = 0 - final val NotCompressedId = -1 - - // format: ON - - /** INTERNAL API */ - private[remote] final case class OutboundCompressionState[T](version: Int, table: ju.Map[T, Integer]) - private[remote] object OutboundCompressionState { - def initial[T] = OutboundCompressionState[T](-1, ju.Collections.emptyMap()) - } - -} diff --git a/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala index d9a3040a28..644b321491 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala @@ -7,14 +7,14 @@ package akka.remote.artery import java.nio.{ ByteBuffer, ByteOrder } import akka.actor._ -import akka.remote.artery.compress.{ CompressionTable, CompressionTestUtils, InboundCompressions, OutboundCompressions } +import akka.remote.artery.compress.{ CompressionTable, CompressionTestUtils, InboundCompressions } import akka.testkit.AkkaSpec import akka.util.{ ByteString, OptionVal } class EnvelopeBufferSpec extends AkkaSpec { import CompressionTestUtils._ - object TestCompressor extends InboundCompressions with OutboundCompressions { + object TestCompressor extends InboundCompressions { val refToIdx: Map[ActorRef, Int] = Map( minimalRef("compressable0") → 0, minimalRef("compressable1") → 1, @@ -31,24 +31,24 @@ class EnvelopeBufferSpec extends AkkaSpec { "manifest1" → 1) val idxToManifest = manifestToIdx.map(_.swap) - override def applyActorRefCompressionTable(table: CompressionTable[ActorRef]): Unit = ??? // dynamic allocating not needed in these tests - override def actorRefCompressionTableVersion: Int = 0 - override def compressActorRef(ref: ActorRef): Int = refToIdx.getOrElse(ref, -1) - override def hitActorRef(originUid: Long, tableVersion: Int, remote: Address, ref: ActorRef, n: Int): Unit = () + val outboundActorRefTable: CompressionTable[ActorRef] = + CompressionTable(version = 0xCAFE, refToIdx) + + val outboundClassManifestTable: CompressionTable[String] = + CompressionTable(version = 0xBABE, manifestToIdx) + + override def hitActorRef(originUid: Long, remote: Address, ref: ActorRef, n: Int): Unit = () override def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef] = OptionVal(idxToRef(idx)) override def confirmActorRefCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit = () - override def applyClassManifestCompressionTable(table: CompressionTable[String]): Unit = ??? // dynamic allocating not needed in these tests - override def classManifestCompressionTableVersion: Int = 0 - override def compressClassManifest(manifest: String): Int = manifestToIdx.getOrElse(manifest, -1) - override def hitClassManifest(originUid: Long, tableVersion: Int, remote: Address, manifest: String, n: Int): Unit = () + override def hitClassManifest(originUid: Long, remote: Address, manifest: String, n: Int): Unit = () override def decompressClassManifest(originUid: Long, tableVersion: Int, idx: Int): OptionVal[String] = OptionVal(idxToManifest(idx)) override def confirmClassManifestCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit = () } "EnvelopeBuffer" must { - val headerIn = HeaderBuilder.bothWays(TestCompressor, TestCompressor) - val headerOut = HeaderBuilder.bothWays(TestCompressor, TestCompressor) + val headerIn = HeaderBuilder.bothWays(TestCompressor, TestCompressor.outboundActorRefTable, TestCompressor.outboundClassManifestTable) + val headerOut = HeaderBuilder.bothWays(TestCompressor, TestCompressor.outboundActorRefTable, TestCompressor.outboundClassManifestTable) val byteBuffer = ByteBuffer.allocate(1024).order(ByteOrder.LITTLE_ENDIAN) val envelope = new EnvelopeBuffer(byteBuffer) @@ -59,8 +59,6 @@ class EnvelopeBufferSpec extends AkkaSpec { headerIn setVersion 1 headerIn setUid 42 headerIn setSerializer 4 - headerIn setActorRefCompressionTableVersion 0xCAFE - headerIn setClassManifestCompressionTableVersion 0xBABE headerIn setRecipientActorRef minimalRef("compressable1") headerIn setSenderActorRef minimalRef("compressable0") @@ -74,8 +72,8 @@ class EnvelopeBufferSpec extends AkkaSpec { headerOut.version should ===(1) headerOut.uid should ===(42) - headerOut.actorRefCompressionTableVersion should ===(0xCAFE) - headerOut.classManifestCompressionTableVersion should ===(0xBABE) + headerOut.inboundActorRefCompressionTableVersion should ===(0xCAFE) + headerOut.inboundClassManifestCompressionTableVersion should ===(0xBABE) headerOut.serializer should ===(4) headerOut.senderActorRef(originUid).get.path.toSerializationFormat should ===("akka://EnvelopeBufferSpec/compressable0") headerOut.senderActorRefPath should ===(OptionVal.None) diff --git a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala index 41c12280f8..04233b14ef 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala @@ -18,6 +18,7 @@ import akka.remote.artery.InboundControlJunction.ControlMessageObserver import akka.remote.artery.InboundControlJunction.ControlMessageSubject import akka.util.OptionVal import akka.actor.InternalActorRef +import akka.dispatch.ExecutionContexts private[akka] class TestInboundContext( override val localAddress: UniqueAddress, @@ -47,10 +48,13 @@ private[akka] class TestInboundContext( override def association(uid: Long): OptionVal[OutboundContext] = OptionVal(associationsByUid.get(uid)) - override def completeHandshake(peer: UniqueAddress): Unit = { + override def completeHandshake(peer: UniqueAddress): Future[Done] = { val a = association(peer.address).asInstanceOf[TestOutboundContext] - a.completeHandshake(peer) - associationsByUid.put(peer.uid, a) + val done = a.completeHandshake(peer) + done.foreach { _ ⇒ + associationsByUid.put(peer.uid, a) + }(ExecutionContexts.sameThreadExecutionContext) + done } protected def createAssociation(remoteAddress: Address): TestOutboundContext = @@ -70,13 +74,14 @@ private[akka] class TestOutboundContext( _associationState } - def completeHandshake(peer: UniqueAddress): Unit = synchronized { + def completeHandshake(peer: UniqueAddress): Future[Done] = synchronized { _associationState.uniqueRemoteAddressPromise.trySuccess(peer) _associationState.uniqueRemoteAddress.value match { case Some(Success(`peer`)) ⇒ // our value case _ ⇒ - _associationState = _associationState.newIncarnation(Promise.successful(peer), NoOutboundCompressions) + _associationState = _associationState.newIncarnation(Promise.successful(peer)) } + Future.successful(Done) } override def quarantine(reason: String): Unit = synchronized { diff --git a/akka-remote/src/test/scala/akka/remote/artery/compress/OutboundCompressionSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/compress/OutboundCompressionSpec.scala index 723a4c9a13..ac04610e9b 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/compress/OutboundCompressionSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/compress/OutboundCompressionSpec.scala @@ -10,40 +10,23 @@ import akka.testkit.AkkaSpec class OutboundCompressionSpec extends AkkaSpec { import CompressionTestUtils._ - val remoteAddress = Address("artery", "example", "localhost", 0) - - "OutboundCompression" must { - "not compress not-known values" in { - val table = new OutboundActorRefCompression(system, remoteAddress) - table.compress(minimalRef("banana")) should ===(-1) - } - } - - "OutboundActorRefCompression" must { + "Outbound ActorRef compression" must { val alice = minimalRef("alice") val bob = minimalRef("bob") - "always compress /deadLetters" in { - val table = new OutboundActorRefCompression(system, remoteAddress) - table.compress(system.deadLetters) should ===(0) - } - "not compress unknown actor ref" in { - val table = new OutboundActorRefCompression(system, remoteAddress) + val table = CompressionTable.empty[ActorRef] table.compress(alice) should ===(-1) // not compressed } "compress previously registered actor ref" in { - val compression = new OutboundActorRefCompression(system, remoteAddress) val table = CompressionTable(1, Map(system.deadLetters → 0, alice → 1)) - compression.flipTable(table) - compression.compress(alice) should ===(1) // compressed - compression.compress(bob) should ===(-1) // not compressed + table.compress(alice) should ===(1) // compressed + table.compress(bob) should ===(-1) // not compressed val table2 = table.copy(2, map = table.map.updated(bob, 2)) - compression.flipTable(table2) - compression.compress(alice) should ===(1) // compressed - compression.compress(bob) should ===(2) // compressed + table2.compress(alice) should ===(1) // compressed + table2.compress(bob) should ===(2) // compressed } }