diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 511ef38cc3..fe3776a3a3 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -142,7 +142,10 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R private val codec: AkkaPduCodec = AkkaPduProtobufCodec private val killSwitch: SharedKillSwitch = KillSwitches.shared("transportKillSwitch") - private val systemMessageResendInterval: FiniteDuration = 1.second // FIXME config + + // FIXME config + private val systemMessageResendInterval: FiniteDuration = 1.second + private val handshakeTimeout: FiniteDuration = 10.seconds // TODO support port 0 private def inboundChannel = s"aeron:udp?endpoint=${localAddress.address.host.get}:${localAddress.address.port.get}" @@ -276,7 +279,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R def outbound(outboundContext: OutboundContext): Sink[Send, Any] = { Flow.fromGraph(killSwitch.flow[Send]) - .via(new OutboundHandshake(outboundContext)) + .via(new OutboundHandshake(outboundContext, handshakeTimeout)) .via(encoder) .map(_.toArray) // TODO we should use ByteString all the way .to(new AeronSink(outboundChannel(outboundContext.remoteAddress), ordinaryStreamId, aeron, taskRunner)) @@ -284,7 +287,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R def outboundControl(outboundContext: OutboundContext): Sink[Send, OutboundControlIngress] = { Flow.fromGraph(killSwitch.flow[Send]) - .via(new OutboundHandshake(outboundContext)) + .via(new OutboundHandshake(outboundContext, handshakeTimeout)) .via(new SystemMessageDelivery(outboundContext, systemMessageResendInterval)) .viaMat(new OutboundControlJunction(outboundContext))(Keep.right) .via(encoder) diff --git a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala index 5357b0ee45..92793ff6e6 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala @@ -40,7 +40,7 @@ private[akka] object OutboundHandshake { /** * INTERNAL API */ -private[akka] class OutboundHandshake(outboundContext: OutboundContext) extends GraphStage[FlowShape[Send, Send]] { +private[akka] class OutboundHandshake(outboundContext: OutboundContext, timeout: FiniteDuration) extends GraphStage[FlowShape[Send, Send]] { val in: Inlet[Send] = Inlet("OutboundHandshake.in") val out: Outlet[Send] = Outlet("OutboundHandshake.out") override val shape: FlowShape[Send, Send] = FlowShape(in, out) @@ -49,7 +49,6 @@ private[akka] class OutboundHandshake(outboundContext: OutboundContext) extends new TimerGraphStageLogic(shape) with InHandler with OutHandler with ControlMessageObserver { import OutboundHandshake._ - private val timeout: FiniteDuration = 10.seconds // FIXME config private var handshakeState: HandshakeState = Start private def remoteAddress = outboundContext.remoteAddress diff --git a/akka-remote/src/test/scala/akka/remote/artery/InboundControlJunctionSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/InboundControlJunctionSpec.scala new file mode 100644 index 0000000000..4b67be4274 --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/artery/InboundControlJunctionSpec.scala @@ -0,0 +1,73 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import scala.concurrent.duration._ + +import akka.actor.Address +import akka.actor.InternalActorRef +import akka.remote.UniqueAddress +import akka.remote.artery.InboundControlJunction.ControlMessageObserver +import akka.remote.artery.SystemMessageDelivery._ +import akka.stream.ActorMaterializer +import akka.stream.ActorMaterializerSettings +import akka.stream.scaladsl.Keep +import akka.stream.testkit.scaladsl.TestSink +import akka.stream.testkit.scaladsl.TestSource +import akka.testkit.AkkaSpec +import akka.testkit.ImplicitSender +import akka.testkit.TestProbe + +object InboundControlJunctionSpec { + case object Control1 extends ControlMessage + case object Control2 extends ControlMessage + case object Control3 extends ControlMessage +} + +class InboundControlJunctionSpec extends AkkaSpec with ImplicitSender { + import InboundControlJunctionSpec._ + + val matSettings = ActorMaterializerSettings(system).withFuzzing(true) + implicit val mat = ActorMaterializer(matSettings)(system) + + val addressA = UniqueAddress(Address("akka.artery", "sysA", "hostA", 1001), 1) + val addressB = UniqueAddress(Address("akka.artery", "sysB", "hostB", 1002), 2) + + "Control messages" must { + + "be emitted via side channel" in { + val observerProbe = TestProbe() + val inboundContext = new TestInboundContext(localAddress = addressB) + val recipient = null.asInstanceOf[InternalActorRef] // not used + + val ((upstream, controlSubject), downstream) = TestSource.probe[AnyRef] + .map(msg ⇒ InboundEnvelope(recipient, addressB.address, msg, None)) + .viaMat(new InboundControlJunction)(Keep.both) + .map { case InboundEnvelope(_, _, msg, _) ⇒ msg } + .toMat(TestSink.probe[Any])(Keep.both) + .run() + + controlSubject.attach(new ControlMessageObserver { + override def notify(env: InboundEnvelope) { + observerProbe.ref ! env.message + } + }) + + downstream.request(10) + upstream.sendNext("msg1") + downstream.expectNext("msg1") + upstream.sendNext(Control1) + upstream.sendNext(Control2) + observerProbe.expectMsg(Control1) + observerProbe.expectMsg(Control2) + upstream.sendNext("msg2") + downstream.expectNext("msg2") + upstream.sendNext(Control3) + observerProbe.expectMsg(Control3) + downstream.cancel() + } + + } + +} diff --git a/akka-remote/src/test/scala/akka/remote/artery/InboundHandshakeSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/InboundHandshakeSpec.scala new file mode 100644 index 0000000000..f2b378efc5 --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/artery/InboundHandshakeSpec.scala @@ -0,0 +1,67 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import scala.concurrent.duration._ + +import akka.actor.Address +import akka.actor.InternalActorRef +import akka.remote.UniqueAddress +import akka.remote.artery.OutboundHandshake.HandshakeReq +import akka.remote.artery.OutboundHandshake.HandshakeRsp +import akka.remote.artery.SystemMessageDelivery._ +import akka.stream.ActorMaterializer +import akka.stream.ActorMaterializerSettings +import akka.stream.scaladsl.Keep +import akka.stream.testkit.TestPublisher +import akka.stream.testkit.TestSubscriber +import akka.stream.testkit.scaladsl.TestSink +import akka.stream.testkit.scaladsl.TestSource +import akka.testkit.AkkaSpec +import akka.testkit.ImplicitSender +import akka.testkit.TestProbe + +object InboundHandshakeSpec { + case object Control1 extends ControlMessage + case object Control2 extends ControlMessage + case object Control3 extends ControlMessage +} + +class InboundHandshakeSpec extends AkkaSpec with ImplicitSender { + import InboundHandshakeSpec._ + + val matSettings = ActorMaterializerSettings(system).withFuzzing(true) + implicit val mat = ActorMaterializer(matSettings)(system) + + val addressA = UniqueAddress(Address("akka.artery", "sysA", "hostA", 1001), 1) + val addressB = UniqueAddress(Address("akka.artery", "sysB", "hostB", 1002), 2) + + private def setupStream(inboundContext: InboundContext, timeout: FiniteDuration = 5.seconds): (TestPublisher.Probe[AnyRef], TestSubscriber.Probe[Any]) = { + val recipient = null.asInstanceOf[InternalActorRef] // not used + TestSource.probe[AnyRef] + .map(msg ⇒ InboundEnvelope(recipient, addressB.address, msg, None)) + .via(new InboundHandshake(inboundContext)) + .map { case InboundEnvelope(_, _, msg, _) ⇒ msg } + .toMat(TestSink.probe[Any])(Keep.both) + .run() + } + + "InboundHandshake stage" must { + + "send HandshakeRsp as reply to HandshakeReq" in { + val replyProbe = TestProbe() + val inboundContext = new ManualReplyInboundContext(replyProbe.ref, addressB, new TestControlMessageSubject) + val (upstream, downstream) = setupStream(inboundContext) + + downstream.request(10) + upstream.sendNext(HandshakeReq(addressA)) + upstream.sendNext("msg1") + replyProbe.expectMsg(HandshakeRsp(addressB)) + downstream.expectNext("msg1") + downstream.cancel() + } + + } + +} diff --git a/akka-remote/src/test/scala/akka/remote/artery/OutboundControlJunctionSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/OutboundControlJunctionSpec.scala new file mode 100644 index 0000000000..ab1eb63f0a --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/artery/OutboundControlJunctionSpec.scala @@ -0,0 +1,70 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import scala.concurrent.duration._ + +import akka.actor.Address +import akka.remote.EndpointManager.Send +import akka.remote.RemoteActorRef +import akka.remote.UniqueAddress +import akka.remote.artery.SystemMessageDelivery._ +import akka.stream.ActorMaterializer +import akka.stream.ActorMaterializerSettings +import akka.stream.scaladsl.Keep +import akka.stream.testkit.scaladsl.TestSink +import akka.stream.testkit.scaladsl.TestSource +import akka.testkit.AkkaSpec +import akka.testkit.ImplicitSender + +object OutboundControlJunctionSpec { + case object Control1 extends ControlMessage + case object Control2 extends ControlMessage + case object Control3 extends ControlMessage +} + +class OutboundControlJunctionSpec extends AkkaSpec with ImplicitSender { + import OutboundControlJunctionSpec._ + + val matSettings = ActorMaterializerSettings(system).withFuzzing(true) + implicit val mat = ActorMaterializer(matSettings)(system) + + val addressA = UniqueAddress(Address("akka.artery", "sysA", "hostA", 1001), 1) + val addressB = UniqueAddress(Address("akka.artery", "sysB", "hostB", 1002), 2) + + "Control messages" must { + + "be injected via side channel" in { + val inboundContext = new TestInboundContext(localAddress = addressA) + val outboundContext = inboundContext.association(addressB.address) + val destination = null.asInstanceOf[RemoteActorRef] // not used + + val ((upstream, controlIngress), downstream) = TestSource.probe[String] + .map(msg ⇒ Send(msg, None, destination, None)) + .viaMat(new OutboundControlJunction(outboundContext))(Keep.both) + .map { case Send(msg, _, _, _) ⇒ msg } + .toMat(TestSink.probe[Any])(Keep.both) + .run() + + controlIngress.sendControlMessage(Control1) + downstream.request(1) + downstream.expectNext(Control1) + upstream.sendNext("msg1") + downstream.request(1) + downstream.expectNext("msg1") + upstream.sendNext("msg2") + downstream.request(1) + downstream.expectNext("msg2") + controlIngress.sendControlMessage(Control2) + upstream.sendNext("msg3") + downstream.request(10) + downstream.expectNextUnorderedN(List("msg3", Control2)) + controlIngress.sendControlMessage(Control3) + downstream.expectNext(Control3) + downstream.cancel() + } + + } + +} diff --git a/akka-remote/src/test/scala/akka/remote/artery/OutboundHandshakeSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/OutboundHandshakeSpec.scala new file mode 100644 index 0000000000..a75c8576bf --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/artery/OutboundHandshakeSpec.scala @@ -0,0 +1,103 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import java.util.concurrent.TimeoutException + +import scala.concurrent.duration._ + +import akka.actor.Address +import akka.actor.InternalActorRef +import akka.remote.EndpointManager.Send +import akka.remote.RemoteActorRef +import akka.remote.UniqueAddress +import akka.remote.artery.OutboundHandshake.HandshakeReq +import akka.remote.artery.OutboundHandshake.HandshakeRsp +import akka.remote.artery.SystemMessageDelivery._ +import akka.stream.ActorMaterializer +import akka.stream.ActorMaterializerSettings +import akka.stream.scaladsl.Keep +import akka.stream.testkit.TestPublisher +import akka.stream.testkit.TestSubscriber +import akka.stream.testkit.scaladsl.TestSink +import akka.stream.testkit.scaladsl.TestSource +import akka.testkit.AkkaSpec +import akka.testkit.ImplicitSender + +class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender { + + val matSettings = ActorMaterializerSettings(system).withFuzzing(true) + implicit val mat = ActorMaterializer(matSettings)(system) + + val addressA = UniqueAddress(Address("akka.artery", "sysA", "hostA", 1001), 1) + val addressB = UniqueAddress(Address("akka.artery", "sysB", "hostB", 1002), 2) + + private def setupStream(outboundContext: OutboundContext, timeout: FiniteDuration = 5.seconds): (TestPublisher.Probe[String], TestSubscriber.Probe[Any]) = { + val destination = null.asInstanceOf[RemoteActorRef] // not used + TestSource.probe[String] + .map(msg ⇒ Send(msg, None, destination, None)) + .via(new OutboundHandshake(outboundContext, timeout)) + .map { case Send(msg, _, _, _) ⇒ msg } + .toMat(TestSink.probe[Any])(Keep.both) + .run() + } + + "OutboundHandshake stage" must { + "send HandshakeReq when first pulled" in { + val inboundContext = new TestInboundContext(localAddress = addressA) + val outboundContext = inboundContext.association(addressB.address) + val (upstream, downstream) = setupStream(outboundContext) + + downstream.request(10) + downstream.expectNext(HandshakeReq(addressA)) + downstream.cancel() + } + + "timeout if not receiving HandshakeRsp" in { + val inboundContext = new TestInboundContext(localAddress = addressA) + val outboundContext = inboundContext.association(addressB.address) + val (upstream, downstream) = setupStream(outboundContext, timeout = 200.millis) + + downstream.request(1) + downstream.expectNext(HandshakeReq(addressA)) + downstream.expectError().getClass should be(classOf[TimeoutException]) + } + + "not deliver messages from upstream until handshake completed" in { + val controlSubject = new TestControlMessageSubject + val inboundContext = new TestInboundContext(localAddress = addressA, controlSubject) + val outboundContext = inboundContext.association(addressB.address) + val recipient = null.asInstanceOf[InternalActorRef] // not used + val (upstream, downstream) = setupStream(outboundContext) + + downstream.request(10) + downstream.expectNext(HandshakeReq(addressA)) + upstream.sendNext("msg1") + downstream.expectNoMsg(200.millis) + controlSubject.sendControl(InboundEnvelope(recipient, addressA.address, HandshakeRsp(addressB), None)) + downstream.expectNext("msg1") + upstream.sendNext("msg2") + downstream.expectNext("msg2") + downstream.cancel() + } + + "complete handshake via another sub-channel" in { + val inboundContext = new TestInboundContext(localAddress = addressA) + val outboundContext = inboundContext.association(addressB.address) + val (upstream, downstream) = setupStream(outboundContext) + + downstream.request(10) + downstream.expectNext(HandshakeReq(addressA)) + upstream.sendNext("msg1") + // handshake completed first by another sub-channel + outboundContext.completeRemoteAddress(addressB) + downstream.expectNext("msg1") + upstream.sendNext("msg2") + downstream.expectNext("msg2") + downstream.cancel() + } + + } + +} diff --git a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala index 6675ff3e9b..6e24432483 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala @@ -54,24 +54,6 @@ object SystemMessageDeliverySpec { val configB = ConfigFactory.parseString(s"akka.remote.artery.port = $portB") .withFallback(commonConfig) - - class ManualReplyInboundContext( - replyProbe: ActorRef, - localAddress: UniqueAddress, - controlSubject: TestControlMessageSubject) extends TestInboundContext(localAddress, controlSubject) { - - private var lastReply: Option[(Address, ControlMessage)] = None - - override def sendControl(to: Address, message: ControlMessage) = { - lastReply = Some((to, message)) - replyProbe ! message - } - - def deliverLastReply(): Unit = { - lastReply.foreach { case (to, message) ⇒ super.sendControl(to, message) } - lastReply = None - } - } } class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.commonConfig) with ImplicitSender { diff --git a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala index a9a68e7247..0db4672139 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala @@ -14,6 +14,7 @@ import akka.remote.artery.InboundControlJunction.ControlMessageObserver import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ThreadLocalRandom +import akka.actor.ActorRef private[akka] class TestInboundContext( override val localAddress: UniqueAddress, @@ -76,4 +77,23 @@ private[akka] class TestControlMessageSubject extends ControlMessageSubject { while (iter.hasNext()) iter.next().notify(env) } + +} + +private[akka] class ManualReplyInboundContext( + replyProbe: ActorRef, + localAddress: UniqueAddress, + controlSubject: TestControlMessageSubject) extends TestInboundContext(localAddress, controlSubject) { + + private var lastReply: Option[(Address, ControlMessage)] = None + + override def sendControl(to: Address, message: ControlMessage) = { + lastReply = Some((to, message)) + replyProbe ! message + } + + def deliverLastReply(): Unit = { + lastReply.foreach { case (to, message) ⇒ super.sendControl(to, message) } + lastReply = None + } }