diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 6b822f9521..511ef38cc3 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -23,7 +23,7 @@ import akka.remote.RemoteActorRef import akka.remote.RemoteActorRefProvider import akka.remote.RemoteTransport import akka.remote.UniqueAddress -import akka.remote.artery.ReplyJunction.ReplySubject +import akka.remote.artery.InboundControlJunction.ControlMessageSubject import akka.remote.transport.AkkaPduCodec import akka.remote.transport.AkkaPduProtobufCodec import akka.serialization.Serialization @@ -47,6 +47,7 @@ import io.aeron.exceptions.ConductorServiceTimeoutException import org.agrona.ErrorHandler import org.agrona.IoUtil import java.io.File +import akka.remote.artery.OutboundControlJunction.OutboundControlIngress /** * INTERNAL API @@ -69,10 +70,10 @@ private[akka] trait InboundContext { def localAddress: UniqueAddress /** - * An inbound stage can send reply message to the origin + * An inbound stage can send control message, e.g. a reply, to the origin * address with this method. */ - def sendReply(to: Address, message: ControlMessage): Unit + def sendControl(to: Address, message: ControlMessage): Unit /** * Lookup the outbound association for a given address. @@ -109,10 +110,10 @@ private[akka] trait OutboundContext { def completeRemoteAddress(a: UniqueAddress): Unit /** - * An outbound stage can listen to reply messages + * An outbound stage can listen to control messages * via this observer subject. */ - def replySubject: ReplySubject + def controlSubject: ControlMessageSubject // FIXME we should be able to Send without a recipient ActorRef def dummyRecipient: RemoteActorRef @@ -129,7 +130,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R @volatile private[this] var _localAddress: UniqueAddress = _ override def localAddress: UniqueAddress = _localAddress @volatile private[this] var materializer: Materializer = _ - @volatile private[this] var replySubject: ReplySubject = _ + @volatile private[this] var controlSubject: ControlMessageSubject = _ @volatile private[this] var messageDispatcher: MessageDispatcher = _ @volatile private[this] var driver: MediaDriver = _ @volatile private[this] var aeron: Aeron = _ @@ -146,7 +147,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R // TODO support port 0 private def inboundChannel = s"aeron:udp?endpoint=${localAddress.address.host.get}:${localAddress.address.port.get}" private def outboundChannel(a: Address) = s"aeron:udp?endpoint=${a.host.get}:${a.port.get}" - private val systemMessageStreamId = 1 + private val controlStreamId = 1 private val ordinaryStreamId = 3 private val taskRunner = new TaskRunner(system) @@ -214,10 +215,10 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R } private def runInboundFlows(): Unit = { - replySubject = Source.fromGraph(new AeronSource(inboundChannel, systemMessageStreamId, aeron, taskRunner)) + controlSubject = Source.fromGraph(new AeronSource(inboundChannel, controlStreamId, aeron, taskRunner)) .async // FIXME measure .map(ByteString.apply) // TODO we should use ByteString all the way - .viaMat(inboundSystemMessageFlow)(Keep.right) + .viaMat(inboundControlFlow)(Keep.right) .to(Sink.ignore) .run()(materializer) @@ -241,9 +242,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R } // InboundContext - override def sendReply(to: Address, message: ControlMessage) = { - send(message, None, association(to).dummyRecipient) - } + override def sendControl(to: Address, message: ControlMessage) = + association(to).outboundControlIngress.sendControlMessage(message) override def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = { val cached = recipient.cachedAssociation @@ -262,7 +262,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R else { associations.computeIfAbsent(remoteAddress, new JFunction[Address, Association] { override def apply(remoteAddress: Address): Association = { - val newAssociation = new Association(ArteryTransport.this, materializer, remoteAddress, replySubject) + val newAssociation = new Association(ArteryTransport.this, materializer, remoteAddress, controlSubject) newAssociation.associate() // This is a bit costly for this blocking method :( newAssociation } @@ -282,13 +282,14 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R .to(new AeronSink(outboundChannel(outboundContext.remoteAddress), ordinaryStreamId, aeron, taskRunner)) } - def outboundSystemMessage(outboundContext: OutboundContext): Sink[Send, Any] = { + def outboundControl(outboundContext: OutboundContext): Sink[Send, OutboundControlIngress] = { Flow.fromGraph(killSwitch.flow[Send]) .via(new OutboundHandshake(outboundContext)) .via(new SystemMessageDelivery(outboundContext, systemMessageResendInterval)) + .viaMat(new OutboundControlJunction(outboundContext))(Keep.right) .via(encoder) .map(_.toArray) // TODO we should use ByteString all the way - .to(new AeronSink(outboundChannel(outboundContext.remoteAddress), systemMessageStreamId, aeron, taskRunner)) + .to(new AeronSink(outboundChannel(outboundContext.remoteAddress), controlStreamId, aeron, taskRunner)) } // TODO: Try out parallelized serialization (mapAsync) for performance @@ -338,13 +339,13 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R Source.maybe[ByteString].via(killSwitch.flow)) } - val inboundSystemMessageFlow: Flow[ByteString, ByteString, ReplySubject] = { + val inboundControlFlow: Flow[ByteString, ByteString, ControlMessageSubject] = { Flow.fromSinkAndSourceMat( decoder .via(deserializer) .via(new InboundHandshake(this)) .via(new SystemMessageAcker(this)) - .viaMat(new ReplyJunction)(Keep.right) + .viaMat(new InboundControlJunction)(Keep.right) .to(messageDispatcherSink), Source.maybe[ByteString].via(killSwitch.flow))((a, b) ⇒ a) } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index 1e6b8df019..323aac7266 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -5,20 +5,20 @@ package akka.remote.artery import scala.concurrent.Future import scala.concurrent.Promise - import akka.actor.ActorRef import akka.actor.Address - import akka.actor.RootActorPath import akka.dispatch.sysmsg.SystemMessage import akka.remote.EndpointManager.Send import akka.remote.RemoteActorRef import akka.remote.UniqueAddress -import akka.remote.artery.ReplyJunction.ReplySubject +import akka.remote.artery.InboundControlJunction.ControlMessageSubject import akka.stream.Materializer import akka.stream.OverflowStrategy import akka.stream.scaladsl.Source import akka.stream.scaladsl.SourceQueueWithComplete +import akka.remote.artery.OutboundControlJunction.OutboundControlIngress +import akka.stream.scaladsl.Keep /** * INTERNAL API @@ -30,10 +30,17 @@ private[akka] class Association( val transport: ArteryTransport, val materializer: Materializer, override val remoteAddress: Address, - override val replySubject: ReplySubject) extends OutboundContext { + override val controlSubject: ControlMessageSubject) extends OutboundContext { @volatile private[this] var queue: SourceQueueWithComplete[Send] = _ @volatile private[this] var systemMessageQueue: SourceQueueWithComplete[Send] = _ + @volatile private[this] var _outboundControlIngress: OutboundControlIngress = _ + + def outboundControlIngress: OutboundControlIngress = { + if (_outboundControlIngress eq null) + throw new IllegalStateException("outboundControlIngress not initialized yet") + _outboundControlIngress + } override def localAddress: UniqueAddress = transport.localAddress @@ -49,7 +56,7 @@ private[akka] class Association( // TODO: lookup subchannel // FIXME: Use a different envelope than the old Send, but make sure the new is handled by deadLetters properly message match { - case _: SystemMessage | _: Reply ⇒ + case _: SystemMessage ⇒ implicit val ec = materializer.executionContext systemMessageQueue.offer(Send(message, senderOption, recipient, None)).onFailure { case e ⇒ @@ -73,8 +80,12 @@ private[akka] class Association( if (queue eq null) queue = Source.queue(256, OverflowStrategy.dropBuffer) .to(transport.outbound(this)).run()(materializer) - if (systemMessageQueue eq null) - systemMessageQueue = Source.queue(256, OverflowStrategy.dropBuffer) - .to(transport.outboundSystemMessage(this)).run()(materializer) + if (systemMessageQueue eq null) { + val (q, control) = Source.queue(256, OverflowStrategy.dropBuffer) + .toMat(transport.outboundControl(this))(Keep.both) + .run()(materializer) + systemMessageQueue = q + _outboundControlIngress = control + } } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Control.scala b/akka-remote/src/main/scala/akka/remote/artery/Control.scala new file mode 100644 index 0000000000..56127d956c --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/artery/Control.scala @@ -0,0 +1,202 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import scala.concurrent.Future +import scala.concurrent.Promise +import akka.Done +import akka.stream.Attributes +import akka.stream.FlowShape +import akka.stream.Inlet +import akka.stream.Outlet +import akka.stream.stage.GraphStageLogic +import akka.stream.stage.GraphStageWithMaterializedValue +import akka.stream.stage.InHandler +import akka.stream.stage.OutHandler +import akka.remote.EndpointManager.Send +import java.util.ArrayDeque +import akka.stream.stage.CallbackWrapper + +/** + * Marker trait for reply messages + */ +trait Reply extends ControlMessage + +/** + * Marker trait for control messages that can be sent via the system message sub-channel + * but don't need full reliable delivery. E.g. `HandshakeReq` and `Reply`. + */ +trait ControlMessage + +/** + * INTERNAL API + */ +private[akka] object InboundControlJunction { + + /** + * Observer subject for inbound control messages. + * Interested observers can attach themselves to the + * subject to get notification of incoming control + * messages. + */ + private[akka] trait ControlMessageSubject { + def attach(observer: ControlMessageObserver): Future[Done] + def detach(observer: ControlMessageObserver): Unit + def stopped: Future[Done] + } + + private[akka] trait ControlMessageObserver { + + /** + * Notification of incoming control message. The message + * of the envelope is always a `ControlMessage`. + */ + def notify(inboundEnvelope: InboundEnvelope): Unit + } + + // messages for the CallbackWrapper + private[InboundControlJunction] sealed trait CallbackMessage + private[InboundControlJunction] final case class Attach(observer: ControlMessageObserver, done: Promise[Done]) + extends CallbackMessage + private[InboundControlJunction] final case class Dettach(observer: ControlMessageObserver) extends CallbackMessage +} + +/** + * INTERNAL API + */ +private[akka] class InboundControlJunction + extends GraphStageWithMaterializedValue[FlowShape[InboundEnvelope, InboundEnvelope], InboundControlJunction.ControlMessageSubject] { + import InboundControlJunction._ + + val in: Inlet[InboundEnvelope] = Inlet("InboundControlJunction.in") + val out: Outlet[InboundEnvelope] = Outlet("InboundControlJunction.out") + override val shape: FlowShape[InboundEnvelope, InboundEnvelope] = FlowShape(in, out) + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { + val stoppedPromise = Promise[Done]() + // FIXME see issue #20503 related to CallbackWrapper, we might implement this in a better way + val logic = new GraphStageLogic(shape) with CallbackWrapper[CallbackMessage] with InHandler with OutHandler { + + private var observers: Vector[ControlMessageObserver] = Vector.empty + + private val callback = getAsyncCallback[CallbackMessage] { + case Attach(observer, done) ⇒ + observers :+= observer + done.success(Done) + case Dettach(observer) ⇒ + observers = observers.filterNot(_ == observer) + } + + override def preStart(): Unit = { + initCallback(callback.invoke) + } + + override def postStop(): Unit = stoppedPromise.success(Done) + + // InHandler + override def onPush(): Unit = { + grab(in) match { + case env @ InboundEnvelope(_, _, _: ControlMessage, _) ⇒ + observers.foreach(_.notify(env)) + pull(in) + case env ⇒ + push(out, env) + } + } + + // OutHandler + override def onPull(): Unit = pull(in) + + setHandlers(in, out, this) + } + + // materialized value + val controlSubject: ControlMessageSubject = new ControlMessageSubject { + override def attach(observer: ControlMessageObserver): Future[Done] = { + val p = Promise[Done]() + logic.invoke(Attach(observer, p)) + p.future + } + + override def detach(observer: ControlMessageObserver): Unit = + logic.invoke(Dettach(observer)) + + override def stopped: Future[Done] = + stoppedPromise.future + } + + (logic, controlSubject) + } +} + +/** + * INTERNAL API + */ +private[akka] object OutboundControlJunction { + trait OutboundControlIngress { + def sendControlMessage(message: ControlMessage): Unit + } +} + +/** + * INTERNAL API + */ +private[akka] class OutboundControlJunction(outboundContext: OutboundContext) + extends GraphStageWithMaterializedValue[FlowShape[Send, Send], OutboundControlJunction.OutboundControlIngress] { + import OutboundControlJunction._ + val in: Inlet[Send] = Inlet("OutboundControlJunction.in") + val out: Outlet[Send] = Outlet("OutboundControlJunction.out") + override val shape: FlowShape[Send, Send] = FlowShape(in, out) + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { + // FIXME see issue #20503 related to CallbackWrapper, we might implement this in a better way + val logic = new GraphStageLogic(shape) with CallbackWrapper[ControlMessage] with InHandler with OutHandler { + import OutboundControlJunction._ + + private val sendControlMessageCallback = getAsyncCallback[ControlMessage](internalSendControlMessage) + private val buffer = new ArrayDeque[Send] + + override def preStart(): Unit = { + initCallback(sendControlMessageCallback.invoke) + } + + // InHandler + override def onPush(): Unit = { + if (buffer.isEmpty && isAvailable(out)) + push(out, grab(in)) + else + buffer.offer(grab(in)) + } + + // OutHandler + override def onPull(): Unit = { + if (buffer.isEmpty && !hasBeenPulled(in)) + pull(in) + else if (!buffer.isEmpty) + push(out, buffer.poll()) + } + + private def internalSendControlMessage(message: ControlMessage): Unit = { + if (buffer.isEmpty && isAvailable(out)) + push(out, wrap(message)) + else + buffer.offer(wrap(message)) + } + + private def wrap(message: ControlMessage): Send = + Send(message, None, outboundContext.dummyRecipient, None) + + setHandlers(in, out, this) + } + + // materialized value + val outboundControlIngress = new OutboundControlIngress { + override def sendControlMessage(message: ControlMessage): Unit = + logic.invoke(message) + } + + (logic, outboundControlIngress) + } + +} diff --git a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala index 6590f82967..5357b0ee45 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala @@ -8,7 +8,7 @@ import scala.concurrent.duration._ import akka.Done import akka.remote.EndpointManager.Send import akka.remote.UniqueAddress -import akka.remote.artery.ReplyJunction.ReplyObserver +import akka.remote.artery.InboundControlJunction.ControlMessageObserver import akka.stream.Attributes import akka.stream.FlowShape import akka.stream.Inlet @@ -29,7 +29,7 @@ private[akka] object OutboundHandshake { private sealed trait HandshakeState private case object Start extends HandshakeState - private case object ReplyObserverAttached extends HandshakeState + private case object ControlMessageObserverAttached extends HandshakeState private case object ReqInProgress extends HandshakeState private case object Completed extends HandshakeState @@ -46,7 +46,7 @@ private[akka] class OutboundHandshake(outboundContext: OutboundContext) extends override val shape: FlowShape[Send, Send] = FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - new TimerGraphStageLogic(shape) with InHandler with OutHandler with ReplyObserver { + new TimerGraphStageLogic(shape) with InHandler with OutHandler with ControlMessageObserver { import OutboundHandshake._ private val timeout: FiniteDuration = 10.seconds // FIXME config @@ -59,13 +59,13 @@ private[akka] class OutboundHandshake(outboundContext: OutboundContext) extends handshakeState = Completed } else { implicit val ec = materializer.executionContext - outboundContext.replySubject.attach(this).foreach { + outboundContext.controlSubject.attach(this).foreach { getAsyncCallback[Done] { _ ⇒ if (handshakeState != Completed) { if (isAvailable(out)) pushHandshakeReq() else - handshakeState = ReplyObserverAttached + handshakeState = ControlMessageObserverAttached } }.invoke } @@ -85,7 +85,7 @@ private[akka] class OutboundHandshake(outboundContext: OutboundContext) extends } override def postStop(): Unit = { - outboundContext.replySubject.detach(this) + outboundContext.controlSubject.detach(this) } // InHandler @@ -99,9 +99,9 @@ private[akka] class OutboundHandshake(outboundContext: OutboundContext) extends override def onPull(): Unit = { handshakeState match { case Completed ⇒ pull(in) - case ReplyObserverAttached ⇒ + case ControlMessageObserverAttached ⇒ pushHandshakeReq() - case Start ⇒ // will push HandshakeReq when ReplyObserver is attached + case Start ⇒ // will push HandshakeReq when ControlMessageObserver is attached case ReqInProgress ⇒ // will pull when handshake reply is received } } @@ -115,7 +115,7 @@ private[akka] class OutboundHandshake(outboundContext: OutboundContext) extends private def handshakeCompleted(): Unit = { handshakeState = Completed cancelTimer(HandshakeTimeout) - outboundContext.replySubject.detach(this) + outboundContext.controlSubject.detach(this) } override protected def onTimer(timerKey: Any): Unit = @@ -125,8 +125,8 @@ private[akka] class OutboundHandshake(outboundContext: OutboundContext) extends s"Handshake with [$remoteAddress] did not complete within ${timeout.toMillis} ms")) } - // ReplyObserver, external call - override def reply(inboundEnvelope: InboundEnvelope): Unit = { + // ControlMessageObserver, external call + override def notify(inboundEnvelope: InboundEnvelope): Unit = { inboundEnvelope.message match { case rsp: HandshakeRsp ⇒ if (rsp.from.address == remoteAddress) { @@ -165,7 +165,7 @@ private[akka] class InboundHandshake(inboundContext: InboundContext) extends Gra grab(in) match { case InboundEnvelope(_, _, HandshakeReq(from), _) ⇒ inboundContext.association(from.address).completeRemoteAddress(from) - inboundContext.sendReply(from.address, HandshakeRsp(inboundContext.localAddress)) + inboundContext.sendControl(from.address, HandshakeRsp(inboundContext.localAddress)) pull(in) case other ⇒ push(out, other) diff --git a/akka-remote/src/main/scala/akka/remote/artery/Reply.scala b/akka-remote/src/main/scala/akka/remote/artery/Reply.scala deleted file mode 100644 index 676425894e..0000000000 --- a/akka-remote/src/main/scala/akka/remote/artery/Reply.scala +++ /dev/null @@ -1,97 +0,0 @@ -/** - * Copyright (C) 2016 Lightbend Inc. - */ -package akka.remote.artery - -import scala.concurrent.Future -import scala.concurrent.Promise -import akka.Done -import akka.stream.Attributes -import akka.stream.FlowShape -import akka.stream.Inlet -import akka.stream.Outlet -import akka.stream.stage.GraphStageLogic -import akka.stream.stage.GraphStageWithMaterializedValue -import akka.stream.stage.InHandler -import akka.stream.stage.OutHandler - -/** - * Marker trait for reply messages - */ -trait Reply extends ControlMessage - -/** - * Marker trait for control messages that can be sent via the system message sub-channel - * but don't need full reliable delivery. E.g. `HandshakeReq` and `Reply`. - */ -trait ControlMessage - -/** - * INTERNAL API - */ -private[akka] object ReplyJunction { - - private[akka] trait ReplySubject { - def attach(observer: ReplyObserver): Future[Done] - def detach(observer: ReplyObserver): Unit - def stopped: Future[Done] - } - - private[akka] trait ReplyObserver { - def reply(inboundEnvelope: InboundEnvelope): Unit - } -} - -/** - * INTERNAL API - */ -private[akka] class ReplyJunction - extends GraphStageWithMaterializedValue[FlowShape[InboundEnvelope, InboundEnvelope], ReplyJunction.ReplySubject] { - import ReplyJunction._ - - val in: Inlet[InboundEnvelope] = Inlet("ReplyJunction.in") - val out: Outlet[InboundEnvelope] = Outlet("ReplyJunction.out") - override val shape: FlowShape[InboundEnvelope, InboundEnvelope] = FlowShape(in, out) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { - val logic = new GraphStageLogic(shape) with InHandler with OutHandler with ReplySubject { - - private var replyObservers: Vector[ReplyObserver] = Vector.empty - private val stoppedPromise = Promise[Done]() - - override def postStop(): Unit = stoppedPromise.success(Done) - - // InHandler - override def onPush(): Unit = { - grab(in) match { - case env @ InboundEnvelope(_, _, reply: Reply, _) ⇒ - replyObservers.foreach(_.reply(env)) - pull(in) - case env ⇒ - push(out, env) - } - } - - // OutHandler - override def onPull(): Unit = pull(in) - - override def attach(observer: ReplyObserver): Future[Done] = { - val p = Promise[Done]() - getAsyncCallback[Unit](_ ⇒ { - replyObservers :+= observer - p.success(Done) - }).invoke(()) - p.future - } - - override def detach(observer: ReplyObserver): Unit = { - replyObservers = replyObservers.filterNot(_ == observer) - } - - override def stopped: Future[Done] = stoppedPromise.future - - setHandlers(in, out, this) - } - (logic, logic) - } -} diff --git a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala index 271f4a629a..21e62ca917 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala @@ -14,7 +14,7 @@ import scala.util.Try import akka.Done import akka.remote.EndpointManager.Send import akka.remote.UniqueAddress -import akka.remote.artery.ReplyJunction.ReplyObserver +import akka.remote.artery.InboundControlJunction.ControlMessageObserver import akka.stream.Attributes import akka.stream.FlowShape import akka.stream.Inlet @@ -53,7 +53,7 @@ private[akka] class SystemMessageDelivery( override val shape: FlowShape[Send, Send] = FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - new TimerGraphStageLogic(shape) with InHandler with OutHandler with ReplyObserver { + new TimerGraphStageLogic(shape) with InHandler with OutHandler with ControlMessageObserver { private var replyObserverAttached = false private var seqNo = 0L // sequence number for the first message will be 1 @@ -69,7 +69,7 @@ private[akka] class SystemMessageDelivery( this.schedulePeriodically(ResendTick, resendInterval) implicit val ec = materializer.executionContext - outboundContext.replySubject.attach(this).foreach { + outboundContext.controlSubject.attach(this).foreach { getAsyncCallback[Done] { _ ⇒ replyObserverAttached = true if (isAvailable(out)) @@ -77,7 +77,7 @@ private[akka] class SystemMessageDelivery( }.invoke } - outboundContext.replySubject.stopped.onComplete { + outboundContext.controlSubject.stopped.onComplete { getAsyncCallback[Try[Done]] { // FIXME quarantine case Success(_) ⇒ completeStage() @@ -87,7 +87,7 @@ private[akka] class SystemMessageDelivery( } override def postStop(): Unit = { - outboundContext.replySubject.detach(this) + outboundContext.controlSubject.detach(this) } override def onUpstreamFinish(): Unit = { @@ -106,8 +106,8 @@ private[akka] class SystemMessageDelivery( } } - // ReplyObserver, external call - override def reply(inboundEnvelope: InboundEnvelope): Unit = { + // ControlMessageObserver, external call + override def notify(inboundEnvelope: InboundEnvelope): Unit = { inboundEnvelope.message match { case ack: Ack ⇒ if (ack.from.address == remoteAddress) ackCallback.invoke(ack) case nack: Nack ⇒ if (nack.from.address == remoteAddress) nackCallback.invoke(nack) @@ -151,15 +151,6 @@ private[akka] class SystemMessageDelivery( // InHandler override def onPush(): Unit = { grab(in) match { - case s @ Send(reply: ControlMessage, _, _, _) ⇒ - // pass through - if (isAvailable(out)) - push(out, s) - else { - // it's ok to drop the replies, but we can try - resending.offer(s) - } - case s @ Send(msg: AnyRef, _, _, _) ⇒ seqNo += 1 val sendMsg = s.copy(message = SystemMessageEnvelope(msg, seqNo, localAddress)) @@ -210,15 +201,15 @@ private[akka] class SystemMessageAcker(inboundContext: InboundContext) extends G grab(in) match { case env @ InboundEnvelope(_, _, sysEnv @ SystemMessageEnvelope(_, n, ackReplyTo), _) ⇒ if (n == seqNo) { - inboundContext.sendReply(ackReplyTo.address, Ack(n, localAddress)) + inboundContext.sendControl(ackReplyTo.address, Ack(n, localAddress)) seqNo += 1 val unwrapped = env.copy(message = sysEnv.message) push(out, unwrapped) } else if (n < seqNo) { - inboundContext.sendReply(ackReplyTo.address, Ack(n, localAddress)) + inboundContext.sendControl(ackReplyTo.address, Ack(n, localAddress)) pull(in) } else { - inboundContext.sendReply(ackReplyTo.address, Nack(seqNo - 1, localAddress)) + inboundContext.sendControl(ackReplyTo.address, Nack(seqNo - 1, localAddress)) pull(in) } case env ⇒ diff --git a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala index 8614aaf2b8..6675ff3e9b 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala @@ -58,17 +58,17 @@ object SystemMessageDeliverySpec { class ManualReplyInboundContext( replyProbe: ActorRef, localAddress: UniqueAddress, - replySubject: TestReplySubject) extends TestInboundContext(localAddress, replySubject) { + controlSubject: TestControlMessageSubject) extends TestInboundContext(localAddress, controlSubject) { private var lastReply: Option[(Address, ControlMessage)] = None - override def sendReply(to: Address, message: ControlMessage) = { + override def sendControl(to: Address, message: ControlMessage) = { lastReply = Some((to, message)) replyProbe ! message } def deliverLastReply(): Unit = { - lastReply.foreach { case (to, message) ⇒ super.sendReply(to, message) } + lastReply.foreach { case (to, message) ⇒ super.sendControl(to, message) } lastReply = None } } @@ -147,9 +147,9 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.commo "be resent when some in the middle are lost" in { val replyProbe = TestProbe() - val replySubject = new TestReplySubject - val inboundContextB = new ManualReplyInboundContext(replyProbe.ref, addressB, replySubject) - val inboundContextA = new TestInboundContext(addressB, replySubject) + val controlSubject = new TestControlMessageSubject + val inboundContextB = new ManualReplyInboundContext(replyProbe.ref, addressB, controlSubject) + val inboundContextA = new TestInboundContext(addressB, controlSubject) val outboundContextA = inboundContextA.association(addressB.address) val sink = send(sendCount = 5, resendInterval = 60.seconds, outboundContextA) @@ -181,9 +181,9 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.commo "be resent when first is lost" in { val replyProbe = TestProbe() - val replySubject = new TestReplySubject - val inboundContextB = new ManualReplyInboundContext(replyProbe.ref, addressB, replySubject) - val inboundContextA = new TestInboundContext(addressB, replySubject) + val controlSubject = new TestControlMessageSubject + val inboundContextB = new ManualReplyInboundContext(replyProbe.ref, addressB, controlSubject) + val inboundContextA = new TestInboundContext(addressB, controlSubject) val outboundContextA = inboundContextA.association(addressB.address) val sink = send(sendCount = 3, resendInterval = 60.seconds, outboundContextA) @@ -210,9 +210,9 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.commo "be resent when last is lost" in { val replyProbe = TestProbe() - val replySubject = new TestReplySubject - val inboundContextB = new ManualReplyInboundContext(replyProbe.ref, addressB, replySubject) - val inboundContextA = new TestInboundContext(addressB, replySubject) + val controlSubject = new TestControlMessageSubject + val inboundContextB = new ManualReplyInboundContext(replyProbe.ref, addressB, controlSubject) + val inboundContextA = new TestInboundContext(addressB, controlSubject) val outboundContextA = inboundContextA.association(addressB.address) val sink = send(sendCount = 3, resendInterval = 1.seconds, outboundContextA) @@ -239,9 +239,9 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.commo "deliver all during stress and random dropping" in { val N = 10000 val dropRate = 0.1 - val replySubject = new TestReplySubject - val inboundContextB = new TestInboundContext(addressB, replySubject, replyDropRate = dropRate) - val inboundContextA = new TestInboundContext(addressB, replySubject) + val controlSubject = new TestControlMessageSubject + val inboundContextB = new TestInboundContext(addressB, controlSubject, replyDropRate = dropRate) + val inboundContextA = new TestInboundContext(addressB, controlSubject) val outboundContextA = inboundContextA.association(addressB.address) val output = @@ -257,9 +257,9 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.commo "deliver all during throttling and random dropping" in { val N = 500 val dropRate = 0.1 - val replySubject = new TestReplySubject - val inboundContextB = new TestInboundContext(addressB, replySubject, replyDropRate = dropRate) - val inboundContextA = new TestInboundContext(addressB, replySubject) + val controlSubject = new TestControlMessageSubject + val inboundContextB = new TestInboundContext(addressB, controlSubject, replyDropRate = dropRate) + val inboundContextA = new TestInboundContext(addressB, controlSubject) val outboundContextA = inboundContextA.association(addressB.address) val output = diff --git a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala index d54cff909e..a9a68e7247 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala @@ -6,31 +6,31 @@ package akka.remote.artery import akka.remote.UniqueAddress import akka.actor.Address import scala.concurrent.Future -import akka.remote.artery.ReplyJunction.ReplySubject +import akka.remote.artery.InboundControlJunction.ControlMessageSubject import akka.remote.RemoteActorRef import scala.concurrent.Promise import akka.Done -import akka.remote.artery.ReplyJunction.ReplyObserver +import akka.remote.artery.InboundControlJunction.ControlMessageObserver import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ThreadLocalRandom private[akka] class TestInboundContext( override val localAddress: UniqueAddress, - val replySubject: TestReplySubject = new TestReplySubject, + val controlSubject: TestControlMessageSubject = new TestControlMessageSubject, replyDropRate: Double = 0.0) extends InboundContext { private val associations = new ConcurrentHashMap[Address, OutboundContext] - def sendReply(to: Address, message: ControlMessage) = { + def sendControl(to: Address, message: ControlMessage) = { if (ThreadLocalRandom.current().nextDouble() >= replyDropRate) - replySubject.sendReply(InboundEnvelope(null, to, message, None)) + controlSubject.sendControl(InboundEnvelope(null, to, message, None)) } def association(remoteAddress: Address): OutboundContext = associations.get(remoteAddress) match { case null ⇒ - val a = new TestOutboundContext(localAddress, remoteAddress, replySubject) + val a = new TestOutboundContext(localAddress, remoteAddress, controlSubject) associations.putIfAbsent(remoteAddress, a) match { case null ⇒ a case existing ⇒ existing @@ -39,13 +39,13 @@ private[akka] class TestInboundContext( } protected def createAssociation(remoteAddress: Address): OutboundContext = - new TestOutboundContext(localAddress, remoteAddress, replySubject) + new TestOutboundContext(localAddress, remoteAddress, controlSubject) } private[akka] class TestOutboundContext( override val localAddress: UniqueAddress, override val remoteAddress: Address, - override val replySubject: TestReplySubject) extends OutboundContext { + override val controlSubject: TestControlMessageSubject) extends OutboundContext { private val _uniqueRemoteAddress = Promise[UniqueAddress]() def uniqueRemoteAddress: Future[UniqueAddress] = _uniqueRemoteAddress.future @@ -56,24 +56,24 @@ private[akka] class TestOutboundContext( } -private[akka] class TestReplySubject extends ReplySubject { +private[akka] class TestControlMessageSubject extends ControlMessageSubject { - private var replyObservers = new CopyOnWriteArrayList[ReplyObserver] + private var observers = new CopyOnWriteArrayList[ControlMessageObserver] - override def attach(observer: ReplyObserver): Future[Done] = { - replyObservers.add(observer) + override def attach(observer: ControlMessageObserver): Future[Done] = { + observers.add(observer) Future.successful(Done) } - override def detach(observer: ReplyObserver): Unit = { - replyObservers.remove(observer) + override def detach(observer: ControlMessageObserver): Unit = { + observers.remove(observer) } override def stopped: Future[Done] = Promise[Done]().future - def sendReply(env: InboundEnvelope): Unit = { - val iter = replyObservers.iterator() + def sendControl(env: InboundEnvelope): Unit = { + val iter = observers.iterator() while (iter.hasNext()) - iter.next().reply(env) + iter.next().notify(env) } }