diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index cc114df912..ae5344629c 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -46,7 +46,7 @@ import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Source import akka.util.Helpers.ConfigOps import akka.util.Helpers.Requiring -import akka.util.WildcardTree +import akka.util.WildcardIndex import io.aeron.Aeron import io.aeron.AvailableImageHandler import io.aeron.Image @@ -320,11 +320,13 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R private val remoteDispatcher = system.dispatchers.lookup(remoteSettings.Dispatcher) private val largeMessageDestinations = - system.settings.config.getStringList("akka.remote.artery.large-message-destinations").asScala.foldLeft(WildcardTree[NotUsed]()) { (tree, entry) ⇒ + system.settings.config.getStringList("akka.remote.artery.large-message-destinations").asScala.foldLeft(WildcardIndex[NotUsed]()) { (tree, entry) ⇒ val segments = entry.split('/').tail - tree.insert(segments.iterator, NotUsed) + tree.insert(segments, NotUsed) } - private val largeMessageDestinationsEnabled = largeMessageDestinations.children.nonEmpty + // TODO use WildcardIndex.isEmpty when merged from master + val largeMessageChannelEnabled = + !largeMessageDestinations.wildcardTree.isEmpty || !largeMessageDestinations.doubleWildcardTree.isEmpty private def inboundChannel = s"aeron:udp?endpoint=${localAddress.address.host.get}:${localAddress.address.port.get}" private def outboundChannel(a: Address) = s"aeron:udp?endpoint=${a.host.get}:${a.port.get}" @@ -526,7 +528,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R runInboundControlStream(noCompressions) // TODO should understand compressions too runInboundOrdinaryMessagesStream(compressions) - if (largeMessageDestinationsEnabled) { + if (largeMessageChannelEnabled) { runInboundLargeMessagesStream() } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index da3936b065..65706abf07 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -35,7 +35,7 @@ import akka.stream.AbruptTerminationException import akka.stream.Materializer import akka.stream.scaladsl.Keep import akka.stream.scaladsl.Source -import akka.util.{ Unsafe, WildcardTree } +import akka.util.{ Unsafe, WildcardIndex } import org.agrona.concurrent.ManyToOneConcurrentArrayQueue import akka.util.OptionVal import akka.remote.QuarantinedEvent @@ -62,7 +62,7 @@ private[remote] class Association( val materializer: Materializer, override val remoteAddress: Address, override val controlSubject: ControlMessageSubject, - largeMessageDestinations: WildcardTree[NotUsed], + largeMessageDestinations: WildcardIndex[NotUsed], outboundEnvelopePool: ObjectPool[ReusableOutboundEnvelope]) extends AbstractAssociation with OutboundContext { import Association._ @@ -77,7 +77,6 @@ private[remote] class Association( private val restartTimeout: FiniteDuration = 5.seconds // FIXME config private val maxRestarts = 5 // FIXME config private val restartCounter = new RestartCounter(maxRestarts, restartTimeout) - private val largeMessageChannelEnabled = largeMessageDestinations.children.nonEmpty // We start with the raw wrapped queue and then it is replaced with the materialized value of // the `SendQueue` after materialization. Using same underlying queue. This makes it possible to @@ -200,7 +199,7 @@ private[remote] class Association( case _ ⇒ val outboundEnvelope = createOutboundEnvelope() val offerOk = - if (largeMessageChannelEnabled && isLargeMessageDestination(recipient)) + if (transport.largeMessageChannelEnabled && isLargeMessageDestination(recipient)) largeQueue.offer(outboundEnvelope) else queue.offer(outboundEnvelope) @@ -216,7 +215,7 @@ private[remote] class Association( case OptionVal.Some(r) ⇒ if (r.cachedLargeMessageDestinationFlag ne null) r.cachedLargeMessageDestinationFlag eq LargeDestination - else if (largeMessageDestinations.find(r.path.elements.iterator).data.isEmpty) { + else if (largeMessageDestinations.find(r.path.elements).isEmpty) { r.cachedLargeMessageDestinationFlag = RegularDestination false } else { @@ -293,7 +292,7 @@ private[remote] class Association( runOutboundControlStream(disableCompression) runOutboundOrdinaryMessagesStream(CurrentAssociationStateOutboundCompressionsProxy) - if (largeMessageChannelEnabled) { + if (transport.largeMessageChannelEnabled) { runOutboundLargeMessagesStream(disableCompression) } }