Add back Cluster JMX, see 2311

* Separate class
* Simple test
This commit is contained in:
Patrik Nordwall 2012-07-05 11:56:54 +02:00
parent ce9f530c32
commit 17f0ce9f89
3 changed files with 121 additions and 4 deletions

View file

@ -19,11 +19,9 @@ import akka.util.duration._
import akka.util.internal.HashedWheelTimer
import com.google.protobuf.ByteString
import java.io.Closeable
import java.lang.management.ManagementFactory
import java.util.concurrent.atomic.{ AtomicReference, AtomicBoolean }
import java.util.concurrent.TimeoutException
import java.util.concurrent.TimeUnit._
import javax.management._
import MemberStatus._
import scala.annotation.tailrec
import scala.collection.immutable.{ Map, SortedSet }
@ -1485,6 +1483,9 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
system.registerOnTermination(shutdown())
private val clusterJmx = new ClusterJmx(this, log)
clusterJmx.createMBean()
log.info("Cluster Node [{}] - has started up successfully", selfAddress)
// ======================================================
@ -1615,6 +1616,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
clusterScheduler.close()
clusterJmx.unregisterMBean()
log.info("Cluster Node [{}] - Cluster node successfully shut down", selfAddress)
}
}
@ -1632,6 +1635,4 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
*/
private[cluster] def latestStats: ClusterStats = _latestStats
// FIXME add back JMX
}

View file

@ -0,0 +1,107 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import java.lang.management.ManagementFactory
import javax.management.StandardMBean
import akka.event.LoggingAdapter
import akka.actor.AddressFromURIString
import javax.management.ObjectName
import javax.management.InstanceAlreadyExistsException
import javax.management.InstanceNotFoundException
/**
* Interface for the cluster JMX MBean.
*/
trait ClusterNodeMBean {
def getMemberStatus: String
def getClusterStatus: String
def getLeader: String
def isSingleton: Boolean
def isConvergence: Boolean
def isAvailable: Boolean
def isRunning: Boolean
def join(address: String)
def leave(address: String)
def down(address: String)
}
/**
* Internal API
*/
private[akka] class ClusterJmx(clusterNode: Cluster, log: LoggingAdapter) {
private val mBeanServer = ManagementFactory.getPlatformMBeanServer
private val clusterMBeanName = new ObjectName("akka:type=Cluster")
/**
* Creates the cluster JMX MBean and registers it in the MBean server.
*/
def createMBean() = {
val mbean = new StandardMBean(classOf[ClusterNodeMBean]) with ClusterNodeMBean {
// JMX attributes (bean-style)
/*
* Sends a string to the JMX client that will list all nodes in the node ring as follows:
* {{{
* Members:
* Member(address = akka://system0@localhost:5550, status = Up)
* Member(address = akka://system1@localhost:5551, status = Up)
* Unreachable:
* Member(address = akka://system2@localhost:5553, status = Down)
* }}}
*/
def getClusterStatus: String = {
val gossip = clusterNode.latestGossip
val unreachable = gossip.overview.unreachable
val metaData = gossip.meta
"\nMembers:\n\t" + gossip.members.mkString("\n\t") +
{ if (unreachable.nonEmpty) "\nUnreachable:\n\t" + unreachable.mkString("\n\t") else "" } +
{ if (metaData.nonEmpty) "\nMeta Data:\t" + metaData.toString else "" }
}
def getMemberStatus: String = clusterNode.status.toString
def getLeader: String = clusterNode.leader.toString
def isSingleton: Boolean = clusterNode.isSingletonCluster
def isConvergence: Boolean = clusterNode.convergence.isDefined
def isAvailable: Boolean = clusterNode.isAvailable
def isRunning: Boolean = clusterNode.isRunning
// JMX commands
def join(address: String) = clusterNode.join(AddressFromURIString(address))
def leave(address: String) = clusterNode.leave(AddressFromURIString(address))
def down(address: String) = clusterNode.down(AddressFromURIString(address))
}
try {
mBeanServer.registerMBean(mbean, clusterMBeanName)
log.info("Cluster Node [{}] - registered cluster JMX MBean [{}]", clusterNode.selfAddress, clusterMBeanName)
} catch {
case e: InstanceAlreadyExistsException // ignore - we are running multiple cluster nodes in the same JVM (probably for testing)
}
}
/**
* Unregisters the cluster JMX MBean from MBean server.
*/
def unregisterMBean(): Unit = {
try {
mBeanServer.unregisterMBean(clusterMBeanName)
} catch {
case e: InstanceNotFoundException // ignore - we are running multiple cluster nodes in the same JVM (probably for testing)
}
}
}

View file

@ -13,6 +13,8 @@ import akka.actor.Address
import java.util.concurrent.atomic.AtomicInteger
import akka.remote.RemoteActorRefProvider
import InternalClusterAction._
import java.lang.management.ManagementFactory
import javax.management.ObjectName
object ClusterSpec {
val config = """
@ -57,6 +59,13 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender {
cluster.selfAddress must be(selfAddress)
}
"register jmx mbean" in {
val name = new ObjectName("akka:type=Cluster")
val info = ManagementFactory.getPlatformMBeanServer.getMBeanInfo(name)
info.getAttributes.length must be > (0)
info.getOperations.length must be > (0)
}
"initially become singleton cluster when joining itself and reach convergence" in {
cluster.isSingletonCluster must be(false) // auto-join = off
cluster.join(selfAddress)