Merge pull request #23551 from akka/wip-23502-join-timeout-patriknw
Add timeout to abort joining of seed nodes, #23502
This commit is contained in:
commit
4f8856f108
7 changed files with 146 additions and 24 deletions
|
|
@ -310,6 +310,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
var seedNodes = SeedNodes
|
||||
var seedNodeProcess: Option[ActorRef] = None
|
||||
var seedNodeProcessCounter = 0 // for unique names
|
||||
var joinSeedNodesDeadline: Option[Deadline] = None
|
||||
var leaderActionCounter = 0
|
||||
|
||||
var exitingTasksInProgress = false
|
||||
|
|
@ -393,9 +394,16 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
case InitJoin ⇒
|
||||
logInfo("Received InitJoin message from [{}], but this node is not initialized yet", sender())
|
||||
sender() ! InitJoinNack(selfAddress)
|
||||
case ClusterUserAction.JoinTo(address) ⇒ join(address)
|
||||
case JoinSeedNodes(newSeedNodes) ⇒ joinSeedNodes(newSeedNodes)
|
||||
case msg: SubscriptionMessage ⇒ publisher forward msg
|
||||
case ClusterUserAction.JoinTo(address) ⇒
|
||||
join(address)
|
||||
case JoinSeedNodes(newSeedNodes) ⇒
|
||||
resetJoinSeedNodesDeadline()
|
||||
joinSeedNodes(newSeedNodes)
|
||||
case msg: SubscriptionMessage ⇒
|
||||
publisher forward msg
|
||||
case _: Tick ⇒
|
||||
if (joinSeedNodesDeadline.exists(_.isOverdue))
|
||||
joinSeedNodesWasUnsuccessful()
|
||||
}: Actor.Receive).orElse(receiveExitingCompleted)
|
||||
|
||||
def tryingToJoin(joinWith: Address, deadline: Option[Deadline]): Actor.Receive = ({
|
||||
|
|
@ -407,11 +415,14 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
becomeUninitialized()
|
||||
join(address)
|
||||
case JoinSeedNodes(newSeedNodes) ⇒
|
||||
resetJoinSeedNodesDeadline()
|
||||
becomeUninitialized()
|
||||
joinSeedNodes(newSeedNodes)
|
||||
case msg: SubscriptionMessage ⇒ publisher forward msg
|
||||
case _: Tick ⇒
|
||||
if (deadline.exists(_.isOverdue)) {
|
||||
if (joinSeedNodesDeadline.exists(_.isOverdue))
|
||||
joinSeedNodesWasUnsuccessful()
|
||||
else if (deadline.exists(_.isOverdue)) {
|
||||
// join attempt failed, retry
|
||||
becomeUninitialized()
|
||||
if (seedNodes.nonEmpty) joinSeedNodes(seedNodes)
|
||||
|
|
@ -419,6 +430,22 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
}
|
||||
}: Actor.Receive).orElse(receiveExitingCompleted)
|
||||
|
||||
/**
 * (Re)arms the overall deadline for joining the seed nodes.
 *
 * When `ShutdownAfterUnsuccessfulJoinSeedNodes` is a `FiniteDuration` the
 * deadline is set to that duration from now; for any other value the
 * feature is off and the deadline is cleared.
 */
private def resetJoinSeedNodesDeadline(): Unit = {
  // collect yields None unless the configured value is a FiniteDuration
  joinSeedNodesDeadline = Some(ShutdownAfterUnsuccessfulJoinSeedNodes).collect {
    case timeout: FiniteDuration ⇒ Deadline.now + timeout
  }
}
|
||||
|
||||
/**
 * Invoked when the join-seed-nodes deadline is overdue: logs a warning
 * naming the seed nodes and the configured timeout, clears the deadline
 * and runs `CoordinatedShutdown` for this actor system.
 */
private def joinSeedNodesWasUnsuccessful(): Unit = {
  val attemptedSeedNodes = seedNodes.mkString(", ")
  log.warning(
    "Joining of seed-nodes [{}] was unsuccessful after configured " +
      "shutdown-after-unsuccessful-join-seed-nodes [{}]. Running CoordinatedShutdown.",
    attemptedSeedNodes, ShutdownAfterUnsuccessfulJoinSeedNodes)
  // with the deadline cleared, the overdue check on later Tick messages is false
  joinSeedNodesDeadline = None
  CoordinatedShutdown(context.system).run()
}
|
||||
|
||||
def becomeUninitialized(): Unit = {
|
||||
// make sure that join process is stopped
|
||||
stopSeedNodeProcess()
|
||||
|
|
@ -436,6 +463,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
|
||||
// make sure that join process is stopped
|
||||
stopSeedNodeProcess()
|
||||
joinSeedNodesDeadline = None
|
||||
context.become(initialized)
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue