diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 33616f5812..73cb24e92c 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -36,6 +36,10 @@ akka { # how often should the node move nodes, marked as unreachable by the failure detector, out of the membership ring? unreachable-nodes-reaper-interval = 1s + # A joining node stops sending heartbeats to the node to join if it hasn't become member + # of the cluster within this deadline. + join-timeout = 60s + failure-detector { # defines the failure detector threshold diff --git a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala index b10962ce11..db5f21607b 100644 --- a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala @@ -85,11 +85,7 @@ class AccrualFailureDetector( settings.FailureDetectorMaxSampleSize, settings.FailureDetectorAcceptableHeartbeatPause, settings.FailureDetectorMinStdDeviation, - // we use a conservative estimate for the first heartbeat because - // gossip needs to spread back to the joining node before the - // first real heartbeat is sent. Initial heartbeat is added when joining. - // FIXME this can be changed to HeartbeatInterval when ticket #2249 is fixed - settings.GossipInterval * 3 + settings.HeartbeatInterval, + settings.HeartbeatInterval, clock) private val log = Logging(system, "FailureDetector") diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index db776ae954..7593245587 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -194,8 +194,8 @@ object MemberStatus { * Represents the overview of the cluster, holds the cluster convergence table and set with unreachable nodes. */ case class GossipOverview( - seen: Map[Address, VectorClock] = Map.empty[Address, VectorClock], - unreachable: Set[Member] = Set.empty[Member]) { + seen: Map[Address, VectorClock] = Map.empty, + unreachable: Set[Member] = Set.empty) { override def toString = "GossipOverview(seen = [" + seen.mkString(", ") + @@ -241,7 +241,7 @@ object Gossip { case class Gossip( overview: GossipOverview = GossipOverview(), members: SortedSet[Member], // sorted set of members with their status, sorted by address - meta: Map[String, Array[Byte]] = Map.empty[String, Array[Byte]], + meta: Map[String, Array[Byte]] = Map.empty, version: VectorClock = VectorClock()) // vector clock version extends ClusterMessage // is a serializable cluster message with Versioned[Gossip] { @@ -461,7 +461,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) */ private case class State( latestGossip: Gossip, - memberMembershipChangeListeners: Set[MembershipChangeListener] = Set.empty[MembershipChangeListener]) + joinInProgress: Map[Address, Deadline] = Map.empty, + memberMembershipChangeListeners: Set[MembershipChangeListener] = Set.empty) if (!system.provider.isInstanceOf[RemoteActorRefProvider]) throw new ConfigurationException("ActorSystem[" + system + "] needs to have a 'RemoteActorRefProvider' enabled in the configuration") @@ -672,11 +673,18 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) * Try to join this cluster node with the node specified by 'address'. * A 'Join(thisNodeAddress)' command is sent to the node to join. */ - def join(address: Address): Unit = { - val connection = clusterCommandConnectionFor(address) - val command = ClusterUserAction.Join(selfAddress) - log.info("Cluster Node [{}] - Trying to send JOIN to [{}] through connection [{}]", selfAddress, address, connection) - connection ! command + @tailrec + final def join(address: Address): Unit = { + val localState = state.get + val newState = localState copy (joinInProgress = localState.joinInProgress + + (address -> (Deadline.now + JoinTimeout))) + if (!state.compareAndSet(localState, newState)) join(address) // recur + else { + val connection = clusterCommandConnectionFor(address) + val command = ClusterUserAction.Join(selfAddress) + log.info("Cluster Node [{}] - Trying to send JOIN to [{}] through connection [{}]", selfAddress, address, connection) + connection ! command + } } /** @@ -915,7 +923,15 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) remoteGossip } - val newState = localState copy (latestGossip = winningGossip seen selfAddress) + val newJoinInProgress = + if (localState.joinInProgress.isEmpty) localState.joinInProgress + else localState.joinInProgress -- + winningGossip.members.map(_.address) -- + winningGossip.overview.unreachable.map(_.address) + + val newState = localState copy ( + latestGossip = winningGossip seen selfAddress, + joinInProgress = newJoinInProgress) // if we won the race then update else try again if (!state.compareAndSet(localState, newState)) receiveGossip(from, remoteGossip) // recur if we fail the update @@ -1025,16 +1041,15 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) * INTERNAL API. */ private[cluster] def heartbeat(): Unit = { + removeOverdueJoinInProgress() val localState = state.get - if (!isSingletonCluster(localState)) { - val liveMembers = localState.latestGossip.members.toIndexedSeq + val beatTo = localState.latestGossip.members.toSeq.map(_.address) ++ localState.joinInProgress.keys - for (member ← liveMembers; if member.address != selfAddress) { - val connection = clusterGossipConnectionFor(member.address) - log.debug("Cluster Node [{}] - Heartbeat to [{}]", selfAddress, connection) - connection ! selfHeartbeat - } + for (address ← beatTo; if address != selfAddress) { + val connection = clusterGossipConnectionFor(address) + log.debug("Cluster Node [{}] - Heartbeat to [{}]", selfAddress, connection) + connection ! selfHeartbeat } } @@ -1082,6 +1097,23 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) } } + /** + * INTERNAL API. + * + * Removes overdue joinInProgress from State. + */ + @tailrec + final private[cluster] def removeOverdueJoinInProgress(): Unit = { + val localState = state.get + val overdueJoins = localState.joinInProgress collect { + case (address, deadline) if deadline.isOverdue ⇒ address + } + if (overdueJoins.nonEmpty) { + val newState = localState copy (joinInProgress = localState.joinInProgress -- overdueJoins) + if (!state.compareAndSet(localState, newState)) removeOverdueJoinInProgress() // recur + } + } + /** * INTERNAL API. * diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index 64ae1c28cb..ba5d2a0b03 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -26,14 +26,15 @@ class ClusterSettings(val config: Config, val systemName: String) { case "" ⇒ None case AddressFromURIString(addr) ⇒ Some(addr) } - final val PeriodicTasksInitialDelay = Duration(getMilliseconds("akka.cluster.periodic-tasks-initial-delay"), MILLISECONDS) - final val GossipInterval = Duration(getMilliseconds("akka.cluster.gossip-interval"), MILLISECONDS) - final val HeartbeatInterval = Duration(getMilliseconds("akka.cluster.heartbeat-interval"), MILLISECONDS) - final val LeaderActionsInterval = Duration(getMilliseconds("akka.cluster.leader-actions-interval"), MILLISECONDS) - final val UnreachableNodesReaperInterval = Duration(getMilliseconds("akka.cluster.unreachable-nodes-reaper-interval"), MILLISECONDS) - final val NrOfGossipDaemons = getInt("akka.cluster.nr-of-gossip-daemons") - final val NrOfDeputyNodes = getInt("akka.cluster.nr-of-deputy-nodes") - final val AutoDown = getBoolean("akka.cluster.auto-down") - final val SchedulerTickDuration = Duration(getMilliseconds("akka.cluster.scheduler.tick-duration"), MILLISECONDS) - final val SchedulerTicksPerWheel = getInt("akka.cluster.scheduler.ticks-per-wheel") + final val PeriodicTasksInitialDelay: Duration = Duration(getMilliseconds("akka.cluster.periodic-tasks-initial-delay"), MILLISECONDS) + final val GossipInterval: Duration = Duration(getMilliseconds("akka.cluster.gossip-interval"), MILLISECONDS) + final val HeartbeatInterval: Duration = Duration(getMilliseconds("akka.cluster.heartbeat-interval"), MILLISECONDS) + final val LeaderActionsInterval: Duration = Duration(getMilliseconds("akka.cluster.leader-actions-interval"), MILLISECONDS) + final val UnreachableNodesReaperInterval: Duration = Duration(getMilliseconds("akka.cluster.unreachable-nodes-reaper-interval"), MILLISECONDS) + final val NrOfGossipDaemons: Int = getInt("akka.cluster.nr-of-gossip-daemons") + final val NrOfDeputyNodes: Int = getInt("akka.cluster.nr-of-deputy-nodes") + final val AutoDown: Boolean = getBoolean("akka.cluster.auto-down") + final val JoinTimeout: Duration = Duration(getMilliseconds("akka.cluster.join-timeout"), MILLISECONDS) + final val SchedulerTickDuration: Duration = Duration(getMilliseconds("akka.cluster.scheduler.tick-duration"), MILLISECONDS) + final val SchedulerTicksPerWheel: Int = getInt("akka.cluster.scheduler.ticks-per-wheel") } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinInProgressSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinInProgressSpec.scala new file mode 100644 index 0000000000..256b7d563d --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinInProgressSpec.scala @@ -0,0 +1,65 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfter +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.util.duration._ +import akka.util.Deadline + +object JoinInProgressMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + commonConfig( + debugConfig(on = false) + .withFallback(ConfigFactory.parseString(""" + akka.cluster { + # simulate delay in gossip by turning it off + gossip-interval = 300 s + failure-detector { + threshold = 4 + acceptable-heartbeat-pause = 1 second + } + }""") // increase the leader action task interval + .withFallback(MultiNodeClusterSpec.clusterConfig))) +} + +class JoinInProgressMultiJvmNode1 extends JoinInProgressSpec with AccrualFailureDetectorStrategy +class JoinInProgressMultiJvmNode2 extends JoinInProgressSpec with AccrualFailureDetectorStrategy + +abstract class JoinInProgressSpec + extends MultiNodeSpec(JoinInProgressMultiJvmSpec) + with MultiNodeClusterSpec { + + import JoinInProgressMultiJvmSpec._ + + "A cluster node" must { + "send heartbeats immediately when joining to avoid false failure detection due to delayed gossip" taggedAs LongRunningTest in { + + runOn(first) { + startClusterNode() + } + + enterBarrier("first-started") + + runOn(second) { + cluster.join(first) + } + + runOn(first) { + val until = Deadline.now + 5.seconds + while (!until.isOverdue) { + 200.millis.sleep + cluster.failureDetector.isAvailable(second) must be(true) + } + } + + enterBarrier("after") + } + } +} diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 58f0683c25..e8d68303a0 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -27,6 +27,7 @@ class ClusterConfigSpec extends AkkaSpec { HeartbeatInterval must be(1 second) LeaderActionsInterval must be(1 second) UnreachableNodesReaperInterval must be(1 second) + JoinTimeout must be(60 seconds) NrOfGossipDaemons must be(4) NrOfDeputyNodes must be(3) AutoDown must be(true)