diff --git a/akka-cluster/jmx-client/akka-cluster b/akka-cluster/jmx-client/akka-cluster index 1c50e92143..684a3d0514 100755 --- a/akka-cluster/jmx-client/akka-cluster +++ b/akka-cluster/jmx-client/akka-cluster @@ -25,21 +25,30 @@ shift 2 JMX_CLIENT="java -jar $JMXSHJAR -h $HOST -p $PORT /dev/fd/0" + +function mbeanObjectName() { + if [[ -z "$CLUSTER_PORT" ]]; then + echo "akka:type=Cluster" + else + echo "akka:type=Cluster,port=$CLUSTER_PORT" + fi +} + function invoke() { - echo jmx_invoke -m akka:type=Cluster "$@" | $JMX_CLIENT + echo jmx_invoke -m $(mbeanObjectName) "$@" | $JMX_CLIENT } function get() { - echo "puts [jmx_get -m akka:type=Cluster \"$1\"]" | $JMX_CLIENT + echo "puts [jmx_get -m $(mbeanObjectName) \"$1\"]" | $JMX_CLIENT } function ensureNodeIsRunningAndAvailable { REPLY=$(get Available 2>&1) # redirects STDERR to STDOUT before capturing it if [[ "$REPLY" != *true ]]; then if [[ "$REPLY" == *"Cannot convert result to a string." ]]; then - echo "Akka cluster MBean is not available on $HOST:$PORT" + echo "Akka cluster MBean is not available on $HOST:$PORT with MBean name '$(mbeanObjectName)'" else - echo "Akka cluster node is not available on $HOST:$PORT, due to $REPLY" + echo "Akka cluster node is not available on $HOST:$PORT with MBean name '$(mbeanObjectName)', due to $REPLY" fi exit 1 fi @@ -48,22 +57,24 @@ function ensureNodeIsRunningAndAvailable { echo "This jmx-client/akka-cluster tool is deprecated use curl and https://github.com/akka/akka-cluster-management instead, since 2.5.0." >&2 # switch on command -case "$1" in - +while [ $# -gt 0 ]; +do + case "$1" in join) if [ $# -ne 2 ]; then - echo "Usage: $SELF join " + echo "Usage: $SELF join " exit 1 fi ACTOR_SYSTEM_URL=$2 echo "$HOST is JOINING cluster node $ACTOR_SYSTEM_URL" invoke join $ACTOR_SYSTEM_URL + shift 2 ;; leave) if [ $# -ne 2 ]; then - echo "Usage: $SELF leave " + echo "Usage: $SELF leave " exit 1 fi @@ -71,11 +82,12 @@ case "$1" in ACTOR_SYSTEM_URL=$2 echo "Scheduling $ACTOR_SYSTEM_URL to LEAVE cluster" invoke leave $ACTOR_SYSTEM_URL + shift 2 ;; down) if [ $# -ne 2 ]; then - echo "Usage: $SELF down " + echo "Usage: $SELF down " exit 1 fi @@ -84,11 +96,12 @@ case "$1" in ACTOR_SYSTEM_URL=$2 echo "Marking $ACTOR_SYSTEM_URL as DOWN" invoke down $ACTOR_SYSTEM_URL + shift 2 ;; member-status) if [ $# -ne 1 ]; then - echo "Usage: $SELF member-status" + echo "Usage: $SELF member-status" exit 1 fi @@ -96,22 +109,24 @@ case "$1" in echo "Querying member status for $HOST" get MemberStatus + shift 1 ;; cluster-status) if [ $# -ne 1 ]; then - echo "Usage: $SELF cluster-status" + echo "Usage: $SELF cluster-status" exit 1 fi ensureNodeIsRunningAndAvailable get ClusterStatus + shift 1 ;; members) if [ $# -ne 1 ]; then - echo "Usage: $SELF members" + echo "Usage: $SELF members" exit 1 fi @@ -119,11 +134,12 @@ case "$1" in echo "Querying members" get Members + shift 1 ;; unreachable) if [ $# -ne 1 ]; then - echo "Usage: $SELF unreachable" + echo "Usage: $SELF unreachable" exit 1 fi @@ -131,11 +147,12 @@ case "$1" in echo "Querying unreachable members" get Unreachable + shift 1 ;; leader) if [ $# -ne 1 ]; then - echo "Usage: $SELF leader" + echo "Usage: $SELF leader" exit 1 fi @@ -143,11 +160,12 @@ case "$1" in echo "Checking leader status" get Leader + shift 1 ;; is-singleton) if [ $# -ne 1 ]; then - echo "Usage: $SELF is-singleton" + echo "Usage: $SELF is-singleton" exit 1 fi @@ -155,11 +173,12 @@ case "$1" in echo "Checking for singleton cluster" get Singleton + shift 1 ;; is-available) if [ $# -ne 1 ]; then - echo "Usage: $SELF is-available" + echo "Usage: $SELF is-available" exit 1 fi @@ -167,10 +186,22 @@ case "$1" in echo "Checking if member node on $HOST is AVAILABLE" get Available + shift 1 ;; + -p) + if [[ ! $2 =~ ^[0-9]+$ ]]; then + echo "-p option requires a cluster port number in digits" + exit 1 + fi + + CLUSTER_PORT=$2 + shift 2 + ;; *) - printf "Usage: $0 ...\n" + printf "Usage: $0 ...\n" + printf "\n" + printf "-p parameter needs is needed when cluster is run with akka.cluster.jmx.multi-mbeans-in-same-jvm = on.¥n" printf "\n" printf "Supported commands are:\n" printf "%26s - %s\n" "join " "Sends request a JOIN node with the specified URL" @@ -188,6 +219,10 @@ case "$1" in printf "Examples: $0 localhost 9999 is-available\n" printf " $0 localhost 9999 join akka.tcp://MySystem@darkstar:2552\n" printf " $0 localhost 9999 cluster-status\n" + printf " $0 localhost 9999 -p 2551 is-available\n" + printf " $0 localhost 9999 -p 2551 join akka.tcp://MySystem@darkstar:2552\n" + printf " $0 localhost 9999 -p 2551 cluster-status\n" exit 1 ;; esac +done diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 3a04d60cb9..ed2336ec37 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -99,6 +99,11 @@ akka { # Enable or disable JMX MBeans for management of the cluster jmx.enabled = on + # Enable or disable multiple JMX MBeans in the same JVM + # If this is disabled, the MBean Object name is "akka:type=Cluster" + # If this is enabled, them MBean Object names become "akka:type=Cluster,port=$clusterPortNumber" + jmx.multi-mbeans-in-same-jvm = off + # how long should the node wait before starting the periodic tasks # maintenance tasks? periodic-tasks-initial-delay = 1s diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala index fb1a959aac..bd828e4e80 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala @@ -133,7 +133,12 @@ trait ClusterNodeMBean { private[akka] class ClusterJmx(cluster: Cluster, log: LoggingAdapter) { private val mBeanServer = ManagementFactory.getPlatformMBeanServer - private val clusterMBeanName = new ObjectName("akka:type=Cluster") + private val clusterMBeanName = + if (cluster.settings.JmxMultiMbeansInSameEnabled) + new ObjectName("akka:type=Cluster,port=" + cluster.selfUniqueAddress.address.port.getOrElse("")) + else + new ObjectName("akka:type=Cluster") + private def clusterView = cluster.readView import cluster.InfoLogger._ @@ -199,7 +204,15 @@ private[akka] class ClusterJmx(cluster: Cluster, log: LoggingAdapter) { mBeanServer.registerMBean(mbean, clusterMBeanName) logInfo("Registered cluster JMX MBean [{}]", clusterMBeanName) } catch { - case e: InstanceAlreadyExistsException ⇒ // ignore - we are running multiple cluster nodes in the same JVM (probably for testing) + case e: InstanceAlreadyExistsException ⇒ { + if (cluster.settings.JmxMultiMbeansInSameEnabled) { + log.error(e, s"Failed to register Cluster JMX MBean with name=$clusterMBeanName") + } else { + log.warning( + s"Could not register Cluster JMX MBean with name=$clusterMBeanName as it is already registered. " + + "If you are running multiple clusters in the same JVM, set 'akka.cluster.jmx.multi-mbeans-in-same-jvm = on' in config") + } + } } } @@ -210,7 +223,15 @@ private[akka] class ClusterJmx(cluster: Cluster, log: LoggingAdapter) { try { mBeanServer.unregisterMBean(clusterMBeanName) } catch { - case e: InstanceNotFoundException ⇒ // ignore - we are running multiple cluster nodes in the same JVM (probably for testing) + case e: InstanceNotFoundException ⇒ { + if (cluster.settings.JmxMultiMbeansInSameEnabled) { + log.error(e, s"Failed to unregister Cluster JMX MBean with name=$clusterMBeanName") + } else { + log.warning( + s"Could not unregister Cluster JMX MBean with name=$clusterMBeanName as it was not found. " + + "If you are running multiple clusters in the same JVM, set 'akka.cluster.jmx.multi-mbeans-in-same-jvm = on' in config") + } + } } } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index f3a42dae54..80453ce1d8 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -105,6 +105,7 @@ final class ClusterSettings(val config: Config, val systemName: String) { } val RunCoordinatedShutdownWhenDown: Boolean = cc.getBoolean("run-coordinated-shutdown-when-down") val JmxEnabled: Boolean = cc.getBoolean("jmx.enabled") + val JmxMultiMbeansInSameEnabled: Boolean = cc.getBoolean("jmx.multi-mbeans-in-same-jvm") val UseDispatcher: String = cc.getString("use-dispatcher") match { case "" ⇒ Dispatchers.DefaultDispatcherId case id ⇒ id diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index 481e44a6f3..c1b1d938d7 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -12,6 +12,7 @@ import akka.actor.Address import akka.cluster.InternalClusterAction._ import java.lang.management.ManagementFactory import javax.management.ObjectName + import akka.testkit.TestProbe import akka.actor.ActorSystem import akka.actor.Props @@ -19,6 +20,7 @@ import com.typesafe.config.ConfigFactory import akka.actor.CoordinatedShutdown import akka.cluster.ClusterEvent.MemberEvent import akka.cluster.ClusterEvent._ + import scala.concurrent.Await object ClusterSpec { @@ -214,5 +216,37 @@ akka.loglevel=DEBUG shutdown(sys3) } } + + "register multiple cluster JMX MBeans with akka.cluster.jmx.multi-mbeans-in-same-jvm = on" in { + def getConfig = (port: Int) ⇒ ConfigFactory.parseString( + s""" + akka.cluster.jmx.multi-mbeans-in-same-jvm = on + akka.remote.netty.tcp.port = ${port} + akka.remote.artery.canonical.port = ${port} + """ + ).withFallback(ConfigFactory.parseString(ClusterSpec.config)) + + val sys1 = ActorSystem("ClusterSpec4", getConfig(2552)) + val sys2 = ActorSystem("ClusterSpec4", getConfig(2553)) + + try { + Cluster(sys1) + Cluster(sys2) + + val name1 = new ObjectName(s"akka:type=Cluster,port=2552") + val info1 = ManagementFactory.getPlatformMBeanServer.getMBeanInfo(name1) + info1.getAttributes.length should be > (0) + info1.getOperations.length should be > (0) + + val name2 = new ObjectName(s"akka:type=Cluster,port=2553") + val info2 = ManagementFactory.getPlatformMBeanServer.getMBeanInfo(name2) + info2.getAttributes.length should be > (0) + info2.getOperations.length should be > (0) + } finally { + shutdown(sys1) + shutdown(sys2) + } + + } } -} +} \ No newline at end of file