diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index ac83fc7d54..607ef745ba 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -458,9 +458,10 @@ private[akka] trait RemoteRef extends ActorRefScope { /** * INTERNAL API */ -private[remote] sealed abstract class LargeMessageDestinationFlag -private[remote] case object RegularDestination extends LargeMessageDestinationFlag -private[remote] case object LargeDestination extends LargeMessageDestinationFlag +private[remote] sealed abstract class MessageDestinationFlag +private[remote] case object RegularDestination extends MessageDestinationFlag +private[remote] case object LargeDestination extends MessageDestinationFlag +private[remote] case object PriorityDestination extends MessageDestinationFlag /** * INTERNAL API @@ -486,8 +487,8 @@ private[akka] class RemoteActorRef private[akka] ( } @volatile private[remote] var cachedAssociation: artery.Association = null - // used by artery to direct messages to a separate stream for large messages - @volatile private[remote] var cachedLargeMessageDestinationFlag: LargeMessageDestinationFlag = null + // used by artery to direct messages to separate specialized streams + @volatile private[remote] var cachedMessageDestinationFlag: MessageDestinationFlag = null def getChild(name: Iterator[String]): InternalActorRef = { val s = name.toStream diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index 8a35a63126..35817c99d0 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -54,6 +54,8 @@ private[remote] object RARP extends ExtensionId[RARP] with ExtensionIdProvider { * Messages marked with this trait will be sent before other messages when buffering is active. * This means that these messages don't obey normal message ordering. * It is used for failure detector heartbeat messages. + * + * In Artery this is not used, and instead a preconfigured set of destinations select the priority lane. */ private[akka] trait PriorityMessage diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index ae5344629c..6131cb2cc6 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -328,11 +328,21 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R val largeMessageChannelEnabled = !largeMessageDestinations.wildcardTree.isEmpty || !largeMessageDestinations.doubleWildcardTree.isEmpty + private val priorityMessageDestinations = + WildcardIndex[NotUsed]() + // this comes from remoting so is semi-ok to be hardcoded here + .insert(Array("system", "remote-watcher"), NotUsed) + // these belongs to cluster and should come from there + .insert(Array("system", "cluster", "core", "daemon", "heartbeatSender"), NotUsed) + .insert(Array("system", "cluster", "heartbeatReceiver"), NotUsed) + private def inboundChannel = s"aeron:udp?endpoint=${localAddress.address.host.get}:${localAddress.address.port.get}" private def outboundChannel(a: Address) = s"aeron:udp?endpoint=${a.host.get}:${a.port.get}" + private val controlStreamId = 1 private val ordinaryStreamId = 3 private val largeStreamId = 4 + private val taskRunner = new TaskRunner(system, remoteSettings.IdleCpuLevel) private val restartTimeout: FiniteDuration = 5.seconds // FIXME config @@ -367,7 +377,13 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R createFlightRecorderEventSink(synchr = true) private val associationRegistry = new AssociationRegistry( - remoteAddress ⇒ new Association(this, materializer, remoteAddress, controlSubject, largeMessageDestinations, + remoteAddress ⇒ new Association( + this, + materializer, + remoteAddress, + controlSubject, + largeMessageDestinations, + priorityMessageDestinations, outboundEnvelopePool)) def remoteSettings: RemoteSettings = provider.remoteSettings @@ -749,21 +765,18 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R association(remoteAddress).quarantine(reason = "", uid.map(_.toLong)) } - def outbound(outboundContext: OutboundContext, compression: OutboundCompressions): Sink[OutboundEnvelope, Future[Done]] = { - Flow.fromGraph(killSwitch.flow[OutboundEnvelope]) - .via(new OutboundHandshake(system, outboundContext, outboundEnvelopePool, handshakeTimeout, - handshakeRetryInterval, injectHandshakeInterval)) - .via(encoder(compression)) - .toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), ordinaryStreamId, aeron, taskRunner, - envelopePool, giveUpSendAfter, createFlightRecorderEventSink()))(Keep.right) - } + def outbound(outboundContext: OutboundContext, compression: OutboundCompressions): Sink[OutboundEnvelope, Future[Done]] = + createOutboundSink(ordinaryStreamId, outboundContext, compression, envelopePool) - def outboundLarge(outboundContext: OutboundContext, compression: OutboundCompressions): Sink[OutboundEnvelope, Future[Done]] = { + def outboundLarge(outboundContext: OutboundContext, compression: OutboundCompressions): Sink[OutboundEnvelope, Future[Done]] = + createOutboundSink(largeStreamId, outboundContext, compression, largeEnvelopePool) + + private def createOutboundSink(streamId: Int, outboundContext: OutboundContext, compression: OutboundCompressions, bufferPool: EnvelopeBufferPool): Sink[OutboundEnvelope, Future[Done]] = { Flow.fromGraph(killSwitch.flow[OutboundEnvelope]) .via(new OutboundHandshake(system, outboundContext, outboundEnvelopePool, handshakeTimeout, handshakeRetryInterval, injectHandshakeInterval)) - .via(createEncoder(largeEnvelopePool, compression)) - .toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), largeStreamId, aeron, taskRunner, + .via(createEncoder(bufferPool, compression)) + .toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), streamId, aeron, taskRunner, envelopePool, giveUpSendAfter, createFlightRecorderEventSink()))(Keep.right) } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index 65706abf07..6c3a29e908 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -25,11 +25,12 @@ import akka.actor.Address import akka.actor.RootActorPath import akka.dispatch.sysmsg.SystemMessage import akka.event.Logging -import akka.remote.{ LargeDestination, RegularDestination, RemoteActorRef, UniqueAddress } +import akka.remote._ import akka.remote.artery.AeronSink.GaveUpSendingException import akka.remote.artery.InboundControlJunction.ControlMessageSubject import akka.remote.artery.OutboundControlJunction.OutboundControlIngress import akka.remote.artery.OutboundHandshake.HandshakeTimeoutException +import akka.remote.artery.SendQueue.ProducerApi import akka.remote.artery.SystemMessageDelivery.ClearSystemMessageDelivery import akka.stream.AbruptTerminationException import akka.stream.Materializer @@ -38,8 +39,6 @@ import akka.stream.scaladsl.Source import akka.util.{ Unsafe, WildcardIndex } import org.agrona.concurrent.ManyToOneConcurrentArrayQueue import akka.util.OptionVal -import akka.remote.QuarantinedEvent -import akka.remote.DaemonMsgCreate import akka.remote.artery.compress.CompressionProtocol._ /** @@ -63,6 +62,7 @@ private[remote] class Association( override val remoteAddress: Address, override val controlSubject: ControlMessageSubject, largeMessageDestinations: WildcardIndex[NotUsed], + priorityMessageDestinations: WildcardIndex[NotUsed], outboundEnvelopePool: ObjectPool[ReusableOutboundEnvelope]) extends AbstractAssociation with OutboundContext { import Association._ @@ -198,11 +198,8 @@ private[remote] class Association( transport.system.deadLetters ! outboundEnvelope2 case _ ⇒ val outboundEnvelope = createOutboundEnvelope() - val offerOk = - if (transport.largeMessageChannelEnabled && isLargeMessageDestination(recipient)) - largeQueue.offer(outboundEnvelope) - else - queue.offer(outboundEnvelope) + val queue = selectQueue(recipient) + val offerOk = queue.offer(outboundEnvelope) if (!offerOk) transport.system.deadLetters ! outboundEnvelope } @@ -210,20 +207,30 @@ private[remote] class Association( log.debug("Dropping message to quarantined system {}", remoteAddress) } - private def isLargeMessageDestination(recipient: OptionVal[RemoteActorRef]): Boolean = { + @tailrec + private def selectQueue(recipient: OptionVal[RemoteActorRef]): ProducerApi[OutboundEnvelope] = { recipient match { case OptionVal.Some(r) ⇒ - if (r.cachedLargeMessageDestinationFlag ne null) - r.cachedLargeMessageDestinationFlag eq LargeDestination - else if (largeMessageDestinations.find(r.path.elements).isEmpty) { - r.cachedLargeMessageDestinationFlag = RegularDestination - false - } else { - log.debug("Using large message stream for {}", r.path) - r.cachedLargeMessageDestinationFlag = LargeDestination - true + r.cachedMessageDestinationFlag match { + case RegularDestination ⇒ queue + case PriorityDestination ⇒ controlQueue + case LargeDestination ⇒ largeQueue + case null ⇒ + // only happens when messages are sent to new remote destination + // and is then cached on the RemoteActorRef + val elements = r.path.elements + if (priorityMessageDestinations.find(elements).isDefined) { + log.debug("Using priority message stream for {}", r.path) + r.cachedMessageDestinationFlag = PriorityDestination + } else if (transport.largeMessageChannelEnabled && largeMessageDestinations.find(elements).isDefined) { + log.debug("Using large message stream for {}", r.path) + r.cachedMessageDestinationFlag = LargeDestination + } else { + r.cachedMessageDestinationFlag = RegularDestination + } + selectQueue(recipient) } - case OptionVal.None ⇒ false + case OptionVal.None ⇒ queue } } @@ -370,7 +377,7 @@ private[remote] class Association( } private def runOutboundLargeMessagesStream(compression: OutboundCompressions): Unit = { - val wrapper = getOrCreateQueueWrapper(queue, largeQueueSize) + val wrapper = getOrCreateQueueWrapper(largeQueue, largeQueueSize) largeQueue = wrapper // use new underlying queue immediately for restarts val (queueValue, completed) = diff --git a/akka-remote/src/test/scala/akka/remote/artery/LargeMessagesStreamSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/LargeMessagesStreamSpec.scala index 1eaaa5d103..bb584ea8f2 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/LargeMessagesStreamSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/LargeMessagesStreamSpec.scala @@ -51,7 +51,7 @@ class LargeMessagesStreamSpec extends ArteryMultiNodeSpec( senderProbeA.expectMsg(Pong(0)) // flag should be cached now - regularRemote.asInstanceOf[RemoteActorRef].cachedLargeMessageDestinationFlag should ===(RegularDestination) + regularRemote.asInstanceOf[RemoteActorRef].cachedMessageDestinationFlag should ===(RegularDestination) } @@ -75,7 +75,7 @@ class LargeMessagesStreamSpec extends ArteryMultiNodeSpec( senderProbeA.expectMsg(Pong(0)) // flag should be cached now - largeRemote.asInstanceOf[RemoteActorRef].cachedLargeMessageDestinationFlag should ===(LargeDestination) + largeRemote.asInstanceOf[RemoteActorRef].cachedMessageDestinationFlag should ===(LargeDestination) } @@ -112,8 +112,8 @@ class LargeMessagesStreamSpec extends ArteryMultiNodeSpec( remoteProbe.expectMsg(10.seconds, Pong(largeBytes)) // cached flags should be set now - largeRemote.asInstanceOf[RemoteActorRef].cachedLargeMessageDestinationFlag should ===(LargeDestination) - regularRemote.asInstanceOf[RemoteActorRef].cachedLargeMessageDestinationFlag should ===(RegularDestination) + largeRemote.asInstanceOf[RemoteActorRef].cachedMessageDestinationFlag should ===(LargeDestination) + regularRemote.asInstanceOf[RemoteActorRef].cachedMessageDestinationFlag should ===(RegularDestination) } }