Support concurrent startup of seed nodes, see #2270

* Implemented the startup sequence of seed nodes as
  described in #2305
* Test that verifies concurrent startup of seed nodes
This commit is contained in:
Patrik Nordwall 2012-08-14 13:55:22 +02:00
parent 2c6f482b85
commit d7b0089d7e
3 changed files with 44 additions and 35 deletions

View file

@ -72,15 +72,20 @@ private[cluster] object InternalClusterAction {
*/
case class InitJoinAck(address: Address) extends ClusterMessage
case object GossipTick
/**
* Marker interface for periodic tick messages
*/
trait Tick
case object HeartbeatTick
case object GossipTick extends Tick
case object ReapUnreachableTick
case object HeartbeatTick extends Tick
case object LeaderActionsTick
case object ReapUnreachableTick extends Tick
case object PublishStateTick
case object LeaderActionsTick extends Tick
case object PublishStateTick extends Tick
case class SendClusterMessage(to: Address, msg: ClusterMessage)
@ -223,7 +228,16 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
publishStateTask foreach { _.cancel() }
}
def receive = {
def uninitialized: Actor.Receive = {
case JoinSeedNode joinSeedNode()
case InitJoin // skip, not ready yet
case InitJoinAck(address) join(address)
case JoinTo(address) join(address)
case Failure(e: AskTimeoutException) joinSeedNodeTimeout()
case _: Tick // ignore periodic tasks until initialized
}
def initialized: Actor.Receive = {
case msg: GossipEnvelope receiveGossip(msg)
case msg: GossipMergeConflict receiveGossipMerge(msg)
case GossipTick gossip()
@ -231,10 +245,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
case ReapUnreachableTick reapUnreachableMembers()
case LeaderActionsTick leaderActions()
case PublishStateTick publishState()
case JoinSeedNode joinSeedNode()
case InitJoin initJoin()
case InitJoinAck(address) join(address)
case Failure(e: AskTimeoutException) joinSeedNodeTimeout()
case JoinTo(address) join(address)
case ClusterUserAction.Join(address) joining(address)
case ClusterUserAction.Down(address) downing(address)
@ -246,10 +257,14 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
}
def receive = uninitialized
def joinSeedNode(): Unit = {
val seedRoutees = environment.seedNodes.collect { case a if a != selfAddress self.path.toStringWithAddress(a) }
if (seedRoutees.isEmpty) join(selfAddress)
// only the node which is named first in the list of seed nodes will join itself
if (environment.seedNodes.isEmpty || environment.seedNodes.head == selfAddress)
join(selfAddress)
else {
val seedRoutees = environment.seedNodes.collect { case a if a != selfAddress self.path.toStringWithAddress(a) }
implicit val within = Timeout(SeedNodeTimeout)
val seedRouter = context.actorOf(Props.empty.withRouter(ScatterGatherFirstCompletedRouter(routees = seedRoutees, within = within.duration)))
seedRouter ! InitJoin
@ -259,7 +274,10 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
def initJoin(): Unit = sender ! InitJoinAck(selfAddress)
def joinSeedNodeTimeout(): Unit = join(selfAddress)
def joinSeedNodeTimeout(): Unit = {
// try again later, first seed node must be started before other seed nodes can join
clusterScheduler.scheduleOnce(SeedNodeTimeout, self, JoinSeedNode)
}
/**
* Try to join this cluster node with the node specified by 'address'.
@ -276,7 +294,11 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
notifyListeners(localGossip)
coreSender ! SendClusterMessage(address, ClusterUserAction.Join(selfAddress))
context.become(initialized)
if (address == selfAddress)
joining(address)
else
coreSender ! SendClusterMessage(address, ClusterUserAction.Join(selfAddress))
}
/**

View file

@ -13,6 +13,7 @@ import scala.concurrent.util.duration._
object JoinSeedNodeMultiJvmSpec extends MultiNodeConfig {
val seed1 = role("seed1")
val seed2 = role("seed2")
val seed3 = role("seed3")
val ordinary1 = role("ordinary1")
val ordinary2 = role("ordinary2")
@ -25,6 +26,7 @@ class JoinSeedNodeMultiJvmNode1 extends JoinSeedNodeSpec with FailureDetectorPup
class JoinSeedNodeMultiJvmNode2 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy
class JoinSeedNodeMultiJvmNode3 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy
class JoinSeedNodeMultiJvmNode4 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy
class JoinSeedNodeMultiJvmNode5 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy
abstract class JoinSeedNodeSpec
extends MultiNodeSpec(JoinSeedNodeMultiJvmSpec)
@ -32,37 +34,22 @@ abstract class JoinSeedNodeSpec
import JoinSeedNodeMultiJvmSpec._
override def seedNodes = IndexedSeq(seed1, seed2)
override def seedNodes = IndexedSeq(seed3, seed2, seed1)
"A cluster with configured seed nodes" must {
"start the seed nodes sequentially" taggedAs LongRunningTest in {
"be able to start the seed nodes concurrently" taggedAs LongRunningTest in {
// without looking up the addresses first there might be
// [akka://JoinSeedNodeSpec/user/TestConductorClient] cannot write GetAddress(RoleName(seed2)) while waiting for seed1
roles foreach address
// roles foreach address
runOn(seed1) {
startClusterNode()
}
enterBarrier("seed1-started")
runOn(seed2) {
startClusterNode()
}
enterBarrier("seed2-started")
runOn(seed1, seed2) {
awaitUpConvergence(2)
runOn(seed1, seed2, seed3) {
awaitUpConvergence(3)
}
enterBarrier("after-1")
}
"join the seed nodes at startup" taggedAs LongRunningTest in {
startClusterNode()
enterBarrier("all-started")
awaitUpConvergence(4)
awaitUpConvergence(roles.size)
enterBarrier("after-2")
}
}

View file

@ -22,7 +22,7 @@ import akka.actor.RootActorPath
object MultiNodeClusterSpec {
def clusterConfig: Config = ConfigFactory.parseString("""
akka.cluster {
auto-join = off
auto-join = on
auto-down = off
gossip-interval = 200 ms
heartbeat-interval = 400 ms