Merge pull request #18746 from akka/wip-18554-singleton-startup-patriknw

=clu #18554 Make oldest assignment deterministic when joining
This commit is contained in:
Patrik Nordwall 2015-11-06 14:48:57 +01:00
commit 1e36e5e187
27 changed files with 162 additions and 145 deletions

View file

@ -0,0 +1,67 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import scala.collection.immutable
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfter
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import scala.concurrent.duration._
import akka.actor.Address
import akka.cluster.ClusterEvent.MemberUp
import akka.cluster.ClusterEvent.CurrentClusterState
object DeterministicOldestWhenJoiningMultiJvmSpec extends MultiNodeConfig {
val seed1 = role("seed1")
val seed2 = role("seed2")
val seed3 = role("seed3")
commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString("""
# not too quick to trigger problematic scenario more often
akka.cluster.leader-actions-interval = 2000 ms
akka.cluster.gossip-interval = 500 ms
""")).withFallback(MultiNodeClusterSpec.clusterConfig))
}
class DeterministicOldestWhenJoiningMultiJvmNode1 extends DeterministicOldestWhenJoiningSpec
class DeterministicOldestWhenJoiningMultiJvmNode2 extends DeterministicOldestWhenJoiningSpec
class DeterministicOldestWhenJoiningMultiJvmNode3 extends DeterministicOldestWhenJoiningSpec
abstract class DeterministicOldestWhenJoiningSpec
extends MultiNodeSpec(DeterministicOldestWhenJoiningMultiJvmSpec)
with MultiNodeClusterSpec {
import DeterministicOldestWhenJoiningMultiJvmSpec._
// reverse order because that expose the bug in issue #18554
def seedNodes: immutable.IndexedSeq[Address] =
Vector(address(seed1), address(seed2), address(seed3)).sorted(Member.addressOrdering).reverse
val roleByAddress = Map(address(seed1) -> seed1, address(seed2) -> seed2, address(seed3) -> seed3)
"Joining a cluster" must {
"result in deterministic oldest node" taggedAs LongRunningTest in {
cluster.subscribe(testActor, classOf[MemberUp])
expectMsgType[CurrentClusterState]
runOn(roleByAddress(seedNodes.head)) {
cluster.joinSeedNodes(seedNodes)
}
enterBarrier("first-seed-joined")
runOn(roleByAddress(seedNodes(1)), roleByAddress(roleByAddress(seedNodes(2)))) {
cluster.joinSeedNodes(seedNodes)
}
within(10.seconds) {
val ups = List(expectMsgType[MemberUp], expectMsgType[MemberUp], expectMsgType[MemberUp])
ups.map(_.member).sorted(Member.ageOrdering).head.address should ===(seedNodes.head)
}
enterBarrier("after-1")
}
}
}

View file

@ -303,7 +303,7 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoro
* Wait until the specified nodes have seen the same gossip overview.
*/
def awaitSeenSameState(addresses: Address*): Unit =
awaitAssert((addresses.toSet -- clusterView.seenBy) should ===(Set.empty))
awaitAssert((addresses.toSet diff clusterView.seenBy) should ===(Set.empty))
/**
* Leader according to the address ordering of the roles.

View file

@ -3,8 +3,8 @@
*/
package akka.cluster
// TODO remove metrics
// FIXME this test is not migrated to metrics extension
// TODO remove metrics
// FIXME this test is not migrated to metrics extension
import language.postfixOps
import scala.annotation.tailrec
@ -981,7 +981,7 @@ abstract class StressSpec
timeout = remainingOrDefault)
awaitAllReachable()
}
val nextAddresses = clusterView.members.map(_.address) -- usedAddresses
val nextAddresses = clusterView.members.map(_.address) diff usedAddresses
runOn(usedRoles: _*) {
nextAddresses.size should ===(numberOfNodesJoinRemove)
}

View file

@ -46,7 +46,7 @@ abstract class TransitionSpec
def nonLeader(roles: RoleName*) = roles.toSeq.sorted.tail
def memberStatus(address: Address): MemberStatus = {
val statusOption = (clusterView.members ++ clusterView.unreachableMembers).collectFirst {
val statusOption = (clusterView.members union clusterView.unreachableMembers).collectFirst {
case m if m.address == address m.status
}
statusOption.getOrElse(Removed)
@ -91,7 +91,7 @@ abstract class TransitionSpec
clusterView.latestStats.gossipStats.receivedGossipCount != oldCount // received gossip
}
// gossip chat will synchronize the views
awaitCond((Set(fromRole, toRole) -- seenLatestGossip).isEmpty)
awaitCond((Set(fromRole, toRole) diff seenLatestGossip).isEmpty)
enterBarrier("after-gossip-" + gossipBarrierCounter)
}
runOn(fromRole) {
@ -99,7 +99,7 @@ abstract class TransitionSpec
// send gossip
cluster.clusterCore ! InternalClusterAction.SendGossipTo(toRole)
// gossip chat will synchronize the views
awaitCond((Set(fromRole, toRole) -- seenLatestGossip).isEmpty)
awaitCond((Set(fromRole, toRole) diff seenLatestGossip).isEmpty)
enterBarrier("after-gossip-" + gossipBarrierCounter)
}
runOn(roles.filterNot(r r == fromRole || r == toRole): _*) {
@ -115,8 +115,7 @@ abstract class TransitionSpec
runOn(first) {
cluster join myself
awaitMemberStatus(myself, Joining)
leaderActions()
// first joining itself will immediately be moved to Up
awaitMemberStatus(myself, Up)
awaitCond(clusterView.isSingletonCluster)
}

View file

@ -342,7 +342,7 @@ abstract class ClusterRoundRobinSpec extends MultiNodeSpec(ClusterRoundRobinMult
def routeeAddresses = (routees map { case ActorRefRoutee(ref) fullAddress(ref) }).toSet
routees foreach { case ActorRefRoutee(ref) watch(ref) }
val notUsedAddress = ((roles map address).toSet -- routeeAddresses).head
val notUsedAddress = ((roles map address).toSet diff routeeAddresses).head
val downAddress = routeeAddresses.find(_ != address(first)).get
val downRouteeRef = routees.collectFirst {
case ActorRefRoutee(ref) if ref.path.address == downAddress ref