diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index cdff0acd03..511ef38cc3 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -23,7 +23,7 @@ import akka.remote.RemoteActorRef import akka.remote.RemoteActorRefProvider import akka.remote.RemoteTransport import akka.remote.UniqueAddress -import akka.remote.artery.InboundReplyJunction.ReplySubject +import akka.remote.artery.InboundControlJunction.ControlMessageSubject import akka.remote.transport.AkkaPduCodec import akka.remote.transport.AkkaPduProtobufCodec import akka.serialization.Serialization @@ -47,7 +47,7 @@ import io.aeron.exceptions.ConductorServiceTimeoutException import org.agrona.ErrorHandler import org.agrona.IoUtil import java.io.File -import akka.remote.artery.OutboundReplyJunction.OutboundReplyIngress +import akka.remote.artery.OutboundControlJunction.OutboundControlIngress /** * INTERNAL API @@ -70,10 +70,10 @@ private[akka] trait InboundContext { def localAddress: UniqueAddress /** - * An inbound stage can send reply message to the origin + * An inbound stage can send control message, e.g. a reply, to the origin * address with this method. */ - def sendReply(to: Address, message: ControlMessage): Unit // FIXME rename to sendControl + def sendControl(to: Address, message: ControlMessage): Unit /** * Lookup the outbound association for a given address. @@ -110,10 +110,10 @@ private[akka] trait OutboundContext { def completeRemoteAddress(a: UniqueAddress): Unit /** - * An outbound stage can listen to reply messages + * An outbound stage can listen to control messages * via this observer subject. */ - def replySubject: ReplySubject // FIXME rename to controlSubject + def controlSubject: ControlMessageSubject // FIXME we should be able to Send without a recipient ActorRef def dummyRecipient: RemoteActorRef @@ -130,7 +130,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R @volatile private[this] var _localAddress: UniqueAddress = _ override def localAddress: UniqueAddress = _localAddress @volatile private[this] var materializer: Materializer = _ - @volatile private[this] var replySubject: ReplySubject = _ + @volatile private[this] var controlSubject: ControlMessageSubject = _ @volatile private[this] var messageDispatcher: MessageDispatcher = _ @volatile private[this] var driver: MediaDriver = _ @volatile private[this] var aeron: Aeron = _ @@ -147,7 +147,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R // TODO support port 0 private def inboundChannel = s"aeron:udp?endpoint=${localAddress.address.host.get}:${localAddress.address.port.get}" private def outboundChannel(a: Address) = s"aeron:udp?endpoint=${a.host.get}:${a.port.get}" - private val systemMessageStreamId = 1 // FIXME rename to controlStreamId + private val controlStreamId = 1 private val ordinaryStreamId = 3 private val taskRunner = new TaskRunner(system) @@ -215,10 +215,10 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R } private def runInboundFlows(): Unit = { - replySubject = Source.fromGraph(new AeronSource(inboundChannel, systemMessageStreamId, aeron, taskRunner)) + controlSubject = Source.fromGraph(new AeronSource(inboundChannel, controlStreamId, aeron, taskRunner)) .async // FIXME measure .map(ByteString.apply) // TODO we should use ByteString all the way - .viaMat(inboundSystemMessageFlow)(Keep.right) + .viaMat(inboundControlFlow)(Keep.right) .to(Sink.ignore) .run()(materializer) @@ -242,8 +242,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R } // InboundContext - override def sendReply(to: Address, message: ControlMessage) = - association(to).outboundReplyIngress.sendControlMessage(message) + override def sendControl(to: Address, message: ControlMessage) = + association(to).outboundControlIngress.sendControlMessage(message) override def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = { val cached = recipient.cachedAssociation @@ -262,7 +262,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R else { associations.computeIfAbsent(remoteAddress, new JFunction[Address, Association] { override def apply(remoteAddress: Address): Association = { - val newAssociation = new Association(ArteryTransport.this, materializer, remoteAddress, replySubject) + val newAssociation = new Association(ArteryTransport.this, materializer, remoteAddress, controlSubject) newAssociation.associate() // This is a bit costly for this blocking method :( newAssociation } @@ -282,14 +282,14 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R .to(new AeronSink(outboundChannel(outboundContext.remoteAddress), ordinaryStreamId, aeron, taskRunner)) } - def outboundSystemMessage(outboundContext: OutboundContext): Sink[Send, OutboundReplyIngress] = { + def outboundControl(outboundContext: OutboundContext): Sink[Send, OutboundControlIngress] = { Flow.fromGraph(killSwitch.flow[Send]) .via(new OutboundHandshake(outboundContext)) .via(new SystemMessageDelivery(outboundContext, systemMessageResendInterval)) - .viaMat(new OutboundReplyJunction(outboundContext))(Keep.right) + .viaMat(new OutboundControlJunction(outboundContext))(Keep.right) .via(encoder) .map(_.toArray) // TODO we should use ByteString all the way - .to(new AeronSink(outboundChannel(outboundContext.remoteAddress), systemMessageStreamId, aeron, taskRunner)) + .to(new AeronSink(outboundChannel(outboundContext.remoteAddress), controlStreamId, aeron, taskRunner)) } // TODO: Try out parallelized serialization (mapAsync) for performance @@ -339,14 +339,13 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R Source.maybe[ByteString].via(killSwitch.flow)) } - // FIXME rename to controlFlow - val inboundSystemMessageFlow: Flow[ByteString, ByteString, ReplySubject] = { + val inboundControlFlow: Flow[ByteString, ByteString, ControlMessageSubject] = { Flow.fromSinkAndSourceMat( decoder .via(deserializer) .via(new InboundHandshake(this)) .via(new SystemMessageAcker(this)) - .viaMat(new InboundReplyJunction)(Keep.right) + .viaMat(new InboundControlJunction)(Keep.right) .to(messageDispatcherSink), Source.maybe[ByteString].via(killSwitch.flow))((a, b) ⇒ a) } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index 1254fd2139..323aac7266 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -12,12 +12,12 @@ import akka.dispatch.sysmsg.SystemMessage import akka.remote.EndpointManager.Send import akka.remote.RemoteActorRef import akka.remote.UniqueAddress -import akka.remote.artery.InboundReplyJunction.ReplySubject +import akka.remote.artery.InboundControlJunction.ControlMessageSubject import akka.stream.Materializer import akka.stream.OverflowStrategy import akka.stream.scaladsl.Source import akka.stream.scaladsl.SourceQueueWithComplete -import akka.remote.artery.OutboundReplyJunction.OutboundReplyIngress +import akka.remote.artery.OutboundControlJunction.OutboundControlIngress import akka.stream.scaladsl.Keep /** @@ -30,16 +30,16 @@ private[akka] class Association( val transport: ArteryTransport, val materializer: Materializer, override val remoteAddress: Address, - override val replySubject: ReplySubject) extends OutboundContext { + override val controlSubject: ControlMessageSubject) extends OutboundContext { @volatile private[this] var queue: SourceQueueWithComplete[Send] = _ @volatile private[this] var systemMessageQueue: SourceQueueWithComplete[Send] = _ - @volatile private[this] var _outboundReplyIngress: OutboundReplyIngress = _ + @volatile private[this] var _outboundControlIngress: OutboundControlIngress = _ - def outboundReplyIngress: OutboundReplyIngress = { - if (_outboundReplyIngress eq null) - throw new IllegalStateException("outboundReplyIngress not initialized yet") - _outboundReplyIngress + def outboundControlIngress: OutboundControlIngress = { + if (_outboundControlIngress eq null) + throw new IllegalStateException("outboundControlIngress not initialized yet") + _outboundControlIngress } override def localAddress: UniqueAddress = transport.localAddress @@ -82,10 +82,10 @@ private[akka] class Association( .to(transport.outbound(this)).run()(materializer) if (systemMessageQueue eq null) { val (q, control) = Source.queue(256, OverflowStrategy.dropBuffer) - .toMat(transport.outboundSystemMessage(this))(Keep.both) + .toMat(transport.outboundControl(this))(Keep.both) .run()(materializer) systemMessageQueue = q - _outboundReplyIngress = control + _outboundControlIngress = control } } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Reply.scala b/akka-remote/src/main/scala/akka/remote/artery/Control.scala similarity index 63% rename from akka-remote/src/main/scala/akka/remote/artery/Reply.scala rename to akka-remote/src/main/scala/akka/remote/artery/Control.scala index baddb3e59a..56127d956c 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Reply.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Control.scala @@ -32,35 +32,45 @@ trait ControlMessage /** * INTERNAL API */ -private[akka] object InboundReplyJunction { +private[akka] object InboundControlJunction { - // FIXME rename all Reply stuff to Control or ControlMessage - - private[akka] trait ReplySubject { - def attach(observer: ReplyObserver): Future[Done] - def detach(observer: ReplyObserver): Unit + /** + * Observer subject for inbound control messages. + * Interested observers can attach themselves to the + * subject to get notification of incoming control + * messages. + */ + private[akka] trait ControlMessageSubject { + def attach(observer: ControlMessageObserver): Future[Done] + def detach(observer: ControlMessageObserver): Unit def stopped: Future[Done] } - private[akka] trait ReplyObserver { - def reply(inboundEnvelope: InboundEnvelope): Unit + private[akka] trait ControlMessageObserver { + + /** + * Notification of incoming control message. The message + * of the envelope is always a `ControlMessage`. + */ + def notify(inboundEnvelope: InboundEnvelope): Unit } - private[InboundReplyJunction] sealed trait CallbackMessage - private[InboundReplyJunction] final case class Attach(observer: ReplyObserver, done: Promise[Done]) + // messages for the CallbackWrapper + private[InboundControlJunction] sealed trait CallbackMessage + private[InboundControlJunction] final case class Attach(observer: ControlMessageObserver, done: Promise[Done]) extends CallbackMessage - private[InboundReplyJunction] final case class Dettach(observer: ReplyObserver) extends CallbackMessage + private[InboundControlJunction] final case class Dettach(observer: ControlMessageObserver) extends CallbackMessage } /** * INTERNAL API */ -private[akka] class InboundReplyJunction - extends GraphStageWithMaterializedValue[FlowShape[InboundEnvelope, InboundEnvelope], InboundReplyJunction.ReplySubject] { - import InboundReplyJunction._ +private[akka] class InboundControlJunction + extends GraphStageWithMaterializedValue[FlowShape[InboundEnvelope, InboundEnvelope], InboundControlJunction.ControlMessageSubject] { + import InboundControlJunction._ - val in: Inlet[InboundEnvelope] = Inlet("InboundReplyJunction.in") - val out: Outlet[InboundEnvelope] = Outlet("InboundReplyJunction.out") + val in: Inlet[InboundEnvelope] = Inlet("InboundControlJunction.in") + val out: Outlet[InboundEnvelope] = Outlet("InboundControlJunction.out") override val shape: FlowShape[InboundEnvelope, InboundEnvelope] = FlowShape(in, out) override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { @@ -68,14 +78,14 @@ private[akka] class InboundReplyJunction // FIXME see issue #20503 related to CallbackWrapper, we might implement this in a better way val logic = new GraphStageLogic(shape) with CallbackWrapper[CallbackMessage] with InHandler with OutHandler { - private var replyObservers: Vector[ReplyObserver] = Vector.empty + private var observers: Vector[ControlMessageObserver] = Vector.empty private val callback = getAsyncCallback[CallbackMessage] { case Attach(observer, done) ⇒ - replyObservers :+= observer + observers :+= observer done.success(Done) case Dettach(observer) ⇒ - replyObservers = replyObservers.filterNot(_ == observer) + observers = observers.filterNot(_ == observer) } override def preStart(): Unit = { @@ -87,8 +97,8 @@ private[akka] class InboundReplyJunction // InHandler override def onPush(): Unit = { grab(in) match { - case env @ InboundEnvelope(_, _, reply: Reply, _) ⇒ - replyObservers.foreach(_.reply(env)) + case env @ InboundEnvelope(_, _, _: ControlMessage, _) ⇒ + observers.foreach(_.notify(env)) pull(in) case env ⇒ push(out, env) @@ -102,29 +112,29 @@ private[akka] class InboundReplyJunction } // materialized value - val replySubject: ReplySubject = new ReplySubject { - override def attach(observer: ReplyObserver): Future[Done] = { + val controlSubject: ControlMessageSubject = new ControlMessageSubject { + override def attach(observer: ControlMessageObserver): Future[Done] = { val p = Promise[Done]() logic.invoke(Attach(observer, p)) p.future } - override def detach(observer: ReplyObserver): Unit = + override def detach(observer: ControlMessageObserver): Unit = logic.invoke(Dettach(observer)) override def stopped: Future[Done] = stoppedPromise.future } - (logic, replySubject) + (logic, controlSubject) } } /** * INTERNAL API */ -private[akka] object OutboundReplyJunction { - trait OutboundReplyIngress { +private[akka] object OutboundControlJunction { + trait OutboundControlIngress { def sendControlMessage(message: ControlMessage): Unit } } @@ -132,17 +142,17 @@ private[akka] object OutboundReplyJunction { /** * INTERNAL API */ -private[akka] class OutboundReplyJunction(outboundContext: OutboundContext) - extends GraphStageWithMaterializedValue[FlowShape[Send, Send], OutboundReplyJunction.OutboundReplyIngress] { - import OutboundReplyJunction._ - val in: Inlet[Send] = Inlet("OutboundReplyJunction.in") - val out: Outlet[Send] = Outlet("OutboundReplyJunction.out") +private[akka] class OutboundControlJunction(outboundContext: OutboundContext) + extends GraphStageWithMaterializedValue[FlowShape[Send, Send], OutboundControlJunction.OutboundControlIngress] { + import OutboundControlJunction._ + val in: Inlet[Send] = Inlet("OutboundControlJunction.in") + val out: Outlet[Send] = Outlet("OutboundControlJunction.out") override val shape: FlowShape[Send, Send] = FlowShape(in, out) override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { // FIXME see issue #20503 related to CallbackWrapper, we might implement this in a better way val logic = new GraphStageLogic(shape) with CallbackWrapper[ControlMessage] with InHandler with OutHandler { - import OutboundReplyJunction._ + import OutboundControlJunction._ private val sendControlMessageCallback = getAsyncCallback[ControlMessage](internalSendControlMessage) private val buffer = new ArrayDeque[Send] @@ -181,12 +191,12 @@ private[akka] class OutboundReplyJunction(outboundContext: OutboundContext) } // materialized value - val outboundReplyIngress = new OutboundReplyIngress { + val outboundControlIngress = new OutboundControlIngress { override def sendControlMessage(message: ControlMessage): Unit = logic.invoke(message) } - (logic, outboundReplyIngress) + (logic, outboundControlIngress) } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala index 5a6cdeee65..5357b0ee45 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala @@ -8,7 +8,7 @@ import scala.concurrent.duration._ import akka.Done import akka.remote.EndpointManager.Send import akka.remote.UniqueAddress -import akka.remote.artery.InboundReplyJunction.ReplyObserver +import akka.remote.artery.InboundControlJunction.ControlMessageObserver import akka.stream.Attributes import akka.stream.FlowShape import akka.stream.Inlet @@ -29,7 +29,7 @@ private[akka] object OutboundHandshake { private sealed trait HandshakeState private case object Start extends HandshakeState - private case object ReplyObserverAttached extends HandshakeState + private case object ControlMessageObserverAttached extends HandshakeState private case object ReqInProgress extends HandshakeState private case object Completed extends HandshakeState @@ -46,7 +46,7 @@ private[akka] class OutboundHandshake(outboundContext: OutboundContext) extends override val shape: FlowShape[Send, Send] = FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - new TimerGraphStageLogic(shape) with InHandler with OutHandler with ReplyObserver { + new TimerGraphStageLogic(shape) with InHandler with OutHandler with ControlMessageObserver { import OutboundHandshake._ private val timeout: FiniteDuration = 10.seconds // FIXME config @@ -59,13 +59,13 @@ private[akka] class OutboundHandshake(outboundContext: OutboundContext) extends handshakeState = Completed } else { implicit val ec = materializer.executionContext - outboundContext.replySubject.attach(this).foreach { + outboundContext.controlSubject.attach(this).foreach { getAsyncCallback[Done] { _ ⇒ if (handshakeState != Completed) { if (isAvailable(out)) pushHandshakeReq() else - handshakeState = ReplyObserverAttached + handshakeState = ControlMessageObserverAttached } }.invoke } @@ -85,7 +85,7 @@ private[akka] class OutboundHandshake(outboundContext: OutboundContext) extends } override def postStop(): Unit = { - outboundContext.replySubject.detach(this) + outboundContext.controlSubject.detach(this) } // InHandler @@ -99,9 +99,9 @@ private[akka] class OutboundHandshake(outboundContext: OutboundContext) extends override def onPull(): Unit = { handshakeState match { case Completed ⇒ pull(in) - case ReplyObserverAttached ⇒ + case ControlMessageObserverAttached ⇒ pushHandshakeReq() - case Start ⇒ // will push HandshakeReq when ReplyObserver is attached + case Start ⇒ // will push HandshakeReq when ControlMessageObserver is attached case ReqInProgress ⇒ // will pull when handshake reply is received } } @@ -115,7 +115,7 @@ private[akka] class OutboundHandshake(outboundContext: OutboundContext) extends private def handshakeCompleted(): Unit = { handshakeState = Completed cancelTimer(HandshakeTimeout) - outboundContext.replySubject.detach(this) + outboundContext.controlSubject.detach(this) } override protected def onTimer(timerKey: Any): Unit = @@ -125,8 +125,8 @@ private[akka] class OutboundHandshake(outboundContext: OutboundContext) extends s"Handshake with [$remoteAddress] did not complete within ${timeout.toMillis} ms")) } - // ReplyObserver, external call - override def reply(inboundEnvelope: InboundEnvelope): Unit = { + // ControlMessageObserver, external call + override def notify(inboundEnvelope: InboundEnvelope): Unit = { inboundEnvelope.message match { case rsp: HandshakeRsp ⇒ if (rsp.from.address == remoteAddress) { @@ -165,7 +165,7 @@ private[akka] class InboundHandshake(inboundContext: InboundContext) extends Gra grab(in) match { case InboundEnvelope(_, _, HandshakeReq(from), _) ⇒ inboundContext.association(from.address).completeRemoteAddress(from) - inboundContext.sendReply(from.address, HandshakeRsp(inboundContext.localAddress)) + inboundContext.sendControl(from.address, HandshakeRsp(inboundContext.localAddress)) pull(in) case other ⇒ push(out, other) diff --git a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala index f66c6ef2c3..21e62ca917 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala @@ -14,7 +14,7 @@ import scala.util.Try import akka.Done import akka.remote.EndpointManager.Send import akka.remote.UniqueAddress -import akka.remote.artery.InboundReplyJunction.ReplyObserver +import akka.remote.artery.InboundControlJunction.ControlMessageObserver import akka.stream.Attributes import akka.stream.FlowShape import akka.stream.Inlet @@ -53,7 +53,7 @@ private[akka] class SystemMessageDelivery( override val shape: FlowShape[Send, Send] = FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - new TimerGraphStageLogic(shape) with InHandler with OutHandler with ReplyObserver { + new TimerGraphStageLogic(shape) with InHandler with OutHandler with ControlMessageObserver { private var replyObserverAttached = false private var seqNo = 0L // sequence number for the first message will be 1 @@ -69,7 +69,7 @@ private[akka] class SystemMessageDelivery( this.schedulePeriodically(ResendTick, resendInterval) implicit val ec = materializer.executionContext - outboundContext.replySubject.attach(this).foreach { + outboundContext.controlSubject.attach(this).foreach { getAsyncCallback[Done] { _ ⇒ replyObserverAttached = true if (isAvailable(out)) @@ -77,7 +77,7 @@ private[akka] class SystemMessageDelivery( }.invoke } - outboundContext.replySubject.stopped.onComplete { + outboundContext.controlSubject.stopped.onComplete { getAsyncCallback[Try[Done]] { // FIXME quarantine case Success(_) ⇒ completeStage() @@ -87,7 +87,7 @@ private[akka] class SystemMessageDelivery( } override def postStop(): Unit = { - outboundContext.replySubject.detach(this) + outboundContext.controlSubject.detach(this) } override def onUpstreamFinish(): Unit = { @@ -106,8 +106,8 @@ private[akka] class SystemMessageDelivery( } } - // ReplyObserver, external call - override def reply(inboundEnvelope: InboundEnvelope): Unit = { + // ControlMessageObserver, external call + override def notify(inboundEnvelope: InboundEnvelope): Unit = { inboundEnvelope.message match { case ack: Ack ⇒ if (ack.from.address == remoteAddress) ackCallback.invoke(ack) case nack: Nack ⇒ if (nack.from.address == remoteAddress) nackCallback.invoke(nack) @@ -201,15 +201,15 @@ private[akka] class SystemMessageAcker(inboundContext: InboundContext) extends G grab(in) match { case env @ InboundEnvelope(_, _, sysEnv @ SystemMessageEnvelope(_, n, ackReplyTo), _) ⇒ if (n == seqNo) { - inboundContext.sendReply(ackReplyTo.address, Ack(n, localAddress)) + inboundContext.sendControl(ackReplyTo.address, Ack(n, localAddress)) seqNo += 1 val unwrapped = env.copy(message = sysEnv.message) push(out, unwrapped) } else if (n < seqNo) { - inboundContext.sendReply(ackReplyTo.address, Ack(n, localAddress)) + inboundContext.sendControl(ackReplyTo.address, Ack(n, localAddress)) pull(in) } else { - inboundContext.sendReply(ackReplyTo.address, Nack(seqNo - 1, localAddress)) + inboundContext.sendControl(ackReplyTo.address, Nack(seqNo - 1, localAddress)) pull(in) } case env ⇒ diff --git a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala index 8614aaf2b8..6675ff3e9b 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala @@ -58,17 +58,17 @@ object SystemMessageDeliverySpec { class ManualReplyInboundContext( replyProbe: ActorRef, localAddress: UniqueAddress, - replySubject: TestReplySubject) extends TestInboundContext(localAddress, replySubject) { + controlSubject: TestControlMessageSubject) extends TestInboundContext(localAddress, controlSubject) { private var lastReply: Option[(Address, ControlMessage)] = None - override def sendReply(to: Address, message: ControlMessage) = { + override def sendControl(to: Address, message: ControlMessage) = { lastReply = Some((to, message)) replyProbe ! message } def deliverLastReply(): Unit = { - lastReply.foreach { case (to, message) ⇒ super.sendReply(to, message) } + lastReply.foreach { case (to, message) ⇒ super.sendControl(to, message) } lastReply = None } } @@ -147,9 +147,9 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.commo "be resent when some in the middle are lost" in { val replyProbe = TestProbe() - val replySubject = new TestReplySubject - val inboundContextB = new ManualReplyInboundContext(replyProbe.ref, addressB, replySubject) - val inboundContextA = new TestInboundContext(addressB, replySubject) + val controlSubject = new TestControlMessageSubject + val inboundContextB = new ManualReplyInboundContext(replyProbe.ref, addressB, controlSubject) + val inboundContextA = new TestInboundContext(addressB, controlSubject) val outboundContextA = inboundContextA.association(addressB.address) val sink = send(sendCount = 5, resendInterval = 60.seconds, outboundContextA) @@ -181,9 +181,9 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.commo "be resent when first is lost" in { val replyProbe = TestProbe() - val replySubject = new TestReplySubject - val inboundContextB = new ManualReplyInboundContext(replyProbe.ref, addressB, replySubject) - val inboundContextA = new TestInboundContext(addressB, replySubject) + val controlSubject = new TestControlMessageSubject + val inboundContextB = new ManualReplyInboundContext(replyProbe.ref, addressB, controlSubject) + val inboundContextA = new TestInboundContext(addressB, controlSubject) val outboundContextA = inboundContextA.association(addressB.address) val sink = send(sendCount = 3, resendInterval = 60.seconds, outboundContextA) @@ -210,9 +210,9 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.commo "be resent when last is lost" in { val replyProbe = TestProbe() - val replySubject = new TestReplySubject - val inboundContextB = new ManualReplyInboundContext(replyProbe.ref, addressB, replySubject) - val inboundContextA = new TestInboundContext(addressB, replySubject) + val controlSubject = new TestControlMessageSubject + val inboundContextB = new ManualReplyInboundContext(replyProbe.ref, addressB, controlSubject) + val inboundContextA = new TestInboundContext(addressB, controlSubject) val outboundContextA = inboundContextA.association(addressB.address) val sink = send(sendCount = 3, resendInterval = 1.seconds, outboundContextA) @@ -239,9 +239,9 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.commo "deliver all during stress and random dropping" in { val N = 10000 val dropRate = 0.1 - val replySubject = new TestReplySubject - val inboundContextB = new TestInboundContext(addressB, replySubject, replyDropRate = dropRate) - val inboundContextA = new TestInboundContext(addressB, replySubject) + val controlSubject = new TestControlMessageSubject + val inboundContextB = new TestInboundContext(addressB, controlSubject, replyDropRate = dropRate) + val inboundContextA = new TestInboundContext(addressB, controlSubject) val outboundContextA = inboundContextA.association(addressB.address) val output = @@ -257,9 +257,9 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.commo "deliver all during throttling and random dropping" in { val N = 500 val dropRate = 0.1 - val replySubject = new TestReplySubject - val inboundContextB = new TestInboundContext(addressB, replySubject, replyDropRate = dropRate) - val inboundContextA = new TestInboundContext(addressB, replySubject) + val controlSubject = new TestControlMessageSubject + val inboundContextB = new TestInboundContext(addressB, controlSubject, replyDropRate = dropRate) + val inboundContextA = new TestInboundContext(addressB, controlSubject) val outboundContextA = inboundContextA.association(addressB.address) val output = diff --git a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala index a5694a276d..a9a68e7247 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala @@ -6,31 +6,31 @@ package akka.remote.artery import akka.remote.UniqueAddress import akka.actor.Address import scala.concurrent.Future -import akka.remote.artery.InboundReplyJunction.ReplySubject +import akka.remote.artery.InboundControlJunction.ControlMessageSubject import akka.remote.RemoteActorRef import scala.concurrent.Promise import akka.Done -import akka.remote.artery.InboundReplyJunction.ReplyObserver +import akka.remote.artery.InboundControlJunction.ControlMessageObserver import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ThreadLocalRandom private[akka] class TestInboundContext( override val localAddress: UniqueAddress, - val replySubject: TestReplySubject = new TestReplySubject, + val controlSubject: TestControlMessageSubject = new TestControlMessageSubject, replyDropRate: Double = 0.0) extends InboundContext { private val associations = new ConcurrentHashMap[Address, OutboundContext] - def sendReply(to: Address, message: ControlMessage) = { + def sendControl(to: Address, message: ControlMessage) = { if (ThreadLocalRandom.current().nextDouble() >= replyDropRate) - replySubject.sendReply(InboundEnvelope(null, to, message, None)) + controlSubject.sendControl(InboundEnvelope(null, to, message, None)) } def association(remoteAddress: Address): OutboundContext = associations.get(remoteAddress) match { case null ⇒ - val a = new TestOutboundContext(localAddress, remoteAddress, replySubject) + val a = new TestOutboundContext(localAddress, remoteAddress, controlSubject) associations.putIfAbsent(remoteAddress, a) match { case null ⇒ a case existing ⇒ existing @@ -39,13 +39,13 @@ private[akka] class TestInboundContext( } protected def createAssociation(remoteAddress: Address): OutboundContext = - new TestOutboundContext(localAddress, remoteAddress, replySubject) + new TestOutboundContext(localAddress, remoteAddress, controlSubject) } private[akka] class TestOutboundContext( override val localAddress: UniqueAddress, override val remoteAddress: Address, - override val replySubject: TestReplySubject) extends OutboundContext { + override val controlSubject: TestControlMessageSubject) extends OutboundContext { private val _uniqueRemoteAddress = Promise[UniqueAddress]() def uniqueRemoteAddress: Future[UniqueAddress] = _uniqueRemoteAddress.future @@ -56,24 +56,24 @@ private[akka] class TestOutboundContext( } -private[akka] class TestReplySubject extends ReplySubject { +private[akka] class TestControlMessageSubject extends ControlMessageSubject { - private var replyObservers = new CopyOnWriteArrayList[ReplyObserver] + private var observers = new CopyOnWriteArrayList[ControlMessageObserver] - override def attach(observer: ReplyObserver): Future[Done] = { - replyObservers.add(observer) + override def attach(observer: ControlMessageObserver): Future[Done] = { + observers.add(observer) Future.successful(Done) } - override def detach(observer: ReplyObserver): Unit = { - replyObservers.remove(observer) + override def detach(observer: ControlMessageObserver): Unit = { + observers.remove(observer) } override def stopped: Future[Done] = Promise[Done]().future - def sendReply(env: InboundEnvelope): Unit = { - val iter = replyObservers.iterator() + def sendControl(env: InboundEnvelope): Unit = { + val iter = observers.iterator() while (iter.hasNext()) - iter.next().reply(env) + iter.next().notify(env) } }