diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 1d49d79425..963d1527c1 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -51,6 +51,9 @@ akka { # until the cluster has reached a certain size. min-nr-of-members = 1 + # Enable/disable info level logging of cluster events + log-info = on + # Enable or disable JMX MBeans for management of the cluster jmx.enabled = on diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index c3716ca2b6..9125e10222 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -56,6 +56,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { val settings = new ClusterSettings(system.settings.config, system.name) import settings._ + import InfoLogger._ /** * INTERNAL API @@ -88,7 +89,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { // ClusterJmx is initialized as the last thing in the constructor private var clusterJmx: Option[ClusterJmx] = None - log.info("Cluster Node [{}] - is starting up...", selfAddress) + logInfo("Starting up...") val failureDetector: FailureDetectorRegistry[Address] = { def createFailureDetector(): FailureDetector = @@ -107,7 +108,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { private[cluster] val scheduler: Scheduler = { if (system.scheduler.maxFrequency < 1.second / SchedulerTickDuration) { import scala.collection.JavaConverters._ - log.info("Using a dedicated scheduler for cluster. Default scheduler can be used if configured " + + logInfo("Using a dedicated scheduler for cluster. Default scheduler can be used if configured " + "with 'akka.scheduler.tick-duration' [{} ms] <= 'akka.cluster.scheduler.tick-duration' [{} ms].", (1000 / system.scheduler.maxFrequency).toInt, SchedulerTickDuration.toMillis) @@ -179,7 +180,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { Some(jmx) } - log.info("Cluster Node [{}] - has started up successfully", selfAddress) + logInfo("Started up successfully") // ====================================================== // ===================== PUBLIC API ===================== @@ -290,7 +291,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { */ private[cluster] def shutdown(): Unit = { if (_isTerminated.compareAndSet(false, true)) { - log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", selfAddress) + logInfo("Shutting down...") system.stop(clusterDaemons) if (readViewStarted) readView.close() @@ -299,7 +300,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { clusterJmx foreach { _.unregisterMBean() } - log.info("Cluster Node [{}] - Cluster node successfully shut down", selfAddress) + logInfo("Successfully shut down") } } @@ -308,4 +309,19 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { case _ ⇒ } + /** + * INTERNAL API + */ + private[cluster] object InfoLogger { + + def logInfo(message: String): Unit = + if (LogInfo) log.info("Cluster Node [{}] - {}", selfAddress, message) + + def logInfo(template: String, arg1: Any): Unit = + if (LogInfo) log.info("Cluster Node [{}] - " + template, selfAddress, arg1) + + def logInfo(template: String, arg1: Any, arg2: Any): Unit = + if (LogInfo) log.info("Cluster Node [{}] - " + template, selfAddress, arg1, arg2) + } + } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index c0d6ac9b4c..192e1c8861 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -221,6 +221,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto val cluster = Cluster(context.system) import cluster.{ selfAddress, selfUniqueAddress, scheduler, failureDetector } import cluster.settings._ + import cluster.InfoLogger._ val NumberOfGossipsBeforeShutdownWhenLeaderExits = 3 @@ -265,12 +266,11 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto Some(scheduler.schedule(PeriodicTasksInitialDelay.max(d), d, self, PublishStatsTick)) } - override def preStart(): Unit = { + override def preStart(): Unit = if (SeedNodes.isEmpty) - log.info("No seed-nodes configured, manual cluster join required") + logInfo("No seed-nodes configured, manual cluster join required") else self ! JoinSeedNodes(SeedNodes) - } override def postStop(): Unit = { gossipTask.cancel() @@ -318,7 +318,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto case SendGossipTo(address) ⇒ sendGossipTo(address) case msg: SubscriptionMessage ⇒ publisher forward msg case ClusterUserAction.JoinTo(address) ⇒ - log.info("Trying to join [{}] when already part of a cluster, ignoring", address) + logInfo("Trying to join [{}] when already part of a cluster, ignoring", address) } @@ -419,9 +419,9 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto val isUnreachable = localUnreachable.exists(_.address == node.address) if (alreadyMember) - log.info("Existing member [{}] is trying to join, ignoring", node) + logInfo("Existing member [{}] is trying to join, ignoring", node) else if (isUnreachable) - log.info("Unreachable member [{}] is trying to join, ignoring", node) + logInfo("Unreachable member [{}] is trying to join, ignoring", node) else { // remove the node from the failure detector @@ -434,7 +434,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto updateLatestGossip(newGossip) - log.info("Cluster Node [{}] - Node [{}] is JOINING, roles [{}]", selfAddress, node.address, roles.mkString(", ")) + logInfo("Node [{}] is JOINING, roles [{}]", node.address, roles.mkString(", ")) if (node != selfUniqueAddress) { sender ! Welcome(selfUniqueAddress, latestGossip) } @@ -450,9 +450,9 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto def welcome(joinWith: Address, from: UniqueAddress, gossip: Gossip): Unit = { require(latestGossip.members.isEmpty, "Join can only be done from empty state") if (joinWith != from.address) - log.info("Ignoring welcome from [{}] when trying to join with [{}]", from.address, joinWith) + logInfo("Ignoring welcome from [{}] when trying to join with [{}]", from.address, joinWith) else { - log.info("Cluster Node [{}] - Welcome from [{}]", selfAddress, from.address) + logInfo("Welcome from [{}]", from.address) latestGossip = gossip seen selfUniqueAddress publish(latestGossip) if (from != selfUniqueAddress) @@ -474,7 +474,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto updateLatestGossip(newGossip) - log.info("Cluster Node [{}] - Marked address [{}] as [{}]", selfAddress, address, Leaving) + logInfo("Marked address [{}] as [{}]", address, Leaving) publish(latestGossip) } } @@ -482,10 +482,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto /** * This method is called when a member sees itself as Exiting. */ - def shutdown(): Unit = { - log.info("Cluster Node [{}] - Node shutting down...", latestGossip.member(selfUniqueAddress)) - cluster.shutdown() - } + def shutdown(): Unit = cluster.shutdown() /** * State transition to DOW. @@ -508,7 +505,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto val newMembers = downedMember match { case Some(m) ⇒ - log.info("Cluster Node [{}] - Marking node [{}] as [{}]", selfAddress, m.address, Down) + logInfo("Marking node [{}] as [{}]", m.address, Down) localMembers - m case None ⇒ localMembers } @@ -518,7 +515,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto localUnreachableMembers.map { member ⇒ // no need to DOWN members already DOWN if (member.address == address && member.status != Down) { - log.info("Cluster Node [{}] - Marking unreachable node [{}] as [{}]", selfAddress, member.address, Down) + logInfo("Marking unreachable node [{}] as [{}]", member.address, Down) member copy (status = Down) } else member } @@ -540,9 +537,9 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto def receiveGossipStatus(status: GossipStatus): Unit = { val from = status.from if (latestGossip.overview.unreachable.exists(_.uniqueAddress == from)) - log.info("Ignoring received gossip status from unreachable [{}] ", from) + logInfo("Ignoring received gossip status from unreachable [{}] ", from) else if (latestGossip.members.forall(_.uniqueAddress != from)) - log.info("Ignoring received gossip status from unknown [{}]", from) + logInfo("Ignoring received gossip status from unknown [{}]", from) else { (status.version tryCompareTo latestGossip.version) match { case Some(0) ⇒ // same version @@ -561,15 +558,15 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto val localGossip = latestGossip if (envelope.to != selfUniqueAddress) - log.info("Ignoring received gossip intended for someone else, from [{}] to [{}]", from.address, envelope.to) + logInfo("Ignoring received gossip intended for someone else, from [{}] to [{}]", from.address, envelope.to) if (remoteGossip.overview.unreachable.exists(_.address == selfAddress)) - log.info("Ignoring received gossip with myself as unreachable, from [{}]", selfAddress, from.address) + logInfo("Ignoring received gossip with myself as unreachable, from [{}]", from.address) else if (localGossip.overview.unreachable.exists(_.uniqueAddress == from)) - log.info("Ignoring received gossip from unreachable [{}] ", from) + logInfo("Ignoring received gossip from unreachable [{}] ", from) else if (localGossip.members.forall(_.uniqueAddress != from)) - log.info("Ignoring received gossip from unknown [{}]", from) + logInfo("Ignoring received gossip from unknown [{}]", from) else if (remoteGossip.members.forall(_.uniqueAddress != selfUniqueAddress)) - log.info("Ignoring received gossip that does not contain myself, from [{}]", from) + logInfo("Ignoring received gossip that does not contain myself, from [{}]", from) else { val comparison = remoteGossip.version tryCompareTo localGossip.version @@ -749,14 +746,13 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto // log status changes changedMembers foreach { m ⇒ - log.info("Cluster Node [{}] - Leader is moving node [{}] to [{}]", - selfAddress, m.address, m.status) + logInfo("Leader is moving node [{}] to [{}]", m.address, m.status) } // log the removal of the unreachable nodes removedUnreachable foreach { m ⇒ val status = if (m.status == Exiting) "exiting" else "unreachable" - log.info("Cluster Node [{}] - Leader is removing {} node [{}]", selfAddress, status, m.address) + logInfo("Leader is removing {} node [{}]", status, m.address) } publish(latestGossip) @@ -807,7 +803,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto // log the auto-downing of the unreachable nodes changedUnreachableMembers foreach { m ⇒ - log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as [{}]", selfAddress, m.address, m.status) + logInfo("Leader is marking unreachable node [{}] as [{}]", m.address, m.status) } publish(latestGossip) @@ -844,8 +840,8 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto if (nonExiting.nonEmpty) log.error("Cluster Node [{}] - Marking node(s) as UNREACHABLE [{}]", selfAddress, nonExiting.mkString(", ")) if (exiting.nonEmpty) - log.info("Cluster Node [{}] - Marking exiting node(s) as UNREACHABLE [{}]. This is expected and they will be removed.", - selfAddress, exiting.mkString(", ")) + logInfo("Marking exiting node(s) as UNREACHABLE [{}]. This is expected and they will be removed.", + exiting.mkString(", ")) publish(latestGossip) } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala index 04deb749a8..7bc91d4516 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala @@ -88,6 +88,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg val cluster = Cluster(context.system) import cluster.{ selfAddress, scheduler } import cluster.settings._ + import cluster.InfoLogger._ import context.dispatcher val selfHeartbeat = Heartbeat(selfAddress) @@ -170,7 +171,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg def triggerFirstHeartbeat(address: Address): Unit = if (!cluster.failureDetector.isMonitoring(address)) { - log.info("Trigger extra expected heartbeat from [{}]", address) + logInfo("Trigger extra expected heartbeat from [{}]", address) cluster.failureDetector.heartbeat(address) } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala index ef74909b48..53feb35beb 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala @@ -91,6 +91,7 @@ private[akka] class ClusterJmx(cluster: Cluster, log: LoggingAdapter) { private val mBeanServer = ManagementFactory.getPlatformMBeanServer private val clusterMBeanName = new ObjectName("akka:type=Cluster") private def clusterView = cluster.readView + import cluster.InfoLogger._ /** * Creates the cluster JMX MBean and registers it in the MBean server. @@ -130,7 +131,7 @@ private[akka] class ClusterJmx(cluster: Cluster, log: LoggingAdapter) { } try { mBeanServer.registerMBean(mbean, clusterMBeanName) - log.info("Cluster Node [{}] - registered cluster JMX MBean [{}]", clusterView.selfAddress, clusterMBeanName) + logInfo("Registered cluster JMX MBean [{}]", clusterMBeanName) } catch { case e: InstanceAlreadyExistsException ⇒ // ignore - we are running multiple cluster nodes in the same JVM (probably for testing) } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala index a666886c47..ad988460b5 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala @@ -46,7 +46,8 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto import context.dispatcher val cluster = Cluster(context.system) import cluster.{ selfAddress, scheduler, settings } - import settings._ + import cluster.settings._ + import cluster.InfoLogger._ /** * The node ring gossipped that contains only members that are Up. @@ -78,7 +79,7 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto override def preStart(): Unit = { cluster.subscribe(self, classOf[MemberEvent]) cluster.subscribe(self, classOf[UnreachableMember]) - log.info("Metrics collection has started successfully on node [{}]", selfAddress) + logInfo("Metrics collection has started successfully") } def receive = { @@ -782,10 +783,11 @@ private[cluster] object MetricsCollector { Try(new SigarMetricsCollector(system)) match { case Success(sigarCollector) ⇒ sigarCollector case Failure(e) ⇒ - log.info("Metrics will be retreived from MBeans, and may be incorrect on some platforms. " + - "To increase metric accuracy add the 'sigar.jar' to the classpath and the appropriate " + - "platform-specific native libary to 'java.library.path'. Reason: " + - e.toString) + Cluster(system).InfoLogger.logInfo( + "Metrics will be retreived from MBeans, and may be incorrect on some platforms. " + + "To increase metric accuracy add the 'sigar.jar' to the classpath and the appropriate " + + "platform-specific native libary to 'java.library.path'. Reason: " + + e.toString) new JmxMetricsCollector(system) } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index d23065c4ed..176cfc5ec9 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -16,78 +16,79 @@ import akka.util.Helpers.Requiring import scala.concurrent.duration.FiniteDuration import akka.japi.Util.immutableSeq -class ClusterSettings(val config: Config, val systemName: String) { +final class ClusterSettings(val config: Config, val systemName: String) { private val cc = config.getConfig("akka.cluster") - final val FailureDetectorConfig: Config = cc.getConfig("failure-detector") - final val FailureDetectorImplementationClass: String = FailureDetectorConfig.getString("implementation-class") - final val HeartbeatInterval: FiniteDuration = { + val LogInfo: Boolean = cc.getBoolean("log-info") + val FailureDetectorConfig: Config = cc.getConfig("failure-detector") + val FailureDetectorImplementationClass: String = FailureDetectorConfig.getString("implementation-class") + val HeartbeatInterval: FiniteDuration = { Duration(FailureDetectorConfig.getMilliseconds("heartbeat-interval"), MILLISECONDS) } requiring (_ > Duration.Zero, "failure-detector.heartbeat-interval must be > 0") - final val HeartbeatRequestDelay: FiniteDuration = { + val HeartbeatRequestDelay: FiniteDuration = { Duration(FailureDetectorConfig.getMilliseconds("heartbeat-request.grace-period"), MILLISECONDS) } requiring (_ > Duration.Zero, "failure-detector.heartbeat-request.grace-period must be > 0") - final val HeartbeatExpectedResponseAfter: FiniteDuration = { + val HeartbeatExpectedResponseAfter: FiniteDuration = { Duration(FailureDetectorConfig.getMilliseconds("heartbeat-request.expected-response-after"), MILLISECONDS) } requiring (_ > Duration.Zero, "failure-detector.heartbeat-request.expected-response-after > 0") - final val HeartbeatRequestTimeToLive: FiniteDuration = { + val HeartbeatRequestTimeToLive: FiniteDuration = { Duration(FailureDetectorConfig.getMilliseconds("heartbeat-request.time-to-live"), MILLISECONDS) } requiring (_ > Duration.Zero, "failure-detector.heartbeat-request.time-to-live > 0") - final val NumberOfEndHeartbeats: Int = { + val NumberOfEndHeartbeats: Int = { FailureDetectorConfig.getInt("nr-of-end-heartbeats") } requiring (_ > 0, "failure-detector.nr-of-end-heartbeats must be > 0") - final val MonitoredByNrOfMembers: Int = { + val MonitoredByNrOfMembers: Int = { FailureDetectorConfig.getInt("monitored-by-nr-of-members") } requiring (_ > 0, "failure-detector.monitored-by-nr-of-members must be > 0") - final val SeedNodes: immutable.IndexedSeq[Address] = + val SeedNodes: immutable.IndexedSeq[Address] = immutableSeq(cc.getStringList("seed-nodes")).map { case AddressFromURIString(addr) ⇒ addr }.toVector - final val SeedNodeTimeout: FiniteDuration = Duration(cc.getMilliseconds("seed-node-timeout"), MILLISECONDS) - final val RetryUnsuccessfulJoinAfter: Duration = { + val SeedNodeTimeout: FiniteDuration = Duration(cc.getMilliseconds("seed-node-timeout"), MILLISECONDS) + val RetryUnsuccessfulJoinAfter: Duration = { val key = "retry-unsuccessful-join-after" cc.getString(key).toLowerCase match { case "off" ⇒ Duration.Undefined case _ ⇒ Duration(cc.getMilliseconds(key), MILLISECONDS) requiring (_ > Duration.Zero, key + " > 0s, or off") } } - final val PeriodicTasksInitialDelay: FiniteDuration = Duration(cc.getMilliseconds("periodic-tasks-initial-delay"), MILLISECONDS) - final val GossipInterval: FiniteDuration = Duration(cc.getMilliseconds("gossip-interval"), MILLISECONDS) - final val LeaderActionsInterval: FiniteDuration = Duration(cc.getMilliseconds("leader-actions-interval"), MILLISECONDS) - final val UnreachableNodesReaperInterval: FiniteDuration = Duration(cc.getMilliseconds("unreachable-nodes-reaper-interval"), MILLISECONDS) - final val PublishStatsInterval: Duration = { + val PeriodicTasksInitialDelay: FiniteDuration = Duration(cc.getMilliseconds("periodic-tasks-initial-delay"), MILLISECONDS) + val GossipInterval: FiniteDuration = Duration(cc.getMilliseconds("gossip-interval"), MILLISECONDS) + val LeaderActionsInterval: FiniteDuration = Duration(cc.getMilliseconds("leader-actions-interval"), MILLISECONDS) + val UnreachableNodesReaperInterval: FiniteDuration = Duration(cc.getMilliseconds("unreachable-nodes-reaper-interval"), MILLISECONDS) + val PublishStatsInterval: Duration = { val key = "publish-stats-interval" cc.getString(key).toLowerCase match { case "off" ⇒ Duration.Undefined case _ ⇒ Duration(cc.getMilliseconds(key), MILLISECONDS) requiring (_ >= Duration.Zero, key + " >= 0s, or off") } } - final val AutoDown: Boolean = cc.getBoolean("auto-down") - final val Roles: Set[String] = immutableSeq(cc.getStringList("roles")).toSet - final val MinNrOfMembers: Int = { + val AutoDown: Boolean = cc.getBoolean("auto-down") + val Roles: Set[String] = immutableSeq(cc.getStringList("roles")).toSet + val MinNrOfMembers: Int = { cc.getInt("min-nr-of-members") } requiring (_ > 0, "min-nr-of-members must be > 0") - final val MinNrOfMembersOfRole: Map[String, Int] = { + val MinNrOfMembersOfRole: Map[String, Int] = { import scala.collection.JavaConverters._ cc.getConfig("role").root.asScala.collect { case (key, value: ConfigObject) ⇒ (key -> value.toConfig.getInt("min-nr-of-members")) }.toMap } - final val JmxEnabled: Boolean = cc.getBoolean("jmx.enabled") - final val UseDispatcher: String = cc.getString("use-dispatcher") match { + val JmxEnabled: Boolean = cc.getBoolean("jmx.enabled") + val UseDispatcher: String = cc.getString("use-dispatcher") match { case "" ⇒ Dispatchers.DefaultDispatcherId case id ⇒ id } - final val GossipDifferentViewProbability: Double = cc.getDouble("gossip-different-view-probability") - final val SchedulerTickDuration: FiniteDuration = Duration(cc.getMilliseconds("scheduler.tick-duration"), MILLISECONDS) - final val SchedulerTicksPerWheel: Int = cc.getInt("scheduler.ticks-per-wheel") - final val MetricsEnabled: Boolean = cc.getBoolean("metrics.enabled") - final val MetricsCollectorClass: String = cc.getString("metrics.collector-class") - final val MetricsInterval: FiniteDuration = { + val GossipDifferentViewProbability: Double = cc.getDouble("gossip-different-view-probability") + val SchedulerTickDuration: FiniteDuration = Duration(cc.getMilliseconds("scheduler.tick-duration"), MILLISECONDS) + val SchedulerTicksPerWheel: Int = cc.getInt("scheduler.ticks-per-wheel") + val MetricsEnabled: Boolean = cc.getBoolean("metrics.enabled") + val MetricsCollectorClass: String = cc.getString("metrics.collector-class") + val MetricsInterval: FiniteDuration = { Duration(cc.getMilliseconds("metrics.collect-interval"), MILLISECONDS) } requiring (_ > Duration.Zero, "metrics.collect-interval must be > 0") - final val MetricsGossipInterval: FiniteDuration = Duration(cc.getMilliseconds("metrics.gossip-interval"), MILLISECONDS) - final val MetricsMovingAverageHalfLife: FiniteDuration = { + val MetricsGossipInterval: FiniteDuration = Duration(cc.getMilliseconds("metrics.gossip-interval"), MILLISECONDS) + val MetricsMovingAverageHalfLife: FiniteDuration = { Duration(cc.getMilliseconds("metrics.moving-average-half-life"), MILLISECONDS) } requiring (_ > Duration.Zero, "metrics.moving-average-half-life must be > 0") diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 36c40ce8e6..5294a604c1 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -18,6 +18,7 @@ class ClusterConfigSpec extends AkkaSpec { "be able to parse generic cluster config elements" in { val settings = new ClusterSettings(system.settings.config, system.name) import settings._ + LogInfo must be(true) FailureDetectorConfig.getDouble("threshold") must be(8.0 plusOrMinus 0.0001) FailureDetectorConfig.getInt("max-sample-size") must be(1000) Duration(FailureDetectorConfig.getMilliseconds("min-std-deviation"), MILLISECONDS) must be(100 millis) diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala index d179342cda..33d45b376d 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala @@ -33,10 +33,9 @@ object ClusterSingletonManager { role: Option[String], maxHandOverRetries: Int = 20, maxTakeOverRetries: Int = 15, - retryInterval: FiniteDuration = 1.second, - loggingEnabled: Boolean = true): Props = + retryInterval: FiniteDuration = 1.second): Props = Props(classOf[ClusterSingletonManager], singletonProps, singletonName, terminationMessage, role, - maxHandOverRetries, maxTakeOverRetries, retryInterval, loggingEnabled) + maxHandOverRetries, maxTakeOverRetries, retryInterval) /** * Java API: Factory method for `ClusterSingletonManager` [[akka.actor.Props]]. @@ -48,7 +47,6 @@ object ClusterSingletonManager { maxHandOverRetries: Int, maxTakeOverRetries: Int, retryInterval: FiniteDuration, - loggingEnabled: Boolean, singletonPropsFactory: ClusterSingletonPropsFactory): Props = props(handOverData ⇒ singletonPropsFactory.create(handOverData.orNull), singletonName, terminationMessage, ClusterSingletonManager.Internal.roleOption(role), maxHandOverRetries, maxTakeOverRetries, retryInterval) @@ -368,8 +366,6 @@ class ClusterSingletonManagerIsStuck(message: String) extends AkkaException(mess * stopped. `maxTakeOverRetries` must be less than `maxHandOverRetries` to * ensure that new oldest doesn't start singleton actor before previous is * stopped for certain corner cases. - * - * '''''loggingEnabled''''' Logging of what is going on at info log level. */ class ClusterSingletonManager( singletonProps: Option[Any] ⇒ Props, @@ -378,8 +374,7 @@ class ClusterSingletonManager( role: Option[String], maxHandOverRetries: Int, maxTakeOverRetries: Int, - retryInterval: FiniteDuration, - loggingEnabled: Boolean) + retryInterval: FiniteDuration) extends Actor with FSM[ClusterSingletonManager.State, ClusterSingletonManager.Data] { // to ensure that new oldest doesn't start singleton actor before previous is stopped for certain corner cases @@ -392,6 +387,7 @@ class ClusterSingletonManager( val cluster = Cluster(context.system) val selfAddressOption = Some(cluster.selfAddress) + import cluster.settings.LogInfo require(role.forall(cluster.selfRoles.contains), s"This cluster member [${cluster.selfAddress}] doesn't have the role [$role]") @@ -414,13 +410,13 @@ class ClusterSingletonManager( } def logInfo(message: String): Unit = - if (loggingEnabled) log.info(message) + if (LogInfo) log.info(message) def logInfo(template: String, arg1: Any): Unit = - if (loggingEnabled) log.info(template, arg1) + if (LogInfo) log.info(template, arg1) def logInfo(template: String, arg1: Any, arg2: Any): Unit = - if (loggingEnabled) log.info(template, arg1, arg2) + if (LogInfo) log.info(template, arg1, arg2) override def preStart(): Unit = { super.preStart() diff --git a/akka-docs/rst/java/cluster-usage.rst b/akka-docs/rst/java/cluster-usage.rst index c3e0db8858..ab89a54755 100644 --- a/akka-docs/rst/java/cluster-usage.rst +++ b/akka-docs/rst/java/cluster-usage.rst @@ -711,6 +711,13 @@ reference file for more information: .. literalinclude:: ../../../akka-cluster/src/main/resources/reference.conf :language: none +Cluster Info Logging +-------------------- + +You can silence the logging of cluster events at info level with configuration property:: + + akka.cluster.log-info = off + Cluster Scheduler ----------------- diff --git a/akka-docs/rst/scala/cluster-usage.rst b/akka-docs/rst/scala/cluster-usage.rst index f0e973576a..ac3e19a0a4 100644 --- a/akka-docs/rst/scala/cluster-usage.rst +++ b/akka-docs/rst/scala/cluster-usage.rst @@ -731,6 +731,13 @@ reference file for more information: .. literalinclude:: ../../../akka-cluster/src/main/resources/reference.conf :language: none +Cluster Info Logging +-------------------- + +You can silence the logging of cluster events at info level with configuration property:: + + akka.cluster.log-info = off + Cluster Scheduler -----------------