Merge pull request #1401 from akka/wip-3265-heartbeating-race2-patriknw
Changed design of RemoteWatcher due to cleanup race, see #3265
This commit is contained in:
commit
236330f3c6
10 changed files with 89 additions and 221 deletions
|
|
@ -65,8 +65,7 @@ private[akka] class ClusterActorRefProvider(
|
|||
failureDetector,
|
||||
heartbeatInterval = WatchHeartBeatInterval,
|
||||
unreachableReaperInterval = WatchUnreachableReaperInterval,
|
||||
heartbeatExpectedResponseAfter = WatchHeartbeatExpectedResponseAfter,
|
||||
numberOfEndHeartbeatRequests = WatchNumberOfEndHeartbeatRequests), "remote-watcher")
|
||||
heartbeatExpectedResponseAfter = WatchHeartbeatExpectedResponseAfter), "remote-watcher")
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -26,10 +26,9 @@ private[cluster] object ClusterRemoteWatcher {
|
|||
failureDetector: FailureDetectorRegistry[Address],
|
||||
heartbeatInterval: FiniteDuration,
|
||||
unreachableReaperInterval: FiniteDuration,
|
||||
heartbeatExpectedResponseAfter: FiniteDuration,
|
||||
numberOfEndHeartbeatRequests: Int): Props =
|
||||
heartbeatExpectedResponseAfter: FiniteDuration): Props =
|
||||
Props(classOf[ClusterRemoteWatcher], failureDetector, heartbeatInterval, unreachableReaperInterval,
|
||||
heartbeatExpectedResponseAfter, numberOfEndHeartbeatRequests)
|
||||
heartbeatExpectedResponseAfter)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -47,14 +46,12 @@ private[cluster] class ClusterRemoteWatcher(
|
|||
failureDetector: FailureDetectorRegistry[Address],
|
||||
heartbeatInterval: FiniteDuration,
|
||||
unreachableReaperInterval: FiniteDuration,
|
||||
heartbeatExpectedResponseAfter: FiniteDuration,
|
||||
numberOfEndHeartbeatRequests: Int)
|
||||
heartbeatExpectedResponseAfter: FiniteDuration)
|
||||
extends RemoteWatcher(
|
||||
failureDetector,
|
||||
heartbeatInterval,
|
||||
unreachableReaperInterval,
|
||||
heartbeatExpectedResponseAfter,
|
||||
numberOfEndHeartbeatRequests) {
|
||||
heartbeatExpectedResponseAfter) {
|
||||
|
||||
import RemoteWatcher._
|
||||
|
||||
|
|
|
|||
|
|
@ -86,6 +86,8 @@ abstract class RemoteNodeDeathWatchSpec
|
|||
|
||||
override def initialParticipants = roles.size
|
||||
|
||||
muteDeadLetters(Heartbeat.getClass)()
|
||||
|
||||
lazy val remoteWatcher: ActorRef = {
|
||||
system.actorSelection("/system/remote-watcher") ! Identify(None)
|
||||
expectMsgType[ActorIdentity].ref.get
|
||||
|
|
|
|||
|
|
@ -217,11 +217,6 @@ akka {
|
|||
# been received.
|
||||
expected-response-after = 3 s
|
||||
|
||||
# When a node unwatch another node it will end that
|
||||
# with this number of EndHeartbeatRequest messages, which will stop the
|
||||
# heartbeating from the other side
|
||||
nr-of-end-heartbeats = 8
|
||||
|
||||
}
|
||||
|
||||
# After failed to establish an outbound connection, the remoting will mark the
|
||||
|
|
|
|||
|
|
@ -191,8 +191,7 @@ private[akka] class RemoteActorRefProvider(
|
|||
failureDetector,
|
||||
heartbeatInterval = WatchHeartBeatInterval,
|
||||
unreachableReaperInterval = WatchUnreachableReaperInterval,
|
||||
heartbeatExpectedResponseAfter = WatchHeartbeatExpectedResponseAfter,
|
||||
numberOfEndHeartbeatRequests = WatchNumberOfEndHeartbeatRequests), "remote-watcher")
|
||||
heartbeatExpectedResponseAfter = WatchHeartbeatExpectedResponseAfter), "remote-watcher")
|
||||
}
|
||||
|
||||
protected def createRemoteWatcherFailureDetector(system: ExtendedActorSystem): FailureDetectorRegistry[Address] = {
|
||||
|
|
|
|||
|
|
@ -87,9 +87,6 @@ class RemoteSettings(val config: Config) {
|
|||
val WatchUnreachableReaperInterval: FiniteDuration = {
|
||||
Duration(WatchFailureDetectorConfig.getMilliseconds("unreachable-nodes-reaper-interval"), MILLISECONDS)
|
||||
} requiring (_ > Duration.Zero, "watch-failure-detector.unreachable-nodes-reaper-interval must be > 0")
|
||||
val WatchNumberOfEndHeartbeatRequests: Int = {
|
||||
WatchFailureDetectorConfig.getInt("nr-of-end-heartbeats")
|
||||
} requiring (_ > 0, "watch-failure-detector.nr-of-end-heartbeats must be > 0")
|
||||
val WatchHeartbeatExpectedResponseAfter: FiniteDuration = {
|
||||
Duration(WatchFailureDetectorConfig.getMilliseconds("expected-response-after"), MILLISECONDS)
|
||||
} requiring (_ > Duration.Zero, "watch-failure-detector.expected-response-after > 0")
|
||||
|
|
|
|||
|
|
@ -28,17 +28,15 @@ private[akka] object RemoteWatcher {
|
|||
failureDetector: FailureDetectorRegistry[Address],
|
||||
heartbeatInterval: FiniteDuration,
|
||||
unreachableReaperInterval: FiniteDuration,
|
||||
heartbeatExpectedResponseAfter: FiniteDuration,
|
||||
numberOfEndHeartbeatRequests: Int): Props =
|
||||
heartbeatExpectedResponseAfter: FiniteDuration): Props =
|
||||
Props(classOf[RemoteWatcher], failureDetector, heartbeatInterval, unreachableReaperInterval,
|
||||
heartbeatExpectedResponseAfter, numberOfEndHeartbeatRequests)
|
||||
heartbeatExpectedResponseAfter)
|
||||
|
||||
case class WatchRemote(watchee: ActorRef, watcher: ActorRef)
|
||||
case class UnwatchRemote(watchee: ActorRef, watcher: ActorRef)
|
||||
|
||||
@SerialVersionUID(1L) case object HeartbeatRequest
|
||||
@SerialVersionUID(1L) case object EndHeartbeatRequest
|
||||
@SerialVersionUID(1L) case class Heartbeat(addressUid: Int)
|
||||
@SerialVersionUID(1L) case object Heartbeat
|
||||
@SerialVersionUID(1L) case class HeartbeatRsp(addressUid: Int)
|
||||
|
||||
// sent to self only
|
||||
case object HeartbeatTick
|
||||
|
|
@ -47,17 +45,17 @@ private[akka] object RemoteWatcher {
|
|||
|
||||
// test purpose
|
||||
object Stats {
|
||||
lazy val empty: Stats = counts(0, 0, 0)
|
||||
def counts(watching: Int, watchingNodes: Int, watchedByNodes: Int): Stats =
|
||||
new Stats(watching, watchingNodes, watchedByNodes)(Set.empty)
|
||||
lazy val empty: Stats = counts(0, 0)
|
||||
def counts(watching: Int, watchingNodes: Int): Stats =
|
||||
new Stats(watching, watchingNodes)(Set.empty)
|
||||
}
|
||||
case class Stats(watching: Int, watchingNodes: Int, watchedByNodes: Int)(val watchingRefs: Set[(ActorRef, ActorRef)]) {
|
||||
case class Stats(watching: Int, watchingNodes: Int)(val watchingRefs: Set[(ActorRef, ActorRef)]) {
|
||||
override def toString: String = {
|
||||
def formatWatchingRefs: String =
|
||||
if (watchingRefs.isEmpty) ""
|
||||
else ", watchingRefs=" + watchingRefs.map(x ⇒ x._2.path.name + " -> " + x._1.path.name).mkString("[", ", ", "]")
|
||||
|
||||
s"Stats(watching=${watching}, watchingNodes=${watchingNodes}, watchedByNodes=${watchedByNodes}${formatWatchingRefs})"
|
||||
s"Stats(watching=${watching}, watchingNodes=${watchingNodes}${formatWatchingRefs})"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -70,18 +68,13 @@ private[akka] object RemoteWatcher {
|
|||
* intercepts Watch and Unwatch system messages and sends corresponding
|
||||
* [[RemoteWatcher.WatchRemote]] and [[RemoteWatcher.UnwatchRemote]] to this actor.
|
||||
*
|
||||
* For a new node to be watched this actor starts the monitoring by sending [[RemoteWatcher.HeartbeatRequest]]
|
||||
* to the peer actor on the other node, which then sends periodic [[RemoteWatcher.Heartbeat]]
|
||||
* messages back. The failure detector on the watching side monitors these heartbeat messages.
|
||||
* For a new node to be watched this actor periodically sends [[RemoteWatcher.Heartbeat]]
|
||||
* to the peer actor on the other node, which replies with [[RemoteWatcher.HeartbeatRsp]]
|
||||
* message back. The failure detector on the watching side monitors these heartbeat messages.
|
||||
* If arrival of hearbeat messages stops it will be detected and this actor will publish
|
||||
* [[akka.actor.AddressTerminated]] to the `eventStream`.
|
||||
*
|
||||
* When all actors on a node have been unwatched, or terminated, this actor sends
|
||||
* [[RemoteWatcher.EndHeartbeatRequest]] messages to the peer actor on the other node,
|
||||
* which will then stop sending heartbeat messages.
|
||||
*
|
||||
* The actor sending heartbeat messages will also watch the peer on the other node,
|
||||
* to be able to stop sending heartbeat messages in case of network failure or JVM crash.
|
||||
* When all actors on a node have been unwatched it will stop sending heartbeat messages.
|
||||
*
|
||||
* For bi-directional watch between two nodes the same thing will be established in
|
||||
* both directions, but independent of each other.
|
||||
|
|
@ -91,8 +84,7 @@ private[akka] class RemoteWatcher(
|
|||
failureDetector: FailureDetectorRegistry[Address],
|
||||
heartbeatInterval: FiniteDuration,
|
||||
unreachableReaperInterval: FiniteDuration,
|
||||
heartbeatExpectedResponseAfter: FiniteDuration,
|
||||
numberOfEndHeartbeatRequests: Int)
|
||||
heartbeatExpectedResponseAfter: FiniteDuration)
|
||||
extends Actor with ActorLogging with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
|
||||
|
||||
import RemoteWatcher._
|
||||
|
|
@ -105,16 +97,13 @@ private[akka] class RemoteWatcher(
|
|||
s"ActorSystem [${context.system}] needs to have a 'RemoteActorRefProvider' enabled in the configuration, currently uses [${other.getClass.getName}]")
|
||||
}
|
||||
|
||||
val selfHeartbeatMsg = Heartbeat(AddressUidExtension(context.system).addressUid)
|
||||
val selfHeartbeatRspMsg = HeartbeatRsp(AddressUidExtension(context.system).addressUid)
|
||||
|
||||
// actors that this node is watching, tuple with (watcher, watchee)
|
||||
var watching: Set[(ActorRef, ActorRef)] = Set.empty
|
||||
// nodes that this node is watching, i.e. expecting hearteats from these nodes
|
||||
var watchingNodes: Set[Address] = Set.empty
|
||||
// heartbeats will be sent to watchedByNodes, ref is RemoteWatcher at other side
|
||||
var watchedByNodes: Set[ActorRef] = Set.empty
|
||||
var unreachable: Set[Address] = Set.empty
|
||||
var endWatchingNodes: Map[Address, Int] = Map.empty
|
||||
var addressUids: Map[Address, Int] = Map.empty
|
||||
|
||||
val heartbeatTask = scheduler.schedule(heartbeatInterval, heartbeatInterval, self, HeartbeatTick)
|
||||
|
|
@ -128,14 +117,10 @@ private[akka] class RemoteWatcher(
|
|||
}
|
||||
|
||||
def receive = {
|
||||
case HeartbeatTick ⇒
|
||||
sendHeartbeat()
|
||||
sendHeartbeatRequest()
|
||||
sendEndHeartbeatRequest()
|
||||
case Heartbeat(uid) ⇒ heartbeat(uid)
|
||||
case HeartbeatTick ⇒ sendHeartbeat()
|
||||
case Heartbeat ⇒ receiveHeartbeat()
|
||||
case HeartbeatRsp(uid) ⇒ receiveHeartbeatRsp(uid)
|
||||
case ReapUnreachableTick ⇒ reapUnreachable()
|
||||
case HeartbeatRequest ⇒ heartbeatRequest()
|
||||
case EndHeartbeatRequest ⇒ endHeartbeatRequest()
|
||||
case ExpectedFirstHeartbeat(from) ⇒ triggerFirstHeartbeat(from)
|
||||
case WatchRemote(watchee, watcher) ⇒ watchRemote(watchee, watcher)
|
||||
case UnwatchRemote(watchee, watcher) ⇒ unwatchRemote(watchee, watcher)
|
||||
|
|
@ -145,17 +130,19 @@ private[akka] class RemoteWatcher(
|
|||
case Stats ⇒
|
||||
sender ! Stats(
|
||||
watching = watching.size,
|
||||
watchingNodes = watchingNodes.size,
|
||||
watchedByNodes = watchedByNodes.size)(watching)
|
||||
watchingNodes = watchingNodes.size)(watching)
|
||||
}
|
||||
|
||||
def heartbeat(uid: Int): Unit = {
|
||||
def receiveHeartbeat(): Unit =
|
||||
sender ! selfHeartbeatRspMsg
|
||||
|
||||
def receiveHeartbeatRsp(uid: Int): Unit = {
|
||||
val from = sender.path.address
|
||||
|
||||
if (failureDetector.isMonitoring(from))
|
||||
log.debug("Received heartbeat from [{}]", from)
|
||||
log.debug("Received heartbeat rsp from [{}]", from)
|
||||
else
|
||||
log.debug("Received first heartbeat from [{}]", from)
|
||||
log.debug("Received first heartbeat rsp from [{}]", from)
|
||||
|
||||
if (watchingNodes(from) && !unreachable(from)) {
|
||||
addressUids += (from -> uid)
|
||||
|
|
@ -163,24 +150,6 @@ private[akka] class RemoteWatcher(
|
|||
}
|
||||
}
|
||||
|
||||
def heartbeatRequest(): Unit = {
|
||||
// request to start sending heartbeats to the node
|
||||
log.debug("Received HeartbeatRequest from [{}]", sender.path.address)
|
||||
watchedByNodes += sender
|
||||
// watch back to stop heartbeating if other side dies
|
||||
context watch sender
|
||||
addWatching(sender, self)
|
||||
}
|
||||
|
||||
def endHeartbeatRequest(): Unit = {
|
||||
// request to stop sending heartbeats to the node
|
||||
log.debug("Received EndHeartbeatRequest from [{}]", sender.path.address)
|
||||
watchedByNodes -= sender
|
||||
context unwatch sender
|
||||
watching -= ((sender, self))
|
||||
checkLastUnwatchOfNode(sender.path.address)
|
||||
}
|
||||
|
||||
def reapUnreachable(): Unit =
|
||||
watchingNodes foreach { a ⇒
|
||||
if (!unreachable(a) && !failureDetector.isAvailable(a)) {
|
||||
|
|
@ -218,7 +187,6 @@ private[akka] class RemoteWatcher(
|
|||
failureDetector.remove(watcheeAddress)
|
||||
}
|
||||
watchingNodes += watcheeAddress
|
||||
endWatchingNodes -= watcheeAddress
|
||||
}
|
||||
|
||||
def unwatchRemote(watchee: ActorRef, watcher: ActorRef): Unit =
|
||||
|
|
@ -242,71 +210,38 @@ private[akka] class RemoteWatcher(
|
|||
}
|
||||
|
||||
def terminated(watchee: ActorRef): Unit = {
|
||||
if (matchingPathElements(self, watchee)) {
|
||||
log.debug("Other side terminated: [{}]", watchee.path)
|
||||
// stop heartbeating to that node immediately, and cleanup
|
||||
watchedByNodes -= watchee
|
||||
watching -= ((watchee, self))
|
||||
} else {
|
||||
log.debug("Watchee terminated: [{}]", watchee.path)
|
||||
watching = watching.filterNot {
|
||||
case (wee, _) ⇒ wee == watchee
|
||||
}
|
||||
log.debug("Watchee terminated: [{}]", watchee.path)
|
||||
watching = watching.filterNot {
|
||||
case (wee, _) ⇒ wee == watchee
|
||||
}
|
||||
checkLastUnwatchOfNode(watchee.path.address)
|
||||
}
|
||||
|
||||
def checkLastUnwatchOfNode(watcheeAddress: Address): Unit = {
|
||||
if (watchingNodes(watcheeAddress) && watching.forall {
|
||||
case (wee, wer) ⇒ wee.path.address != watcheeAddress || (wer == self && matchingPathElements(self, wee))
|
||||
case (wee, wer) ⇒ wee.path.address != watcheeAddress
|
||||
}) {
|
||||
// unwatched last watchee on that node, not counting RemoteWatcher peer
|
||||
// unwatched last watchee on that node
|
||||
log.debug("Unwatched last watchee of node: [{}]", watcheeAddress)
|
||||
watchingNodes -= watcheeAddress
|
||||
addressUids -= watcheeAddress
|
||||
// continue by sending EndHeartbeatRequest for a while
|
||||
endWatchingNodes += (watcheeAddress -> 0)
|
||||
failureDetector.remove(watcheeAddress)
|
||||
}
|
||||
}
|
||||
|
||||
def matchingPathElements(a: ActorRef, b: ActorRef): Boolean =
|
||||
a.path.elements == b.path.elements
|
||||
|
||||
def sendHeartbeat(): Unit =
|
||||
watchedByNodes foreach { ref ⇒
|
||||
val a = ref.path.address
|
||||
watchingNodes foreach { a ⇒
|
||||
if (!unreachable(a)) {
|
||||
log.debug("Sending Heartbeat to [{}]", ref.path.address)
|
||||
ref ! selfHeartbeatMsg
|
||||
}
|
||||
}
|
||||
|
||||
def sendHeartbeatRequest(): Unit =
|
||||
watchingNodes.foreach { a ⇒
|
||||
if (!unreachable(a) && !failureDetector.isMonitoring(a)) {
|
||||
log.debug("Sending HeartbeatRequest to [{}]", a)
|
||||
context.actorSelection(RootActorPath(a) / self.path.elements) ! HeartbeatRequest
|
||||
// schedule the expected heartbeat for later, which will give the
|
||||
// other side a chance to start heartbeating, and also trigger some resends of
|
||||
// the heartbeat request
|
||||
scheduler.scheduleOnce(heartbeatExpectedResponseAfter, self, ExpectedFirstHeartbeat(a))
|
||||
endWatchingNodes -= a
|
||||
}
|
||||
}
|
||||
|
||||
def sendEndHeartbeatRequest(): Unit =
|
||||
endWatchingNodes.foreach {
|
||||
case (a, count) ⇒
|
||||
if (!unreachable(a)) {
|
||||
log.debug("Sending EndHeartbeatRequest to [{}]", a)
|
||||
context.actorSelection(RootActorPath(a) / self.path.elements) ! EndHeartbeatRequest
|
||||
}
|
||||
if (count == numberOfEndHeartbeatRequests - 1) {
|
||||
endWatchingNodes -= a
|
||||
if (failureDetector.isMonitoring(a)) {
|
||||
log.debug("Sending Heartbeat to [{}]", a)
|
||||
} else {
|
||||
endWatchingNodes += (a -> (count + 1))
|
||||
log.debug("Sending first Heartbeat to [{}]", a)
|
||||
// schedule the expected first heartbeat for later, which will give the
|
||||
// other side a chance to reply, and also trigger some resends if needed
|
||||
scheduler.scheduleOnce(heartbeatExpectedResponseAfter, self, ExpectedFirstHeartbeat(a))
|
||||
}
|
||||
context.actorSelection(RootActorPath(a) / self.path.elements) ! Heartbeat
|
||||
}
|
||||
}
|
||||
|
||||
def triggerFirstHeartbeat(address: Address): Unit =
|
||||
|
|
|
|||
|
|
@ -51,7 +51,6 @@ class RemoteConfigSpec extends AkkaSpec(
|
|||
|
||||
WatchFailureDetectorImplementationClass must be(classOf[PhiAccrualFailureDetector].getName)
|
||||
WatchHeartBeatInterval must be(1 seconds)
|
||||
WatchNumberOfEndHeartbeatRequests must be(8)
|
||||
WatchHeartbeatExpectedResponseAfter must be(3 seconds)
|
||||
WatchUnreachableReaperInterval must be(1 second)
|
||||
WatchFailureDetectorConfig.getDouble("threshold") must be(10.0 plusOrMinus 0.0001)
|
||||
|
|
|
|||
|
|
@ -52,8 +52,7 @@ object RemoteWatcherSpec {
|
|||
class TestRemoteWatcher(heartbeatExpectedResponseAfter: FiniteDuration) extends RemoteWatcher(createFailureDetector,
|
||||
heartbeatInterval = TurnOff,
|
||||
unreachableReaperInterval = TurnOff,
|
||||
heartbeatExpectedResponseAfter = heartbeatExpectedResponseAfter,
|
||||
numberOfEndHeartbeatRequests = 3) {
|
||||
heartbeatExpectedResponseAfter = heartbeatExpectedResponseAfter) {
|
||||
|
||||
def this() = this(heartbeatExpectedResponseAfter = TurnOff)
|
||||
|
||||
|
|
@ -99,7 +98,7 @@ class RemoteWatcherSpec extends AkkaSpec(
|
|||
shutdown(remoteSystem)
|
||||
}
|
||||
|
||||
val heartbeatMsgB = Heartbeat(remoteAddressUid)
|
||||
val heartbeatRspB = HeartbeatRsp(remoteAddressUid)
|
||||
|
||||
def createRemoteActor(props: Props, name: String): ActorRef = {
|
||||
remoteSystem.actorOf(props, name)
|
||||
|
|
@ -126,32 +125,35 @@ class RemoteWatcherSpec extends AkkaSpec(
|
|||
monitorA ! Stats
|
||||
// for each watchee the RemoteWatcher also adds its own watch: 5 = 3 + 2
|
||||
// (a1->b1), (a1->b2), (a2->b2)
|
||||
expectMsg(Stats.counts(watching = 5, watchingNodes = 1, watchedByNodes = 0))
|
||||
expectMsg(Stats.counts(watching = 5, watchingNodes = 1))
|
||||
expectNoMsg(100 millis)
|
||||
monitorA ! HeartbeatTick
|
||||
expectMsg(HeartbeatRequest)
|
||||
expectMsg(Heartbeat)
|
||||
expectNoMsg(100 millis)
|
||||
monitorA ! HeartbeatTick
|
||||
expectMsg(HeartbeatRequest)
|
||||
expectMsg(Heartbeat)
|
||||
expectNoMsg(100 millis)
|
||||
monitorA.tell(heartbeatMsgB, monitorB)
|
||||
monitorA.tell(heartbeatRspB, monitorB)
|
||||
monitorA ! HeartbeatTick
|
||||
expectMsg(Heartbeat)
|
||||
expectNoMsg(100 millis)
|
||||
|
||||
monitorA ! UnwatchRemote(b1, a1)
|
||||
// still (a1->b2) and (a2->b2) left
|
||||
monitorA ! Stats
|
||||
expectMsg(Stats.counts(watching = 3, watchingNodes = 1, watchedByNodes = 0))
|
||||
expectMsg(Stats.counts(watching = 3, watchingNodes = 1))
|
||||
expectNoMsg(100 millis)
|
||||
monitorA ! HeartbeatTick
|
||||
expectMsg(Heartbeat)
|
||||
expectNoMsg(100 millis)
|
||||
|
||||
monitorA ! UnwatchRemote(b2, a2)
|
||||
// still (a1->b2) left
|
||||
monitorA ! Stats
|
||||
expectMsg(Stats.counts(watching = 2, watchingNodes = 1, watchedByNodes = 0))
|
||||
expectMsg(Stats.counts(watching = 2, watchingNodes = 1))
|
||||
expectNoMsg(100 millis)
|
||||
monitorA ! HeartbeatTick
|
||||
expectMsg(Heartbeat)
|
||||
expectNoMsg(100 millis)
|
||||
|
||||
monitorA ! UnwatchRemote(b2, a1)
|
||||
|
|
@ -159,15 +161,7 @@ class RemoteWatcherSpec extends AkkaSpec(
|
|||
monitorA ! Stats
|
||||
expectMsg(Stats.empty)
|
||||
expectNoMsg(100 millis)
|
||||
// expecting 3 EndHeartbeatRequest
|
||||
monitorA ! HeartbeatTick
|
||||
expectMsg(EndHeartbeatRequest)
|
||||
expectNoMsg(100 millis)
|
||||
monitorA ! HeartbeatTick
|
||||
expectMsg(EndHeartbeatRequest)
|
||||
expectNoMsg(100 millis)
|
||||
monitorA ! HeartbeatTick
|
||||
expectMsg(EndHeartbeatRequest)
|
||||
expectNoMsg(100 millis)
|
||||
monitorA ! HeartbeatTick
|
||||
expectNoMsg(100 millis)
|
||||
|
|
@ -176,67 +170,6 @@ class RemoteWatcherSpec extends AkkaSpec(
|
|||
expectNoMsg(2 seconds)
|
||||
}
|
||||
|
||||
"have correct interaction when beeing watched" in {
|
||||
|
||||
val monitorA = system.actorOf(Props(classOf[TestActorProxy], testActor), "monitor2")
|
||||
val monitorB = createRemoteActor(Props[TestRemoteWatcher], "monitor2")
|
||||
|
||||
val b3 = createRemoteActor(Props[MyActor], "b3")
|
||||
|
||||
// watch
|
||||
monitorB.tell(HeartbeatRequest, monitorA)
|
||||
monitorB ! Stats
|
||||
// HeartbeatRequest adds cross watch to RemoteWatcher peer
|
||||
val stats = expectMsg(Stats.counts(watching = 1, watchingNodes = 1, watchedByNodes = 1))
|
||||
stats.watchingRefs must be(Set((monitorA, monitorB)))
|
||||
expectNoMsg(100 millis)
|
||||
monitorB ! HeartbeatTick
|
||||
expectMsg(heartbeatMsgB)
|
||||
// HeartbeatRequest for the cross watch
|
||||
expectMsg(HeartbeatRequest)
|
||||
expectNoMsg(100 millis)
|
||||
monitorB ! HeartbeatTick
|
||||
expectMsg(heartbeatMsgB)
|
||||
// HeartbeatRequest for the cross watch
|
||||
expectMsg(HeartbeatRequest)
|
||||
expectNoMsg(100 millis)
|
||||
|
||||
// unwatch
|
||||
monitorB.tell(EndHeartbeatRequest, monitorA)
|
||||
monitorB ! Stats
|
||||
// EndHeartbeatRequest should remove the cross watch
|
||||
expectMsg(Stats.empty)
|
||||
expectNoMsg(100 millis)
|
||||
monitorB ! HeartbeatTick
|
||||
// EndHeartbeatRequest for the cross watch
|
||||
expectMsg(EndHeartbeatRequest)
|
||||
expectNoMsg(100 millis)
|
||||
|
||||
// start heartbeating again
|
||||
monitorB.tell(HeartbeatRequest, monitorA)
|
||||
monitorB ! Stats
|
||||
val stats2 = expectMsg(Stats.counts(watching = 1, watchingNodes = 1, watchedByNodes = 1))
|
||||
stats2.watchingRefs must be(Set((monitorA, monitorB)))
|
||||
expectNoMsg(100 millis)
|
||||
monitorB ! HeartbeatTick
|
||||
expectMsg(heartbeatMsgB)
|
||||
expectMsg(HeartbeatRequest)
|
||||
expectNoMsg(100 millis)
|
||||
|
||||
// then kill other side, which should stop the heartbeating
|
||||
monitorA ! PoisonPill
|
||||
awaitAssert {
|
||||
monitorB ! Stats
|
||||
expectMsg(Stats.empty)
|
||||
}
|
||||
monitorB ! HeartbeatTick
|
||||
// no more heartbeats and no EndHeartbeatRequest for the cross watch
|
||||
expectNoMsg(500 millis)
|
||||
|
||||
// make sure nothing floods over to next test
|
||||
expectNoMsg(2 seconds)
|
||||
}
|
||||
|
||||
"generate AddressTerminated when missing heartbeats" in {
|
||||
val p = TestProbe()
|
||||
val q = TestProbe()
|
||||
|
|
@ -252,14 +185,18 @@ class RemoteWatcherSpec extends AkkaSpec(
|
|||
monitorA ! WatchRemote(b, a)
|
||||
|
||||
monitorA ! HeartbeatTick
|
||||
expectMsg(HeartbeatRequest)
|
||||
monitorA.tell(heartbeatMsgB, monitorB)
|
||||
expectMsg(Heartbeat)
|
||||
monitorA.tell(heartbeatRspB, monitorB)
|
||||
expectNoMsg(1 second)
|
||||
monitorA.tell(heartbeatMsgB, monitorB)
|
||||
monitorA ! HeartbeatTick
|
||||
expectMsg(Heartbeat)
|
||||
monitorA.tell(heartbeatRspB, monitorB)
|
||||
|
||||
within(10 seconds) {
|
||||
awaitAssert {
|
||||
monitorA ! HeartbeatTick
|
||||
expectMsg(Heartbeat)
|
||||
// but no HeartbeatRsp
|
||||
monitorA ! ReapUnreachableTick
|
||||
p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(b.path.address))
|
||||
q.expectMsg(1 second, TestRemoteWatcher.Quarantined(b.path.address, remoteAddressUid))
|
||||
|
|
@ -287,12 +224,14 @@ class RemoteWatcherSpec extends AkkaSpec(
|
|||
monitorA ! WatchRemote(b, a)
|
||||
|
||||
monitorA ! HeartbeatTick
|
||||
expectMsg(HeartbeatRequest)
|
||||
// no heartbeats sent
|
||||
expectMsg(Heartbeat)
|
||||
// no HeartbeatRsp sent
|
||||
|
||||
within(20 seconds) {
|
||||
awaitAssert {
|
||||
monitorA ! HeartbeatTick
|
||||
expectMsg(Heartbeat)
|
||||
// but no HeartbeatRsp
|
||||
monitorA ! ReapUnreachableTick
|
||||
p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(b.path.address))
|
||||
// no quarantine when missing first heartbeat, uid unknown
|
||||
|
|
@ -300,14 +239,8 @@ class RemoteWatcherSpec extends AkkaSpec(
|
|||
}
|
||||
}
|
||||
|
||||
// some more HeartbeatRequest may be sent
|
||||
receiveWhile(1.second) {
|
||||
case HeartbeatRequest ⇒ // ok
|
||||
}
|
||||
|
||||
// make sure nothing floods over to next test
|
||||
expectNoMsg(2 seconds)
|
||||
|
||||
}
|
||||
|
||||
"generate AddressTerminated for new watch after broken connection that was re-established and broken again" in {
|
||||
|
|
@ -325,14 +258,18 @@ class RemoteWatcherSpec extends AkkaSpec(
|
|||
monitorA ! WatchRemote(b, a)
|
||||
|
||||
monitorA ! HeartbeatTick
|
||||
expectMsg(HeartbeatRequest)
|
||||
monitorA.tell(heartbeatMsgB, monitorB)
|
||||
expectMsg(Heartbeat)
|
||||
monitorA.tell(heartbeatRspB, monitorB)
|
||||
expectNoMsg(1 second)
|
||||
monitorA.tell(heartbeatMsgB, monitorB)
|
||||
monitorA ! HeartbeatTick
|
||||
expectMsg(Heartbeat)
|
||||
monitorA.tell(heartbeatRspB, monitorB)
|
||||
|
||||
within(10 seconds) {
|
||||
awaitAssert {
|
||||
monitorA ! HeartbeatTick
|
||||
expectMsg(Heartbeat)
|
||||
// but no HeartbeatRsp
|
||||
monitorA ! ReapUnreachableTick
|
||||
p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(b.path.address))
|
||||
q.expectMsg(1 second, TestRemoteWatcher.Quarantined(b.path.address, remoteAddressUid))
|
||||
|
|
@ -353,15 +290,21 @@ class RemoteWatcherSpec extends AkkaSpec(
|
|||
monitorA ! WatchRemote(c, a)
|
||||
|
||||
monitorA ! HeartbeatTick
|
||||
expectMsg(HeartbeatRequest)
|
||||
monitorA.tell(heartbeatMsgB, monitorB)
|
||||
expectMsg(Heartbeat)
|
||||
monitorA.tell(heartbeatRspB, monitorB)
|
||||
expectNoMsg(1 second)
|
||||
monitorA.tell(heartbeatMsgB, monitorB)
|
||||
monitorA ! HeartbeatTick
|
||||
expectMsg(Heartbeat)
|
||||
monitorA.tell(heartbeatRspB, monitorB)
|
||||
monitorA ! HeartbeatTick
|
||||
expectMsg(Heartbeat)
|
||||
monitorA ! ReapUnreachableTick
|
||||
p.expectNoMsg(1 second)
|
||||
monitorA.tell(heartbeatMsgB, monitorB)
|
||||
monitorA ! HeartbeatTick
|
||||
expectMsg(Heartbeat)
|
||||
monitorA.tell(heartbeatRspB, monitorB)
|
||||
monitorA ! HeartbeatTick
|
||||
expectMsg(Heartbeat)
|
||||
monitorA ! ReapUnreachableTick
|
||||
p.expectNoMsg(1 second)
|
||||
q.expectNoMsg(1 second)
|
||||
|
|
@ -370,6 +313,8 @@ class RemoteWatcherSpec extends AkkaSpec(
|
|||
within(10 seconds) {
|
||||
awaitAssert {
|
||||
monitorA ! HeartbeatTick
|
||||
expectMsg(Heartbeat)
|
||||
// but no HeartbeatRsp
|
||||
monitorA ! ReapUnreachableTick
|
||||
p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(c.path.address))
|
||||
q.expectMsg(1 second, TestRemoteWatcher.Quarantined(c.path.address, remoteAddressUid))
|
||||
|
|
|
|||
|
|
@ -252,7 +252,7 @@ trait TestKitBase {
|
|||
val failed =
|
||||
try { a; false } catch {
|
||||
case NonFatal(e) ⇒
|
||||
if (now >= stop) throw e
|
||||
if ((now + t) >= stop) throw e
|
||||
true
|
||||
}
|
||||
if (failed) {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue