diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 7944157103..69c2facf14 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -20,6 +20,9 @@ import java.util.concurrent.TimeUnit._ import java.util.concurrent.TimeoutException import java.security.SecureRandom +import java.lang.management.ManagementFactory +import javax.management._ + import scala.collection.immutable.{ Map, SortedSet } import scala.annotation.tailrec @@ -308,6 +311,27 @@ object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider { override def createExtension(system: ExtendedActorSystem): Cluster = new Cluster(system) } +/** + * Interface for the cluster JMX MBean. + */ +trait ClusterNodeMBean { + def getMemberStatus: String + def getClusterStatus: String + + def isLeader: Boolean + def isSingleton: Boolean + def isConvergence: Boolean + def isAvailable: Boolean + + def ping(): String + def join(address: String) + def leave(address: String) + def down(address: String) + def remove(address: String) + + def shutdown() +} + /** * This module is responsible for Gossiping cluster information. The abstraction maintains the list of live * and dead members. Periodically i.e. every 1 second this module chooses a random member and initiates a round @@ -328,7 +352,7 @@ object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider { * if (Cluster(system).isLeader) { ... } * }}} */ -class Cluster(system: ExtendedActorSystem) extends Extension { +class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ /** * Represents the state for this Cluster. Implemented using optimistic lockless concurrency. @@ -370,7 +394,10 @@ class Cluster(system: ExtendedActorSystem) extends Extension { private val log = Logging(system, "Node") private val random = SecureRandom.getInstance("SHA1PRNG") - log.info("Cluster Node [{}] - is JOINING cluster...", remoteAddress) + private val mBeanServer = ManagementFactory.getPlatformMBeanServer + private val clusterMBeanName = new ObjectName("akka:type=Cluster") + + log.info("Cluster Node [{}] - is starting up...", remoteAddress) // create superisor for daemons under path "/system/cluster" private val clusterDaemons = { @@ -409,12 +436,16 @@ class Cluster(system: ExtendedActorSystem) extends Extension { leaderActions() } - log.info("Cluster Node [{}] - has JOINED cluster successfully", remoteAddress) + createMBean() + + log.info("Cluster Node [{}] - has started up successfully", remoteAddress) // ====================================================== // ===================== PUBLIC API ===================== // ====================================================== + def ping = "pong" + def self: Member = latestGossip.members .find(_.address == remoteAddress) .getOrElse(throw new IllegalStateException("Can't find 'this' Member in the cluster membership ring")) @@ -464,6 +495,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { failureDetectorReaperCanceller.cancel() leaderActionsCanceller.cancel() system.stop(clusterDaemons) + mBeanServer.unregisterMBean(clusterMBeanName) } } @@ -1005,4 +1037,32 @@ class Cluster(system: ExtendedActorSystem) extends Extension { private def selectRandomNode(addresses: Iterable[Address]): Address = addresses.toSeq(random nextInt addresses.size) private def isSingletonCluster(currentState: State): Boolean = currentState.latestGossip.members.size == 1 + + /** + * Creates the cluster JMX MBean and registers it in the MBean server. + */ + private def createMBean() = { + val mbean = new StandardMBean(classOf[ClusterNodeMBean]) with ClusterNodeMBean { + def getMemberStatus: String = clusterNode.status.toString + + // FIXME clean up: Gossip(overview = GossipOverview(seen = [], unreachable = []), members = [Member(address = akka://system0@localhost:5550, status = Joining)], meta = [], version = VectorClock(Node(df2691d6cc6779dc2555316f557b5fa4) -> 00000136b164746d)) + def getClusterStatus: String = clusterNode.latestGossip.toString + + def isLeader: Boolean = clusterNode.isLeader + def isSingleton: Boolean = clusterNode.isSingletonCluster + def isConvergence: Boolean = clusterNode.convergence.isDefined + def isAvailable: Boolean = clusterNode.isAvailable + + def ping(): String = clusterNode.ping + + // FIXME return error message if failure + def join(address: String) = clusterNode.join(AddressFromURIString(address)) + def leave(address: String) = clusterNode.leave(AddressFromURIString(address)) + def down(address: String) = clusterNode.down(AddressFromURIString(address)) + def remove(address: String) = clusterNode.remove(AddressFromURIString(address)) + def shutdown() = clusterNode.shutdown() + } + log.info("Cluster Node [{}] - registering cluster JMX MBean [{}]", remoteAddress, clusterMBeanName) + mBeanServer.registerMBean(mbean, clusterMBeanName) + } } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterMBeanSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterMBeanSpec.scala new file mode 100644 index 0000000000..9e9e89347e --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterMBeanSpec.scala @@ -0,0 +1,61 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.cluster + +import java.net.InetSocketAddress + +import akka.testkit._ +import akka.dispatch._ +import akka.actor._ +import akka.remote._ +import akka.util.duration._ + +import com.typesafe.config._ + +/** + * ***************************************************** + * + * Add these options to the sbt script: + * java \ + * -Dcom.sun.management.jmxremote.port=9999 \ + * -Dcom.sun.management.jmxremote.ssl=false \ + * -Dcom.sun.management.jmxremote.authenticate=false \ + * -Dcom.sun.management.jmxremote.password.file= + * + * Use the jmx/cmdline-jmxclient to invoke queries and operations: + * java -jar jmx/cmdline-jmxclient-0.10.3.jar - localhost:9999 akka:type=Cluster join=akka://system0@localhost:5550 + * + * ***************************************************** + */ + +class ClusterMBeanSpec extends ClusterSpec("akka.loglevel = DEBUG") with ImplicitSender { + + var node0: Cluster = _ + var system0: ActorSystemImpl = _ + + try { + "A cluster node" must { + System.setProperty("com.sun.management.jmxremote.port", "9999") + system0 = ActorSystem("system0", ConfigFactory + .parseString("akka.remote.netty.port=5550") + .withFallback(system.settings.config)) + .asInstanceOf[ActorSystemImpl] + val remote0 = system0.provider.asInstanceOf[RemoteActorRefProvider] + node0 = Cluster(system0) + + "be able to communicate over JMX through its ClusterMBean" in { //taggedAs LongRunningTest in { + Thread.sleep(120.seconds.dilated.toMillis) + } + } + } catch { + case e: Exception ⇒ + e.printStackTrace + fail(e.toString) + } + + override def atTermination() { + if (node0 ne null) node0.shutdown() + if (system0 ne null) system0.shutdown() + } +} diff --git a/akka-kernel/src/main/dist/bin/akka-cluster b/akka-kernel/src/main/dist/bin/akka-cluster new file mode 100755 index 0000000000..a99d84de88 --- /dev/null +++ b/akka-kernel/src/main/dist/bin/akka-cluster @@ -0,0 +1,205 @@ +#!/bin/bash + +# FIXME allow passing in a hostname:port to override default +# FIXME leader should return the current leader - not boolean + +DEFAULT_HOST="localhost:9999" +SCRIPT=`basename $0` +JMX_CLIENT="java -jar $AKKA_HOME/jmx/cmdline-jmxclient-0.10.3.jar - $DEFAULT_HOST akka:type=Cluster" + +# Check the first argument for instructions +case "$1" in + join) + if [ $# -ne 2 ]; then + echo "Usage: $SCRIPT join " + exit 1 + fi + + # Make sure the local node IS running + REPLY=$($JMX_CLIENT ping 2>&1 >/dev/null) # redirect STDERR to STDOUT before capturing it + if [[ "$REPLY" != *pong* ]]; then + echo "Akka cluster node is not running" + exit 1 + fi + + shift + $JMX_CLIENT join=$@ + ;; + + leave) + if [ $# -ne 1 ]; then + echo "Usage: $SCRIPT leave" + exit 1 + fi + + # Make sure the local node IS running + REPLY=$($JMX_CLIENT ping 2>&1 >/dev/null) # redirect STDERR to STDOUT before capturing it + if [[ "$REPLY" != *pong* ]]; then + echo "Akka cluster node is not running" + exit 1 + fi + + shift + $JMX_CLIENT leave=$@ + ;; + + remove) + if [ $# -ne 2 ]; then + echo "Usage: $SCRIPT remove " + exit 1 + fi + + # Make sure the local node IS running + REPLY=$($JMX_CLIENT ping 2>&1 >/dev/null) # redirect STDERR to STDOUT before capturing it + if [[ "$REPLY" != *pong* ]]; then + echo "Akka cluster node is not running" + exit 1 + fi + + shift + $JMX_CLIENT remove=$@ + ;; + + down) + if [ $# -ne 2 ]; then + echo "Usage: $SCRIPT down " + exit 1 + fi + + # Make sure the local node IS running + REPLY=$($JMX_CLIENT ping 2>&1 >/dev/null) # redirect STDERR to STDOUT before capturing it + if [[ "$REPLY" != *pong* ]]; then + echo "Akka cluster node is not running" + exit 1 + fi + + shift + $JMX_CLIENT down=$@ + ;; + + member-status) + if [ $# -ne 1 ]; then + echo "Usage: $SCRIPT member-status" + exit 1 + fi + + # Make sure the local node IS running + REPLY=$($JMX_CLIENT ping 2>&1 >/dev/null) # redirect STDERR to STDOUT before capturing it + if [[ "$REPLY" != *pong* ]]; then + echo "Akka cluster node is not running" + exit 1 + fi + + shift + $JMX_CLIENT MemberStatus + ;; + + cluster-status) + if [ $# -ne 1 ]; then + echo "Usage: $SCRIPT cluster-status" + exit 1 + fi + + # Make sure the local node IS running + REPLY=$($JMX_CLIENT ping 2>&1 >/dev/null) # redirect STDERR to STDOUT before capturing it + if [[ "$REPLY" != *pong* ]]; then + echo "Akka cluster node is not running" + exit 1 + fi + + shift + $JMX_CLIENT ClusterStatus + ;; + + ping) + if [ $# -ne 1 ]; then + echo "Usage: $SCRIPT ping" + exit 1 + fi + + # Make sure the local node IS running + REPLY=$($JMX_CLIENT ping 2>&1 >/dev/null) # redirect STDERR to STDOUT before capturing it + if [[ "$REPLY" != *pong* ]]; then + echo "Akka cluster node is not running" + exit 1 + fi + + shift + $JMX_CLIENT ping + ;; + + leader) + if [ $# -ne 1 ]; then + echo "Usage: $SCRIPT leader" + exit 1 + fi + + # Make sure the local node IS running + REPLY=$($JMX_CLIENT ping 2>&1 >/dev/null) # redirect STDERR to STDOUT before capturing it + if [[ "$REPLY" != *pong* ]]; then + echo "Akka cluster node is not running" + exit 1 + fi + + shift + $JMX_CLIENT Leader + ;; + + is-singleton) + if [ $# -ne 1 ]; then + echo "Usage: $SCRIPT is-singleton" + exit 1 + fi + + # Make sure the local node IS running + REPLY=$($JMX_CLIENT ping 2>&1 >/dev/null) # redirect STDERR to STDOUT before capturing it + if [[ "$REPLY" != *pong* ]]; then + echo "Akka cluster node is not running" + exit 1 + fi + + shift + $JMX_CLIENT Singleton + ;; + + has-convergence) + if [ $# -ne 1 ]; then + echo "Usage: $SCRIPT is-convergence" + exit 1 + fi + + # Make sure the local node IS running + REPLY=$($JMX_CLIENT ping 2>&1 >/dev/null) # redirect STDERR to STDOUT before capturing it + if [[ "$REPLY" != *pong* ]]; then + echo "Akka cluster node is not running" + exit 1 + fi + + shift + $JMX_CLIENT Convergence + ;; + + is-available) + if [ $# -ne 1 ]; then + echo "Usage: $SCRIPT is-available" + exit 1 + fi + + # Make sure the local node IS running + REPLY=$($JMX_CLIENT ping 2>&1 >/dev/null) # redirect STDERR to STDOUT before capturing it + if [[ "$REPLY" != *pong* ]]; then + echo "Akka cluster node is not running" + exit 1 + fi + + shift + $JMX_CLIENT Available + ;; + + *) + echo "Usage: $SCRIPT { ping | join | leave | remove | down" + echo " member-status | cluster-status | leader" + echo " is-singleton | has-convergence | is-available }" + exit 1 + ;; +esac diff --git a/akka-kernel/src/main/dist/lib/cmdline-jmxclient-0.10.3.jar b/akka-kernel/src/main/dist/lib/cmdline-jmxclient-0.10.3.jar new file mode 100644 index 0000000000..a3454cfe78 Binary files /dev/null and b/akka-kernel/src/main/dist/lib/cmdline-jmxclient-0.10.3.jar differ