Stop heartbeating when watching node crash, see #3265

This commit is contained in:
Patrik Nordwall 2013-04-25 21:25:46 +02:00
parent b62f5f46f5
commit 551e2d1321
4 changed files with 136 additions and 47 deletions

View file

@ -82,7 +82,9 @@ private[cluster] class ClusterRemoteWatcher(
case state: CurrentClusterState
clusterNodes = state.members.collect { case m if m.address != selfAddress m.address }
clusterNodes foreach takeOverResponsibility
unreachable = state.unreachable.collect { case m if m.address != selfAddress m.address }
val clusterUnreachable = state.unreachable.collect { case m if m.address != selfAddress m.address }
unreachable --= clusterNodes
unreachable ++= clusterUnreachable
case MemberUp(m)
if (m.address != selfAddress) {
clusterNodes += m.address
@ -101,6 +103,7 @@ private[cluster] class ClusterRemoteWatcher(
}
publishAddressTerminated(m.address)
}
case _: MemberEvent // not interesting
}
/**

View file

@ -22,6 +22,7 @@ import akka.testkit._
object RemoteNodeDeathWatchMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
commonConfig(debugConfig(on = false).withFallback(
ConfigFactory.parseString("""
@ -59,12 +60,14 @@ object RemoteNodeDeathWatchMultiJvmSpec extends MultiNodeConfig {
class RemoteNodeDeathWatchFastMultiJvmNode1 extends RemoteNodeDeathWatchFastSpec
class RemoteNodeDeathWatchFastMultiJvmNode2 extends RemoteNodeDeathWatchFastSpec
class RemoteNodeDeathWatchFastMultiJvmNode3 extends RemoteNodeDeathWatchFastSpec
abstract class RemoteNodeDeathWatchFastSpec extends RemoteNodeDeathWatchSpec {
override def scenario = "fast"
}
class RemoteNodeDeathWatchSlowMultiJvmNode1 extends RemoteNodeDeathWatchSlowSpec
class RemoteNodeDeathWatchSlowMultiJvmNode2 extends RemoteNodeDeathWatchSlowSpec
class RemoteNodeDeathWatchSlowMultiJvmNode3 extends RemoteNodeDeathWatchSlowSpec
abstract class RemoteNodeDeathWatchSlowSpec extends RemoteNodeDeathWatchSpec {
override def scenario = "slow"
override def sleep(): Unit = Thread.sleep(3000)
@ -128,6 +131,11 @@ abstract class RemoteNodeDeathWatchSpec
system.stop(subject)
}
runOn(third) {
enterBarrier("actors-started-1")
enterBarrier("watch-established-1")
}
enterBarrier("terminated-verified-1")
// verify that things are cleaned up, and heartbeating is stopped
@ -156,6 +164,8 @@ abstract class RemoteNodeDeathWatchSpec
runOn(second) {
system.actorOf(Props(classOf[ProbeActor], testActor), "subject2")
}
runOn(second, third) {
enterBarrier("actors-started-2")
enterBarrier("watch-2")
enterBarrier("unwatch-2")
@ -170,20 +180,28 @@ abstract class RemoteNodeDeathWatchSpec
}
"cleanup after bi-directional watch/unwatch" taggedAs LongRunningTest in {
val watcher = system.actorOf(Props(classOf[ProbeActor], testActor), "watcher3")
system.actorOf(Props(classOf[ProbeActor], testActor), "subject3")
enterBarrier("actors-started-3")
runOn(first, second) {
val watcher = system.actorOf(Props(classOf[ProbeActor], testActor), "watcher3")
system.actorOf(Props(classOf[ProbeActor], testActor), "subject3")
enterBarrier("actors-started-3")
val other = if (myself == first) second else first
val subject = identify(other, "subject3")
watcher ! WatchIt(subject)
expectMsg(1 second, Ack)
enterBarrier("watch-3")
val other = if (myself == first) second else first
val subject = identify(other, "subject3")
watcher ! WatchIt(subject)
expectMsg(1 second, Ack)
enterBarrier("watch-3")
sleep()
watcher ! UnwatchIt(subject)
expectMsg(1 second, Ack)
enterBarrier("unwatch-3")
sleep()
watcher ! UnwatchIt(subject)
expectMsg(1 second, Ack)
enterBarrier("unwatch-3")
}
runOn(third) {
enterBarrier("actors-started-3")
enterBarrier("watch-3")
enterBarrier("unwatch-3")
}
// verify that things are cleaned up, and heartbeating is stopped
assertCleanup()
@ -194,28 +212,36 @@ abstract class RemoteNodeDeathWatchSpec
}
"cleanup after bi-directional watch/stop/unwatch" taggedAs LongRunningTest in {
val watcher1 = system.actorOf(Props(classOf[ProbeActor], testActor), "w1")
val watcher2 = system.actorOf(Props(classOf[ProbeActor], testActor), "w2")
system.actorOf(Props(classOf[ProbeActor], testActor), "s1")
val s2 = system.actorOf(Props(classOf[ProbeActor], testActor), "s2")
enterBarrier("actors-started-4")
runOn(first, second) {
val watcher1 = system.actorOf(Props(classOf[ProbeActor], testActor), "w1")
val watcher2 = system.actorOf(Props(classOf[ProbeActor], testActor), "w2")
system.actorOf(Props(classOf[ProbeActor], testActor), "s1")
val s2 = system.actorOf(Props(classOf[ProbeActor], testActor), "s2")
enterBarrier("actors-started-4")
val other = if (myself == first) second else first
val subject1 = identify(other, "s1")
val subject2 = identify(other, "s2")
watcher1 ! WatchIt(subject1)
expectMsg(1 second, Ack)
watcher2 ! WatchIt(subject2)
expectMsg(1 second, Ack)
enterBarrier("watch-4")
val other = if (myself == first) second else first
val subject1 = identify(other, "s1")
val subject2 = identify(other, "s2")
watcher1 ! WatchIt(subject1)
expectMsg(1 second, Ack)
watcher2 ! WatchIt(subject2)
expectMsg(1 second, Ack)
enterBarrier("watch-4")
sleep()
watcher1 ! UnwatchIt(subject1)
expectMsg(1 second, Ack)
system.stop(s2)
enterBarrier("unwatch-stop-4")
sleep()
watcher1 ! UnwatchIt(subject1)
expectMsg(1 second, Ack)
system.stop(s2)
enterBarrier("unwatch-stop-4")
expectMsgType[WrappedTerminated].t.actor must be(subject2)
expectMsgType[WrappedTerminated].t.actor must be(subject2)
}
runOn(third) {
enterBarrier("actors-started-4")
enterBarrier("watch-4")
enterBarrier("unwatch-stop-4")
}
// verify that things are cleaned up, and heartbeating is stopped
assertCleanup()
@ -307,10 +333,17 @@ abstract class RemoteNodeDeathWatchSpec
assertCleanup()
}
runOn(third) {
enterBarrier("actors-started-5")
enterBarrier("watch-established-5")
enterBarrier("stopped-5")
enterBarrier("terminated-verified-5")
}
enterBarrier("after-5")
}
"receive Terminated when watched node is shutdown" taggedAs LongRunningTest in {
"receive Terminated when watched node crash" taggedAs LongRunningTest in {
runOn(first) {
val watcher = system.actorOf(Props(classOf[ProbeActor], testActor), "watcher6")
val watcher2 = system.actorOf(Props(classOf[ProbeActor], system.deadLetters))
@ -332,7 +365,7 @@ abstract class RemoteNodeDeathWatchSpec
sleep()
log.info("shutdown second")
log.info("exit second")
testConductor.exit(second, 0).await
expectMsgType[WrappedTerminated](15 seconds).t.actor must be(subject)
@ -350,8 +383,47 @@ abstract class RemoteNodeDeathWatchSpec
enterBarrier("watch-established-6")
}
runOn(third) {
enterBarrier("actors-started-6")
enterBarrier("watch-established-6")
}
enterBarrier("after-6")
}
"cleanup when watching node crash" taggedAs LongRunningTest in {
runOn(third) {
val watcher = system.actorOf(Props(classOf[ProbeActor], testActor), "watcher7")
enterBarrier("actors-started-7")
val subject = identify(first, "subject7")
watcher ! WatchIt(subject)
expectMsg(1 second, Ack)
subject ! "hello7"
enterBarrier("watch-established-7")
}
runOn(first) {
system.actorOf(Props(classOf[ProbeActor], testActor), "subject7")
enterBarrier("actors-started-7")
expectMsg(3 seconds, "hello7")
enterBarrier("watch-established-7")
sleep()
log.info("exit third")
testConductor.exit(third, 0).await
// verify that things are cleaned up, and heartbeating is stopped
within(20 seconds) {
assertCleanup()
expectNoMsg(2.seconds)
assertCleanup()
}
}
enterBarrier("after-7")
}
}
}

View file

@ -168,7 +168,7 @@ private[akka] class RemoteWatcher(
watchedByNodes += sender
// watch back to stop heartbeating if other side dies
context watch sender
watching += ((sender, self))
addWatching(sender, self)
}
def endHeartbeatRequest(): Unit = {
@ -201,21 +201,25 @@ private[akka] class RemoteWatcher(
logActorForDeprecationWarning(watchee)
else if (watcher != self) {
log.debug("Watching: [{} -> {}]", watcher.path, watchee.path)
watching += ((watchee, watcher))
val watcheeAddress = watchee.path.address
if (!watchingNodes(watcheeAddress) && unreachable(watcheeAddress)) {
// first watch to that node after a previous unreachable
unreachable -= watcheeAddress
failureDetector.remove(watcheeAddress)
}
watchingNodes += watcheeAddress
endWatchingNodes -= watcheeAddress
addWatching(watchee, watcher)
// also watch from self, to be able to cleanup on termination of the watchee
context watch watchee
watching += ((watchee, self))
}
def addWatching(watchee: ActorRef, watcher: ActorRef): Unit = {
watching += ((watchee, watcher))
val watcheeAddress = watchee.path.address
if (!watchingNodes(watcheeAddress) && unreachable(watcheeAddress)) {
// first watch to that node after a previous unreachable
unreachable -= watcheeAddress
failureDetector.remove(watcheeAddress)
}
watchingNodes += watcheeAddress
endWatchingNodes -= watcheeAddress
}
def unwatchRemote(watchee: ActorRef, watcher: ActorRef): Unit =
if (watchee.path.uid == akka.actor.ActorCell.undefinedUid)
logActorForDeprecationWarning(watchee)
@ -247,8 +251,8 @@ private[akka] class RemoteWatcher(
watching = watching.filterNot {
case (wee, _) wee == watchee
}
checkLastUnwatchOfNode(watchee.path.address)
}
checkLastUnwatchOfNode(watchee.path.address)
}
def checkLastUnwatchOfNode(watcheeAddress: Address): Unit = {

View file

@ -192,13 +192,18 @@ class RemoteWatcherSpec extends AkkaSpec(
monitorB.tell(HeartbeatRequest, monitorA)
monitorB ! Stats
// HeartbeatRequest adds cross watch to RemoteWatcher peer
expectMsg(Stats.counts(watching = 1, watchingNodes = 0, watchedByNodes = 1))
val stats = expectMsg(Stats.counts(watching = 1, watchingNodes = 1, watchedByNodes = 1))
stats.watchingRefs must be(Set((monitorA, monitorB)))
expectNoMsg(100 millis)
monitorB ! HeartbeatTick
expectMsg(heartbeatMsgB)
// HeartbeatRequest for the cross watch
expectMsg(HeartbeatRequest)
expectNoMsg(100 millis)
monitorB ! HeartbeatTick
expectMsg(heartbeatMsgB)
// HeartbeatRequest for the cross watch
expectMsg(HeartbeatRequest)
expectNoMsg(100 millis)
// unwatch
@ -208,15 +213,19 @@ class RemoteWatcherSpec extends AkkaSpec(
expectMsg(Stats.empty)
expectNoMsg(100 millis)
monitorB ! HeartbeatTick
// EndHeartbeatRequest for the cross watch
expectMsg(EndHeartbeatRequest)
expectNoMsg(100 millis)
// start heartbeating again
monitorB.tell(HeartbeatRequest, monitorA)
monitorB ! Stats
expectMsg(Stats.counts(watching = 1, watchingNodes = 0, watchedByNodes = 1))
val stats2 = expectMsg(Stats.counts(watching = 1, watchingNodes = 1, watchedByNodes = 1))
stats2.watchingRefs must be(Set((monitorA, monitorB)))
expectNoMsg(100 millis)
monitorB ! HeartbeatTick
expectMsg(heartbeatMsgB)
expectMsg(HeartbeatRequest)
expectNoMsg(100 millis)
// then kill other side, which should stop the heartbeating
@ -226,6 +235,7 @@ class RemoteWatcherSpec extends AkkaSpec(
expectMsg(Stats.empty)
}
monitorB ! HeartbeatTick
// no more heartbeats and no EndHeartbeatRequest for the cross watch
expectNoMsg(500 millis)
// make sure nothing floods over to next test