Support restart of first seed node, see #2854
* Try to first join other seed nodes before joining itself
This commit is contained in:
parent
5e54ddaa67
commit
679c4d313d
4 changed files with 278 additions and 51 deletions
|
|
@ -14,6 +14,7 @@ import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props,
|
|||
import akka.actor.OneForOneStrategy
|
||||
import akka.actor.Status.Failure
|
||||
import akka.actor.SupervisorStrategy.Stop
|
||||
import akka.actor.Terminated
|
||||
import akka.event.EventStream
|
||||
import akka.pattern.ask
|
||||
import akka.util.Timeout
|
||||
|
|
@ -72,6 +73,8 @@ private[cluster] object InternalClusterAction {
|
|||
* The node sends `InitJoin` to all seed nodes, which replies
|
||||
* with `InitJoinAck`. The first reply is used others are discarded.
|
||||
* The node sends `Join` command to the seed node that replied first.
|
||||
* If a node is uninitialized it will reply to `InitJoin` with
|
||||
* `InitJoinNack`.
|
||||
*/
|
||||
case object JoinSeedNode extends ClusterMessage
|
||||
|
||||
|
|
@ -85,6 +88,11 @@ private[cluster] object InternalClusterAction {
|
|||
*/
|
||||
case class InitJoinAck(address: Address) extends ClusterMessage
|
||||
|
||||
/**
|
||||
* @see JoinSeedNode
|
||||
*/
|
||||
case class InitJoinNack(address: Address) extends ClusterMessage
|
||||
|
||||
/**
|
||||
* Marker interface for periodic tick messages
|
||||
*/
|
||||
|
|
@ -229,6 +237,8 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
|
|||
|
||||
var stats = ClusterStats()
|
||||
|
||||
var seedNodeProcess: Option[ActorRef] = None
|
||||
|
||||
/**
|
||||
* Looks up and returns the remote cluster command connection for the specific address.
|
||||
*/
|
||||
|
|
@ -270,7 +280,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
|
|||
}
|
||||
|
||||
def uninitialized: Actor.Receive = {
|
||||
case InitJoin ⇒ // skip, not ready yet
|
||||
case InitJoin ⇒ sender ! InitJoinNack(selfAddress)
|
||||
case JoinTo(address) ⇒ join(address)
|
||||
case JoinSeedNodes(seedNodes) ⇒ joinSeedNodes(seedNodes)
|
||||
case msg: SubscriptionMessage ⇒ publisher forward msg
|
||||
|
|
@ -306,12 +316,18 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
|
|||
def initJoin(): Unit = sender ! InitJoinAck(selfAddress)
|
||||
|
||||
def joinSeedNodes(seedNodes: immutable.IndexedSeq[Address]): Unit = {
|
||||
// only the node which is named first in the list of seed nodes will join itself
|
||||
if (seedNodes.isEmpty || seedNodes.head == selfAddress)
|
||||
self ! JoinTo(selfAddress)
|
||||
else
|
||||
context.actorOf(Props(new JoinSeedNodeProcess(seedNodes)).
|
||||
withDispatcher(UseDispatcher), name = "joinSeedNodeProcess")
|
||||
require(seedNodeProcess.isEmpty, "Join seed nodes is already in progress")
|
||||
seedNodeProcess =
|
||||
if (seedNodes.isEmpty || seedNodes == immutable.IndexedSeq(selfAddress)) {
|
||||
self ! JoinTo(selfAddress)
|
||||
None
|
||||
} else if (seedNodes.head == selfAddress) {
|
||||
Some(context.actorOf(Props(new FirstSeedNodeProcess(seedNodes)).
|
||||
withDispatcher(UseDispatcher), name = "firstSeedNodeProcess"))
|
||||
} else {
|
||||
Some(context.actorOf(Props(new JoinSeedNodeProcess(seedNodes)).
|
||||
withDispatcher(UseDispatcher), name = "joinSeedNodeProcess"))
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -320,12 +336,26 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
|
|||
*/
|
||||
def join(address: Address): Unit = {
|
||||
if (address.protocol != selfAddress.protocol)
|
||||
log.info("Member with wrong protocol tried to join, but was ignored, expected [{}] but was [{}]",
|
||||
log.warning("Trying to join member with wrong protocol, but was ignored, expected [{}] but was [{}]",
|
||||
selfAddress.protocol, address.protocol)
|
||||
else if (address.system != selfAddress.system)
|
||||
log.info("Member with wrong ActorSystem name tried to join, but was ignored, expected [{}] but was [{}]",
|
||||
log.warning("Trying to join member with wrong ActorSystem name, but was ignored, expected [{}] but was [{}]",
|
||||
selfAddress.system, address.system)
|
||||
else if (!latestGossip.members.exists(_.address == address)) {
|
||||
|
||||
// to support manual join when joining to seed nodes is stuck (no seed nodes available)
|
||||
val snd = sender
|
||||
seedNodeProcess match {
|
||||
case Some(`snd`) ⇒
|
||||
// seedNodeProcess completed, it will stop itself
|
||||
seedNodeProcess = None
|
||||
case Some(s) ⇒
|
||||
// manual join, abort current seedNodeProcess
|
||||
context stop s
|
||||
seedNodeProcess = None
|
||||
case None ⇒ // no seedNodeProcess in progress
|
||||
}
|
||||
|
||||
// wipe our state since a node that joins a cluster must be empty
|
||||
latestGossip = Gossip.empty
|
||||
// wipe the failure detector since we are starting fresh and shouldn't care about the past
|
||||
|
|
@ -347,37 +377,45 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
|
|||
* State transition to JOINING - new node joining.
|
||||
*/
|
||||
def joining(node: Address): Unit = {
|
||||
val localMembers = latestGossip.members
|
||||
val localUnreachable = latestGossip.overview.unreachable
|
||||
if (node.protocol != selfAddress.protocol)
|
||||
log.warning("Member with wrong protocol tried to join, but was ignored, expected [{}] but was [{}]",
|
||||
selfAddress.protocol, node.protocol)
|
||||
else if (node.system != selfAddress.system)
|
||||
log.warning("Member with wrong ActorSystem name tried to join, but was ignored, expected [{}] but was [{}]",
|
||||
selfAddress.system, node.system)
|
||||
else {
|
||||
val localMembers = latestGossip.members
|
||||
val localUnreachable = latestGossip.overview.unreachable
|
||||
|
||||
val alreadyMember = localMembers.exists(_.address == node)
|
||||
val isUnreachable = latestGossip.overview.isNonDownUnreachable(node)
|
||||
val alreadyMember = localMembers.exists(_.address == node)
|
||||
val isUnreachable = latestGossip.overview.isNonDownUnreachable(node)
|
||||
|
||||
if (!alreadyMember && !isUnreachable) {
|
||||
// remove the node from the 'unreachable' set in case it is a DOWN node that is rejoining cluster
|
||||
val (rejoiningMember, newUnreachableMembers) = localUnreachable partition { _.address == node }
|
||||
val newOverview = latestGossip.overview copy (unreachable = newUnreachableMembers)
|
||||
if (!alreadyMember && !isUnreachable) {
|
||||
// remove the node from the 'unreachable' set in case it is a DOWN node that is rejoining cluster
|
||||
val (rejoiningMember, newUnreachableMembers) = localUnreachable partition { _.address == node }
|
||||
val newOverview = latestGossip.overview copy (unreachable = newUnreachableMembers)
|
||||
|
||||
// remove the node from the failure detector if it is a DOWN node that is rejoining cluster
|
||||
if (rejoiningMember.nonEmpty) failureDetector.remove(node)
|
||||
// remove the node from the failure detector if it is a DOWN node that is rejoining cluster
|
||||
if (rejoiningMember.nonEmpty) failureDetector.remove(node)
|
||||
|
||||
// add joining node as Joining
|
||||
// add self in case someone else joins before self has joined (Set discards duplicates)
|
||||
val newMembers = localMembers + Member(node, Joining) + Member(selfAddress, Joining)
|
||||
val newGossip = latestGossip copy (overview = newOverview, members = newMembers)
|
||||
// add joining node as Joining
|
||||
// add self in case someone else joins before self has joined (Set discards duplicates)
|
||||
val newMembers = localMembers + Member(node, Joining) + Member(selfAddress, Joining)
|
||||
val newGossip = latestGossip copy (overview = newOverview, members = newMembers)
|
||||
|
||||
val versionedGossip = newGossip :+ vclockNode
|
||||
val seenVersionedGossip = versionedGossip seen selfAddress
|
||||
val versionedGossip = newGossip :+ vclockNode
|
||||
val seenVersionedGossip = versionedGossip seen selfAddress
|
||||
|
||||
latestGossip = seenVersionedGossip
|
||||
latestGossip = seenVersionedGossip
|
||||
|
||||
log.debug("Cluster Node [{}] - Node [{}] is JOINING", selfAddress, node)
|
||||
// treat join as initial heartbeat, so that it becomes unavailable if nothing more happens
|
||||
if (node != selfAddress) {
|
||||
gossipTo(node)
|
||||
log.debug("Cluster Node [{}] - Node [{}] is JOINING", selfAddress, node)
|
||||
// treat join as initial heartbeat, so that it becomes unavailable if nothing more happens
|
||||
if (node != selfAddress) {
|
||||
gossipTo(node)
|
||||
}
|
||||
|
||||
publish(latestGossip)
|
||||
}
|
||||
|
||||
publish(latestGossip)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -847,7 +885,64 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
|
|||
/**
|
||||
* INTERNAL API.
|
||||
*
|
||||
* Sends InitJoinAck to all seed nodes (except itself) and expect
|
||||
* Used only for the first seed node.
|
||||
* Sends InitJoin to all seed nodes (except itself).
|
||||
* If other seed nodes are not part of the cluster yet they will reply with
|
||||
* InitJoinNack or not respond at all and then the first seed node
|
||||
* will join itself to initialize the new cluster. When the first
|
||||
* seed node is restarted, and some other seed node is part of the cluster
|
||||
* it will reply with InitJoinAck and then the first seed node will join
|
||||
* that other seed node to join existing cluster.
|
||||
*/
|
||||
private[cluster] final class FirstSeedNodeProcess(seedNodes: immutable.IndexedSeq[Address]) extends Actor with ActorLogging {
|
||||
import InternalClusterAction._
|
||||
|
||||
val cluster = Cluster(context.system)
|
||||
def selfAddress = cluster.selfAddress
|
||||
|
||||
if (seedNodes.size <= 1 || seedNodes.head != selfAddress)
|
||||
throw new IllegalArgumentException("Join seed node should not be done")
|
||||
|
||||
val timeout = Deadline.now + cluster.settings.SeedNodeTimeout
|
||||
|
||||
var remainingSeedNodes = seedNodes.toSet - selfAddress
|
||||
|
||||
// retry until one ack, or all nack, or timeout
|
||||
import context.dispatcher
|
||||
val retryTask = cluster.scheduler.schedule(1.second, 1.second, self, JoinSeedNode)
|
||||
self ! JoinSeedNode
|
||||
|
||||
override def postStop(): Unit = retryTask.cancel()
|
||||
|
||||
def receive = {
|
||||
case JoinSeedNode ⇒
|
||||
if (timeout.hasTimeLeft) {
|
||||
// send InitJoin to remaining seed nodes (except myself)
|
||||
remainingSeedNodes foreach { a ⇒ context.actorFor(context.parent.path.toStringWithAddress(a)) ! InitJoin }
|
||||
} else {
|
||||
// no InitJoinAck received, initialize new cluster by joining myself
|
||||
context.parent ! JoinTo(selfAddress)
|
||||
context.stop(self)
|
||||
}
|
||||
case InitJoinAck(address) ⇒
|
||||
// first InitJoinAck reply, join existing cluster
|
||||
context.parent ! JoinTo(address)
|
||||
context.stop(self)
|
||||
case InitJoinNack(address) ⇒
|
||||
remainingSeedNodes -= address
|
||||
if (remainingSeedNodes.isEmpty) {
|
||||
// initialize new cluster by joining myself when nacks from all other seed nodes
|
||||
context.parent ! JoinTo(selfAddress)
|
||||
context.stop(self)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API.
|
||||
*
|
||||
* Sends InitJoin to all seed nodes (except itself) and expect
|
||||
* InitJoinAck reply back. The seed node that replied first
|
||||
* will be used, joined to. InitJoinAck replies received after the
|
||||
* first one are ignored.
|
||||
|
|
@ -890,6 +985,7 @@ private[cluster] final class JoinSeedNodeProcess(seedNodes: immutable.IndexedSeq
|
|||
// first InitJoinAck reply
|
||||
context.parent ! JoinTo(address)
|
||||
context.become(done)
|
||||
case InitJoinNack(_) ⇒ // that seed was uninitialized
|
||||
case ReceiveTimeout ⇒
|
||||
// no InitJoinAck received, try again
|
||||
self ! JoinSeedNode
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue