use the new WildcardIndex

This commit is contained in:
Patrik Nordwall 2016-08-23 20:38:39 +02:00
parent 8ab02738b7
commit 21a4899054
2 changed files with 12 additions and 11 deletions

View file

@ -46,7 +46,7 @@ import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.util.Helpers.ConfigOps
import akka.util.Helpers.Requiring
import akka.util.WildcardTree
import akka.util.WildcardIndex
import io.aeron.Aeron
import io.aeron.AvailableImageHandler
import io.aeron.Image
@ -320,11 +320,13 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
private val remoteDispatcher = system.dispatchers.lookup(remoteSettings.Dispatcher)
private val largeMessageDestinations =
system.settings.config.getStringList("akka.remote.artery.large-message-destinations").asScala.foldLeft(WildcardTree[NotUsed]()) { (tree, entry)
system.settings.config.getStringList("akka.remote.artery.large-message-destinations").asScala.foldLeft(WildcardIndex[NotUsed]()) { (tree, entry)
val segments = entry.split('/').tail
tree.insert(segments.iterator, NotUsed)
tree.insert(segments, NotUsed)
}
private val largeMessageDestinationsEnabled = largeMessageDestinations.children.nonEmpty
// TODO use WildcardIndex.isEmpty when merged from master
val largeMessageChannelEnabled =
!largeMessageDestinations.wildcardTree.isEmpty || !largeMessageDestinations.doubleWildcardTree.isEmpty
private def inboundChannel = s"aeron:udp?endpoint=${localAddress.address.host.get}:${localAddress.address.port.get}"
private def outboundChannel(a: Address) = s"aeron:udp?endpoint=${a.host.get}:${a.port.get}"
@ -526,7 +528,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
runInboundControlStream(noCompressions) // TODO should understand compressions too
runInboundOrdinaryMessagesStream(compressions)
if (largeMessageDestinationsEnabled) {
if (largeMessageChannelEnabled) {
runInboundLargeMessagesStream()
}
}

View file

@ -35,7 +35,7 @@ import akka.stream.AbruptTerminationException
import akka.stream.Materializer
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Source
import akka.util.{ Unsafe, WildcardTree }
import akka.util.{ Unsafe, WildcardIndex }
import org.agrona.concurrent.ManyToOneConcurrentArrayQueue
import akka.util.OptionVal
import akka.remote.QuarantinedEvent
@ -62,7 +62,7 @@ private[remote] class Association(
val materializer: Materializer,
override val remoteAddress: Address,
override val controlSubject: ControlMessageSubject,
largeMessageDestinations: WildcardTree[NotUsed],
largeMessageDestinations: WildcardIndex[NotUsed],
outboundEnvelopePool: ObjectPool[ReusableOutboundEnvelope])
extends AbstractAssociation with OutboundContext {
import Association._
@ -77,7 +77,6 @@ private[remote] class Association(
private val restartTimeout: FiniteDuration = 5.seconds // FIXME config
private val maxRestarts = 5 // FIXME config
private val restartCounter = new RestartCounter(maxRestarts, restartTimeout)
private val largeMessageChannelEnabled = largeMessageDestinations.children.nonEmpty
// We start with the raw wrapped queue and then it is replaced with the materialized value of
// the `SendQueue` after materialization. Using same underlying queue. This makes it possible to
@ -200,7 +199,7 @@ private[remote] class Association(
case _
val outboundEnvelope = createOutboundEnvelope()
val offerOk =
if (largeMessageChannelEnabled && isLargeMessageDestination(recipient))
if (transport.largeMessageChannelEnabled && isLargeMessageDestination(recipient))
largeQueue.offer(outboundEnvelope)
else
queue.offer(outboundEnvelope)
@ -216,7 +215,7 @@ private[remote] class Association(
case OptionVal.Some(r)
if (r.cachedLargeMessageDestinationFlag ne null)
r.cachedLargeMessageDestinationFlag eq LargeDestination
else if (largeMessageDestinations.find(r.path.elements.iterator).data.isEmpty) {
else if (largeMessageDestinations.find(r.path.elements).isEmpty) {
r.cachedLargeMessageDestinationFlag = RegularDestination
false
} else {
@ -293,7 +292,7 @@ private[remote] class Association(
runOutboundControlStream(disableCompression)
runOutboundOrdinaryMessagesStream(CurrentAssociationStateOutboundCompressionsProxy)
if (largeMessageChannelEnabled) {
if (transport.largeMessageChannelEnabled) {
runOutboundLargeMessagesStream(disableCompression)
}
}