Add timeout to abort joining of seed nodes, #23502

This commit is contained in:
Patrik Nordwall 2017-08-21 10:49:56 +02:00
parent d07df2b917
commit 5cf698a2f6
7 changed files with 146 additions and 24 deletions

View file

@ -293,6 +293,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
var seedNodes = SeedNodes
var seedNodeProcess: Option[ActorRef] = None
var seedNodeProcessCounter = 0 // for unique names
var joinSeedNodesDeadline: Option[Deadline] = None
var leaderActionCounter = 0
var exitingTasksInProgress = false
@ -376,9 +377,16 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
case InitJoin
logInfo("Received InitJoin message from [{}], but this node is not initialized yet", sender())
sender() ! InitJoinNack(selfAddress)
case ClusterUserAction.JoinTo(address) join(address)
case JoinSeedNodes(newSeedNodes) joinSeedNodes(newSeedNodes)
case msg: SubscriptionMessage publisher forward msg
case ClusterUserAction.JoinTo(address)
join(address)
case JoinSeedNodes(newSeedNodes)
resetJoinSeedNodesDeadline()
joinSeedNodes(newSeedNodes)
case msg: SubscriptionMessage
publisher forward msg
case _: Tick
if (joinSeedNodesDeadline.exists(_.isOverdue))
joinSeedNodesWasUnsuccessful()
}: Actor.Receive).orElse(receiveExitingCompleted)
def tryingToJoin(joinWith: Address, deadline: Option[Deadline]): Actor.Receive = ({
@ -390,11 +398,14 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
becomeUninitialized()
join(address)
case JoinSeedNodes(newSeedNodes)
resetJoinSeedNodesDeadline()
becomeUninitialized()
joinSeedNodes(newSeedNodes)
case msg: SubscriptionMessage publisher forward msg
case _: Tick
if (deadline.exists(_.isOverdue)) {
if (joinSeedNodesDeadline.exists(_.isOverdue))
joinSeedNodesWasUnsuccessful()
else if (deadline.exists(_.isOverdue)) {
// join attempt failed, retry
becomeUninitialized()
if (seedNodes.nonEmpty) joinSeedNodes(seedNodes)
@ -402,6 +413,22 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
}
}: Actor.Receive).orElse(receiveExitingCompleted)
private def resetJoinSeedNodesDeadline(): Unit = {
joinSeedNodesDeadline = ShutdownAfterUnsuccessfulJoinSeedNodes match {
case d: FiniteDuration Some(Deadline.now + d)
case _ None // off
}
}
private def joinSeedNodesWasUnsuccessful(): Unit = {
log.warning(
"Joining of seed-nodes [{}] was unsuccessful after configured " +
"shutdown-after-unsuccessful-join-seed-nodes [{}]. Running CoordinatedShutdown.",
seedNodes.mkString(", "), ShutdownAfterUnsuccessfulJoinSeedNodes)
joinSeedNodesDeadline = None
CoordinatedShutdown(context.system).run()
}
def becomeUninitialized(): Unit = {
// make sure that join process is stopped
stopSeedNodeProcess()
@ -415,6 +442,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
withDispatcher(UseDispatcher), name = "heartbeatSender")
// make sure that join process is stopped
stopSeedNodeProcess()
joinSeedNodesDeadline = None
context.become(initialized)
}