From 37826533d3a7466141df0c58a855ad386ad6f237 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 5 Jul 2012 09:50:58 +0200 Subject: [PATCH] Fix race in UnreachableNodeRejoinsClusterSpec --- .../main/scala/akka/cluster/AccrualFailureDetector.scala | 5 +++-- akka-cluster/src/main/scala/akka/cluster/Cluster.scala | 9 +++++---- .../akka/cluster/UnreachableNodeRejoinsClusterSpec.scala | 5 +++++ 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala index f1c761dec7..76c9595a59 100644 --- a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala @@ -167,8 +167,9 @@ class AccrualFailureDetector( val φ = phi(timeDiff, mean + acceptableHeartbeatPauseMillis, stdDeviation) // FIXME change to debug log level, when failure detector is stable - if (φ > 1.0) log.info("Phi value [{}] for connection [{}], after [{} ms], based on [{}]", - φ, connection, timeDiff, "N(" + mean + ", " + stdDeviation + ")") + if (φ > 1.0 && timeDiff < (acceptableHeartbeatPauseMillis + 5000)) + log.info("Phi value [{}] for connection [{}], after [{} ms], based on [{}]", + φ, connection, timeDiff, "N(" + mean + ", " + stdDeviation + ")") φ } diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 43cb115b4d..86caf231c3 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -569,7 +569,6 @@ private[cluster] final class ClusterHeartbeatSenderWorker( def receive = { case SendHeartbeat(heartbeatMsg, _, deadline) ⇒ - log.debug("Cluster Node [{}] - Heartbeat to [{}]", heartbeatMsg.from, toRef) if (!deadline.isOverdue) { // the CircuitBreaker will measure elapsed time and open if too many long calls try 
breaker.withSyncCircuitBreaker { @@ -977,8 +976,8 @@ private[cluster] final class ClusterCore(cluster: Cluster) extends Actor with Ac joinInProgress = newJoinInProgress // for all new joining nodes we remove them from the failure detector - (latestGossip.members -- localGossip.members).filter(_.status == Joining).foreach { - case node ⇒ cluster.failureDetector.remove(node.address) + (latestGossip.members -- localGossip.members).filter(_.status == Joining).foreach { node ⇒ + cluster.failureDetector.remove(node.address) } log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from) @@ -1250,7 +1249,9 @@ private[cluster] final class ClusterCore(cluster: Cluster) extends Actor with Ac val localMembers = localGossip.members val localUnreachableMembers = localGossip.overview.unreachable - val newlyDetectedUnreachableMembers = localMembers filterNot { member ⇒ cluster.failureDetector.isAvailable(member.address) } + val newlyDetectedUnreachableMembers = localMembers filterNot { member ⇒ + member.address == selfAddress || cluster.failureDetector.isAvailable(member.address) + } if (newlyDetectedUnreachableMembers.nonEmpty) { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala index 14f48bfbab..65a36080ff 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala @@ -58,6 +58,11 @@ abstract class UnreachableNodeRejoinsClusterSpec } "mark a node as UNREACHABLE when we pull the network" taggedAs LongRunningTest in { + // let them send at least one heartbeat to each other after the gossip convergence + // because for new joining nodes we remove them from the failure detector when + // we receive gossip + 2.seconds.dilated.sleep + runOn(first) { // pull network for victim node from all nodes 
allBut(victim).foreach { roleName ⇒