+clu #3612 Allow join to uninitialized node

* join to self not needed when performing manual joining
This commit is contained in:
Patrik Nordwall 2013-09-17 14:20:29 +02:00
parent 9abf5bc4bd
commit cb42bf0785
2 changed files with 45 additions and 16 deletions

View file

@ -285,6 +285,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
case InitJoin sender ! InitJoinNack(selfAddress) case InitJoin sender ! InitJoinNack(selfAddress)
case ClusterUserAction.JoinTo(address) join(address) case ClusterUserAction.JoinTo(address) join(address)
case JoinSeedNodes(seedNodes) joinSeedNodes(seedNodes) case JoinSeedNodes(seedNodes) joinSeedNodes(seedNodes)
case Join(node, roles) joiningUninitialized(node, roles)
case msg: SubscriptionMessage publisher forward msg case msg: SubscriptionMessage publisher forward msg
} }
@ -292,25 +293,34 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
case Welcome(from, gossip) welcome(joinWith, from, gossip) case Welcome(from, gossip) welcome(joinWith, from, gossip)
case InitJoin sender ! InitJoinNack(selfAddress) case InitJoin sender ! InitJoinNack(selfAddress)
case ClusterUserAction.JoinTo(address) case ClusterUserAction.JoinTo(address)
context.become(uninitialized) becomeUninitialized()
join(address) join(address)
case JoinSeedNodes(seedNodes) case JoinSeedNodes(seedNodes)
context.become(uninitialized) becomeUninitialized()
joinSeedNodes(seedNodes) joinSeedNodes(seedNodes)
case Join(node, roles) joiningUninitialized(node, roles)
case msg: SubscriptionMessage publisher forward msg case msg: SubscriptionMessage publisher forward msg
case _: Tick case _: Tick
if (deadline.exists(_.isOverdue)) { if (deadline.exists(_.isOverdue)) {
context.become(uninitialized) becomeUninitialized()
if (SeedNodes.nonEmpty) joinSeedNodes(SeedNodes) if (SeedNodes.nonEmpty) joinSeedNodes(SeedNodes)
else join(joinWith) else join(joinWith)
} }
} }
def becomeUninitialized(): Unit = {
// make sure that join process is stopped
stopSeedNodeProcess()
context.become(uninitialized)
}
def becomeInitialized(): Unit = { def becomeInitialized(): Unit = {
// start heartbeatSender here, and not in constructor to make sure that // start heartbeatSender here, and not in constructor to make sure that
// heartbeating doesn't start before Welcome is received // heartbeating doesn't start before Welcome is received
context.actorOf(Props[ClusterHeartbeatSender]. context.actorOf(Props[ClusterHeartbeatSender].
withDispatcher(UseDispatcher), name = "heartbeatSender") withDispatcher(UseDispatcher), name = "heartbeatSender")
// make sure that join process is stopped
stopSeedNodeProcess()
context.become(initialized) context.become(initialized)
} }
@ -351,8 +361,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
def initJoin(): Unit = sender ! InitJoinAck(selfAddress) def initJoin(): Unit = sender ! InitJoinAck(selfAddress)
def joinSeedNodes(seedNodes: immutable.IndexedSeq[Address]): Unit = { def joinSeedNodes(seedNodes: immutable.IndexedSeq[Address]): Unit = {
require(seedNodeProcess.isEmpty, "Join seed nodes is already in progress")
if (seedNodes.nonEmpty) { if (seedNodes.nonEmpty) {
stopSeedNodeProcess()
seedNodeProcess = seedNodeProcess =
if (seedNodes == immutable.IndexedSeq(selfAddress)) { if (seedNodes == immutable.IndexedSeq(selfAddress)) {
self ! ClusterUserAction.JoinTo(selfAddress) self ! ClusterUserAction.JoinTo(selfAddress)
@ -384,17 +394,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
require(latestGossip.members.isEmpty, "Join can only be done from empty state") require(latestGossip.members.isEmpty, "Join can only be done from empty state")
// to support manual join when joining to seed nodes is stuck (no seed nodes available) // to support manual join when joining to seed nodes is stuck (no seed nodes available)
val snd = sender stopSeedNodeProcess()
seedNodeProcess match {
case Some(`snd`)
// seedNodeProcess completed, it will stop itself
seedNodeProcess = None
case Some(s)
// manual join, abort current seedNodeProcess
context stop s
seedNodeProcess = None
case None // no seedNodeProcess in progress
}
if (address == selfAddress) { if (address == selfAddress) {
becomeInitialized() becomeInitialized()
@ -410,6 +410,16 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
} }
} }
def stopSeedNodeProcess(): Unit = {
seedNodeProcess match {
case Some(s)
// manual join, abort current seedNodeProcess
context stop s
seedNodeProcess = None
case None // no seedNodeProcess in progress
}
}
/** /**
* State transition to JOINING - new node joining. * State transition to JOINING - new node joining.
* Received `Join` message and replies with `Welcome` message, containing * Received `Join` message and replies with `Welcome` message, containing
@ -456,6 +466,16 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
} }
} }
/**
* Another node is joining when this node is uninitialized.
*/
def joiningUninitialized(node: UniqueAddress, roles: Set[String]): Unit = {
require(latestGossip.members.isEmpty, "Joining an uninitialized node can only be done from empty state")
joining(node, roles)
if (latestGossip.hasMember(selfUniqueAddress))
becomeInitialized()
}
/** /**
* Reply from Join request. * Reply from Join request.
*/ */

View file

@ -34,7 +34,16 @@ abstract class NodeUpSpec
"A cluster node that is joining another cluster" must { "A cluster node that is joining another cluster" must {
"be moved to UP by the leader after a convergence" taggedAs LongRunningTest in { "be moved to UP by the leader after a convergence" taggedAs LongRunningTest in {
awaitClusterUp(first, second) // it should be possible to join an uninitialized node
// test race on purpose
runOn(first) {
cluster.join(second)
}
runOn(second) {
cluster.join(first)
}
awaitMembersUp(2)
enterBarrier("after-1") enterBarrier("after-1")
} }