Tests for the Cluster JMX API, see #2018
* MBeanSpec * Added Members and Unreachable to JMX API * Removed Convergence from JMX API, because it will not be exposed when ticket #2692 is merged * Updated documentation and akka-cluster script
This commit is contained in:
parent
872d4c531a
commit
a7b7ab040d
6 changed files with 238 additions and 31 deletions
|
|
@ -16,17 +16,65 @@ import javax.management.InstanceNotFoundException
|
||||||
* Interface for the cluster JMX MBean.
|
* Interface for the cluster JMX MBean.
|
||||||
*/
|
*/
|
||||||
trait ClusterNodeMBean {
|
trait ClusterNodeMBean {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Member status for this node.
|
||||||
|
*/
|
||||||
def getMemberStatus: String
|
def getMemberStatus: String
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Comma separated addresses of member nodes, sorted in the cluster ring order.
|
||||||
|
*/
|
||||||
|
def getMembers: String
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Comma separated addresses of unreachable member nodes.
|
||||||
|
*/
|
||||||
|
def getUnreachable: String
|
||||||
|
|
||||||
|
/*
|
||||||
|
* String that will list all nodes in the node ring as follows:
|
||||||
|
* {{{
|
||||||
|
* Members:
|
||||||
|
* Member(address = akka://system0@localhost:5550, status = Up)
|
||||||
|
* Member(address = akka://system1@localhost:5551, status = Up)
|
||||||
|
* Unreachable:
|
||||||
|
* Member(address = akka://system2@localhost:5553, status = Down)
|
||||||
|
* }}}
|
||||||
|
*/
|
||||||
def getClusterStatus: String
|
def getClusterStatus: String
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the address of the current leader.
|
||||||
|
*/
|
||||||
def getLeader: String
|
def getLeader: String
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Does the cluster consist of only one member?
|
||||||
|
*/
|
||||||
def isSingleton: Boolean
|
def isSingleton: Boolean
|
||||||
def isConvergence: Boolean
|
|
||||||
|
/**
|
||||||
|
* Returns true if the node is UP or JOINING.
|
||||||
|
*/
|
||||||
def isAvailable: Boolean
|
def isAvailable: Boolean
|
||||||
|
/**
|
||||||
|
* Returns true if the cluster node is up and running, false if it is shut down.
|
||||||
|
*/
|
||||||
def isRunning: Boolean
|
def isRunning: Boolean
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Try to join this cluster node with the node specified by 'address'.
|
||||||
|
* A 'Join(thisNodeAddress)' command is sent to the node to join.
|
||||||
|
*/
|
||||||
def join(address: String)
|
def join(address: String)
|
||||||
|
/**
|
||||||
|
* Send command to issue state transition to LEAVING for the node specified by 'address'.
|
||||||
|
*/
|
||||||
def leave(address: String)
|
def leave(address: String)
|
||||||
|
/**
|
||||||
|
* Send command to DOWN the node specified by 'address'.
|
||||||
|
*/
|
||||||
def down(address: String)
|
def down(address: String)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -47,30 +95,24 @@ private[akka] class ClusterJmx(cluster: Cluster, log: LoggingAdapter) {
|
||||||
|
|
||||||
// JMX attributes (bean-style)
|
// JMX attributes (bean-style)
|
||||||
|
|
||||||
/*
|
|
||||||
* Sends a string to the JMX client that will list all nodes in the node ring as follows:
|
|
||||||
* {{{
|
|
||||||
* Members:
|
|
||||||
* Member(address = akka://system0@localhost:5550, status = Up)
|
|
||||||
* Member(address = akka://system1@localhost:5551, status = Up)
|
|
||||||
* Unreachable:
|
|
||||||
* Member(address = akka://system2@localhost:5553, status = Down)
|
|
||||||
* }}}
|
|
||||||
*/
|
|
||||||
def getClusterStatus: String = {
|
def getClusterStatus: String = {
|
||||||
val unreachable = clusterView.unreachableMembers
|
val unreachable = clusterView.unreachableMembers
|
||||||
"\nMembers:\n\t" + clusterView.members.mkString("\n\t") +
|
"\nMembers:\n\t" + clusterView.members.mkString("\n\t") +
|
||||||
{ if (unreachable.nonEmpty) "\nUnreachable:\n\t" + unreachable.mkString("\n\t") else "" }
|
{ if (unreachable.nonEmpty) "\nUnreachable:\n\t" + unreachable.mkString("\n\t") else "" }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def getMembers: String =
|
||||||
|
clusterView.members.toSeq.map(_.address).mkString(", ")
|
||||||
|
|
||||||
|
def getUnreachable: String =
|
||||||
|
clusterView.unreachableMembers.map(_.address).mkString(", ")
|
||||||
|
|
||||||
def getMemberStatus: String = clusterView.status.toString
|
def getMemberStatus: String = clusterView.status.toString
|
||||||
|
|
||||||
def getLeader: String = clusterView.leader.toString
|
def getLeader: String = clusterView.leader.getOrElse("").toString
|
||||||
|
|
||||||
def isSingleton: Boolean = clusterView.isSingletonCluster
|
def isSingleton: Boolean = clusterView.isSingletonCluster
|
||||||
|
|
||||||
def isConvergence: Boolean = clusterView.convergence
|
|
||||||
|
|
||||||
def isAvailable: Boolean = clusterView.isAvailable
|
def isAvailable: Boolean = clusterView.isAvailable
|
||||||
|
|
||||||
def isRunning: Boolean = clusterView.isRunning
|
def isRunning: Boolean = clusterView.isRunning
|
||||||
|
|
|
||||||
|
|
@ -108,7 +108,7 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
|
||||||
def leader: Option[Address] = state.leader
|
def leader: Option[Address] = state.leader
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Is this node a singleton cluster?
|
* Does the cluster consist of only one member?
|
||||||
*/
|
*/
|
||||||
def isSingletonCluster: Boolean = members.size == 1
|
def isSingletonCluster: Boolean = members.size == 1
|
||||||
|
|
||||||
|
|
|
||||||
149
akka-cluster/src/multi-jvm/scala/akka/cluster/MBeanSpec.scala
Normal file
149
akka-cluster/src/multi-jvm/scala/akka/cluster/MBeanSpec.scala
Normal file
|
|
@ -0,0 +1,149 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
package akka.cluster
|
||||||
|
|
||||||
|
import language.postfixOps
|
||||||
|
import com.typesafe.config.ConfigFactory
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
import java.lang.management.ManagementFactory
|
||||||
|
import javax.management.InstanceNotFoundException
|
||||||
|
import javax.management.ObjectName
|
||||||
|
import akka.remote.testkit.MultiNodeConfig
|
||||||
|
import akka.remote.testkit.MultiNodeSpec
|
||||||
|
import akka.testkit._
|
||||||
|
import scala.util.Try
|
||||||
|
|
||||||
|
object MBeanMultiJvmSpec extends MultiNodeConfig {
|
||||||
|
val first = role("first")
|
||||||
|
val second = role("second")
|
||||||
|
val third = role("third")
|
||||||
|
val fourth = role("fourth")
|
||||||
|
|
||||||
|
commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString("""
|
||||||
|
akka.cluster.jmx.enabled = on
|
||||||
|
""")).withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
class MBeanMultiJvmNode1 extends MBeanSpec
|
||||||
|
class MBeanMultiJvmNode2 extends MBeanSpec
|
||||||
|
class MBeanMultiJvmNode3 extends MBeanSpec
|
||||||
|
class MBeanMultiJvmNode4 extends MBeanSpec
|
||||||
|
|
||||||
|
abstract class MBeanSpec
|
||||||
|
extends MultiNodeSpec(MBeanMultiJvmSpec)
|
||||||
|
with MultiNodeClusterSpec {
|
||||||
|
|
||||||
|
import MBeanMultiJvmSpec._
|
||||||
|
import ClusterEvent._
|
||||||
|
|
||||||
|
val mbeanName = new ObjectName("akka:type=Cluster")
|
||||||
|
lazy val mbeanServer = ManagementFactory.getPlatformMBeanServer
|
||||||
|
|
||||||
|
"Cluster MBean" must {
|
||||||
|
"expose attributes" taggedAs LongRunningTest in {
|
||||||
|
val info = mbeanServer.getMBeanInfo(mbeanName)
|
||||||
|
info.getAttributes.map(_.getName).toSet must be(Set(
|
||||||
|
"ClusterStatus", "Members", "Unreachable", "MemberStatus", "Leader", "Singleton", "Available", "Running"))
|
||||||
|
enterBarrier("after-1")
|
||||||
|
}
|
||||||
|
|
||||||
|
"expose operations" taggedAs LongRunningTest in {
|
||||||
|
val info = mbeanServer.getMBeanInfo(mbeanName)
|
||||||
|
info.getOperations.map(_.getName).toSet must be(Set(
|
||||||
|
"join", "leave", "down"))
|
||||||
|
enterBarrier("after-2")
|
||||||
|
}
|
||||||
|
|
||||||
|
"change attributes after startup" taggedAs LongRunningTest in {
|
||||||
|
runOn(first) {
|
||||||
|
mbeanServer.getAttribute(mbeanName, "Running").asInstanceOf[Boolean] must be(true)
|
||||||
|
mbeanServer.getAttribute(mbeanName, "Available").asInstanceOf[Boolean] must be(true)
|
||||||
|
mbeanServer.getAttribute(mbeanName, "Singleton").asInstanceOf[Boolean] must be(false)
|
||||||
|
mbeanServer.getAttribute(mbeanName, "Leader") must be("")
|
||||||
|
mbeanServer.getAttribute(mbeanName, "Members") must be("")
|
||||||
|
mbeanServer.getAttribute(mbeanName, "Unreachable") must be("")
|
||||||
|
mbeanServer.getAttribute(mbeanName, "MemberStatus") must be("Removed")
|
||||||
|
}
|
||||||
|
awaitClusterUp(first)
|
||||||
|
runOn(first) {
|
||||||
|
awaitCond(mbeanServer.getAttribute(mbeanName, "MemberStatus") == "Up")
|
||||||
|
awaitCond(mbeanServer.getAttribute(mbeanName, "Leader") == address(first).toString)
|
||||||
|
mbeanServer.getAttribute(mbeanName, "Singleton").asInstanceOf[Boolean] must be(true)
|
||||||
|
mbeanServer.getAttribute(mbeanName, "Members") must be(address(first).toString)
|
||||||
|
mbeanServer.getAttribute(mbeanName, "Unreachable") must be("")
|
||||||
|
}
|
||||||
|
enterBarrier("after-3")
|
||||||
|
}
|
||||||
|
|
||||||
|
"support join" taggedAs LongRunningTest in {
|
||||||
|
runOn(second, third, fourth) {
|
||||||
|
mbeanServer.invoke(mbeanName, "join", Array(address(first).toString), Array("java.lang.String"))
|
||||||
|
}
|
||||||
|
enterBarrier("joined")
|
||||||
|
|
||||||
|
awaitUpConvergence(4)
|
||||||
|
assertMembers(clusterView.members, roles.map(address(_)): _*)
|
||||||
|
awaitCond(mbeanServer.getAttribute(mbeanName, "MemberStatus") == "Up")
|
||||||
|
val expectedMembers = roles.sorted.map(address(_)).mkString(", ")
|
||||||
|
awaitCond(mbeanServer.getAttribute(mbeanName, "Members") == expectedMembers)
|
||||||
|
val expectedLeader = address(roleOfLeader())
|
||||||
|
awaitCond(mbeanServer.getAttribute(mbeanName, "Leader") == expectedLeader.toString)
|
||||||
|
mbeanServer.getAttribute(mbeanName, "Singleton").asInstanceOf[Boolean] must be(false)
|
||||||
|
|
||||||
|
enterBarrier("after-4")
|
||||||
|
}
|
||||||
|
|
||||||
|
"support down" taggedAs LongRunningTest in {
|
||||||
|
val fourthAddress = address(fourth)
|
||||||
|
runOn(first) {
|
||||||
|
testConductor.shutdown(fourth, 0).await
|
||||||
|
}
|
||||||
|
enterBarrier("fourth-shutdown")
|
||||||
|
|
||||||
|
runOn(first, second, third) {
|
||||||
|
awaitCond(mbeanServer.getAttribute(mbeanName, "Unreachable") == fourthAddress.toString)
|
||||||
|
val expectedMembers = Seq(first, second, third).sorted.map(address(_)).mkString(", ")
|
||||||
|
awaitCond(mbeanServer.getAttribute(mbeanName, "Members") == expectedMembers)
|
||||||
|
}
|
||||||
|
enterBarrier("fourth-unreachable")
|
||||||
|
|
||||||
|
runOn(second) {
|
||||||
|
mbeanServer.invoke(mbeanName, "down", Array(fourthAddress.toString), Array("java.lang.String"))
|
||||||
|
}
|
||||||
|
enterBarrier("fourth-down")
|
||||||
|
|
||||||
|
runOn(first, second, third) {
|
||||||
|
awaitUpConvergence(3, canNotBePartOfMemberRing = List(fourthAddress))
|
||||||
|
assertMembers(clusterView.members, first, second, third)
|
||||||
|
}
|
||||||
|
|
||||||
|
enterBarrier("after-5")
|
||||||
|
}
|
||||||
|
|
||||||
|
"support leave" taggedAs LongRunningTest in within(20 seconds) {
|
||||||
|
runOn(second) {
|
||||||
|
mbeanServer.invoke(mbeanName, "leave", Array(address(third).toString), Array("java.lang.String"))
|
||||||
|
}
|
||||||
|
enterBarrier("third-left")
|
||||||
|
runOn(first, second) {
|
||||||
|
awaitUpConvergence(2)
|
||||||
|
assertMembers(clusterView.members, first, second)
|
||||||
|
val expectedMembers = Seq(first, second).sorted.map(address(_)).mkString(", ")
|
||||||
|
awaitCond(mbeanServer.getAttribute(mbeanName, "Members") == expectedMembers)
|
||||||
|
}
|
||||||
|
runOn(third) {
|
||||||
|
awaitCond(!cluster.isRunning)
|
||||||
|
// mbean should be unregistered, i.e. throw InstanceNotFoundException
|
||||||
|
awaitCond(Try { mbeanServer.getMBeanInfo(mbeanName); false } recover {
|
||||||
|
case e: InstanceNotFoundException ⇒ true
|
||||||
|
case _ ⇒ false
|
||||||
|
} get)
|
||||||
|
}
|
||||||
|
|
||||||
|
enterBarrier("after-6")
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -573,6 +573,8 @@ Run it without parameters to see instructions about how to use the script::
|
||||||
leave <node-url> - Sends a request for node with URL to LEAVE the cluster
|
leave <node-url> - Sends a request for node with URL to LEAVE the cluster
|
||||||
down <node-url> - Sends a request for marking node with URL as DOWN
|
down <node-url> - Sends a request for marking node with URL as DOWN
|
||||||
member-status - Asks the member node for its current status
|
member-status - Asks the member node for its current status
|
||||||
|
members - Asks the cluster for addresses of current members
|
||||||
|
unreachable - Asks the cluster for addresses of unreachable members
|
||||||
cluster-status - Asks the cluster for its current status (member ring,
|
cluster-status - Asks the cluster for its current status (member ring,
|
||||||
unavailable nodes, meta data etc.)
|
unavailable nodes, meta data etc.)
|
||||||
leader - Asks the cluster who the current leader is
|
leader - Asks the cluster who the current leader is
|
||||||
|
|
@ -580,7 +582,6 @@ Run it without parameters to see instructions about how to use the script::
|
||||||
node cluster)
|
node cluster)
|
||||||
is-available - Checks if the member node is available
|
is-available - Checks if the member node is available
|
||||||
is-running - Checks if the member node is running
|
is-running - Checks if the member node is running
|
||||||
has-convergence - Checks if there is a cluster convergence
|
|
||||||
Where the <node-url> should be on the format of
|
Where the <node-url> should be on the format of
|
||||||
'akka://actor-system-name@hostname:port'
|
'akka://actor-system-name@hostname:port'
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -579,6 +579,8 @@ Run it without parameters to see instructions about how to use the script::
|
||||||
leave <node-url> - Sends a request for node with URL to LEAVE the cluster
|
leave <node-url> - Sends a request for node with URL to LEAVE the cluster
|
||||||
down <node-url> - Sends a request for marking node with URL as DOWN
|
down <node-url> - Sends a request for marking node with URL as DOWN
|
||||||
member-status - Asks the member node for its current status
|
member-status - Asks the member node for its current status
|
||||||
|
members - Asks the cluster for addresses of current members
|
||||||
|
unreachable - Asks the cluster for addresses of unreachable members
|
||||||
cluster-status - Asks the cluster for its current status (member ring,
|
cluster-status - Asks the cluster for its current status (member ring,
|
||||||
unavailable nodes, meta data etc.)
|
unavailable nodes, meta data etc.)
|
||||||
leader - Asks the cluster who the current leader is
|
leader - Asks the cluster who the current leader is
|
||||||
|
|
@ -586,7 +588,6 @@ Run it without parameters to see instructions about how to use the script::
|
||||||
node cluster)
|
node cluster)
|
||||||
is-available - Checks if the member node is available
|
is-available - Checks if the member node is available
|
||||||
is-running - Checks if the member node is running
|
is-running - Checks if the member node is running
|
||||||
has-convergence - Checks if there is a cluster convergence
|
|
||||||
Where the <node-url> should be on the format of
|
Where the <node-url> should be on the format of
|
||||||
'akka://actor-system-name@hostname:port'
|
'akka://actor-system-name@hostname:port'
|
||||||
|
|
||||||
|
|
|
||||||
42
akka-kernel/src/main/dist/bin/akka-cluster
vendored
42
akka-kernel/src/main/dist/bin/akka-cluster
vendored
|
|
@ -103,6 +103,32 @@ case "$2" in
|
||||||
$JMX_CLIENT $HOST akka:type=Cluster ClusterStatus
|
$JMX_CLIENT $HOST akka:type=Cluster ClusterStatus
|
||||||
;;
|
;;
|
||||||
|
|
||||||
|
members)
|
||||||
|
if [ $# -ne 2 ]; then
|
||||||
|
echo "Usage: $SELF <node-hostname:jmx-port> members"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
ensureNodeIsRunningAndAvailable
|
||||||
|
shift
|
||||||
|
|
||||||
|
echo "Querying members"
|
||||||
|
$JMX_CLIENT $HOST akka:type=Cluster Members
|
||||||
|
;;
|
||||||
|
|
||||||
|
unreachable)
|
||||||
|
if [ $# -ne 2 ]; then
|
||||||
|
echo "Usage: $SELF <node-hostname:jmx-port> unreachable"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
ensureNodeIsRunningAndAvailable
|
||||||
|
shift
|
||||||
|
|
||||||
|
echo "Querying unreachable members"
|
||||||
|
$JMX_CLIENT $HOST akka:type=Cluster Unreachable
|
||||||
|
;;
|
||||||
|
|
||||||
leader)
|
leader)
|
||||||
if [ $# -ne 2 ]; then
|
if [ $# -ne 2 ]; then
|
||||||
echo "Usage: $SELF <node-hostname:jmx-port> leader"
|
echo "Usage: $SELF <node-hostname:jmx-port> leader"
|
||||||
|
|
@ -129,19 +155,6 @@ case "$2" in
|
||||||
$JMX_CLIENT $HOST akka:type=Cluster Singleton
|
$JMX_CLIENT $HOST akka:type=Cluster Singleton
|
||||||
;;
|
;;
|
||||||
|
|
||||||
has-convergence)
|
|
||||||
if [ $# -ne 2 ]; then
|
|
||||||
echo "Usage: $SELF <node-hostname:jmx-port> is-convergence"
|
|
||||||
exit 1
|
|
||||||
fi
|
|
||||||
|
|
||||||
ensureNodeIsRunningAndAvailable
|
|
||||||
shift
|
|
||||||
|
|
||||||
echo "Checking for cluster convergence"
|
|
||||||
$JMX_CLIENT $HOST akka:type=Cluster Convergence
|
|
||||||
;;
|
|
||||||
|
|
||||||
is-available)
|
is-available)
|
||||||
if [ $# -ne 2 ]; then
|
if [ $# -ne 2 ]; then
|
||||||
echo "Usage: $SELF <node-hostname:jmx-port> is-available"
|
echo "Usage: $SELF <node-hostname:jmx-port> is-available"
|
||||||
|
|
@ -176,12 +189,13 @@ case "$2" in
|
||||||
printf "%26s - %s\n" "leave <node-url>" "Sends a request for node with URL to LEAVE the cluster"
|
printf "%26s - %s\n" "leave <node-url>" "Sends a request for node with URL to LEAVE the cluster"
|
||||||
printf "%26s - %s\n" "down <node-url>" "Sends a request for marking node with URL as DOWN"
|
printf "%26s - %s\n" "down <node-url>" "Sends a request for marking node with URL as DOWN"
|
||||||
printf "%26s - %s\n" member-status "Asks the member node for its current status"
|
printf "%26s - %s\n" member-status "Asks the member node for its current status"
|
||||||
|
printf "%26s - %s\n" members "Asks the cluster for addresses of current members"
|
||||||
|
printf "%26s - %s\n" unreachable "Asks the cluster for addresses of unreachable members"
|
||||||
printf "%26s - %s\n" cluster-status "Asks the cluster for its current status (member ring, unavailable nodes, meta data etc.)"
|
printf "%26s - %s\n" cluster-status "Asks the cluster for its current status (member ring, unavailable nodes, meta data etc.)"
|
||||||
printf "%26s - %s\n" leader "Asks the cluster who the current leader is"
|
printf "%26s - %s\n" leader "Asks the cluster who the current leader is"
|
||||||
printf "%26s - %s\n" is-singleton "Checks if the cluster is a singleton cluster (single node cluster)"
|
printf "%26s - %s\n" is-singleton "Checks if the cluster is a singleton cluster (single node cluster)"
|
||||||
printf "%26s - %s\n" is-available "Checks if the member node is available"
|
printf "%26s - %s\n" is-available "Checks if the member node is available"
|
||||||
printf "%26s - %s\n" is-running "Checks if the member node is running"
|
printf "%26s - %s\n" is-running "Checks if the member node is running"
|
||||||
printf "%26s - %s\n" has-convergence "Checks if there is a cluster convergence"
|
|
||||||
printf "Where the <node-url> should be on the format of 'akka://actor-system-name@hostname:port'\n"
|
printf "Where the <node-url> should be on the format of 'akka://actor-system-name@hostname:port'\n"
|
||||||
printf "\n"
|
printf "\n"
|
||||||
printf "Examples: bin/$SELF localhost:9999 is-available\n"
|
printf "Examples: bin/$SELF localhost:9999 is-available\n"
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue