diff --git a/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/ClusterMetricsCollector.scala b/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/ClusterMetricsCollector.scala index fe0c53c066..a2ac9a0978 100644 --- a/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/ClusterMetricsCollector.scala +++ b/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/ClusterMetricsCollector.scala @@ -130,7 +130,7 @@ private[metrics] class ClusterMetricsCollector extends Actor with ActorLogging { import context.dispatcher val cluster = Cluster(context.system) import cluster.{ selfAddress, scheduler } - import cluster.InfoLogger._ + import cluster.ClusterLogger._ val metrics = ClusterMetricsExtension(context.system) import metrics.settings._ diff --git a/akka-cluster/src/main/mima-filters/2.5.19.backwards.excludes b/akka-cluster/src/main/mima-filters/2.5.19.backwards.excludes index 07d2563e55..ceb557ce8b 100644 --- a/akka-cluster/src/main/mima-filters/2.5.19.backwards.excludes +++ b/akka-cluster/src/main/mima-filters/2.5.19.backwards.excludes @@ -2,3 +2,19 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterDaemon.this") ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.ClusterDaemon.this") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterRemoteWatcher#DelayedQuarantine.this") + +# cleanup Cluster logging (internals) +ProblemFilters.exclude[MissingTypesProblem]("akka.cluster.JoinSeedNodeProcess") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.JoinSeedNodeProcess.log") +ProblemFilters.exclude[MissingTypesProblem]("akka.cluster.OnMemberStatusChangedListener") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.OnMemberStatusChangedListener.log") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.Cluster.InfoLogger") +ProblemFilters.exclude[MissingTypesProblem]("akka.cluster.ClusterCoreSupervisor") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterCoreSupervisor.log") +ProblemFilters.exclude[MissingTypesProblem]("akka.cluster.FirstSeedNodeProcess") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.FirstSeedNodeProcess.log") +ProblemFilters.exclude[MissingTypesProblem]("akka.cluster.ClusterDaemon") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterDaemon.log") +ProblemFilters.exclude[MissingClassProblem]("akka.cluster.Cluster$InfoLogger$") +ProblemFilters.exclude[MissingTypesProblem]("akka.cluster.ClusterCoreDaemon") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterCoreDaemon.log") diff --git a/akka-cluster/src/main/scala/akka/cluster/AutoDown.scala b/akka-cluster/src/main/scala/akka/cluster/AutoDown.scala index 5cd151be97..bf50a53f3d 100644 --- a/akka-cluster/src/main/scala/akka/cluster/AutoDown.scala +++ b/akka-cluster/src/main/scala/akka/cluster/AutoDown.scala @@ -56,7 +56,7 @@ private[cluster] class AutoDown(autoDownUnreachableAfter: FiniteDuration) extends AutoDownBase(autoDownUnreachableAfter) with ActorLogging { val cluster = Cluster(context.system) - import cluster.InfoLogger._ + import cluster.ClusterLogger._ override def selfAddress = cluster.selfAddress diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 264e161de0..d28564e4c7 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -18,13 +18,14 @@ import akka.japi.Util import akka.pattern._ import akka.remote.{ UniqueAddress ⇒ _, _ } import com.typesafe.config.{ Config, ConfigFactory } - import scala.annotation.varargs import scala.collection.immutable import scala.concurrent.duration._ import scala.concurrent.{ Await, ExecutionContext } import scala.util.control.NonFatal +import akka.event.Logging.LogLevel + /** * Cluster Extension Id and factory for creating Cluster extension. */ @@ -59,7 +60,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { import ClusterEvent._ val settings = new ClusterSettings(system.settings.config, system.name) - import InfoLogger._ + import ClusterLogger._ import settings._ private val joinConfigCompatChecker: JoinConfigCompatChecker = JoinConfigCompatChecker.load(system, settings) @@ -437,35 +438,118 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { /** * INTERNAL API */ - private[cluster] object InfoLogger { + private[cluster] object ClusterLogger { + def isDebugEnabled: Boolean = + log.isDebugEnabled + + def logDebug(message: String): Unit = + logAtLevel(Logging.DebugLevel, message) + + def logDebug(template: String, arg1: Any): Unit = + logAtLevel(Logging.InfoLevel, template, arg1) + + def logDebug(template: String, arg1: Any, arg2: Any): Unit = + logAtLevel(Logging.InfoLevel, template, arg1, arg2) + + def logDebug(template: String, arg1: Any, arg2: Any, arg3: Any): Unit = + logAtLevel(Logging.InfoLevel, template, arg1, arg2, arg3) def logInfo(message: String): Unit = - if (LogInfo) - if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter) - log.info("Cluster Node [{}] - {}", selfAddress, message) - else - log.info("Cluster Node [{}] dc [{}] - {}", selfAddress, settings.SelfDataCenter, message) + logAtLevel(Logging.InfoLevel, message) def logInfo(template: String, arg1: Any): Unit = - if (LogInfo) - if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter) - log.info("Cluster Node [{}] - " + template, selfAddress, arg1) - else - log.info("Cluster Node [{}] dc [{}] - " + template, selfAddress, settings.SelfDataCenter, arg1) + logAtLevel(Logging.InfoLevel, template, arg1) def logInfo(template: String, arg1: Any, arg2: Any): Unit = - if (LogInfo) - if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter) - log.info("Cluster Node [{}] - " + template, selfAddress, arg1, arg2) - else - log.info("Cluster Node [{}] dc [{}] - " + template, selfAddress, settings.SelfDataCenter, arg1, arg2) + logAtLevel(Logging.InfoLevel, template, arg1, arg2) def logInfo(template: String, arg1: Any, arg2: Any, arg3: Any): Unit = - if (LogInfo) + logAtLevel(Logging.InfoLevel, template, arg1, arg2, arg3) + + def logWarning(message: String): Unit = + logAtLevel(Logging.WarningLevel, message) + + def logWarning(template: String, arg1: Any): Unit = + logAtLevel(Logging.WarningLevel, template, arg1) + + def logWarning(template: String, arg1: Any, arg2: Any): Unit = + logAtLevel(Logging.WarningLevel, template, arg1, arg2) + + def logWarning(template: String, arg1: Any, arg2: Any, arg3: Any): Unit = + logAtLevel(Logging.WarningLevel, template, arg1, arg2, arg3) + + def logError(message: String): Unit = + logAtLevel(Logging.ErrorLevel, message) + + def logError(template: String, arg1: Any): Unit = + logAtLevel(Logging.ErrorLevel, template, arg1) + + def logError(template: String, arg1: Any, arg2: Any): Unit = + logAtLevel(Logging.ErrorLevel, template, arg1, arg2) + + def logError(template: String, arg1: Any, arg2: Any, arg3: Any): Unit = + logAtLevel(Logging.ErrorLevel, template, arg1, arg2, arg3) + + def logError(cause: Throwable, message: String): Unit = { + if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter) + log.error(cause, "Cluster Node [{}] - {}", selfAddress, message) + else + log.error(cause, "Cluster Node [{}] dc [{}] - {}", selfAddress, settings.SelfDataCenter, message) + } + + def logError(cause: Throwable, template: String, arg1: Any): Unit = { + if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter) + log.error(cause, "Cluster Node [{}] - " + template, selfAddress, arg1) + else + log.error(cause, "Cluster Node [{}] dc [{}] - " + template, selfAddress, settings.SelfDataCenter, arg1) + } + + def logError(cause: Throwable, template: String, arg1: Any, arg2: Any): Unit = { + if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter) + log.error(cause, "Cluster Node [{}] - " + template, selfAddress, arg1, arg2) + else + log.error(cause, "Cluster Node [{}] dc [{}] - " + template, selfAddress, settings.SelfDataCenter, arg1, arg2) + } + + def logError(cause: Throwable, template: String, arg1: Any, arg2: Any, arg3: Any): Unit = { + if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter) + log.error(cause, "Cluster Node [{}] - " + template, selfAddress, arg1, arg2, arg3) + else + log.error(cause, "Cluster Node [{}] dc [" + settings.SelfDataCenter + "] - " + template, selfAddress, arg1, arg2, arg3) + } + + private def logAtLevel(logLevel: LogLevel, message: String): Unit = { + if (isLevelEnabled(logLevel)) if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter) - log.info("Cluster Node [{}] - " + template, selfAddress, arg1, arg2, arg3) + log.log(logLevel, "Cluster Node [{}] - {}", selfAddress, message) else - log.info("Cluster Node [{}] dc [" + settings.SelfDataCenter + "] - " + template, selfAddress, arg1, arg2, arg3) + log.log(logLevel, "Cluster Node [{}] dc [{}] - {}", selfAddress, settings.SelfDataCenter, message) + } + + private def logAtLevel(logLevel: LogLevel, template: String, arg1: Any): Unit = { + if (isLevelEnabled(logLevel)) + if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter) + log.log(logLevel, "Cluster Node [{}] - " + template, selfAddress, arg1) + else + log.log(logLevel, "Cluster Node [{}] dc [{}] - " + template, selfAddress, settings.SelfDataCenter, arg1) + } + + private def logAtLevel(logLevel: LogLevel, template: String, arg1: Any, arg2: Any): Unit = + if (isLevelEnabled(logLevel)) + if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter) + log.log(logLevel, "Cluster Node [{}] - " + template, selfAddress, arg1, arg2) + else + log.log(logLevel, "Cluster Node [{}] dc [{}] - " + template, selfAddress, settings.SelfDataCenter, arg1, arg2) + + private def logAtLevel(logLevel: LogLevel, template: String, arg1: Any, arg2: Any, arg3: Any): Unit = + if (isLevelEnabled(logLevel)) + if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter) + log.log(logLevel, "Cluster Node [{}] - " + template, selfAddress, arg1, arg2, arg3) + else + log.log(logLevel, "Cluster Node [{}] dc [" + settings.SelfDataCenter + "] - " + template, selfAddress, arg1, arg2, arg3) + + private def isLevelEnabled(logLevel: LogLevel): Boolean = + LogInfo || logLevel < Logging.InfoLevel } } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 4c4394f9c7..939e88e638 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -179,7 +179,7 @@ private[cluster] object InternalClusterAction { * Supervisor managing the different Cluster daemons. */ @InternalApi -private[cluster] final class ClusterDaemon(joinConfigCompatChecker: JoinConfigCompatChecker) extends Actor with ActorLogging +private[cluster] final class ClusterDaemon(joinConfigCompatChecker: JoinConfigCompatChecker) extends Actor with RequiresMessageQueue[UnboundedMessageQueueSemantics] { import InternalClusterAction._ // Important - don't use Cluster(context.system) in constructor because that would @@ -243,7 +243,7 @@ private[cluster] final class ClusterDaemon(joinConfigCompatChecker: JoinConfigCo * would be obsolete. Shutdown the member if any those actors crashed. */ @InternalApi -private[cluster] final class ClusterCoreSupervisor(joinConfigCompatChecker: JoinConfigCompatChecker) extends Actor with ActorLogging +private[cluster] final class ClusterCoreSupervisor(joinConfigCompatChecker: JoinConfigCompatChecker) extends Actor with RequiresMessageQueue[UnboundedMessageQueueSemantics] { // Important - don't use Cluster(context.system) in constructor because that would @@ -263,7 +263,8 @@ private[cluster] final class ClusterCoreSupervisor(joinConfigCompatChecker: Join override val supervisorStrategy = OneForOneStrategy() { case NonFatal(e) ⇒ - log.error(e, "Cluster node [{}] crashed, [{}] - shutting down...", Cluster(context.system).selfAddress, e.getMessage) + Cluster(context.system).ClusterLogger.logError( + e, "crashed, [{}] - shutting down...", e.getMessage) self ! PoisonPill Stop } @@ -292,7 +293,7 @@ private[cluster] object ClusterCoreDaemon { * INTERNAL API. */ @InternalApi -private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatChecker: JoinConfigCompatChecker) extends Actor with ActorLogging +private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatChecker: JoinConfigCompatChecker) extends Actor with RequiresMessageQueue[UnboundedMessageQueueSemantics] { import InternalClusterAction._ import ClusterCoreDaemon._ @@ -301,7 +302,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh val cluster = Cluster(context.system) import cluster.{ selfAddress, selfRoles, scheduler, failureDetector, crossDcFailureDetector } import cluster.settings._ - import cluster.InfoLogger._ + import cluster.ClusterLogger._ val selfDc = cluster.selfDataCenter @@ -461,7 +462,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh } private def joinSeedNodesWasUnsuccessful(): Unit = { - log.warning( + logWarning( "Joining of seed-nodes [{}] was unsuccessful after configured " + "shutdown-after-unsuccessful-join-seed-nodes [{}]. Running CoordinatedShutdown.", seedNodes.mkString(", "), ShutdownAfterUnsuccessfulJoinSeedNodes) @@ -574,11 +575,11 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh } case Invalid(messages) ⇒ // messages are only logged on the cluster side - log.warning( + logWarning( "Found incompatible settings when [{}] tried to join: {}. " + - "Self version [{}], Joining version [{}].", + s"Self version [{}], Joining version [$joiningNodeVersion].", sender().path.address, messages.mkString(", "), - context.system.settings.ConfigVersion, joiningNodeVersion) + context.system.settings.ConfigVersion) if (configCheckUnsupportedByJoiningNode) ConfigCheckUnsupportedByJoiningNode else @@ -621,11 +622,11 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh */ def join(address: Address): Unit = { if (address.protocol != selfAddress.protocol) - log.warning( + logWarning( "Trying to join member with wrong protocol, but was ignored, expected [{}] but was [{}]", selfAddress.protocol, address.protocol) else if (address.system != selfAddress.system) - log.warning( + logWarning( "Trying to join member with wrong ActorSystem name, but was ignored, expected [{}] but was [{}]", selfAddress.system, address.system) else { @@ -666,11 +667,11 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh def joining(joiningNode: UniqueAddress, roles: Set[String]): Unit = { val selfStatus = latestGossip.member(selfUniqueAddress).status if (joiningNode.address.protocol != selfAddress.protocol) - log.warning( + logWarning( "Member with wrong protocol tried to join, but was ignored, expected [{}] but was [{}]", selfAddress.protocol, joiningNode.address.protocol) else if (joiningNode.address.system != selfAddress.system) - log.warning( + logWarning( "Member with wrong ActorSystem name tried to join, but was ignored, expected [{}] but was [{}]", selfAddress.system, joiningNode.address.system) else if (removeUnreachableWithMemberStatus.contains(selfStatus)) @@ -854,10 +855,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh val newOverview = localGossip.overview copy (reachability = newReachability) val newGossip = localGossip copy (overview = newOverview) updateLatestGossip(newGossip) - log.warning( - "Cluster Node [{}] - Marking node as TERMINATED [{}], due to quarantine. Node roles [{}]. " + + logWarning( + "Marking node as TERMINATED [{}], due to quarantine. Node roles [{}]. " + "It must still be marked as down before it's removed.", - selfAddress, node.address, selfRoles.mkString(",")) + node.address, selfRoles.mkString(",")) publishMembershipState() } } @@ -897,7 +898,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh val localGossip = latestGossip if (remoteGossip eq Gossip.empty) { - log.debug("Cluster Node [{}] - Ignoring received gossip from [{}] to protect against overload", selfAddress, from) + logDebug("Ignoring received gossip from [{}] to protect against overload", from) Ignored } else if (envelope.to != selfUniqueAddress) { logInfo("Ignoring received gossip intended for someone else, from [{}] to [{}]", from.address, envelope.to) @@ -934,14 +935,14 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh // Removal of member itself is handled in merge (pickHighestPriority) val prunedLocalGossip = localGossip.members.foldLeft(localGossip) { (g, m) ⇒ if (removeUnreachableWithMemberStatus(m.status) && !remoteGossip.members.contains(m)) { - log.debug("Cluster Node [{}] - Pruned conflicting local gossip: {}", selfAddress, m) + logDebug("Pruned conflicting local gossip: {}", m) g.prune(VectorClock.Node(Gossip.vclockName(m.uniqueAddress))) } else g } val prunedRemoteGossip = remoteGossip.members.foldLeft(remoteGossip) { (g, m) ⇒ if (removeUnreachableWithMemberStatus(m.status) && !localGossip.members.contains(m)) { - log.debug("Cluster Node [{}] - Pruned conflicting remote gossip: {}", selfAddress, m) + logDebug("Pruned conflicting remote gossip: {}", m) g.prune(VectorClock.Node(Gossip.vclockName(m.uniqueAddress))) } else g @@ -967,10 +968,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh } } - log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from) + logDebug("Receiving gossip from [{}]", selfAddress, from) if (comparison == VectorClock.Concurrent && cluster.settings.Debug.VerboseGossipLogging) { - log.debug( + logDebug( """Couldn't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merged them into [{}]""", remoteGossip, localGossip, winningGossip) } @@ -1048,7 +1049,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh gossipTo(peer) case None ⇒ // nothing to see here if (cluster.settings.Debug.VerboseGossipLogging) - log.debug("Cluster Node [{}] dc [{}] will not gossip this round", selfAddress, cluster.settings.SelfDataCenter) + logDebug("will not gossip this round") } } @@ -1060,7 +1061,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh if (membershipState.isLeader(selfUniqueAddress)) { // only run the leader actions if we are the LEADER of the data center if (!isCurrentlyLeader) { - logInfo("Cluster Node [{}] dc [{}] is the new leader", selfAddress, cluster.settings.SelfDataCenter) + logInfo("is the new leader among reachable nodes (more leaders may exist)") isCurrentlyLeader = true } val firstNotice = 20 @@ -1085,7 +1086,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh }.mkString(", ")) } } else if (isCurrentlyLeader) { - logInfo("Cluster Node [{}] dc [{}] is no longer the leader", selfAddress, cluster.settings.SelfDataCenter) + logInfo("is no longer leader") isCurrentlyLeader = false } cleanupExitingConfirmed() @@ -1234,10 +1235,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh val targets = membershipState.gossipTargetsForExitingMembers(exitingMembers) if (targets.nonEmpty) { - if (log.isDebugEnabled) - log.debug( - "Cluster Node [{}] - Gossip exiting members [{}] to the two oldest (per role) [{}] (singleton optimization).", - selfAddress, exitingMembers.mkString(", "), targets.mkString(", ")) + if (isDebugEnabled) + logDebug( + "Gossip exiting members [{}] to the two oldest (per role) [{}] (singleton optimization).", + exitingMembers.mkString(", "), targets.mkString(", ")) targets.foreach(m ⇒ gossipTo(m.uniqueAddress)) } @@ -1316,7 +1317,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh val (exiting, nonExiting) = newlyDetectedUnreachableMembers.partition(_.status == Exiting) if (nonExiting.nonEmpty) - log.warning("Cluster Node [{}] - Marking node(s) as UNREACHABLE [{}]. Node roles [{}]", selfAddress, nonExiting.mkString(", "), selfRoles.mkString(", ")) + logWarning("Marking node(s) as UNREACHABLE [{}]. Node roles [{}]", nonExiting.mkString(", "), selfRoles.mkString(", ")) if (exiting.nonEmpty) logInfo( "Marking exiting node(s) as UNREACHABLE [{}]. This is expected and they will be removed.", @@ -1384,7 +1385,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh def publishMembershipState(): Unit = { if (cluster.settings.Debug.VerboseGossipLogging) - log.debug("Cluster Node [{}] dc [{}] - New gossip published [{}]", selfAddress, cluster.settings.SelfDataCenter, membershipState.latestGossip) + logDebug("New gossip published [{}]", membershipState.latestGossip) publisher ! PublishChanges(membershipState) if (PublishStatsInterval == Duration.Zero) publishInternalStats() @@ -1417,13 +1418,13 @@ private[cluster] case object IncompatibleConfigurationDetected extends Reason * that other seed node to join existing cluster. */ @InternalApi -private[cluster] final class FirstSeedNodeProcess(seedNodes: immutable.IndexedSeq[Address], joinConfigCompatChecker: JoinConfigCompatChecker) extends Actor with ActorLogging { +private[cluster] final class FirstSeedNodeProcess(seedNodes: immutable.IndexedSeq[Address], joinConfigCompatChecker: JoinConfigCompatChecker) extends Actor { import InternalClusterAction._ import ClusterUserAction.JoinTo val cluster = Cluster(context.system) import cluster.settings._ - import cluster.InfoLogger._ + import cluster.ClusterLogger._ def selfAddress = cluster.selfAddress @@ -1451,8 +1452,8 @@ private[cluster] final class FirstSeedNodeProcess(seedNodes: immutable.IndexedSe remainingSeedNodes foreach { a ⇒ context.actorSelection(context.parent.path.toStringWithAddress(a)) ! InitJoin(configToValidate) } } else { // no InitJoinAck received, initialize new cluster by joining myself - if (log.isDebugEnabled) - log.debug( + if (isDebugEnabled) + logDebug( "Couldn't join other seed nodes, will join myself. seed-nodes=[{}]", seedNodes.mkString(", ")) context.parent ! JoinTo(selfAddress) @@ -1469,13 +1470,13 @@ private[cluster] final class FirstSeedNodeProcess(seedNodes: immutable.IndexedSe context.stop(self) case Invalid(messages) if ByPassConfigCompatCheck ⇒ - log.warning("Cluster validated this node config, but sent back incompatible settings: {}. " + + logWarning("Cluster validated this node config, but sent back incompatible settings: {}. " + "Join will be performed because compatibility check is configured to not be enforced.", messages.mkString(", ")) context.parent ! JoinTo(address) context.stop(self) case Invalid(messages) ⇒ - log.error("Cluster validated this node config, but sent back incompatible settings: {}. " + + logError("Cluster validated this node config, but sent back incompatible settings: {}. " + "It's recommended to perform a full cluster shutdown in order to deploy this new version. " + "If a cluster shutdown isn't an option, you may want to disable this protection by setting " + "'akka.cluster.configuration-compatibility-check.enforce-on-join = off'. " + @@ -1487,7 +1488,7 @@ private[cluster] final class FirstSeedNodeProcess(seedNodes: immutable.IndexedSe case InitJoinAck(address, UncheckedConfig) ⇒ logInfo("Received InitJoinAck message from [{}] to [{}]", sender(), selfAddress) - log.warning("Joining a cluster without configuration compatibility check feature.") + logWarning("Joining a cluster without configuration compatibility check feature.") context.parent ! JoinTo(address) context.stop(self) @@ -1496,12 +1497,12 @@ private[cluster] final class FirstSeedNodeProcess(seedNodes: immutable.IndexedSe if (ByPassConfigCompatCheck) { // only join if set to ignore config validation logInfo("Received InitJoinAck message from [{}] to [{}]", sender(), selfAddress) - log.warning("Joining cluster with incompatible configurations. " + + logWarning("Joining cluster with incompatible configurations. " + "Join will be performed because compatibility check is configured to not be enforced.") context.parent ! JoinTo(address) context.stop(self) } else { - log.error( + logError( "Couldn't join seed nodes because of incompatible cluster configuration. " + "It's recommended to perform a full cluster shutdown in order to deploy this new version." + "If a cluster shutdown isn't an option, you may want to disable this protection by setting " + @@ -1549,12 +1550,13 @@ private[cluster] final class FirstSeedNodeProcess(seedNodes: immutable.IndexedSe * */ @InternalApi -private[cluster] final class JoinSeedNodeProcess(seedNodes: immutable.IndexedSeq[Address], joinConfigCompatChecker: JoinConfigCompatChecker) extends Actor with ActorLogging { +private[cluster] final class JoinSeedNodeProcess(seedNodes: immutable.IndexedSeq[Address], joinConfigCompatChecker: JoinConfigCompatChecker) extends Actor { import InternalClusterAction._ import ClusterUserAction.JoinTo val cluster = Cluster(context.system) import cluster.settings._ + import cluster.ClusterLogger._ def selfAddress = cluster.selfAddress if (seedNodes.isEmpty || seedNodes.head == selfAddress) @@ -1579,7 +1581,7 @@ private[cluster] final class JoinSeedNodeProcess(seedNodes: immutable.IndexedSeq otherSeedNodes.foreach { a ⇒ context.actorSelection(context.parent.path.toStringWithAddress(a)) ! InitJoin(configToValidate) } case InitJoinAck(address, CompatibleConfig(clusterConfig)) ⇒ - log.info("Received InitJoinAck message from [{}] to [{}]", sender(), selfAddress) + logInfo("Received InitJoinAck message from [{}] to [{}]", sender(), selfAddress) // validates config coming from cluster against this node config joinConfigCompatChecker.check(clusterConfig, context.system.settings.config) match { case Valid ⇒ @@ -1588,13 +1590,13 @@ private[cluster] final class JoinSeedNodeProcess(seedNodes: immutable.IndexedSeq context.become(done) case Invalid(messages) if ByPassConfigCompatCheck ⇒ - log.warning("Cluster validated this node config, but sent back incompatible settings: {}. " + + logWarning("Cluster validated this node config, but sent back incompatible settings: {}. " + "Join will be performed because compatibility check is configured to not be enforced.", messages.mkString(", ")) context.parent ! JoinTo(address) context.become(done) case Invalid(messages) ⇒ - log.error("Cluster validated this node config, but sent back incompatible settings: {}. " + + logError("Cluster validated this node config, but sent back incompatible settings: {}. " + "It's recommended to perform a full cluster shutdown in order to deploy this new version. " + "If a cluster shutdown isn't an option, you may want to disable this protection by setting " + "'akka.cluster.configuration-compatibility-check.enforce-on-join = off'. " + @@ -1605,21 +1607,21 @@ private[cluster] final class JoinSeedNodeProcess(seedNodes: immutable.IndexedSeq } case InitJoinAck(address, UncheckedConfig) ⇒ - log.warning("Joining a cluster without configuration compatibility check feature.") + logWarning("Joining a cluster without configuration compatibility check feature.") context.parent ! JoinTo(address) context.become(done) case InitJoinAck(address, IncompatibleConfig) ⇒ // first InitJoinAck reply, but incompatible if (ByPassConfigCompatCheck) { - log.info("Received InitJoinAck message from [{}] to [{}]", sender(), selfAddress) - log.warning("Joining cluster with incompatible configurations. " + + logInfo("Received InitJoinAck message from [{}] to [{}]", sender(), selfAddress) + logWarning("Joining cluster with incompatible configurations. " + "Join will be performed because compatibility check is configured to not be enforced.") // only join if set to ignore config validation context.parent ! JoinTo(address) context.become(done) } else { - log.error( + logError( "Couldn't join seed nodes because of incompatible cluster configuration. " + "It's recommended to perform a full cluster shutdown in order to deploy this new version." + "If a cluster shutdown isn't an option, you may want to disable this protection by setting " + @@ -1634,7 +1636,7 @@ private[cluster] final class JoinSeedNodeProcess(seedNodes: immutable.IndexedSeq case ReceiveTimeout ⇒ if (attempt >= 2) - log.warning( + logWarning( "Couldn't join seed nodes after [{}] attempts, will try again. seed-nodes=[{}]", attempt, seedNodes.filterNot(_ == selfAddress).mkString(", ")) // no InitJoinAck received, try again @@ -1653,9 +1655,11 @@ private[cluster] final class JoinSeedNodeProcess(seedNodes: immutable.IndexedSeq * The supplied callback will be run, once, when current cluster member come up with the same status. */ @InternalApi -private[cluster] class OnMemberStatusChangedListener(callback: Runnable, status: MemberStatus) extends Actor with ActorLogging { +private[cluster] class OnMemberStatusChangedListener(callback: Runnable, status: MemberStatus) extends Actor { import ClusterEvent._ private val cluster = Cluster(context.system) + import cluster.ClusterLogger._ + private val to = status match { case Up ⇒ classOf[MemberUp] case Removed ⇒ classOf[MemberRemoved] @@ -1686,7 +1690,7 @@ private[cluster] class OnMemberStatusChangedListener(callback: Runnable, status: private def done(): Unit = { try callback.run() catch { - case NonFatal(e) ⇒ log.error(e, "[{}] callback failed with [{}]", s"On${to.getSimpleName}", e.getMessage) + case NonFatal(e) ⇒ logError(e, "[{}] callback failed with [{}]", s"On${to.getSimpleName}", e.getMessage) } finally { context stop self } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala index f3ef7613da..0e30739c57 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala @@ -140,7 +140,7 @@ private[akka] class ClusterJmx(cluster: Cluster, log: LoggingAdapter) { new ObjectName("akka:type=Cluster") private def clusterView = cluster.readView - import cluster.InfoLogger._ + import cluster.ClusterLogger._ /** * Creates the cluster JMX MBean and registers it in the MBean server. diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala index c1773a6901..2d664cdc25 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala @@ -21,7 +21,7 @@ import akka.util.OptionVal * cluster events published on the event bus. */ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { - import cluster.InfoLogger._ + import cluster.ClusterLogger._ /** * Current state diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterLogSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterLogSpec.scala index 8eb5cc3886..acfbc33cf0 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterLogSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterLogSpec.scala @@ -68,7 +68,7 @@ class ClusterLogDefaultSpec extends ClusterLogSpec(ClusterLogSpec.config) { cluster.settings.LogInfoVerbose should ===(false) join("is the new leader") awaitUp() - down("is no longer the leader") + down("is no longer leader") } } }