diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala index 3222edd5c8..29bcf69904 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala @@ -82,7 +82,9 @@ private[cluster] class ClusterRemoteWatcher( case state: CurrentClusterState ⇒ clusterNodes = state.members.collect { case m if m.address != selfAddress ⇒ m.address } clusterNodes foreach takeOverResponsibility - unreachable = state.unreachable.collect { case m if m.address != selfAddress ⇒ m.address } + val clusterUnreachable = state.unreachable.collect { case m if m.address != selfAddress ⇒ m.address } + unreachable --= clusterNodes + unreachable ++= clusterUnreachable case MemberUp(m) ⇒ if (m.address != selfAddress) { clusterNodes += m.address @@ -101,6 +103,7 @@ private[cluster] class ClusterRemoteWatcher( } publishAddressTerminated(m.address) } + case _: MemberEvent ⇒ // not interesting } /** diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeDeathWatchSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeDeathWatchSpec.scala index 7c0b0bf209..f24c472d7e 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeDeathWatchSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeDeathWatchSpec.scala @@ -22,6 +22,7 @@ import akka.testkit._ object RemoteNodeDeathWatchMultiJvmSpec extends MultiNodeConfig { val first = role("first") val second = role("second") + val third = role("third") commonConfig(debugConfig(on = false).withFallback( ConfigFactory.parseString(""" @@ -59,12 +60,14 @@ object RemoteNodeDeathWatchMultiJvmSpec extends MultiNodeConfig { class RemoteNodeDeathWatchFastMultiJvmNode1 extends RemoteNodeDeathWatchFastSpec class RemoteNodeDeathWatchFastMultiJvmNode2 extends RemoteNodeDeathWatchFastSpec +class RemoteNodeDeathWatchFastMultiJvmNode3 extends RemoteNodeDeathWatchFastSpec abstract class RemoteNodeDeathWatchFastSpec extends RemoteNodeDeathWatchSpec { override def scenario = "fast" } class RemoteNodeDeathWatchSlowMultiJvmNode1 extends RemoteNodeDeathWatchSlowSpec class RemoteNodeDeathWatchSlowMultiJvmNode2 extends RemoteNodeDeathWatchSlowSpec +class RemoteNodeDeathWatchSlowMultiJvmNode3 extends RemoteNodeDeathWatchSlowSpec abstract class RemoteNodeDeathWatchSlowSpec extends RemoteNodeDeathWatchSpec { override def scenario = "slow" override def sleep(): Unit = Thread.sleep(3000) @@ -128,6 +131,11 @@ abstract class RemoteNodeDeathWatchSpec system.stop(subject) } + runOn(third) { + enterBarrier("actors-started-1") + enterBarrier("watch-established-1") + } + enterBarrier("terminated-verified-1") // verify that things are cleaned up, and heartbeating is stopped @@ -156,6 +164,8 @@ abstract class RemoteNodeDeathWatchSpec runOn(second) { system.actorOf(Props(classOf[ProbeActor], testActor), "subject2") + } + runOn(second, third) { enterBarrier("actors-started-2") enterBarrier("watch-2") enterBarrier("unwatch-2") @@ -170,20 +180,28 @@ abstract class RemoteNodeDeathWatchSpec } "cleanup after bi-directional watch/unwatch" taggedAs LongRunningTest in { - val watcher = system.actorOf(Props(classOf[ProbeActor], testActor), "watcher3") - system.actorOf(Props(classOf[ProbeActor], testActor), "subject3") - enterBarrier("actors-started-3") + runOn(first, second) { + val watcher = system.actorOf(Props(classOf[ProbeActor], testActor), "watcher3") + system.actorOf(Props(classOf[ProbeActor], testActor), "subject3") + enterBarrier("actors-started-3") - val other = if (myself == first) second else first - val subject = identify(other, "subject3") - watcher ! WatchIt(subject) - expectMsg(1 second, Ack) - enterBarrier("watch-3") + val other = if (myself == first) second else first + val subject = identify(other, "subject3") + watcher ! WatchIt(subject) + expectMsg(1 second, Ack) + enterBarrier("watch-3") - sleep() - watcher ! UnwatchIt(subject) - expectMsg(1 second, Ack) - enterBarrier("unwatch-3") + sleep() + watcher ! UnwatchIt(subject) + expectMsg(1 second, Ack) + enterBarrier("unwatch-3") + } + + runOn(third) { + enterBarrier("actors-started-3") + enterBarrier("watch-3") + enterBarrier("unwatch-3") + } // verify that things are cleaned up, and heartbeating is stopped assertCleanup() @@ -194,28 +212,36 @@ abstract class RemoteNodeDeathWatchSpec } "cleanup after bi-directional watch/stop/unwatch" taggedAs LongRunningTest in { - val watcher1 = system.actorOf(Props(classOf[ProbeActor], testActor), "w1") - val watcher2 = system.actorOf(Props(classOf[ProbeActor], testActor), "w2") - system.actorOf(Props(classOf[ProbeActor], testActor), "s1") - val s2 = system.actorOf(Props(classOf[ProbeActor], testActor), "s2") - enterBarrier("actors-started-4") + runOn(first, second) { + val watcher1 = system.actorOf(Props(classOf[ProbeActor], testActor), "w1") + val watcher2 = system.actorOf(Props(classOf[ProbeActor], testActor), "w2") + system.actorOf(Props(classOf[ProbeActor], testActor), "s1") + val s2 = system.actorOf(Props(classOf[ProbeActor], testActor), "s2") + enterBarrier("actors-started-4") - val other = if (myself == first) second else first - val subject1 = identify(other, "s1") - val subject2 = identify(other, "s2") - watcher1 ! WatchIt(subject1) - expectMsg(1 second, Ack) - watcher2 ! WatchIt(subject2) - expectMsg(1 second, Ack) - enterBarrier("watch-4") + val other = if (myself == first) second else first + val subject1 = identify(other, "s1") + val subject2 = identify(other, "s2") + watcher1 ! WatchIt(subject1) + expectMsg(1 second, Ack) + watcher2 ! WatchIt(subject2) + expectMsg(1 second, Ack) + enterBarrier("watch-4") - sleep() - watcher1 ! UnwatchIt(subject1) - expectMsg(1 second, Ack) - system.stop(s2) - enterBarrier("unwatch-stop-4") + sleep() + watcher1 ! UnwatchIt(subject1) + expectMsg(1 second, Ack) + system.stop(s2) + enterBarrier("unwatch-stop-4") - expectMsgType[WrappedTerminated].t.actor must be(subject2) + expectMsgType[WrappedTerminated].t.actor must be(subject2) + } + + runOn(third) { + enterBarrier("actors-started-4") + enterBarrier("watch-4") + enterBarrier("unwatch-stop-4") + } // verify that things are cleaned up, and heartbeating is stopped assertCleanup() @@ -307,10 +333,17 @@ abstract class RemoteNodeDeathWatchSpec assertCleanup() } + runOn(third) { + enterBarrier("actors-started-5") + enterBarrier("watch-established-5") + enterBarrier("stopped-5") + enterBarrier("terminated-verified-5") + } + enterBarrier("after-5") } - "receive Terminated when watched node is shutdown" taggedAs LongRunningTest in { + "receive Terminated when watched node crash" taggedAs LongRunningTest in { runOn(first) { val watcher = system.actorOf(Props(classOf[ProbeActor], testActor), "watcher6") val watcher2 = system.actorOf(Props(classOf[ProbeActor], system.deadLetters)) @@ -332,7 +365,7 @@ abstract class RemoteNodeDeathWatchSpec sleep() - log.info("shutdown second") + log.info("exit second") testConductor.exit(second, 0).await expectMsgType[WrappedTerminated](15 seconds).t.actor must be(subject) @@ -350,8 +383,47 @@ abstract class RemoteNodeDeathWatchSpec enterBarrier("watch-established-6") } + runOn(third) { + enterBarrier("actors-started-6") + enterBarrier("watch-established-6") + } + enterBarrier("after-6") } + "cleanup when watching node crash" taggedAs LongRunningTest in { + runOn(third) { + val watcher = system.actorOf(Props(classOf[ProbeActor], testActor), "watcher7") + enterBarrier("actors-started-7") + + val subject = identify(first, "subject7") + watcher ! WatchIt(subject) + expectMsg(1 second, Ack) + subject ! "hello7" + enterBarrier("watch-established-7") + } + + runOn(first) { + system.actorOf(Props(classOf[ProbeActor], testActor), "subject7") + enterBarrier("actors-started-7") + + expectMsg(3 seconds, "hello7") + enterBarrier("watch-established-7") + + sleep() + log.info("exit third") + testConductor.exit(third, 0).await + + // verify that things are cleaned up, and heartbeating is stopped + within(20 seconds) { + assertCleanup() + expectNoMsg(2.seconds) + assertCleanup() + } + } + + enterBarrier("after-7") + } + } } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala b/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala index 446f2b39df..70701698db 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala @@ -168,7 +168,7 @@ private[akka] class RemoteWatcher( watchedByNodes += sender // watch back to stop heartbeating if other side dies context watch sender - watching += ((sender, self)) + addWatching(sender, self) } def endHeartbeatRequest(): Unit = { @@ -201,21 +201,25 @@ private[akka] class RemoteWatcher( logActorForDeprecationWarning(watchee) else if (watcher != self) { log.debug("Watching: [{} -> {}]", watcher.path, watchee.path) - watching += ((watchee, watcher)) - val watcheeAddress = watchee.path.address - if (!watchingNodes(watcheeAddress) && unreachable(watcheeAddress)) { - // first watch to that node after a previous unreachable - unreachable -= watcheeAddress - failureDetector.remove(watcheeAddress) - } - watchingNodes += watcheeAddress - endWatchingNodes -= watcheeAddress + addWatching(watchee, watcher) // also watch from self, to be able to cleanup on termination of the watchee context watch watchee watching += ((watchee, self)) } + def addWatching(watchee: ActorRef, watcher: ActorRef): Unit = { + watching += ((watchee, watcher)) + val watcheeAddress = watchee.path.address + if (!watchingNodes(watcheeAddress) && unreachable(watcheeAddress)) { + // first watch to that node after a previous unreachable + unreachable -= watcheeAddress + failureDetector.remove(watcheeAddress) + } + watchingNodes += watcheeAddress + endWatchingNodes -= watcheeAddress + } + def unwatchRemote(watchee: ActorRef, watcher: ActorRef): Unit = if (watchee.path.uid == akka.actor.ActorCell.undefinedUid) logActorForDeprecationWarning(watchee) @@ -247,8 +251,8 @@ private[akka] class RemoteWatcher( watching = watching.filterNot { case (wee, _) ⇒ wee == watchee } - checkLastUnwatchOfNode(watchee.path.address) } + checkLastUnwatchOfNode(watchee.path.address) } def checkLastUnwatchOfNode(watcheeAddress: Address): Unit = { diff --git a/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala index f1388e0ff7..d6527e0026 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala @@ -192,13 +192,18 @@ class RemoteWatcherSpec extends AkkaSpec( monitorB.tell(HeartbeatRequest, monitorA) monitorB ! Stats // HeartbeatRequest adds cross watch to RemoteWatcher peer - expectMsg(Stats.counts(watching = 1, watchingNodes = 0, watchedByNodes = 1)) + val stats = expectMsg(Stats.counts(watching = 1, watchingNodes = 1, watchedByNodes = 1)) + stats.watchingRefs must be(Set((monitorA, monitorB))) expectNoMsg(100 millis) monitorB ! HeartbeatTick expectMsg(heartbeatMsgB) + // HeartbeatRequest for the cross watch + expectMsg(HeartbeatRequest) expectNoMsg(100 millis) monitorB ! HeartbeatTick expectMsg(heartbeatMsgB) + // HeartbeatRequest for the cross watch + expectMsg(HeartbeatRequest) expectNoMsg(100 millis) // unwatch @@ -208,15 +213,19 @@ class RemoteWatcherSpec extends AkkaSpec( expectMsg(Stats.empty) expectNoMsg(100 millis) monitorB ! HeartbeatTick + // EndHeartbeatRequest for the cross watch + expectMsg(EndHeartbeatRequest) expectNoMsg(100 millis) // start heartbeating again monitorB.tell(HeartbeatRequest, monitorA) monitorB ! Stats - expectMsg(Stats.counts(watching = 1, watchingNodes = 0, watchedByNodes = 1)) + val stats2 = expectMsg(Stats.counts(watching = 1, watchingNodes = 1, watchedByNodes = 1)) + stats2.watchingRefs must be(Set((monitorA, monitorB))) expectNoMsg(100 millis) monitorB ! HeartbeatTick expectMsg(heartbeatMsgB) + expectMsg(HeartbeatRequest) expectNoMsg(100 millis) // then kill other side, which should stop the heartbeating @@ -226,6 +235,7 @@ class RemoteWatcherSpec extends AkkaSpec( expectMsg(Stats.empty) } monitorB ! HeartbeatTick + // no more heartbeats and no EndHeartbeatRequest for the cross watch expectNoMsg(500 millis) // make sure nothing floods over to next test