From 87386e18cfa3e380478290b2bcf84ea46c28bb5d Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 11 May 2016 15:55:06 +0200 Subject: [PATCH] inject outgoing control messages at lower level * instead of sending the the control messages (e.g. handshake reply, sys msg ack) via the normal message send ingress point they are now injected via side channel and will therefore not go through higher level stages such as handshake and system message delivery --- .../akka/remote/artery/ArteryTransport.scala | 20 +-- .../akka/remote/artery/Association.scala | 25 +++- .../scala/akka/remote/artery/Handshake.scala | 2 +- .../main/scala/akka/remote/artery/Reply.scala | 127 +++++++++++++++--- .../remote/artery/SystemMessageDelivery.scala | 11 +- .../akka/remote/artery/TestContext.scala | 4 +- 6 files changed, 144 insertions(+), 45 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 6b822f9521..cdff0acd03 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -23,7 +23,7 @@ import akka.remote.RemoteActorRef import akka.remote.RemoteActorRefProvider import akka.remote.RemoteTransport import akka.remote.UniqueAddress -import akka.remote.artery.ReplyJunction.ReplySubject +import akka.remote.artery.InboundReplyJunction.ReplySubject import akka.remote.transport.AkkaPduCodec import akka.remote.transport.AkkaPduProtobufCodec import akka.serialization.Serialization @@ -47,6 +47,7 @@ import io.aeron.exceptions.ConductorServiceTimeoutException import org.agrona.ErrorHandler import org.agrona.IoUtil import java.io.File +import akka.remote.artery.OutboundReplyJunction.OutboundReplyIngress /** * INTERNAL API @@ -72,7 +73,7 @@ private[akka] trait InboundContext { * An inbound stage can send reply message to the origin * address with this method. */ - def sendReply(to: Address, message: ControlMessage): Unit + def sendReply(to: Address, message: ControlMessage): Unit // FIXME rename to sendControl /** * Lookup the outbound association for a given address. @@ -112,7 +113,7 @@ private[akka] trait OutboundContext { * An outbound stage can listen to reply messages * via this observer subject. */ - def replySubject: ReplySubject + def replySubject: ReplySubject // FIXME rename to controlSubject // FIXME we should be able to Send without a recipient ActorRef def dummyRecipient: RemoteActorRef @@ -146,7 +147,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R // TODO support port 0 private def inboundChannel = s"aeron:udp?endpoint=${localAddress.address.host.get}:${localAddress.address.port.get}" private def outboundChannel(a: Address) = s"aeron:udp?endpoint=${a.host.get}:${a.port.get}" - private val systemMessageStreamId = 1 + private val systemMessageStreamId = 1 // FIXME rename to controlStreamId private val ordinaryStreamId = 3 private val taskRunner = new TaskRunner(system) @@ -241,9 +242,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R } // InboundContext - override def sendReply(to: Address, message: ControlMessage) = { - send(message, None, association(to).dummyRecipient) - } + override def sendReply(to: Address, message: ControlMessage) = + association(to).outboundReplyIngress.sendControlMessage(message) override def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = { val cached = recipient.cachedAssociation @@ -282,10 +282,11 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R .to(new AeronSink(outboundChannel(outboundContext.remoteAddress), ordinaryStreamId, aeron, taskRunner)) } - def outboundSystemMessage(outboundContext: OutboundContext): Sink[Send, Any] = { + def outboundSystemMessage(outboundContext: OutboundContext): Sink[Send, OutboundReplyIngress] = { Flow.fromGraph(killSwitch.flow[Send]) .via(new OutboundHandshake(outboundContext)) .via(new SystemMessageDelivery(outboundContext, systemMessageResendInterval)) + .viaMat(new OutboundReplyJunction(outboundContext))(Keep.right) .via(encoder) .map(_.toArray) // TODO we should use ByteString all the way .to(new AeronSink(outboundChannel(outboundContext.remoteAddress), systemMessageStreamId, aeron, taskRunner)) @@ -338,13 +339,14 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R Source.maybe[ByteString].via(killSwitch.flow)) } + // FIXME rename to controlFlow val inboundSystemMessageFlow: Flow[ByteString, ByteString, ReplySubject] = { Flow.fromSinkAndSourceMat( decoder .via(deserializer) .via(new InboundHandshake(this)) .via(new SystemMessageAcker(this)) - .viaMat(new ReplyJunction)(Keep.right) + .viaMat(new InboundReplyJunction)(Keep.right) .to(messageDispatcherSink), Source.maybe[ByteString].via(killSwitch.flow))((a, b) ⇒ a) } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index 1e6b8df019..1254fd2139 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -5,20 +5,20 @@ package akka.remote.artery import scala.concurrent.Future import scala.concurrent.Promise - import akka.actor.ActorRef import akka.actor.Address - import akka.actor.RootActorPath import akka.dispatch.sysmsg.SystemMessage import akka.remote.EndpointManager.Send import akka.remote.RemoteActorRef import akka.remote.UniqueAddress -import akka.remote.artery.ReplyJunction.ReplySubject +import akka.remote.artery.InboundReplyJunction.ReplySubject import akka.stream.Materializer import akka.stream.OverflowStrategy import akka.stream.scaladsl.Source import akka.stream.scaladsl.SourceQueueWithComplete +import akka.remote.artery.OutboundReplyJunction.OutboundReplyIngress +import akka.stream.scaladsl.Keep /** * INTERNAL API @@ -34,6 +34,13 @@ private[akka] class Association( @volatile private[this] var queue: SourceQueueWithComplete[Send] = _ @volatile private[this] var systemMessageQueue: SourceQueueWithComplete[Send] = _ + @volatile private[this] var _outboundReplyIngress: OutboundReplyIngress = _ + + def outboundReplyIngress: OutboundReplyIngress = { + if (_outboundReplyIngress eq null) + throw new IllegalStateException("outboundReplyIngress not initialized yet") + _outboundReplyIngress + } override def localAddress: UniqueAddress = transport.localAddress @@ -49,7 +56,7 @@ private[akka] class Association( // TODO: lookup subchannel // FIXME: Use a different envelope than the old Send, but make sure the new is handled by deadLetters properly message match { - case _: SystemMessage | _: Reply ⇒ + case _: SystemMessage ⇒ implicit val ec = materializer.executionContext systemMessageQueue.offer(Send(message, senderOption, recipient, None)).onFailure { case e ⇒ @@ -73,8 +80,12 @@ private[akka] class Association( if (queue eq null) queue = Source.queue(256, OverflowStrategy.dropBuffer) .to(transport.outbound(this)).run()(materializer) - if (systemMessageQueue eq null) - systemMessageQueue = Source.queue(256, OverflowStrategy.dropBuffer) - .to(transport.outboundSystemMessage(this)).run()(materializer) + if (systemMessageQueue eq null) { + val (q, control) = Source.queue(256, OverflowStrategy.dropBuffer) + .toMat(transport.outboundSystemMessage(this))(Keep.both) + .run()(materializer) + systemMessageQueue = q + _outboundReplyIngress = control + } } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala index 6590f82967..5a6cdeee65 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala @@ -8,7 +8,7 @@ import scala.concurrent.duration._ import akka.Done import akka.remote.EndpointManager.Send import akka.remote.UniqueAddress -import akka.remote.artery.ReplyJunction.ReplyObserver +import akka.remote.artery.InboundReplyJunction.ReplyObserver import akka.stream.Attributes import akka.stream.FlowShape import akka.stream.Inlet diff --git a/akka-remote/src/main/scala/akka/remote/artery/Reply.scala b/akka-remote/src/main/scala/akka/remote/artery/Reply.scala index 676425894e..baddb3e59a 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Reply.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Reply.scala @@ -14,6 +14,9 @@ import akka.stream.stage.GraphStageLogic import akka.stream.stage.GraphStageWithMaterializedValue import akka.stream.stage.InHandler import akka.stream.stage.OutHandler +import akka.remote.EndpointManager.Send +import java.util.ArrayDeque +import akka.stream.stage.CallbackWrapper /** * Marker trait for reply messages @@ -29,7 +32,9 @@ trait ControlMessage /** * INTERNAL API */ -private[akka] object ReplyJunction { +private[akka] object InboundReplyJunction { + + // FIXME rename all Reply stuff to Control or ControlMessage private[akka] trait ReplySubject { def attach(observer: ReplyObserver): Future[Done] @@ -40,24 +45,42 @@ private[akka] object ReplyJunction { private[akka] trait ReplyObserver { def reply(inboundEnvelope: InboundEnvelope): Unit } + + private[InboundReplyJunction] sealed trait CallbackMessage + private[InboundReplyJunction] final case class Attach(observer: ReplyObserver, done: Promise[Done]) + extends CallbackMessage + private[InboundReplyJunction] final case class Dettach(observer: ReplyObserver) extends CallbackMessage } /** * INTERNAL API */ -private[akka] class ReplyJunction - extends GraphStageWithMaterializedValue[FlowShape[InboundEnvelope, InboundEnvelope], ReplyJunction.ReplySubject] { - import ReplyJunction._ +private[akka] class InboundReplyJunction + extends GraphStageWithMaterializedValue[FlowShape[InboundEnvelope, InboundEnvelope], InboundReplyJunction.ReplySubject] { + import InboundReplyJunction._ - val in: Inlet[InboundEnvelope] = Inlet("ReplyJunction.in") - val out: Outlet[InboundEnvelope] = Outlet("ReplyJunction.out") + val in: Inlet[InboundEnvelope] = Inlet("InboundReplyJunction.in") + val out: Outlet[InboundEnvelope] = Outlet("InboundReplyJunction.out") override val shape: FlowShape[InboundEnvelope, InboundEnvelope] = FlowShape(in, out) override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { - val logic = new GraphStageLogic(shape) with InHandler with OutHandler with ReplySubject { + val stoppedPromise = Promise[Done]() + // FIXME see issue #20503 related to CallbackWrapper, we might implement this in a better way + val logic = new GraphStageLogic(shape) with CallbackWrapper[CallbackMessage] with InHandler with OutHandler { private var replyObservers: Vector[ReplyObserver] = Vector.empty - private val stoppedPromise = Promise[Done]() + + private val callback = getAsyncCallback[CallbackMessage] { + case Attach(observer, done) ⇒ + replyObservers :+= observer + done.success(Done) + case Dettach(observer) ⇒ + replyObservers = replyObservers.filterNot(_ == observer) + } + + override def preStart(): Unit = { + initCallback(callback.invoke) + } override def postStop(): Unit = stoppedPromise.success(Done) @@ -75,23 +98,95 @@ private[akka] class ReplyJunction // OutHandler override def onPull(): Unit = pull(in) + setHandlers(in, out, this) + } + + // materialized value + val replySubject: ReplySubject = new ReplySubject { override def attach(observer: ReplyObserver): Future[Done] = { val p = Promise[Done]() - getAsyncCallback[Unit](_ ⇒ { - replyObservers :+= observer - p.success(Done) - }).invoke(()) + logic.invoke(Attach(observer, p)) p.future } - override def detach(observer: ReplyObserver): Unit = { - replyObservers = replyObservers.filterNot(_ == observer) + override def detach(observer: ReplyObserver): Unit = + logic.invoke(Dettach(observer)) + + override def stopped: Future[Done] = + stoppedPromise.future + } + + (logic, replySubject) + } +} + +/** + * INTERNAL API + */ +private[akka] object OutboundReplyJunction { + trait OutboundReplyIngress { + def sendControlMessage(message: ControlMessage): Unit + } +} + +/** + * INTERNAL API + */ +private[akka] class OutboundReplyJunction(outboundContext: OutboundContext) + extends GraphStageWithMaterializedValue[FlowShape[Send, Send], OutboundReplyJunction.OutboundReplyIngress] { + import OutboundReplyJunction._ + val in: Inlet[Send] = Inlet("OutboundReplyJunction.in") + val out: Outlet[Send] = Outlet("OutboundReplyJunction.out") + override val shape: FlowShape[Send, Send] = FlowShape(in, out) + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { + // FIXME see issue #20503 related to CallbackWrapper, we might implement this in a better way + val logic = new GraphStageLogic(shape) with CallbackWrapper[ControlMessage] with InHandler with OutHandler { + import OutboundReplyJunction._ + + private val sendControlMessageCallback = getAsyncCallback[ControlMessage](internalSendControlMessage) + private val buffer = new ArrayDeque[Send] + + override def preStart(): Unit = { + initCallback(sendControlMessageCallback.invoke) } - override def stopped: Future[Done] = stoppedPromise.future + // InHandler + override def onPush(): Unit = { + if (buffer.isEmpty && isAvailable(out)) + push(out, grab(in)) + else + buffer.offer(grab(in)) + } + + // OutHandler + override def onPull(): Unit = { + if (buffer.isEmpty && !hasBeenPulled(in)) + pull(in) + else if (!buffer.isEmpty) + push(out, buffer.poll()) + } + + private def internalSendControlMessage(message: ControlMessage): Unit = { + if (buffer.isEmpty && isAvailable(out)) + push(out, wrap(message)) + else + buffer.offer(wrap(message)) + } + + private def wrap(message: ControlMessage): Send = + Send(message, None, outboundContext.dummyRecipient, None) setHandlers(in, out, this) } - (logic, logic) + + // materialized value + val outboundReplyIngress = new OutboundReplyIngress { + override def sendControlMessage(message: ControlMessage): Unit = + logic.invoke(message) + } + + (logic, outboundReplyIngress) } + } diff --git a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala index 271f4a629a..f66c6ef2c3 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala @@ -14,7 +14,7 @@ import scala.util.Try import akka.Done import akka.remote.EndpointManager.Send import akka.remote.UniqueAddress -import akka.remote.artery.ReplyJunction.ReplyObserver +import akka.remote.artery.InboundReplyJunction.ReplyObserver import akka.stream.Attributes import akka.stream.FlowShape import akka.stream.Inlet @@ -151,15 +151,6 @@ private[akka] class SystemMessageDelivery( // InHandler override def onPush(): Unit = { grab(in) match { - case s @ Send(reply: ControlMessage, _, _, _) ⇒ - // pass through - if (isAvailable(out)) - push(out, s) - else { - // it's ok to drop the replies, but we can try - resending.offer(s) - } - case s @ Send(msg: AnyRef, _, _, _) ⇒ seqNo += 1 val sendMsg = s.copy(message = SystemMessageEnvelope(msg, seqNo, localAddress)) diff --git a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala index d54cff909e..a5694a276d 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala @@ -6,11 +6,11 @@ package akka.remote.artery import akka.remote.UniqueAddress import akka.actor.Address import scala.concurrent.Future -import akka.remote.artery.ReplyJunction.ReplySubject +import akka.remote.artery.InboundReplyJunction.ReplySubject import akka.remote.RemoteActorRef import scala.concurrent.Promise import akka.Done -import akka.remote.artery.ReplyJunction.ReplyObserver +import akka.remote.artery.InboundReplyJunction.ReplyObserver import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ThreadLocalRandom