diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 6b822f9521..cdff0acd03 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -23,7 +23,7 @@ import akka.remote.RemoteActorRef import akka.remote.RemoteActorRefProvider import akka.remote.RemoteTransport import akka.remote.UniqueAddress -import akka.remote.artery.ReplyJunction.ReplySubject +import akka.remote.artery.InboundReplyJunction.ReplySubject import akka.remote.transport.AkkaPduCodec import akka.remote.transport.AkkaPduProtobufCodec import akka.serialization.Serialization @@ -47,6 +47,7 @@ import io.aeron.exceptions.ConductorServiceTimeoutException import org.agrona.ErrorHandler import org.agrona.IoUtil import java.io.File +import akka.remote.artery.OutboundReplyJunction.OutboundReplyIngress /** * INTERNAL API @@ -72,7 +73,7 @@ private[akka] trait InboundContext { * An inbound stage can send reply message to the origin * address with this method. */ - def sendReply(to: Address, message: ControlMessage): Unit + def sendReply(to: Address, message: ControlMessage): Unit // FIXME rename to sendControl /** * Lookup the outbound association for a given address. @@ -112,7 +113,7 @@ private[akka] trait OutboundContext { * An outbound stage can listen to reply messages * via this observer subject. */ - def replySubject: ReplySubject + def replySubject: ReplySubject // FIXME rename to controlSubject // FIXME we should be able to Send without a recipient ActorRef def dummyRecipient: RemoteActorRef @@ -146,7 +147,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R // TODO support port 0 private def inboundChannel = s"aeron:udp?endpoint=${localAddress.address.host.get}:${localAddress.address.port.get}" private def outboundChannel(a: Address) = s"aeron:udp?endpoint=${a.host.get}:${a.port.get}" - private val systemMessageStreamId = 1 + private val systemMessageStreamId = 1 // FIXME rename to controlStreamId private val ordinaryStreamId = 3 private val taskRunner = new TaskRunner(system) @@ -241,9 +242,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R } // InboundContext - override def sendReply(to: Address, message: ControlMessage) = { - send(message, None, association(to).dummyRecipient) - } + override def sendReply(to: Address, message: ControlMessage) = + association(to).outboundReplyIngress.sendControlMessage(message) override def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = { val cached = recipient.cachedAssociation @@ -282,10 +282,11 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R .to(new AeronSink(outboundChannel(outboundContext.remoteAddress), ordinaryStreamId, aeron, taskRunner)) } - def outboundSystemMessage(outboundContext: OutboundContext): Sink[Send, Any] = { + def outboundSystemMessage(outboundContext: OutboundContext): Sink[Send, OutboundReplyIngress] = { Flow.fromGraph(killSwitch.flow[Send]) .via(new OutboundHandshake(outboundContext)) .via(new SystemMessageDelivery(outboundContext, systemMessageResendInterval)) + .viaMat(new OutboundReplyJunction(outboundContext))(Keep.right) .via(encoder) .map(_.toArray) // TODO we should use ByteString all the way .to(new AeronSink(outboundChannel(outboundContext.remoteAddress), systemMessageStreamId, aeron, taskRunner)) @@ -338,13 +339,14 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R Source.maybe[ByteString].via(killSwitch.flow)) } + // FIXME rename to controlFlow val inboundSystemMessageFlow: Flow[ByteString, ByteString, ReplySubject] = { Flow.fromSinkAndSourceMat( decoder .via(deserializer) .via(new InboundHandshake(this)) .via(new SystemMessageAcker(this)) - .viaMat(new ReplyJunction)(Keep.right) + .viaMat(new InboundReplyJunction)(Keep.right) .to(messageDispatcherSink), Source.maybe[ByteString].via(killSwitch.flow))((a, b) ⇒ a) } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index 1e6b8df019..1254fd2139 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -5,20 +5,20 @@ package akka.remote.artery import scala.concurrent.Future import scala.concurrent.Promise - import akka.actor.ActorRef import akka.actor.Address - import akka.actor.RootActorPath import akka.dispatch.sysmsg.SystemMessage import akka.remote.EndpointManager.Send import akka.remote.RemoteActorRef import akka.remote.UniqueAddress -import akka.remote.artery.ReplyJunction.ReplySubject +import akka.remote.artery.InboundReplyJunction.ReplySubject import akka.stream.Materializer import akka.stream.OverflowStrategy import akka.stream.scaladsl.Source import akka.stream.scaladsl.SourceQueueWithComplete +import akka.remote.artery.OutboundReplyJunction.OutboundReplyIngress +import akka.stream.scaladsl.Keep /** * INTERNAL API @@ -34,6 +34,13 @@ private[akka] class Association( @volatile private[this] var queue: SourceQueueWithComplete[Send] = _ @volatile private[this] var systemMessageQueue: SourceQueueWithComplete[Send] = _ + @volatile private[this] var _outboundReplyIngress: OutboundReplyIngress = _ + + def outboundReplyIngress: OutboundReplyIngress = { + if (_outboundReplyIngress eq null) + throw new IllegalStateException("outboundReplyIngress not initialized yet") + _outboundReplyIngress + } override def localAddress: UniqueAddress = transport.localAddress @@ -49,7 +56,7 @@ private[akka] class Association( // TODO: lookup subchannel // FIXME: Use a different envelope than the old Send, but make sure the new is handled by deadLetters properly message match { - case _: SystemMessage | _: Reply ⇒ + case _: SystemMessage ⇒ implicit val ec = materializer.executionContext systemMessageQueue.offer(Send(message, senderOption, recipient, None)).onFailure { case e ⇒ @@ -73,8 +80,12 @@ private[akka] class Association( if (queue eq null) queue = Source.queue(256, OverflowStrategy.dropBuffer) .to(transport.outbound(this)).run()(materializer) - if (systemMessageQueue eq null) - systemMessageQueue = Source.queue(256, OverflowStrategy.dropBuffer) - .to(transport.outboundSystemMessage(this)).run()(materializer) + if (systemMessageQueue eq null) { + val (q, control) = Source.queue(256, OverflowStrategy.dropBuffer) + .toMat(transport.outboundSystemMessage(this))(Keep.both) + .run()(materializer) + systemMessageQueue = q + _outboundReplyIngress = control + } } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala index 6590f82967..5a6cdeee65 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala @@ -8,7 +8,7 @@ import scala.concurrent.duration._ import akka.Done import akka.remote.EndpointManager.Send import akka.remote.UniqueAddress -import akka.remote.artery.ReplyJunction.ReplyObserver +import akka.remote.artery.InboundReplyJunction.ReplyObserver import akka.stream.Attributes import akka.stream.FlowShape import akka.stream.Inlet diff --git a/akka-remote/src/main/scala/akka/remote/artery/Reply.scala b/akka-remote/src/main/scala/akka/remote/artery/Reply.scala index 676425894e..baddb3e59a 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Reply.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Reply.scala @@ -14,6 +14,9 @@ import akka.stream.stage.GraphStageLogic import akka.stream.stage.GraphStageWithMaterializedValue import akka.stream.stage.InHandler import akka.stream.stage.OutHandler +import akka.remote.EndpointManager.Send +import java.util.ArrayDeque +import akka.stream.stage.CallbackWrapper /** * Marker trait for reply messages @@ -29,7 +32,9 @@ trait ControlMessage /** * INTERNAL API */ -private[akka] object ReplyJunction { +private[akka] object InboundReplyJunction { + + // FIXME rename all Reply stuff to Control or ControlMessage private[akka] trait ReplySubject { def attach(observer: ReplyObserver): Future[Done] @@ -40,24 +45,42 @@ private[akka] object ReplyJunction { private[akka] trait ReplyObserver { def reply(inboundEnvelope: InboundEnvelope): Unit } + + private[InboundReplyJunction] sealed trait CallbackMessage + private[InboundReplyJunction] final case class Attach(observer: ReplyObserver, done: Promise[Done]) + extends CallbackMessage + private[InboundReplyJunction] final case class Dettach(observer: ReplyObserver) extends CallbackMessage } /** * INTERNAL API */ -private[akka] class ReplyJunction - extends GraphStageWithMaterializedValue[FlowShape[InboundEnvelope, InboundEnvelope], ReplyJunction.ReplySubject] { - import ReplyJunction._ +private[akka] class InboundReplyJunction + extends GraphStageWithMaterializedValue[FlowShape[InboundEnvelope, InboundEnvelope], InboundReplyJunction.ReplySubject] { + import InboundReplyJunction._ - val in: Inlet[InboundEnvelope] = Inlet("ReplyJunction.in") - val out: Outlet[InboundEnvelope] = Outlet("ReplyJunction.out") + val in: Inlet[InboundEnvelope] = Inlet("InboundReplyJunction.in") + val out: Outlet[InboundEnvelope] = Outlet("InboundReplyJunction.out") override val shape: FlowShape[InboundEnvelope, InboundEnvelope] = FlowShape(in, out) override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { - val logic = new GraphStageLogic(shape) with InHandler with OutHandler with ReplySubject { + val stoppedPromise = Promise[Done]() + // FIXME see issue #20503 related to CallbackWrapper, we might implement this in a better way + val logic = new GraphStageLogic(shape) with CallbackWrapper[CallbackMessage] with InHandler with OutHandler { private var replyObservers: Vector[ReplyObserver] = Vector.empty - private val stoppedPromise = Promise[Done]() + + private val callback = getAsyncCallback[CallbackMessage] { + case Attach(observer, done) ⇒ + replyObservers :+= observer + done.success(Done) + case Dettach(observer) ⇒ + replyObservers = replyObservers.filterNot(_ == observer) + } + + override def preStart(): Unit = { + initCallback(callback.invoke) + } override def postStop(): Unit = stoppedPromise.success(Done) @@ -75,23 +98,95 @@ private[akka] class ReplyJunction // OutHandler override def onPull(): Unit = pull(in) + setHandlers(in, out, this) + } + + // materialized value + val replySubject: ReplySubject = new ReplySubject { override def attach(observer: ReplyObserver): Future[Done] = { val p = Promise[Done]() - getAsyncCallback[Unit](_ ⇒ { - replyObservers :+= observer - p.success(Done) - }).invoke(()) + logic.invoke(Attach(observer, p)) p.future } - override def detach(observer: ReplyObserver): Unit = { - replyObservers = replyObservers.filterNot(_ == observer) + override def detach(observer: ReplyObserver): Unit = + logic.invoke(Dettach(observer)) + + override def stopped: Future[Done] = + stoppedPromise.future + } + + (logic, replySubject) + } +} + +/** + * INTERNAL API + */ +private[akka] object OutboundReplyJunction { + trait OutboundReplyIngress { + def sendControlMessage(message: ControlMessage): Unit + } +} + +/** + * INTERNAL API + */ +private[akka] class OutboundReplyJunction(outboundContext: OutboundContext) + extends GraphStageWithMaterializedValue[FlowShape[Send, Send], OutboundReplyJunction.OutboundReplyIngress] { + import OutboundReplyJunction._ + val in: Inlet[Send] = Inlet("OutboundReplyJunction.in") + val out: Outlet[Send] = Outlet("OutboundReplyJunction.out") + override val shape: FlowShape[Send, Send] = FlowShape(in, out) + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { + // FIXME see issue #20503 related to CallbackWrapper, we might implement this in a better way + val logic = new GraphStageLogic(shape) with CallbackWrapper[ControlMessage] with InHandler with OutHandler { + import OutboundReplyJunction._ + + private val sendControlMessageCallback = getAsyncCallback[ControlMessage](internalSendControlMessage) + private val buffer = new ArrayDeque[Send] + + override def preStart(): Unit = { + initCallback(sendControlMessageCallback.invoke) } - override def stopped: Future[Done] = stoppedPromise.future + // InHandler + override def onPush(): Unit = { + if (buffer.isEmpty && isAvailable(out)) + push(out, grab(in)) + else + buffer.offer(grab(in)) + } + + // OutHandler + override def onPull(): Unit = { + if (buffer.isEmpty && !hasBeenPulled(in)) + pull(in) + else if (!buffer.isEmpty) + push(out, buffer.poll()) + } + + private def internalSendControlMessage(message: ControlMessage): Unit = { + if (buffer.isEmpty && isAvailable(out)) + push(out, wrap(message)) + else + buffer.offer(wrap(message)) + } + + private def wrap(message: ControlMessage): Send = + Send(message, None, outboundContext.dummyRecipient, None) setHandlers(in, out, this) } - (logic, logic) + + // materialized value + val outboundReplyIngress = new OutboundReplyIngress { + override def sendControlMessage(message: ControlMessage): Unit = + logic.invoke(message) + } + + (logic, outboundReplyIngress) } + } diff --git a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala index 271f4a629a..f66c6ef2c3 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala @@ -14,7 +14,7 @@ import scala.util.Try import akka.Done import akka.remote.EndpointManager.Send import akka.remote.UniqueAddress -import akka.remote.artery.ReplyJunction.ReplyObserver +import akka.remote.artery.InboundReplyJunction.ReplyObserver import akka.stream.Attributes import akka.stream.FlowShape import akka.stream.Inlet @@ -151,15 +151,6 @@ private[akka] class SystemMessageDelivery( // InHandler override def onPush(): Unit = { grab(in) match { - case s @ Send(reply: ControlMessage, _, _, _) ⇒ - // pass through - if (isAvailable(out)) - push(out, s) - else { - // it's ok to drop the replies, but we can try - resending.offer(s) - } - case s @ Send(msg: AnyRef, _, _, _) ⇒ seqNo += 1 val sendMsg = s.copy(message = SystemMessageEnvelope(msg, seqNo, localAddress)) diff --git a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala index d54cff909e..a5694a276d 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala @@ -6,11 +6,11 @@ package akka.remote.artery import akka.remote.UniqueAddress import akka.actor.Address import scala.concurrent.Future -import akka.remote.artery.ReplyJunction.ReplySubject +import akka.remote.artery.InboundReplyJunction.ReplySubject import akka.remote.RemoteActorRef import scala.concurrent.Promise import akka.Done -import akka.remote.artery.ReplyJunction.ReplyObserver +import akka.remote.artery.InboundReplyJunction.ReplyObserver import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ThreadLocalRandom