From 846820447e52fbf74b2c5a42e4fcb43b2414d331 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Mon, 16 Apr 2012 16:58:19 +0200 Subject: [PATCH] Changed output of the 'cluster-status' command to be nicely formatted. Added error handling to JMX registration. Added more doc to 'akka-cluster' script. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jonas Bonér --- .../src/main/scala/akka/cluster/Cluster.scala | 43 +++++++++++++++++-- .../scala/akka/cluster/ClusterMBeanSpec.scala | 6 ++- akka-kernel/src/main/dist/bin/akka-cluster | 9 +++- 3 files changed, 51 insertions(+), 7 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index cae25adb8f..af62906946 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -500,7 +500,11 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ failureDetectorReaperCanceller.cancel() leaderActionsCanceller.cancel() system.stop(clusterDaemons) - mBeanServer.unregisterMBean(clusterMBeanName) + try { + mBeanServer.unregisterMBean(clusterMBeanName) + } catch { + case e: InstanceNotFoundException ⇒ // ignore - we are running multiple cluster nodes in the same JVM (probably for testing) + } } } @@ -1050,24 +1054,55 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ val mbean = new StandardMBean(classOf[ClusterNodeMBean]) with ClusterNodeMBean { // JMX attributes (bean-style) + + /* + * Sends a string to the JMX client that will list all nodes in the node ring as follows: + * {{{ + * Members: + * Member(address = akka://system0@localhost:5550, status = Up) + * Member(address = akka://system1@localhost:5551, status = Up) + * Unreachable: + * Member(address = akka://system2@localhost:5553, status = Down) + * }}} + */ + def getClusterStatus: String = { + val gossip = clusterNode.latestGossip + val unreachable = gossip.overview.unreachable + val metaData = gossip.meta + "\nMembers:\n\t" + gossip.members.mkString("\n\t") + + { if (!unreachable.isEmpty) "\nUnreachable:\n\t" + unreachable.mkString("\n\t") else "" } + + { if (!metaData.isEmpty) "\nMeta Data:\t" + metaData.toString else "" } + } + def getMemberStatus: String = clusterNode.status.toString - // FIXME clean up: Gossip(overview = GossipOverview(seen = [], unreachable = []), members = [Member(address = akka://system0@localhost:5550, status = Joining)], meta = [], version = VectorClock(Node(df2691d6cc6779dc2555316f557b5fa4) -> 00000136b164746d)) - def getClusterStatus: String = clusterNode.latestGossip.toString + def getLeader: String = clusterNode.leader.toString def isSingleton: Boolean = clusterNode.isSingletonCluster + def isConvergence: Boolean = clusterNode.convergence.isDefined + def isAvailable: Boolean = clusterNode.isAvailable // JMX commands + def ping(): String = clusterNode.ping + def join(address: String) = clusterNode.join(AddressFromURIString(address)) + def leave(address: String) = clusterNode.leave(AddressFromURIString(address)) + def down(address: String) = clusterNode.down(AddressFromURIString(address)) + def remove(address: String) = clusterNode.remove(AddressFromURIString(address)) + def shutdown() = clusterNode.shutdown() } log.info("Cluster Node [{}] - registering cluster JMX MBean [{}]", remoteAddress, clusterMBeanName) - mBeanServer.registerMBean(mbean, clusterMBeanName) + try { + mBeanServer.registerMBean(mbean, clusterMBeanName) + } catch { + case e: InstanceAlreadyExistsException ⇒ // ignore - we are running multiple cluster nodes in the same JVM (probably for testing) + } } } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterMBeanSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterMBeanSpec.scala index 9e9e89347e..8e4827cc09 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterMBeanSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterMBeanSpec.scala @@ -44,8 +44,10 @@ class ClusterMBeanSpec extends ClusterSpec("akka.loglevel = DEBUG") with Implici val remote0 = system0.provider.asInstanceOf[RemoteActorRefProvider] node0 = Cluster(system0) - "be able to communicate over JMX through its ClusterMBean" in { //taggedAs LongRunningTest in { - Thread.sleep(120.seconds.dilated.toMillis) + "be able to communicate over JMX through its ClusterMBean" taggedAs LongRunningTest in { + Thread.sleep(60.seconds.dilated.toMillis) + + // FIXME test JMX API } } } catch { diff --git a/akka-kernel/src/main/dist/bin/akka-cluster b/akka-kernel/src/main/dist/bin/akka-cluster index 23a41e3a5e..149de5e692 100755 --- a/akka-kernel/src/main/dist/bin/akka-cluster +++ b/akka-kernel/src/main/dist/bin/akka-cluster @@ -1,6 +1,13 @@ #!/bin/bash -# FIXME support authentication? + # Add these options to the sbt or startup script: + # java \ + # -Dcom.sun.management.jmxremote.port=9999 \ + # -Dcom.sun.management.jmxremote.ssl=false \ + # -Dcom.sun.management.jmxremote.authenticate=false \ + # ... + +# FIXME support authentication? if so add: -Dcom.sun.management.jmxremote.password.file= JMX_CLIENT="java -jar $AKKA_HOME/lib/cmdline-jmxclient-0.10.3.jar -"