=cls add logging info on seed node joining (#22724)
* =cls add logging info on seed node joining * adjust message
This commit is contained in:
parent
bfdde013e1
commit
ef76af7add
1 changed files with 25 additions and 12 deletions
|
|
@ -373,7 +373,9 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
}
|
||||
|
||||
def uninitialized: Actor.Receive = ({
|
||||
case InitJoin ⇒ sender() ! InitJoinNack(selfAddress)
|
||||
case InitJoin ⇒
|
||||
logInfo("Received InitJoin message from [{}], but this node is not initialized yet", sender())
|
||||
sender() ! InitJoinNack(selfAddress)
|
||||
case ClusterUserAction.JoinTo(address) ⇒ join(address)
|
||||
case JoinSeedNodes(newSeedNodes) ⇒ joinSeedNodes(newSeedNodes)
|
||||
case msg: SubscriptionMessage ⇒ publisher forward msg
|
||||
|
|
@ -381,7 +383,9 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
|
||||
def tryingToJoin(joinWith: Address, deadline: Option[Deadline]): Actor.Receive = ({
|
||||
case Welcome(from, gossip) ⇒ welcome(joinWith, from, gossip)
|
||||
case InitJoin ⇒ sender() ! InitJoinNack(selfAddress)
|
||||
case InitJoin ⇒
|
||||
logInfo("Received InitJoin message from [{}], but this node is not a member yet", sender())
|
||||
sender() ! InitJoinNack(selfAddress)
|
||||
case ClusterUserAction.JoinTo(address) ⇒
|
||||
becomeUninitialized()
|
||||
join(address)
|
||||
|
|
@ -415,14 +419,16 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
}
|
||||
|
||||
def initialized: Actor.Receive = ({
|
||||
case msg: GossipEnvelope ⇒ receiveGossip(msg)
|
||||
case msg: GossipStatus ⇒ receiveGossipStatus(msg)
|
||||
case GossipTick ⇒ gossipTick()
|
||||
case GossipSpeedupTick ⇒ gossipSpeedupTick()
|
||||
case ReapUnreachableTick ⇒ reapUnreachableMembers()
|
||||
case LeaderActionsTick ⇒ leaderActions()
|
||||
case PublishStatsTick ⇒ publishInternalStats()
|
||||
case InitJoin ⇒ initJoin()
|
||||
case msg: GossipEnvelope ⇒ receiveGossip(msg)
|
||||
case msg: GossipStatus ⇒ receiveGossipStatus(msg)
|
||||
case GossipTick ⇒ gossipTick()
|
||||
case GossipSpeedupTick ⇒ gossipSpeedupTick()
|
||||
case ReapUnreachableTick ⇒ reapUnreachableMembers()
|
||||
case LeaderActionsTick ⇒ leaderActions()
|
||||
case PublishStatsTick ⇒ publishInternalStats()
|
||||
case InitJoin ⇒
|
||||
logInfo("Received InitJoin message from [{}] to [{}]", sender(), selfAddress)
|
||||
initJoin()
|
||||
case Join(node, roles) ⇒ joining(node, roles)
|
||||
case ClusterUserAction.Down(address) ⇒ downing(address)
|
||||
case ClusterUserAction.Leave(address) ⇒ leaving(address)
|
||||
|
|
@ -456,11 +462,14 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
|
||||
def initJoin(): Unit = {
|
||||
val selfStatus = latestGossip.member(selfUniqueAddress).status
|
||||
if (Gossip.removeUnreachableWithMemberStatus.contains(selfStatus))
|
||||
if (Gossip.removeUnreachableWithMemberStatus.contains(selfStatus)) {
|
||||
// prevents a Down and Exiting node from being used for joining
|
||||
logInfo("Sending InitJoinNack message from node [{}] to [{}]", selfAddress, sender())
|
||||
sender() ! InitJoinNack(selfAddress)
|
||||
else
|
||||
} else {
|
||||
logInfo("Sending InitJoinAck message from node [{}] to [{}]", selfAddress, sender())
|
||||
sender() ! InitJoinAck(selfAddress)
|
||||
}
|
||||
}
|
||||
|
||||
def joinSeedNodes(newSeedNodes: immutable.IndexedSeq[Address]): Unit = {
|
||||
|
|
@ -1312,6 +1321,8 @@ private[cluster] final class FirstSeedNodeProcess(seedNodes: immutable.IndexedSe
|
|||
import ClusterUserAction.JoinTo
|
||||
|
||||
val cluster = Cluster(context.system)
|
||||
import cluster.InfoLogger._
|
||||
|
||||
def selfAddress = cluster.selfAddress
|
||||
|
||||
if (seedNodes.size <= 1 || seedNodes.head != selfAddress)
|
||||
|
|
@ -1343,10 +1354,12 @@ private[cluster] final class FirstSeedNodeProcess(seedNodes: immutable.IndexedSe
|
|||
context.stop(self)
|
||||
}
|
||||
case InitJoinAck(address) ⇒
|
||||
logInfo("Received InitJoinAck message from [{}] to [{}]", sender(), selfAddress)
|
||||
// first InitJoinAck reply, join existing cluster
|
||||
context.parent ! JoinTo(address)
|
||||
context.stop(self)
|
||||
case InitJoinNack(address) ⇒
|
||||
logInfo("Received InitJoinNack message from [{}] to [{}]", sender(), selfAddress)
|
||||
remainingSeedNodes -= address
|
||||
if (remainingSeedNodes.isEmpty) {
|
||||
// initialize new cluster by joining myself when nacks from all other seed nodes
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue