diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/AttemptSysMsgRedeliverySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/AttemptSysMsgRedeliverySpec.scala index b9abfe25a2..35badd1cc0 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/AttemptSysMsgRedeliverySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/AttemptSysMsgRedeliverySpec.scala @@ -37,13 +37,12 @@ class AttemptSysMsgRedeliveryMultiJvmNode2 extends AttemptSysMsgRedeliverySpec( class AttemptSysMsgRedeliveryMultiJvmNode3 extends AttemptSysMsgRedeliverySpec( new AttemptSysMsgRedeliveryMultiJvmSpec(artery = false)) -// FIXME this is failing with Artery -//class ArteryAttemptSysMsgRedeliveryMultiJvmNode1 extends AttemptSysMsgRedeliverySpec( -// new AttemptSysMsgRedeliveryMultiJvmSpec(artery = true)) -//class ArteryAttemptSysMsgRedeliveryMultiJvmNode2 extends AttemptSysMsgRedeliverySpec( -// new AttemptSysMsgRedeliveryMultiJvmSpec(artery = true)) -//class ArteryAttemptSysMsgRedeliveryMultiJvmNode3 extends AttemptSysMsgRedeliverySpec( -// new AttemptSysMsgRedeliveryMultiJvmSpec(artery = true)) +class ArteryAttemptSysMsgRedeliveryMultiJvmNode1 extends AttemptSysMsgRedeliverySpec( + new AttemptSysMsgRedeliveryMultiJvmSpec(artery = true)) +class ArteryAttemptSysMsgRedeliveryMultiJvmNode2 extends AttemptSysMsgRedeliverySpec( + new AttemptSysMsgRedeliveryMultiJvmSpec(artery = true)) +class ArteryAttemptSysMsgRedeliveryMultiJvmNode3 extends AttemptSysMsgRedeliverySpec( + new AttemptSysMsgRedeliveryMultiJvmSpec(artery = true)) object AttemptSysMsgRedeliverySpec { class Echo extends Actor { diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteReDeploymentSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteReDeploymentSpec.scala index 9be5c92b48..456d8a262a 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteReDeploymentSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteReDeploymentSpec.scala @@ -134,11 +134,6 @@ abstract class RemoteReDeploymentMultiJvmSpec(multiNodeConfig: RemoteReDeploymen enterBarrier("first-deployed") - // FIXME When running with Artery: - // [akka://RemoteReDeploymentMultiJvmSpec/user/parent] received Supervise from unregistered child - // Actor[artery://RemoteReDeploymentMultiJvmSpec@localhost:55627/remote/artery/RemoteReDeploymentMultiJvmSpec@localhost:65490/user/parent/hello#-370928728], - // this will not end well - runOn(first) { testConductor.blackhole(second, first, Both).await testConductor.shutdown(second, abort = true).await diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/routing/RemoteRoundRobinSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/routing/RemoteRoundRobinSpec.scala index 6b1100008d..157c8fb0e3 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/routing/RemoteRoundRobinSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/routing/RemoteRoundRobinSpec.scala @@ -65,14 +65,10 @@ class RemoteRoundRobinMultiJvmNode2 extends RemoteRoundRobinSpec(new RemoteRound class RemoteRoundRobinMultiJvmNode3 extends RemoteRoundRobinSpec(new RemoteRoundRobinConfig(artery = false)) class RemoteRoundRobinMultiJvmNode4 extends RemoteRoundRobinSpec(new RemoteRoundRobinConfig(artery = false)) -// FIXME this test fails with Artery -// [akka://RemoteRoundRobinSpec/user/service-hello2] received Supervise from unregistered child -// Actor[artery://RemoteRoundRobinSpec@localhost:52247/remote/artery/RemoteRoundRobinSpec@localhost:56386/user/service-hello2/c2#-2080820302], -// this will not end well -//class ArteryRemoteRoundRobinMultiJvmNode1 extends RemoteRoundRobinSpec(new RemoteRoundRobinConfig(artery = true)) -//class ArteryRemoteRoundRobinMultiJvmNode2 extends RemoteRoundRobinSpec(new RemoteRoundRobinConfig(artery = true)) -//class ArteryRemoteRoundRobinMultiJvmNode3 extends RemoteRoundRobinSpec(new RemoteRoundRobinConfig(artery = true)) -//class ArteryRemoteRoundRobinMultiJvmNode4 extends RemoteRoundRobinSpec(new RemoteRoundRobinConfig(artery = true)) +class ArteryRemoteRoundRobinMultiJvmNode1 extends RemoteRoundRobinSpec(new RemoteRoundRobinConfig(artery = true)) +class ArteryRemoteRoundRobinMultiJvmNode2 extends RemoteRoundRobinSpec(new RemoteRoundRobinConfig(artery = true)) +class ArteryRemoteRoundRobinMultiJvmNode3 extends RemoteRoundRobinSpec(new RemoteRoundRobinConfig(artery = true)) +class ArteryRemoteRoundRobinMultiJvmNode4 extends RemoteRoundRobinSpec(new RemoteRoundRobinConfig(artery = true)) object RemoteRoundRobinSpec { class SomeActor extends Actor { diff --git a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala index de2ee9da60..12bb70cec0 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala @@ -233,7 +233,8 @@ private[akka] class SystemMessageAcker(inboundContext: InboundContext) extends G override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { - var seqNo = 1L + // TODO we might need have to prune old unused entries + var sequenceNumbers = Map.empty[UniqueAddress, Long] def localAddress = inboundContext.localAddress @@ -242,16 +243,20 @@ private[akka] class SystemMessageAcker(inboundContext: InboundContext) extends G val env = grab(in) env.message match { case sysEnv @ SystemMessageEnvelope(_, n, ackReplyTo) ⇒ - if (n == seqNo) { + val expectedSeqNo = sequenceNumbers.get(ackReplyTo) match { + case None ⇒ 1L + case Some(seqNo) ⇒ seqNo + } + if (n == expectedSeqNo) { inboundContext.sendControl(ackReplyTo.address, Ack(n, localAddress)) - seqNo += 1 + sequenceNumbers = sequenceNumbers.updated(ackReplyTo, n + 1) val unwrapped = env.withMessage(sysEnv.message) push(out, unwrapped) - } else if (n < seqNo) { - inboundContext.sendControl(ackReplyTo.address, Ack(n, localAddress)) + } else if (n < expectedSeqNo) { + inboundContext.sendControl(ackReplyTo.address, Ack(expectedSeqNo - 1, localAddress)) pull(in) } else { - inboundContext.sendControl(ackReplyTo.address, Nack(seqNo - 1, localAddress)) + inboundContext.sendControl(ackReplyTo.address, Nack(expectedSeqNo - 1, localAddress)) pull(in) } case _ ⇒ diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteRouterSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteRouterSpec.scala index 1f724b3fea..d55b842c53 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteRouterSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteRouterSpec.scala @@ -102,7 +102,6 @@ class RemoteRouterSpec extends AkkaSpec(""" val probe = TestProbe()(masterSystem) val router = masterSystem.actorOf(RoundRobinPool(2).props(echoActorProps), "blub") val replies = collectRouteePaths(probe, router, 5) - println(s"# replies $replies") // FIXME val children = replies.toSet children should have size 2 children.map(_.parent) should have size 1 diff --git a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageAckerSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageAckerSpec.scala new file mode 100644 index 0000000000..58585a2e6a --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageAckerSpec.scala @@ -0,0 +1,132 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import scala.concurrent.duration._ + +import akka.actor.Address +import akka.remote.UniqueAddress +import akka.remote.artery.SystemMessageDelivery._ +import akka.stream.ActorMaterializer +import akka.stream.ActorMaterializerSettings +import akka.stream.scaladsl.Keep +import akka.stream.testkit.TestPublisher +import akka.stream.testkit.TestSubscriber +import akka.stream.testkit.scaladsl.TestSink +import akka.stream.testkit.scaladsl.TestSource +import akka.testkit.AkkaSpec +import akka.testkit.ImplicitSender +import akka.testkit.TestProbe +import akka.util.OptionVal + +class SystemMessageAckerSpec extends AkkaSpec with ImplicitSender { + + val matSettings = ActorMaterializerSettings(system).withFuzzing(true) + implicit val mat = ActorMaterializer(matSettings)(system) + + val addressA = UniqueAddress(Address("artery", "sysA", "hostA", 1001), 1) + val addressB = UniqueAddress(Address("artery", "sysB", "hostB", 1002), 2) + val addressC = UniqueAddress(Address("artery", "sysC", "hostB", 1003), 3) + + private def setupStream(inboundContext: InboundContext, timeout: FiniteDuration = 5.seconds): (TestPublisher.Probe[AnyRef], TestSubscriber.Probe[Any]) = { + val recipient = OptionVal.None // not used + TestSource.probe[AnyRef] + .map { + case sysMsg @ SystemMessageEnvelope(_, _, ackReplyTo) ⇒ + InboundEnvelope(recipient, addressA.address, sysMsg, OptionVal.None, ackReplyTo.uid, + inboundContext.association(ackReplyTo.uid)) + } + .via(new SystemMessageAcker(inboundContext)) + .map { case env: InboundEnvelope ⇒ env.message } + .toMat(TestSink.probe[Any])(Keep.both) + .run() + } + + "SystemMessageAcker stage" must { + + "send Ack for expected message" in { + val replyProbe = TestProbe() + val inboundContext = new TestInboundContext(addressA, controlProbe = Some(replyProbe.ref)) + val (upstream, downstream) = setupStream(inboundContext) + + downstream.request(10) + upstream.sendNext(SystemMessageEnvelope("b1", 1, addressB)) + replyProbe.expectMsg(Ack(1, addressA)) + upstream.sendNext(SystemMessageEnvelope("b2", 2, addressB)) + replyProbe.expectMsg(Ack(2, addressA)) + downstream.cancel() + } + + "send Ack for duplicate message" in { + val replyProbe = TestProbe() + val inboundContext = new TestInboundContext(addressA, controlProbe = Some(replyProbe.ref)) + val (upstream, downstream) = setupStream(inboundContext) + + downstream.request(10) + upstream.sendNext(SystemMessageEnvelope("b1", 1, addressB)) + replyProbe.expectMsg(Ack(1, addressA)) + upstream.sendNext(SystemMessageEnvelope("b2", 2, addressB)) + replyProbe.expectMsg(Ack(2, addressA)) + upstream.sendNext(SystemMessageEnvelope("b1", 1, addressB)) + replyProbe.expectMsg(Ack(2, addressA)) + downstream.cancel() + } + + "send Nack for unexpected message" in { + val replyProbe = TestProbe() + val inboundContext = new TestInboundContext(addressA, controlProbe = Some(replyProbe.ref)) + val (upstream, downstream) = setupStream(inboundContext) + + downstream.request(10) + upstream.sendNext(SystemMessageEnvelope("b1", 1, addressB)) + replyProbe.expectMsg(Ack(1, addressA)) + upstream.sendNext(SystemMessageEnvelope("b3", 3, addressB)) + replyProbe.expectMsg(Nack(1, addressA)) + downstream.cancel() + } + + "send Nack for unexpected first message" in { + val replyProbe = TestProbe() + val inboundContext = new TestInboundContext(addressA, controlProbe = Some(replyProbe.ref)) + val (upstream, downstream) = setupStream(inboundContext) + + downstream.request(10) + upstream.sendNext(SystemMessageEnvelope("b2", 2, addressB)) + replyProbe.expectMsg(Nack(0, addressA)) + downstream.cancel() + } + + "keep track of sequence numbers per sending system" in { + val replyProbe = TestProbe() + val inboundContext = new TestInboundContext(addressA, controlProbe = Some(replyProbe.ref)) + val (upstream, downstream) = setupStream(inboundContext) + + downstream.request(10) + upstream.sendNext(SystemMessageEnvelope("b1", 1, addressB)) + replyProbe.expectMsg(Ack(1, addressA)) + upstream.sendNext(SystemMessageEnvelope("b2", 2, addressB)) + replyProbe.expectMsg(Ack(2, addressA)) + + upstream.sendNext(SystemMessageEnvelope("c1", 1, addressC)) + replyProbe.expectMsg(Ack(1, addressA)) + upstream.sendNext(SystemMessageEnvelope("c3", 3, addressC)) + replyProbe.expectMsg(Nack(1, addressA)) + upstream.sendNext(SystemMessageEnvelope("c2", 2, addressC)) + replyProbe.expectMsg(Ack(2, addressA)) + upstream.sendNext(SystemMessageEnvelope("c3", 3, addressC)) + replyProbe.expectMsg(Ack(3, addressA)) + upstream.sendNext(SystemMessageEnvelope("c4", 4, addressC)) + replyProbe.expectMsg(Ack(4, addressA)) + + upstream.sendNext(SystemMessageEnvelope("b4", 4, addressB)) + replyProbe.expectMsg(Nack(2, addressA)) + upstream.sendNext(SystemMessageEnvelope("b3", 3, addressB)) + replyProbe.expectMsg(Ack(3, addressA)) + + downstream.cancel() + } + + } + +}