diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala index 4eb27e836e..8c53108a40 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala @@ -16,17 +16,65 @@ import javax.management.InstanceNotFoundException * Interface for the cluster JMX MBean. */ trait ClusterNodeMBean { + + /** + * Member status for this node. + */ def getMemberStatus: String + + /** + * Comma separated addresses of member nodes, sorted in the cluster ring order. + */ + def getMembers: String + + /** + * Comma separated addresses of unreachable member nodes. + */ + def getUnreachable: String + + /* + * String that will list all nodes in the node ring as follows: + * {{{ + * Members: + * Member(address = akka://system0@localhost:5550, status = Up) + * Member(address = akka://system1@localhost:5551, status = Up) + * Unreachable: + * Member(address = akka://system2@localhost:5553, status = Down) + * }}} + */ def getClusterStatus: String + + /** + * Get the address of the current leader. + */ def getLeader: String + /** + * Does the cluster consist of only one member? + */ def isSingleton: Boolean - def isConvergence: Boolean + + /** + * Returns true if the node is UP or JOINING. + */ def isAvailable: Boolean + /** + * Returns true if the cluster node is up and running, false if it is shut down. + */ def isRunning: Boolean + /** + * Try to join this cluster node with the node specified by 'address'. + * A 'Join(thisNodeAddress)' command is sent to the node to join. + */ def join(address: String) + /** + * Send command to issue state transition to LEAVING for the node specified by 'address'. + */ def leave(address: String) + /** + * Send command to DOWN the node specified by 'address'. + */ def down(address: String) } @@ -47,30 +95,24 @@ private[akka] class ClusterJmx(cluster: Cluster, log: LoggingAdapter) { // JMX attributes (bean-style) - /* - * Sends a string to the JMX client that will list all nodes in the node ring as follows: - * {{{ - * Members: - * Member(address = akka://system0@localhost:5550, status = Up) - * Member(address = akka://system1@localhost:5551, status = Up) - * Unreachable: - * Member(address = akka://system2@localhost:5553, status = Down) - * }}} - */ def getClusterStatus: String = { val unreachable = clusterView.unreachableMembers "\nMembers:\n\t" + clusterView.members.mkString("\n\t") + { if (unreachable.nonEmpty) "\nUnreachable:\n\t" + unreachable.mkString("\n\t") else "" } } + def getMembers: String = + clusterView.members.toSeq.map(_.address).mkString(", ") + + def getUnreachable: String = + clusterView.unreachableMembers.map(_.address).mkString(", ") + def getMemberStatus: String = clusterView.status.toString - def getLeader: String = clusterView.leader.toString + def getLeader: String = clusterView.leader.getOrElse("").toString def isSingleton: Boolean = clusterView.isSingletonCluster - def isConvergence: Boolean = clusterView.convergence - def isAvailable: Boolean = clusterView.isAvailable def isRunning: Boolean = clusterView.isRunning diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala index deb817f03c..c1f42048ee 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala @@ -108,7 +108,7 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { def leader: Option[Address] = state.leader /** - * Is this node a singleton cluster? + * Does the cluster consist of only one member? */ def isSingletonCluster: Boolean = members.size == 1 diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MBeanSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MBeanSpec.scala new file mode 100644 index 0000000000..6b714e03f0 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MBeanSpec.scala @@ -0,0 +1,149 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import language.postfixOps +import com.typesafe.config.ConfigFactory +import scala.concurrent.duration._ +import java.lang.management.ManagementFactory +import javax.management.InstanceNotFoundException +import javax.management.ObjectName +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import scala.util.Try + +object MBeanMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + val fourth = role("fourth") + + commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(""" + akka.cluster.jmx.enabled = on + """)).withFallback(MultiNodeClusterSpec.clusterConfig)) + +} + +class MBeanMultiJvmNode1 extends MBeanSpec +class MBeanMultiJvmNode2 extends MBeanSpec +class MBeanMultiJvmNode3 extends MBeanSpec +class MBeanMultiJvmNode4 extends MBeanSpec + +abstract class MBeanSpec + extends MultiNodeSpec(MBeanMultiJvmSpec) + with MultiNodeClusterSpec { + + import MBeanMultiJvmSpec._ + import ClusterEvent._ + + val mbeanName = new ObjectName("akka:type=Cluster") + lazy val mbeanServer = ManagementFactory.getPlatformMBeanServer + + "Cluster MBean" must { + "expose attributes" taggedAs LongRunningTest in { + val info = mbeanServer.getMBeanInfo(mbeanName) + info.getAttributes.map(_.getName).toSet must be(Set( + "ClusterStatus", "Members", "Unreachable", "MemberStatus", "Leader", "Singleton", "Available", "Running")) + enterBarrier("after-1") + } + + "expose operations" taggedAs LongRunningTest in { + val info = mbeanServer.getMBeanInfo(mbeanName) + info.getOperations.map(_.getName).toSet must be(Set( + "join", "leave", "down")) + enterBarrier("after-2") + } + + "change attributes after startup" taggedAs LongRunningTest in { + runOn(first) { + mbeanServer.getAttribute(mbeanName, "Running").asInstanceOf[Boolean] must be(true) + mbeanServer.getAttribute(mbeanName, "Available").asInstanceOf[Boolean] must be(true) + mbeanServer.getAttribute(mbeanName, "Singleton").asInstanceOf[Boolean] must be(false) + mbeanServer.getAttribute(mbeanName, "Leader") must be("") + mbeanServer.getAttribute(mbeanName, "Members") must be("") + mbeanServer.getAttribute(mbeanName, "Unreachable") must be("") + mbeanServer.getAttribute(mbeanName, "MemberStatus") must be("Removed") + } + awaitClusterUp(first) + runOn(first) { + awaitCond(mbeanServer.getAttribute(mbeanName, "MemberStatus") == "Up") + awaitCond(mbeanServer.getAttribute(mbeanName, "Leader") == address(first).toString) + mbeanServer.getAttribute(mbeanName, "Singleton").asInstanceOf[Boolean] must be(true) + mbeanServer.getAttribute(mbeanName, "Members") must be(address(first).toString) + mbeanServer.getAttribute(mbeanName, "Unreachable") must be("") + } + enterBarrier("after-3") + } + + "support join" taggedAs LongRunningTest in { + runOn(second, third, fourth) { + mbeanServer.invoke(mbeanName, "join", Array(address(first).toString), Array("java.lang.String")) + } + enterBarrier("joined") + + awaitUpConvergence(4) + assertMembers(clusterView.members, roles.map(address(_)): _*) + awaitCond(mbeanServer.getAttribute(mbeanName, "MemberStatus") == "Up") + val expectedMembers = roles.sorted.map(address(_)).mkString(", ") + awaitCond(mbeanServer.getAttribute(mbeanName, "Members") == expectedMembers) + val expectedLeader = address(roleOfLeader()) + awaitCond(mbeanServer.getAttribute(mbeanName, "Leader") == expectedLeader.toString) + mbeanServer.getAttribute(mbeanName, "Singleton").asInstanceOf[Boolean] must be(false) + + enterBarrier("after-4") + } + + "support down" taggedAs LongRunningTest in { + val fourthAddress = address(fourth) + runOn(first) { + testConductor.shutdown(fourth, 0).await + } + enterBarrier("fourth-shutdown") + + runOn(first, second, third) { + awaitCond(mbeanServer.getAttribute(mbeanName, "Unreachable") == fourthAddress.toString) + val expectedMembers = Seq(first, second, third).sorted.map(address(_)).mkString(", ") + awaitCond(mbeanServer.getAttribute(mbeanName, "Members") == expectedMembers) + } + enterBarrier("fourth-unreachable") + + runOn(second) { + mbeanServer.invoke(mbeanName, "down", Array(fourthAddress.toString), Array("java.lang.String")) + } + enterBarrier("fourth-down") + + runOn(first, second, third) { + awaitUpConvergence(3, canNotBePartOfMemberRing = List(fourthAddress)) + assertMembers(clusterView.members, first, second, third) + } + + enterBarrier("after-5") + } + + "support leave" taggedAs LongRunningTest in within(20 seconds) { + runOn(second) { + mbeanServer.invoke(mbeanName, "leave", Array(address(third).toString), Array("java.lang.String")) + } + enterBarrier("third-left") + runOn(first, second) { + awaitUpConvergence(2) + assertMembers(clusterView.members, first, second) + val expectedMembers = Seq(first, second).sorted.map(address(_)).mkString(", ") + awaitCond(mbeanServer.getAttribute(mbeanName, "Members") == expectedMembers) + } + runOn(third) { + awaitCond(!cluster.isRunning) + // mbean should be unregistered, i.e. throw InstanceNotFoundException + awaitCond(Try { mbeanServer.getMBeanInfo(mbeanName); false } recover { + case e: InstanceNotFoundException ⇒ true + case _ ⇒ false + } get) + } + + enterBarrier("after-6") + } + + } +} diff --git a/akka-docs/rst/cluster/cluster-usage-java.rst b/akka-docs/rst/cluster/cluster-usage-java.rst index c1f00d638e..647a40d8a1 100644 --- a/akka-docs/rst/cluster/cluster-usage-java.rst +++ b/akka-docs/rst/cluster/cluster-usage-java.rst @@ -573,6 +573,8 @@ Run it without parameters to see instructions about how to use the script:: leave - Sends a request for node with URL to LEAVE the cluster down - Sends a request for marking node with URL as DOWN member-status - Asks the member node for its current status + members - Asks the cluster for addresses of current members + unreachable - Asks the cluster for addresses of unreachable members cluster-status - Asks the cluster for its current status (member ring, unavailable nodes, meta data etc.) leader - Asks the cluster who the current leader is @@ -580,7 +582,6 @@ Run it without parameters to see instructions about how to use the script:: node cluster) is-available - Checks if the member node is available is-running - Checks if the member node is running - has-convergence - Checks if there is a cluster convergence Where the should be on the format of 'akka://actor-system-name@hostname:port' diff --git a/akka-docs/rst/cluster/cluster-usage-scala.rst b/akka-docs/rst/cluster/cluster-usage-scala.rst index 5e0bc94e2b..0383f38a90 100644 --- a/akka-docs/rst/cluster/cluster-usage-scala.rst +++ b/akka-docs/rst/cluster/cluster-usage-scala.rst @@ -579,6 +579,8 @@ Run it without parameters to see instructions about how to use the script:: leave - Sends a request for node with URL to LEAVE the cluster down - Sends a request for marking node with URL as DOWN member-status - Asks the member node for its current status + members - Asks the cluster for addresses of current members + unreachable - Asks the cluster for addresses of unreachable members cluster-status - Asks the cluster for its current status (member ring, unavailable nodes, meta data etc.) leader - Asks the cluster who the current leader is @@ -586,7 +588,6 @@ Run it without parameters to see instructions about how to use the script:: node cluster) is-available - Checks if the member node is available is-running - Checks if the member node is running - has-convergence - Checks if there is a cluster convergence Where the should be on the format of 'akka://actor-system-name@hostname:port' diff --git a/akka-kernel/src/main/dist/bin/akka-cluster b/akka-kernel/src/main/dist/bin/akka-cluster index c7bb125c52..1d3b67bff1 100755 --- a/akka-kernel/src/main/dist/bin/akka-cluster +++ b/akka-kernel/src/main/dist/bin/akka-cluster @@ -103,6 +103,32 @@ case "$2" in $JMX_CLIENT $HOST akka:type=Cluster ClusterStatus ;; + members) + if [ $# -ne 2 ]; then + echo "Usage: $SELF members" + exit 1 + fi + + ensureNodeIsRunningAndAvailable + shift + + echo "Querying members" + $JMX_CLIENT $HOST akka:type=Cluster Members + ;; + + unreachable) + if [ $# -ne 2 ]; then + echo "Usage: $SELF unreachable" + exit 1 + fi + + ensureNodeIsRunningAndAvailable + shift + + echo "Querying unreachable members" + $JMX_CLIENT $HOST akka:type=Cluster Unreachable + ;; + leader) if [ $# -ne 2 ]; then echo "Usage: $SELF leader" @@ -129,19 +155,6 @@ case "$2" in $JMX_CLIENT $HOST akka:type=Cluster Singleton ;; - has-convergence) - if [ $# -ne 2 ]; then - echo "Usage: $SELF is-convergence" - exit 1 - fi - - ensureNodeIsRunningAndAvailable - shift - - echo "Checking for cluster convergence" - $JMX_CLIENT $HOST akka:type=Cluster Convergence - ;; - is-available) if [ $# -ne 2 ]; then echo "Usage: $SELF is-available" @@ -176,12 +189,13 @@ case "$2" in printf "%26s - %s\n" "leave " "Sends a request for node with URL to LEAVE the cluster" printf "%26s - %s\n" "down " "Sends a request for marking node with URL as DOWN" printf "%26s - %s\n" member-status "Asks the member node for its current status" + printf "%26s - %s\n" members "Asks the cluster for addresses of current members" + printf "%26s - %s\n" unreachable "Asks the cluster for addresses of unreachable members" printf "%26s - %s\n" cluster-status "Asks the cluster for its current status (member ring, unavailable nodes, meta data etc.)" printf "%26s - %s\n" leader "Asks the cluster who the current leader is" printf "%26s - %s\n" is-singleton "Checks if the cluster is a singleton cluster (single node cluster)" printf "%26s - %s\n" is-available "Checks if the member node is available" printf "%26s - %s\n" is-running "Checks if the member node is running" - printf "%26s - %s\n" has-convergence "Checks if there is a cluster convergence" printf "Where the should be on the format of 'akka://actor-system-name@hostname:port'\n" printf "\n" printf "Examples: bin/$SELF localhost:9999 is-available\n"