Added log messages when leadership is gained or lost (#25053)
This commit is contained in:
parent
d03f21a35a
commit
fceca07ec0
2 changed files with 66 additions and 0 deletions
|
|
@ -313,6 +313,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
|
|||
cluster.settings.SelfDataCenter,
|
||||
cluster.settings.MultiDataCenter.CrossDcConnections)
|
||||
|
||||
var isCurrentlyLeader = false
|
||||
|
||||
def latestGossip: Gossip = membershipState.latestGossip
|
||||
|
||||
val statsEnabled = PublishStatsInterval.isFinite
|
||||
|
|
@ -1020,6 +1022,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
|
|||
def leaderActions(): Unit = {
|
||||
if (membershipState.isLeader(selfUniqueAddress)) {
|
||||
// only run the leader actions if we are the LEADER of the data center
|
||||
if (!isCurrentlyLeader) {
|
||||
logInfo("Cluster Node [{}] dc [{}] is the new leader", selfAddress, cluster.settings.SelfDataCenter)
|
||||
isCurrentlyLeader = true
|
||||
}
|
||||
val firstNotice = 20
|
||||
val periodicNotice = 60
|
||||
if (membershipState.convergence(exitingConfirmed)) {
|
||||
|
|
@ -1041,6 +1047,9 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
|
|||
s"${m.address} ${m.status} seen=${latestGossip.seenByNode(m.uniqueAddress)}"
|
||||
}.mkString(", "))
|
||||
}
|
||||
} else if (isCurrentlyLeader) {
|
||||
logInfo("Cluster Node [{}] dc [{}] is no longer the leader", selfAddress, cluster.settings.SelfDataCenter)
|
||||
isCurrentlyLeader = false
|
||||
}
|
||||
cleanupExitingConfirmed()
|
||||
shutdownSelfWhenDown()
|
||||
|
|
|
|||
|
|
@ -0,0 +1,57 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.cluster
|
||||
|
||||
import akka.actor.{ Address, ExtendedActorSystem }
|
||||
import akka.cluster.InternalClusterAction.LeaderActionsTick
|
||||
import akka.testkit.{ AkkaSpec, EventFilter, ImplicitSender }
|
||||
|
||||
object ClusterLogSpec {
|
||||
val config = """
|
||||
akka.cluster {
|
||||
auto-down-unreachable-after = 0s
|
||||
publish-stats-interval = 0 s # always, when it happens
|
||||
failure-detector.implementation-class = akka.cluster.FailureDetectorPuppet
|
||||
}
|
||||
akka.actor.provider = "cluster"
|
||||
akka.remote.log-remote-lifecycle-events = off
|
||||
akka.remote.netty.tcp.port = 0
|
||||
akka.remote.artery.canonical.port = 0
|
||||
akka.loglevel = "INFO"
|
||||
akka.loggers = ["akka.testkit.TestEventListener"]
|
||||
"""
|
||||
|
||||
}
|
||||
|
||||
class ClusterLogSpec extends AkkaSpec(ClusterLogSpec.config) with ImplicitSender {
|
||||
|
||||
val selfAddress: Address = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
|
||||
|
||||
val cluster = Cluster(system)
|
||||
def clusterView: ClusterReadView = cluster.readView
|
||||
|
||||
"A Cluster" must {
|
||||
|
||||
"Log a message when becoming and stopping being a leader" in {
|
||||
EventFilter
|
||||
.info(occurrences = 1, pattern = "is the new leader")
|
||||
.intercept {
|
||||
cluster.join(selfAddress)
|
||||
}
|
||||
|
||||
awaitCond(clusterView.isSingletonCluster)
|
||||
clusterView.self.address should ===(selfAddress)
|
||||
clusterView.members.map(_.address) should ===(Set(selfAddress))
|
||||
awaitAssert(clusterView.status should ===(MemberStatus.Up))
|
||||
|
||||
EventFilter
|
||||
.info(occurrences = 1, pattern = "is no longer the leader")
|
||||
.intercept {
|
||||
cluster.down(selfAddress)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue