diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index fc81a7597a..ea692dab5c 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -91,6 +91,20 @@ akka { # InetAddress.getLocalHost.getHostName is used if # "" is specified. hostname = "" + + # Actor paths to use the large message stream for when a message + # is sent to them over remoting. The large message stream dedicated + # is separate from "normal" and system messages so that sending a + # large message does not interfere with them. + # Entries should be the full path to the actor. Wildcards in the form of "*" + # can be supplied at any place and matches any name at that segment - + # "/user/supervisor/actor/*" will match any direct child to actor, + # while "/supervisor/*/child" will match any grandchild to "supervisor" that + # has the name "child" + # Messages sent to ActorSelections will not be passed through the large message + # stream, to pass such messages through the large message stream the selections + # but must be resolved to ActorRefs first. + large-message-destinations = [] } ### General settings diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 52fe8f2ba8..987f819b88 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -438,6 +438,13 @@ private[akka] trait RemoteRef extends ActorRefScope { final def isLocal = false } +/** + * INTERNAL API + */ +private[remote] sealed abstract class LargeMessageDestinationFlag +private[remote] case object RegularDestination extends LargeMessageDestinationFlag +private[remote] case object LargeDestination extends LargeMessageDestinationFlag + /** * INTERNAL API * Remote ActorRef that is used when referencing the Actor on a different node than its "home" node. @@ -454,6 +461,9 @@ private[akka] class RemoteActorRef private[akka] ( @volatile var cachedAssociation: artery.Association = null + // used by artery to direct messages to a separate stream for large messages + @volatile var cachedLargeMessageDestinationFlag: LargeMessageDestinationFlag = null + def getChild(name: Iterator[String]): InternalActorRef = { val s = name.toStream s.headOption match { diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 05aa1aa075..df612a77a0 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -50,8 +50,7 @@ import akka.stream.scaladsl.Framing import akka.stream.scaladsl.Keep import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Source -import akka.util.ByteString -import akka.util.ByteStringBuilder +import akka.util.{ ByteString, ByteStringBuilder, WildcardTree } import akka.util.Helpers.ConfigOps import akka.util.Helpers.Requiring import io.aeron.Aeron @@ -68,6 +67,7 @@ import java.nio.channels.DatagramChannel import akka.remote.artery.OutboundControlJunction.OutboundControlIngress +import scala.collection.JavaConverters._ /** * INTERNAL API */ @@ -233,10 +233,18 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R system.settings.config.getMillisDuration("akka.remote.handshake-timeout").requiring(_ > Duration.Zero, "handshake-timeout must be > 0") + private val largeMessageDestinations = + system.settings.config.getStringList("akka.remote.artery.large-message-destinations").asScala.foldLeft(WildcardTree[NotUsed]()) { (tree, entry) ⇒ + val segments = entry.split('/').tail + tree.insert(segments.iterator, NotUsed) + } + private val largeMessageDestinationsEnabled = largeMessageDestinations.children.nonEmpty + private def inboundChannel = s"aeron:udp?endpoint=${localAddress.address.host.get}:${localAddress.address.port.get}" private def outboundChannel(a: Address) = s"aeron:udp?endpoint=${a.host.get}:${a.port.get}" private val controlStreamId = 1 private val ordinaryStreamId = 3 + private val largeStreamId = 4 private val taskRunner = new TaskRunner(system) // FIXME: This does locking on putIfAbsent, we need something smarter @@ -247,6 +255,10 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R private val restartCounter = new RestartCounter(maxRestarts, restartTimeout) val envelopePool = new EnvelopeBufferPool(ArteryTransport.MaximumFrameSize, ArteryTransport.MaximumPooledBuffers) + val largeEnvelopePool: Option[EnvelopeBufferPool] = + if (largeMessageDestinationsEnabled) Some(new EnvelopeBufferPool(ArteryTransport.MaximumLargeFrameSize, ArteryTransport.MaximumPooledBuffers)) + else None + // FIXME: Compression table must be owned by each channel instead // of having a global one val compression = new Compression(system) @@ -320,6 +332,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R private def runInboundStreams(): Unit = { runInboundControlStream() runInboundOrdinaryMessagesStream() + if (largeMessageDestinationsEnabled) { + runInboundLargeMessagesStream() + } } private def runInboundControlStream(): Unit = { @@ -369,6 +384,18 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R attachStreamRestart("Inbound message stream", completed, () ⇒ runInboundOrdinaryMessagesStream()) } + private def runInboundLargeMessagesStream(): Unit = { + largeEnvelopePool.foreach { largePool ⇒ + // TODO just cargo-cult programming here + val completed = Source.fromGraph(new AeronSource(inboundChannel, largeStreamId, aeron, taskRunner, largePool)) + .async // FIXME measure + .via(inboundFlow) + .runWith(Sink.ignore)(materializer) + + attachStreamRestart("Inbound large message stream", completed, () ⇒ runInboundLargeMessagesStream()) + } + } + private def attachStreamRestart(streamName: String, streamCompleted: Future[Done], restart: () ⇒ Unit): Unit = { implicit val ec = materializer.executionContext streamCompleted.onFailure { @@ -422,7 +449,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R else { associations.computeIfAbsent(remoteAddress, new JFunction[Address, Association] { override def apply(remoteAddress: Address): Association = { - val newAssociation = new Association(ArteryTransport.this, materializer, remoteAddress, controlSubject) + val newAssociation = new Association(ArteryTransport.this, materializer, remoteAddress, controlSubject, largeMessageDestinations) newAssociation.associate() // This is a bit costly for this blocking method :( newAssociation } @@ -443,6 +470,17 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R .toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), ordinaryStreamId, aeron, taskRunner, envelopePool))(Keep.right) } + def outboundLarge(outboundContext: OutboundContext): Sink[Send, Future[Done]] = { + largeEnvelopePool match { + case Some(pool) ⇒ + Flow.fromGraph(killSwitch.flow[Send]) + .via(new OutboundHandshake(outboundContext, handshakeTimeout, handshakeRetryInterval)) + .via(createEncoder(pool)) + .toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), largeStreamId, aeron, taskRunner, envelopePool))(Keep.right) + case None ⇒ throw new IllegalArgumentException("Trying to create outbound stream but outbound stream not configured") + } + } + def outboundControl(outboundContext: OutboundContext): Sink[Send, (OutboundControlIngress, Future[Done])] = { Flow.fromGraph(killSwitch.flow[Send]) .via(new OutboundHandshake(outboundContext, handshakeTimeout, handshakeRetryInterval)) @@ -457,8 +495,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R // FIXME hack until real envelopes, encoding originAddress in sender :) private val dummySender = system.systemActorOf(Props.empty, "dummy") - val encoder: Flow[Send, EnvelopeBuffer, NotUsed] = - Flow.fromGraph(new Encoder(this, compression)) + def createEncoder(pool: EnvelopeBufferPool): Flow[Send, EnvelopeBuffer, NotUsed] = + Flow.fromGraph(new Encoder(this, compression, pool)) + val encoder = createEncoder(envelopePool) val messageDispatcherSink: Sink[InboundEnvelope, Future[Done]] = Sink.foreach[InboundEnvelope] { m ⇒ messageDispatcher.dispatch(m.recipient, m.recipientAddress, m.message, m.senderOption) @@ -496,6 +535,7 @@ private[remote] object ArteryTransport { val Version = 0 val MaximumFrameSize = 1024 * 1024 val MaximumPooledBuffers = 256 + val MaximumLargeFrameSize = MaximumFrameSize * 5 /** * Internal API diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index 219dc15906..fe6b9e800e 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -12,8 +12,7 @@ import scala.concurrent.Promise import scala.concurrent.duration._ import scala.concurrent.duration.FiniteDuration import scala.util.Success - -import akka.Done +import akka.{ Done, NotUsed } import akka.actor.ActorRef import akka.actor.ActorSelectionMessage import akka.actor.Address @@ -21,8 +20,7 @@ import akka.actor.RootActorPath import akka.dispatch.sysmsg.SystemMessage import akka.event.Logging import akka.remote.EndpointManager.Send -import akka.remote.RemoteActorRef -import akka.remote.UniqueAddress +import akka.remote.{ LargeDestination, RegularDestination, RemoteActorRef, UniqueAddress } import akka.remote.artery.InboundControlJunction.ControlMessageSubject import akka.remote.artery.OutboundControlJunction.OutboundControlIngress import akka.remote.artery.OutboundHandshake.HandshakeTimeoutException @@ -33,7 +31,7 @@ import akka.stream.OverflowStrategy import akka.stream.scaladsl.Keep import akka.stream.scaladsl.Source import akka.stream.scaladsl.SourceQueueWithComplete -import akka.util.Unsafe +import akka.util.{ Unsafe, WildcardTree } /** * INTERNAL API @@ -45,7 +43,8 @@ private[akka] class Association( val transport: ArteryTransport, val materializer: Materializer, override val remoteAddress: Address, - override val controlSubject: ControlMessageSubject) + override val controlSubject: ControlMessageSubject, + largeMessageDestinations: WildcardTree[NotUsed]) extends AbstractAssociation with OutboundContext { private val log = Logging(transport.system, getClass.getName) @@ -54,8 +53,10 @@ private[akka] class Association( private val restartTimeout: FiniteDuration = 5.seconds // FIXME config private val maxRestarts = 5 // FIXME config private val restartCounter = new RestartCounter(maxRestarts, restartTimeout) + private val largeMessageChannelEnabled = largeMessageDestinations.children.nonEmpty @volatile private[this] var queue: SourceQueueWithComplete[Send] = _ + @volatile private[this] var largeQueue: SourceQueueWithComplete[Send] = _ @volatile private[this] var controlQueue: SourceQueueWithComplete[Send] = _ @volatile private[this] var _outboundControlIngress: OutboundControlIngress = _ @volatile private[this] var materializing = new CountDownLatch(1) @@ -136,12 +137,32 @@ private[akka] class Association( quarantine(reason = s"Due to overflow of control queue, size [$controlQueueSize]") } case _ ⇒ - queue.offer(Send(message, senderOption, recipient, None)) + val send = Send(message, senderOption, recipient, None) + if (largeMessageChannelEnabled && isLargeMessageDestination(recipient)) + largeQueue.offer(send) + else + queue.offer(send) } } else if (log.isDebugEnabled) log.debug("Dropping message to quarantined system {}", remoteAddress) } + private def isLargeMessageDestination(recipient: ActorRef): Boolean = { + recipient match { + case r: RemoteActorRef if r.cachedLargeMessageDestinationFlag ne null ⇒ r.cachedLargeMessageDestinationFlag == LargeDestination + case r: RemoteActorRef ⇒ + if (largeMessageDestinations.find(r.path.elements.iterator).data.isEmpty) { + r.cachedLargeMessageDestinationFlag = RegularDestination + false + } else { + log.debug("Using large message stream for {}", r.path) + r.cachedLargeMessageDestinationFlag = LargeDestination + true + } + case _ ⇒ false + } + } + // FIXME we should be able to Send without a recipient ActorRef override val dummyRecipient: RemoteActorRef = transport.provider.resolveActorRef(RootActorPath(remoteAddress) / "system" / "dummy").asInstanceOf[RemoteActorRef] @@ -195,6 +216,10 @@ private[akka] class Association( // so that outboundControlIngress is ready when stages for all streams start runOutboundControlStream() runOutboundOrdinaryMessagesStream() + + if (largeMessageChannelEnabled) { + runOutboundLargeMessagesStream() + } } } @@ -225,6 +250,14 @@ private[akka] class Association( attachStreamRestart("Outbound message stream", completed, _ ⇒ runOutboundOrdinaryMessagesStream()) } + private def runOutboundLargeMessagesStream(): Unit = { + val (q, completed) = Source.queue(256, OverflowStrategy.dropBuffer) + .toMat(transport.outboundLarge(this))(Keep.both) + .run()(materializer) + largeQueue = q + attachStreamRestart("Outbound large message stream", completed, _ ⇒ runOutboundLargeMessagesStream()) + } + private def attachStreamRestart(streamName: String, streamCompleted: Future[Done], restart: Throwable ⇒ Unit): Unit = { implicit val ec = materializer.executionContext streamCompleted.onFailure { diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala index db412d7862..e853046131 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala @@ -10,7 +10,8 @@ import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } // TODO: Long UID class Encoder( transport: ArteryTransport, - compressionTable: LiteralCompressionTable) + compressionTable: LiteralCompressionTable, + pool: EnvelopeBufferPool) extends GraphStage[FlowShape[Send, EnvelopeBuffer]] { val in: Inlet[Send] = Inlet("Artery.Encoder.in") @@ -20,7 +21,6 @@ class Encoder( override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { - private val pool = transport.envelopePool private val headerBuilder = HeaderBuilder(compressionTable) headerBuilder.version = ArteryTransport.Version headerBuilder.uid = transport.localAddress.uid diff --git a/akka-remote/src/test/scala/akka/remote/artery/LargeMessagesStreamSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/LargeMessagesStreamSpec.scala new file mode 100644 index 0000000000..b56b3ff795 --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/artery/LargeMessagesStreamSpec.scala @@ -0,0 +1,155 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import akka.actor.{ Actor, ActorRef, ActorSelection, ActorSystem, ExtendedActorSystem, Props, RootActorPath } +import akka.remote.{ LargeDestination, RegularDestination, RemoteActorRef } +import akka.testkit.{ SocketUtil, TestKit, TestProbe } +import akka.util.ByteString +import com.typesafe.config.ConfigFactory +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.{ ShouldMatchers, WordSpec } + +import scala.concurrent.Await +import scala.concurrent.duration._ + +object LargeMessagesStreamSpec { + case class Ping(payload: ByteString = ByteString.empty) + case class Pong(bytesReceived: Long) + class EchoSize extends Actor { + def receive = { + case Ping(bytes) ⇒ sender() ! Pong(bytes.size) + } + } +} + +class LargeMessagesStreamSpec extends WordSpec with ShouldMatchers with ScalaFutures { + import LargeMessagesStreamSpec._ + + val config = ConfigFactory.parseString( + s""" + akka { + loglevel = ERROR + actor { + provider = "akka.remote.RemoteActorRefProvider" + } + remote.artery { + enabled = on + hostname = localhost + port = 0 + large-message-destinations = [ + "/user/large" + ] + } + } + + """) + + "The large message support" should { + + "not affect regular communication" in { + val systemA = ActorSystem("systemA", config) + val systemB = ActorSystem("systemB", config) + + try { + val senderProbeA = TestProbe()(systemA) + val senderProbeB = TestProbe()(systemB) + + // start actor and make sure it is up and running + val large = systemB.actorOf(Props(new EchoSize), "regular") + large.tell(Ping(), senderProbeB.ref) + senderProbeB.expectMsg(Pong(0)) + + // communicate with it from the other system + val addressB = systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress + val rootB = RootActorPath(addressB) + val largeRemote = awaitResolve(systemA.actorSelection(rootB / "user" / "regular")) + largeRemote.tell(Ping(), senderProbeA.ref) + senderProbeA.expectMsg(Pong(0)) + + // flag should be cached now + largeRemote.asInstanceOf[RemoteActorRef].cachedLargeMessageDestinationFlag should ===(RegularDestination) + + } finally { + TestKit.shutdownActorSystem(systemA) + TestKit.shutdownActorSystem(systemB) + } + } + + "pass small regular messages over the large-message stream" in { + val systemA = ActorSystem("systemA", config) + val systemB = ActorSystem("systemB", config) + + try { + val senderProbeA = TestProbe()(systemA) + val senderProbeB = TestProbe()(systemB) + + // start actor and make sure it is up and running + val large = systemB.actorOf(Props(new EchoSize), "large") + large.tell(Ping(), senderProbeB.ref) + senderProbeB.expectMsg(Pong(0)) + + // communicate with it from the other system + val addressB = systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress + val rootB = RootActorPath(addressB) + val largeRemote = awaitResolve(systemA.actorSelection(rootB / "user" / "large")) + largeRemote.tell(Ping(), senderProbeA.ref) + senderProbeA.expectMsg(Pong(0)) + + // flag should be cached now + largeRemote.asInstanceOf[RemoteActorRef].cachedLargeMessageDestinationFlag should ===(LargeDestination) + + } finally { + TestKit.shutdownActorSystem(systemA) + TestKit.shutdownActorSystem(systemB) + } + } + + "allow for normal communication while simultaneously sending large messages" in { + val systemA = ActorSystem("systemA", config) + val systemB = ActorSystem("systemB", config) + + try { + + val senderProbeB = TestProbe()(systemB) + + // setup two actors, one with the large flag and one regular + val large = systemB.actorOf(Props(new EchoSize), "large") + large.tell(Ping(), senderProbeB.ref) + senderProbeB.expectMsg(Pong(0)) + + val regular = systemB.actorOf(Props(new EchoSize), "regular") + regular.tell(Ping(), senderProbeB.ref) + senderProbeB.expectMsg(Pong(0)) + + // both up and running, resolve remote refs + val addressB = systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress + val rootB = RootActorPath(addressB) + val largeRemote = awaitResolve(systemA.actorSelection(rootB / "user" / "large")) + val regularRemote = awaitResolve(systemA.actorSelection(rootB / "user" / "regular")) + + // send a large message, as well as regular one + val remoteProbe = TestProbe()(systemA) + + val largeBytes = 2000000 + largeRemote.tell(Ping(ByteString.fromArray(Array.ofDim[Byte](largeBytes))), remoteProbe.ref) + regularRemote.tell(Ping(), remoteProbe.ref) + + // should be no problems sending regular small messages while large messages are being sent + remoteProbe.expectMsg(Pong(0)) + remoteProbe.expectMsg(10.seconds, Pong(largeBytes)) + + // cached flags should be set now + largeRemote.asInstanceOf[RemoteActorRef].cachedLargeMessageDestinationFlag should ===(LargeDestination) + regularRemote.asInstanceOf[RemoteActorRef].cachedLargeMessageDestinationFlag should ===(RegularDestination) + + } finally { + TestKit.shutdownActorSystem(systemA) + TestKit.shutdownActorSystem(systemB) + } + } + } + + def awaitResolve(selection: ActorSelection): ActorRef = Await.result(selection.resolveOne(3.seconds), 3.seconds) +}