Merge pull request #1738 from akka/wip-3612-join-self-patriknw

+clu #3612 Allow join to uninitialized node
This commit is contained in:
Patrik Nordwall 2013-09-29 22:41:15 -07:00
commit d3f295e5fe
2 changed files with 45 additions and 16 deletions

View file

@ -292,6 +292,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
case InitJoin sender ! InitJoinNack(selfAddress)
case ClusterUserAction.JoinTo(address) join(address)
case JoinSeedNodes(seedNodes) joinSeedNodes(seedNodes)
case Join(node, roles) joiningUninitialized(node, roles)
case msg: SubscriptionMessage publisher forward msg
}
@ -299,25 +300,34 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
case Welcome(from, gossip) welcome(joinWith, from, gossip)
case InitJoin sender ! InitJoinNack(selfAddress)
case ClusterUserAction.JoinTo(address)
context.become(uninitialized)
becomeUninitialized()
join(address)
case JoinSeedNodes(seedNodes)
context.become(uninitialized)
becomeUninitialized()
joinSeedNodes(seedNodes)
case Join(node, roles) joiningUninitialized(node, roles)
case msg: SubscriptionMessage publisher forward msg
case _: Tick
if (deadline.exists(_.isOverdue)) {
context.become(uninitialized)
becomeUninitialized()
if (SeedNodes.nonEmpty) joinSeedNodes(SeedNodes)
else join(joinWith)
}
}
def becomeUninitialized(): Unit = {
// make sure that join process is stopped
stopSeedNodeProcess()
context.become(uninitialized)
}
def becomeInitialized(): Unit = {
// start heartbeatSender here, and not in constructor to make sure that
// heartbeating doesn't start before Welcome is received
context.actorOf(Props[ClusterHeartbeatSender].
withDispatcher(UseDispatcher), name = "heartbeatSender")
// make sure that join process is stopped
stopSeedNodeProcess()
context.become(initialized)
}
@ -358,8 +368,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
def initJoin(): Unit = sender ! InitJoinAck(selfAddress)
def joinSeedNodes(seedNodes: immutable.IndexedSeq[Address]): Unit = {
require(seedNodeProcess.isEmpty, "Join seed nodes is already in progress")
if (seedNodes.nonEmpty) {
stopSeedNodeProcess()
seedNodeProcess =
if (seedNodes == immutable.IndexedSeq(selfAddress)) {
self ! ClusterUserAction.JoinTo(selfAddress)
@ -391,17 +401,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
require(latestGossip.members.isEmpty, "Join can only be done from empty state")
// to support manual join when joining to seed nodes is stuck (no seed nodes available)
val snd = sender
seedNodeProcess match {
case Some(`snd`)
// seedNodeProcess completed, it will stop itself
seedNodeProcess = None
case Some(s)
// manual join, abort current seedNodeProcess
context stop s
seedNodeProcess = None
case None // no seedNodeProcess in progress
}
stopSeedNodeProcess()
if (address == selfAddress) {
becomeInitialized()
@ -417,6 +417,16 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
}
}
def stopSeedNodeProcess(): Unit = {
seedNodeProcess match {
case Some(s)
// manual join, abort current seedNodeProcess
context stop s
seedNodeProcess = None
case None // no seedNodeProcess in progress
}
}
/**
* State transition to JOINING - new node joining.
* Received `Join` message and replies with `Welcome` message, containing
@ -463,6 +473,16 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
}
}
/**
* Another node is joining when this node is uninitialized.
*/
def joiningUninitialized(node: UniqueAddress, roles: Set[String]): Unit = {
require(latestGossip.members.isEmpty, "Joining an uninitialized node can only be done from empty state")
joining(node, roles)
if (latestGossip.hasMember(selfUniqueAddress))
becomeInitialized()
}
/**
* Reply from Join request.
*/