diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 7d1222a7ab..e2b5c8e751 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -618,13 +618,16 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ if (!state.compareAndSet(localState, newState)) leaving(address) // recur if we failed update else { - failureDetector heartbeat address // update heartbeat in failure detector - if (convergence(newState.latestGossip).isDefined) { - newState.memberMembershipChangeListeners foreach { _ notify newMembers } - } + if (address != selfAddress) failureDetector heartbeat address // update heartbeat in failure detector + notifyMembershipChangeListeners(localState, newState) } } + private def notifyMembershipChangeListeners(oldState: State, newState: State): Unit = + if (newState.latestGossip != oldState.latestGossip && convergence(newState.latestGossip).isDefined) { + newState.memberMembershipChangeListeners foreach { _ notify newState.latestGossip.members } + } + /** * State transition to EXITING. */ @@ -698,9 +701,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ if (!state.compareAndSet(localState, newState)) downing(address) // recur if we fail the update else { - if (convergence(newState.latestGossip).isDefined) { - newState.memberMembershipChangeListeners foreach { _ notify newState.latestGossip.members } - } + notifyMembershipChangeListeners(localState, newState) } } @@ -741,10 +742,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, sender.address) if (sender.address != selfAddress) failureDetector heartbeat sender.address - - if (convergence(newState.latestGossip).isDefined) { - newState.memberMembershipChangeListeners foreach { _ notify newState.latestGossip.members } - } + notifyMembershipChangeListeners(localState, newState) } } @@ -841,14 +839,14 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ private[akka] def gossip(): Unit = { val localState = state.get + log.debug("Cluster Node [{}] - Initiating new round of gossip", selfAddress) + if (isSingletonCluster(localState)) { // gossip to myself // TODO could perhaps be optimized, no need to gossip to myself when Up? gossipTo(selfAddress) } else if (isAvailable(localState)) { - log.debug("Cluster Node [{}] - Initiating new round of gossip", selfAddress) - val localGossip = localState.latestGossip // important to not accidentally use `map` of the SortedSet, since the original order is not preserved val localMembers = localGossip.members.toIndexedSeq @@ -917,9 +915,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ else { log.info("Cluster Node [{}] - Marking node(s) as UNREACHABLE [{}]", selfAddress, newlyDetectedUnreachableMembers.mkString(", ")) - if (convergence(newState.latestGossip).isDefined) { - newState.memberMembershipChangeListeners foreach { _ notify newMembers } - } + notifyMembershipChangeListeners(localState, newState) } } } @@ -1040,9 +1036,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ // if we won the race then update else try again if (!state.compareAndSet(localState, newState)) leaderActions() // recur else { - if (convergence(newState.latestGossip).isDefined) { - newState.memberMembershipChangeListeners foreach { _ notify newGossip.members } - } + notifyMembershipChangeListeners(localState, newState) } } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala new file mode 100644 index 0000000000..b74fdd09db --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala @@ -0,0 +1,78 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfter +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.util.duration._ +import java.util.concurrent.atomic.AtomicReference +import scala.collection.immutable.SortedSet + +object SunnyWeatherMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + val fourth = role("fourth") + val fifth = role("fifth") + + commonConfig(ConfigFactory.parseString(""" + akka.cluster { + gossip-interval = 400 ms + nr-of-deputy-nodes = 0 + } + akka.loglevel = DEBUG + """)) +} + +class SunnyWeatherMultiJvmNode1 extends SunnyWeatherSpec +class SunnyWeatherMultiJvmNode2 extends SunnyWeatherSpec +class SunnyWeatherMultiJvmNode3 extends SunnyWeatherSpec +class SunnyWeatherMultiJvmNode4 extends SunnyWeatherSpec +class SunnyWeatherMultiJvmNode5 extends SunnyWeatherSpec + +abstract class SunnyWeatherSpec + extends MultiNodeSpec(SunnyWeatherMultiJvmSpec) + with MultiNodeClusterSpec { + + import SunnyWeatherMultiJvmSpec._ + + override def initialParticipants = roles.size + + "A normal cluster" must { + "be healthy" taggedAs LongRunningTest in { + + // start some + awaitClusterUp(first, second, third) + runOn(first, second, third) { + log.info("3 joined") + } + + // add a few more + awaitClusterUp(first, second, third, fourth, fifth) + log.info("5 joined") + + val unexpected = new AtomicReference[SortedSet[Member]] + cluster.registerListener(new MembershipChangeListener { + def notify(members: SortedSet[Member]) { + // we don't expected any changes to the cluster + unexpected.set(members) + } + }) + + for (n ← 1 to 40) { + testConductor.enter("period-" + n) + unexpected.get must be(null) + awaitUpConvergence(roles.size) + assertLeaderIn(roles) + if (n % 5 == 0) log.info("Passed period [{}]", n) + 1.seconds.sleep + } + + testConductor.enter("after") + } + } +} diff --git a/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala b/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala index 01a08da718..a0d7d5eac4 100644 --- a/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala +++ b/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala @@ -84,7 +84,7 @@ abstract class MultiNodeConfig { private[testkit] def deployments(node: RoleName): Seq[String] = (_deployments get node getOrElse Nil) ++ _allDeploy - private[testkit] def roles: Seq[RoleName] = _roles + def roles: Seq[RoleName] = _roles }