Allow multiple Cluster JMX MBeans in the same JVM (#22484)
* Allow multiple Cluster JMX MBeans in the same JVM (#18772) * Remove unnecessary whitespace
This commit is contained in:
parent
ba213b7fee
commit
cc1312922c
5 changed files with 117 additions and 21 deletions
|
|
@ -25,21 +25,30 @@ shift 2
|
||||||
|
|
||||||
JMX_CLIENT="java -jar $JMXSHJAR -h $HOST -p $PORT /dev/fd/0"
|
JMX_CLIENT="java -jar $JMXSHJAR -h $HOST -p $PORT /dev/fd/0"
|
||||||
|
|
||||||
|
|
||||||
|
function mbeanObjectName() {
|
||||||
|
if [[ -z "$CLUSTER_PORT" ]]; then
|
||||||
|
echo "akka:type=Cluster"
|
||||||
|
else
|
||||||
|
echo "akka:type=Cluster,port=$CLUSTER_PORT"
|
||||||
|
fi
|
||||||
|
}
|
||||||
|
|
||||||
function invoke() {
|
function invoke() {
|
||||||
echo jmx_invoke -m akka:type=Cluster "$@" | $JMX_CLIENT
|
echo jmx_invoke -m $(mbeanObjectName) "$@" | $JMX_CLIENT
|
||||||
}
|
}
|
||||||
|
|
||||||
function get() {
|
function get() {
|
||||||
echo "puts [jmx_get -m akka:type=Cluster \"$1\"]" | $JMX_CLIENT
|
echo "puts [jmx_get -m $(mbeanObjectName) \"$1\"]" | $JMX_CLIENT
|
||||||
}
|
}
|
||||||
|
|
||||||
function ensureNodeIsRunningAndAvailable {
|
function ensureNodeIsRunningAndAvailable {
|
||||||
REPLY=$(get Available 2>&1) # redirects STDERR to STDOUT before capturing it
|
REPLY=$(get Available 2>&1) # redirects STDERR to STDOUT before capturing it
|
||||||
if [[ "$REPLY" != *true ]]; then
|
if [[ "$REPLY" != *true ]]; then
|
||||||
if [[ "$REPLY" == *"Cannot convert result to a string." ]]; then
|
if [[ "$REPLY" == *"Cannot convert result to a string." ]]; then
|
||||||
echo "Akka cluster MBean is not available on $HOST:$PORT"
|
echo "Akka cluster MBean is not available on $HOST:$PORT with MBean name '$(mbeanObjectName)'"
|
||||||
else
|
else
|
||||||
echo "Akka cluster node is not available on $HOST:$PORT, due to $REPLY"
|
echo "Akka cluster node is not available on $HOST:$PORT with MBean name '$(mbeanObjectName)', due to $REPLY"
|
||||||
fi
|
fi
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
|
@ -48,22 +57,24 @@ function ensureNodeIsRunningAndAvailable {
|
||||||
echo "This jmx-client/akka-cluster tool is deprecated use curl and https://github.com/akka/akka-cluster-management instead, since 2.5.0." >&2
|
echo "This jmx-client/akka-cluster tool is deprecated use curl and https://github.com/akka/akka-cluster-management instead, since 2.5.0." >&2
|
||||||
|
|
||||||
# switch on command
|
# switch on command
|
||||||
case "$1" in
|
while [ $# -gt 0 ];
|
||||||
|
do
|
||||||
|
case "$1" in
|
||||||
join)
|
join)
|
||||||
if [ $# -ne 2 ]; then
|
if [ $# -ne 2 ]; then
|
||||||
echo "Usage: $SELF <node-hostname> <jmx-port> join <node-url-to-join>"
|
echo "Usage: $SELF <node-hostname> <jmx-port> <optional: -p cluster-port> join <node-url-to-join>"
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
ACTOR_SYSTEM_URL=$2
|
ACTOR_SYSTEM_URL=$2
|
||||||
echo "$HOST is JOINING cluster node $ACTOR_SYSTEM_URL"
|
echo "$HOST is JOINING cluster node $ACTOR_SYSTEM_URL"
|
||||||
invoke join $ACTOR_SYSTEM_URL
|
invoke join $ACTOR_SYSTEM_URL
|
||||||
|
shift 2
|
||||||
;;
|
;;
|
||||||
|
|
||||||
leave)
|
leave)
|
||||||
if [ $# -ne 2 ]; then
|
if [ $# -ne 2 ]; then
|
||||||
echo "Usage: $SELF <node-hostname> <jmx-port> leave <node-url-to-join>"
|
echo "Usage: $SELF <node-hostname> <jmx-port> <optional: -p cluster-port> leave <node-url-to-join>"
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
|
@ -71,11 +82,12 @@ case "$1" in
|
||||||
ACTOR_SYSTEM_URL=$2
|
ACTOR_SYSTEM_URL=$2
|
||||||
echo "Scheduling $ACTOR_SYSTEM_URL to LEAVE cluster"
|
echo "Scheduling $ACTOR_SYSTEM_URL to LEAVE cluster"
|
||||||
invoke leave $ACTOR_SYSTEM_URL
|
invoke leave $ACTOR_SYSTEM_URL
|
||||||
|
shift 2
|
||||||
;;
|
;;
|
||||||
|
|
||||||
down)
|
down)
|
||||||
if [ $# -ne 2 ]; then
|
if [ $# -ne 2 ]; then
|
||||||
echo "Usage: $SELF <node-hostname> <jmx-port> down <node-url-to-join>"
|
echo "Usage: $SELF <node-hostname> <jmx-port> <optional: -p cluster-port> down <node-url-to-join>"
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
|
@ -84,11 +96,12 @@ case "$1" in
|
||||||
ACTOR_SYSTEM_URL=$2
|
ACTOR_SYSTEM_URL=$2
|
||||||
echo "Marking $ACTOR_SYSTEM_URL as DOWN"
|
echo "Marking $ACTOR_SYSTEM_URL as DOWN"
|
||||||
invoke down $ACTOR_SYSTEM_URL
|
invoke down $ACTOR_SYSTEM_URL
|
||||||
|
shift 2
|
||||||
;;
|
;;
|
||||||
|
|
||||||
member-status)
|
member-status)
|
||||||
if [ $# -ne 1 ]; then
|
if [ $# -ne 1 ]; then
|
||||||
echo "Usage: $SELF <node-hostname> <jmx-port> member-status"
|
echo "Usage: $SELF <node-hostname> <jmx-port> <optional: -p cluster-port> member-status"
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
|
@ -96,22 +109,24 @@ case "$1" in
|
||||||
|
|
||||||
echo "Querying member status for $HOST"
|
echo "Querying member status for $HOST"
|
||||||
get MemberStatus
|
get MemberStatus
|
||||||
|
shift 1
|
||||||
;;
|
;;
|
||||||
|
|
||||||
cluster-status)
|
cluster-status)
|
||||||
if [ $# -ne 1 ]; then
|
if [ $# -ne 1 ]; then
|
||||||
echo "Usage: $SELF <node-hostname> <jmx-port> cluster-status"
|
echo "Usage: $SELF <node-hostname> <jmx-port> <optional: -p cluster-port> cluster-status"
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
ensureNodeIsRunningAndAvailable
|
ensureNodeIsRunningAndAvailable
|
||||||
|
|
||||||
get ClusterStatus
|
get ClusterStatus
|
||||||
|
shift 1
|
||||||
;;
|
;;
|
||||||
|
|
||||||
members)
|
members)
|
||||||
if [ $# -ne 1 ]; then
|
if [ $# -ne 1 ]; then
|
||||||
echo "Usage: $SELF <node-hostname> <jmx-port> members"
|
echo "Usage: $SELF <node-hostname> <jmx-port> <optional: -p cluster-port> members"
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
|
@ -119,11 +134,12 @@ case "$1" in
|
||||||
|
|
||||||
echo "Querying members"
|
echo "Querying members"
|
||||||
get Members
|
get Members
|
||||||
|
shift 1
|
||||||
;;
|
;;
|
||||||
|
|
||||||
unreachable)
|
unreachable)
|
||||||
if [ $# -ne 1 ]; then
|
if [ $# -ne 1 ]; then
|
||||||
echo "Usage: $SELF <node-hostname> <jmx-port> unreachable"
|
echo "Usage: $SELF <node-hostname> <jmx-port> <optional: -p cluster-port> unreachable"
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
|
@ -131,11 +147,12 @@ case "$1" in
|
||||||
|
|
||||||
echo "Querying unreachable members"
|
echo "Querying unreachable members"
|
||||||
get Unreachable
|
get Unreachable
|
||||||
|
shift 1
|
||||||
;;
|
;;
|
||||||
|
|
||||||
leader)
|
leader)
|
||||||
if [ $# -ne 1 ]; then
|
if [ $# -ne 1 ]; then
|
||||||
echo "Usage: $SELF <node-hostname> <jmx-port> leader"
|
echo "Usage: $SELF <node-hostname> <jmx-port> <optional: -p cluster-port> leader"
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
|
@ -143,11 +160,12 @@ case "$1" in
|
||||||
|
|
||||||
echo "Checking leader status"
|
echo "Checking leader status"
|
||||||
get Leader
|
get Leader
|
||||||
|
shift 1
|
||||||
;;
|
;;
|
||||||
|
|
||||||
is-singleton)
|
is-singleton)
|
||||||
if [ $# -ne 1 ]; then
|
if [ $# -ne 1 ]; then
|
||||||
echo "Usage: $SELF <node-hostname> <jmx-port> is-singleton"
|
echo "Usage: $SELF <node-hostname> <jmx-port> <optional: -p cluster-port> is-singleton"
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
|
@ -155,11 +173,12 @@ case "$1" in
|
||||||
|
|
||||||
echo "Checking for singleton cluster"
|
echo "Checking for singleton cluster"
|
||||||
get Singleton
|
get Singleton
|
||||||
|
shift 1
|
||||||
;;
|
;;
|
||||||
|
|
||||||
is-available)
|
is-available)
|
||||||
if [ $# -ne 1 ]; then
|
if [ $# -ne 1 ]; then
|
||||||
echo "Usage: $SELF <node-hostname> <jmx-port> is-available"
|
echo "Usage: $SELF <node-hostname> <jmx-port> <optional: -p cluster-port> is-available"
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
|
@ -167,10 +186,22 @@ case "$1" in
|
||||||
|
|
||||||
echo "Checking if member node on $HOST is AVAILABLE"
|
echo "Checking if member node on $HOST is AVAILABLE"
|
||||||
get Available
|
get Available
|
||||||
|
shift 1
|
||||||
;;
|
;;
|
||||||
|
|
||||||
|
-p)
|
||||||
|
if [[ ! $2 =~ ^[0-9]+$ ]]; then
|
||||||
|
echo "-p option requires a cluster port number in digits"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
CLUSTER_PORT=$2
|
||||||
|
shift 2
|
||||||
|
;;
|
||||||
*)
|
*)
|
||||||
printf "Usage: $0 <node-hostname> <jmx-port> <command> ...\n"
|
printf "Usage: $0 <node-hostname> <jmx-port> <optional: -p cluster-port> <command> ...\n"
|
||||||
|
printf "\n"
|
||||||
|
printf "-p parameter needs is needed when cluster is run with akka.cluster.jmx.multi-mbeans-in-same-jvm = on.¥n"
|
||||||
printf "\n"
|
printf "\n"
|
||||||
printf "Supported commands are:\n"
|
printf "Supported commands are:\n"
|
||||||
printf "%26s - %s\n" "join <node-url>" "Sends request a JOIN node with the specified URL"
|
printf "%26s - %s\n" "join <node-url>" "Sends request a JOIN node with the specified URL"
|
||||||
|
|
@ -188,6 +219,10 @@ case "$1" in
|
||||||
printf "Examples: $0 localhost 9999 is-available\n"
|
printf "Examples: $0 localhost 9999 is-available\n"
|
||||||
printf " $0 localhost 9999 join akka.tcp://MySystem@darkstar:2552\n"
|
printf " $0 localhost 9999 join akka.tcp://MySystem@darkstar:2552\n"
|
||||||
printf " $0 localhost 9999 cluster-status\n"
|
printf " $0 localhost 9999 cluster-status\n"
|
||||||
|
printf " $0 localhost 9999 -p 2551 is-available\n"
|
||||||
|
printf " $0 localhost 9999 -p 2551 join akka.tcp://MySystem@darkstar:2552\n"
|
||||||
|
printf " $0 localhost 9999 -p 2551 cluster-status\n"
|
||||||
exit 1
|
exit 1
|
||||||
;;
|
;;
|
||||||
esac
|
esac
|
||||||
|
done
|
||||||
|
|
|
||||||
|
|
@ -99,6 +99,11 @@ akka {
|
||||||
# Enable or disable JMX MBeans for management of the cluster
|
# Enable or disable JMX MBeans for management of the cluster
|
||||||
jmx.enabled = on
|
jmx.enabled = on
|
||||||
|
|
||||||
|
# Enable or disable multiple JMX MBeans in the same JVM
|
||||||
|
# If this is disabled, the MBean Object name is "akka:type=Cluster"
|
||||||
|
# If this is enabled, them MBean Object names become "akka:type=Cluster,port=$clusterPortNumber"
|
||||||
|
jmx.multi-mbeans-in-same-jvm = off
|
||||||
|
|
||||||
# how long should the node wait before starting the periodic tasks
|
# how long should the node wait before starting the periodic tasks
|
||||||
# maintenance tasks?
|
# maintenance tasks?
|
||||||
periodic-tasks-initial-delay = 1s
|
periodic-tasks-initial-delay = 1s
|
||||||
|
|
|
||||||
|
|
@ -133,7 +133,12 @@ trait ClusterNodeMBean {
|
||||||
private[akka] class ClusterJmx(cluster: Cluster, log: LoggingAdapter) {
|
private[akka] class ClusterJmx(cluster: Cluster, log: LoggingAdapter) {
|
||||||
|
|
||||||
private val mBeanServer = ManagementFactory.getPlatformMBeanServer
|
private val mBeanServer = ManagementFactory.getPlatformMBeanServer
|
||||||
private val clusterMBeanName = new ObjectName("akka:type=Cluster")
|
private val clusterMBeanName =
|
||||||
|
if (cluster.settings.JmxMultiMbeansInSameEnabled)
|
||||||
|
new ObjectName("akka:type=Cluster,port=" + cluster.selfUniqueAddress.address.port.getOrElse(""))
|
||||||
|
else
|
||||||
|
new ObjectName("akka:type=Cluster")
|
||||||
|
|
||||||
private def clusterView = cluster.readView
|
private def clusterView = cluster.readView
|
||||||
import cluster.InfoLogger._
|
import cluster.InfoLogger._
|
||||||
|
|
||||||
|
|
@ -199,7 +204,15 @@ private[akka] class ClusterJmx(cluster: Cluster, log: LoggingAdapter) {
|
||||||
mBeanServer.registerMBean(mbean, clusterMBeanName)
|
mBeanServer.registerMBean(mbean, clusterMBeanName)
|
||||||
logInfo("Registered cluster JMX MBean [{}]", clusterMBeanName)
|
logInfo("Registered cluster JMX MBean [{}]", clusterMBeanName)
|
||||||
} catch {
|
} catch {
|
||||||
case e: InstanceAlreadyExistsException ⇒ // ignore - we are running multiple cluster nodes in the same JVM (probably for testing)
|
case e: InstanceAlreadyExistsException ⇒ {
|
||||||
|
if (cluster.settings.JmxMultiMbeansInSameEnabled) {
|
||||||
|
log.error(e, s"Failed to register Cluster JMX MBean with name=$clusterMBeanName")
|
||||||
|
} else {
|
||||||
|
log.warning(
|
||||||
|
s"Could not register Cluster JMX MBean with name=$clusterMBeanName as it is already registered. " +
|
||||||
|
"If you are running multiple clusters in the same JVM, set 'akka.cluster.jmx.multi-mbeans-in-same-jvm = on' in config")
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -210,7 +223,15 @@ private[akka] class ClusterJmx(cluster: Cluster, log: LoggingAdapter) {
|
||||||
try {
|
try {
|
||||||
mBeanServer.unregisterMBean(clusterMBeanName)
|
mBeanServer.unregisterMBean(clusterMBeanName)
|
||||||
} catch {
|
} catch {
|
||||||
case e: InstanceNotFoundException ⇒ // ignore - we are running multiple cluster nodes in the same JVM (probably for testing)
|
case e: InstanceNotFoundException ⇒ {
|
||||||
|
if (cluster.settings.JmxMultiMbeansInSameEnabled) {
|
||||||
|
log.error(e, s"Failed to unregister Cluster JMX MBean with name=$clusterMBeanName")
|
||||||
|
} else {
|
||||||
|
log.warning(
|
||||||
|
s"Could not unregister Cluster JMX MBean with name=$clusterMBeanName as it was not found. " +
|
||||||
|
"If you are running multiple clusters in the same JVM, set 'akka.cluster.jmx.multi-mbeans-in-same-jvm = on' in config")
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -105,6 +105,7 @@ final class ClusterSettings(val config: Config, val systemName: String) {
|
||||||
}
|
}
|
||||||
val RunCoordinatedShutdownWhenDown: Boolean = cc.getBoolean("run-coordinated-shutdown-when-down")
|
val RunCoordinatedShutdownWhenDown: Boolean = cc.getBoolean("run-coordinated-shutdown-when-down")
|
||||||
val JmxEnabled: Boolean = cc.getBoolean("jmx.enabled")
|
val JmxEnabled: Boolean = cc.getBoolean("jmx.enabled")
|
||||||
|
val JmxMultiMbeansInSameEnabled: Boolean = cc.getBoolean("jmx.multi-mbeans-in-same-jvm")
|
||||||
val UseDispatcher: String = cc.getString("use-dispatcher") match {
|
val UseDispatcher: String = cc.getString("use-dispatcher") match {
|
||||||
case "" ⇒ Dispatchers.DefaultDispatcherId
|
case "" ⇒ Dispatchers.DefaultDispatcherId
|
||||||
case id ⇒ id
|
case id ⇒ id
|
||||||
|
|
|
||||||
|
|
@ -12,6 +12,7 @@ import akka.actor.Address
|
||||||
import akka.cluster.InternalClusterAction._
|
import akka.cluster.InternalClusterAction._
|
||||||
import java.lang.management.ManagementFactory
|
import java.lang.management.ManagementFactory
|
||||||
import javax.management.ObjectName
|
import javax.management.ObjectName
|
||||||
|
|
||||||
import akka.testkit.TestProbe
|
import akka.testkit.TestProbe
|
||||||
import akka.actor.ActorSystem
|
import akka.actor.ActorSystem
|
||||||
import akka.actor.Props
|
import akka.actor.Props
|
||||||
|
|
@ -19,6 +20,7 @@ import com.typesafe.config.ConfigFactory
|
||||||
import akka.actor.CoordinatedShutdown
|
import akka.actor.CoordinatedShutdown
|
||||||
import akka.cluster.ClusterEvent.MemberEvent
|
import akka.cluster.ClusterEvent.MemberEvent
|
||||||
import akka.cluster.ClusterEvent._
|
import akka.cluster.ClusterEvent._
|
||||||
|
|
||||||
import scala.concurrent.Await
|
import scala.concurrent.Await
|
||||||
|
|
||||||
object ClusterSpec {
|
object ClusterSpec {
|
||||||
|
|
@ -214,5 +216,37 @@ akka.loglevel=DEBUG
|
||||||
shutdown(sys3)
|
shutdown(sys3)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"register multiple cluster JMX MBeans with akka.cluster.jmx.multi-mbeans-in-same-jvm = on" in {
|
||||||
|
def getConfig = (port: Int) ⇒ ConfigFactory.parseString(
|
||||||
|
s"""
|
||||||
|
akka.cluster.jmx.multi-mbeans-in-same-jvm = on
|
||||||
|
akka.remote.netty.tcp.port = ${port}
|
||||||
|
akka.remote.artery.canonical.port = ${port}
|
||||||
|
"""
|
||||||
|
).withFallback(ConfigFactory.parseString(ClusterSpec.config))
|
||||||
|
|
||||||
|
val sys1 = ActorSystem("ClusterSpec4", getConfig(2552))
|
||||||
|
val sys2 = ActorSystem("ClusterSpec4", getConfig(2553))
|
||||||
|
|
||||||
|
try {
|
||||||
|
Cluster(sys1)
|
||||||
|
Cluster(sys2)
|
||||||
|
|
||||||
|
val name1 = new ObjectName(s"akka:type=Cluster,port=2552")
|
||||||
|
val info1 = ManagementFactory.getPlatformMBeanServer.getMBeanInfo(name1)
|
||||||
|
info1.getAttributes.length should be > (0)
|
||||||
|
info1.getOperations.length should be > (0)
|
||||||
|
|
||||||
|
val name2 = new ObjectName(s"akka:type=Cluster,port=2553")
|
||||||
|
val info2 = ManagementFactory.getPlatformMBeanServer.getMBeanInfo(name2)
|
||||||
|
info2.getAttributes.length should be > (0)
|
||||||
|
info2.getOperations.length should be > (0)
|
||||||
|
} finally {
|
||||||
|
shutdown(sys1)
|
||||||
|
shutdown(sys2)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue