Config of cluster info logging, see #3225

parent ba88c44b6f
commit 18a3b3facf

11 changed files with 115 additions and 84 deletions
@@ -51,6 +51,9 @@ akka {
     # until the cluster has reached a certain size.
     min-nr-of-members = 1

+    # Enable/disable info level logging of cluster events
+    log-info = on
+
     # Enable or disable JMX MBeans for management of the cluster
     jmx.enabled = on

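The default above ends up in the merged configuration like any other Akka setting, so it can be inspected from application code. A minimal sketch, assuming akka-cluster (and therefore its reference.conf) is on the classpath; the object name is invented for the example:

    import com.typesafe.config.ConfigFactory

    object LogInfoCheck extends App {
      // ConfigFactory.load() layers application.conf and system properties
      // on top of reference.conf, where log-info defaults to "on".
      val config = ConfigFactory.load()
      println("akka.cluster.log-info = " + config.getBoolean("akka.cluster.log-info"))
    }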
@@ -56,6 +56,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {

   val settings = new ClusterSettings(system.settings.config, system.name)
   import settings._
+  import InfoLogger._

   /**
    * INTERNAL API
@@ -88,7 +89,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
   // ClusterJmx is initialized as the last thing in the constructor
   private var clusterJmx: Option[ClusterJmx] = None

-  log.info("Cluster Node [{}] - is starting up...", selfAddress)
+  logInfo("Starting up...")

   val failureDetector: FailureDetectorRegistry[Address] = {
     def createFailureDetector(): FailureDetector =
@@ -107,7 +108,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
   private[cluster] val scheduler: Scheduler = {
     if (system.scheduler.maxFrequency < 1.second / SchedulerTickDuration) {
       import scala.collection.JavaConverters._
-      log.info("Using a dedicated scheduler for cluster. Default scheduler can be used if configured " +
+      logInfo("Using a dedicated scheduler for cluster. Default scheduler can be used if configured " +
         "with 'akka.scheduler.tick-duration' [{} ms] <= 'akka.cluster.scheduler.tick-duration' [{} ms].",
         (1000 / system.scheduler.maxFrequency).toInt, SchedulerTickDuration.toMillis)

@@ -179,7 +180,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
     Some(jmx)
   }

-  log.info("Cluster Node [{}] - has started up successfully", selfAddress)
+  logInfo("Started up successfully")

   // ======================================================
   // ===================== PUBLIC API =====================
@@ -290,7 +291,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
    */
   private[cluster] def shutdown(): Unit = {
     if (_isTerminated.compareAndSet(false, true)) {
-      log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", selfAddress)
+      logInfo("Shutting down...")

       system.stop(clusterDaemons)
       if (readViewStarted) readView.close()
@@ -299,7 +300,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {

       clusterJmx foreach { _.unregisterMBean() }

-      log.info("Cluster Node [{}] - Cluster node successfully shut down", selfAddress)
+      logInfo("Successfully shut down")
     }
   }

@@ -308,4 +309,19 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
     case _ ⇒
   }

+  /**
+   * INTERNAL API
+   */
+  private[cluster] object InfoLogger {
+
+    def logInfo(message: String): Unit =
+      if (LogInfo) log.info("Cluster Node [{}] - {}", selfAddress, message)
+
+    def logInfo(template: String, arg1: Any): Unit =
+      if (LogInfo) log.info("Cluster Node [{}] - " + template, selfAddress, arg1)
+
+    def logInfo(template: String, arg1: Any, arg2: Any): Unit =
+      if (LogInfo) log.info("Cluster Node [{}] - " + template, selfAddress, arg1, arg2)
+  }
+
 }
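The InfoLogger object above puts the log-info guard and the "Cluster Node [selfAddress] -" prefix in one place, so the call sites in the following hunks shrink to plain logInfo(...) calls. A rough standalone sketch of the same pattern (the class and names here are illustrative, not part of the commit):

    import akka.event.LoggingAdapter

    // Hypothetical analogue of InfoLogger: decide once whether info logging
    // is enabled and what prefix every message carries.
    final class GuardedInfoLogger(log: LoggingAdapter, prefix: String, enabled: Boolean) {

      def logInfo(message: String): Unit =
        if (enabled) log.info("{} - {}", prefix, message)

      def logInfo(template: String, arg1: Any): Unit =
        if (enabled) log.info(prefix + " - " + template, arg1)
    }

In the real code the owning Cluster extension plays this role, supplying selfAddress and the LogInfo setting.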
@@ -221,6 +221,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
   val cluster = Cluster(context.system)
   import cluster.{ selfAddress, selfUniqueAddress, scheduler, failureDetector }
   import cluster.settings._
+  import cluster.InfoLogger._

   val NumberOfGossipsBeforeShutdownWhenLeaderExits = 3

@@ -265,12 +266,11 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
       Some(scheduler.schedule(PeriodicTasksInitialDelay.max(d), d, self, PublishStatsTick))
     }

-  override def preStart(): Unit = {
+  override def preStart(): Unit =
     if (SeedNodes.isEmpty)
-      log.info("No seed-nodes configured, manual cluster join required")
+      logInfo("No seed-nodes configured, manual cluster join required")
     else
       self ! JoinSeedNodes(SeedNodes)
-  }

   override def postStop(): Unit = {
     gossipTask.cancel()
@@ -318,7 +318,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
     case SendGossipTo(address) ⇒ sendGossipTo(address)
     case msg: SubscriptionMessage ⇒ publisher forward msg
     case ClusterUserAction.JoinTo(address) ⇒
-      log.info("Trying to join [{}] when already part of a cluster, ignoring", address)
+      logInfo("Trying to join [{}] when already part of a cluster, ignoring", address)

   }

@@ -419,9 +419,9 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
       val isUnreachable = localUnreachable.exists(_.address == node.address)

       if (alreadyMember)
-        log.info("Existing member [{}] is trying to join, ignoring", node)
+        logInfo("Existing member [{}] is trying to join, ignoring", node)
       else if (isUnreachable)
-        log.info("Unreachable member [{}] is trying to join, ignoring", node)
+        logInfo("Unreachable member [{}] is trying to join, ignoring", node)
       else {

         // remove the node from the failure detector
@@ -434,7 +434,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto

         updateLatestGossip(newGossip)

-        log.info("Cluster Node [{}] - Node [{}] is JOINING, roles [{}]", selfAddress, node.address, roles.mkString(", "))
+        logInfo("Node [{}] is JOINING, roles [{}]", node.address, roles.mkString(", "))
         if (node != selfUniqueAddress) {
           sender ! Welcome(selfUniqueAddress, latestGossip)
         }
@@ -450,9 +450,9 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
   def welcome(joinWith: Address, from: UniqueAddress, gossip: Gossip): Unit = {
     require(latestGossip.members.isEmpty, "Join can only be done from empty state")
     if (joinWith != from.address)
-      log.info("Ignoring welcome from [{}] when trying to join with [{}]", from.address, joinWith)
+      logInfo("Ignoring welcome from [{}] when trying to join with [{}]", from.address, joinWith)
     else {
-      log.info("Cluster Node [{}] - Welcome from [{}]", selfAddress, from.address)
+      logInfo("Welcome from [{}]", from.address)
       latestGossip = gossip seen selfUniqueAddress
       publish(latestGossip)
       if (from != selfUniqueAddress)
@@ -474,7 +474,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto

       updateLatestGossip(newGossip)

-      log.info("Cluster Node [{}] - Marked address [{}] as [{}]", selfAddress, address, Leaving)
+      logInfo("Marked address [{}] as [{}]", address, Leaving)
       publish(latestGossip)
     }
   }
@@ -482,10 +482,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
   /**
    * This method is called when a member sees itself as Exiting.
    */
-  def shutdown(): Unit = {
-    log.info("Cluster Node [{}] - Node shutting down...", latestGossip.member(selfUniqueAddress))
-    cluster.shutdown()
-  }
+  def shutdown(): Unit = cluster.shutdown()

   /**
    * State transition to DOWN.
@@ -508,7 +505,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto

       val newMembers = downedMember match {
         case Some(m) ⇒
-          log.info("Cluster Node [{}] - Marking node [{}] as [{}]", selfAddress, m.address, Down)
+          logInfo("Marking node [{}] as [{}]", m.address, Down)
           localMembers - m
         case None ⇒ localMembers
       }
@@ -518,7 +515,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
         localUnreachableMembers.map { member ⇒
           // no need to DOWN members already DOWN
           if (member.address == address && member.status != Down) {
-            log.info("Cluster Node [{}] - Marking unreachable node [{}] as [{}]", selfAddress, member.address, Down)
+            logInfo("Marking unreachable node [{}] as [{}]", member.address, Down)
             member copy (status = Down)
           } else member
         }
@@ -540,9 +537,9 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
   def receiveGossipStatus(status: GossipStatus): Unit = {
     val from = status.from
     if (latestGossip.overview.unreachable.exists(_.uniqueAddress == from))
-      log.info("Ignoring received gossip status from unreachable [{}] ", from)
+      logInfo("Ignoring received gossip status from unreachable [{}] ", from)
     else if (latestGossip.members.forall(_.uniqueAddress != from))
-      log.info("Ignoring received gossip status from unknown [{}]", from)
+      logInfo("Ignoring received gossip status from unknown [{}]", from)
     else {
       (status.version tryCompareTo latestGossip.version) match {
         case Some(0) ⇒ // same version
@@ -561,15 +558,15 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
     val localGossip = latestGossip

     if (envelope.to != selfUniqueAddress)
-      log.info("Ignoring received gossip intended for someone else, from [{}] to [{}]", from.address, envelope.to)
+      logInfo("Ignoring received gossip intended for someone else, from [{}] to [{}]", from.address, envelope.to)
     if (remoteGossip.overview.unreachable.exists(_.address == selfAddress))
-      log.info("Ignoring received gossip with myself as unreachable, from [{}]", selfAddress, from.address)
+      logInfo("Ignoring received gossip with myself as unreachable, from [{}]", from.address)
     else if (localGossip.overview.unreachable.exists(_.uniqueAddress == from))
-      log.info("Ignoring received gossip from unreachable [{}] ", from)
+      logInfo("Ignoring received gossip from unreachable [{}] ", from)
     else if (localGossip.members.forall(_.uniqueAddress != from))
-      log.info("Ignoring received gossip from unknown [{}]", from)
+      logInfo("Ignoring received gossip from unknown [{}]", from)
     else if (remoteGossip.members.forall(_.uniqueAddress != selfUniqueAddress))
-      log.info("Ignoring received gossip that does not contain myself, from [{}]", from)
+      logInfo("Ignoring received gossip that does not contain myself, from [{}]", from)
     else {

       val comparison = remoteGossip.version tryCompareTo localGossip.version
@@ -749,14 +746,13 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto

       // log status changes
       changedMembers foreach { m ⇒
-        log.info("Cluster Node [{}] - Leader is moving node [{}] to [{}]",
-          selfAddress, m.address, m.status)
+        logInfo("Leader is moving node [{}] to [{}]", m.address, m.status)
       }

       // log the removal of the unreachable nodes
       removedUnreachable foreach { m ⇒
         val status = if (m.status == Exiting) "exiting" else "unreachable"
-        log.info("Cluster Node [{}] - Leader is removing {} node [{}]", selfAddress, status, m.address)
+        logInfo("Leader is removing {} node [{}]", status, m.address)
       }

       publish(latestGossip)
@@ -807,7 +803,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto

       // log the auto-downing of the unreachable nodes
       changedUnreachableMembers foreach { m ⇒
-        log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as [{}]", selfAddress, m.address, m.status)
+        logInfo("Leader is marking unreachable node [{}] as [{}]", m.address, m.status)
       }

       publish(latestGossip)
@@ -844,8 +840,8 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
       if (nonExiting.nonEmpty)
         log.error("Cluster Node [{}] - Marking node(s) as UNREACHABLE [{}]", selfAddress, nonExiting.mkString(", "))
       if (exiting.nonEmpty)
-        log.info("Cluster Node [{}] - Marking exiting node(s) as UNREACHABLE [{}]. This is expected and they will be removed.",
-          selfAddress, exiting.mkString(", "))
+        logInfo("Marking exiting node(s) as UNREACHABLE [{}]. This is expected and they will be removed.",
+          exiting.mkString(", "))

       publish(latestGossip)
     }
@@ -88,6 +88,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
   val cluster = Cluster(context.system)
   import cluster.{ selfAddress, scheduler }
   import cluster.settings._
+  import cluster.InfoLogger._
   import context.dispatcher

   val selfHeartbeat = Heartbeat(selfAddress)
@@ -170,7 +171,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg

   def triggerFirstHeartbeat(address: Address): Unit =
     if (!cluster.failureDetector.isMonitoring(address)) {
-      log.info("Trigger extra expected heartbeat from [{}]", address)
+      logInfo("Trigger extra expected heartbeat from [{}]", address)
       cluster.failureDetector.heartbeat(address)
     }

@@ -91,6 +91,7 @@ private[akka] class ClusterJmx(cluster: Cluster, log: LoggingAdapter) {
   private val mBeanServer = ManagementFactory.getPlatformMBeanServer
   private val clusterMBeanName = new ObjectName("akka:type=Cluster")
   private def clusterView = cluster.readView
+  import cluster.InfoLogger._

   /**
    * Creates the cluster JMX MBean and registers it in the MBean server.
@@ -130,7 +131,7 @@ private[akka] class ClusterJmx(cluster: Cluster, log: LoggingAdapter) {
     }
     try {
       mBeanServer.registerMBean(mbean, clusterMBeanName)
-      log.info("Cluster Node [{}] - registered cluster JMX MBean [{}]", clusterView.selfAddress, clusterMBeanName)
+      logInfo("Registered cluster JMX MBean [{}]", clusterMBeanName)
     } catch {
       case e: InstanceAlreadyExistsException ⇒ // ignore - we are running multiple cluster nodes in the same JVM (probably for testing)
     }
@@ -46,7 +46,8 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
   import context.dispatcher
   val cluster = Cluster(context.system)
   import cluster.{ selfAddress, scheduler, settings }
-  import settings._
+  import cluster.settings._
+  import cluster.InfoLogger._

   /**
    * The node ring gossipped that contains only members that are Up.
@@ -78,7 +79,7 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
   override def preStart(): Unit = {
     cluster.subscribe(self, classOf[MemberEvent])
     cluster.subscribe(self, classOf[UnreachableMember])
-    log.info("Metrics collection has started successfully on node [{}]", selfAddress)
+    logInfo("Metrics collection has started successfully")
   }

   def receive = {
@@ -782,10 +783,11 @@ private[cluster] object MetricsCollector {
     Try(new SigarMetricsCollector(system)) match {
       case Success(sigarCollector) ⇒ sigarCollector
       case Failure(e) ⇒
-        log.info("Metrics will be retreived from MBeans, and may be incorrect on some platforms. " +
-          "To increase metric accuracy add the 'sigar.jar' to the classpath and the appropriate " +
-          "platform-specific native libary to 'java.library.path'. Reason: " +
-          e.toString)
+        Cluster(system).InfoLogger.logInfo(
+          "Metrics will be retreived from MBeans, and may be incorrect on some platforms. " +
+            "To increase metric accuracy add the 'sigar.jar' to the classpath and the appropriate " +
+            "platform-specific native libary to 'java.library.path'. Reason: " +
+            e.toString)
         new JmxMetricsCollector(system)
     }
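The hunk above keeps the existing try-the-native-collector-then-fall-back structure and only reroutes the message through InfoLogger. A standalone sketch of that Try-based fallback idiom, with invented collector names (not the project's classes):

    import scala.util.{ Failure, Success, Try }

    trait Collector { def sample(): String }
    class NativeCollector extends Collector { def sample() = "native metrics" }
    class FallbackCollector extends Collector { def sample() = "jmx metrics" }

    object CollectorFactory {
      def apply(): Collector =
        Try(new NativeCollector) match {
          case Success(native) ⇒ native
          case Failure(e) ⇒
            // the real code logs this via Cluster(system).InfoLogger.logInfo
            println("Falling back: " + e)
            new FallbackCollector
        }
    }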
@@ -16,78 +16,79 @@ import akka.util.Helpers.Requiring
 import scala.concurrent.duration.FiniteDuration
 import akka.japi.Util.immutableSeq

-class ClusterSettings(val config: Config, val systemName: String) {
+final class ClusterSettings(val config: Config, val systemName: String) {

   private val cc = config.getConfig("akka.cluster")

-  final val FailureDetectorConfig: Config = cc.getConfig("failure-detector")
-  final val FailureDetectorImplementationClass: String = FailureDetectorConfig.getString("implementation-class")
-  final val HeartbeatInterval: FiniteDuration = {
+  val LogInfo: Boolean = cc.getBoolean("log-info")
+  val FailureDetectorConfig: Config = cc.getConfig("failure-detector")
+  val FailureDetectorImplementationClass: String = FailureDetectorConfig.getString("implementation-class")
+  val HeartbeatInterval: FiniteDuration = {
     Duration(FailureDetectorConfig.getMilliseconds("heartbeat-interval"), MILLISECONDS)
   } requiring (_ > Duration.Zero, "failure-detector.heartbeat-interval must be > 0")
-  final val HeartbeatRequestDelay: FiniteDuration = {
+  val HeartbeatRequestDelay: FiniteDuration = {
     Duration(FailureDetectorConfig.getMilliseconds("heartbeat-request.grace-period"), MILLISECONDS)
   } requiring (_ > Duration.Zero, "failure-detector.heartbeat-request.grace-period must be > 0")
-  final val HeartbeatExpectedResponseAfter: FiniteDuration = {
+  val HeartbeatExpectedResponseAfter: FiniteDuration = {
     Duration(FailureDetectorConfig.getMilliseconds("heartbeat-request.expected-response-after"), MILLISECONDS)
   } requiring (_ > Duration.Zero, "failure-detector.heartbeat-request.expected-response-after > 0")
-  final val HeartbeatRequestTimeToLive: FiniteDuration = {
+  val HeartbeatRequestTimeToLive: FiniteDuration = {
     Duration(FailureDetectorConfig.getMilliseconds("heartbeat-request.time-to-live"), MILLISECONDS)
   } requiring (_ > Duration.Zero, "failure-detector.heartbeat-request.time-to-live > 0")
-  final val NumberOfEndHeartbeats: Int = {
+  val NumberOfEndHeartbeats: Int = {
     FailureDetectorConfig.getInt("nr-of-end-heartbeats")
   } requiring (_ > 0, "failure-detector.nr-of-end-heartbeats must be > 0")
-  final val MonitoredByNrOfMembers: Int = {
+  val MonitoredByNrOfMembers: Int = {
     FailureDetectorConfig.getInt("monitored-by-nr-of-members")
   } requiring (_ > 0, "failure-detector.monitored-by-nr-of-members must be > 0")

-  final val SeedNodes: immutable.IndexedSeq[Address] =
+  val SeedNodes: immutable.IndexedSeq[Address] =
     immutableSeq(cc.getStringList("seed-nodes")).map { case AddressFromURIString(addr) ⇒ addr }.toVector
-  final val SeedNodeTimeout: FiniteDuration = Duration(cc.getMilliseconds("seed-node-timeout"), MILLISECONDS)
-  final val RetryUnsuccessfulJoinAfter: Duration = {
+  val SeedNodeTimeout: FiniteDuration = Duration(cc.getMilliseconds("seed-node-timeout"), MILLISECONDS)
+  val RetryUnsuccessfulJoinAfter: Duration = {
     val key = "retry-unsuccessful-join-after"
     cc.getString(key).toLowerCase match {
       case "off" ⇒ Duration.Undefined
       case _ ⇒ Duration(cc.getMilliseconds(key), MILLISECONDS) requiring (_ > Duration.Zero, key + " > 0s, or off")
     }
   }
-  final val PeriodicTasksInitialDelay: FiniteDuration = Duration(cc.getMilliseconds("periodic-tasks-initial-delay"), MILLISECONDS)
-  final val GossipInterval: FiniteDuration = Duration(cc.getMilliseconds("gossip-interval"), MILLISECONDS)
-  final val LeaderActionsInterval: FiniteDuration = Duration(cc.getMilliseconds("leader-actions-interval"), MILLISECONDS)
-  final val UnreachableNodesReaperInterval: FiniteDuration = Duration(cc.getMilliseconds("unreachable-nodes-reaper-interval"), MILLISECONDS)
-  final val PublishStatsInterval: Duration = {
+  val PeriodicTasksInitialDelay: FiniteDuration = Duration(cc.getMilliseconds("periodic-tasks-initial-delay"), MILLISECONDS)
+  val GossipInterval: FiniteDuration = Duration(cc.getMilliseconds("gossip-interval"), MILLISECONDS)
+  val LeaderActionsInterval: FiniteDuration = Duration(cc.getMilliseconds("leader-actions-interval"), MILLISECONDS)
+  val UnreachableNodesReaperInterval: FiniteDuration = Duration(cc.getMilliseconds("unreachable-nodes-reaper-interval"), MILLISECONDS)
+  val PublishStatsInterval: Duration = {
     val key = "publish-stats-interval"
     cc.getString(key).toLowerCase match {
       case "off" ⇒ Duration.Undefined
       case _ ⇒ Duration(cc.getMilliseconds(key), MILLISECONDS) requiring (_ >= Duration.Zero, key + " >= 0s, or off")
     }
   }
-  final val AutoDown: Boolean = cc.getBoolean("auto-down")
-  final val Roles: Set[String] = immutableSeq(cc.getStringList("roles")).toSet
-  final val MinNrOfMembers: Int = {
+  val AutoDown: Boolean = cc.getBoolean("auto-down")
+  val Roles: Set[String] = immutableSeq(cc.getStringList("roles")).toSet
+  val MinNrOfMembers: Int = {
     cc.getInt("min-nr-of-members")
   } requiring (_ > 0, "min-nr-of-members must be > 0")
-  final val MinNrOfMembersOfRole: Map[String, Int] = {
+  val MinNrOfMembersOfRole: Map[String, Int] = {
     import scala.collection.JavaConverters._
     cc.getConfig("role").root.asScala.collect {
       case (key, value: ConfigObject) ⇒ (key -> value.toConfig.getInt("min-nr-of-members"))
     }.toMap
   }
-  final val JmxEnabled: Boolean = cc.getBoolean("jmx.enabled")
-  final val UseDispatcher: String = cc.getString("use-dispatcher") match {
+  val JmxEnabled: Boolean = cc.getBoolean("jmx.enabled")
+  val UseDispatcher: String = cc.getString("use-dispatcher") match {
     case "" ⇒ Dispatchers.DefaultDispatcherId
     case id ⇒ id
   }
-  final val GossipDifferentViewProbability: Double = cc.getDouble("gossip-different-view-probability")
-  final val SchedulerTickDuration: FiniteDuration = Duration(cc.getMilliseconds("scheduler.tick-duration"), MILLISECONDS)
-  final val SchedulerTicksPerWheel: Int = cc.getInt("scheduler.ticks-per-wheel")
-  final val MetricsEnabled: Boolean = cc.getBoolean("metrics.enabled")
-  final val MetricsCollectorClass: String = cc.getString("metrics.collector-class")
-  final val MetricsInterval: FiniteDuration = {
+  val GossipDifferentViewProbability: Double = cc.getDouble("gossip-different-view-probability")
+  val SchedulerTickDuration: FiniteDuration = Duration(cc.getMilliseconds("scheduler.tick-duration"), MILLISECONDS)
+  val SchedulerTicksPerWheel: Int = cc.getInt("scheduler.ticks-per-wheel")
+  val MetricsEnabled: Boolean = cc.getBoolean("metrics.enabled")
+  val MetricsCollectorClass: String = cc.getString("metrics.collector-class")
+  val MetricsInterval: FiniteDuration = {
     Duration(cc.getMilliseconds("metrics.collect-interval"), MILLISECONDS)
   } requiring (_ > Duration.Zero, "metrics.collect-interval must be > 0")
-  final val MetricsGossipInterval: FiniteDuration = Duration(cc.getMilliseconds("metrics.gossip-interval"), MILLISECONDS)
-  final val MetricsMovingAverageHalfLife: FiniteDuration = {
+  val MetricsGossipInterval: FiniteDuration = Duration(cc.getMilliseconds("metrics.gossip-interval"), MILLISECONDS)
+  val MetricsMovingAverageHalfLife: FiniteDuration = {
     Duration(cc.getMilliseconds("metrics.moving-average-half-life"), MILLISECONDS)
   } requiring (_ > Duration.Zero, "metrics.moving-average-half-life must be > 0")

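Dropping the final modifiers on the members is possible because the class itself is now final, so they stay effectively final. The settings class also illustrates the usual pattern of reading configuration eagerly into vals so that a bad key or value fails fast at extension start-up. A simplified sketch of that pattern with made-up keys (it deliberately avoids Akka's internal requiring helper):

    import com.typesafe.config.Config

    // Hypothetical miniature of the ClusterSettings style: every key is read
    // and validated when the class is constructed, not when it is first used.
    final class MySettings(config: Config) {
      private val cc = config.getConfig("my-app")

      val LogInfo: Boolean = cc.getBoolean("log-info")
      val MaxRetries: Int = {
        val n = cc.getInt("max-retries")
        require(n > 0, "my-app.max-retries must be > 0")
        n
      }
    }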
@@ -18,6 +18,7 @@ class ClusterConfigSpec extends AkkaSpec {
     "be able to parse generic cluster config elements" in {
       val settings = new ClusterSettings(system.settings.config, system.name)
       import settings._
+      LogInfo must be(true)
       FailureDetectorConfig.getDouble("threshold") must be(8.0 plusOrMinus 0.0001)
       FailureDetectorConfig.getInt("max-sample-size") must be(1000)
       Duration(FailureDetectorConfig.getMilliseconds("min-std-deviation"), MILLISECONDS) must be(100 millis)

@@ -33,10 +33,9 @@ object ClusterSingletonManager {
     role: Option[String],
     maxHandOverRetries: Int = 20,
     maxTakeOverRetries: Int = 15,
-    retryInterval: FiniteDuration = 1.second,
-    loggingEnabled: Boolean = true): Props =
+    retryInterval: FiniteDuration = 1.second): Props =
     Props(classOf[ClusterSingletonManager], singletonProps, singletonName, terminationMessage, role,
-      maxHandOverRetries, maxTakeOverRetries, retryInterval, loggingEnabled)
+      maxHandOverRetries, maxTakeOverRetries, retryInterval)

   /**
    * Java API: Factory method for `ClusterSingletonManager` [[akka.actor.Props]].
@@ -48,7 +47,6 @@ object ClusterSingletonManager {
     maxHandOverRetries: Int,
     maxTakeOverRetries: Int,
     retryInterval: FiniteDuration,
-    loggingEnabled: Boolean,
     singletonPropsFactory: ClusterSingletonPropsFactory): Props =
     props(handOverData ⇒ singletonPropsFactory.create(handOverData.orNull), singletonName, terminationMessage,
       ClusterSingletonManager.Internal.roleOption(role), maxHandOverRetries, maxTakeOverRetries, retryInterval)

@@ -368,8 +366,6 @@ class ClusterSingletonManagerIsStuck(message: String) extends AkkaException(mess
 * stopped. `maxTakeOverRetries` must be less than `maxHandOverRetries` to
 * ensure that new oldest doesn't start singleton actor before previous is
 * stopped for certain corner cases.
- *
- * '''''loggingEnabled''''' Logging of what is going on at info log level.
 */
 class ClusterSingletonManager(
   singletonProps: Option[Any] ⇒ Props,
@@ -378,8 +374,7 @@ class ClusterSingletonManager(
   role: Option[String],
   maxHandOverRetries: Int,
   maxTakeOverRetries: Int,
-  retryInterval: FiniteDuration,
-  loggingEnabled: Boolean)
+  retryInterval: FiniteDuration)
   extends Actor with FSM[ClusterSingletonManager.State, ClusterSingletonManager.Data] {

   // to ensure that new oldest doesn't start singleton actor before previous is stopped for certain corner cases
@@ -392,6 +387,7 @@ class ClusterSingletonManager(

   val cluster = Cluster(context.system)
   val selfAddressOption = Some(cluster.selfAddress)
+  import cluster.settings.LogInfo

   require(role.forall(cluster.selfRoles.contains),
     s"This cluster member [${cluster.selfAddress}] doesn't have the role [$role]")
@@ -414,13 +410,13 @@ class ClusterSingletonManager(
   }

   def logInfo(message: String): Unit =
-    if (loggingEnabled) log.info(message)
+    if (LogInfo) log.info(message)

   def logInfo(template: String, arg1: Any): Unit =
-    if (loggingEnabled) log.info(template, arg1)
+    if (LogInfo) log.info(template, arg1)

   def logInfo(template: String, arg1: Any, arg2: Any): Unit =
-    if (loggingEnabled) log.info(template, arg1, arg2)
+    if (LogInfo) log.info(template, arg1, arg2)

   override def preStart(): Unit = {
     super.preStart()

@@ -711,6 +711,13 @@ reference file for more information:
 .. literalinclude:: ../../../akka-cluster/src/main/resources/reference.conf
    :language: none

+Cluster Info Logging
+--------------------
+
+You can silence the logging of cluster events at info level with configuration property::
+
+  akka.cluster.log-info = off
+
 Cluster Scheduler
 -----------------

@@ -731,6 +731,13 @@ reference file for more information:
 .. literalinclude:: ../../../akka-cluster/src/main/resources/reference.conf
    :language: none

+Cluster Info Logging
+--------------------
+
+You can silence the logging of cluster events at info level with configuration property::
+
+  akka.cluster.log-info = off
+
 Cluster Scheduler
 -----------------

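If you prefer not to touch application.conf, the same override from the documentation above can be applied programmatically when the actor system is created. A minimal sketch (system and object names are invented for the example, not part of the commit):

    import akka.actor.ActorSystem
    import com.typesafe.config.ConfigFactory

    object SilentClusterNode extends App {
      // Turn the info-level cluster event logging off for this node only;
      // everything else still comes from the regular configuration chain.
      val config = ConfigFactory.parseString("akka.cluster.log-info = off")
        .withFallback(ConfigFactory.load())

      val system = ActorSystem("ClusterSystem", config)
    }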