cleanup Cluster logging

* include Cluster Node prefix in same way for all logging
This commit is contained in:
Patrik Nordwall 2019-01-07 13:26:36 +01:00
parent 00b235d9c5
commit 19c9fbc355
8 changed files with 180 additions and 76 deletions

View file

@ -130,7 +130,7 @@ private[metrics] class ClusterMetricsCollector extends Actor with ActorLogging {
import context.dispatcher import context.dispatcher
val cluster = Cluster(context.system) val cluster = Cluster(context.system)
import cluster.{ selfAddress, scheduler } import cluster.{ selfAddress, scheduler }
import cluster.InfoLogger._ import cluster.ClusterLogger._
val metrics = ClusterMetricsExtension(context.system) val metrics = ClusterMetricsExtension(context.system)
import metrics.settings._ import metrics.settings._

View file

@ -2,3 +2,19 @@
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterDaemon.this") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterDaemon.this")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.ClusterDaemon.this") ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.ClusterDaemon.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterRemoteWatcher#DelayedQuarantine.this") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterRemoteWatcher#DelayedQuarantine.this")
# cleanup Cluster logging (internals)
ProblemFilters.exclude[MissingTypesProblem]("akka.cluster.JoinSeedNodeProcess")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.JoinSeedNodeProcess.log")
ProblemFilters.exclude[MissingTypesProblem]("akka.cluster.OnMemberStatusChangedListener")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.OnMemberStatusChangedListener.log")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.Cluster.InfoLogger")
ProblemFilters.exclude[MissingTypesProblem]("akka.cluster.ClusterCoreSupervisor")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterCoreSupervisor.log")
ProblemFilters.exclude[MissingTypesProblem]("akka.cluster.FirstSeedNodeProcess")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.FirstSeedNodeProcess.log")
ProblemFilters.exclude[MissingTypesProblem]("akka.cluster.ClusterDaemon")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterDaemon.log")
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.Cluster$InfoLogger$")
ProblemFilters.exclude[MissingTypesProblem]("akka.cluster.ClusterCoreDaemon")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterCoreDaemon.log")

View file

@ -56,7 +56,7 @@ private[cluster] class AutoDown(autoDownUnreachableAfter: FiniteDuration)
extends AutoDownBase(autoDownUnreachableAfter) with ActorLogging { extends AutoDownBase(autoDownUnreachableAfter) with ActorLogging {
val cluster = Cluster(context.system) val cluster = Cluster(context.system)
import cluster.InfoLogger._ import cluster.ClusterLogger._
override def selfAddress = cluster.selfAddress override def selfAddress = cluster.selfAddress

View file

@ -18,13 +18,14 @@ import akka.japi.Util
import akka.pattern._ import akka.pattern._
import akka.remote.{ UniqueAddress _, _ } import akka.remote.{ UniqueAddress _, _ }
import com.typesafe.config.{ Config, ConfigFactory } import com.typesafe.config.{ Config, ConfigFactory }
import scala.annotation.varargs import scala.annotation.varargs
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.concurrent.{ Await, ExecutionContext } import scala.concurrent.{ Await, ExecutionContext }
import scala.util.control.NonFatal import scala.util.control.NonFatal
import akka.event.Logging.LogLevel
/** /**
* Cluster Extension Id and factory for creating Cluster extension. * Cluster Extension Id and factory for creating Cluster extension.
*/ */
@ -59,7 +60,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
import ClusterEvent._ import ClusterEvent._
val settings = new ClusterSettings(system.settings.config, system.name) val settings = new ClusterSettings(system.settings.config, system.name)
import InfoLogger._ import ClusterLogger._
import settings._ import settings._
private val joinConfigCompatChecker: JoinConfigCompatChecker = JoinConfigCompatChecker.load(system, settings) private val joinConfigCompatChecker: JoinConfigCompatChecker = JoinConfigCompatChecker.load(system, settings)
@ -437,35 +438,118 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[cluster] object InfoLogger { private[cluster] object ClusterLogger {
def isDebugEnabled: Boolean =
log.isDebugEnabled
def logDebug(message: String): Unit =
logAtLevel(Logging.DebugLevel, message)
def logDebug(template: String, arg1: Any): Unit =
logAtLevel(Logging.InfoLevel, template, arg1)
def logDebug(template: String, arg1: Any, arg2: Any): Unit =
logAtLevel(Logging.InfoLevel, template, arg1, arg2)
def logDebug(template: String, arg1: Any, arg2: Any, arg3: Any): Unit =
logAtLevel(Logging.InfoLevel, template, arg1, arg2, arg3)
def logInfo(message: String): Unit = def logInfo(message: String): Unit =
if (LogInfo) logAtLevel(Logging.InfoLevel, message)
if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter)
log.info("Cluster Node [{}] - {}", selfAddress, message)
else
log.info("Cluster Node [{}] dc [{}] - {}", selfAddress, settings.SelfDataCenter, message)
def logInfo(template: String, arg1: Any): Unit = def logInfo(template: String, arg1: Any): Unit =
if (LogInfo) logAtLevel(Logging.InfoLevel, template, arg1)
if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter)
log.info("Cluster Node [{}] - " + template, selfAddress, arg1)
else
log.info("Cluster Node [{}] dc [{}] - " + template, selfAddress, settings.SelfDataCenter, arg1)
def logInfo(template: String, arg1: Any, arg2: Any): Unit = def logInfo(template: String, arg1: Any, arg2: Any): Unit =
if (LogInfo) logAtLevel(Logging.InfoLevel, template, arg1, arg2)
if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter)
log.info("Cluster Node [{}] - " + template, selfAddress, arg1, arg2)
else
log.info("Cluster Node [{}] dc [{}] - " + template, selfAddress, settings.SelfDataCenter, arg1, arg2)
def logInfo(template: String, arg1: Any, arg2: Any, arg3: Any): Unit = def logInfo(template: String, arg1: Any, arg2: Any, arg3: Any): Unit =
if (LogInfo) logAtLevel(Logging.InfoLevel, template, arg1, arg2, arg3)
def logWarning(message: String): Unit =
logAtLevel(Logging.WarningLevel, message)
def logWarning(template: String, arg1: Any): Unit =
logAtLevel(Logging.WarningLevel, template, arg1)
def logWarning(template: String, arg1: Any, arg2: Any): Unit =
logAtLevel(Logging.WarningLevel, template, arg1, arg2)
def logWarning(template: String, arg1: Any, arg2: Any, arg3: Any): Unit =
logAtLevel(Logging.WarningLevel, template, arg1, arg2, arg3)
def logError(message: String): Unit =
logAtLevel(Logging.ErrorLevel, message)
def logError(template: String, arg1: Any): Unit =
logAtLevel(Logging.ErrorLevel, template, arg1)
def logError(template: String, arg1: Any, arg2: Any): Unit =
logAtLevel(Logging.ErrorLevel, template, arg1, arg2)
def logError(template: String, arg1: Any, arg2: Any, arg3: Any): Unit =
logAtLevel(Logging.ErrorLevel, template, arg1, arg2, arg3)
def logError(cause: Throwable, message: String): Unit = {
if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter)
log.error(cause, "Cluster Node [{}] - {}", selfAddress, message)
else
log.error(cause, "Cluster Node [{}] dc [{}] - {}", selfAddress, settings.SelfDataCenter, message)
}
def logError(cause: Throwable, template: String, arg1: Any): Unit = {
if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter)
log.error(cause, "Cluster Node [{}] - " + template, selfAddress, arg1)
else
log.error(cause, "Cluster Node [{}] dc [{}] - " + template, selfAddress, settings.SelfDataCenter, arg1)
}
def logError(cause: Throwable, template: String, arg1: Any, arg2: Any): Unit = {
if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter)
log.error(cause, "Cluster Node [{}] - " + template, selfAddress, arg1, arg2)
else
log.error(cause, "Cluster Node [{}] dc [{}] - " + template, selfAddress, settings.SelfDataCenter, arg1, arg2)
}
def logError(cause: Throwable, template: String, arg1: Any, arg2: Any, arg3: Any): Unit = {
if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter)
log.error(cause, "Cluster Node [{}] - " + template, selfAddress, arg1, arg2, arg3)
else
log.error(cause, "Cluster Node [{}] dc [" + settings.SelfDataCenter + "] - " + template, selfAddress, arg1, arg2, arg3)
}
private def logAtLevel(logLevel: LogLevel, message: String): Unit = {
if (isLevelEnabled(logLevel))
if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter) if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter)
log.info("Cluster Node [{}] - " + template, selfAddress, arg1, arg2, arg3) log.log(logLevel, "Cluster Node [{}] - {}", selfAddress, message)
else else
log.info("Cluster Node [{}] dc [" + settings.SelfDataCenter + "] - " + template, selfAddress, arg1, arg2, arg3) log.log(logLevel, "Cluster Node [{}] dc [{}] - {}", selfAddress, settings.SelfDataCenter, message)
}
private def logAtLevel(logLevel: LogLevel, template: String, arg1: Any): Unit = {
if (isLevelEnabled(logLevel))
if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter)
log.log(logLevel, "Cluster Node [{}] - " + template, selfAddress, arg1)
else
log.log(logLevel, "Cluster Node [{}] dc [{}] - " + template, selfAddress, settings.SelfDataCenter, arg1)
}
private def logAtLevel(logLevel: LogLevel, template: String, arg1: Any, arg2: Any): Unit =
if (isLevelEnabled(logLevel))
if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter)
log.log(logLevel, "Cluster Node [{}] - " + template, selfAddress, arg1, arg2)
else
log.log(logLevel, "Cluster Node [{}] dc [{}] - " + template, selfAddress, settings.SelfDataCenter, arg1, arg2)
private def logAtLevel(logLevel: LogLevel, template: String, arg1: Any, arg2: Any, arg3: Any): Unit =
if (isLevelEnabled(logLevel))
if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter)
log.log(logLevel, "Cluster Node [{}] - " + template, selfAddress, arg1, arg2, arg3)
else
log.log(logLevel, "Cluster Node [{}] dc [" + settings.SelfDataCenter + "] - " + template, selfAddress, arg1, arg2, arg3)
private def isLevelEnabled(logLevel: LogLevel): Boolean =
LogInfo || logLevel < Logging.InfoLevel
} }
} }

View file

@ -179,7 +179,7 @@ private[cluster] object InternalClusterAction {
* Supervisor managing the different Cluster daemons. * Supervisor managing the different Cluster daemons.
*/ */
@InternalApi @InternalApi
private[cluster] final class ClusterDaemon(joinConfigCompatChecker: JoinConfigCompatChecker) extends Actor with ActorLogging private[cluster] final class ClusterDaemon(joinConfigCompatChecker: JoinConfigCompatChecker) extends Actor
with RequiresMessageQueue[UnboundedMessageQueueSemantics] { with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
import InternalClusterAction._ import InternalClusterAction._
// Important - don't use Cluster(context.system) in constructor because that would // Important - don't use Cluster(context.system) in constructor because that would
@ -243,7 +243,7 @@ private[cluster] final class ClusterDaemon(joinConfigCompatChecker: JoinConfigCo
* would be obsolete. Shutdown the member if any those actors crashed. * would be obsolete. Shutdown the member if any those actors crashed.
*/ */
@InternalApi @InternalApi
private[cluster] final class ClusterCoreSupervisor(joinConfigCompatChecker: JoinConfigCompatChecker) extends Actor with ActorLogging private[cluster] final class ClusterCoreSupervisor(joinConfigCompatChecker: JoinConfigCompatChecker) extends Actor
with RequiresMessageQueue[UnboundedMessageQueueSemantics] { with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
// Important - don't use Cluster(context.system) in constructor because that would // Important - don't use Cluster(context.system) in constructor because that would
@ -263,7 +263,8 @@ private[cluster] final class ClusterCoreSupervisor(joinConfigCompatChecker: Join
override val supervisorStrategy = override val supervisorStrategy =
OneForOneStrategy() { OneForOneStrategy() {
case NonFatal(e) case NonFatal(e)
log.error(e, "Cluster node [{}] crashed, [{}] - shutting down...", Cluster(context.system).selfAddress, e.getMessage) Cluster(context.system).ClusterLogger.logError(
e, "crashed, [{}] - shutting down...", e.getMessage)
self ! PoisonPill self ! PoisonPill
Stop Stop
} }
@ -292,7 +293,7 @@ private[cluster] object ClusterCoreDaemon {
* INTERNAL API. * INTERNAL API.
*/ */
@InternalApi @InternalApi
private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatChecker: JoinConfigCompatChecker) extends Actor with ActorLogging private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatChecker: JoinConfigCompatChecker) extends Actor
with RequiresMessageQueue[UnboundedMessageQueueSemantics] { with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
import InternalClusterAction._ import InternalClusterAction._
import ClusterCoreDaemon._ import ClusterCoreDaemon._
@ -301,7 +302,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
val cluster = Cluster(context.system) val cluster = Cluster(context.system)
import cluster.{ selfAddress, selfRoles, scheduler, failureDetector, crossDcFailureDetector } import cluster.{ selfAddress, selfRoles, scheduler, failureDetector, crossDcFailureDetector }
import cluster.settings._ import cluster.settings._
import cluster.InfoLogger._ import cluster.ClusterLogger._
val selfDc = cluster.selfDataCenter val selfDc = cluster.selfDataCenter
@ -461,7 +462,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
} }
private def joinSeedNodesWasUnsuccessful(): Unit = { private def joinSeedNodesWasUnsuccessful(): Unit = {
log.warning( logWarning(
"Joining of seed-nodes [{}] was unsuccessful after configured " + "Joining of seed-nodes [{}] was unsuccessful after configured " +
"shutdown-after-unsuccessful-join-seed-nodes [{}]. Running CoordinatedShutdown.", "shutdown-after-unsuccessful-join-seed-nodes [{}]. Running CoordinatedShutdown.",
seedNodes.mkString(", "), ShutdownAfterUnsuccessfulJoinSeedNodes) seedNodes.mkString(", "), ShutdownAfterUnsuccessfulJoinSeedNodes)
@ -574,11 +575,11 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
} }
case Invalid(messages) case Invalid(messages)
// messages are only logged on the cluster side // messages are only logged on the cluster side
log.warning( logWarning(
"Found incompatible settings when [{}] tried to join: {}. " + "Found incompatible settings when [{}] tried to join: {}. " +
"Self version [{}], Joining version [{}].", s"Self version [{}], Joining version [$joiningNodeVersion].",
sender().path.address, messages.mkString(", "), sender().path.address, messages.mkString(", "),
context.system.settings.ConfigVersion, joiningNodeVersion) context.system.settings.ConfigVersion)
if (configCheckUnsupportedByJoiningNode) if (configCheckUnsupportedByJoiningNode)
ConfigCheckUnsupportedByJoiningNode ConfigCheckUnsupportedByJoiningNode
else else
@ -621,11 +622,11 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
*/ */
def join(address: Address): Unit = { def join(address: Address): Unit = {
if (address.protocol != selfAddress.protocol) if (address.protocol != selfAddress.protocol)
log.warning( logWarning(
"Trying to join member with wrong protocol, but was ignored, expected [{}] but was [{}]", "Trying to join member with wrong protocol, but was ignored, expected [{}] but was [{}]",
selfAddress.protocol, address.protocol) selfAddress.protocol, address.protocol)
else if (address.system != selfAddress.system) else if (address.system != selfAddress.system)
log.warning( logWarning(
"Trying to join member with wrong ActorSystem name, but was ignored, expected [{}] but was [{}]", "Trying to join member with wrong ActorSystem name, but was ignored, expected [{}] but was [{}]",
selfAddress.system, address.system) selfAddress.system, address.system)
else { else {
@ -666,11 +667,11 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
def joining(joiningNode: UniqueAddress, roles: Set[String]): Unit = { def joining(joiningNode: UniqueAddress, roles: Set[String]): Unit = {
val selfStatus = latestGossip.member(selfUniqueAddress).status val selfStatus = latestGossip.member(selfUniqueAddress).status
if (joiningNode.address.protocol != selfAddress.protocol) if (joiningNode.address.protocol != selfAddress.protocol)
log.warning( logWarning(
"Member with wrong protocol tried to join, but was ignored, expected [{}] but was [{}]", "Member with wrong protocol tried to join, but was ignored, expected [{}] but was [{}]",
selfAddress.protocol, joiningNode.address.protocol) selfAddress.protocol, joiningNode.address.protocol)
else if (joiningNode.address.system != selfAddress.system) else if (joiningNode.address.system != selfAddress.system)
log.warning( logWarning(
"Member with wrong ActorSystem name tried to join, but was ignored, expected [{}] but was [{}]", "Member with wrong ActorSystem name tried to join, but was ignored, expected [{}] but was [{}]",
selfAddress.system, joiningNode.address.system) selfAddress.system, joiningNode.address.system)
else if (removeUnreachableWithMemberStatus.contains(selfStatus)) else if (removeUnreachableWithMemberStatus.contains(selfStatus))
@ -854,10 +855,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
val newOverview = localGossip.overview copy (reachability = newReachability) val newOverview = localGossip.overview copy (reachability = newReachability)
val newGossip = localGossip copy (overview = newOverview) val newGossip = localGossip copy (overview = newOverview)
updateLatestGossip(newGossip) updateLatestGossip(newGossip)
log.warning( logWarning(
"Cluster Node [{}] - Marking node as TERMINATED [{}], due to quarantine. Node roles [{}]. " + "Marking node as TERMINATED [{}], due to quarantine. Node roles [{}]. " +
"It must still be marked as down before it's removed.", "It must still be marked as down before it's removed.",
selfAddress, node.address, selfRoles.mkString(",")) node.address, selfRoles.mkString(","))
publishMembershipState() publishMembershipState()
} }
} }
@ -897,7 +898,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
val localGossip = latestGossip val localGossip = latestGossip
if (remoteGossip eq Gossip.empty) { if (remoteGossip eq Gossip.empty) {
log.debug("Cluster Node [{}] - Ignoring received gossip from [{}] to protect against overload", selfAddress, from) logDebug("Ignoring received gossip from [{}] to protect against overload", from)
Ignored Ignored
} else if (envelope.to != selfUniqueAddress) { } else if (envelope.to != selfUniqueAddress) {
logInfo("Ignoring received gossip intended for someone else, from [{}] to [{}]", from.address, envelope.to) logInfo("Ignoring received gossip intended for someone else, from [{}] to [{}]", from.address, envelope.to)
@ -934,14 +935,14 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
// Removal of member itself is handled in merge (pickHighestPriority) // Removal of member itself is handled in merge (pickHighestPriority)
val prunedLocalGossip = localGossip.members.foldLeft(localGossip) { (g, m) val prunedLocalGossip = localGossip.members.foldLeft(localGossip) { (g, m)
if (removeUnreachableWithMemberStatus(m.status) && !remoteGossip.members.contains(m)) { if (removeUnreachableWithMemberStatus(m.status) && !remoteGossip.members.contains(m)) {
log.debug("Cluster Node [{}] - Pruned conflicting local gossip: {}", selfAddress, m) logDebug("Pruned conflicting local gossip: {}", m)
g.prune(VectorClock.Node(Gossip.vclockName(m.uniqueAddress))) g.prune(VectorClock.Node(Gossip.vclockName(m.uniqueAddress)))
} else } else
g g
} }
val prunedRemoteGossip = remoteGossip.members.foldLeft(remoteGossip) { (g, m) val prunedRemoteGossip = remoteGossip.members.foldLeft(remoteGossip) { (g, m)
if (removeUnreachableWithMemberStatus(m.status) && !localGossip.members.contains(m)) { if (removeUnreachableWithMemberStatus(m.status) && !localGossip.members.contains(m)) {
log.debug("Cluster Node [{}] - Pruned conflicting remote gossip: {}", selfAddress, m) logDebug("Pruned conflicting remote gossip: {}", m)
g.prune(VectorClock.Node(Gossip.vclockName(m.uniqueAddress))) g.prune(VectorClock.Node(Gossip.vclockName(m.uniqueAddress)))
} else } else
g g
@ -967,10 +968,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
} }
} }
log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from) logDebug("Receiving gossip from [{}]", selfAddress, from)
if (comparison == VectorClock.Concurrent && cluster.settings.Debug.VerboseGossipLogging) { if (comparison == VectorClock.Concurrent && cluster.settings.Debug.VerboseGossipLogging) {
log.debug( logDebug(
"""Couldn't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merged them into [{}]""", """Couldn't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merged them into [{}]""",
remoteGossip, localGossip, winningGossip) remoteGossip, localGossip, winningGossip)
} }
@ -1048,7 +1049,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
gossipTo(peer) gossipTo(peer)
case None // nothing to see here case None // nothing to see here
if (cluster.settings.Debug.VerboseGossipLogging) if (cluster.settings.Debug.VerboseGossipLogging)
log.debug("Cluster Node [{}] dc [{}] will not gossip this round", selfAddress, cluster.settings.SelfDataCenter) logDebug("will not gossip this round")
} }
} }
@ -1060,7 +1061,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
if (membershipState.isLeader(selfUniqueAddress)) { if (membershipState.isLeader(selfUniqueAddress)) {
// only run the leader actions if we are the LEADER of the data center // only run the leader actions if we are the LEADER of the data center
if (!isCurrentlyLeader) { if (!isCurrentlyLeader) {
logInfo("Cluster Node [{}] dc [{}] is the new leader", selfAddress, cluster.settings.SelfDataCenter) logInfo("is the new leader among reachable nodes (more leaders may exist)")
isCurrentlyLeader = true isCurrentlyLeader = true
} }
val firstNotice = 20 val firstNotice = 20
@ -1085,7 +1086,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
}.mkString(", ")) }.mkString(", "))
} }
} else if (isCurrentlyLeader) { } else if (isCurrentlyLeader) {
logInfo("Cluster Node [{}] dc [{}] is no longer the leader", selfAddress, cluster.settings.SelfDataCenter) logInfo("is no longer leader")
isCurrentlyLeader = false isCurrentlyLeader = false
} }
cleanupExitingConfirmed() cleanupExitingConfirmed()
@ -1234,10 +1235,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
val targets = membershipState.gossipTargetsForExitingMembers(exitingMembers) val targets = membershipState.gossipTargetsForExitingMembers(exitingMembers)
if (targets.nonEmpty) { if (targets.nonEmpty) {
if (log.isDebugEnabled) if (isDebugEnabled)
log.debug( logDebug(
"Cluster Node [{}] - Gossip exiting members [{}] to the two oldest (per role) [{}] (singleton optimization).", "Gossip exiting members [{}] to the two oldest (per role) [{}] (singleton optimization).",
selfAddress, exitingMembers.mkString(", "), targets.mkString(", ")) exitingMembers.mkString(", "), targets.mkString(", "))
targets.foreach(m gossipTo(m.uniqueAddress)) targets.foreach(m gossipTo(m.uniqueAddress))
} }
@ -1316,7 +1317,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
val (exiting, nonExiting) = newlyDetectedUnreachableMembers.partition(_.status == Exiting) val (exiting, nonExiting) = newlyDetectedUnreachableMembers.partition(_.status == Exiting)
if (nonExiting.nonEmpty) if (nonExiting.nonEmpty)
log.warning("Cluster Node [{}] - Marking node(s) as UNREACHABLE [{}]. Node roles [{}]", selfAddress, nonExiting.mkString(", "), selfRoles.mkString(", ")) logWarning("Marking node(s) as UNREACHABLE [{}]. Node roles [{}]", nonExiting.mkString(", "), selfRoles.mkString(", "))
if (exiting.nonEmpty) if (exiting.nonEmpty)
logInfo( logInfo(
"Marking exiting node(s) as UNREACHABLE [{}]. This is expected and they will be removed.", "Marking exiting node(s) as UNREACHABLE [{}]. This is expected and they will be removed.",
@ -1384,7 +1385,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
def publishMembershipState(): Unit = { def publishMembershipState(): Unit = {
if (cluster.settings.Debug.VerboseGossipLogging) if (cluster.settings.Debug.VerboseGossipLogging)
log.debug("Cluster Node [{}] dc [{}] - New gossip published [{}]", selfAddress, cluster.settings.SelfDataCenter, membershipState.latestGossip) logDebug("New gossip published [{}]", membershipState.latestGossip)
publisher ! PublishChanges(membershipState) publisher ! PublishChanges(membershipState)
if (PublishStatsInterval == Duration.Zero) publishInternalStats() if (PublishStatsInterval == Duration.Zero) publishInternalStats()
@ -1417,13 +1418,13 @@ private[cluster] case object IncompatibleConfigurationDetected extends Reason
* that other seed node to join existing cluster. * that other seed node to join existing cluster.
*/ */
@InternalApi @InternalApi
private[cluster] final class FirstSeedNodeProcess(seedNodes: immutable.IndexedSeq[Address], joinConfigCompatChecker: JoinConfigCompatChecker) extends Actor with ActorLogging { private[cluster] final class FirstSeedNodeProcess(seedNodes: immutable.IndexedSeq[Address], joinConfigCompatChecker: JoinConfigCompatChecker) extends Actor {
import InternalClusterAction._ import InternalClusterAction._
import ClusterUserAction.JoinTo import ClusterUserAction.JoinTo
val cluster = Cluster(context.system) val cluster = Cluster(context.system)
import cluster.settings._ import cluster.settings._
import cluster.InfoLogger._ import cluster.ClusterLogger._
def selfAddress = cluster.selfAddress def selfAddress = cluster.selfAddress
@ -1451,8 +1452,8 @@ private[cluster] final class FirstSeedNodeProcess(seedNodes: immutable.IndexedSe
remainingSeedNodes foreach { a context.actorSelection(context.parent.path.toStringWithAddress(a)) ! InitJoin(configToValidate) } remainingSeedNodes foreach { a context.actorSelection(context.parent.path.toStringWithAddress(a)) ! InitJoin(configToValidate) }
} else { } else {
// no InitJoinAck received, initialize new cluster by joining myself // no InitJoinAck received, initialize new cluster by joining myself
if (log.isDebugEnabled) if (isDebugEnabled)
log.debug( logDebug(
"Couldn't join other seed nodes, will join myself. seed-nodes=[{}]", "Couldn't join other seed nodes, will join myself. seed-nodes=[{}]",
seedNodes.mkString(", ")) seedNodes.mkString(", "))
context.parent ! JoinTo(selfAddress) context.parent ! JoinTo(selfAddress)
@ -1469,13 +1470,13 @@ private[cluster] final class FirstSeedNodeProcess(seedNodes: immutable.IndexedSe
context.stop(self) context.stop(self)
case Invalid(messages) if ByPassConfigCompatCheck case Invalid(messages) if ByPassConfigCompatCheck
log.warning("Cluster validated this node config, but sent back incompatible settings: {}. " + logWarning("Cluster validated this node config, but sent back incompatible settings: {}. " +
"Join will be performed because compatibility check is configured to not be enforced.", messages.mkString(", ")) "Join will be performed because compatibility check is configured to not be enforced.", messages.mkString(", "))
context.parent ! JoinTo(address) context.parent ! JoinTo(address)
context.stop(self) context.stop(self)
case Invalid(messages) case Invalid(messages)
log.error("Cluster validated this node config, but sent back incompatible settings: {}. " + logError("Cluster validated this node config, but sent back incompatible settings: {}. " +
"It's recommended to perform a full cluster shutdown in order to deploy this new version. " + "It's recommended to perform a full cluster shutdown in order to deploy this new version. " +
"If a cluster shutdown isn't an option, you may want to disable this protection by setting " + "If a cluster shutdown isn't an option, you may want to disable this protection by setting " +
"'akka.cluster.configuration-compatibility-check.enforce-on-join = off'. " + "'akka.cluster.configuration-compatibility-check.enforce-on-join = off'. " +
@ -1487,7 +1488,7 @@ private[cluster] final class FirstSeedNodeProcess(seedNodes: immutable.IndexedSe
case InitJoinAck(address, UncheckedConfig) case InitJoinAck(address, UncheckedConfig)
logInfo("Received InitJoinAck message from [{}] to [{}]", sender(), selfAddress) logInfo("Received InitJoinAck message from [{}] to [{}]", sender(), selfAddress)
log.warning("Joining a cluster without configuration compatibility check feature.") logWarning("Joining a cluster without configuration compatibility check feature.")
context.parent ! JoinTo(address) context.parent ! JoinTo(address)
context.stop(self) context.stop(self)
@ -1496,12 +1497,12 @@ private[cluster] final class FirstSeedNodeProcess(seedNodes: immutable.IndexedSe
if (ByPassConfigCompatCheck) { if (ByPassConfigCompatCheck) {
// only join if set to ignore config validation // only join if set to ignore config validation
logInfo("Received InitJoinAck message from [{}] to [{}]", sender(), selfAddress) logInfo("Received InitJoinAck message from [{}] to [{}]", sender(), selfAddress)
log.warning("Joining cluster with incompatible configurations. " + logWarning("Joining cluster with incompatible configurations. " +
"Join will be performed because compatibility check is configured to not be enforced.") "Join will be performed because compatibility check is configured to not be enforced.")
context.parent ! JoinTo(address) context.parent ! JoinTo(address)
context.stop(self) context.stop(self)
} else { } else {
log.error( logError(
"Couldn't join seed nodes because of incompatible cluster configuration. " + "Couldn't join seed nodes because of incompatible cluster configuration. " +
"It's recommended to perform a full cluster shutdown in order to deploy this new version." + "It's recommended to perform a full cluster shutdown in order to deploy this new version." +
"If a cluster shutdown isn't an option, you may want to disable this protection by setting " + "If a cluster shutdown isn't an option, you may want to disable this protection by setting " +
@ -1549,12 +1550,13 @@ private[cluster] final class FirstSeedNodeProcess(seedNodes: immutable.IndexedSe
* *
*/ */
@InternalApi @InternalApi
private[cluster] final class JoinSeedNodeProcess(seedNodes: immutable.IndexedSeq[Address], joinConfigCompatChecker: JoinConfigCompatChecker) extends Actor with ActorLogging { private[cluster] final class JoinSeedNodeProcess(seedNodes: immutable.IndexedSeq[Address], joinConfigCompatChecker: JoinConfigCompatChecker) extends Actor {
import InternalClusterAction._ import InternalClusterAction._
import ClusterUserAction.JoinTo import ClusterUserAction.JoinTo
val cluster = Cluster(context.system) val cluster = Cluster(context.system)
import cluster.settings._ import cluster.settings._
import cluster.ClusterLogger._
def selfAddress = cluster.selfAddress def selfAddress = cluster.selfAddress
if (seedNodes.isEmpty || seedNodes.head == selfAddress) if (seedNodes.isEmpty || seedNodes.head == selfAddress)
@ -1579,7 +1581,7 @@ private[cluster] final class JoinSeedNodeProcess(seedNodes: immutable.IndexedSeq
otherSeedNodes.foreach { a context.actorSelection(context.parent.path.toStringWithAddress(a)) ! InitJoin(configToValidate) } otherSeedNodes.foreach { a context.actorSelection(context.parent.path.toStringWithAddress(a)) ! InitJoin(configToValidate) }
case InitJoinAck(address, CompatibleConfig(clusterConfig)) case InitJoinAck(address, CompatibleConfig(clusterConfig))
log.info("Received InitJoinAck message from [{}] to [{}]", sender(), selfAddress) logInfo("Received InitJoinAck message from [{}] to [{}]", sender(), selfAddress)
// validates config coming from cluster against this node config // validates config coming from cluster against this node config
joinConfigCompatChecker.check(clusterConfig, context.system.settings.config) match { joinConfigCompatChecker.check(clusterConfig, context.system.settings.config) match {
case Valid case Valid
@ -1588,13 +1590,13 @@ private[cluster] final class JoinSeedNodeProcess(seedNodes: immutable.IndexedSeq
context.become(done) context.become(done)
case Invalid(messages) if ByPassConfigCompatCheck case Invalid(messages) if ByPassConfigCompatCheck
log.warning("Cluster validated this node config, but sent back incompatible settings: {}. " + logWarning("Cluster validated this node config, but sent back incompatible settings: {}. " +
"Join will be performed because compatibility check is configured to not be enforced.", messages.mkString(", ")) "Join will be performed because compatibility check is configured to not be enforced.", messages.mkString(", "))
context.parent ! JoinTo(address) context.parent ! JoinTo(address)
context.become(done) context.become(done)
case Invalid(messages) case Invalid(messages)
log.error("Cluster validated this node config, but sent back incompatible settings: {}. " + logError("Cluster validated this node config, but sent back incompatible settings: {}. " +
"It's recommended to perform a full cluster shutdown in order to deploy this new version. " + "It's recommended to perform a full cluster shutdown in order to deploy this new version. " +
"If a cluster shutdown isn't an option, you may want to disable this protection by setting " + "If a cluster shutdown isn't an option, you may want to disable this protection by setting " +
"'akka.cluster.configuration-compatibility-check.enforce-on-join = off'. " + "'akka.cluster.configuration-compatibility-check.enforce-on-join = off'. " +
@ -1605,21 +1607,21 @@ private[cluster] final class JoinSeedNodeProcess(seedNodes: immutable.IndexedSeq
} }
case InitJoinAck(address, UncheckedConfig) case InitJoinAck(address, UncheckedConfig)
log.warning("Joining a cluster without configuration compatibility check feature.") logWarning("Joining a cluster without configuration compatibility check feature.")
context.parent ! JoinTo(address) context.parent ! JoinTo(address)
context.become(done) context.become(done)
case InitJoinAck(address, IncompatibleConfig) case InitJoinAck(address, IncompatibleConfig)
// first InitJoinAck reply, but incompatible // first InitJoinAck reply, but incompatible
if (ByPassConfigCompatCheck) { if (ByPassConfigCompatCheck) {
log.info("Received InitJoinAck message from [{}] to [{}]", sender(), selfAddress) logInfo("Received InitJoinAck message from [{}] to [{}]", sender(), selfAddress)
log.warning("Joining cluster with incompatible configurations. " + logWarning("Joining cluster with incompatible configurations. " +
"Join will be performed because compatibility check is configured to not be enforced.") "Join will be performed because compatibility check is configured to not be enforced.")
// only join if set to ignore config validation // only join if set to ignore config validation
context.parent ! JoinTo(address) context.parent ! JoinTo(address)
context.become(done) context.become(done)
} else { } else {
log.error( logError(
"Couldn't join seed nodes because of incompatible cluster configuration. " + "Couldn't join seed nodes because of incompatible cluster configuration. " +
"It's recommended to perform a full cluster shutdown in order to deploy this new version." + "It's recommended to perform a full cluster shutdown in order to deploy this new version." +
"If a cluster shutdown isn't an option, you may want to disable this protection by setting " + "If a cluster shutdown isn't an option, you may want to disable this protection by setting " +
@ -1634,7 +1636,7 @@ private[cluster] final class JoinSeedNodeProcess(seedNodes: immutable.IndexedSeq
case ReceiveTimeout case ReceiveTimeout
if (attempt >= 2) if (attempt >= 2)
log.warning( logWarning(
"Couldn't join seed nodes after [{}] attempts, will try again. seed-nodes=[{}]", "Couldn't join seed nodes after [{}] attempts, will try again. seed-nodes=[{}]",
attempt, seedNodes.filterNot(_ == selfAddress).mkString(", ")) attempt, seedNodes.filterNot(_ == selfAddress).mkString(", "))
// no InitJoinAck received, try again // no InitJoinAck received, try again
@ -1653,9 +1655,11 @@ private[cluster] final class JoinSeedNodeProcess(seedNodes: immutable.IndexedSeq
* The supplied callback will be run, once, when current cluster member come up with the same status. * The supplied callback will be run, once, when current cluster member come up with the same status.
*/ */
@InternalApi @InternalApi
private[cluster] class OnMemberStatusChangedListener(callback: Runnable, status: MemberStatus) extends Actor with ActorLogging { private[cluster] class OnMemberStatusChangedListener(callback: Runnable, status: MemberStatus) extends Actor {
import ClusterEvent._ import ClusterEvent._
private val cluster = Cluster(context.system) private val cluster = Cluster(context.system)
import cluster.ClusterLogger._
private val to = status match { private val to = status match {
case Up classOf[MemberUp] case Up classOf[MemberUp]
case Removed classOf[MemberRemoved] case Removed classOf[MemberRemoved]
@ -1686,7 +1690,7 @@ private[cluster] class OnMemberStatusChangedListener(callback: Runnable, status:
private def done(): Unit = { private def done(): Unit = {
try callback.run() catch { try callback.run() catch {
case NonFatal(e) log.error(e, "[{}] callback failed with [{}]", s"On${to.getSimpleName}", e.getMessage) case NonFatal(e) logError(e, "[{}] callback failed with [{}]", s"On${to.getSimpleName}", e.getMessage)
} finally { } finally {
context stop self context stop self
} }

View file

@ -140,7 +140,7 @@ private[akka] class ClusterJmx(cluster: Cluster, log: LoggingAdapter) {
new ObjectName("akka:type=Cluster") new ObjectName("akka:type=Cluster")
private def clusterView = cluster.readView private def clusterView = cluster.readView
import cluster.InfoLogger._ import cluster.ClusterLogger._
/** /**
* Creates the cluster JMX MBean and registers it in the MBean server. * Creates the cluster JMX MBean and registers it in the MBean server.

View file

@ -21,7 +21,7 @@ import akka.util.OptionVal
* cluster events published on the event bus. * cluster events published on the event bus.
*/ */
private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
import cluster.InfoLogger._ import cluster.ClusterLogger._
/** /**
* Current state * Current state

View file

@ -68,7 +68,7 @@ class ClusterLogDefaultSpec extends ClusterLogSpec(ClusterLogSpec.config) {
cluster.settings.LogInfoVerbose should ===(false) cluster.settings.LogInfoVerbose should ===(false)
join("is the new leader") join("is the new leader")
awaitUp() awaitUp()
down("is no longer the leader") down("is no longer leader")
} }
} }
} }