From e2551494c41eef95a916b38897bb6ad09aef89a0 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 11 Jun 2012 14:59:34 +0200 Subject: [PATCH 01/19] Use separate heartbeats for FailureDetector, see #2214 * Send Heartbeat message to all members at regular interval * Removed the need to gossip to myself --- .../src/main/resources/reference.conf | 3 + .../akka/cluster/AccrualFailureDetector.scala | 6 +- .../src/main/scala/akka/cluster/Cluster.scala | 76 +++++++++++++------ .../scala/akka/cluster/ClusterSettings.scala | 1 + .../akka/cluster/MultiNodeClusterSpec.scala | 1 + .../scala/akka/cluster/SunnyWeatherSpec.scala | 2 +- .../akka/cluster/ClusterConfigSpec.scala | 1 + .../test/scala/akka/cluster/ClusterSpec.scala | 8 +- 8 files changed, 65 insertions(+), 33 deletions(-) diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 8c905d5b29..1e7c0e4c08 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -27,6 +27,9 @@ akka { # how often should the node send out gossip information? gossip-interval = 1s + # how often should the node send out heartbeats? + heartbeat-interval = 1s + # how often should the leader perform maintenance tasks? leader-actions-interval = 1s diff --git a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala index c7aaf12fcf..76c773f759 100644 --- a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala @@ -66,7 +66,8 @@ class AccrualFailureDetector( */ @tailrec final def heartbeat(connection: Address) { - log.debug("Node [{}] - Heartbeat from connection [{}] ", address, connection) + // FIXME change to debug log level, when failure detector is stable + log.info("Node [{}] - Heartbeat from connection [{}] ", address, connection) val oldState = state.get val latestTimestamp = oldState.timestamps.get(connection) @@ -155,7 +156,8 @@ class AccrualFailureDetector( else PhiFactor * timestampDiff / mean } - log.debug("Node [{}] - Phi value [{}] and threshold [{}] for connection [{}] ", address, phi, threshold, connection) + // FIXME change to debug log level, when failure detector is stable + log.info("Node [{}] - Phi value [{}] and threshold [{}] for connection [{}] ", address, phi, threshold, connection) phi } diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 4ea43d50e4..df8f2ec89b 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -138,7 +138,7 @@ object Member { /** * Envelope adding a sender address to the gossip. */ -case class GossipEnvelope(sender: Member, gossip: Gossip) extends ClusterMessage +case class GossipEnvelope(from: Address, gossip: Gossip) extends ClusterMessage /** * Defines the current status of a cluster member node @@ -244,6 +244,8 @@ case class Gossip( ")" } +case class Heartbeat(from: Address) + /** * Manages routing of the different cluster commands. * Instantiated as a single instance for each Cluster - e.g. commands are serialized to Cluster message after message.
@@ -272,7 +274,8 @@ private[akka] final class ClusterGossipDaemon(cluster: Cluster) extends Actor { val log = Logging(context.system, this) def receive = { - case GossipEnvelope(sender, gossip) ⇒ cluster.receive(sender, gossip) + case Heartbeat(from) ⇒ cluster.receiveHeartbeat(from) + case GossipEnvelope(from, gossip) ⇒ cluster.receiveGossip(from, gossip) } override def unhandled(unknown: Any) = log.error("[/system/cluster/gossip] can not respond to messages - received [{}]", unknown) @@ -388,7 +391,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ log.info("Cluster Node [{}] - is starting up...", selfAddress) - // create superisor for daemons under path "/system/cluster" + // create supervisor for daemons under path "/system/cluster" private val clusterDaemons = { val createChild = CreateChild(Props(new ClusterDaemonSupervisor(this)), "cluster") Await.result(system.systemGuardian ? createChild, defaultTimeout.duration) match { @@ -399,8 +402,9 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ private val state = { val member = Member(selfAddress, MemberStatus.Joining) - val gossip = Gossip(members = SortedSet.empty[Member] + member) + vclockNode // add me as member and update my vector clock - new AtomicReference[State](State(gossip)) + val versionedGossip = Gossip(members = SortedSet.empty[Member] + member) + vclockNode // add me as member and update my vector clock + val seenVersionedGossip = versionedGossip seen selfAddress + new AtomicReference[State](State(seenVersionedGossip)) } // try to join the node defined in the 'akka.cluster.node-to-join' option @@ -415,6 +419,11 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ gossip() } + // start periodic heartbeat to all nodes in cluster + private val heartbeatCanceller = system.scheduler.schedule(PeriodicTasksInitialDelay, HeartbeatInterval) { + heartbeat() + } + // start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list) private val failureDetectorReaperCanceller = system.scheduler.schedule(PeriodicTasksInitialDelay, UnreachableNodesReaperInterval) { reapUnreachableMembers() @@ -491,6 +500,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ if (isRunning.compareAndSet(true, false)) { log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", selfAddress) gossipCanceller.cancel() + heartbeatCanceller.cancel() failureDetectorReaperCanceller.cancel() leaderActionsCanceller.cancel() system.stop(clusterDaemons) @@ -588,6 +598,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ if (!state.compareAndSet(localState, newState)) joining(node) // recur if we failed update else { + // treat join as initial heartbeat, so that it becomes unavailable if nothing more happens if (node != selfAddress) failureDetector heartbeat node notifyMembershipChangeListeners(localState, newState) } @@ -615,7 +626,6 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ if (!state.compareAndSet(localState, newState)) leaving(address) // recur if we failed update else { - if (address != selfAddress) failureDetector heartbeat address // update heartbeat in failure detector notifyMembershipChangeListeners(localState, newState) } } @@ -708,7 +718,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ * Receive new gossip. 
*/ @tailrec - final private[cluster] def receive(sender: Member, remoteGossip: Gossip): Unit = { + final private[cluster] def receiveGossip(from: Address, remoteGossip: Gossip): Unit = { val localState = state.get val localGossip = localState.latestGossip @@ -718,8 +728,9 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ val mergedGossip = remoteGossip merge localGossip val versionedMergedGossip = mergedGossip + vclockNode - log.debug( - "Can't establish a causal relationship between \"remote\" gossip [{}] and \"local\" gossip [{}] - merging them into [{}]", + // FIXME change to debug log level, when failure detector is stable + log.info( + """Can't establish a causal relationship between "remote" gossip [{}] and "local" gossip [{}] - merging them into [{}]""", remoteGossip, localGossip, versionedMergedGossip) versionedMergedGossip @@ -736,15 +747,20 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ val newState = localState copy (latestGossip = winningGossip seen selfAddress) // if we won the race then update else try again - if (!state.compareAndSet(localState, newState)) receive(sender, remoteGossip) // recur if we fail the update + if (!state.compareAndSet(localState, newState)) receiveGossip(from, remoteGossip) // recur if we fail the update else { - log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, sender.address) - - if (sender.address != selfAddress) failureDetector heartbeat sender.address + log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from) notifyMembershipChangeListeners(localState, newState) } } + /** + * INTERNAL API + */ + private[cluster] def receiveHeartbeat(from: Address): Unit = { + failureDetector heartbeat from + } + /** * Joins the pre-configured contact point. */ @@ -769,14 +785,12 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ val newSelf = localSelf copy (status = newStatus) // change my state in 'gossip.members' - val newMembersSet = localMembers map { member ⇒ + val newMembers = localMembers map { member ⇒ if (member.address == selfAddress) newSelf else member } - // ugly crap to work around bug in scala colletions ('val ss: SortedSet[Member] = SortedSet.empty[Member] ++ aSet' does not compile) - val newMembersSortedSet = SortedSet[Member](newMembersSet.toList: _*) - val newGossip = localGossip copy (members = newMembersSortedSet) + val newGossip = localGossip copy (members = newMembers) // version my changes val versionedGossip = newGossip + vclockNode @@ -793,7 +807,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ private[akka] def gossipTo(address: Address): Unit = { val connection = clusterGossipConnectionFor(address) log.debug("Cluster Node [{}] - Gossiping to [{}]", selfAddress, connection) - connection ! GossipEnvelope(self, latestGossip) + connection ! GossipEnvelope(selfAddress, latestGossip) } /** @@ -840,12 +854,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ log.debug("Cluster Node [{}] - Initiating new round of gossip", selfAddress) - if (isSingletonCluster(localState)) { - // gossip to myself - // TODO could perhaps be optimized, no need to gossip to myself when Up? 
- gossipTo(selfAddress) - - } else if (isAvailable(localState)) { + if (!isSingletonCluster(localState) && isAvailable(localState)) { val localGossip = localState.latestGossip // important to not accidentally use `map` of the SortedSet, since the original order is not preserved val localMembers = localGossip.members.toIndexedSeq @@ -876,6 +885,25 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ } } + /** + * INTERNAL API + */ + private[akka] def heartbeat(): Unit = { + val localState = state.get + + if (!isSingletonCluster(localState)) { + val liveMembers = localState.latestGossip.members.toIndexedSeq + val unreachableMembers = localState.latestGossip.overview.unreachable + + // FIXME use unreachable? + for (member ← (liveMembers ++ unreachableMembers); if member.address != selfAddress) { + val connection = clusterGossipConnectionFor(member.address) + log.debug("Cluster Node [{}] - Heartbeat to [{}]", selfAddress, connection) + connection ! Heartbeat(selfAddress) + } + } + } + /** * INTERNAL API * diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index 0e7dac06ab..90831db2e6 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -21,6 +21,7 @@ class ClusterSettings(val config: Config, val systemName: String) { } val PeriodicTasksInitialDelay = Duration(getMilliseconds("akka.cluster.periodic-tasks-initial-delay"), MILLISECONDS) val GossipInterval = Duration(getMilliseconds("akka.cluster.gossip-interval"), MILLISECONDS) + val HeartbeatInterval = Duration(getMilliseconds("akka.cluster.heartbeat-interval"), MILLISECONDS) val LeaderActionsInterval = Duration(getMilliseconds("akka.cluster.leader-actions-interval"), MILLISECONDS) val UnreachableNodesReaperInterval = Duration(getMilliseconds("akka.cluster.unreachable-nodes-reaper-interval"), MILLISECONDS) val NrOfGossipDaemons = getInt("akka.cluster.nr-of-gossip-daemons") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index b185067ab0..729923699d 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -17,6 +17,7 @@ object MultiNodeClusterSpec { akka.cluster { auto-down = off gossip-interval = 200 ms + heartbeat-interval = 400 ms leader-actions-interval = 200 ms unreachable-nodes-reaper-interval = 200 ms periodic-tasks-initial-delay = 300 ms diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala index e36980d859..fcb1393f8a 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala @@ -24,7 +24,7 @@ object SunnyWeatherMultiJvmSpec extends MultiNodeConfig { gossip-interval = 400 ms nr-of-deputy-nodes = 0 } - akka.loglevel = DEBUG + akka.loglevel = INFO """)) } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 6b2ff1962c..3a96451466 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -21,6 +21,7 @@ class ClusterConfigSpec 
extends AkkaSpec { NodeToJoin must be(None) PeriodicTasksInitialDelay must be(1 seconds) GossipInterval must be(1 second) + HeartbeatInterval must be(1 second) LeaderActionsInterval must be(1 second) UnreachableNodesReaperInterval must be(1 second) NrOfGossipDaemons must be(4) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index d3d1d6d0a2..9b1a9706af 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -99,15 +99,11 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter { "A Cluster" must { - "initially be singleton cluster and reach convergence after first gossip" in { + "initially be singleton cluster and reach convergence immediately" in { cluster.isSingletonCluster must be(true) cluster.latestGossip.members.map(_.address) must be(Set(selfAddress)) memberStatus(selfAddress) must be(Some(MemberStatus.Joining)) - cluster.convergence.isDefined must be(false) - cluster.gossip() - expectMsg(GossipTo(selfAddress)) - awaitCond(cluster.convergence.isDefined) - memberStatus(selfAddress) must be(Some(MemberStatus.Joining)) + cluster.convergence.isDefined must be(true) cluster.leaderActions() memberStatus(selfAddress) must be(Some(MemberStatus.Up)) } From d957c686390b9799e568bbc5a0c5b17621a3d87b Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 11 Jun 2012 21:12:57 +0200 Subject: [PATCH 02/19] Incorporate feedback from review, see #2214 --- .../src/main/scala/akka/cluster/Cluster.scala | 37 ++++++++++--------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index df8f2ec89b..b463c3b0ea 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -14,18 +14,14 @@ import akka.pattern.ask import akka.util._ import akka.util.duration._ import akka.ConfigurationException - import java.util.concurrent.atomic.{ AtomicReference, AtomicBoolean } import java.util.concurrent.TimeUnit._ import java.util.concurrent.TimeoutException import akka.jsr166y.ThreadLocalRandom - import java.lang.management.ManagementFactory import javax.management._ - import scala.collection.immutable.{ Map, SortedSet } import scala.annotation.tailrec - import com.google.protobuf.ByteString /** @@ -44,6 +40,8 @@ trait MetaDataChangeListener { /** * Base trait for all cluster messages. All ClusterMessage's are serializable. + * + * FIXME Protobuf all ClusterMessages */ sealed trait ClusterMessage extends Serializable @@ -82,6 +80,7 @@ object ClusterAction { /** * Represents the address and the current status of a cluster member node. + * */ class Member(val address: Address, val status: MemberStatus) extends ClusterMessage { override def hashCode = address.## @@ -175,6 +174,10 @@ case class GossipOverview( "])" } +object Gossip { + val emptyMembers: SortedSet[Member] = SortedSet.empty +} + /** * Represents the state of the cluster; cluster ring membership, ring convergence, meta data - all versioned by a vector clock. */ @@ -219,7 +222,7 @@ case class Gossip( // 3. 
merge members by selecting the single Member with highest MemberStatus out of the Member groups val mergedMembers = - SortedSet.empty[Member] ++ + Gossip.emptyMembers ++ membersGroupedByAddress.values.foldLeft(Vector.empty[Member]) { (acc, members) ⇒ acc :+ members.reduceLeft(Member.highestPriorityOf(_, _)) } @@ -244,7 +247,10 @@ case class Gossip( ")" } -case class Heartbeat(from: Address) +/** + * Sent at regular intervals for failure detection. + */ +case class Heartbeat(from: Address) extends ClusterMessage /** * Manages routing of the different cluster commands. @@ -372,6 +378,8 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ import clusterSettings._ val selfAddress = remote.transport.address + private val selfHeartbeat = Heartbeat(selfAddress) + val failureDetector = new AccrualFailureDetector( system, selfAddress, FailureDetectorThreshold, FailureDetectorMaxSampleSize) @@ -402,7 +410,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ private val state = { val member = Member(selfAddress, MemberStatus.Joining) - val versionedGossip = Gossip(members = SortedSet.empty[Member] + member) + vclockNode // add me as member and update my vector clock + val versionedGossip = Gossip(members = Gossip.emptyMembers + member) + vclockNode // add me as member and update my vector clock val seenVersionedGossip = versionedGossip seen selfAddress new AtomicReference[State](State(seenVersionedGossip)) } @@ -757,9 +765,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ /** * INTERNAL API */ - private[cluster] def receiveHeartbeat(from: Address): Unit = { - failureDetector heartbeat from - } + private[cluster] def receiveHeartbeat(from: Address): Unit = failureDetector heartbeat from /** * Joins the pre-configured contact point. @@ -785,10 +791,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ val newSelf = localSelf copy (status = newStatus) // change my state in 'gossip.members' - val newMembers = localMembers map { member ⇒ - if (member.address == selfAddress) newSelf - else member - } + val newMembers = localMembers map { member ⇒ if (member.address == selfAddress) newSelf else member } val newGossip = localGossip copy (members = newMembers) @@ -893,13 +896,11 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ if (!isSingletonCluster(localState)) { val liveMembers = localState.latestGossip.members.toIndexedSeq - val unreachableMembers = localState.latestGossip.overview.unreachable - // FIXME use unreachable? - for (member ← (liveMembers ++ unreachableMembers); if member.address != selfAddress) { + for (member ← liveMembers; if member.address != selfAddress) { val connection = clusterGossipConnectionFor(member.address) log.debug("Cluster Node [{}] - Heartbeat to [{}]", selfAddress, connection) - connection ! Heartbeat(selfAddress) + connection ! 
selfHeartbeat } } } From 34c9e49ee0915318e4f36c7ce33248c3072ab50c Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 11 Jun 2012 22:12:45 +0200 Subject: [PATCH 03/19] Schedule cluster tasks with more accurate scheduler, see #2114 * Use scheduler with more accurate settings * New FixedRateTask that compensates for inaccuracy --- .../src/main/scala/akka/cluster/Cluster.scala | 29 ++++++---- .../scala/akka/cluster/FixedRateTask.scala | 54 +++++++++++++++++++ .../akka/cluster/FixedRateTaskSpec.scala | 36 +++++++++++++ 3 files changed, 110 insertions(+), 9 deletions(-) create mode 100644 akka-cluster/src/main/scala/akka/cluster/FixedRateTask.scala create mode 100644 akka-cluster/src/test/scala/akka/cluster/FixedRateTaskSpec.scala diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index b463c3b0ea..8be6b21d25 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -23,6 +23,8 @@ import javax.management._ import scala.collection.immutable.{ Map, SortedSet } import scala.annotation.tailrec import com.google.protobuf.ByteString +import akka.util.internal.HashedWheelTimer +import akka.dispatch.MonitorableThreadFactory /** * Interface for membership change listener. @@ -422,28 +424,35 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ // ===================== WORK DAEMONS ===================== // ======================================================== + private def hwt = new HashedWheelTimer(log, + MonitorableThreadFactory(system.name + "-cluster-scheduler", system.settings.Daemonicity, None), 50.millis, + system.settings.SchedulerTicksPerWheel) + private val clusterScheduler = new DefaultScheduler(hwt, log, system.dispatcher) + // start periodic gossip to random nodes in cluster - private val gossipCanceller = system.scheduler.schedule(PeriodicTasksInitialDelay, GossipInterval) { + private val gossipTask = FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay, GossipInterval) { gossip() } // start periodic heartbeat to all nodes in cluster - private val heartbeatCanceller = system.scheduler.schedule(PeriodicTasksInitialDelay, HeartbeatInterval) { + private val heartbeatTask = FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay, HeartbeatInterval) { heartbeat() } // start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list) - private val failureDetectorReaperCanceller = system.scheduler.schedule(PeriodicTasksInitialDelay, UnreachableNodesReaperInterval) { + private val failureDetectorReaperTask = FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay, UnreachableNodesReaperInterval) { reapUnreachableMembers() } // start periodic leader action management (only applies for the current leader) - private val leaderActionsCanceller = system.scheduler.schedule(PeriodicTasksInitialDelay, LeaderActionsInterval) { + private val leaderActionsTask = FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay, LeaderActionsInterval) { leaderActions() } createMBean() + system.registerOnTermination(shutdown()) + log.info("Cluster Node [{}] - has started up successfully", selfAddress) // ====================================================== @@ -507,11 +516,13 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ def shutdown(): Unit = { if (isRunning.compareAndSet(true, false)) { log.info("Cluster Node [{}] - Shutting down cluster Node and cluster
daemons...", selfAddress) - gossipCanceller.cancel() - heartbeatCanceller.cancel() - failureDetectorReaperCanceller.cancel() - leaderActionsCanceller.cancel() - system.stop(clusterDaemons) + gossipTask.cancel() + heartbeatTask.cancel() + failureDetectorReaperTask.cancel() + leaderActionsTask.cancel() + clusterScheduler.close() + if (!clusterDaemons.isTerminated) + system.stop(clusterDaemons) try { mBeanServer.unregisterMBean(clusterMBeanName) } catch { diff --git a/akka-cluster/src/main/scala/akka/cluster/FixedRateTask.scala b/akka-cluster/src/main/scala/akka/cluster/FixedRateTask.scala new file mode 100644 index 0000000000..0f594316d9 --- /dev/null +++ b/akka-cluster/src/main/scala/akka/cluster/FixedRateTask.scala @@ -0,0 +1,54 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.cluster + +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicLong + +import akka.actor.Scheduler +import akka.util.Duration + +/** + * INTERNAL API + */ +private[akka] object FixedRateTask { + def apply(scheduler: Scheduler, initalDelay: Duration, delay: Duration)(f: ⇒ Unit): FixedRateTask = { + new FixedRateTask(scheduler, initalDelay, delay, new Runnable { def run(): Unit = f }) + } +} + +/** + * INTERNAL API + * + * Task to be scheduled periodically at a fixed rate, compensating, on average, + * for inaccuracy in scheduler. It will start when constructed, using the + * initialDelay. + */ +private[akka] class FixedRateTask(scheduler: Scheduler, initalDelay: Duration, delay: Duration, task: Runnable) extends Runnable { + + private val delayMillis = delay.toMillis + private val minDelayMillis = 1L + private val cancelled = new AtomicBoolean(false) + private val counter = new AtomicLong(0L) + private val startTime = System.currentTimeMillis + initalDelay.toMillis + scheduler.scheduleOnce(initalDelay, this) + + def cancel(): Unit = cancelled.set(true) + + override final def run(): Unit = if (!cancelled.get) try { + task.run() + } finally if (!cancelled.get) { + val nextTime = startTime + delayMillis * counter.incrementAndGet + val nextDelayMillis = nextTime - System.currentTimeMillis + val nextDelay = Duration( + (if (nextDelayMillis <= minDelayMillis) minDelayMillis else nextDelayMillis), + TimeUnit.MILLISECONDS) + try { + scheduler.scheduleOnce(nextDelay, this) + } catch { case e: IllegalStateException ⇒ /* will happen when scheduler is closed, nothing wrong */ } + } + +} diff --git a/akka-cluster/src/test/scala/akka/cluster/FixedRateTaskSpec.scala b/akka-cluster/src/test/scala/akka/cluster/FixedRateTaskSpec.scala new file mode 100644 index 0000000000..3efa3ab3ab --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/FixedRateTaskSpec.scala @@ -0,0 +1,36 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. 
+ */ + +package akka.cluster + +import java.util.concurrent.atomic.AtomicInteger +import akka.testkit.AkkaSpec +import akka.util.duration._ +import akka.testkit.TimingTest + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class FixedRateTaskSpec extends AkkaSpec { + + "Task scheduled at fixed rate" must { + "adjust for scheduler inaccuracy" taggedAs TimingTest in { + val counter = new AtomicInteger + FixedRateTask(system.scheduler, 150.millis, 150.millis) { + counter.incrementAndGet() + } + 5000.millis.sleep() + counter.get must (be(33) or be(34)) + } + + "compensate for long running task" taggedAs TimingTest in { + val counter = new AtomicInteger + FixedRateTask(system.scheduler, 225.millis, 225.millis) { + counter.incrementAndGet() + 80.millis.sleep() + } + 5000.millis.sleep() + counter.get must (be(22) or be(23)) + } + } +} + From 649b9d51816f3be5c035178ca6daee01cbbfd0af Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 11 Jun 2012 22:53:45 +0200 Subject: [PATCH 04/19] Switching to Mr Pink's Java6 detector --- project/scripts/release | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/scripts/release b/project/scripts/release index 13795b3d53..9e418317bd 100755 --- a/project/scripts/release +++ b/project/scripts/release @@ -93,7 +93,7 @@ fi declare -r version=$1 declare -r publish_path="${release_server}:${release_path}" -[[ `java -version 2>&1 | grep "java version" | awk '{print $3}' | tr -d \" | awk '{split($0, array, ".")} END{print array[2]}'` -eq 6 ]] || fail "Java version is not 1.6" +[[ `java -version 2>&1 | head -1 | cut -d ' ' -f3 | cut -d '.' -f2` -eq 6 ]] || fail "Java version is not 1.6" # check for a git command type -P git &> /dev/null || fail "git command not found" From b27bae655404f9635b206b3a194008a3a7f3f221 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 12 Jun 2012 13:34:59 +0200 Subject: [PATCH 05/19] Use dedicated cluster scheduler only when default scheduler resolution isn't good enough, see #2214 * Config properties for scheduler * Commented shutdown considerations --- .../src/main/resources/reference.conf | 9 ++++++ .../src/main/scala/akka/cluster/Cluster.scala | 30 +++++++++++++++---- .../scala/akka/cluster/ClusterSettings.scala | 26 ++++++++-------- .../akka/cluster/ClusterConfigSpec.scala | 2 ++ 4 files changed, 50 insertions(+), 17 deletions(-) diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 7fb930eaef..b9104fe6cf 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -49,5 +49,14 @@ akka { max-sample-size = 1000 } + + # If the tick-duration of the default scheduler is longer than the tick-duration + # configured here a dedicated scheduler will be used for periodic tasks of the cluster, + # otherwise the default scheduler is used. + # See akka.scheduler settings for more details about the HashedWheelTimer. 
+ scheduler { + tick-duration = 33ms + ticks-per-wheel = 512 + } } } diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 46c6919cc1..dda05bf6b0 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -19,6 +19,7 @@ import java.util.concurrent.TimeUnit._ import java.util.concurrent.TimeoutException import akka.jsr166y.ThreadLocalRandom import java.lang.management.ManagementFactory +import java.io.Closeable import javax.management._ import scala.collection.immutable.{ Map, SortedSet } import scala.annotation.tailrec @@ -435,10 +436,21 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) // ===================== WORK DAEMONS ===================== // ======================================================== - private def hwt = new HashedWheelTimer(log, - MonitorableThreadFactory(system.name + "-cluster-scheduler", system.settings.Daemonicity, None), 50.millis, - system.settings.SchedulerTicksPerWheel) - private val clusterScheduler = new DefaultScheduler(hwt, log, system.dispatcher) + private def useDedicatedScheduler: Boolean = system.settings.SchedulerTickDuration > SchedulerTickDuration + + private val clusterScheduler: Scheduler = { + if (useDedicatedScheduler) { + val threadFactory = system.threadFactory match { + case tf: MonitorableThreadFactory ⇒ tf.copy(name = tf.name + "-cluster-scheduler") + case tf ⇒ tf + } + val hwt = new HashedWheelTimer(log, + threadFactory, + SchedulerTickDuration, SchedulerTicksPerWheel) + new DefaultScheduler(hwt, log, system.dispatcher) + } else + system.scheduler + } // start periodic gossip to random nodes in cluster private val gossipTask = FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay, GossipInterval) { @@ -527,13 +539,21 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) def shutdown(): Unit = { if (isRunning.compareAndSet(true, false)) { log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", selfAddress) + + // cancel the periodic tasks, note that otherwise they will be run when scheduler is shutdown gossipTask.cancel() heartbeatTask.cancel() failureDetectorReaperTask.cancel() leaderActionsTask.cancel() - clusterScheduler.close() + if (useDedicatedScheduler) clusterScheduler match { + case x: Closeable ⇒ x.close() + case _ ⇒ + } + // FIXME isTerminated check can be removed when ticket #2221 is fixed + // now it prevents logging if system is shutdown (or in progress of shutdown) if (!clusterDaemons.isTerminated) system.stop(clusterDaemons) + try { mBeanServer.unregisterMBean(clusterMBeanName) } catch { diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index 9a17f2a0eb..ee4f6a03d2 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -13,22 +13,24 @@ import akka.actor.AddressFromURIString class ClusterSettings(val config: Config, val systemName: String) { import config._ - val FailureDetectorThreshold = getInt("akka.cluster.failure-detector.threshold") - val FailureDetectorMaxSampleSize = getInt("akka.cluster.failure-detector.max-sample-size") - val FailureDetectorImplementationClass: Option[String] = getString("akka.cluster.failure-detector.implementation-class") match { + final val FailureDetectorThreshold = 
getInt("akka.cluster.failure-detector.threshold") + final val FailureDetectorMaxSampleSize = getInt("akka.cluster.failure-detector.max-sample-size") + final val FailureDetectorImplementationClass: Option[String] = getString("akka.cluster.failure-detector.implementation-class") match { case "" ⇒ None case fqcn ⇒ Some(fqcn) } - val NodeToJoin: Option[Address] = getString("akka.cluster.node-to-join") match { + final val NodeToJoin: Option[Address] = getString("akka.cluster.node-to-join") match { case "" ⇒ None case AddressFromURIString(addr) ⇒ Some(addr) } - val PeriodicTasksInitialDelay = Duration(getMilliseconds("akka.cluster.periodic-tasks-initial-delay"), MILLISECONDS) - val GossipInterval = Duration(getMilliseconds("akka.cluster.gossip-interval"), MILLISECONDS) - val HeartbeatInterval = Duration(getMilliseconds("akka.cluster.heartbeat-interval"), MILLISECONDS) - val LeaderActionsInterval = Duration(getMilliseconds("akka.cluster.leader-actions-interval"), MILLISECONDS) - val UnreachableNodesReaperInterval = Duration(getMilliseconds("akka.cluster.unreachable-nodes-reaper-interval"), MILLISECONDS) - val NrOfGossipDaemons = getInt("akka.cluster.nr-of-gossip-daemons") - val NrOfDeputyNodes = getInt("akka.cluster.nr-of-deputy-nodes") - val AutoDown = getBoolean("akka.cluster.auto-down") + final val PeriodicTasksInitialDelay = Duration(getMilliseconds("akka.cluster.periodic-tasks-initial-delay"), MILLISECONDS) + final val GossipInterval = Duration(getMilliseconds("akka.cluster.gossip-interval"), MILLISECONDS) + final val HeartbeatInterval = Duration(getMilliseconds("akka.cluster.heartbeat-interval"), MILLISECONDS) + final val LeaderActionsInterval = Duration(getMilliseconds("akka.cluster.leader-actions-interval"), MILLISECONDS) + final val UnreachableNodesReaperInterval = Duration(getMilliseconds("akka.cluster.unreachable-nodes-reaper-interval"), MILLISECONDS) + final val NrOfGossipDaemons = getInt("akka.cluster.nr-of-gossip-daemons") + final val NrOfDeputyNodes = getInt("akka.cluster.nr-of-deputy-nodes") + final val AutoDown = getBoolean("akka.cluster.auto-down") + final val SchedulerTickDuration = Duration(getMilliseconds("akka.cluster.scheduler.tick-duration"), MILLISECONDS) + final val SchedulerTicksPerWheel = getInt("akka.cluster.scheduler.ticks-per-wheel") } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 6c9023d410..481d9f7e5a 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -28,6 +28,8 @@ class ClusterConfigSpec extends AkkaSpec { NrOfGossipDaemons must be(4) NrOfDeputyNodes must be(3) AutoDown must be(true) + SchedulerTickDuration must be(33 millis) + SchedulerTicksPerWheel must be(512) } } } From 7b6ae2f5c91e266a2202c192d00f3f1baeb8b22d Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 12 Jun 2012 13:37:21 +0200 Subject: [PATCH 06/19] Use nanoTime in FixedRateTask, see #2214 * Rewrote test to use latch and assert rate instead --- .../scala/akka/cluster/FixedRateTask.scala | 13 ++++------ .../akka/cluster/FixedRateTaskSpec.scala | 25 ++++++++++++------- 2 files changed, 21 insertions(+), 17 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/FixedRateTask.scala b/akka-cluster/src/main/scala/akka/cluster/FixedRateTask.scala index 0f594316d9..25ef058465 100644 --- a/akka-cluster/src/main/scala/akka/cluster/FixedRateTask.scala +++ 
b/akka-cluster/src/main/scala/akka/cluster/FixedRateTask.scala @@ -29,11 +29,10 @@ private[akka] object FixedRateTask { */ private[akka] class FixedRateTask(scheduler: Scheduler, initalDelay: Duration, delay: Duration, task: Runnable) extends Runnable { - private val delayMillis = delay.toMillis - private val minDelayMillis = 1L + private val delayNanos = delay.toNanos private val cancelled = new AtomicBoolean(false) private val counter = new AtomicLong(0L) - private val startTime = System.currentTimeMillis + initalDelay.toMillis + private val startTime = System.nanoTime + initalDelay.toNanos scheduler.scheduleOnce(initalDelay, this) def cancel(): Unit = cancelled.set(true) @@ -41,11 +40,9 @@ private[akka] class FixedRateTask(scheduler: Scheduler, initalDelay: Duration, d override final def run(): Unit = if (!cancelled.get) try { task.run() } finally if (!cancelled.get) { - val nextTime = startTime + delayMillis * counter.incrementAndGet - val nextDelayMillis = nextTime - System.currentTimeMillis - val nextDelay = Duration( - (if (nextDelayMillis <= minDelayMillis) minDelayMillis else nextDelayMillis), - TimeUnit.MILLISECONDS) + val nextTime = startTime + delayNanos * counter.incrementAndGet + // it's ok to schedule with negative duration, will run asap + val nextDelay = Duration(nextTime - System.nanoTime, TimeUnit.NANOSECONDS) try { scheduler.scheduleOnce(nextDelay, this) } catch { case e: IllegalStateException ⇒ /* will happen when scheduler is closed, nothing wrong */ } diff --git a/akka-cluster/src/test/scala/akka/cluster/FixedRateTaskSpec.scala b/akka-cluster/src/test/scala/akka/cluster/FixedRateTaskSpec.scala index 3efa3ab3ab..d259a5310b 100644 --- a/akka-cluster/src/test/scala/akka/cluster/FixedRateTaskSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/FixedRateTaskSpec.scala @@ -4,32 +4,39 @@ package akka.cluster -import java.util.concurrent.atomic.AtomicInteger import akka.testkit.AkkaSpec import akka.util.duration._ import akka.testkit.TimingTest +import akka.testkit.TestLatch +import akka.dispatch.Await @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FixedRateTaskSpec extends AkkaSpec { "Task scheduled at fixed rate" must { "adjust for scheduler inaccuracy" taggedAs TimingTest in { - val counter = new AtomicInteger + val startTime = System.nanoTime + val n = 33 + val latch = new TestLatch(n) FixedRateTask(system.scheduler, 150.millis, 150.millis) { - counter.incrementAndGet() + latch.countDown() } - 5000.millis.sleep() - counter.get must (be(33) or be(34)) + Await.ready(latch, 6.seconds) + val rate = n * 1000.0 / (System.nanoTime - startTime).nanos.toMillis + rate must be(6.66 plusOrMinus (0.4)) } "compensate for long running task" taggedAs TimingTest in { - val counter = new AtomicInteger + val startTime = System.nanoTime + val n = 22 + val latch = new TestLatch(n) FixedRateTask(system.scheduler, 225.millis, 225.millis) { - counter.incrementAndGet() 80.millis.sleep() + latch.countDown() } - 5000.millis.sleep() - counter.get must (be(22) or be(23)) + Await.ready(latch, 6.seconds) + val rate = n * 1000.0 / (System.nanoTime - startTime).nanos.toMillis + rate must be(4.4 plusOrMinus (0.3)) } } } From 40d9b27e735a092997391a0685ce1e790bd48ab2 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 12 Jun 2012 14:16:30 +0200 Subject: [PATCH 07/19] Info log about dedicated scheduler, and refactoring, see #2214 * Refactoring with wrapping of Scheduler according to @viktorklang's wish --- .../src/main/scala/akka/cluster/Cluster.scala | 37 
++++++++++++++----- 1 file changed, 27 insertions(+), 10 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index dda05bf6b0..571a8eaf68 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -436,10 +436,11 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) // ===================== WORK DAEMONS ===================== // ======================================================== - private def useDedicatedScheduler: Boolean = system.settings.SchedulerTickDuration > SchedulerTickDuration - - private val clusterScheduler: Scheduler = { - if (useDedicatedScheduler) { + private val clusterScheduler: Scheduler with Closeable = { + if (system.settings.SchedulerTickDuration > SchedulerTickDuration) { + log.info("Using a dedicated scheduler for cluster. Default scheduler can be used if configured " + + "with 'akka.scheduler.tick-duration' [{} ms] <= 'akka.cluster.scheduler.tick-duration' [{} ms].", + system.settings.SchedulerTickDuration.toMillis, SchedulerTickDuration.toMillis) val threadFactory = system.threadFactory match { case tf: MonitorableThreadFactory ⇒ tf.copy(name = tf.name + "-cluster-scheduler") case tf ⇒ tf @@ -448,8 +449,26 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) threadFactory, SchedulerTickDuration, SchedulerTicksPerWheel) new DefaultScheduler(hwt, log, system.dispatcher) - } else - system.scheduler + } else { + // delegate to system.scheduler, but don't close + val systemScheduler = system.scheduler + new Scheduler with Closeable { + // we are using system.scheduler, which we are not responsible for closing + def close(): Unit = () + def schedule(initialDelay: Duration, frequency: Duration, receiver: ActorRef, message: Any): Cancellable = + systemScheduler.schedule(initialDelay, frequency, receiver, message) + def schedule(initialDelay: Duration, frequency: Duration)(f: ⇒ Unit): Cancellable = + systemScheduler.schedule(initialDelay, frequency)(f) + def schedule(initialDelay: Duration, frequency: Duration, runnable: Runnable): Cancellable = + systemScheduler.schedule(initialDelay, frequency, runnable) + def scheduleOnce(delay: Duration, runnable: Runnable): Cancellable = + systemScheduler.scheduleOnce(delay, runnable) + def scheduleOnce(delay: Duration, receiver: ActorRef, message: Any): Cancellable = + systemScheduler.scheduleOnce(delay, receiver, message) + def scheduleOnce(delay: Duration)(f: ⇒ Unit): Cancellable = + systemScheduler.scheduleOnce(delay)(f) + } + } } // start periodic gossip to random nodes in cluster @@ -545,10 +564,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) heartbeatTask.cancel() failureDetectorReaperTask.cancel() leaderActionsTask.cancel() - if (useDedicatedScheduler) clusterScheduler match { - case x: Closeable ⇒ x.close() - case _ ⇒ - } + clusterScheduler.close() + // FIXME isTerminated check can be removed when ticket #2221 is fixed // now it prevents logging if system is shutdown (or in progress of shutdown) if (!clusterDaemons.isTerminated) From 13f3cddbfb86633f8b2abd90233a345255a546ae Mon Sep 17 00:00:00 2001 From: Dale Date: Tue, 12 Jun 2012 16:16:25 +0300 Subject: [PATCH 08/19] Minor markup fix. 
--- akka-docs/java/typed-actors.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-docs/java/typed-actors.rst b/akka-docs/java/typed-actors.rst index 6ad870b309..4d36872f1a 100644 --- a/akka-docs/java/typed-actors.rst +++ b/akka-docs/java/typed-actors.rst @@ -130,7 +130,7 @@ if needed. It will return ``None`` if a timeout occurs. .. includecode:: code/docs/actor/TypedActorDocTestBase.java :include: typed-actor-call-strict -This will block for as long as the timeout that was set in the ``Props` of the Typed Actor, +This will block for as long as the timeout that was set in the ``Props`` of the Typed Actor, if needed. It will throw a ``java.util.concurrent.TimeoutException`` if a timeout occurs. Request-reply-with-future message send From 8d12385a3edb1b6eedc4ad756296fd542cc6743e Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 12 Jun 2012 15:48:23 +0200 Subject: [PATCH 09/19] Prolonging wait time for TypedActorSpec to avoid problems on slower machines --- akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala index 502712872a..b7a5a8f64b 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala @@ -307,7 +307,7 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config) "be able to call methods returning Scala Options" in { val t = newFooBar(Duration(500, "ms")) t.optionPigdog(200).get must be("Pigdog") - t.optionPigdog(700) must be(None) + t.optionPigdog(1000) must be(None) mustStop(t) } From de1ad302172ff82b60b7cfbdefa2f6c5d295b811 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 12 Jun 2012 16:07:20 +0200 Subject: [PATCH 10/19] Fix false convergence when singleton cluster, see #2222 * All members must be in seen table for convergence * Added extra debug logging due to convergence issues * Enabled test of convergence for node joining singleton cluster --- .../src/main/scala/akka/cluster/Cluster.scala | 34 +++++++++++++------ .../MembershipChangeListenerJoinSpec.scala | 7 ++-- .../test/scala/akka/cluster/ClusterSpec.scala | 3 +- 3 files changed, 29 insertions(+), 15 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 571a8eaf68..c090995e4c 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -1138,24 +1138,38 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) private def convergence(gossip: Gossip): Option[Gossip] = { val overview = gossip.overview val unreachable = overview.unreachable + val seen = overview.seen // First check that: - // 1. we don't have any members that are unreachable (unreachable.isEmpty == true), or + // 1. we don't have any members that are unreachable, or // 2. 
all unreachable members in the set have status DOWN // Else we can't continue to check for convergence // When that is done we check that all the entries in the 'seen' table have the same vector clock version - if (unreachable.isEmpty || !unreachable.exists { m ⇒ - m.status != MemberStatus.Down && - m.status != MemberStatus.Removed - }) { - val seen = gossip.overview.seen - val views = Set.empty[VectorClock] ++ seen.values + // and that all members exists in seen table + val hasUnreachable = unreachable.nonEmpty && unreachable.exists { m ⇒ + m.status != MemberStatus.Down && m.status != MemberStatus.Removed + } + val allMembersInSeen = gossip.members.forall(m ⇒ seen.contains(m.address)) - if (views.size == 1) { + if (hasUnreachable) { + log.debug("Cluster Node [{}] - No cluster convergence, due to unreachable [{}].", selfAddress, unreachable) + None + } else if (!allMembersInSeen) { + log.debug("Cluster Node [{}] - No cluster convergence, due to members not in seen table [{}].", selfAddress, + gossip.members.map(_.address) -- seen.keySet) + None + } else { + + val views = (Set.empty[VectorClock] ++ seen.values).size + + if (views == 1) { log.debug("Cluster Node [{}] - Cluster convergence reached: [{}]", selfAddress, gossip.members.mkString(", ")) Some(gossip) - } else None - } else None + } else { + log.debug("Cluster Node [{}] - No cluster convergence, due to [{}] different views.", selfAddress, views) + None + } + } } private def isAvailable(state: State): Boolean = !isUnavailable(state) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala index 2809ae820b..1b296c58f1 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala @@ -18,7 +18,7 @@ object MembershipChangeListenerJoinMultiJvmSpec extends MultiNodeConfig { commonConfig( debugConfig(on = false) .withFallback(ConfigFactory.parseString("akka.cluster.leader-actions-interval = 5 s") // increase the leader action task interval to allow time checking for JOIN before leader moves it to UP - .withFallback(MultiNodeClusterSpec.clusterConfig))) + .withFallback(MultiNodeClusterSpec.clusterConfig))) } class MembershipChangeListenerJoinMultiJvmNode1 extends MembershipChangeListenerJoinSpec with FailureDetectorPuppetStrategy @@ -40,14 +40,13 @@ abstract class MembershipChangeListenerJoinSpec val joinLatch = TestLatch() cluster.registerListener(new MembershipChangeListener { def notify(members: SortedSet[Member]) { - if (members.size == 2 && members.exists(_.status == MemberStatus.Joining)) // second node is not part of node ring anymore + if (members.size == 2 && members.exists(_.status == MemberStatus.Joining)) joinLatch.countDown() } }) testConductor.enter("registered-listener") joinLatch.await - cluster.convergence.isDefined must be(true) } runOn(second) { @@ -55,6 +54,8 @@ abstract class MembershipChangeListenerJoinSpec cluster.join(firstAddress) } + awaitUpConvergence(2) + testConductor.enter("after") } } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index 112da9d0c0..03f6460ea1 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -110,8 +110,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with 
BeforeAndAfter { cluster.joining(addresses(1)) cluster.latestGossip.members.map(_.address) must be(Set(selfAddress, addresses(1))) memberStatus(addresses(1)) must be(Some(MemberStatus.Joining)) - // FIXME why is it still convergence immediately after joining? - //cluster.convergence.isDefined must be(false) + cluster.convergence.isDefined must be(false) } "accept a few more joining nodes" in { From 92cab53b1e3b635f0e28ea96d82316d33710188b Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 12 Jun 2012 16:15:05 +0200 Subject: [PATCH 11/19] Rename + operator of VectorClock and Versioned to :+ * + is kind of reserved for string concatination --- .../src/main/scala/akka/cluster/Cluster.scala | 20 +-- .../main/scala/akka/cluster/VectorClock.scala | 4 +- .../scala/akka/cluster/VectorClockSpec.scala | 166 +++++++++--------- 3 files changed, 95 insertions(+), 95 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index c090995e4c..5bc968920a 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -195,9 +195,9 @@ case class Gossip( /** * Increments the version for this 'Node'. */ - def +(node: VectorClock.Node): Gossip = copy(version = version + node) + def :+(node: VectorClock.Node): Gossip = copy(version = version :+ node) - def +(member: Member): Gossip = { + def :+(member: Member): Gossip = { if (members contains member) this else this copy (members = members + member) } @@ -424,7 +424,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) private val state = { val member = Member(selfAddress, MemberStatus.Joining) - val versionedGossip = Gossip(members = Gossip.emptyMembers + member) + vclockNode // add me as member and update my vector clock + val versionedGossip = Gossip(members = Gossip.emptyMembers + member) :+ vclockNode // add me as member and update my vector clock val seenVersionedGossip = versionedGossip seen selfAddress new AtomicReference[State](State(seenVersionedGossip)) } @@ -658,7 +658,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) val newMembers = localMembers + Member(node, MemberStatus.Joining) // add joining node as Joining val newGossip = localGossip copy (overview = newOverview, members = newMembers) - val versionedGossip = newGossip + vclockNode + val versionedGossip = newGossip :+ vclockNode val seenVersionedGossip = versionedGossip seen selfAddress val newState = localState copy (latestGossip = seenVersionedGossip) @@ -686,7 +686,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) val newMembers = localMembers + Member(address, MemberStatus.Leaving) // mark node as LEAVING val newGossip = localGossip copy (members = newMembers) - val versionedGossip = newGossip + vclockNode + val versionedGossip = newGossip :+ vclockNode val seenVersionedGossip = versionedGossip seen selfAddress val newState = localState copy (latestGossip = seenVersionedGossip) @@ -772,7 +772,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachablePlusNewlyDownedMembers) // update gossip overview val newGossip = localGossip copy (overview = newOverview, members = newMembers) // update gossip - val versionedGossip = newGossip + vclockNode + val versionedGossip = newGossip :+ vclockNode val newState = localState copy 
(latestGossip = versionedGossip seen selfAddress) if (!state.compareAndSet(localState, newState)) downing(address) // recur if we fail the update @@ -793,7 +793,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) if (remoteGossip.version <> localGossip.version) { // concurrent val mergedGossip = remoteGossip merge localGossip - val versionedMergedGossip = mergedGossip + vclockNode + val versionedMergedGossip = mergedGossip :+ vclockNode // FIXME change to debug log level, when failure detector is stable log.info( @@ -855,7 +855,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) val newGossip = localGossip copy (members = newMembers) // version my changes - val versionedGossip = newGossip + vclockNode + val versionedGossip = newGossip :+ vclockNode val seenVersionedGossip = versionedGossip seen selfAddress state copy (latestGossip = seenVersionedGossip) @@ -992,7 +992,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) val newGossip = localGossip copy (overview = newOverview, members = newMembers) // updating vclock and 'seen' table - val versionedGossip = newGossip + vclockNode + val versionedGossip = newGossip :+ vclockNode val seenVersionedGossip = versionedGossip seen selfAddress val newState = localState copy (latestGossip = seenVersionedGossip) @@ -1111,7 +1111,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) // ---------------------- // 5. Updating the vclock version for the changes // ---------------------- - val versionedGossip = newGossip + vclockNode + val versionedGossip = newGossip :+ vclockNode // ---------------------- // 6. Updating the 'seen' table diff --git a/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala b/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala index 82c1b9881d..ed6724058f 100644 --- a/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala +++ b/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala @@ -19,7 +19,7 @@ class VectorClockException(message: String) extends AkkaException(message) */ trait Versioned[T] { def version: VectorClock - def +(node: VectorClock.Node): T + def :+(node: VectorClock.Node): T } /** @@ -142,7 +142,7 @@ case class VectorClock( /** * Increment the version for the node passed as argument. Returns a new VectorClock. */ - def +(node: Node): VectorClock = copy(versions = versions + (node -> Timestamp())) + def :+(node: Node): VectorClock = copy(versions = versions + (node -> Timestamp())) /** * Returns true if this and that are concurrent else false. 
diff --git a/akka-cluster/src/test/scala/akka/cluster/VectorClockSpec.scala b/akka-cluster/src/test/scala/akka/cluster/VectorClockSpec.scala index de1142b668..19ad9410c4 100644 --- a/akka-cluster/src/test/scala/akka/cluster/VectorClockSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/VectorClockSpec.scala @@ -27,67 +27,67 @@ class VectorClockSpec extends AkkaSpec { "pass misc comparison test 1" in { val clock1_1 = VectorClock() - val clock2_1 = clock1_1 + Node("1") - val clock3_1 = clock2_1 + Node("2") - val clock4_1 = clock3_1 + Node("1") + val clock2_1 = clock1_1 :+ Node("1") + val clock3_1 = clock2_1 :+ Node("2") + val clock4_1 = clock3_1 :+ Node("1") val clock1_2 = VectorClock() - val clock2_2 = clock1_2 + Node("1") - val clock3_2 = clock2_2 + Node("2") - val clock4_2 = clock3_2 + Node("1") + val clock2_2 = clock1_2 :+ Node("1") + val clock3_2 = clock2_2 :+ Node("2") + val clock4_2 = clock3_2 :+ Node("1") clock4_1 <> clock4_2 must be(false) } "pass misc comparison test 2" in { val clock1_1 = VectorClock() - val clock2_1 = clock1_1 + Node("1") - val clock3_1 = clock2_1 + Node("2") - val clock4_1 = clock3_1 + Node("1") + val clock2_1 = clock1_1 :+ Node("1") + val clock3_1 = clock2_1 :+ Node("2") + val clock4_1 = clock3_1 :+ Node("1") val clock1_2 = VectorClock() - val clock2_2 = clock1_2 + Node("1") - val clock3_2 = clock2_2 + Node("2") - val clock4_2 = clock3_2 + Node("1") - val clock5_2 = clock4_2 + Node("3") + val clock2_2 = clock1_2 :+ Node("1") + val clock3_2 = clock2_2 :+ Node("2") + val clock4_2 = clock3_2 :+ Node("1") + val clock5_2 = clock4_2 :+ Node("3") clock4_1 < clock5_2 must be(true) } "pass misc comparison test 3" in { var clock1_1 = VectorClock() - val clock2_1 = clock1_1 + Node("1") + val clock2_1 = clock1_1 :+ Node("1") val clock1_2 = VectorClock() - val clock2_2 = clock1_2 + Node("2") + val clock2_2 = clock1_2 :+ Node("2") clock2_1 <> clock2_2 must be(true) } "pass misc comparison test 4" in { val clock1_3 = VectorClock() - val clock2_3 = clock1_3 + Node("1") - val clock3_3 = clock2_3 + Node("2") - val clock4_3 = clock3_3 + Node("1") + val clock2_3 = clock1_3 :+ Node("1") + val clock3_3 = clock2_3 :+ Node("2") + val clock4_3 = clock3_3 :+ Node("1") val clock1_4 = VectorClock() - val clock2_4 = clock1_4 + Node("1") - val clock3_4 = clock2_4 + Node("1") - val clock4_4 = clock3_4 + Node("3") + val clock2_4 = clock1_4 :+ Node("1") + val clock3_4 = clock2_4 :+ Node("1") + val clock4_4 = clock3_4 :+ Node("3") clock4_3 <> clock4_4 must be(true) } "pass misc comparison test 5" in { val clock1_1 = VectorClock() - val clock2_1 = clock1_1 + Node("2") - val clock3_1 = clock2_1 + Node("2") + val clock2_1 = clock1_1 :+ Node("2") + val clock3_1 = clock2_1 :+ Node("2") val clock1_2 = VectorClock() - val clock2_2 = clock1_2 + Node("1") - val clock3_2 = clock2_2 + Node("2") - val clock4_2 = clock3_2 + Node("2") - val clock5_2 = clock4_2 + Node("3") + val clock2_2 = clock1_2 :+ Node("1") + val clock3_2 = clock2_2 :+ Node("2") + val clock4_2 = clock3_2 :+ Node("2") + val clock5_2 = clock4_2 :+ Node("3") clock3_1 < clock5_2 must be(true) clock5_2 > clock3_1 must be(true) @@ -95,12 +95,12 @@ class VectorClockSpec extends AkkaSpec { "pass misc comparison test 6" in { val clock1_1 = VectorClock() - val clock2_1 = clock1_1 + Node("1") - val clock3_1 = clock2_1 + Node("2") + val clock2_1 = clock1_1 :+ Node("1") + val clock3_1 = clock2_1 :+ Node("2") val clock1_2 = VectorClock() - val clock2_2 = clock1_2 + Node("1") - val clock3_2 = clock2_2 + Node("1") + val clock2_2 = clock1_2 :+ 
Node("1") + val clock3_2 = clock2_2 :+ Node("1") clock3_1 <> clock3_2 must be(true) clock3_2 <> clock3_1 must be(true) @@ -108,14 +108,14 @@ class VectorClockSpec extends AkkaSpec { "pass misc comparison test 7" in { val clock1_1 = VectorClock() - val clock2_1 = clock1_1 + Node("1") - val clock3_1 = clock2_1 + Node("2") - val clock4_1 = clock3_1 + Node("2") - val clock5_1 = clock4_1 + Node("3") + val clock2_1 = clock1_1 :+ Node("1") + val clock3_1 = clock2_1 :+ Node("2") + val clock4_1 = clock3_1 :+ Node("2") + val clock5_1 = clock4_1 :+ Node("3") val clock1_2 = VectorClock() - val clock2_2 = clock1_2 + Node("2") - val clock3_2 = clock2_2 + Node("2") + val clock2_2 = clock1_2 :+ Node("2") + val clock3_2 = clock2_2 :+ Node("2") clock5_1 <> clock3_2 must be(true) clock3_2 <> clock5_1 must be(true) @@ -127,14 +127,14 @@ class VectorClockSpec extends AkkaSpec { val node3 = Node("3") val clock1_1 = VectorClock() - val clock2_1 = clock1_1 + node1 - val clock3_1 = clock2_1 + node2 - val clock4_1 = clock3_1 + node2 - val clock5_1 = clock4_1 + node3 + val clock2_1 = clock1_1 :+ node1 + val clock3_1 = clock2_1 :+ node2 + val clock4_1 = clock3_1 :+ node2 + val clock5_1 = clock4_1 :+ node3 val clock1_2 = VectorClock() - val clock2_2 = clock1_2 + node2 - val clock3_2 = clock2_2 + node2 + val clock2_2 = clock1_2 :+ node2 + val clock3_2 = clock2_2 :+ node2 val merged1 = clock3_2 merge clock5_1 merged1.versions.size must be(3) @@ -164,14 +164,14 @@ class VectorClockSpec extends AkkaSpec { val node4 = Node("4") val clock1_1 = VectorClock() - val clock2_1 = clock1_1 + node1 - val clock3_1 = clock2_1 + node2 - val clock4_1 = clock3_1 + node2 - val clock5_1 = clock4_1 + node3 + val clock2_1 = clock1_1 :+ node1 + val clock3_1 = clock2_1 :+ node2 + val clock4_1 = clock3_1 :+ node2 + val clock5_1 = clock4_1 :+ node3 val clock1_2 = VectorClock() - val clock2_2 = clock1_2 + node4 - val clock3_2 = clock2_2 + node4 + val clock2_2 = clock1_2 :+ node4 + val clock3_2 = clock2_2 :+ node4 val merged1 = clock3_2 merge clock5_1 merged1.versions.size must be(4) @@ -204,8 +204,8 @@ class VectorClockSpec extends AkkaSpec { val v1 = VectorClock() val v2 = VectorClock() - val vv1 = v1 + node1 - val vv2 = v2 + node2 + val vv1 = v1 :+ node1 + val vv2 = v2 :+ node2 (vv1 > v1) must equal(true) (vv2 > v2) must equal(true) @@ -225,12 +225,12 @@ class VectorClockSpec extends AkkaSpec { val a = VectorClock() val b = VectorClock() - val a1 = a + node1 - val b1 = b + node2 + val a1 = a :+ node1 + val b1 = b :+ node2 - var a2 = a1 + node1 + var a2 = a1 :+ node1 var c = a2.merge(b1) - var c1 = c + node3 + var c1 = c :+ node3 (c1 > a2) must equal(true) (c1 > b1) must equal(true) @@ -239,7 +239,7 @@ class VectorClockSpec extends AkkaSpec { "An instance of Versioned" must { class TestVersioned(val version: VectorClock = VectorClock()) extends Versioned[TestVersioned] { - def +(node: Node): TestVersioned = new TestVersioned(version + node) + def :+(node: Node): TestVersioned = new TestVersioned(version :+ node) } import Versioned.latestVersionOf @@ -251,67 +251,67 @@ class VectorClockSpec extends AkkaSpec { "happen before an identical versioned with a single additional event" in { val versioned1_1 = new TestVersioned() - val versioned2_1 = versioned1_1 + Node("1") - val versioned3_1 = versioned2_1 + Node("2") - val versioned4_1 = versioned3_1 + Node("1") + val versioned2_1 = versioned1_1 :+ Node("1") + val versioned3_1 = versioned2_1 :+ Node("2") + val versioned4_1 = versioned3_1 :+ Node("1") val versioned1_2 = new TestVersioned() - val 
versioned2_2 = versioned1_2 + Node("1") - val versioned3_2 = versioned2_2 + Node("2") - val versioned4_2 = versioned3_2 + Node("1") - val versioned5_2 = versioned4_2 + Node("3") + val versioned2_2 = versioned1_2 :+ Node("1") + val versioned3_2 = versioned2_2 :+ Node("2") + val versioned4_2 = versioned3_2 :+ Node("1") + val versioned5_2 = versioned4_2 :+ Node("3") latestVersionOf[TestVersioned](versioned4_1, versioned5_2) must be(versioned5_2) } "pass misc comparison test 1" in { var versioned1_1 = new TestVersioned() - val versioned2_1 = versioned1_1 + Node("1") + val versioned2_1 = versioned1_1 :+ Node("1") val versioned1_2 = new TestVersioned() - val versioned2_2 = versioned1_2 + Node("2") + val versioned2_2 = versioned1_2 :+ Node("2") latestVersionOf[TestVersioned](versioned2_1, versioned2_2) must be(versioned2_2) } "pass misc comparison test 2" in { val versioned1_3 = new TestVersioned() - val versioned2_3 = versioned1_3 + Node("1") - val versioned3_3 = versioned2_3 + Node("2") - val versioned4_3 = versioned3_3 + Node("1") + val versioned2_3 = versioned1_3 :+ Node("1") + val versioned3_3 = versioned2_3 :+ Node("2") + val versioned4_3 = versioned3_3 :+ Node("1") val versioned1_4 = new TestVersioned() - val versioned2_4 = versioned1_4 + Node("1") - val versioned3_4 = versioned2_4 + Node("1") - val versioned4_4 = versioned3_4 + Node("3") + val versioned2_4 = versioned1_4 :+ Node("1") + val versioned3_4 = versioned2_4 :+ Node("1") + val versioned4_4 = versioned3_4 :+ Node("3") latestVersionOf[TestVersioned](versioned4_3, versioned4_4) must be(versioned4_4) } "pass misc comparison test 3" in { val versioned1_1 = new TestVersioned() - val versioned2_1 = versioned1_1 + Node("2") - val versioned3_1 = versioned2_1 + Node("2") + val versioned2_1 = versioned1_1 :+ Node("2") + val versioned3_1 = versioned2_1 :+ Node("2") val versioned1_2 = new TestVersioned() - val versioned2_2 = versioned1_2 + Node("1") - val versioned3_2 = versioned2_2 + Node("2") - val versioned4_2 = versioned3_2 + Node("2") - val versioned5_2 = versioned4_2 + Node("3") + val versioned2_2 = versioned1_2 :+ Node("1") + val versioned3_2 = versioned2_2 :+ Node("2") + val versioned4_2 = versioned3_2 :+ Node("2") + val versioned5_2 = versioned4_2 :+ Node("3") latestVersionOf[TestVersioned](versioned3_1, versioned5_2) must be(versioned5_2) } "pass misc comparison test 4" in { val versioned1_1 = new TestVersioned() - val versioned2_1 = versioned1_1 + Node("1") - val versioned3_1 = versioned2_1 + Node("2") - val versioned4_1 = versioned3_1 + Node("2") - val versioned5_1 = versioned4_1 + Node("3") + val versioned2_1 = versioned1_1 :+ Node("1") + val versioned3_1 = versioned2_1 :+ Node("2") + val versioned4_1 = versioned3_1 :+ Node("2") + val versioned5_1 = versioned4_1 :+ Node("3") val versioned1_2 = new TestVersioned() - val versioned2_2 = versioned1_2 + Node("2") - val versioned3_2 = versioned2_2 + Node("2") + val versioned2_2 = versioned1_2 :+ Node("2") + val versioned3_2 = versioned2_2 :+ Node("2") latestVersionOf[TestVersioned](versioned5_1, versioned3_2) must be(versioned3_2) } From 42c5281d5a927882106e478b73dfb1aaea63cdf0 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 13 Jun 2012 09:37:10 +0200 Subject: [PATCH 12/19] Correct? 
implementation of merge and other actions, see #2077 * Merge unreachable using highestPriorityOf * Avoid merge result in node existing in both members and unreachable * Fix joining only allowed when !alreadyMember && !isUnreachable (non Down) * Fix filter bug of unreachable in downing and leaderActions * Minor cleanups --- .../src/main/scala/akka/cluster/Cluster.scala | 143 ++++++++---------- .../test/scala/akka/cluster/GossipSpec.scala | 53 +++++-- 2 files changed, 104 insertions(+), 92 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 571a8eaf68..9f241b684d 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -220,25 +220,30 @@ case class Gossip( // 1. merge vector clocks val mergedVClock = this.version merge that.version - // 2. group all members by Address => Seq[Member] - val membersGroupedByAddress = (this.members.toSeq ++ that.members.toSeq).groupBy(_.address) - - // 3. merge members by selecting the single Member with highest MemberStatus out of the Member groups - val mergedMembers = - Gossip.emptyMembers ++ - membersGroupedByAddress.values.foldLeft(Vector.empty[Member]) { (acc, members) ⇒ - acc :+ members.reduceLeft(Member.highestPriorityOf(_, _)) - } - - // 4. merge meta-data + // 2. merge meta-data val mergedMeta = this.meta ++ that.meta - // 5. merge gossip overview - val mergedOverview = GossipOverview( - this.overview.seen ++ that.overview.seen, - this.overview.unreachable ++ that.overview.unreachable) + def reduceHighestPriority(a: Seq[Member], b: Seq[Member]): Set[Member] = { + // group all members by Address => Seq[Member] + val groupedByAddress = (a ++ b).groupBy(_.address) + // pick highest MemberStatus + (groupedByAddress.values.foldLeft(Vector.empty[Member]) { (acc, members) ⇒ + acc :+ members.reduceLeft(Member.highestPriorityOf(_, _)) + }).toSet + } - Gossip(mergedOverview, mergedMembers, mergedMeta, mergedVClock) + // 3. merge unreachable by selecting the single Member with highest MemberStatus out of the Member groups + val mergedUnreachable = reduceHighestPriority(this.overview.unreachable.toSeq, that.overview.unreachable.toSeq) + + // 4. merge members by selecting the single Member with highest MemberStatus out of the Member groups, + // and exclude unreachable + val mergedMembers = Gossip.emptyMembers ++ reduceHighestPriority(this.members.toSeq, that.members.toSeq). + filterNot(m ⇒ mergedUnreachable.contains(m)) + + // 5. merge seen (FIXME is this correct?) 
+ val mergedSeen = this.overview.seen ++ that.overview.seen + + Gossip(GossipOverview(mergedSeen, mergedUnreachable), mergedMembers, mergedMeta, mergedVClock) } override def toString = @@ -648,11 +653,17 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) val localState = state.get val localGossip = localState.latestGossip val localMembers = localGossip.members + val localUnreachable = localGossip.overview.unreachable - if (!localMembers.exists(_.address == node)) { + val alreadyMember = localMembers.exists(_.address == node) + val isUnreachable = localUnreachable.exists { m ⇒ + m.address == node && m.status != MemberStatus.Down && m.status != MemberStatus.Removed + } + + if (!alreadyMember && !isUnreachable) { // remove the node from the 'unreachable' set in case it is a DOWN node that is rejoining cluster - val newUnreachableMembers = localGossip.overview.unreachable filterNot { _.address == node } + val newUnreachableMembers = localUnreachable filterNot { _.address == node } val newOverview = localGossip.overview copy (unreachable = newUnreachableMembers) val newMembers = localMembers + Member(node, MemberStatus.Joining) // add joining node as Joining @@ -719,8 +730,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) } /** - * The node to DOWN is removed from the 'members' set and put in the 'unreachable' set (if not alread there) - * and its status is set to DOWN. The node is alo removed from the 'seen' table. + * The node to DOWN is removed from the 'members' set and put in the 'unreachable' set (if not already there) + * and its status is set to DOWN. The node is also removed from the 'seen' table. * * The node will reside as DOWN in the 'unreachable' set until an explicit command JOIN command is sent directly * to this node and it will then go through the normal JOINING procedure. @@ -735,42 +746,34 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) val localUnreachableMembers = localOverview.unreachable // 1. check if the node to DOWN is in the 'members' set - var downedMember: Option[Member] = None - val newMembers = - localMembers - .map { member ⇒ - if (member.address == address) { - log.info("Cluster Node [{}] - Marking node [{}] as DOWN", selfAddress, member.address) - val newMember = member copy (status = MemberStatus.Down) - downedMember = Some(newMember) - newMember - } else member - } - .filter(_.status != MemberStatus.Down) + val downedMember: Option[Member] = localMembers.find(_.address == address).map(m ⇒ m.copy(status = MemberStatus.Down)) + val newMembers = downedMember match { + case Some(m) ⇒ + log.info("Cluster Node [{}] - Marking node [{}] as DOWN", selfAddress, m.address) + localMembers - m + case None ⇒ localMembers + } // 2. check if the node to DOWN is in the 'unreachable' set val newUnreachableMembers = - localUnreachableMembers - .filter(_.status != MemberStatus.Down) // no need to DOWN members already DOWN - .map { member ⇒ - if (member.address == address) { - log.info("Cluster Node [{}] - Marking unreachable node [{}] as DOWN", selfAddress, member.address) - member copy (status = MemberStatus.Down) - } else member - } + localUnreachableMembers.map { member ⇒ + // no need to DOWN members already DOWN + if (member.address == address && member.status != MemberStatus.Down) { + log.info("Cluster Node [{}] - Marking unreachable node [{}] as DOWN", selfAddress, member.address) + member copy (status = MemberStatus.Down) + } else member + } // 3. 
add the newly DOWNED members from the 'members' (in step 1.) to the 'newUnreachableMembers' set. - val newUnreachablePlusNewlyDownedMembers = downedMember match { - case Some(member) ⇒ newUnreachableMembers + member - case None ⇒ newUnreachableMembers - } + val newUnreachablePlusNewlyDownedMembers = newUnreachableMembers ++ downedMember // 4. remove nodes marked as DOWN from the 'seen' table - val newSeen = newUnreachablePlusNewlyDownedMembers.foldLeft(localSeen) { (currentSeen, member) ⇒ - currentSeen - member.address + val newSeen = localSeen -- newUnreachablePlusNewlyDownedMembers.collect { + case m if m.status == MemberStatus.Down ⇒ m.address } - val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachablePlusNewlyDownedMembers) // update gossip overview + // update gossip overview + val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachablePlusNewlyDownedMembers) val newGossip = localGossip copy (overview = newOverview, members = newMembers) // update gossip val versionedGossip = newGossip + vclockNode val newState = localState copy (latestGossip = versionedGossip seen selfAddress) @@ -831,36 +834,6 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) */ private def autoJoin(): Unit = nodeToJoin foreach join - /** - * Switches the member status. - * - * @param newStatus the new member status - * @param oldState the state to change the member status in - * @return the updated new state with the new member status - */ - private def switchMemberStatusTo(newStatus: MemberStatus, state: State): State = { - log.info("Cluster Node [{}] - Switching membership status to [{}]", selfAddress, newStatus) - - val localSelf = self - - val localGossip = state.latestGossip - val localMembers = localGossip.members - - // change my state into a "new" self - val newSelf = localSelf copy (status = newStatus) - - // change my state in 'gossip.members' - val newMembers = localMembers map { member ⇒ if (member.address == selfAddress) newSelf else member } - - val newGossip = localGossip copy (members = newMembers) - - // version my changes - val versionedGossip = newGossip + vclockNode - val seenVersionedGossip = versionedGossip seen selfAddress - - state copy (latestGossip = seenVersionedGossip) - } - /** * INTERNAL API * @@ -985,8 +958,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) if (newlyDetectedUnreachableMembers.nonEmpty) { // we have newly detected members marked as unavailable - val newMembers = localMembers diff newlyDetectedUnreachableMembers - val newUnreachableMembers: Set[Member] = localUnreachableMembers ++ newlyDetectedUnreachableMembers + val newMembers = localMembers -- newlyDetectedUnreachableMembers + val newUnreachableMembers = localUnreachableMembers ++ newlyDetectedUnreachableMembers val newOverview = localOverview copy (unreachable = newUnreachableMembers) val newGossip = localGossip copy (overview = newOverview, members = newMembers) @@ -1090,16 +1063,20 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) // 4. 
Move UNREACHABLE => DOWN (auto-downing by leader) // ---------------------- val newUnreachableMembers = - localUnreachableMembers - .filter(_.status != MemberStatus.Down) // no need to DOWN members already DOWN - .map { member ⇒ + localUnreachableMembers.map { member ⇒ + // no need to DOWN members already DOWN + if (member.status == MemberStatus.Down) member + else { log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as DOWN", selfAddress, member.address) hasChangedState = true member copy (status = MemberStatus.Down) } + } // removing nodes marked as DOWN from the 'seen' table - val newSeen = localUnreachableMembers.foldLeft(localSeen)((currentSeen, member) ⇒ currentSeen - member.address) + val newSeen = localSeen -- newUnreachableMembers.collect { + case m if m.status == MemberStatus.Down ⇒ m.address + } val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview localGossip copy (overview = newOverview) // update gossip diff --git a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala index 77cd0c52ba..985b6d5a89 100644 --- a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala @@ -12,18 +12,20 @@ import scala.collection.immutable.SortedSet @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class GossipSpec extends WordSpec with MustMatchers { + import MemberStatus._ + + val a1 = Member(Address("akka", "sys", "a", 2552), Up) + val a2 = Member(Address("akka", "sys", "a", 2552), Joining) + val b1 = Member(Address("akka", "sys", "b", 2552), Up) + val b2 = Member(Address("akka", "sys", "b", 2552), Removed) + val c1 = Member(Address("akka", "sys", "c", 2552), Leaving) + val c2 = Member(Address("akka", "sys", "c", 2552), Up) + val d1 = Member(Address("akka", "sys", "d", 2552), Leaving) + val d2 = Member(Address("akka", "sys", "d", 2552), Removed) + "A Gossip" must { "merge members by status priority" in { - import MemberStatus._ - val a1 = Member(Address("akka", "sys", "a", 2552), Up) - val a2 = Member(Address("akka", "sys", "a", 2552), Joining) - val b1 = Member(Address("akka", "sys", "b", 2552), Up) - val b2 = Member(Address("akka", "sys", "b", 2552), Removed) - val c1 = Member(Address("akka", "sys", "c", 2552), Leaving) - val c2 = Member(Address("akka", "sys", "c", 2552), Up) - val d1 = Member(Address("akka", "sys", "d", 2552), Leaving) - val d2 = Member(Address("akka", "sys", "d", 2552), Removed) val g1 = Gossip(members = SortedSet(a1, b1, c1, d1)) val g2 = Gossip(members = SortedSet(a2, b2, c2, d2)) @@ -38,5 +40,38 @@ class GossipSpec extends WordSpec with MustMatchers { } + "merge unreachable by status priority" in { + + val g1 = Gossip(members = Gossip.emptyMembers, overview = GossipOverview(unreachable = SortedSet(a1, b1, c1, d1))) + val g2 = Gossip(members = Gossip.emptyMembers, overview = GossipOverview(unreachable = SortedSet(a2, b2, c2, d2))) + + val merged1 = g1 merge g2 + merged1.overview.unreachable must be(Set(a1, b2, c1, d2)) + merged1.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Up, Removed, Leaving, Removed)) + + val merged2 = g2 merge g1 + merged2.overview.unreachable must be(Set(a1, b2, c1, d2)) + merged2.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Up, Removed, Leaving, Removed)) + + } + + "merge by excluding unreachable from members" in { + val g1 = Gossip(members = SortedSet(a1, b1), overview = 
GossipOverview(unreachable = SortedSet(c1, d1))) + val g2 = Gossip(members = SortedSet(a2, c2), overview = GossipOverview(unreachable = SortedSet(b2, d2))) + + val merged1 = g1 merge g2 + merged1.members must be(SortedSet(a1)) + merged1.members.toSeq.map(_.status) must be(Seq(Up)) + merged1.overview.unreachable must be(Set(b2, c1, d2)) + merged1.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Removed, Leaving, Removed)) + + val merged2 = g2 merge g1 + merged2.members must be(SortedSet(a1)) + merged2.members.toSeq.map(_.status) must be(Seq(Up)) + merged2.overview.unreachable must be(Set(b2, c1, d2)) + merged2.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Removed, Leaving, Removed)) + + } + } } From ff5c99a80d4e595dfa7ab2ae585f624e4e4c7f0d Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 13 Jun 2012 11:04:27 +0200 Subject: [PATCH 13/19] Minor cleanup, based on review comments, see #2077 --- .../src/main/scala/akka/cluster/Cluster.scala | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 9f241b684d..17842453bb 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -227,9 +227,9 @@ case class Gossip( // group all members by Address => Seq[Member] val groupedByAddress = (a ++ b).groupBy(_.address) // pick highest MemberStatus - (groupedByAddress.values.foldLeft(Vector.empty[Member]) { (acc, members) ⇒ - acc :+ members.reduceLeft(Member.highestPriorityOf(_, _)) - }).toSet + (Set.empty[Member] /: groupedByAddress) { + case (acc, (_, members)) ⇒ acc + members.reduceLeft(Member.highestPriorityOf) + } } // 3. merge unreachable by selecting the single Member with highest MemberStatus out of the Member groups @@ -238,7 +238,7 @@ case class Gossip( // 4. merge members by selecting the single Member with highest MemberStatus out of the Member groups, // and exclude unreachable val mergedMembers = Gossip.emptyMembers ++ reduceHighestPriority(this.members.toSeq, that.members.toSeq). - filterNot(m ⇒ mergedUnreachable.contains(m)) + filterNot(mergedUnreachable.contains) // 5. merge seen (FIXME is this correct?) val mergedSeen = this.overview.seen ++ that.overview.seen @@ -746,7 +746,9 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) val localUnreachableMembers = localOverview.unreachable // 1. 
check if the node to DOWN is in the 'members' set - val downedMember: Option[Member] = localMembers.find(_.address == address).map(m ⇒ m.copy(status = MemberStatus.Down)) + val downedMember: Option[Member] = localMembers.collectFirst { + case m if m.address == address ⇒ m.copy(status = MemberStatus.Down) + } val newMembers = downedMember match { case Some(m) ⇒ log.info("Cluster Node [{}] - Marking node [{}] as DOWN", selfAddress, m.address) From f3d9f9c4e80e5f521baf4fa279b7de21ccd34b0c Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 13 Jun 2012 11:19:06 +0200 Subject: [PATCH 14/19] Merge seen table by starting with empty seen after merge, see #2077 --- .../src/main/scala/akka/cluster/Cluster.scala | 4 ++-- .../src/test/scala/akka/cluster/GossipSpec.scala | 12 ++++++++++++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 17842453bb..07712d8ed9 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -240,8 +240,8 @@ case class Gossip( val mergedMembers = Gossip.emptyMembers ++ reduceHighestPriority(this.members.toSeq, that.members.toSeq). filterNot(mergedUnreachable.contains) - // 5. merge seen (FIXME is this correct?) - val mergedSeen = this.overview.seen ++ that.overview.seen + // 5. fresh seen table + val mergedSeen = Map.empty[Address, VectorClock] Gossip(GossipOverview(mergedSeen, mergedUnreachable), mergedMembers, mergedMeta, mergedVClock) } diff --git a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala index 985b6d5a89..8c790cf159 100644 --- a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala @@ -73,5 +73,17 @@ class GossipSpec extends WordSpec with MustMatchers { } + "start with fresh seen table after merge" in { + val g1 = Gossip(members = SortedSet(a1, b1, c1, d1)).seen(a1.address).seen(b1.address) + val g2 = Gossip(members = SortedSet(a2, b2, c2, d2)).seen(b2.address).seen(c2.address) + + val merged1 = g1 merge g2 + merged1.overview.seen.isEmpty must be(true) + + val merged2 = g2 merge g1 + merged2.overview.seen.isEmpty must be(true) + + } + } } From 5b89d25c37fc7836e4082f581c87feedb6f89410 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 13 Jun 2012 15:23:45 +0200 Subject: [PATCH 15/19] Add invariant assertions to Gossip, see #2077 * Add doc about how members are "moved" --- .../src/main/scala/akka/cluster/Cluster.scala | 71 +++++++++++++++---- .../test/scala/akka/cluster/GossipSpec.scala | 39 ++++++---- 2 files changed, 85 insertions(+), 25 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 07712d8ed9..3fecd7524b 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -147,7 +147,13 @@ case class GossipEnvelope(from: Address, gossip: Gossip) extends ClusterMessage * * Can be one of: Joining, Up, Leaving, Exiting and Down. */ -sealed trait MemberStatus extends ClusterMessage +sealed trait MemberStatus extends ClusterMessage { + /** + * Using the same notion for 'unavailable' as 'non-convergence': DOWN and REMOVED. 
+ */ + def isUnavailable: Boolean = this == MemberStatus.Down || this == MemberStatus.Removed +} + object MemberStatus { case object Joining extends MemberStatus case object Up extends MemberStatus @@ -155,11 +161,6 @@ object MemberStatus { case object Exiting extends MemberStatus case object Down extends MemberStatus case object Removed extends MemberStatus - - /** - * Using the same notion for 'unavailable' as 'non-convergence': DOWN and REMOVED. - */ - def isUnavailable(status: MemberStatus): Boolean = status == MemberStatus.Down || status == MemberStatus.Removed } /** @@ -169,8 +170,6 @@ case class GossipOverview( seen: Map[Address, VectorClock] = Map.empty[Address, VectorClock], unreachable: Set[Member] = Set.empty[Member]) { - // FIXME document when nodes are put in 'unreachable' set and removed from 'members' - override def toString = "GossipOverview(seen = [" + seen.mkString(", ") + "], unreachable = [" + unreachable.mkString(", ") + @@ -182,7 +181,31 @@ object Gossip { } /** - * Represents the state of the cluster; cluster ring membership, ring convergence, meta data - all versioned by a vector clock. + * Represents the state of the cluster; cluster ring membership, ring convergence, meta data - + * all versioned by a vector clock. + * + * When a node is joining, a Member with status Joining is added to `members`. + * If the joining node was downed it is moved from `overview.unreachable` (status Down) + * to `members` (status Joining). It cannot rejoin if it has not first been downed. + * + * When convergence is reached the leader changes the status of `members` from Joining + * to Up. + * + * When the failure detector considers a node as unavailable it will be moved from + * `members` to `overview.unreachable`. + * + * When a node is downed, either manually or automatically, it is moved from `members` + * to `overview.unreachable` (status Down). It is also removed from `overview.seen` + * table. The node will reside as Down in the `overview.unreachable` set until joining + * again and it will then go through the normal joining procedure. + * + * When a Gossip is received the version (vector clock) is used to determine if the + * received Gossip is newer or older than the current local Gossip. The received Gossip + * and local Gossip is merged in case of concurrent vector clocks, i.e. not same history. + * When merged the seen table is cleared.
+ * + * TODO document leaving, exiting and removed when that is implemented + * */ case class Gossip( overview: GossipOverview = GossipOverview(), @@ -192,6 +215,28 @@ case class Gossip( extends ClusterMessage // is a serializable cluster message with Versioned[Gossip] { + // FIXME can be disabled as optimization + assertInvariants + private def assertInvariants: Unit = { + val unreachableAndLive = members.intersect(overview.unreachable) + if (unreachableAndLive.nonEmpty) + throw new IllegalArgumentException("Same nodes in both members and unreachable is not allowed, got [%s]" + format unreachableAndLive.mkString(", ")) + + val allowedLiveMemberStatuses: Set[MemberStatus] = Set(MemberStatus.Joining, MemberStatus.Up, MemberStatus.Leaving, MemberStatus.Exiting) + def hasNotAllowedLiveMemberStatus(m: Member) = !allowedLiveMemberStatuses.contains(m.status) + if (members exists hasNotAllowedLiveMemberStatus) + throw new IllegalArgumentException("Live members must have status [%s], got [%s]" + format (allowedLiveMemberStatuses.mkString(", "), + (members filter hasNotAllowedLiveMemberStatus).mkString(", "))) + + val seenButNotMember = overview.seen.keySet -- members.map(_.address) -- overview.unreachable.map(_.address) + if (seenButNotMember.nonEmpty) + throw new IllegalArgumentException("Nodes not part of cluster have marked the Gossip as seen, got [%s]" + format seenButNotMember.mkString(", ")) + + } + /** * Increments the version for this 'Node'. */ @@ -223,7 +268,7 @@ case class Gossip( // 2. merge meta-data val mergedMeta = this.meta ++ that.meta - def reduceHighestPriority(a: Seq[Member], b: Seq[Member]): Set[Member] = { + def pickHighestPriority(a: Seq[Member], b: Seq[Member]): Set[Member] = { // group all members by Address => Seq[Member] val groupedByAddress = (a ++ b).groupBy(_.address) // pick highest MemberStatus @@ -233,11 +278,11 @@ case class Gossip( } // 3. merge unreachable by selecting the single Member with highest MemberStatus out of the Member groups - val mergedUnreachable = reduceHighestPriority(this.overview.unreachable.toSeq, that.overview.unreachable.toSeq) + val mergedUnreachable = pickHighestPriority(this.overview.unreachable.toSeq, that.overview.unreachable.toSeq) // 4. merge members by selecting the single Member with highest MemberStatus out of the Member groups, // and exclude unreachable - val mergedMembers = Gossip.emptyMembers ++ reduceHighestPriority(this.members.toSeq, that.members.toSeq). + val mergedMembers = Gossip.emptyMembers ++ pickHighestPriority(this.members.toSeq, that.members.toSeq). filterNot(mergedUnreachable.contains) // 5. 
fresh seen table @@ -1145,7 +1190,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) val localMembers = localGossip.members val localUnreachableMembers = localOverview.unreachable val isUnreachable = localUnreachableMembers exists { _.address == selfAddress } - val hasUnavailableMemberStatus = localMembers exists { m ⇒ (m == self) && MemberStatus.isUnavailable(m.status) } + val hasUnavailableMemberStatus = localMembers exists { m ⇒ (m == self) && m.status.isUnavailable } isUnreachable || hasUnavailableMemberStatus } diff --git a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala index 8c790cf159..449ebf7bff 100644 --- a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala @@ -22,28 +22,30 @@ class GossipSpec extends WordSpec with MustMatchers { val c2 = Member(Address("akka", "sys", "c", 2552), Up) val d1 = Member(Address("akka", "sys", "d", 2552), Leaving) val d2 = Member(Address("akka", "sys", "d", 2552), Removed) + val e1 = Member(Address("akka", "sys", "e", 2552), Joining) + val e2 = Member(Address("akka", "sys", "e", 2552), Up) "A Gossip" must { "merge members by status priority" in { - val g1 = Gossip(members = SortedSet(a1, b1, c1, d1)) - val g2 = Gossip(members = SortedSet(a2, b2, c2, d2)) + val g1 = Gossip(members = SortedSet(a1, c1, e1)) + val g2 = Gossip(members = SortedSet(a2, c2, e2)) val merged1 = g1 merge g2 - merged1.members must be(SortedSet(a1, b2, c1, d2)) - merged1.members.toSeq.map(_.status) must be(Seq(Up, Removed, Leaving, Removed)) + merged1.members must be(SortedSet(a1, c1, e2)) + merged1.members.toSeq.map(_.status) must be(Seq(Up, Leaving, Up)) val merged2 = g2 merge g1 - merged2.members must be(SortedSet(a1, b2, c1, d2)) - merged2.members.toSeq.map(_.status) must be(Seq(Up, Removed, Leaving, Removed)) + merged2.members must be(SortedSet(a1, c1, e2)) + merged2.members.toSeq.map(_.status) must be(Seq(Up, Leaving, Up)) } "merge unreachable by status priority" in { - val g1 = Gossip(members = Gossip.emptyMembers, overview = GossipOverview(unreachable = SortedSet(a1, b1, c1, d1))) - val g2 = Gossip(members = Gossip.emptyMembers, overview = GossipOverview(unreachable = SortedSet(a2, b2, c2, d2))) + val g1 = Gossip(members = Gossip.emptyMembers, overview = GossipOverview(unreachable = Set(a1, b1, c1, d1))) + val g2 = Gossip(members = Gossip.emptyMembers, overview = GossipOverview(unreachable = Set(a2, b2, c2, d2))) val merged1 = g1 merge g2 merged1.overview.unreachable must be(Set(a1, b2, c1, d2)) @@ -56,8 +58,8 @@ class GossipSpec extends WordSpec with MustMatchers { } "merge by excluding unreachable from members" in { - val g1 = Gossip(members = SortedSet(a1, b1), overview = GossipOverview(unreachable = SortedSet(c1, d1))) - val g2 = Gossip(members = SortedSet(a2, c2), overview = GossipOverview(unreachable = SortedSet(b2, d2))) + val g1 = Gossip(members = SortedSet(a1, b1), overview = GossipOverview(unreachable = Set(c1, d1))) + val g2 = Gossip(members = SortedSet(a2, c2), overview = GossipOverview(unreachable = Set(b2, d2))) val merged1 = g1 merge g2 merged1.members must be(SortedSet(a1)) @@ -74,8 +76,8 @@ class GossipSpec extends WordSpec with MustMatchers { } "start with fresh seen table after merge" in { - val g1 = Gossip(members = SortedSet(a1, b1, c1, d1)).seen(a1.address).seen(b1.address) - val g2 = Gossip(members = SortedSet(a2, b2, c2, d2)).seen(b2.address).seen(c2.address) + val g1 = 
Gossip(members = SortedSet(a1, e1)).seen(a1.address).seen(a1.address) + val g2 = Gossip(members = SortedSet(a2, e2)).seen(e2.address).seen(e2.address) val merged1 = g1 merge g2 merged1.overview.seen.isEmpty must be(true) @@ -85,5 +87,18 @@ class GossipSpec extends WordSpec with MustMatchers { } + "not have node in both members and unreachable" in intercept[IllegalArgumentException] { + Gossip(members = SortedSet(a1, b1), overview = GossipOverview(unreachable = Set(b2))) + } + + "not have live members with wrong status" in intercept[IllegalArgumentException] { + // b2 is Removed + Gossip(members = SortedSet(a2, b2)) + } + + "not have non cluster members in seen table" in intercept[IllegalArgumentException] { + Gossip(members = SortedSet(a1, e1)).seen(a1.address).seen(e1.address).seen(b1.address) + } + } } From afbeb3e5f91512c8b05d88ba8f1cb5a871ee1537 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 13 Jun 2012 15:33:38 +0200 Subject: [PATCH 16/19] import MemberStatus._ --- .../src/main/scala/akka/cluster/Cluster.scala | 43 +++++++++---------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 3fecd7524b..48035a0e4e 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -26,6 +26,7 @@ import scala.annotation.tailrec import com.google.protobuf.ByteString import akka.util.internal.HashedWheelTimer import akka.dispatch.MonitorableThreadFactory +import MemberStatus._ /** * Interface for membership change listener. @@ -96,7 +97,6 @@ class Member(val address: Address, val status: MemberStatus) extends ClusterMess * Factory and Utility module for Member instances. */ object Member { - import MemberStatus._ /** * Sort Address by host and port @@ -151,7 +151,7 @@ sealed trait MemberStatus extends ClusterMessage { /** * Using the same notion for 'unavailable' as 'non-convergence': DOWN and REMOVED. 
*/ - def isUnavailable: Boolean = this == MemberStatus.Down || this == MemberStatus.Removed + def isUnavailable: Boolean = this == Down || this == Removed } object MemberStatus { @@ -223,7 +223,7 @@ case class Gossip( throw new IllegalArgumentException("Same nodes in both members and unreachable is not allowed, got [%s]" format unreachableAndLive.mkString(", ")) - val allowedLiveMemberStatuses: Set[MemberStatus] = Set(MemberStatus.Joining, MemberStatus.Up, MemberStatus.Leaving, MemberStatus.Exiting) + val allowedLiveMemberStatuses: Set[MemberStatus] = Set(Joining, Up, Leaving, Exiting) def hasNotAllowedLiveMemberStatus(m: Member) = !allowedLiveMemberStatuses.contains(m.status) if (members exists hasNotAllowedLiveMemberStatus) throw new IllegalArgumentException("Live members must have status [%s], got [%s]" @@ -473,7 +473,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) } private val state = { - val member = Member(selfAddress, MemberStatus.Joining) + val member = Member(selfAddress, Joining) val versionedGossip = Gossip(members = Gossip.emptyMembers + member) + vclockNode // add me as member and update my vector clock val seenVersionedGossip = versionedGossip seen selfAddress new AtomicReference[State](State(seenVersionedGossip)) @@ -702,7 +702,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) val alreadyMember = localMembers.exists(_.address == node) val isUnreachable = localUnreachable.exists { m ⇒ - m.address == node && m.status != MemberStatus.Down && m.status != MemberStatus.Removed + m.address == node && m.status != Down && m.status != Removed } if (!alreadyMember && !isUnreachable) { @@ -711,7 +711,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) val newUnreachableMembers = localUnreachable filterNot { _.address == node } val newOverview = localGossip.overview copy (unreachable = newUnreachableMembers) - val newMembers = localMembers + Member(node, MemberStatus.Joining) // add joining node as Joining + val newMembers = localMembers + Member(node, Joining) // add joining node as Joining val newGossip = localGossip copy (overview = newOverview, members = newMembers) val versionedGossip = newGossip + vclockNode @@ -739,7 +739,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) val localGossip = localState.latestGossip val localMembers = localGossip.members - val newMembers = localMembers + Member(address, MemberStatus.Leaving) // mark node as LEAVING + val newMembers = localMembers + Member(address, Leaving) // mark node as LEAVING val newGossip = localGossip copy (members = newMembers) val versionedGossip = newGossip + vclockNode @@ -792,7 +792,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) // 1. 
check if the node to DOWN is in the 'members' set val downedMember: Option[Member] = localMembers.collectFirst { - case m if m.address == address ⇒ m.copy(status = MemberStatus.Down) + case m if m.address == address ⇒ m.copy(status = Down) } val newMembers = downedMember match { case Some(m) ⇒ @@ -805,9 +805,9 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) val newUnreachableMembers = localUnreachableMembers.map { member ⇒ // no need to DOWN members already DOWN - if (member.address == address && member.status != MemberStatus.Down) { + if (member.address == address && member.status != Down) { log.info("Cluster Node [{}] - Marking unreachable node [{}] as DOWN", selfAddress, member.address) - member copy (status = MemberStatus.Down) + member copy (status = Down) } else member } @@ -816,7 +816,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) // 4. remove nodes marked as DOWN from the 'seen' table val newSeen = localSeen -- newUnreachablePlusNewlyDownedMembers.collect { - case m if m.status == MemberStatus.Down ⇒ m.address + case m if m.status == Down ⇒ m.address } // update gossip overview @@ -1073,30 +1073,30 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) // ---------------------- // 1. Move JOINING => UP (once all nodes have seen that this node is JOINING e.g. we have a convergence) // ---------------------- - if (member.status == MemberStatus.Joining) { + if (member.status == Joining) { log.info("Cluster Node [{}] - Leader is moving node [{}] from JOINING to UP", selfAddress, member.address) hasChangedState = true - member copy (status = MemberStatus.Up) + member copy (status = Up) } else member } map { member ⇒ // ---------------------- // 2. Move EXITING => REMOVED (once all nodes have seen that this node is EXITING e.g. we have a convergence) // ---------------------- - if (member.status == MemberStatus.Exiting) { + if (member.status == Exiting) { log.info("Cluster Node [{}] - Leader is moving node [{}] from EXITING to REMOVED", selfAddress, member.address) hasChangedState = true - member copy (status = MemberStatus.Removed) + member copy (status = Removed) } else member } map { member ⇒ // ---------------------- // 3. 
Move LEAVING => EXITING (once we have a convergence on LEAVING *and* if we have a successful partition handoff) // ---------------------- - if (member.status == MemberStatus.Leaving && hasPartionHandoffCompletedSuccessfully(localGossip)) { + if (member.status == Leaving && hasPartionHandoffCompletedSuccessfully(localGossip)) { log.info("Cluster Node [{}] - Leader is moving node [{}] from LEAVING to EXITING", selfAddress, member.address) hasChangedState = true - member copy (status = MemberStatus.Exiting) + member copy (status = Exiting) } else member } @@ -1112,17 +1112,17 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) val newUnreachableMembers = localUnreachableMembers.map { member ⇒ // no need to DOWN members already DOWN - if (member.status == MemberStatus.Down) member + if (member.status == Down) member else { log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as DOWN", selfAddress, member.address) hasChangedState = true - member copy (status = MemberStatus.Down) + member copy (status = Down) } } // removing nodes marked as DOWN from the 'seen' table val newSeen = localSeen -- newUnreachableMembers.collect { - case m if m.status == MemberStatus.Down ⇒ m.address + case m if m.status == Down ⇒ m.address } val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview @@ -1169,8 +1169,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) // Else we can't continue to check for convergence // When that is done we check that all the entries in the 'seen' table have the same vector clock version if (unreachable.isEmpty || !unreachable.exists { m ⇒ - m.status != MemberStatus.Down && - m.status != MemberStatus.Removed + m.status != Down && m.status != Removed }) { val seen = gossip.overview.seen val views = Set.empty[VectorClock] ++ seen.values From 82645ca3c9b7ad97189b007148be1a788e41c0af Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 13 Jun 2012 16:06:34 +0200 Subject: [PATCH 17/19] Additional check of expectedAddresses in listener tests --- .../akka/cluster/MembershipChangeListenerJoinSpec.scala | 3 ++- .../akka/cluster/MembershipChangeListenerLeavingSpec.scala | 4 +++- .../akka/cluster/MembershipChangeListenerUpSpec.scala | 7 +++++-- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala index 1b296c58f1..536fb3b58d 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala @@ -38,9 +38,10 @@ abstract class MembershipChangeListenerJoinSpec runOn(first) { val joinLatch = TestLatch() + val expectedAddresses = Set(firstAddress, secondAddress) cluster.registerListener(new MembershipChangeListener { def notify(members: SortedSet[Member]) { - if (members.size == 2 && members.exists(_.status == MemberStatus.Joining)) + if (members.map(_.address) == expectedAddresses && members.exists(_.status == MemberStatus.Joining)) joinLatch.countDown() } }) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala index 57cec4f389..eda29ea0f0 100644 --- 
a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala @@ -54,9 +54,11 @@ abstract class MembershipChangeListenerLeavingSpec runOn(third) { val latch = TestLatch() + val expectedAddresses = Set(firstAddress, secondAddress, thirdAddress) cluster.registerListener(new MembershipChangeListener { def notify(members: SortedSet[Member]) { - if (members.size == 3 && members.exists(m ⇒ m.address == secondAddress && m.status == MemberStatus.Leaving)) + if (members.map(_.address) == expectedAddresses && + members.exists(m ⇒ m.address == secondAddress && m.status == MemberStatus.Leaving)) latch.countDown() } }) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala index c89bbe1f0a..f48f9c8d9b 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala @@ -29,6 +29,7 @@ abstract class MembershipChangeListenerUpSpec lazy val firstAddress = node(first).address lazy val secondAddress = node(second).address + lazy val thirdAddress = node(third).address "A set of connected cluster systems" must { @@ -38,9 +39,10 @@ abstract class MembershipChangeListenerUpSpec runOn(first, second) { val latch = TestLatch() + val expectedAddresses = Set(firstAddress, secondAddress) cluster.registerListener(new MembershipChangeListener { def notify(members: SortedSet[Member]) { - if (members.size == 2 && members.forall(_.status == MemberStatus.Up)) + if (members.map(_.address) == expectedAddresses && members.forall(_.status == MemberStatus.Up)) latch.countDown() } }) @@ -59,9 +61,10 @@ abstract class MembershipChangeListenerUpSpec "(when three nodes) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in { val latch = TestLatch() + val expectedAddresses = Set(firstAddress, secondAddress, thirdAddress) cluster.registerListener(new MembershipChangeListener { def notify(members: SortedSet[Member]) { - if (members.size == 3 && members.forall(_.status == MemberStatus.Up)) + if (members.map(_.address) == expectedAddresses && members.forall(_.status == MemberStatus.Up)) latch.countDown() } }) From bd7bdff2697fab82ef58a310c04b98f176ac4115 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 13 Jun 2012 16:13:49 +0200 Subject: [PATCH 18/19] Improve debug log message of no convergence, see #2222 --- akka-cluster/src/main/scala/akka/cluster/Cluster.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 5bc968920a..a904075d5e 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -1152,7 +1152,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) val allMembersInSeen = gossip.members.forall(m ⇒ seen.contains(m.address)) if (hasUnreachable) { - log.debug("Cluster Node [{}] - No cluster convergence, due to unreachable [{}].", selfAddress, unreachable) + log.debug("Cluster Node [{}] - No cluster convergence, due to unreachable nodes [{}].", selfAddress, unreachable) None } else if (!allMembersInSeen) { log.debug("Cluster Node [{}] - 
No cluster convergence, due to members not in seen table [{}].", selfAddress, @@ -1160,13 +1160,14 @@ None } else { - val views = (Set.empty[VectorClock] ++ seen.values).size + val views = seen.values.toSet.size if (views == 1) { log.debug("Cluster Node [{}] - Cluster convergence reached: [{}]", selfAddress, gossip.members.mkString(", ")) Some(gossip) } else { - log.debug("Cluster Node [{}] - No cluster convergence, due to [{}] different views.", selfAddress, views) + log.debug("Cluster Node [{}] - No cluster convergence, since not all nodes have seen the same state yet. [{} of {}]", + selfAddress, views, seen.values.size) None } } From 391e63332908a7a4b06970592094fbb54072b23c Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 13 Jun 2012 16:54:21 +0200 Subject: [PATCH 19/19] Improve docs based on feedback, see #2077 --- akka-cluster/src/main/scala/akka/cluster/Cluster.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 48035a0e4e..c3e16de5e5 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -194,15 +194,15 @@ object Gossip { * When the failure detector considers a node as unavailable it will be moved from * `members` to `overview.unreachable`. * - * When a node is downed, either manually or automatically, it is moved from `members` - * to `overview.unreachable` (status Down). It is also removed from `overview.seen` - * table. The node will reside as Down in the `overview.unreachable` set until joining + * When a node is downed, either manually or automatically, its status is changed to Down. + * It is also removed from the `overview.seen` table. + * The node will reside as Down in the `overview.unreachable` set until joining * again and it will then go through the normal joining procedure. * * When a Gossip is received the version (vector clock) is used to determine if the * received Gossip is newer or older than the current local Gossip. The received Gossip - * and local Gossip is merged in case of concurrent vector clocks, i.e. not same history. - * When merged the seen table is cleared. + * and local Gossip are merged in case of conflicting versions, i.e. vector clocks without + * a shared history. When merged the seen table is cleared. * * TODO document leaving, exiting and removed when that is implemented *
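As a usage illustration (not part of the patch series), here is a small sketch of the merge behaviour documented above, written in the style of the GossipSpec tests; the addresses are made-up examples and the snippet assumes the Gossip, GossipOverview, Member and MemberStatus API as it stands after these patches:

    import scala.collection.immutable.SortedSet
    import akka.actor.Address
    import akka.cluster.{ Gossip, GossipOverview, Member, MemberStatus }

    val up      = Member(Address("akka", "sys", "a", 2552), MemberStatus.Up)
    val joining = Member(Address("akka", "sys", "b", 2552), MemberStatus.Joining)
    val down    = Member(Address("akka", "sys", "b", 2552), MemberStatus.Down)

    // two nodes hold conflicting views: one still sees "b" as Joining, the other has downed it
    val g1 = Gossip(members = SortedSet(up, joining)).seen(up.address)
    val g2 = Gossip(members = SortedSet(up), overview = GossipOverview(unreachable = Set(down))).seen(up.address)

    val merged = g1 merge g2
    // "b" is kept only in the merged unreachable set; addresses present there are excluded from members
    merged.members               // SortedSet(up)
    merged.overview.unreachable  // Set(down)
    // and the merged gossip starts out with a fresh (empty) seen table
    merged.overview.seen.isEmpty // true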