From 87386e18cfa3e380478290b2bcf84ea46c28bb5d Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 11 May 2016 15:55:06 +0200 Subject: [PATCH 1/2] inject outgoing control messages at lower level * instead of sending the the control messages (e.g. handshake reply, sys msg ack) via the normal message send ingress point they are now injected via side channel and will therefore not go through higher level stages such as handshake and system message delivery --- .../akka/remote/artery/ArteryTransport.scala | 20 +-- .../akka/remote/artery/Association.scala | 25 +++- .../scala/akka/remote/artery/Handshake.scala | 2 +- .../main/scala/akka/remote/artery/Reply.scala | 127 +++++++++++++++--- .../remote/artery/SystemMessageDelivery.scala | 11 +- .../akka/remote/artery/TestContext.scala | 4 +- 6 files changed, 144 insertions(+), 45 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 6b822f9521..cdff0acd03 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -23,7 +23,7 @@ import akka.remote.RemoteActorRef import akka.remote.RemoteActorRefProvider import akka.remote.RemoteTransport import akka.remote.UniqueAddress -import akka.remote.artery.ReplyJunction.ReplySubject +import akka.remote.artery.InboundReplyJunction.ReplySubject import akka.remote.transport.AkkaPduCodec import akka.remote.transport.AkkaPduProtobufCodec import akka.serialization.Serialization @@ -47,6 +47,7 @@ import io.aeron.exceptions.ConductorServiceTimeoutException import org.agrona.ErrorHandler import org.agrona.IoUtil import java.io.File +import akka.remote.artery.OutboundReplyJunction.OutboundReplyIngress /** * INTERNAL API @@ -72,7 +73,7 @@ private[akka] trait InboundContext { * An inbound stage can send reply message to the origin * address with this method. */ - def sendReply(to: Address, message: ControlMessage): Unit + def sendReply(to: Address, message: ControlMessage): Unit // FIXME rename to sendControl /** * Lookup the outbound association for a given address. @@ -112,7 +113,7 @@ private[akka] trait OutboundContext { * An outbound stage can listen to reply messages * via this observer subject. */ - def replySubject: ReplySubject + def replySubject: ReplySubject // FIXME rename to controlSubject // FIXME we should be able to Send without a recipient ActorRef def dummyRecipient: RemoteActorRef @@ -146,7 +147,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R // TODO support port 0 private def inboundChannel = s"aeron:udp?endpoint=${localAddress.address.host.get}:${localAddress.address.port.get}" private def outboundChannel(a: Address) = s"aeron:udp?endpoint=${a.host.get}:${a.port.get}" - private val systemMessageStreamId = 1 + private val systemMessageStreamId = 1 // FIXME rename to controlStreamId private val ordinaryStreamId = 3 private val taskRunner = new TaskRunner(system) @@ -241,9 +242,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R } // InboundContext - override def sendReply(to: Address, message: ControlMessage) = { - send(message, None, association(to).dummyRecipient) - } + override def sendReply(to: Address, message: ControlMessage) = + association(to).outboundReplyIngress.sendControlMessage(message) override def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = { val cached = recipient.cachedAssociation @@ -282,10 +282,11 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R .to(new AeronSink(outboundChannel(outboundContext.remoteAddress), ordinaryStreamId, aeron, taskRunner)) } - def outboundSystemMessage(outboundContext: OutboundContext): Sink[Send, Any] = { + def outboundSystemMessage(outboundContext: OutboundContext): Sink[Send, OutboundReplyIngress] = { Flow.fromGraph(killSwitch.flow[Send]) .via(new OutboundHandshake(outboundContext)) .via(new SystemMessageDelivery(outboundContext, systemMessageResendInterval)) + .viaMat(new OutboundReplyJunction(outboundContext))(Keep.right) .via(encoder) .map(_.toArray) // TODO we should use ByteString all the way .to(new AeronSink(outboundChannel(outboundContext.remoteAddress), systemMessageStreamId, aeron, taskRunner)) @@ -338,13 +339,14 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R Source.maybe[ByteString].via(killSwitch.flow)) } + // FIXME rename to controlFlow val inboundSystemMessageFlow: Flow[ByteString, ByteString, ReplySubject] = { Flow.fromSinkAndSourceMat( decoder .via(deserializer) .via(new InboundHandshake(this)) .via(new SystemMessageAcker(this)) - .viaMat(new ReplyJunction)(Keep.right) + .viaMat(new InboundReplyJunction)(Keep.right) .to(messageDispatcherSink), Source.maybe[ByteString].via(killSwitch.flow))((a, b) ⇒ a) } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index 1e6b8df019..1254fd2139 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -5,20 +5,20 @@ package akka.remote.artery import scala.concurrent.Future import scala.concurrent.Promise - import akka.actor.ActorRef import akka.actor.Address - import akka.actor.RootActorPath import akka.dispatch.sysmsg.SystemMessage import akka.remote.EndpointManager.Send import akka.remote.RemoteActorRef import akka.remote.UniqueAddress -import akka.remote.artery.ReplyJunction.ReplySubject +import akka.remote.artery.InboundReplyJunction.ReplySubject import akka.stream.Materializer import akka.stream.OverflowStrategy import akka.stream.scaladsl.Source import akka.stream.scaladsl.SourceQueueWithComplete +import akka.remote.artery.OutboundReplyJunction.OutboundReplyIngress +import akka.stream.scaladsl.Keep /** * INTERNAL API @@ -34,6 +34,13 @@ private[akka] class Association( @volatile private[this] var queue: SourceQueueWithComplete[Send] = _ @volatile private[this] var systemMessageQueue: SourceQueueWithComplete[Send] = _ + @volatile private[this] var _outboundReplyIngress: OutboundReplyIngress = _ + + def outboundReplyIngress: OutboundReplyIngress = { + if (_outboundReplyIngress eq null) + throw new IllegalStateException("outboundReplyIngress not initialized yet") + _outboundReplyIngress + } override def localAddress: UniqueAddress = transport.localAddress @@ -49,7 +56,7 @@ private[akka] class Association( // TODO: lookup subchannel // FIXME: Use a different envelope than the old Send, but make sure the new is handled by deadLetters properly message match { - case _: SystemMessage | _: Reply ⇒ + case _: SystemMessage ⇒ implicit val ec = materializer.executionContext systemMessageQueue.offer(Send(message, senderOption, recipient, None)).onFailure { case e ⇒ @@ -73,8 +80,12 @@ private[akka] class Association( if (queue eq null) queue = Source.queue(256, OverflowStrategy.dropBuffer) .to(transport.outbound(this)).run()(materializer) - if (systemMessageQueue eq null) - systemMessageQueue = Source.queue(256, OverflowStrategy.dropBuffer) - .to(transport.outboundSystemMessage(this)).run()(materializer) + if (systemMessageQueue eq null) { + val (q, control) = Source.queue(256, OverflowStrategy.dropBuffer) + .toMat(transport.outboundSystemMessage(this))(Keep.both) + .run()(materializer) + systemMessageQueue = q + _outboundReplyIngress = control + } } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala index 6590f82967..5a6cdeee65 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala @@ -8,7 +8,7 @@ import scala.concurrent.duration._ import akka.Done import akka.remote.EndpointManager.Send import akka.remote.UniqueAddress -import akka.remote.artery.ReplyJunction.ReplyObserver +import akka.remote.artery.InboundReplyJunction.ReplyObserver import akka.stream.Attributes import akka.stream.FlowShape import akka.stream.Inlet diff --git a/akka-remote/src/main/scala/akka/remote/artery/Reply.scala b/akka-remote/src/main/scala/akka/remote/artery/Reply.scala index 676425894e..baddb3e59a 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Reply.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Reply.scala @@ -14,6 +14,9 @@ import akka.stream.stage.GraphStageLogic import akka.stream.stage.GraphStageWithMaterializedValue import akka.stream.stage.InHandler import akka.stream.stage.OutHandler +import akka.remote.EndpointManager.Send +import java.util.ArrayDeque +import akka.stream.stage.CallbackWrapper /** * Marker trait for reply messages @@ -29,7 +32,9 @@ trait ControlMessage /** * INTERNAL API */ -private[akka] object ReplyJunction { +private[akka] object InboundReplyJunction { + + // FIXME rename all Reply stuff to Control or ControlMessage private[akka] trait ReplySubject { def attach(observer: ReplyObserver): Future[Done] @@ -40,24 +45,42 @@ private[akka] object ReplyJunction { private[akka] trait ReplyObserver { def reply(inboundEnvelope: InboundEnvelope): Unit } + + private[InboundReplyJunction] sealed trait CallbackMessage + private[InboundReplyJunction] final case class Attach(observer: ReplyObserver, done: Promise[Done]) + extends CallbackMessage + private[InboundReplyJunction] final case class Dettach(observer: ReplyObserver) extends CallbackMessage } /** * INTERNAL API */ -private[akka] class ReplyJunction - extends GraphStageWithMaterializedValue[FlowShape[InboundEnvelope, InboundEnvelope], ReplyJunction.ReplySubject] { - import ReplyJunction._ +private[akka] class InboundReplyJunction + extends GraphStageWithMaterializedValue[FlowShape[InboundEnvelope, InboundEnvelope], InboundReplyJunction.ReplySubject] { + import InboundReplyJunction._ - val in: Inlet[InboundEnvelope] = Inlet("ReplyJunction.in") - val out: Outlet[InboundEnvelope] = Outlet("ReplyJunction.out") + val in: Inlet[InboundEnvelope] = Inlet("InboundReplyJunction.in") + val out: Outlet[InboundEnvelope] = Outlet("InboundReplyJunction.out") override val shape: FlowShape[InboundEnvelope, InboundEnvelope] = FlowShape(in, out) override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { - val logic = new GraphStageLogic(shape) with InHandler with OutHandler with ReplySubject { + val stoppedPromise = Promise[Done]() + // FIXME see issue #20503 related to CallbackWrapper, we might implement this in a better way + val logic = new GraphStageLogic(shape) with CallbackWrapper[CallbackMessage] with InHandler with OutHandler { private var replyObservers: Vector[ReplyObserver] = Vector.empty - private val stoppedPromise = Promise[Done]() + + private val callback = getAsyncCallback[CallbackMessage] { + case Attach(observer, done) ⇒ + replyObservers :+= observer + done.success(Done) + case Dettach(observer) ⇒ + replyObservers = replyObservers.filterNot(_ == observer) + } + + override def preStart(): Unit = { + initCallback(callback.invoke) + } override def postStop(): Unit = stoppedPromise.success(Done) @@ -75,23 +98,95 @@ private[akka] class ReplyJunction // OutHandler override def onPull(): Unit = pull(in) + setHandlers(in, out, this) + } + + // materialized value + val replySubject: ReplySubject = new ReplySubject { override def attach(observer: ReplyObserver): Future[Done] = { val p = Promise[Done]() - getAsyncCallback[Unit](_ ⇒ { - replyObservers :+= observer - p.success(Done) - }).invoke(()) + logic.invoke(Attach(observer, p)) p.future } - override def detach(observer: ReplyObserver): Unit = { - replyObservers = replyObservers.filterNot(_ == observer) + override def detach(observer: ReplyObserver): Unit = + logic.invoke(Dettach(observer)) + + override def stopped: Future[Done] = + stoppedPromise.future + } + + (logic, replySubject) + } +} + +/** + * INTERNAL API + */ +private[akka] object OutboundReplyJunction { + trait OutboundReplyIngress { + def sendControlMessage(message: ControlMessage): Unit + } +} + +/** + * INTERNAL API + */ +private[akka] class OutboundReplyJunction(outboundContext: OutboundContext) + extends GraphStageWithMaterializedValue[FlowShape[Send, Send], OutboundReplyJunction.OutboundReplyIngress] { + import OutboundReplyJunction._ + val in: Inlet[Send] = Inlet("OutboundReplyJunction.in") + val out: Outlet[Send] = Outlet("OutboundReplyJunction.out") + override val shape: FlowShape[Send, Send] = FlowShape(in, out) + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { + // FIXME see issue #20503 related to CallbackWrapper, we might implement this in a better way + val logic = new GraphStageLogic(shape) with CallbackWrapper[ControlMessage] with InHandler with OutHandler { + import OutboundReplyJunction._ + + private val sendControlMessageCallback = getAsyncCallback[ControlMessage](internalSendControlMessage) + private val buffer = new ArrayDeque[Send] + + override def preStart(): Unit = { + initCallback(sendControlMessageCallback.invoke) } - override def stopped: Future[Done] = stoppedPromise.future + // InHandler + override def onPush(): Unit = { + if (buffer.isEmpty && isAvailable(out)) + push(out, grab(in)) + else + buffer.offer(grab(in)) + } + + // OutHandler + override def onPull(): Unit = { + if (buffer.isEmpty && !hasBeenPulled(in)) + pull(in) + else if (!buffer.isEmpty) + push(out, buffer.poll()) + } + + private def internalSendControlMessage(message: ControlMessage): Unit = { + if (buffer.isEmpty && isAvailable(out)) + push(out, wrap(message)) + else + buffer.offer(wrap(message)) + } + + private def wrap(message: ControlMessage): Send = + Send(message, None, outboundContext.dummyRecipient, None) setHandlers(in, out, this) } - (logic, logic) + + // materialized value + val outboundReplyIngress = new OutboundReplyIngress { + override def sendControlMessage(message: ControlMessage): Unit = + logic.invoke(message) + } + + (logic, outboundReplyIngress) } + } diff --git a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala index 271f4a629a..f66c6ef2c3 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala @@ -14,7 +14,7 @@ import scala.util.Try import akka.Done import akka.remote.EndpointManager.Send import akka.remote.UniqueAddress -import akka.remote.artery.ReplyJunction.ReplyObserver +import akka.remote.artery.InboundReplyJunction.ReplyObserver import akka.stream.Attributes import akka.stream.FlowShape import akka.stream.Inlet @@ -151,15 +151,6 @@ private[akka] class SystemMessageDelivery( // InHandler override def onPush(): Unit = { grab(in) match { - case s @ Send(reply: ControlMessage, _, _, _) ⇒ - // pass through - if (isAvailable(out)) - push(out, s) - else { - // it's ok to drop the replies, but we can try - resending.offer(s) - } - case s @ Send(msg: AnyRef, _, _, _) ⇒ seqNo += 1 val sendMsg = s.copy(message = SystemMessageEnvelope(msg, seqNo, localAddress)) diff --git a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala index d54cff909e..a5694a276d 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala @@ -6,11 +6,11 @@ package akka.remote.artery import akka.remote.UniqueAddress import akka.actor.Address import scala.concurrent.Future -import akka.remote.artery.ReplyJunction.ReplySubject +import akka.remote.artery.InboundReplyJunction.ReplySubject import akka.remote.RemoteActorRef import scala.concurrent.Promise import akka.Done -import akka.remote.artery.ReplyJunction.ReplyObserver +import akka.remote.artery.InboundReplyJunction.ReplyObserver import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ThreadLocalRandom From 8a04b6d05a0ae71866e6ee259815e881b2a70bbb Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 12 May 2016 08:56:28 +0200 Subject: [PATCH 2/2] rename reply to control --- .../akka/remote/artery/ArteryTransport.scala | 37 +++++---- .../akka/remote/artery/Association.scala | 20 ++--- .../artery/{Reply.scala => Control.scala} | 80 +++++++++++-------- .../scala/akka/remote/artery/Handshake.scala | 24 +++--- .../remote/artery/SystemMessageDelivery.scala | 20 ++--- .../artery/SystemMessageDeliverySpec.scala | 36 ++++----- .../akka/remote/artery/TestContext.scala | 34 ++++---- 7 files changed, 130 insertions(+), 121 deletions(-) rename akka-remote/src/main/scala/akka/remote/artery/{Reply.scala => Control.scala} (63%) diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index cdff0acd03..511ef38cc3 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -23,7 +23,7 @@ import akka.remote.RemoteActorRef import akka.remote.RemoteActorRefProvider import akka.remote.RemoteTransport import akka.remote.UniqueAddress -import akka.remote.artery.InboundReplyJunction.ReplySubject +import akka.remote.artery.InboundControlJunction.ControlMessageSubject import akka.remote.transport.AkkaPduCodec import akka.remote.transport.AkkaPduProtobufCodec import akka.serialization.Serialization @@ -47,7 +47,7 @@ import io.aeron.exceptions.ConductorServiceTimeoutException import org.agrona.ErrorHandler import org.agrona.IoUtil import java.io.File -import akka.remote.artery.OutboundReplyJunction.OutboundReplyIngress +import akka.remote.artery.OutboundControlJunction.OutboundControlIngress /** * INTERNAL API @@ -70,10 +70,10 @@ private[akka] trait InboundContext { def localAddress: UniqueAddress /** - * An inbound stage can send reply message to the origin + * An inbound stage can send control message, e.g. a reply, to the origin * address with this method. */ - def sendReply(to: Address, message: ControlMessage): Unit // FIXME rename to sendControl + def sendControl(to: Address, message: ControlMessage): Unit /** * Lookup the outbound association for a given address. @@ -110,10 +110,10 @@ private[akka] trait OutboundContext { def completeRemoteAddress(a: UniqueAddress): Unit /** - * An outbound stage can listen to reply messages + * An outbound stage can listen to control messages * via this observer subject. */ - def replySubject: ReplySubject // FIXME rename to controlSubject + def controlSubject: ControlMessageSubject // FIXME we should be able to Send without a recipient ActorRef def dummyRecipient: RemoteActorRef @@ -130,7 +130,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R @volatile private[this] var _localAddress: UniqueAddress = _ override def localAddress: UniqueAddress = _localAddress @volatile private[this] var materializer: Materializer = _ - @volatile private[this] var replySubject: ReplySubject = _ + @volatile private[this] var controlSubject: ControlMessageSubject = _ @volatile private[this] var messageDispatcher: MessageDispatcher = _ @volatile private[this] var driver: MediaDriver = _ @volatile private[this] var aeron: Aeron = _ @@ -147,7 +147,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R // TODO support port 0 private def inboundChannel = s"aeron:udp?endpoint=${localAddress.address.host.get}:${localAddress.address.port.get}" private def outboundChannel(a: Address) = s"aeron:udp?endpoint=${a.host.get}:${a.port.get}" - private val systemMessageStreamId = 1 // FIXME rename to controlStreamId + private val controlStreamId = 1 private val ordinaryStreamId = 3 private val taskRunner = new TaskRunner(system) @@ -215,10 +215,10 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R } private def runInboundFlows(): Unit = { - replySubject = Source.fromGraph(new AeronSource(inboundChannel, systemMessageStreamId, aeron, taskRunner)) + controlSubject = Source.fromGraph(new AeronSource(inboundChannel, controlStreamId, aeron, taskRunner)) .async // FIXME measure .map(ByteString.apply) // TODO we should use ByteString all the way - .viaMat(inboundSystemMessageFlow)(Keep.right) + .viaMat(inboundControlFlow)(Keep.right) .to(Sink.ignore) .run()(materializer) @@ -242,8 +242,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R } // InboundContext - override def sendReply(to: Address, message: ControlMessage) = - association(to).outboundReplyIngress.sendControlMessage(message) + override def sendControl(to: Address, message: ControlMessage) = + association(to).outboundControlIngress.sendControlMessage(message) override def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = { val cached = recipient.cachedAssociation @@ -262,7 +262,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R else { associations.computeIfAbsent(remoteAddress, new JFunction[Address, Association] { override def apply(remoteAddress: Address): Association = { - val newAssociation = new Association(ArteryTransport.this, materializer, remoteAddress, replySubject) + val newAssociation = new Association(ArteryTransport.this, materializer, remoteAddress, controlSubject) newAssociation.associate() // This is a bit costly for this blocking method :( newAssociation } @@ -282,14 +282,14 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R .to(new AeronSink(outboundChannel(outboundContext.remoteAddress), ordinaryStreamId, aeron, taskRunner)) } - def outboundSystemMessage(outboundContext: OutboundContext): Sink[Send, OutboundReplyIngress] = { + def outboundControl(outboundContext: OutboundContext): Sink[Send, OutboundControlIngress] = { Flow.fromGraph(killSwitch.flow[Send]) .via(new OutboundHandshake(outboundContext)) .via(new SystemMessageDelivery(outboundContext, systemMessageResendInterval)) - .viaMat(new OutboundReplyJunction(outboundContext))(Keep.right) + .viaMat(new OutboundControlJunction(outboundContext))(Keep.right) .via(encoder) .map(_.toArray) // TODO we should use ByteString all the way - .to(new AeronSink(outboundChannel(outboundContext.remoteAddress), systemMessageStreamId, aeron, taskRunner)) + .to(new AeronSink(outboundChannel(outboundContext.remoteAddress), controlStreamId, aeron, taskRunner)) } // TODO: Try out parallelized serialization (mapAsync) for performance @@ -339,14 +339,13 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R Source.maybe[ByteString].via(killSwitch.flow)) } - // FIXME rename to controlFlow - val inboundSystemMessageFlow: Flow[ByteString, ByteString, ReplySubject] = { + val inboundControlFlow: Flow[ByteString, ByteString, ControlMessageSubject] = { Flow.fromSinkAndSourceMat( decoder .via(deserializer) .via(new InboundHandshake(this)) .via(new SystemMessageAcker(this)) - .viaMat(new InboundReplyJunction)(Keep.right) + .viaMat(new InboundControlJunction)(Keep.right) .to(messageDispatcherSink), Source.maybe[ByteString].via(killSwitch.flow))((a, b) ⇒ a) } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index 1254fd2139..323aac7266 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -12,12 +12,12 @@ import akka.dispatch.sysmsg.SystemMessage import akka.remote.EndpointManager.Send import akka.remote.RemoteActorRef import akka.remote.UniqueAddress -import akka.remote.artery.InboundReplyJunction.ReplySubject +import akka.remote.artery.InboundControlJunction.ControlMessageSubject import akka.stream.Materializer import akka.stream.OverflowStrategy import akka.stream.scaladsl.Source import akka.stream.scaladsl.SourceQueueWithComplete -import akka.remote.artery.OutboundReplyJunction.OutboundReplyIngress +import akka.remote.artery.OutboundControlJunction.OutboundControlIngress import akka.stream.scaladsl.Keep /** @@ -30,16 +30,16 @@ private[akka] class Association( val transport: ArteryTransport, val materializer: Materializer, override val remoteAddress: Address, - override val replySubject: ReplySubject) extends OutboundContext { + override val controlSubject: ControlMessageSubject) extends OutboundContext { @volatile private[this] var queue: SourceQueueWithComplete[Send] = _ @volatile private[this] var systemMessageQueue: SourceQueueWithComplete[Send] = _ - @volatile private[this] var _outboundReplyIngress: OutboundReplyIngress = _ + @volatile private[this] var _outboundControlIngress: OutboundControlIngress = _ - def outboundReplyIngress: OutboundReplyIngress = { - if (_outboundReplyIngress eq null) - throw new IllegalStateException("outboundReplyIngress not initialized yet") - _outboundReplyIngress + def outboundControlIngress: OutboundControlIngress = { + if (_outboundControlIngress eq null) + throw new IllegalStateException("outboundControlIngress not initialized yet") + _outboundControlIngress } override def localAddress: UniqueAddress = transport.localAddress @@ -82,10 +82,10 @@ private[akka] class Association( .to(transport.outbound(this)).run()(materializer) if (systemMessageQueue eq null) { val (q, control) = Source.queue(256, OverflowStrategy.dropBuffer) - .toMat(transport.outboundSystemMessage(this))(Keep.both) + .toMat(transport.outboundControl(this))(Keep.both) .run()(materializer) systemMessageQueue = q - _outboundReplyIngress = control + _outboundControlIngress = control } } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Reply.scala b/akka-remote/src/main/scala/akka/remote/artery/Control.scala similarity index 63% rename from akka-remote/src/main/scala/akka/remote/artery/Reply.scala rename to akka-remote/src/main/scala/akka/remote/artery/Control.scala index baddb3e59a..56127d956c 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Reply.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Control.scala @@ -32,35 +32,45 @@ trait ControlMessage /** * INTERNAL API */ -private[akka] object InboundReplyJunction { +private[akka] object InboundControlJunction { - // FIXME rename all Reply stuff to Control or ControlMessage - - private[akka] trait ReplySubject { - def attach(observer: ReplyObserver): Future[Done] - def detach(observer: ReplyObserver): Unit + /** + * Observer subject for inbound control messages. + * Interested observers can attach themselves to the + * subject to get notification of incoming control + * messages. + */ + private[akka] trait ControlMessageSubject { + def attach(observer: ControlMessageObserver): Future[Done] + def detach(observer: ControlMessageObserver): Unit def stopped: Future[Done] } - private[akka] trait ReplyObserver { - def reply(inboundEnvelope: InboundEnvelope): Unit + private[akka] trait ControlMessageObserver { + + /** + * Notification of incoming control message. The message + * of the envelope is always a `ControlMessage`. + */ + def notify(inboundEnvelope: InboundEnvelope): Unit } - private[InboundReplyJunction] sealed trait CallbackMessage - private[InboundReplyJunction] final case class Attach(observer: ReplyObserver, done: Promise[Done]) + // messages for the CallbackWrapper + private[InboundControlJunction] sealed trait CallbackMessage + private[InboundControlJunction] final case class Attach(observer: ControlMessageObserver, done: Promise[Done]) extends CallbackMessage - private[InboundReplyJunction] final case class Dettach(observer: ReplyObserver) extends CallbackMessage + private[InboundControlJunction] final case class Dettach(observer: ControlMessageObserver) extends CallbackMessage } /** * INTERNAL API */ -private[akka] class InboundReplyJunction - extends GraphStageWithMaterializedValue[FlowShape[InboundEnvelope, InboundEnvelope], InboundReplyJunction.ReplySubject] { - import InboundReplyJunction._ +private[akka] class InboundControlJunction + extends GraphStageWithMaterializedValue[FlowShape[InboundEnvelope, InboundEnvelope], InboundControlJunction.ControlMessageSubject] { + import InboundControlJunction._ - val in: Inlet[InboundEnvelope] = Inlet("InboundReplyJunction.in") - val out: Outlet[InboundEnvelope] = Outlet("InboundReplyJunction.out") + val in: Inlet[InboundEnvelope] = Inlet("InboundControlJunction.in") + val out: Outlet[InboundEnvelope] = Outlet("InboundControlJunction.out") override val shape: FlowShape[InboundEnvelope, InboundEnvelope] = FlowShape(in, out) override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { @@ -68,14 +78,14 @@ private[akka] class InboundReplyJunction // FIXME see issue #20503 related to CallbackWrapper, we might implement this in a better way val logic = new GraphStageLogic(shape) with CallbackWrapper[CallbackMessage] with InHandler with OutHandler { - private var replyObservers: Vector[ReplyObserver] = Vector.empty + private var observers: Vector[ControlMessageObserver] = Vector.empty private val callback = getAsyncCallback[CallbackMessage] { case Attach(observer, done) ⇒ - replyObservers :+= observer + observers :+= observer done.success(Done) case Dettach(observer) ⇒ - replyObservers = replyObservers.filterNot(_ == observer) + observers = observers.filterNot(_ == observer) } override def preStart(): Unit = { @@ -87,8 +97,8 @@ private[akka] class InboundReplyJunction // InHandler override def onPush(): Unit = { grab(in) match { - case env @ InboundEnvelope(_, _, reply: Reply, _) ⇒ - replyObservers.foreach(_.reply(env)) + case env @ InboundEnvelope(_, _, _: ControlMessage, _) ⇒ + observers.foreach(_.notify(env)) pull(in) case env ⇒ push(out, env) @@ -102,29 +112,29 @@ private[akka] class InboundReplyJunction } // materialized value - val replySubject: ReplySubject = new ReplySubject { - override def attach(observer: ReplyObserver): Future[Done] = { + val controlSubject: ControlMessageSubject = new ControlMessageSubject { + override def attach(observer: ControlMessageObserver): Future[Done] = { val p = Promise[Done]() logic.invoke(Attach(observer, p)) p.future } - override def detach(observer: ReplyObserver): Unit = + override def detach(observer: ControlMessageObserver): Unit = logic.invoke(Dettach(observer)) override def stopped: Future[Done] = stoppedPromise.future } - (logic, replySubject) + (logic, controlSubject) } } /** * INTERNAL API */ -private[akka] object OutboundReplyJunction { - trait OutboundReplyIngress { +private[akka] object OutboundControlJunction { + trait OutboundControlIngress { def sendControlMessage(message: ControlMessage): Unit } } @@ -132,17 +142,17 @@ private[akka] object OutboundReplyJunction { /** * INTERNAL API */ -private[akka] class OutboundReplyJunction(outboundContext: OutboundContext) - extends GraphStageWithMaterializedValue[FlowShape[Send, Send], OutboundReplyJunction.OutboundReplyIngress] { - import OutboundReplyJunction._ - val in: Inlet[Send] = Inlet("OutboundReplyJunction.in") - val out: Outlet[Send] = Outlet("OutboundReplyJunction.out") +private[akka] class OutboundControlJunction(outboundContext: OutboundContext) + extends GraphStageWithMaterializedValue[FlowShape[Send, Send], OutboundControlJunction.OutboundControlIngress] { + import OutboundControlJunction._ + val in: Inlet[Send] = Inlet("OutboundControlJunction.in") + val out: Outlet[Send] = Outlet("OutboundControlJunction.out") override val shape: FlowShape[Send, Send] = FlowShape(in, out) override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { // FIXME see issue #20503 related to CallbackWrapper, we might implement this in a better way val logic = new GraphStageLogic(shape) with CallbackWrapper[ControlMessage] with InHandler with OutHandler { - import OutboundReplyJunction._ + import OutboundControlJunction._ private val sendControlMessageCallback = getAsyncCallback[ControlMessage](internalSendControlMessage) private val buffer = new ArrayDeque[Send] @@ -181,12 +191,12 @@ private[akka] class OutboundReplyJunction(outboundContext: OutboundContext) } // materialized value - val outboundReplyIngress = new OutboundReplyIngress { + val outboundControlIngress = new OutboundControlIngress { override def sendControlMessage(message: ControlMessage): Unit = logic.invoke(message) } - (logic, outboundReplyIngress) + (logic, outboundControlIngress) } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala index 5a6cdeee65..5357b0ee45 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala @@ -8,7 +8,7 @@ import scala.concurrent.duration._ import akka.Done import akka.remote.EndpointManager.Send import akka.remote.UniqueAddress -import akka.remote.artery.InboundReplyJunction.ReplyObserver +import akka.remote.artery.InboundControlJunction.ControlMessageObserver import akka.stream.Attributes import akka.stream.FlowShape import akka.stream.Inlet @@ -29,7 +29,7 @@ private[akka] object OutboundHandshake { private sealed trait HandshakeState private case object Start extends HandshakeState - private case object ReplyObserverAttached extends HandshakeState + private case object ControlMessageObserverAttached extends HandshakeState private case object ReqInProgress extends HandshakeState private case object Completed extends HandshakeState @@ -46,7 +46,7 @@ private[akka] class OutboundHandshake(outboundContext: OutboundContext) extends override val shape: FlowShape[Send, Send] = FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - new TimerGraphStageLogic(shape) with InHandler with OutHandler with ReplyObserver { + new TimerGraphStageLogic(shape) with InHandler with OutHandler with ControlMessageObserver { import OutboundHandshake._ private val timeout: FiniteDuration = 10.seconds // FIXME config @@ -59,13 +59,13 @@ private[akka] class OutboundHandshake(outboundContext: OutboundContext) extends handshakeState = Completed } else { implicit val ec = materializer.executionContext - outboundContext.replySubject.attach(this).foreach { + outboundContext.controlSubject.attach(this).foreach { getAsyncCallback[Done] { _ ⇒ if (handshakeState != Completed) { if (isAvailable(out)) pushHandshakeReq() else - handshakeState = ReplyObserverAttached + handshakeState = ControlMessageObserverAttached } }.invoke } @@ -85,7 +85,7 @@ private[akka] class OutboundHandshake(outboundContext: OutboundContext) extends } override def postStop(): Unit = { - outboundContext.replySubject.detach(this) + outboundContext.controlSubject.detach(this) } // InHandler @@ -99,9 +99,9 @@ private[akka] class OutboundHandshake(outboundContext: OutboundContext) extends override def onPull(): Unit = { handshakeState match { case Completed ⇒ pull(in) - case ReplyObserverAttached ⇒ + case ControlMessageObserverAttached ⇒ pushHandshakeReq() - case Start ⇒ // will push HandshakeReq when ReplyObserver is attached + case Start ⇒ // will push HandshakeReq when ControlMessageObserver is attached case ReqInProgress ⇒ // will pull when handshake reply is received } } @@ -115,7 +115,7 @@ private[akka] class OutboundHandshake(outboundContext: OutboundContext) extends private def handshakeCompleted(): Unit = { handshakeState = Completed cancelTimer(HandshakeTimeout) - outboundContext.replySubject.detach(this) + outboundContext.controlSubject.detach(this) } override protected def onTimer(timerKey: Any): Unit = @@ -125,8 +125,8 @@ private[akka] class OutboundHandshake(outboundContext: OutboundContext) extends s"Handshake with [$remoteAddress] did not complete within ${timeout.toMillis} ms")) } - // ReplyObserver, external call - override def reply(inboundEnvelope: InboundEnvelope): Unit = { + // ControlMessageObserver, external call + override def notify(inboundEnvelope: InboundEnvelope): Unit = { inboundEnvelope.message match { case rsp: HandshakeRsp ⇒ if (rsp.from.address == remoteAddress) { @@ -165,7 +165,7 @@ private[akka] class InboundHandshake(inboundContext: InboundContext) extends Gra grab(in) match { case InboundEnvelope(_, _, HandshakeReq(from), _) ⇒ inboundContext.association(from.address).completeRemoteAddress(from) - inboundContext.sendReply(from.address, HandshakeRsp(inboundContext.localAddress)) + inboundContext.sendControl(from.address, HandshakeRsp(inboundContext.localAddress)) pull(in) case other ⇒ push(out, other) diff --git a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala index f66c6ef2c3..21e62ca917 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala @@ -14,7 +14,7 @@ import scala.util.Try import akka.Done import akka.remote.EndpointManager.Send import akka.remote.UniqueAddress -import akka.remote.artery.InboundReplyJunction.ReplyObserver +import akka.remote.artery.InboundControlJunction.ControlMessageObserver import akka.stream.Attributes import akka.stream.FlowShape import akka.stream.Inlet @@ -53,7 +53,7 @@ private[akka] class SystemMessageDelivery( override val shape: FlowShape[Send, Send] = FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - new TimerGraphStageLogic(shape) with InHandler with OutHandler with ReplyObserver { + new TimerGraphStageLogic(shape) with InHandler with OutHandler with ControlMessageObserver { private var replyObserverAttached = false private var seqNo = 0L // sequence number for the first message will be 1 @@ -69,7 +69,7 @@ private[akka] class SystemMessageDelivery( this.schedulePeriodically(ResendTick, resendInterval) implicit val ec = materializer.executionContext - outboundContext.replySubject.attach(this).foreach { + outboundContext.controlSubject.attach(this).foreach { getAsyncCallback[Done] { _ ⇒ replyObserverAttached = true if (isAvailable(out)) @@ -77,7 +77,7 @@ private[akka] class SystemMessageDelivery( }.invoke } - outboundContext.replySubject.stopped.onComplete { + outboundContext.controlSubject.stopped.onComplete { getAsyncCallback[Try[Done]] { // FIXME quarantine case Success(_) ⇒ completeStage() @@ -87,7 +87,7 @@ private[akka] class SystemMessageDelivery( } override def postStop(): Unit = { - outboundContext.replySubject.detach(this) + outboundContext.controlSubject.detach(this) } override def onUpstreamFinish(): Unit = { @@ -106,8 +106,8 @@ private[akka] class SystemMessageDelivery( } } - // ReplyObserver, external call - override def reply(inboundEnvelope: InboundEnvelope): Unit = { + // ControlMessageObserver, external call + override def notify(inboundEnvelope: InboundEnvelope): Unit = { inboundEnvelope.message match { case ack: Ack ⇒ if (ack.from.address == remoteAddress) ackCallback.invoke(ack) case nack: Nack ⇒ if (nack.from.address == remoteAddress) nackCallback.invoke(nack) @@ -201,15 +201,15 @@ private[akka] class SystemMessageAcker(inboundContext: InboundContext) extends G grab(in) match { case env @ InboundEnvelope(_, _, sysEnv @ SystemMessageEnvelope(_, n, ackReplyTo), _) ⇒ if (n == seqNo) { - inboundContext.sendReply(ackReplyTo.address, Ack(n, localAddress)) + inboundContext.sendControl(ackReplyTo.address, Ack(n, localAddress)) seqNo += 1 val unwrapped = env.copy(message = sysEnv.message) push(out, unwrapped) } else if (n < seqNo) { - inboundContext.sendReply(ackReplyTo.address, Ack(n, localAddress)) + inboundContext.sendControl(ackReplyTo.address, Ack(n, localAddress)) pull(in) } else { - inboundContext.sendReply(ackReplyTo.address, Nack(seqNo - 1, localAddress)) + inboundContext.sendControl(ackReplyTo.address, Nack(seqNo - 1, localAddress)) pull(in) } case env ⇒ diff --git a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala index 8614aaf2b8..6675ff3e9b 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala @@ -58,17 +58,17 @@ object SystemMessageDeliverySpec { class ManualReplyInboundContext( replyProbe: ActorRef, localAddress: UniqueAddress, - replySubject: TestReplySubject) extends TestInboundContext(localAddress, replySubject) { + controlSubject: TestControlMessageSubject) extends TestInboundContext(localAddress, controlSubject) { private var lastReply: Option[(Address, ControlMessage)] = None - override def sendReply(to: Address, message: ControlMessage) = { + override def sendControl(to: Address, message: ControlMessage) = { lastReply = Some((to, message)) replyProbe ! message } def deliverLastReply(): Unit = { - lastReply.foreach { case (to, message) ⇒ super.sendReply(to, message) } + lastReply.foreach { case (to, message) ⇒ super.sendControl(to, message) } lastReply = None } } @@ -147,9 +147,9 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.commo "be resent when some in the middle are lost" in { val replyProbe = TestProbe() - val replySubject = new TestReplySubject - val inboundContextB = new ManualReplyInboundContext(replyProbe.ref, addressB, replySubject) - val inboundContextA = new TestInboundContext(addressB, replySubject) + val controlSubject = new TestControlMessageSubject + val inboundContextB = new ManualReplyInboundContext(replyProbe.ref, addressB, controlSubject) + val inboundContextA = new TestInboundContext(addressB, controlSubject) val outboundContextA = inboundContextA.association(addressB.address) val sink = send(sendCount = 5, resendInterval = 60.seconds, outboundContextA) @@ -181,9 +181,9 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.commo "be resent when first is lost" in { val replyProbe = TestProbe() - val replySubject = new TestReplySubject - val inboundContextB = new ManualReplyInboundContext(replyProbe.ref, addressB, replySubject) - val inboundContextA = new TestInboundContext(addressB, replySubject) + val controlSubject = new TestControlMessageSubject + val inboundContextB = new ManualReplyInboundContext(replyProbe.ref, addressB, controlSubject) + val inboundContextA = new TestInboundContext(addressB, controlSubject) val outboundContextA = inboundContextA.association(addressB.address) val sink = send(sendCount = 3, resendInterval = 60.seconds, outboundContextA) @@ -210,9 +210,9 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.commo "be resent when last is lost" in { val replyProbe = TestProbe() - val replySubject = new TestReplySubject - val inboundContextB = new ManualReplyInboundContext(replyProbe.ref, addressB, replySubject) - val inboundContextA = new TestInboundContext(addressB, replySubject) + val controlSubject = new TestControlMessageSubject + val inboundContextB = new ManualReplyInboundContext(replyProbe.ref, addressB, controlSubject) + val inboundContextA = new TestInboundContext(addressB, controlSubject) val outboundContextA = inboundContextA.association(addressB.address) val sink = send(sendCount = 3, resendInterval = 1.seconds, outboundContextA) @@ -239,9 +239,9 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.commo "deliver all during stress and random dropping" in { val N = 10000 val dropRate = 0.1 - val replySubject = new TestReplySubject - val inboundContextB = new TestInboundContext(addressB, replySubject, replyDropRate = dropRate) - val inboundContextA = new TestInboundContext(addressB, replySubject) + val controlSubject = new TestControlMessageSubject + val inboundContextB = new TestInboundContext(addressB, controlSubject, replyDropRate = dropRate) + val inboundContextA = new TestInboundContext(addressB, controlSubject) val outboundContextA = inboundContextA.association(addressB.address) val output = @@ -257,9 +257,9 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.commo "deliver all during throttling and random dropping" in { val N = 500 val dropRate = 0.1 - val replySubject = new TestReplySubject - val inboundContextB = new TestInboundContext(addressB, replySubject, replyDropRate = dropRate) - val inboundContextA = new TestInboundContext(addressB, replySubject) + val controlSubject = new TestControlMessageSubject + val inboundContextB = new TestInboundContext(addressB, controlSubject, replyDropRate = dropRate) + val inboundContextA = new TestInboundContext(addressB, controlSubject) val outboundContextA = inboundContextA.association(addressB.address) val output = diff --git a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala index a5694a276d..a9a68e7247 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala @@ -6,31 +6,31 @@ package akka.remote.artery import akka.remote.UniqueAddress import akka.actor.Address import scala.concurrent.Future -import akka.remote.artery.InboundReplyJunction.ReplySubject +import akka.remote.artery.InboundControlJunction.ControlMessageSubject import akka.remote.RemoteActorRef import scala.concurrent.Promise import akka.Done -import akka.remote.artery.InboundReplyJunction.ReplyObserver +import akka.remote.artery.InboundControlJunction.ControlMessageObserver import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ThreadLocalRandom private[akka] class TestInboundContext( override val localAddress: UniqueAddress, - val replySubject: TestReplySubject = new TestReplySubject, + val controlSubject: TestControlMessageSubject = new TestControlMessageSubject, replyDropRate: Double = 0.0) extends InboundContext { private val associations = new ConcurrentHashMap[Address, OutboundContext] - def sendReply(to: Address, message: ControlMessage) = { + def sendControl(to: Address, message: ControlMessage) = { if (ThreadLocalRandom.current().nextDouble() >= replyDropRate) - replySubject.sendReply(InboundEnvelope(null, to, message, None)) + controlSubject.sendControl(InboundEnvelope(null, to, message, None)) } def association(remoteAddress: Address): OutboundContext = associations.get(remoteAddress) match { case null ⇒ - val a = new TestOutboundContext(localAddress, remoteAddress, replySubject) + val a = new TestOutboundContext(localAddress, remoteAddress, controlSubject) associations.putIfAbsent(remoteAddress, a) match { case null ⇒ a case existing ⇒ existing @@ -39,13 +39,13 @@ private[akka] class TestInboundContext( } protected def createAssociation(remoteAddress: Address): OutboundContext = - new TestOutboundContext(localAddress, remoteAddress, replySubject) + new TestOutboundContext(localAddress, remoteAddress, controlSubject) } private[akka] class TestOutboundContext( override val localAddress: UniqueAddress, override val remoteAddress: Address, - override val replySubject: TestReplySubject) extends OutboundContext { + override val controlSubject: TestControlMessageSubject) extends OutboundContext { private val _uniqueRemoteAddress = Promise[UniqueAddress]() def uniqueRemoteAddress: Future[UniqueAddress] = _uniqueRemoteAddress.future @@ -56,24 +56,24 @@ private[akka] class TestOutboundContext( } -private[akka] class TestReplySubject extends ReplySubject { +private[akka] class TestControlMessageSubject extends ControlMessageSubject { - private var replyObservers = new CopyOnWriteArrayList[ReplyObserver] + private var observers = new CopyOnWriteArrayList[ControlMessageObserver] - override def attach(observer: ReplyObserver): Future[Done] = { - replyObservers.add(observer) + override def attach(observer: ControlMessageObserver): Future[Done] = { + observers.add(observer) Future.successful(Done) } - override def detach(observer: ReplyObserver): Unit = { - replyObservers.remove(observer) + override def detach(observer: ControlMessageObserver): Unit = { + observers.remove(observer) } override def stopped: Future[Done] = Promise[Done]().future - def sendReply(env: InboundEnvelope): Unit = { - val iter = replyObservers.iterator() + def sendControl(env: InboundEnvelope): Unit = { + val iter = observers.iterator() while (iter.hasNext()) - iter.next().reply(env) + iter.next().notify(env) } }