[WIP] Large message stream for Artery (#20545)

* First stab at separate large message channel for Artery

* Full actor paths, no implicit "/user/" part

* Various small fixes after review

* Fixes to make it work after rebasing

* Use a separate EnvelopeBufferPool for the large message stream

* Docs for actorSelection not sending through large message stream
This commit is contained in:
Johan Andrén 2016-05-20 12:40:56 +02:00 committed by Patrik Nordwall
parent a4b996546e
commit cd71643a91
6 changed files with 266 additions and 14 deletions

View file

@ -91,6 +91,20 @@ akka {
# InetAddress.getLocalHost.getHostName is used if
# "<getHostName>" is specified.
hostname = "<getHostAddress>"
# Actor paths to use the large message stream for when a message
# is sent to them over remoting. The large message stream dedicated
# is separate from "normal" and system messages so that sending a
# large message does not interfere with them.
# Entries should be the full path to the actor. Wildcards in the form of "*"
# can be supplied at any place and matches any name at that segment -
# "/user/supervisor/actor/*" will match any direct child to actor,
# while "/supervisor/*/child" will match any grandchild to "supervisor" that
# has the name "child"
# Messages sent to ActorSelections will not be passed through the large message
# stream, to pass such messages through the large message stream the selections
# but must be resolved to ActorRefs first.
large-message-destinations = []
}
### General settings

View file

@ -438,6 +438,13 @@ private[akka] trait RemoteRef extends ActorRefScope {
final def isLocal = false
}
/**
* INTERNAL API
*/
private[remote] sealed abstract class LargeMessageDestinationFlag
private[remote] case object RegularDestination extends LargeMessageDestinationFlag
private[remote] case object LargeDestination extends LargeMessageDestinationFlag
/**
* INTERNAL API
* Remote ActorRef that is used when referencing the Actor on a different node than its "home" node.
@ -454,6 +461,9 @@ private[akka] class RemoteActorRef private[akka] (
@volatile var cachedAssociation: artery.Association = null
// used by artery to direct messages to a separate stream for large messages
@volatile var cachedLargeMessageDestinationFlag: LargeMessageDestinationFlag = null
def getChild(name: Iterator[String]): InternalActorRef = {
val s = name.toStream
s.headOption match {

View file

@ -50,8 +50,7 @@ import akka.stream.scaladsl.Framing
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.util.ByteString
import akka.util.ByteStringBuilder
import akka.util.{ ByteString, ByteStringBuilder, WildcardTree }
import akka.util.Helpers.ConfigOps
import akka.util.Helpers.Requiring
import io.aeron.Aeron
@ -68,6 +67,7 @@ import java.nio.channels.DatagramChannel
import akka.remote.artery.OutboundControlJunction.OutboundControlIngress
import scala.collection.JavaConverters._
/**
* INTERNAL API
*/
@ -233,10 +233,18 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
system.settings.config.getMillisDuration("akka.remote.handshake-timeout").requiring(_ > Duration.Zero,
"handshake-timeout must be > 0")
private val largeMessageDestinations =
system.settings.config.getStringList("akka.remote.artery.large-message-destinations").asScala.foldLeft(WildcardTree[NotUsed]()) { (tree, entry)
val segments = entry.split('/').tail
tree.insert(segments.iterator, NotUsed)
}
private val largeMessageDestinationsEnabled = largeMessageDestinations.children.nonEmpty
private def inboundChannel = s"aeron:udp?endpoint=${localAddress.address.host.get}:${localAddress.address.port.get}"
private def outboundChannel(a: Address) = s"aeron:udp?endpoint=${a.host.get}:${a.port.get}"
private val controlStreamId = 1
private val ordinaryStreamId = 3
private val largeStreamId = 4
private val taskRunner = new TaskRunner(system)
// FIXME: This does locking on putIfAbsent, we need something smarter
@ -247,6 +255,10 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
private val restartCounter = new RestartCounter(maxRestarts, restartTimeout)
val envelopePool = new EnvelopeBufferPool(ArteryTransport.MaximumFrameSize, ArteryTransport.MaximumPooledBuffers)
val largeEnvelopePool: Option[EnvelopeBufferPool] =
if (largeMessageDestinationsEnabled) Some(new EnvelopeBufferPool(ArteryTransport.MaximumLargeFrameSize, ArteryTransport.MaximumPooledBuffers))
else None
// FIXME: Compression table must be owned by each channel instead
// of having a global one
val compression = new Compression(system)
@ -320,6 +332,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
private def runInboundStreams(): Unit = {
runInboundControlStream()
runInboundOrdinaryMessagesStream()
if (largeMessageDestinationsEnabled) {
runInboundLargeMessagesStream()
}
}
private def runInboundControlStream(): Unit = {
@ -369,6 +384,18 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
attachStreamRestart("Inbound message stream", completed, () runInboundOrdinaryMessagesStream())
}
private def runInboundLargeMessagesStream(): Unit = {
largeEnvelopePool.foreach { largePool
// TODO just cargo-cult programming here
val completed = Source.fromGraph(new AeronSource(inboundChannel, largeStreamId, aeron, taskRunner, largePool))
.async // FIXME measure
.via(inboundFlow)
.runWith(Sink.ignore)(materializer)
attachStreamRestart("Inbound large message stream", completed, () runInboundLargeMessagesStream())
}
}
private def attachStreamRestart(streamName: String, streamCompleted: Future[Done], restart: () Unit): Unit = {
implicit val ec = materializer.executionContext
streamCompleted.onFailure {
@ -422,7 +449,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
else {
associations.computeIfAbsent(remoteAddress, new JFunction[Address, Association] {
override def apply(remoteAddress: Address): Association = {
val newAssociation = new Association(ArteryTransport.this, materializer, remoteAddress, controlSubject)
val newAssociation = new Association(ArteryTransport.this, materializer, remoteAddress, controlSubject, largeMessageDestinations)
newAssociation.associate() // This is a bit costly for this blocking method :(
newAssociation
}
@ -443,6 +470,17 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
.toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), ordinaryStreamId, aeron, taskRunner, envelopePool))(Keep.right)
}
def outboundLarge(outboundContext: OutboundContext): Sink[Send, Future[Done]] = {
largeEnvelopePool match {
case Some(pool)
Flow.fromGraph(killSwitch.flow[Send])
.via(new OutboundHandshake(outboundContext, handshakeTimeout, handshakeRetryInterval))
.via(createEncoder(pool))
.toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), largeStreamId, aeron, taskRunner, envelopePool))(Keep.right)
case None throw new IllegalArgumentException("Trying to create outbound stream but outbound stream not configured")
}
}
def outboundControl(outboundContext: OutboundContext): Sink[Send, (OutboundControlIngress, Future[Done])] = {
Flow.fromGraph(killSwitch.flow[Send])
.via(new OutboundHandshake(outboundContext, handshakeTimeout, handshakeRetryInterval))
@ -457,8 +495,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
// FIXME hack until real envelopes, encoding originAddress in sender :)
private val dummySender = system.systemActorOf(Props.empty, "dummy")
val encoder: Flow[Send, EnvelopeBuffer, NotUsed] =
Flow.fromGraph(new Encoder(this, compression))
def createEncoder(pool: EnvelopeBufferPool): Flow[Send, EnvelopeBuffer, NotUsed] =
Flow.fromGraph(new Encoder(this, compression, pool))
val encoder = createEncoder(envelopePool)
val messageDispatcherSink: Sink[InboundEnvelope, Future[Done]] = Sink.foreach[InboundEnvelope] { m
messageDispatcher.dispatch(m.recipient, m.recipientAddress, m.message, m.senderOption)
@ -496,6 +535,7 @@ private[remote] object ArteryTransport {
val Version = 0
val MaximumFrameSize = 1024 * 1024
val MaximumPooledBuffers = 256
val MaximumLargeFrameSize = MaximumFrameSize * 5
/**
* Internal API

View file

@ -12,8 +12,7 @@ import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.concurrent.duration.FiniteDuration
import scala.util.Success
import akka.Done
import akka.{ Done, NotUsed }
import akka.actor.ActorRef
import akka.actor.ActorSelectionMessage
import akka.actor.Address
@ -21,8 +20,7 @@ import akka.actor.RootActorPath
import akka.dispatch.sysmsg.SystemMessage
import akka.event.Logging
import akka.remote.EndpointManager.Send
import akka.remote.RemoteActorRef
import akka.remote.UniqueAddress
import akka.remote.{ LargeDestination, RegularDestination, RemoteActorRef, UniqueAddress }
import akka.remote.artery.InboundControlJunction.ControlMessageSubject
import akka.remote.artery.OutboundControlJunction.OutboundControlIngress
import akka.remote.artery.OutboundHandshake.HandshakeTimeoutException
@ -33,7 +31,7 @@ import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.SourceQueueWithComplete
import akka.util.Unsafe
import akka.util.{ Unsafe, WildcardTree }
/**
* INTERNAL API
@ -45,7 +43,8 @@ private[akka] class Association(
val transport: ArteryTransport,
val materializer: Materializer,
override val remoteAddress: Address,
override val controlSubject: ControlMessageSubject)
override val controlSubject: ControlMessageSubject,
largeMessageDestinations: WildcardTree[NotUsed])
extends AbstractAssociation with OutboundContext {
private val log = Logging(transport.system, getClass.getName)
@ -54,8 +53,10 @@ private[akka] class Association(
private val restartTimeout: FiniteDuration = 5.seconds // FIXME config
private val maxRestarts = 5 // FIXME config
private val restartCounter = new RestartCounter(maxRestarts, restartTimeout)
private val largeMessageChannelEnabled = largeMessageDestinations.children.nonEmpty
@volatile private[this] var queue: SourceQueueWithComplete[Send] = _
@volatile private[this] var largeQueue: SourceQueueWithComplete[Send] = _
@volatile private[this] var controlQueue: SourceQueueWithComplete[Send] = _
@volatile private[this] var _outboundControlIngress: OutboundControlIngress = _
@volatile private[this] var materializing = new CountDownLatch(1)
@ -136,12 +137,32 @@ private[akka] class Association(
quarantine(reason = s"Due to overflow of control queue, size [$controlQueueSize]")
}
case _
queue.offer(Send(message, senderOption, recipient, None))
val send = Send(message, senderOption, recipient, None)
if (largeMessageChannelEnabled && isLargeMessageDestination(recipient))
largeQueue.offer(send)
else
queue.offer(send)
}
} else if (log.isDebugEnabled)
log.debug("Dropping message to quarantined system {}", remoteAddress)
}
private def isLargeMessageDestination(recipient: ActorRef): Boolean = {
recipient match {
case r: RemoteActorRef if r.cachedLargeMessageDestinationFlag ne null r.cachedLargeMessageDestinationFlag == LargeDestination
case r: RemoteActorRef
if (largeMessageDestinations.find(r.path.elements.iterator).data.isEmpty) {
r.cachedLargeMessageDestinationFlag = RegularDestination
false
} else {
log.debug("Using large message stream for {}", r.path)
r.cachedLargeMessageDestinationFlag = LargeDestination
true
}
case _ false
}
}
// FIXME we should be able to Send without a recipient ActorRef
override val dummyRecipient: RemoteActorRef =
transport.provider.resolveActorRef(RootActorPath(remoteAddress) / "system" / "dummy").asInstanceOf[RemoteActorRef]
@ -195,6 +216,10 @@ private[akka] class Association(
// so that outboundControlIngress is ready when stages for all streams start
runOutboundControlStream()
runOutboundOrdinaryMessagesStream()
if (largeMessageChannelEnabled) {
runOutboundLargeMessagesStream()
}
}
}
@ -225,6 +250,14 @@ private[akka] class Association(
attachStreamRestart("Outbound message stream", completed, _ runOutboundOrdinaryMessagesStream())
}
private def runOutboundLargeMessagesStream(): Unit = {
val (q, completed) = Source.queue(256, OverflowStrategy.dropBuffer)
.toMat(transport.outboundLarge(this))(Keep.both)
.run()(materializer)
largeQueue = q
attachStreamRestart("Outbound large message stream", completed, _ runOutboundLargeMessagesStream())
}
private def attachStreamRestart(streamName: String, streamCompleted: Future[Done], restart: Throwable Unit): Unit = {
implicit val ec = materializer.executionContext
streamCompleted.onFailure {

View file

@ -10,7 +10,8 @@ import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
// TODO: Long UID
class Encoder(
transport: ArteryTransport,
compressionTable: LiteralCompressionTable)
compressionTable: LiteralCompressionTable,
pool: EnvelopeBufferPool)
extends GraphStage[FlowShape[Send, EnvelopeBuffer]] {
val in: Inlet[Send] = Inlet("Artery.Encoder.in")
@ -20,7 +21,6 @@ class Encoder(
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
private val pool = transport.envelopePool
private val headerBuilder = HeaderBuilder(compressionTable)
headerBuilder.version = ArteryTransport.Version
headerBuilder.uid = transport.localAddress.uid

View file

@ -0,0 +1,155 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import akka.actor.{ Actor, ActorRef, ActorSelection, ActorSystem, ExtendedActorSystem, Props, RootActorPath }
import akka.remote.{ LargeDestination, RegularDestination, RemoteActorRef }
import akka.testkit.{ SocketUtil, TestKit, TestProbe }
import akka.util.ByteString
import com.typesafe.config.ConfigFactory
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{ ShouldMatchers, WordSpec }
import scala.concurrent.Await
import scala.concurrent.duration._
object LargeMessagesStreamSpec {
case class Ping(payload: ByteString = ByteString.empty)
case class Pong(bytesReceived: Long)
class EchoSize extends Actor {
def receive = {
case Ping(bytes) sender() ! Pong(bytes.size)
}
}
}
class LargeMessagesStreamSpec extends WordSpec with ShouldMatchers with ScalaFutures {
import LargeMessagesStreamSpec._
val config = ConfigFactory.parseString(
s"""
akka {
loglevel = ERROR
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote.artery {
enabled = on
hostname = localhost
port = 0
large-message-destinations = [
"/user/large"
]
}
}
""")
"The large message support" should {
"not affect regular communication" in {
val systemA = ActorSystem("systemA", config)
val systemB = ActorSystem("systemB", config)
try {
val senderProbeA = TestProbe()(systemA)
val senderProbeB = TestProbe()(systemB)
// start actor and make sure it is up and running
val large = systemB.actorOf(Props(new EchoSize), "regular")
large.tell(Ping(), senderProbeB.ref)
senderProbeB.expectMsg(Pong(0))
// communicate with it from the other system
val addressB = systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
val rootB = RootActorPath(addressB)
val largeRemote = awaitResolve(systemA.actorSelection(rootB / "user" / "regular"))
largeRemote.tell(Ping(), senderProbeA.ref)
senderProbeA.expectMsg(Pong(0))
// flag should be cached now
largeRemote.asInstanceOf[RemoteActorRef].cachedLargeMessageDestinationFlag should ===(RegularDestination)
} finally {
TestKit.shutdownActorSystem(systemA)
TestKit.shutdownActorSystem(systemB)
}
}
"pass small regular messages over the large-message stream" in {
val systemA = ActorSystem("systemA", config)
val systemB = ActorSystem("systemB", config)
try {
val senderProbeA = TestProbe()(systemA)
val senderProbeB = TestProbe()(systemB)
// start actor and make sure it is up and running
val large = systemB.actorOf(Props(new EchoSize), "large")
large.tell(Ping(), senderProbeB.ref)
senderProbeB.expectMsg(Pong(0))
// communicate with it from the other system
val addressB = systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
val rootB = RootActorPath(addressB)
val largeRemote = awaitResolve(systemA.actorSelection(rootB / "user" / "large"))
largeRemote.tell(Ping(), senderProbeA.ref)
senderProbeA.expectMsg(Pong(0))
// flag should be cached now
largeRemote.asInstanceOf[RemoteActorRef].cachedLargeMessageDestinationFlag should ===(LargeDestination)
} finally {
TestKit.shutdownActorSystem(systemA)
TestKit.shutdownActorSystem(systemB)
}
}
"allow for normal communication while simultaneously sending large messages" in {
val systemA = ActorSystem("systemA", config)
val systemB = ActorSystem("systemB", config)
try {
val senderProbeB = TestProbe()(systemB)
// setup two actors, one with the large flag and one regular
val large = systemB.actorOf(Props(new EchoSize), "large")
large.tell(Ping(), senderProbeB.ref)
senderProbeB.expectMsg(Pong(0))
val regular = systemB.actorOf(Props(new EchoSize), "regular")
regular.tell(Ping(), senderProbeB.ref)
senderProbeB.expectMsg(Pong(0))
// both up and running, resolve remote refs
val addressB = systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
val rootB = RootActorPath(addressB)
val largeRemote = awaitResolve(systemA.actorSelection(rootB / "user" / "large"))
val regularRemote = awaitResolve(systemA.actorSelection(rootB / "user" / "regular"))
// send a large message, as well as regular one
val remoteProbe = TestProbe()(systemA)
val largeBytes = 2000000
largeRemote.tell(Ping(ByteString.fromArray(Array.ofDim[Byte](largeBytes))), remoteProbe.ref)
regularRemote.tell(Ping(), remoteProbe.ref)
// should be no problems sending regular small messages while large messages are being sent
remoteProbe.expectMsg(Pong(0))
remoteProbe.expectMsg(10.seconds, Pong(largeBytes))
// cached flags should be set now
largeRemote.asInstanceOf[RemoteActorRef].cachedLargeMessageDestinationFlag should ===(LargeDestination)
regularRemote.asInstanceOf[RemoteActorRef].cachedLargeMessageDestinationFlag should ===(RegularDestination)
} finally {
TestKit.shutdownActorSystem(systemA)
TestKit.shutdownActorSystem(systemB)
}
}
}
def awaitResolve(selection: ActorSelection): ActorRef = Await.result(selection.resolveOne(3.seconds), 3.seconds)
}