diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 209d6c335f..fe4b5048af 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -313,6 +313,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh cluster.settings.SelfDataCenter, cluster.settings.MultiDataCenter.CrossDcConnections) + var isCurrentlyLeader = false + def latestGossip: Gossip = membershipState.latestGossip val statsEnabled = PublishStatsInterval.isFinite @@ -1020,6 +1022,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh def leaderActions(): Unit = { if (membershipState.isLeader(selfUniqueAddress)) { // only run the leader actions if we are the LEADER of the data center + if (!isCurrentlyLeader) { + logInfo("Cluster Node [{}] dc [{}] is the new leader", selfAddress, cluster.settings.SelfDataCenter) + isCurrentlyLeader = true + } val firstNotice = 20 val periodicNotice = 60 if (membershipState.convergence(exitingConfirmed)) { @@ -1041,6 +1047,9 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh s"${m.address} ${m.status} seen=${latestGossip.seenByNode(m.uniqueAddress)}" }.mkString(", ")) } + } else if (isCurrentlyLeader) { + logInfo("Cluster Node [{}] dc [{}] is no longer the leader", selfAddress, cluster.settings.SelfDataCenter) + isCurrentlyLeader = false } cleanupExitingConfirmed() shutdownSelfWhenDown() diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterLogSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterLogSpec.scala new file mode 100644 index 0000000000..26eb2a4885 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterLogSpec.scala @@ -0,0 +1,57 @@ +/** + * Copyright (C) 2009-2018 Lightbend Inc. + */ + +package akka.cluster + +import akka.actor.{ Address, ExtendedActorSystem } +import akka.cluster.InternalClusterAction.LeaderActionsTick +import akka.testkit.{ AkkaSpec, EventFilter, ImplicitSender } + +object ClusterLogSpec { + val config = """ + akka.cluster { + auto-down-unreachable-after = 0s + publish-stats-interval = 0 s # always, when it happens + failure-detector.implementation-class = akka.cluster.FailureDetectorPuppet + } + akka.actor.provider = "cluster" + akka.remote.log-remote-lifecycle-events = off + akka.remote.netty.tcp.port = 0 + akka.remote.artery.canonical.port = 0 + akka.loglevel = "INFO" + akka.loggers = ["akka.testkit.TestEventListener"] + """ + +} + +class ClusterLogSpec extends AkkaSpec(ClusterLogSpec.config) with ImplicitSender { + + val selfAddress: Address = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress + + val cluster = Cluster(system) + def clusterView: ClusterReadView = cluster.readView + + "A Cluster" must { + + "Log a message when becoming and stopping being a leader" in { + EventFilter + .info(occurrences = 1, pattern = "is the new leader") + .intercept { + cluster.join(selfAddress) + } + + awaitCond(clusterView.isSingletonCluster) + clusterView.self.address should ===(selfAddress) + clusterView.members.map(_.address) should ===(Set(selfAddress)) + awaitAssert(clusterView.status should ===(MemberStatus.Up)) + + EventFilter + .info(occurrences = 1, pattern = "is no longer the leader") + .intercept { + cluster.down(selfAddress) + } + } + + } +}