Changed design of RemoteWatcher due to cleanup race, see #3265

* The problem was a race caused by HeartbeatReq sent out, and
  the watchee terminated immediately. That caused the RemoteWatcher
  peers watching each other without any other watch registered.
  It is racy.
* Instead of one-way heartbeats from the side beeing watched I
  changed to ping-pong style. That makes the problem go away
  and simplifies a lot of things in RemoteWatcher.
This commit is contained in:
Patrik Nordwall 2013-05-04 16:22:40 +02:00
parent cb6ba83f38
commit 7628889b43
10 changed files with 89 additions and 221 deletions

View file

@ -65,8 +65,7 @@ private[akka] class ClusterActorRefProvider(
failureDetector, failureDetector,
heartbeatInterval = WatchHeartBeatInterval, heartbeatInterval = WatchHeartBeatInterval,
unreachableReaperInterval = WatchUnreachableReaperInterval, unreachableReaperInterval = WatchUnreachableReaperInterval,
heartbeatExpectedResponseAfter = WatchHeartbeatExpectedResponseAfter, heartbeatExpectedResponseAfter = WatchHeartbeatExpectedResponseAfter), "remote-watcher")
numberOfEndHeartbeatRequests = WatchNumberOfEndHeartbeatRequests), "remote-watcher")
} }
/** /**

View file

@ -26,10 +26,9 @@ private[cluster] object ClusterRemoteWatcher {
failureDetector: FailureDetectorRegistry[Address], failureDetector: FailureDetectorRegistry[Address],
heartbeatInterval: FiniteDuration, heartbeatInterval: FiniteDuration,
unreachableReaperInterval: FiniteDuration, unreachableReaperInterval: FiniteDuration,
heartbeatExpectedResponseAfter: FiniteDuration, heartbeatExpectedResponseAfter: FiniteDuration): Props =
numberOfEndHeartbeatRequests: Int): Props =
Props(classOf[ClusterRemoteWatcher], failureDetector, heartbeatInterval, unreachableReaperInterval, Props(classOf[ClusterRemoteWatcher], failureDetector, heartbeatInterval, unreachableReaperInterval,
heartbeatExpectedResponseAfter, numberOfEndHeartbeatRequests) heartbeatExpectedResponseAfter)
} }
/** /**
@ -47,14 +46,12 @@ private[cluster] class ClusterRemoteWatcher(
failureDetector: FailureDetectorRegistry[Address], failureDetector: FailureDetectorRegistry[Address],
heartbeatInterval: FiniteDuration, heartbeatInterval: FiniteDuration,
unreachableReaperInterval: FiniteDuration, unreachableReaperInterval: FiniteDuration,
heartbeatExpectedResponseAfter: FiniteDuration, heartbeatExpectedResponseAfter: FiniteDuration)
numberOfEndHeartbeatRequests: Int)
extends RemoteWatcher( extends RemoteWatcher(
failureDetector, failureDetector,
heartbeatInterval, heartbeatInterval,
unreachableReaperInterval, unreachableReaperInterval,
heartbeatExpectedResponseAfter, heartbeatExpectedResponseAfter) {
numberOfEndHeartbeatRequests) {
import RemoteWatcher._ import RemoteWatcher._

View file

@ -86,6 +86,8 @@ abstract class RemoteNodeDeathWatchSpec
override def initialParticipants = roles.size override def initialParticipants = roles.size
muteDeadLetters(Heartbeat.getClass)()
lazy val remoteWatcher: ActorRef = { lazy val remoteWatcher: ActorRef = {
system.actorSelection("/system/remote-watcher") ! Identify(None) system.actorSelection("/system/remote-watcher") ! Identify(None)
expectMsgType[ActorIdentity].ref.get expectMsgType[ActorIdentity].ref.get

View file

@ -209,11 +209,6 @@ akka {
# been received. # been received.
expected-response-after = 3 s expected-response-after = 3 s
# When a node unwatch another node it will end that
# with this number of EndHeartbeatRequest messages, which will stop the
# heartbeating from the other side
nr-of-end-heartbeats = 8
} }
# After failed to establish an outbound connection, the remoting will mark the # After failed to establish an outbound connection, the remoting will mark the

View file

@ -191,8 +191,7 @@ private[akka] class RemoteActorRefProvider(
failureDetector, failureDetector,
heartbeatInterval = WatchHeartBeatInterval, heartbeatInterval = WatchHeartBeatInterval,
unreachableReaperInterval = WatchUnreachableReaperInterval, unreachableReaperInterval = WatchUnreachableReaperInterval,
heartbeatExpectedResponseAfter = WatchHeartbeatExpectedResponseAfter, heartbeatExpectedResponseAfter = WatchHeartbeatExpectedResponseAfter), "remote-watcher")
numberOfEndHeartbeatRequests = WatchNumberOfEndHeartbeatRequests), "remote-watcher")
} }
protected def createRemoteWatcherFailureDetector(system: ExtendedActorSystem): FailureDetectorRegistry[Address] = { protected def createRemoteWatcherFailureDetector(system: ExtendedActorSystem): FailureDetectorRegistry[Address] = {

View file

@ -87,9 +87,6 @@ class RemoteSettings(val config: Config) {
val WatchUnreachableReaperInterval: FiniteDuration = { val WatchUnreachableReaperInterval: FiniteDuration = {
Duration(WatchFailureDetectorConfig.getMilliseconds("unreachable-nodes-reaper-interval"), MILLISECONDS) Duration(WatchFailureDetectorConfig.getMilliseconds("unreachable-nodes-reaper-interval"), MILLISECONDS)
} requiring (_ > Duration.Zero, "watch-failure-detector.unreachable-nodes-reaper-interval must be > 0") } requiring (_ > Duration.Zero, "watch-failure-detector.unreachable-nodes-reaper-interval must be > 0")
val WatchNumberOfEndHeartbeatRequests: Int = {
WatchFailureDetectorConfig.getInt("nr-of-end-heartbeats")
} requiring (_ > 0, "watch-failure-detector.nr-of-end-heartbeats must be > 0")
val WatchHeartbeatExpectedResponseAfter: FiniteDuration = { val WatchHeartbeatExpectedResponseAfter: FiniteDuration = {
Duration(WatchFailureDetectorConfig.getMilliseconds("expected-response-after"), MILLISECONDS) Duration(WatchFailureDetectorConfig.getMilliseconds("expected-response-after"), MILLISECONDS)
} requiring (_ > Duration.Zero, "watch-failure-detector.expected-response-after > 0") } requiring (_ > Duration.Zero, "watch-failure-detector.expected-response-after > 0")

View file

@ -28,17 +28,15 @@ private[akka] object RemoteWatcher {
failureDetector: FailureDetectorRegistry[Address], failureDetector: FailureDetectorRegistry[Address],
heartbeatInterval: FiniteDuration, heartbeatInterval: FiniteDuration,
unreachableReaperInterval: FiniteDuration, unreachableReaperInterval: FiniteDuration,
heartbeatExpectedResponseAfter: FiniteDuration, heartbeatExpectedResponseAfter: FiniteDuration): Props =
numberOfEndHeartbeatRequests: Int): Props =
Props(classOf[RemoteWatcher], failureDetector, heartbeatInterval, unreachableReaperInterval, Props(classOf[RemoteWatcher], failureDetector, heartbeatInterval, unreachableReaperInterval,
heartbeatExpectedResponseAfter, numberOfEndHeartbeatRequests) heartbeatExpectedResponseAfter)
case class WatchRemote(watchee: ActorRef, watcher: ActorRef) case class WatchRemote(watchee: ActorRef, watcher: ActorRef)
case class UnwatchRemote(watchee: ActorRef, watcher: ActorRef) case class UnwatchRemote(watchee: ActorRef, watcher: ActorRef)
@SerialVersionUID(1L) case object HeartbeatRequest @SerialVersionUID(1L) case object Heartbeat
@SerialVersionUID(1L) case object EndHeartbeatRequest @SerialVersionUID(1L) case class HeartbeatRsp(addressUid: Int)
@SerialVersionUID(1L) case class Heartbeat(addressUid: Int)
// sent to self only // sent to self only
case object HeartbeatTick case object HeartbeatTick
@ -47,17 +45,17 @@ private[akka] object RemoteWatcher {
// test purpose // test purpose
object Stats { object Stats {
lazy val empty: Stats = counts(0, 0, 0) lazy val empty: Stats = counts(0, 0)
def counts(watching: Int, watchingNodes: Int, watchedByNodes: Int): Stats = def counts(watching: Int, watchingNodes: Int): Stats =
new Stats(watching, watchingNodes, watchedByNodes)(Set.empty) new Stats(watching, watchingNodes)(Set.empty)
} }
case class Stats(watching: Int, watchingNodes: Int, watchedByNodes: Int)(val watchingRefs: Set[(ActorRef, ActorRef)]) { case class Stats(watching: Int, watchingNodes: Int)(val watchingRefs: Set[(ActorRef, ActorRef)]) {
override def toString: String = { override def toString: String = {
def formatWatchingRefs: String = def formatWatchingRefs: String =
if (watchingRefs.isEmpty) "" if (watchingRefs.isEmpty) ""
else ", watchingRefs=" + watchingRefs.map(x x._2.path.name + " -> " + x._1.path.name).mkString("[", ", ", "]") else ", watchingRefs=" + watchingRefs.map(x x._2.path.name + " -> " + x._1.path.name).mkString("[", ", ", "]")
s"Stats(watching=${watching}, watchingNodes=${watchingNodes}, watchedByNodes=${watchedByNodes}${formatWatchingRefs})" s"Stats(watching=${watching}, watchingNodes=${watchingNodes}${formatWatchingRefs})"
} }
} }
} }
@ -70,18 +68,13 @@ private[akka] object RemoteWatcher {
* intercepts Watch and Unwatch system messages and sends corresponding * intercepts Watch and Unwatch system messages and sends corresponding
* [[RemoteWatcher.WatchRemote]] and [[RemoteWatcher.UnwatchRemote]] to this actor. * [[RemoteWatcher.WatchRemote]] and [[RemoteWatcher.UnwatchRemote]] to this actor.
* *
* For a new node to be watched this actor starts the monitoring by sending [[RemoteWatcher.HeartbeatRequest]] * For a new node to be watched this actor periodically sends [[RemoteWatcher.Heartbeat]]
* to the peer actor on the other node, which then sends periodic [[RemoteWatcher.Heartbeat]] * to the peer actor on the other node, which replies with [[RemoteWatcher.HeartbeatRsp]]
* messages back. The failure detector on the watching side monitors these heartbeat messages. * message back. The failure detector on the watching side monitors these heartbeat messages.
* If arrival of hearbeat messages stops it will be detected and this actor will publish * If arrival of hearbeat messages stops it will be detected and this actor will publish
* [[akka.actor.AddressTerminated]] to the `eventStream`. * [[akka.actor.AddressTerminated]] to the `eventStream`.
* *
* When all actors on a node have been unwatched, or terminated, this actor sends * When all actors on a node have been unwatched it will stop sending heartbeat messages.
* [[RemoteWatcher.EndHeartbeatRequest]] messages to the peer actor on the other node,
* which will then stop sending heartbeat messages.
*
* The actor sending heartbeat messages will also watch the peer on the other node,
* to be able to stop sending heartbeat messages in case of network failure or JVM crash.
* *
* For bi-directional watch between two nodes the same thing will be established in * For bi-directional watch between two nodes the same thing will be established in
* both directions, but independent of each other. * both directions, but independent of each other.
@ -91,8 +84,7 @@ private[akka] class RemoteWatcher(
failureDetector: FailureDetectorRegistry[Address], failureDetector: FailureDetectorRegistry[Address],
heartbeatInterval: FiniteDuration, heartbeatInterval: FiniteDuration,
unreachableReaperInterval: FiniteDuration, unreachableReaperInterval: FiniteDuration,
heartbeatExpectedResponseAfter: FiniteDuration, heartbeatExpectedResponseAfter: FiniteDuration)
numberOfEndHeartbeatRequests: Int)
extends Actor with ActorLogging with RequiresMessageQueue[UnboundedMessageQueueSemantics] { extends Actor with ActorLogging with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
import RemoteWatcher._ import RemoteWatcher._
@ -105,16 +97,13 @@ private[akka] class RemoteWatcher(
s"ActorSystem [${context.system}] needs to have a 'RemoteActorRefProvider' enabled in the configuration, currently uses [${other.getClass.getName}]") s"ActorSystem [${context.system}] needs to have a 'RemoteActorRefProvider' enabled in the configuration, currently uses [${other.getClass.getName}]")
} }
val selfHeartbeatMsg = Heartbeat(AddressUidExtension(context.system).addressUid) val selfHeartbeatRspMsg = HeartbeatRsp(AddressUidExtension(context.system).addressUid)
// actors that this node is watching, tuple with (watcher, watchee) // actors that this node is watching, tuple with (watcher, watchee)
var watching: Set[(ActorRef, ActorRef)] = Set.empty var watching: Set[(ActorRef, ActorRef)] = Set.empty
// nodes that this node is watching, i.e. expecting hearteats from these nodes // nodes that this node is watching, i.e. expecting hearteats from these nodes
var watchingNodes: Set[Address] = Set.empty var watchingNodes: Set[Address] = Set.empty
// heartbeats will be sent to watchedByNodes, ref is RemoteWatcher at other side
var watchedByNodes: Set[ActorRef] = Set.empty
var unreachable: Set[Address] = Set.empty var unreachable: Set[Address] = Set.empty
var endWatchingNodes: Map[Address, Int] = Map.empty
var addressUids: Map[Address, Int] = Map.empty var addressUids: Map[Address, Int] = Map.empty
val heartbeatTask = scheduler.schedule(heartbeatInterval, heartbeatInterval, self, HeartbeatTick) val heartbeatTask = scheduler.schedule(heartbeatInterval, heartbeatInterval, self, HeartbeatTick)
@ -128,14 +117,10 @@ private[akka] class RemoteWatcher(
} }
def receive = { def receive = {
case HeartbeatTick case HeartbeatTick sendHeartbeat()
sendHeartbeat() case Heartbeat receiveHeartbeat()
sendHeartbeatRequest() case HeartbeatRsp(uid) receiveHeartbeatRsp(uid)
sendEndHeartbeatRequest()
case Heartbeat(uid) heartbeat(uid)
case ReapUnreachableTick reapUnreachable() case ReapUnreachableTick reapUnreachable()
case HeartbeatRequest heartbeatRequest()
case EndHeartbeatRequest endHeartbeatRequest()
case ExpectedFirstHeartbeat(from) triggerFirstHeartbeat(from) case ExpectedFirstHeartbeat(from) triggerFirstHeartbeat(from)
case WatchRemote(watchee, watcher) watchRemote(watchee, watcher) case WatchRemote(watchee, watcher) watchRemote(watchee, watcher)
case UnwatchRemote(watchee, watcher) unwatchRemote(watchee, watcher) case UnwatchRemote(watchee, watcher) unwatchRemote(watchee, watcher)
@ -145,17 +130,19 @@ private[akka] class RemoteWatcher(
case Stats case Stats
sender ! Stats( sender ! Stats(
watching = watching.size, watching = watching.size,
watchingNodes = watchingNodes.size, watchingNodes = watchingNodes.size)(watching)
watchedByNodes = watchedByNodes.size)(watching)
} }
def heartbeat(uid: Int): Unit = { def receiveHeartbeat(): Unit =
sender ! selfHeartbeatRspMsg
def receiveHeartbeatRsp(uid: Int): Unit = {
val from = sender.path.address val from = sender.path.address
if (failureDetector.isMonitoring(from)) if (failureDetector.isMonitoring(from))
log.debug("Received heartbeat from [{}]", from) log.debug("Received heartbeat rsp from [{}]", from)
else else
log.debug("Received first heartbeat from [{}]", from) log.debug("Received first heartbeat rsp from [{}]", from)
if (watchingNodes(from) && !unreachable(from)) { if (watchingNodes(from) && !unreachable(from)) {
addressUids += (from -> uid) addressUids += (from -> uid)
@ -163,24 +150,6 @@ private[akka] class RemoteWatcher(
} }
} }
def heartbeatRequest(): Unit = {
// request to start sending heartbeats to the node
log.debug("Received HeartbeatRequest from [{}]", sender.path.address)
watchedByNodes += sender
// watch back to stop heartbeating if other side dies
context watch sender
addWatching(sender, self)
}
def endHeartbeatRequest(): Unit = {
// request to stop sending heartbeats to the node
log.debug("Received EndHeartbeatRequest from [{}]", sender.path.address)
watchedByNodes -= sender
context unwatch sender
watching -= ((sender, self))
checkLastUnwatchOfNode(sender.path.address)
}
def reapUnreachable(): Unit = def reapUnreachable(): Unit =
watchingNodes foreach { a watchingNodes foreach { a
if (!unreachable(a) && !failureDetector.isAvailable(a)) { if (!unreachable(a) && !failureDetector.isAvailable(a)) {
@ -218,7 +187,6 @@ private[akka] class RemoteWatcher(
failureDetector.remove(watcheeAddress) failureDetector.remove(watcheeAddress)
} }
watchingNodes += watcheeAddress watchingNodes += watcheeAddress
endWatchingNodes -= watcheeAddress
} }
def unwatchRemote(watchee: ActorRef, watcher: ActorRef): Unit = def unwatchRemote(watchee: ActorRef, watcher: ActorRef): Unit =
@ -242,70 +210,37 @@ private[akka] class RemoteWatcher(
} }
def terminated(watchee: ActorRef): Unit = { def terminated(watchee: ActorRef): Unit = {
if (matchingPathElements(self, watchee)) {
log.debug("Other side terminated: [{}]", watchee.path)
// stop heartbeating to that node immediately, and cleanup
watchedByNodes -= watchee
watching -= ((watchee, self))
} else {
log.debug("Watchee terminated: [{}]", watchee.path) log.debug("Watchee terminated: [{}]", watchee.path)
watching = watching.filterNot { watching = watching.filterNot {
case (wee, _) wee == watchee case (wee, _) wee == watchee
} }
}
checkLastUnwatchOfNode(watchee.path.address) checkLastUnwatchOfNode(watchee.path.address)
} }
def checkLastUnwatchOfNode(watcheeAddress: Address): Unit = { def checkLastUnwatchOfNode(watcheeAddress: Address): Unit = {
if (watchingNodes(watcheeAddress) && watching.forall { if (watchingNodes(watcheeAddress) && watching.forall {
case (wee, wer) wee.path.address != watcheeAddress || (wer == self && matchingPathElements(self, wee)) case (wee, wer) wee.path.address != watcheeAddress
}) { }) {
// unwatched last watchee on that node, not counting RemoteWatcher peer // unwatched last watchee on that node
log.debug("Unwatched last watchee of node: [{}]", watcheeAddress) log.debug("Unwatched last watchee of node: [{}]", watcheeAddress)
watchingNodes -= watcheeAddress watchingNodes -= watcheeAddress
addressUids -= watcheeAddress addressUids -= watcheeAddress
// continue by sending EndHeartbeatRequest for a while
endWatchingNodes += (watcheeAddress -> 0)
failureDetector.remove(watcheeAddress) failureDetector.remove(watcheeAddress)
} }
} }
def matchingPathElements(a: ActorRef, b: ActorRef): Boolean =
a.path.elements == b.path.elements
def sendHeartbeat(): Unit = def sendHeartbeat(): Unit =
watchedByNodes foreach { ref watchingNodes foreach { a
val a = ref.path.address
if (!unreachable(a)) { if (!unreachable(a)) {
log.debug("Sending Heartbeat to [{}]", ref.path.address) if (failureDetector.isMonitoring(a)) {
ref ! selfHeartbeatMsg log.debug("Sending Heartbeat to [{}]", a)
}
}
def sendHeartbeatRequest(): Unit =
watchingNodes.foreach { a
if (!unreachable(a) && !failureDetector.isMonitoring(a)) {
log.debug("Sending HeartbeatRequest to [{}]", a)
context.actorSelection(RootActorPath(a) / self.path.elements) ! HeartbeatRequest
// schedule the expected heartbeat for later, which will give the
// other side a chance to start heartbeating, and also trigger some resends of
// the heartbeat request
scheduler.scheduleOnce(heartbeatExpectedResponseAfter, self, ExpectedFirstHeartbeat(a))
endWatchingNodes -= a
}
}
def sendEndHeartbeatRequest(): Unit =
endWatchingNodes.foreach {
case (a, count)
if (!unreachable(a)) {
log.debug("Sending EndHeartbeatRequest to [{}]", a)
context.actorSelection(RootActorPath(a) / self.path.elements) ! EndHeartbeatRequest
}
if (count == numberOfEndHeartbeatRequests - 1) {
endWatchingNodes -= a
} else { } else {
endWatchingNodes += (a -> (count + 1)) log.debug("Sending first Heartbeat to [{}]", a)
// schedule the expected first heartbeat for later, which will give the
// other side a chance to reply, and also trigger some resends if needed
scheduler.scheduleOnce(heartbeatExpectedResponseAfter, self, ExpectedFirstHeartbeat(a))
}
context.actorSelection(RootActorPath(a) / self.path.elements) ! Heartbeat
} }
} }

View file

@ -51,7 +51,6 @@ class RemoteConfigSpec extends AkkaSpec(
WatchFailureDetectorImplementationClass must be(classOf[PhiAccrualFailureDetector].getName) WatchFailureDetectorImplementationClass must be(classOf[PhiAccrualFailureDetector].getName)
WatchHeartBeatInterval must be(1 seconds) WatchHeartBeatInterval must be(1 seconds)
WatchNumberOfEndHeartbeatRequests must be(8)
WatchHeartbeatExpectedResponseAfter must be(3 seconds) WatchHeartbeatExpectedResponseAfter must be(3 seconds)
WatchUnreachableReaperInterval must be(1 second) WatchUnreachableReaperInterval must be(1 second)
WatchFailureDetectorConfig.getDouble("threshold") must be(10.0 plusOrMinus 0.0001) WatchFailureDetectorConfig.getDouble("threshold") must be(10.0 plusOrMinus 0.0001)

View file

@ -52,8 +52,7 @@ object RemoteWatcherSpec {
class TestRemoteWatcher(heartbeatExpectedResponseAfter: FiniteDuration) extends RemoteWatcher(createFailureDetector, class TestRemoteWatcher(heartbeatExpectedResponseAfter: FiniteDuration) extends RemoteWatcher(createFailureDetector,
heartbeatInterval = TurnOff, heartbeatInterval = TurnOff,
unreachableReaperInterval = TurnOff, unreachableReaperInterval = TurnOff,
heartbeatExpectedResponseAfter = heartbeatExpectedResponseAfter, heartbeatExpectedResponseAfter = heartbeatExpectedResponseAfter) {
numberOfEndHeartbeatRequests = 3) {
def this() = this(heartbeatExpectedResponseAfter = TurnOff) def this() = this(heartbeatExpectedResponseAfter = TurnOff)
@ -99,7 +98,7 @@ class RemoteWatcherSpec extends AkkaSpec(
remoteSystem.shutdown() remoteSystem.shutdown()
} }
val heartbeatMsgB = Heartbeat(remoteAddressUid) val heartbeatRspB = HeartbeatRsp(remoteAddressUid)
def createRemoteActor(props: Props, name: String): ActorRef = { def createRemoteActor(props: Props, name: String): ActorRef = {
remoteSystem.actorOf(props, name) remoteSystem.actorOf(props, name)
@ -126,32 +125,35 @@ class RemoteWatcherSpec extends AkkaSpec(
monitorA ! Stats monitorA ! Stats
// for each watchee the RemoteWatcher also adds its own watch: 5 = 3 + 2 // for each watchee the RemoteWatcher also adds its own watch: 5 = 3 + 2
// (a1->b1), (a1->b2), (a2->b2) // (a1->b1), (a1->b2), (a2->b2)
expectMsg(Stats.counts(watching = 5, watchingNodes = 1, watchedByNodes = 0)) expectMsg(Stats.counts(watching = 5, watchingNodes = 1))
expectNoMsg(100 millis) expectNoMsg(100 millis)
monitorA ! HeartbeatTick monitorA ! HeartbeatTick
expectMsg(HeartbeatRequest) expectMsg(Heartbeat)
expectNoMsg(100 millis) expectNoMsg(100 millis)
monitorA ! HeartbeatTick monitorA ! HeartbeatTick
expectMsg(HeartbeatRequest) expectMsg(Heartbeat)
expectNoMsg(100 millis) expectNoMsg(100 millis)
monitorA.tell(heartbeatMsgB, monitorB) monitorA.tell(heartbeatRspB, monitorB)
monitorA ! HeartbeatTick monitorA ! HeartbeatTick
expectMsg(Heartbeat)
expectNoMsg(100 millis) expectNoMsg(100 millis)
monitorA ! UnwatchRemote(b1, a1) monitorA ! UnwatchRemote(b1, a1)
// still (a1->b2) and (a2->b2) left // still (a1->b2) and (a2->b2) left
monitorA ! Stats monitorA ! Stats
expectMsg(Stats.counts(watching = 3, watchingNodes = 1, watchedByNodes = 0)) expectMsg(Stats.counts(watching = 3, watchingNodes = 1))
expectNoMsg(100 millis) expectNoMsg(100 millis)
monitorA ! HeartbeatTick monitorA ! HeartbeatTick
expectMsg(Heartbeat)
expectNoMsg(100 millis) expectNoMsg(100 millis)
monitorA ! UnwatchRemote(b2, a2) monitorA ! UnwatchRemote(b2, a2)
// still (a1->b2) left // still (a1->b2) left
monitorA ! Stats monitorA ! Stats
expectMsg(Stats.counts(watching = 2, watchingNodes = 1, watchedByNodes = 0)) expectMsg(Stats.counts(watching = 2, watchingNodes = 1))
expectNoMsg(100 millis) expectNoMsg(100 millis)
monitorA ! HeartbeatTick monitorA ! HeartbeatTick
expectMsg(Heartbeat)
expectNoMsg(100 millis) expectNoMsg(100 millis)
monitorA ! UnwatchRemote(b2, a1) monitorA ! UnwatchRemote(b2, a1)
@ -159,15 +161,7 @@ class RemoteWatcherSpec extends AkkaSpec(
monitorA ! Stats monitorA ! Stats
expectMsg(Stats.empty) expectMsg(Stats.empty)
expectNoMsg(100 millis) expectNoMsg(100 millis)
// expecting 3 EndHeartbeatRequest
monitorA ! HeartbeatTick monitorA ! HeartbeatTick
expectMsg(EndHeartbeatRequest)
expectNoMsg(100 millis)
monitorA ! HeartbeatTick
expectMsg(EndHeartbeatRequest)
expectNoMsg(100 millis)
monitorA ! HeartbeatTick
expectMsg(EndHeartbeatRequest)
expectNoMsg(100 millis) expectNoMsg(100 millis)
monitorA ! HeartbeatTick monitorA ! HeartbeatTick
expectNoMsg(100 millis) expectNoMsg(100 millis)
@ -176,67 +170,6 @@ class RemoteWatcherSpec extends AkkaSpec(
expectNoMsg(2 seconds) expectNoMsg(2 seconds)
} }
"have correct interaction when beeing watched" in {
val monitorA = system.actorOf(Props(classOf[TestActorProxy], testActor), "monitor2")
val monitorB = createRemoteActor(Props[TestRemoteWatcher], "monitor2")
val b3 = createRemoteActor(Props[MyActor], "b3")
// watch
monitorB.tell(HeartbeatRequest, monitorA)
monitorB ! Stats
// HeartbeatRequest adds cross watch to RemoteWatcher peer
val stats = expectMsg(Stats.counts(watching = 1, watchingNodes = 1, watchedByNodes = 1))
stats.watchingRefs must be(Set((monitorA, monitorB)))
expectNoMsg(100 millis)
monitorB ! HeartbeatTick
expectMsg(heartbeatMsgB)
// HeartbeatRequest for the cross watch
expectMsg(HeartbeatRequest)
expectNoMsg(100 millis)
monitorB ! HeartbeatTick
expectMsg(heartbeatMsgB)
// HeartbeatRequest for the cross watch
expectMsg(HeartbeatRequest)
expectNoMsg(100 millis)
// unwatch
monitorB.tell(EndHeartbeatRequest, monitorA)
monitorB ! Stats
// EndHeartbeatRequest should remove the cross watch
expectMsg(Stats.empty)
expectNoMsg(100 millis)
monitorB ! HeartbeatTick
// EndHeartbeatRequest for the cross watch
expectMsg(EndHeartbeatRequest)
expectNoMsg(100 millis)
// start heartbeating again
monitorB.tell(HeartbeatRequest, monitorA)
monitorB ! Stats
val stats2 = expectMsg(Stats.counts(watching = 1, watchingNodes = 1, watchedByNodes = 1))
stats2.watchingRefs must be(Set((monitorA, monitorB)))
expectNoMsg(100 millis)
monitorB ! HeartbeatTick
expectMsg(heartbeatMsgB)
expectMsg(HeartbeatRequest)
expectNoMsg(100 millis)
// then kill other side, which should stop the heartbeating
monitorA ! PoisonPill
awaitAssert {
monitorB ! Stats
expectMsg(Stats.empty)
}
monitorB ! HeartbeatTick
// no more heartbeats and no EndHeartbeatRequest for the cross watch
expectNoMsg(500 millis)
// make sure nothing floods over to next test
expectNoMsg(2 seconds)
}
"generate AddressTerminated when missing heartbeats" in { "generate AddressTerminated when missing heartbeats" in {
val p = TestProbe() val p = TestProbe()
val q = TestProbe() val q = TestProbe()
@ -252,14 +185,18 @@ class RemoteWatcherSpec extends AkkaSpec(
monitorA ! WatchRemote(b, a) monitorA ! WatchRemote(b, a)
monitorA ! HeartbeatTick monitorA ! HeartbeatTick
expectMsg(HeartbeatRequest) expectMsg(Heartbeat)
monitorA.tell(heartbeatMsgB, monitorB) monitorA.tell(heartbeatRspB, monitorB)
expectNoMsg(1 second) expectNoMsg(1 second)
monitorA.tell(heartbeatMsgB, monitorB) monitorA ! HeartbeatTick
expectMsg(Heartbeat)
monitorA.tell(heartbeatRspB, monitorB)
within(10 seconds) { within(10 seconds) {
awaitAssert { awaitAssert {
monitorA ! HeartbeatTick monitorA ! HeartbeatTick
expectMsg(Heartbeat)
// but no HeartbeatRsp
monitorA ! ReapUnreachableTick monitorA ! ReapUnreachableTick
p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(b.path.address)) p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(b.path.address))
q.expectMsg(1 second, TestRemoteWatcher.Quarantined(b.path.address, remoteAddressUid)) q.expectMsg(1 second, TestRemoteWatcher.Quarantined(b.path.address, remoteAddressUid))
@ -287,12 +224,14 @@ class RemoteWatcherSpec extends AkkaSpec(
monitorA ! WatchRemote(b, a) monitorA ! WatchRemote(b, a)
monitorA ! HeartbeatTick monitorA ! HeartbeatTick
expectMsg(HeartbeatRequest) expectMsg(Heartbeat)
// no heartbeats sent // no HeartbeatRsp sent
within(20 seconds) { within(20 seconds) {
awaitAssert { awaitAssert {
monitorA ! HeartbeatTick monitorA ! HeartbeatTick
expectMsg(Heartbeat)
// but no HeartbeatRsp
monitorA ! ReapUnreachableTick monitorA ! ReapUnreachableTick
p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(b.path.address)) p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(b.path.address))
// no quarantine when missing first heartbeat, uid unknown // no quarantine when missing first heartbeat, uid unknown
@ -300,14 +239,8 @@ class RemoteWatcherSpec extends AkkaSpec(
} }
} }
// some more HeartbeatRequest may be sent
receiveWhile(1.second) {
case HeartbeatRequest // ok
}
// make sure nothing floods over to next test // make sure nothing floods over to next test
expectNoMsg(2 seconds) expectNoMsg(2 seconds)
} }
"generate AddressTerminated for new watch after broken connection that was re-established and broken again" in { "generate AddressTerminated for new watch after broken connection that was re-established and broken again" in {
@ -325,14 +258,18 @@ class RemoteWatcherSpec extends AkkaSpec(
monitorA ! WatchRemote(b, a) monitorA ! WatchRemote(b, a)
monitorA ! HeartbeatTick monitorA ! HeartbeatTick
expectMsg(HeartbeatRequest) expectMsg(Heartbeat)
monitorA.tell(heartbeatMsgB, monitorB) monitorA.tell(heartbeatRspB, monitorB)
expectNoMsg(1 second) expectNoMsg(1 second)
monitorA.tell(heartbeatMsgB, monitorB) monitorA ! HeartbeatTick
expectMsg(Heartbeat)
monitorA.tell(heartbeatRspB, monitorB)
within(10 seconds) { within(10 seconds) {
awaitAssert { awaitAssert {
monitorA ! HeartbeatTick monitorA ! HeartbeatTick
expectMsg(Heartbeat)
// but no HeartbeatRsp
monitorA ! ReapUnreachableTick monitorA ! ReapUnreachableTick
p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(b.path.address)) p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(b.path.address))
q.expectMsg(1 second, TestRemoteWatcher.Quarantined(b.path.address, remoteAddressUid)) q.expectMsg(1 second, TestRemoteWatcher.Quarantined(b.path.address, remoteAddressUid))
@ -353,15 +290,21 @@ class RemoteWatcherSpec extends AkkaSpec(
monitorA ! WatchRemote(c, a) monitorA ! WatchRemote(c, a)
monitorA ! HeartbeatTick monitorA ! HeartbeatTick
expectMsg(HeartbeatRequest) expectMsg(Heartbeat)
monitorA.tell(heartbeatMsgB, monitorB) monitorA.tell(heartbeatRspB, monitorB)
expectNoMsg(1 second) expectNoMsg(1 second)
monitorA.tell(heartbeatMsgB, monitorB)
monitorA ! HeartbeatTick monitorA ! HeartbeatTick
expectMsg(Heartbeat)
monitorA.tell(heartbeatRspB, monitorB)
monitorA ! HeartbeatTick
expectMsg(Heartbeat)
monitorA ! ReapUnreachableTick monitorA ! ReapUnreachableTick
p.expectNoMsg(1 second) p.expectNoMsg(1 second)
monitorA.tell(heartbeatMsgB, monitorB)
monitorA ! HeartbeatTick monitorA ! HeartbeatTick
expectMsg(Heartbeat)
monitorA.tell(heartbeatRspB, monitorB)
monitorA ! HeartbeatTick
expectMsg(Heartbeat)
monitorA ! ReapUnreachableTick monitorA ! ReapUnreachableTick
p.expectNoMsg(1 second) p.expectNoMsg(1 second)
q.expectNoMsg(1 second) q.expectNoMsg(1 second)
@ -370,6 +313,8 @@ class RemoteWatcherSpec extends AkkaSpec(
within(10 seconds) { within(10 seconds) {
awaitAssert { awaitAssert {
monitorA ! HeartbeatTick monitorA ! HeartbeatTick
expectMsg(Heartbeat)
// but no HeartbeatRsp
monitorA ! ReapUnreachableTick monitorA ! ReapUnreachableTick
p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(c.path.address)) p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(c.path.address))
q.expectMsg(1 second, TestRemoteWatcher.Quarantined(c.path.address, remoteAddressUid)) q.expectMsg(1 second, TestRemoteWatcher.Quarantined(c.path.address, remoteAddressUid))

View file

@ -247,7 +247,7 @@ trait TestKitBase {
val failed = val failed =
try { a; false } catch { try { a; false } catch {
case NonFatal(e) case NonFatal(e)
if (now >= stop) throw e if ((now + t) >= stop) throw e
true true
} }
if (failed) { if (failed) {