diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala index f97057f5f4..288e8736b4 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala @@ -65,8 +65,7 @@ private[akka] class ClusterActorRefProvider( failureDetector, heartbeatInterval = WatchHeartBeatInterval, unreachableReaperInterval = WatchUnreachableReaperInterval, - heartbeatExpectedResponseAfter = WatchHeartbeatExpectedResponseAfter, - numberOfEndHeartbeatRequests = WatchNumberOfEndHeartbeatRequests), "remote-watcher") + heartbeatExpectedResponseAfter = WatchHeartbeatExpectedResponseAfter), "remote-watcher") } /** diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala index 29bcf69904..ae78946ac7 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala @@ -26,10 +26,9 @@ private[cluster] object ClusterRemoteWatcher { failureDetector: FailureDetectorRegistry[Address], heartbeatInterval: FiniteDuration, unreachableReaperInterval: FiniteDuration, - heartbeatExpectedResponseAfter: FiniteDuration, - numberOfEndHeartbeatRequests: Int): Props = + heartbeatExpectedResponseAfter: FiniteDuration): Props = Props(classOf[ClusterRemoteWatcher], failureDetector, heartbeatInterval, unreachableReaperInterval, - heartbeatExpectedResponseAfter, numberOfEndHeartbeatRequests) + heartbeatExpectedResponseAfter) } /** @@ -47,14 +46,12 @@ private[cluster] class ClusterRemoteWatcher( failureDetector: FailureDetectorRegistry[Address], heartbeatInterval: FiniteDuration, unreachableReaperInterval: FiniteDuration, - heartbeatExpectedResponseAfter: FiniteDuration, - numberOfEndHeartbeatRequests: Int) + heartbeatExpectedResponseAfter: FiniteDuration) extends RemoteWatcher( failureDetector, heartbeatInterval, unreachableReaperInterval, - heartbeatExpectedResponseAfter, - numberOfEndHeartbeatRequests) { + heartbeatExpectedResponseAfter) { import RemoteWatcher._ diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeDeathWatchSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeDeathWatchSpec.scala index 959c3f500c..261d8ecf3c 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeDeathWatchSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeDeathWatchSpec.scala @@ -86,6 +86,8 @@ abstract class RemoteNodeDeathWatchSpec override def initialParticipants = roles.size + muteDeadLetters(Heartbeat.getClass)() + lazy val remoteWatcher: ActorRef = { system.actorSelection("/system/remote-watcher") ! Identify(None) expectMsgType[ActorIdentity].ref.get diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index ca8da650c5..6d97ac78a7 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -217,11 +217,6 @@ akka { # been received. expected-response-after = 3 s - # When a node unwatch another node it will end that - # with this number of EndHeartbeatRequest messages, which will stop the - # heartbeating from the other side - nr-of-end-heartbeats = 8 - } # After failed to establish an outbound connection, the remoting will mark the diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 91273cc20b..c1943acf44 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -191,8 +191,7 @@ private[akka] class RemoteActorRefProvider( failureDetector, heartbeatInterval = WatchHeartBeatInterval, unreachableReaperInterval = WatchUnreachableReaperInterval, - heartbeatExpectedResponseAfter = WatchHeartbeatExpectedResponseAfter, - numberOfEndHeartbeatRequests = WatchNumberOfEndHeartbeatRequests), "remote-watcher") + heartbeatExpectedResponseAfter = WatchHeartbeatExpectedResponseAfter), "remote-watcher") } protected def createRemoteWatcherFailureDetector(system: ExtendedActorSystem): FailureDetectorRegistry[Address] = { diff --git a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala index 086c87e67b..a500f2268c 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala @@ -87,9 +87,6 @@ class RemoteSettings(val config: Config) { val WatchUnreachableReaperInterval: FiniteDuration = { Duration(WatchFailureDetectorConfig.getMilliseconds("unreachable-nodes-reaper-interval"), MILLISECONDS) } requiring (_ > Duration.Zero, "watch-failure-detector.unreachable-nodes-reaper-interval must be > 0") - val WatchNumberOfEndHeartbeatRequests: Int = { - WatchFailureDetectorConfig.getInt("nr-of-end-heartbeats") - } requiring (_ > 0, "watch-failure-detector.nr-of-end-heartbeats must be > 0") val WatchHeartbeatExpectedResponseAfter: FiniteDuration = { Duration(WatchFailureDetectorConfig.getMilliseconds("expected-response-after"), MILLISECONDS) } requiring (_ > Duration.Zero, "watch-failure-detector.expected-response-after > 0") diff --git a/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala b/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala index 9421ea10c0..a9c877dab6 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala @@ -28,17 +28,15 @@ private[akka] object RemoteWatcher { failureDetector: FailureDetectorRegistry[Address], heartbeatInterval: FiniteDuration, unreachableReaperInterval: FiniteDuration, - heartbeatExpectedResponseAfter: FiniteDuration, - numberOfEndHeartbeatRequests: Int): Props = + heartbeatExpectedResponseAfter: FiniteDuration): Props = Props(classOf[RemoteWatcher], failureDetector, heartbeatInterval, unreachableReaperInterval, - heartbeatExpectedResponseAfter, numberOfEndHeartbeatRequests) + heartbeatExpectedResponseAfter) case class WatchRemote(watchee: ActorRef, watcher: ActorRef) case class UnwatchRemote(watchee: ActorRef, watcher: ActorRef) - @SerialVersionUID(1L) case object HeartbeatRequest - @SerialVersionUID(1L) case object EndHeartbeatRequest - @SerialVersionUID(1L) case class Heartbeat(addressUid: Int) + @SerialVersionUID(1L) case object Heartbeat + @SerialVersionUID(1L) case class HeartbeatRsp(addressUid: Int) // sent to self only case object HeartbeatTick @@ -47,17 +45,17 @@ private[akka] object RemoteWatcher { // test purpose object Stats { - lazy val empty: Stats = counts(0, 0, 0) - def counts(watching: Int, watchingNodes: Int, watchedByNodes: Int): Stats = - new Stats(watching, watchingNodes, watchedByNodes)(Set.empty) + lazy val empty: Stats = counts(0, 0) + def counts(watching: Int, watchingNodes: Int): Stats = + new Stats(watching, watchingNodes)(Set.empty) } - case class Stats(watching: Int, watchingNodes: Int, watchedByNodes: Int)(val watchingRefs: Set[(ActorRef, ActorRef)]) { + case class Stats(watching: Int, watchingNodes: Int)(val watchingRefs: Set[(ActorRef, ActorRef)]) { override def toString: String = { def formatWatchingRefs: String = if (watchingRefs.isEmpty) "" else ", watchingRefs=" + watchingRefs.map(x ⇒ x._2.path.name + " -> " + x._1.path.name).mkString("[", ", ", "]") - s"Stats(watching=${watching}, watchingNodes=${watchingNodes}, watchedByNodes=${watchedByNodes}${formatWatchingRefs})" + s"Stats(watching=${watching}, watchingNodes=${watchingNodes}${formatWatchingRefs})" } } } @@ -70,18 +68,13 @@ private[akka] object RemoteWatcher { * intercepts Watch and Unwatch system messages and sends corresponding * [[RemoteWatcher.WatchRemote]] and [[RemoteWatcher.UnwatchRemote]] to this actor. * - * For a new node to be watched this actor starts the monitoring by sending [[RemoteWatcher.HeartbeatRequest]] - * to the peer actor on the other node, which then sends periodic [[RemoteWatcher.Heartbeat]] - * messages back. The failure detector on the watching side monitors these heartbeat messages. + * For a new node to be watched this actor periodically sends [[RemoteWatcher.Heartbeat]] + * to the peer actor on the other node, which replies with [[RemoteWatcher.HeartbeatRsp]] + * message back. The failure detector on the watching side monitors these heartbeat messages. * If arrival of hearbeat messages stops it will be detected and this actor will publish * [[akka.actor.AddressTerminated]] to the `eventStream`. * - * When all actors on a node have been unwatched, or terminated, this actor sends - * [[RemoteWatcher.EndHeartbeatRequest]] messages to the peer actor on the other node, - * which will then stop sending heartbeat messages. - * - * The actor sending heartbeat messages will also watch the peer on the other node, - * to be able to stop sending heartbeat messages in case of network failure or JVM crash. + * When all actors on a node have been unwatched it will stop sending heartbeat messages. * * For bi-directional watch between two nodes the same thing will be established in * both directions, but independent of each other. @@ -91,8 +84,7 @@ private[akka] class RemoteWatcher( failureDetector: FailureDetectorRegistry[Address], heartbeatInterval: FiniteDuration, unreachableReaperInterval: FiniteDuration, - heartbeatExpectedResponseAfter: FiniteDuration, - numberOfEndHeartbeatRequests: Int) + heartbeatExpectedResponseAfter: FiniteDuration) extends Actor with ActorLogging with RequiresMessageQueue[UnboundedMessageQueueSemantics] { import RemoteWatcher._ @@ -105,16 +97,13 @@ private[akka] class RemoteWatcher( s"ActorSystem [${context.system}] needs to have a 'RemoteActorRefProvider' enabled in the configuration, currently uses [${other.getClass.getName}]") } - val selfHeartbeatMsg = Heartbeat(AddressUidExtension(context.system).addressUid) + val selfHeartbeatRspMsg = HeartbeatRsp(AddressUidExtension(context.system).addressUid) // actors that this node is watching, tuple with (watcher, watchee) var watching: Set[(ActorRef, ActorRef)] = Set.empty // nodes that this node is watching, i.e. expecting hearteats from these nodes var watchingNodes: Set[Address] = Set.empty - // heartbeats will be sent to watchedByNodes, ref is RemoteWatcher at other side - var watchedByNodes: Set[ActorRef] = Set.empty var unreachable: Set[Address] = Set.empty - var endWatchingNodes: Map[Address, Int] = Map.empty var addressUids: Map[Address, Int] = Map.empty val heartbeatTask = scheduler.schedule(heartbeatInterval, heartbeatInterval, self, HeartbeatTick) @@ -128,14 +117,10 @@ private[akka] class RemoteWatcher( } def receive = { - case HeartbeatTick ⇒ - sendHeartbeat() - sendHeartbeatRequest() - sendEndHeartbeatRequest() - case Heartbeat(uid) ⇒ heartbeat(uid) + case HeartbeatTick ⇒ sendHeartbeat() + case Heartbeat ⇒ receiveHeartbeat() + case HeartbeatRsp(uid) ⇒ receiveHeartbeatRsp(uid) case ReapUnreachableTick ⇒ reapUnreachable() - case HeartbeatRequest ⇒ heartbeatRequest() - case EndHeartbeatRequest ⇒ endHeartbeatRequest() case ExpectedFirstHeartbeat(from) ⇒ triggerFirstHeartbeat(from) case WatchRemote(watchee, watcher) ⇒ watchRemote(watchee, watcher) case UnwatchRemote(watchee, watcher) ⇒ unwatchRemote(watchee, watcher) @@ -145,17 +130,19 @@ private[akka] class RemoteWatcher( case Stats ⇒ sender ! Stats( watching = watching.size, - watchingNodes = watchingNodes.size, - watchedByNodes = watchedByNodes.size)(watching) + watchingNodes = watchingNodes.size)(watching) } - def heartbeat(uid: Int): Unit = { + def receiveHeartbeat(): Unit = + sender ! selfHeartbeatRspMsg + + def receiveHeartbeatRsp(uid: Int): Unit = { val from = sender.path.address if (failureDetector.isMonitoring(from)) - log.debug("Received heartbeat from [{}]", from) + log.debug("Received heartbeat rsp from [{}]", from) else - log.debug("Received first heartbeat from [{}]", from) + log.debug("Received first heartbeat rsp from [{}]", from) if (watchingNodes(from) && !unreachable(from)) { addressUids += (from -> uid) @@ -163,24 +150,6 @@ private[akka] class RemoteWatcher( } } - def heartbeatRequest(): Unit = { - // request to start sending heartbeats to the node - log.debug("Received HeartbeatRequest from [{}]", sender.path.address) - watchedByNodes += sender - // watch back to stop heartbeating if other side dies - context watch sender - addWatching(sender, self) - } - - def endHeartbeatRequest(): Unit = { - // request to stop sending heartbeats to the node - log.debug("Received EndHeartbeatRequest from [{}]", sender.path.address) - watchedByNodes -= sender - context unwatch sender - watching -= ((sender, self)) - checkLastUnwatchOfNode(sender.path.address) - } - def reapUnreachable(): Unit = watchingNodes foreach { a ⇒ if (!unreachable(a) && !failureDetector.isAvailable(a)) { @@ -218,7 +187,6 @@ private[akka] class RemoteWatcher( failureDetector.remove(watcheeAddress) } watchingNodes += watcheeAddress - endWatchingNodes -= watcheeAddress } def unwatchRemote(watchee: ActorRef, watcher: ActorRef): Unit = @@ -242,71 +210,38 @@ private[akka] class RemoteWatcher( } def terminated(watchee: ActorRef): Unit = { - if (matchingPathElements(self, watchee)) { - log.debug("Other side terminated: [{}]", watchee.path) - // stop heartbeating to that node immediately, and cleanup - watchedByNodes -= watchee - watching -= ((watchee, self)) - } else { - log.debug("Watchee terminated: [{}]", watchee.path) - watching = watching.filterNot { - case (wee, _) ⇒ wee == watchee - } + log.debug("Watchee terminated: [{}]", watchee.path) + watching = watching.filterNot { + case (wee, _) ⇒ wee == watchee } checkLastUnwatchOfNode(watchee.path.address) } def checkLastUnwatchOfNode(watcheeAddress: Address): Unit = { if (watchingNodes(watcheeAddress) && watching.forall { - case (wee, wer) ⇒ wee.path.address != watcheeAddress || (wer == self && matchingPathElements(self, wee)) + case (wee, wer) ⇒ wee.path.address != watcheeAddress }) { - // unwatched last watchee on that node, not counting RemoteWatcher peer + // unwatched last watchee on that node log.debug("Unwatched last watchee of node: [{}]", watcheeAddress) watchingNodes -= watcheeAddress addressUids -= watcheeAddress - // continue by sending EndHeartbeatRequest for a while - endWatchingNodes += (watcheeAddress -> 0) failureDetector.remove(watcheeAddress) } } - def matchingPathElements(a: ActorRef, b: ActorRef): Boolean = - a.path.elements == b.path.elements - def sendHeartbeat(): Unit = - watchedByNodes foreach { ref ⇒ - val a = ref.path.address + watchingNodes foreach { a ⇒ if (!unreachable(a)) { - log.debug("Sending Heartbeat to [{}]", ref.path.address) - ref ! selfHeartbeatMsg - } - } - - def sendHeartbeatRequest(): Unit = - watchingNodes.foreach { a ⇒ - if (!unreachable(a) && !failureDetector.isMonitoring(a)) { - log.debug("Sending HeartbeatRequest to [{}]", a) - context.actorSelection(RootActorPath(a) / self.path.elements) ! HeartbeatRequest - // schedule the expected heartbeat for later, which will give the - // other side a chance to start heartbeating, and also trigger some resends of - // the heartbeat request - scheduler.scheduleOnce(heartbeatExpectedResponseAfter, self, ExpectedFirstHeartbeat(a)) - endWatchingNodes -= a - } - } - - def sendEndHeartbeatRequest(): Unit = - endWatchingNodes.foreach { - case (a, count) ⇒ - if (!unreachable(a)) { - log.debug("Sending EndHeartbeatRequest to [{}]", a) - context.actorSelection(RootActorPath(a) / self.path.elements) ! EndHeartbeatRequest - } - if (count == numberOfEndHeartbeatRequests - 1) { - endWatchingNodes -= a + if (failureDetector.isMonitoring(a)) { + log.debug("Sending Heartbeat to [{}]", a) } else { - endWatchingNodes += (a -> (count + 1)) + log.debug("Sending first Heartbeat to [{}]", a) + // schedule the expected first heartbeat for later, which will give the + // other side a chance to reply, and also trigger some resends if needed + scheduler.scheduleOnce(heartbeatExpectedResponseAfter, self, ExpectedFirstHeartbeat(a)) } + context.actorSelection(RootActorPath(a) / self.path.elements) ! Heartbeat + } } def triggerFirstHeartbeat(address: Address): Unit = diff --git a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala index ab41562018..503a154eff 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala @@ -51,7 +51,6 @@ class RemoteConfigSpec extends AkkaSpec( WatchFailureDetectorImplementationClass must be(classOf[PhiAccrualFailureDetector].getName) WatchHeartBeatInterval must be(1 seconds) - WatchNumberOfEndHeartbeatRequests must be(8) WatchHeartbeatExpectedResponseAfter must be(3 seconds) WatchUnreachableReaperInterval must be(1 second) WatchFailureDetectorConfig.getDouble("threshold") must be(10.0 plusOrMinus 0.0001) diff --git a/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala index b7a9519543..86a59a693d 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala @@ -52,8 +52,7 @@ object RemoteWatcherSpec { class TestRemoteWatcher(heartbeatExpectedResponseAfter: FiniteDuration) extends RemoteWatcher(createFailureDetector, heartbeatInterval = TurnOff, unreachableReaperInterval = TurnOff, - heartbeatExpectedResponseAfter = heartbeatExpectedResponseAfter, - numberOfEndHeartbeatRequests = 3) { + heartbeatExpectedResponseAfter = heartbeatExpectedResponseAfter) { def this() = this(heartbeatExpectedResponseAfter = TurnOff) @@ -99,7 +98,7 @@ class RemoteWatcherSpec extends AkkaSpec( shutdown(remoteSystem) } - val heartbeatMsgB = Heartbeat(remoteAddressUid) + val heartbeatRspB = HeartbeatRsp(remoteAddressUid) def createRemoteActor(props: Props, name: String): ActorRef = { remoteSystem.actorOf(props, name) @@ -126,32 +125,35 @@ class RemoteWatcherSpec extends AkkaSpec( monitorA ! Stats // for each watchee the RemoteWatcher also adds its own watch: 5 = 3 + 2 // (a1->b1), (a1->b2), (a2->b2) - expectMsg(Stats.counts(watching = 5, watchingNodes = 1, watchedByNodes = 0)) + expectMsg(Stats.counts(watching = 5, watchingNodes = 1)) expectNoMsg(100 millis) monitorA ! HeartbeatTick - expectMsg(HeartbeatRequest) + expectMsg(Heartbeat) expectNoMsg(100 millis) monitorA ! HeartbeatTick - expectMsg(HeartbeatRequest) + expectMsg(Heartbeat) expectNoMsg(100 millis) - monitorA.tell(heartbeatMsgB, monitorB) + monitorA.tell(heartbeatRspB, monitorB) monitorA ! HeartbeatTick + expectMsg(Heartbeat) expectNoMsg(100 millis) monitorA ! UnwatchRemote(b1, a1) // still (a1->b2) and (a2->b2) left monitorA ! Stats - expectMsg(Stats.counts(watching = 3, watchingNodes = 1, watchedByNodes = 0)) + expectMsg(Stats.counts(watching = 3, watchingNodes = 1)) expectNoMsg(100 millis) monitorA ! HeartbeatTick + expectMsg(Heartbeat) expectNoMsg(100 millis) monitorA ! UnwatchRemote(b2, a2) // still (a1->b2) left monitorA ! Stats - expectMsg(Stats.counts(watching = 2, watchingNodes = 1, watchedByNodes = 0)) + expectMsg(Stats.counts(watching = 2, watchingNodes = 1)) expectNoMsg(100 millis) monitorA ! HeartbeatTick + expectMsg(Heartbeat) expectNoMsg(100 millis) monitorA ! UnwatchRemote(b2, a1) @@ -159,15 +161,7 @@ class RemoteWatcherSpec extends AkkaSpec( monitorA ! Stats expectMsg(Stats.empty) expectNoMsg(100 millis) - // expecting 3 EndHeartbeatRequest monitorA ! HeartbeatTick - expectMsg(EndHeartbeatRequest) - expectNoMsg(100 millis) - monitorA ! HeartbeatTick - expectMsg(EndHeartbeatRequest) - expectNoMsg(100 millis) - monitorA ! HeartbeatTick - expectMsg(EndHeartbeatRequest) expectNoMsg(100 millis) monitorA ! HeartbeatTick expectNoMsg(100 millis) @@ -176,67 +170,6 @@ class RemoteWatcherSpec extends AkkaSpec( expectNoMsg(2 seconds) } - "have correct interaction when beeing watched" in { - - val monitorA = system.actorOf(Props(classOf[TestActorProxy], testActor), "monitor2") - val monitorB = createRemoteActor(Props[TestRemoteWatcher], "monitor2") - - val b3 = createRemoteActor(Props[MyActor], "b3") - - // watch - monitorB.tell(HeartbeatRequest, monitorA) - monitorB ! Stats - // HeartbeatRequest adds cross watch to RemoteWatcher peer - val stats = expectMsg(Stats.counts(watching = 1, watchingNodes = 1, watchedByNodes = 1)) - stats.watchingRefs must be(Set((monitorA, monitorB))) - expectNoMsg(100 millis) - monitorB ! HeartbeatTick - expectMsg(heartbeatMsgB) - // HeartbeatRequest for the cross watch - expectMsg(HeartbeatRequest) - expectNoMsg(100 millis) - monitorB ! HeartbeatTick - expectMsg(heartbeatMsgB) - // HeartbeatRequest for the cross watch - expectMsg(HeartbeatRequest) - expectNoMsg(100 millis) - - // unwatch - monitorB.tell(EndHeartbeatRequest, monitorA) - monitorB ! Stats - // EndHeartbeatRequest should remove the cross watch - expectMsg(Stats.empty) - expectNoMsg(100 millis) - monitorB ! HeartbeatTick - // EndHeartbeatRequest for the cross watch - expectMsg(EndHeartbeatRequest) - expectNoMsg(100 millis) - - // start heartbeating again - monitorB.tell(HeartbeatRequest, monitorA) - monitorB ! Stats - val stats2 = expectMsg(Stats.counts(watching = 1, watchingNodes = 1, watchedByNodes = 1)) - stats2.watchingRefs must be(Set((monitorA, monitorB))) - expectNoMsg(100 millis) - monitorB ! HeartbeatTick - expectMsg(heartbeatMsgB) - expectMsg(HeartbeatRequest) - expectNoMsg(100 millis) - - // then kill other side, which should stop the heartbeating - monitorA ! PoisonPill - awaitAssert { - monitorB ! Stats - expectMsg(Stats.empty) - } - monitorB ! HeartbeatTick - // no more heartbeats and no EndHeartbeatRequest for the cross watch - expectNoMsg(500 millis) - - // make sure nothing floods over to next test - expectNoMsg(2 seconds) - } - "generate AddressTerminated when missing heartbeats" in { val p = TestProbe() val q = TestProbe() @@ -252,14 +185,18 @@ class RemoteWatcherSpec extends AkkaSpec( monitorA ! WatchRemote(b, a) monitorA ! HeartbeatTick - expectMsg(HeartbeatRequest) - monitorA.tell(heartbeatMsgB, monitorB) + expectMsg(Heartbeat) + monitorA.tell(heartbeatRspB, monitorB) expectNoMsg(1 second) - monitorA.tell(heartbeatMsgB, monitorB) + monitorA ! HeartbeatTick + expectMsg(Heartbeat) + monitorA.tell(heartbeatRspB, monitorB) within(10 seconds) { awaitAssert { monitorA ! HeartbeatTick + expectMsg(Heartbeat) + // but no HeartbeatRsp monitorA ! ReapUnreachableTick p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(b.path.address)) q.expectMsg(1 second, TestRemoteWatcher.Quarantined(b.path.address, remoteAddressUid)) @@ -287,12 +224,14 @@ class RemoteWatcherSpec extends AkkaSpec( monitorA ! WatchRemote(b, a) monitorA ! HeartbeatTick - expectMsg(HeartbeatRequest) - // no heartbeats sent + expectMsg(Heartbeat) + // no HeartbeatRsp sent within(20 seconds) { awaitAssert { monitorA ! HeartbeatTick + expectMsg(Heartbeat) + // but no HeartbeatRsp monitorA ! ReapUnreachableTick p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(b.path.address)) // no quarantine when missing first heartbeat, uid unknown @@ -300,14 +239,8 @@ class RemoteWatcherSpec extends AkkaSpec( } } - // some more HeartbeatRequest may be sent - receiveWhile(1.second) { - case HeartbeatRequest ⇒ // ok - } - // make sure nothing floods over to next test expectNoMsg(2 seconds) - } "generate AddressTerminated for new watch after broken connection that was re-established and broken again" in { @@ -325,14 +258,18 @@ class RemoteWatcherSpec extends AkkaSpec( monitorA ! WatchRemote(b, a) monitorA ! HeartbeatTick - expectMsg(HeartbeatRequest) - monitorA.tell(heartbeatMsgB, monitorB) + expectMsg(Heartbeat) + monitorA.tell(heartbeatRspB, monitorB) expectNoMsg(1 second) - monitorA.tell(heartbeatMsgB, monitorB) + monitorA ! HeartbeatTick + expectMsg(Heartbeat) + monitorA.tell(heartbeatRspB, monitorB) within(10 seconds) { awaitAssert { monitorA ! HeartbeatTick + expectMsg(Heartbeat) + // but no HeartbeatRsp monitorA ! ReapUnreachableTick p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(b.path.address)) q.expectMsg(1 second, TestRemoteWatcher.Quarantined(b.path.address, remoteAddressUid)) @@ -353,15 +290,21 @@ class RemoteWatcherSpec extends AkkaSpec( monitorA ! WatchRemote(c, a) monitorA ! HeartbeatTick - expectMsg(HeartbeatRequest) - monitorA.tell(heartbeatMsgB, monitorB) + expectMsg(Heartbeat) + monitorA.tell(heartbeatRspB, monitorB) expectNoMsg(1 second) - monitorA.tell(heartbeatMsgB, monitorB) monitorA ! HeartbeatTick + expectMsg(Heartbeat) + monitorA.tell(heartbeatRspB, monitorB) + monitorA ! HeartbeatTick + expectMsg(Heartbeat) monitorA ! ReapUnreachableTick p.expectNoMsg(1 second) - monitorA.tell(heartbeatMsgB, monitorB) monitorA ! HeartbeatTick + expectMsg(Heartbeat) + monitorA.tell(heartbeatRspB, monitorB) + monitorA ! HeartbeatTick + expectMsg(Heartbeat) monitorA ! ReapUnreachableTick p.expectNoMsg(1 second) q.expectNoMsg(1 second) @@ -370,6 +313,8 @@ class RemoteWatcherSpec extends AkkaSpec( within(10 seconds) { awaitAssert { monitorA ! HeartbeatTick + expectMsg(Heartbeat) + // but no HeartbeatRsp monitorA ! ReapUnreachableTick p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(c.path.address)) q.expectMsg(1 second, TestRemoteWatcher.Quarantined(c.path.address, remoteAddressUid)) diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index e81b01f9bf..514f1e437d 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -252,7 +252,7 @@ trait TestKitBase { val failed = try { a; false } catch { case NonFatal(e) ⇒ - if (now >= stop) throw e + if ((now + t) >= stop) throw e true } if (failed) {