Test normal healthy cluster, see #2195

* Fix that membership listeners should only notified when something changed
This commit is contained in:
Patrik Nordwall 2012-06-07 15:14:38 +02:00
parent bd979a8845
commit fcd08ed2b9
3 changed files with 92 additions and 20 deletions

View file

@ -618,11 +618,14 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
if (!state.compareAndSet(localState, newState)) leaving(address) // recur if we failed update
else {
failureDetector heartbeat address // update heartbeat in failure detector
if (convergence(newState.latestGossip).isDefined) {
newState.memberMembershipChangeListeners foreach { _ notify newMembers }
if (address != selfAddress) failureDetector heartbeat address // update heartbeat in failure detector
notifyMembershipChangeListeners(localState, newState)
}
}
private def notifyMembershipChangeListeners(oldState: State, newState: State): Unit =
if (newState.latestGossip != oldState.latestGossip && convergence(newState.latestGossip).isDefined) {
newState.memberMembershipChangeListeners foreach { _ notify newState.latestGossip.members }
}
/**
@ -698,9 +701,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
if (!state.compareAndSet(localState, newState)) downing(address) // recur if we fail the update
else {
if (convergence(newState.latestGossip).isDefined) {
newState.memberMembershipChangeListeners foreach { _ notify newState.latestGossip.members }
}
notifyMembershipChangeListeners(localState, newState)
}
}
@ -741,10 +742,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, sender.address)
if (sender.address != selfAddress) failureDetector heartbeat sender.address
if (convergence(newState.latestGossip).isDefined) {
newState.memberMembershipChangeListeners foreach { _ notify newState.latestGossip.members }
}
notifyMembershipChangeListeners(localState, newState)
}
}
@ -841,14 +839,14 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
private[akka] def gossip(): Unit = {
val localState = state.get
log.debug("Cluster Node [{}] - Initiating new round of gossip", selfAddress)
if (isSingletonCluster(localState)) {
// gossip to myself
// TODO could perhaps be optimized, no need to gossip to myself when Up?
gossipTo(selfAddress)
} else if (isAvailable(localState)) {
log.debug("Cluster Node [{}] - Initiating new round of gossip", selfAddress)
val localGossip = localState.latestGossip
// important to not accidentally use `map` of the SortedSet, since the original order is not preserved
val localMembers = localGossip.members.toIndexedSeq
@ -917,9 +915,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
else {
log.info("Cluster Node [{}] - Marking node(s) as UNREACHABLE [{}]", selfAddress, newlyDetectedUnreachableMembers.mkString(", "))
if (convergence(newState.latestGossip).isDefined) {
newState.memberMembershipChangeListeners foreach { _ notify newMembers }
}
notifyMembershipChangeListeners(localState, newState)
}
}
}
@ -1040,9 +1036,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
// if we won the race then update else try again
if (!state.compareAndSet(localState, newState)) leaderActions() // recur
else {
if (convergence(newState.latestGossip).isDefined) {
newState.memberMembershipChangeListeners foreach { _ notify newGossip.members }
}
notifyMembershipChangeListeners(localState, newState)
}
}
}

View file

@ -0,0 +1,78 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfter
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import akka.util.duration._
import java.util.concurrent.atomic.AtomicReference
import scala.collection.immutable.SortedSet
object SunnyWeatherMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
val fourth = role("fourth")
val fifth = role("fifth")
commonConfig(ConfigFactory.parseString("""
akka.cluster {
gossip-interval = 400 ms
nr-of-deputy-nodes = 0
}
akka.loglevel = DEBUG
"""))
}
class SunnyWeatherMultiJvmNode1 extends SunnyWeatherSpec
class SunnyWeatherMultiJvmNode2 extends SunnyWeatherSpec
class SunnyWeatherMultiJvmNode3 extends SunnyWeatherSpec
class SunnyWeatherMultiJvmNode4 extends SunnyWeatherSpec
class SunnyWeatherMultiJvmNode5 extends SunnyWeatherSpec
abstract class SunnyWeatherSpec
extends MultiNodeSpec(SunnyWeatherMultiJvmSpec)
with MultiNodeClusterSpec {
import SunnyWeatherMultiJvmSpec._
override def initialParticipants = roles.size
"A normal cluster" must {
"be healthy" taggedAs LongRunningTest in {
// start some
awaitClusterUp(first, second, third)
runOn(first, second, third) {
log.info("3 joined")
}
// add a few more
awaitClusterUp(first, second, third, fourth, fifth)
log.info("5 joined")
val unexpected = new AtomicReference[SortedSet[Member]]
cluster.registerListener(new MembershipChangeListener {
def notify(members: SortedSet[Member]) {
// we don't expected any changes to the cluster
unexpected.set(members)
}
})
for (n 1 to 40) {
testConductor.enter("period-" + n)
unexpected.get must be(null)
awaitUpConvergence(roles.size)
assertLeaderIn(roles)
if (n % 5 == 0) log.info("Passed period [{}]", n)
1.seconds.sleep
}
testConductor.enter("after")
}
}
}

View file

@ -84,7 +84,7 @@ abstract class MultiNodeConfig {
private[testkit] def deployments(node: RoleName): Seq[String] = (_deployments get node getOrElse Nil) ++ _allDeploy
private[testkit] def roles: Seq[RoleName] = _roles
def roles: Seq[RoleName] = _roles
}