From 17f0ce9f89b15a42355d902b610d55bd6988d132 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 5 Jul 2012 11:56:54 +0200 Subject: [PATCH] Add back Cluster JMX, see 2311 * Separate class * Simple test --- .../src/main/scala/akka/cluster/Cluster.scala | 9 +- .../main/scala/akka/cluster/ClusterJmx.scala | 107 ++++++++++++++++++ .../test/scala/akka/cluster/ClusterSpec.scala | 9 ++ 3 files changed, 121 insertions(+), 4 deletions(-) create mode 100644 akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 3b292b0d91..da0d7483a8 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -19,11 +19,9 @@ import akka.util.duration._ import akka.util.internal.HashedWheelTimer import com.google.protobuf.ByteString import java.io.Closeable -import java.lang.management.ManagementFactory import java.util.concurrent.atomic.{ AtomicReference, AtomicBoolean } import java.util.concurrent.TimeoutException import java.util.concurrent.TimeUnit._ -import javax.management._ import MemberStatus._ import scala.annotation.tailrec import scala.collection.immutable.{ Map, SortedSet } @@ -1485,6 +1483,9 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) system.registerOnTermination(shutdown()) + private val clusterJmx = new ClusterJmx(this, log) + clusterJmx.createMBean() + log.info("Cluster Node [{}] - has started up successfully", selfAddress) // ====================================================== @@ -1615,6 +1616,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) clusterScheduler.close() + clusterJmx.unregisterMBean() + log.info("Cluster Node [{}] - Cluster node successfully shut down", selfAddress) } } @@ -1632,6 +1635,4 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) */ private[cluster] def latestStats: ClusterStats = _latestStats - // FIXME add back JMX - } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala new file mode 100644 index 0000000000..944d90079b --- /dev/null +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala @@ -0,0 +1,107 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.cluster + +import java.lang.management.ManagementFactory +import javax.management.StandardMBean +import akka.event.LoggingAdapter +import akka.actor.AddressFromURIString +import javax.management.ObjectName +import javax.management.InstanceAlreadyExistsException +import javax.management.InstanceNotFoundException + +/** + * Interface for the cluster JMX MBean. + */ +trait ClusterNodeMBean { + def getMemberStatus: String + def getClusterStatus: String + def getLeader: String + + def isSingleton: Boolean + def isConvergence: Boolean + def isAvailable: Boolean + def isRunning: Boolean + + def join(address: String) + def leave(address: String) + def down(address: String) +} + +/** + * Internal API + */ +private[akka] class ClusterJmx(clusterNode: Cluster, log: LoggingAdapter) { + + private val mBeanServer = ManagementFactory.getPlatformMBeanServer + private val clusterMBeanName = new ObjectName("akka:type=Cluster") + + /** + * Creates the cluster JMX MBean and registers it in the MBean server. + */ + def createMBean() = { + val mbean = new StandardMBean(classOf[ClusterNodeMBean]) with ClusterNodeMBean { + + // JMX attributes (bean-style) + + /* + * Sends a string to the JMX client that will list all nodes in the node ring as follows: + * {{{ + * Members: + * Member(address = akka://system0@localhost:5550, status = Up) + * Member(address = akka://system1@localhost:5551, status = Up) + * Unreachable: + * Member(address = akka://system2@localhost:5553, status = Down) + * }}} + */ + def getClusterStatus: String = { + val gossip = clusterNode.latestGossip + val unreachable = gossip.overview.unreachable + val metaData = gossip.meta + "\nMembers:\n\t" + gossip.members.mkString("\n\t") + + { if (unreachable.nonEmpty) "\nUnreachable:\n\t" + unreachable.mkString("\n\t") else "" } + + { if (metaData.nonEmpty) "\nMeta Data:\t" + metaData.toString else "" } + } + + def getMemberStatus: String = clusterNode.status.toString + + def getLeader: String = clusterNode.leader.toString + + def isSingleton: Boolean = clusterNode.isSingletonCluster + + def isConvergence: Boolean = clusterNode.convergence.isDefined + + def isAvailable: Boolean = clusterNode.isAvailable + + def isRunning: Boolean = clusterNode.isRunning + + // JMX commands + + def join(address: String) = clusterNode.join(AddressFromURIString(address)) + + def leave(address: String) = clusterNode.leave(AddressFromURIString(address)) + + def down(address: String) = clusterNode.down(AddressFromURIString(address)) + } + try { + mBeanServer.registerMBean(mbean, clusterMBeanName) + log.info("Cluster Node [{}] - registered cluster JMX MBean [{}]", clusterNode.selfAddress, clusterMBeanName) + } catch { + case e: InstanceAlreadyExistsException ⇒ // ignore - we are running multiple cluster nodes in the same JVM (probably for testing) + } + } + + /** + * Unregisters the cluster JMX MBean from MBean server. + */ + def unregisterMBean(): Unit = { + try { + mBeanServer.unregisterMBean(clusterMBeanName) + } catch { + case e: InstanceNotFoundException ⇒ // ignore - we are running multiple cluster nodes in the same JVM (probably for testing) + } + } + +} diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index 5812586a3f..f660af3763 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -13,6 +13,8 @@ import akka.actor.Address import java.util.concurrent.atomic.AtomicInteger import akka.remote.RemoteActorRefProvider import InternalClusterAction._ +import java.lang.management.ManagementFactory +import javax.management.ObjectName object ClusterSpec { val config = """ @@ -57,6 +59,13 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender { cluster.selfAddress must be(selfAddress) } + "register jmx mbean" in { + val name = new ObjectName("akka:type=Cluster") + val info = ManagementFactory.getPlatformMBeanServer.getMBeanInfo(name) + info.getAttributes.length must be > (0) + info.getOperations.length must be > (0) + } + "initially become singleton cluster when joining itself and reach convergence" in { cluster.isSingletonCluster must be(false) // auto-join = off cluster.join(selfAddress)