=clu #17362 Make cluster.joinSeedNodes equivalent to conf seed-nodes
* the difference was in the retry of failed join attempt * also clarify the documentation
This commit is contained in:
parent
c68ebc6d5a
commit
aaa620c35e
3 changed files with 33 additions and 24 deletions
|
|
@ -239,6 +239,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
val statsEnabled = PublishStatsInterval.isFinite
|
||||
var gossipStats = GossipStats()
|
||||
|
||||
var seedNodes = SeedNodes
|
||||
var seedNodeProcess: Option[ActorRef] = None
|
||||
var seedNodeProcessCounter = 0 // for unique names
|
||||
var leaderActionCounter = 0
|
||||
|
|
@ -279,10 +280,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
case _ ⇒ // auto-down is disabled
|
||||
}
|
||||
|
||||
if (SeedNodes.isEmpty)
|
||||
if (seedNodes.isEmpty)
|
||||
logInfo("No seed-nodes configured, manual cluster join required")
|
||||
else
|
||||
self ! JoinSeedNodes(SeedNodes)
|
||||
self ! JoinSeedNodes(seedNodes)
|
||||
}
|
||||
|
||||
override def postStop(): Unit = {
|
||||
|
|
@ -296,7 +297,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
def uninitialized: Actor.Receive = {
|
||||
case InitJoin ⇒ sender() ! InitJoinNack(selfAddress)
|
||||
case ClusterUserAction.JoinTo(address) ⇒ join(address)
|
||||
case JoinSeedNodes(seedNodes) ⇒ joinSeedNodes(seedNodes)
|
||||
case JoinSeedNodes(newSeedNodes) ⇒ joinSeedNodes(newSeedNodes)
|
||||
case msg: SubscriptionMessage ⇒ publisher forward msg
|
||||
}
|
||||
|
||||
|
|
@ -306,14 +307,15 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
case ClusterUserAction.JoinTo(address) ⇒
|
||||
becomeUninitialized()
|
||||
join(address)
|
||||
case JoinSeedNodes(seedNodes) ⇒
|
||||
case JoinSeedNodes(newSeedNodes) ⇒
|
||||
becomeUninitialized()
|
||||
joinSeedNodes(seedNodes)
|
||||
joinSeedNodes(newSeedNodes)
|
||||
case msg: SubscriptionMessage ⇒ publisher forward msg
|
||||
case _: Tick ⇒
|
||||
if (deadline.exists(_.isOverdue)) {
|
||||
// join attempt failed, retry
|
||||
becomeUninitialized()
|
||||
if (SeedNodes.nonEmpty) joinSeedNodes(SeedNodes)
|
||||
if (seedNodes.nonEmpty) joinSeedNodes(seedNodes)
|
||||
else join(joinWith)
|
||||
}
|
||||
}
|
||||
|
|
@ -371,21 +373,22 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
|
||||
def initJoin(): Unit = sender() ! InitJoinAck(selfAddress)
|
||||
|
||||
def joinSeedNodes(seedNodes: immutable.IndexedSeq[Address]): Unit = {
|
||||
if (seedNodes.nonEmpty) {
|
||||
def joinSeedNodes(newSeedNodes: immutable.IndexedSeq[Address]): Unit = {
|
||||
if (newSeedNodes.nonEmpty) {
|
||||
stopSeedNodeProcess()
|
||||
seedNodes = newSeedNodes // keep them for retry
|
||||
seedNodeProcess =
|
||||
if (seedNodes == immutable.IndexedSeq(selfAddress)) {
|
||||
if (newSeedNodes == immutable.IndexedSeq(selfAddress)) {
|
||||
self ! ClusterUserAction.JoinTo(selfAddress)
|
||||
None
|
||||
} else {
|
||||
// use unique name of this actor, stopSeedNodeProcess doesn't wait for termination
|
||||
seedNodeProcessCounter += 1
|
||||
if (seedNodes.head == selfAddress) {
|
||||
Some(context.actorOf(Props(classOf[FirstSeedNodeProcess], seedNodes).
|
||||
if (newSeedNodes.head == selfAddress) {
|
||||
Some(context.actorOf(Props(classOf[FirstSeedNodeProcess], newSeedNodes).
|
||||
withDispatcher(UseDispatcher), name = "firstSeedNodeProcess-" + seedNodeProcessCounter))
|
||||
} else {
|
||||
Some(context.actorOf(Props(classOf[JoinSeedNodeProcess], seedNodes).
|
||||
Some(context.actorOf(Props(classOf[JoinSeedNodeProcess], newSeedNodes).
|
||||
withDispatcher(UseDispatcher), name = "joinSeedNodeProcess-" + seedNodeProcessCounter))
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue