=clu Accept Welcome message from previous joinSeedNodeProcess #25295 (#25297)

* =clu Accept Welcome message from previous joinSeedNodeProcess #25295
This commit is contained in:
Konrad `ktoso` Malawski 2018-07-03 23:22:20 +09:00 committed by Christopher Batey
parent bc6cb3b1da
commit 29f30a4a78

View file

@ -414,13 +414,16 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
joinSeedNodes(newSeedNodes) joinSeedNodes(newSeedNodes)
case msg: SubscriptionMessage case msg: SubscriptionMessage
publisher forward msg publisher forward msg
case Welcome(from, gossip)
welcome(from.address, from, gossip)
case _: Tick case _: Tick
if (joinSeedNodesDeadline.exists(_.isOverdue)) if (joinSeedNodesDeadline.exists(_.isOverdue))
joinSeedNodesWasUnsuccessful() joinSeedNodesWasUnsuccessful()
}: Actor.Receive).orElse(receiveExitingCompleted) }: Actor.Receive).orElse(receiveExitingCompleted)
def tryingToJoin(joinWith: Address, deadline: Option[Deadline]): Actor.Receive = ({ def tryingToJoin(joinWith: Address, deadline: Option[Deadline]): Actor.Receive = ({
case Welcome(from, gossip) welcome(joinWith, from, gossip) case Welcome(from, gossip)
welcome(joinWith, from, gossip)
case InitJoin case InitJoin
logInfo("Received InitJoin message from [{}], but this node is not a member yet", sender()) logInfo("Received InitJoin message from [{}], but this node is not a member yet", sender())
sender() ! InitJoinNack(selfAddress) sender() ! InitJoinNack(selfAddress)
@ -499,10 +502,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
case QuarantinedEvent(address, uid) quarantined(UniqueAddress(address, uid)) case QuarantinedEvent(address, uid) quarantined(UniqueAddress(address, uid))
case ClusterUserAction.JoinTo(address) case ClusterUserAction.JoinTo(address)
logInfo("Trying to join [{}] when already part of a cluster, ignoring", address) logInfo("Trying to join [{}] when already part of a cluster, ignoring", address)
case JoinSeedNodes(seedNodes) case JoinSeedNodes(nodes)
logInfo( logInfo(
"Trying to join seed nodes [{}] when already part of a cluster, ignoring", "Trying to join seed nodes [{}] when already part of a cluster, ignoring",
seedNodes.mkString(", ")) nodes.mkString(", "))
case ExitingConfirmed(address) receiveExitingConfirmed(address) case ExitingConfirmed(address) receiveExitingConfirmed(address)
}: Actor.Receive).orElse(receiveExitingCompleted) }: Actor.Receive).orElse(receiveExitingCompleted)
@ -560,6 +563,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
def joinSeedNodes(newSeedNodes: immutable.IndexedSeq[Address]): Unit = { def joinSeedNodes(newSeedNodes: immutable.IndexedSeq[Address]): Unit = {
if (newSeedNodes.nonEmpty) { if (newSeedNodes.nonEmpty) {
stopSeedNodeProcess() stopSeedNodeProcess()
seedNodes = newSeedNodes // keep them for retry seedNodes = newSeedNodes // keep them for retry
seedNodeProcess = seedNodeProcess =
if (newSeedNodes == immutable.IndexedSeq(selfAddress)) { if (newSeedNodes == immutable.IndexedSeq(selfAddress)) {
@ -692,7 +696,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
} }
/** /**
* Reply from Join request. * Accept reply from Join request.
*/ */
def welcome(joinWith: Address, from: UniqueAddress, gossip: Gossip): Unit = { def welcome(joinWith: Address, from: UniqueAddress, gossip: Gossip): Unit = {
require(latestGossip.members.isEmpty, "Join can only be done from empty state") require(latestGossip.members.isEmpty, "Join can only be done from empty state")