diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 7944157103..e3429cfdb3 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -20,6 +20,9 @@ import java.util.concurrent.TimeUnit._ import java.util.concurrent.TimeoutException import java.security.SecureRandom +import java.lang.management.ManagementFactory +import javax.management._ + import scala.collection.immutable.{ Map, SortedSet } import scala.annotation.tailrec @@ -308,6 +311,26 @@ object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider { override def createExtension(system: ExtendedActorSystem): Cluster = new Cluster(system) } +/** + * Interface for the cluster JMX MBean. + */ +trait ClusterNodeMBean { + def getMemberStatus: String + def getClusterStatus: String + def getLeader: String + + def isSingleton: Boolean + def isConvergence: Boolean + def isAvailable: Boolean + + def join(address: String) + def leave(address: String) + def down(address: String) + def remove(address: String) + + def shutdown() +} + /** * This module is responsible for Gossiping cluster information. The abstraction maintains the list of live * and dead members. Periodically i.e. every 1 second this module chooses a random member and initiates a round @@ -328,7 +351,7 @@ object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider { * if (Cluster(system).isLeader) { ... } * }}} */ -class Cluster(system: ExtendedActorSystem) extends Extension { +class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ /** * Represents the state for this Cluster. Implemented using optimistic lockless concurrency. @@ -370,7 +393,10 @@ class Cluster(system: ExtendedActorSystem) extends Extension { private val log = Logging(system, "Node") private val random = SecureRandom.getInstance("SHA1PRNG") - log.info("Cluster Node [{}] - is JOINING cluster...", remoteAddress) + private val mBeanServer = ManagementFactory.getPlatformMBeanServer + private val clusterMBeanName = new ObjectName("akka:type=Cluster") + + log.info("Cluster Node [{}] - is starting up...", remoteAddress) // create superisor for daemons under path "/system/cluster" private val clusterDaemons = { @@ -409,7 +435,9 @@ class Cluster(system: ExtendedActorSystem) extends Extension { leaderActions() } - log.info("Cluster Node [{}] - has JOINED cluster successfully", remoteAddress) + createMBean() + + log.info("Cluster Node [{}] - has started up successfully", remoteAddress) // ====================================================== // ===================== PUBLIC API ===================== @@ -417,7 +445,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { def self: Member = latestGossip.members .find(_.address == remoteAddress) - .getOrElse(throw new IllegalStateException("Can't find 'this' Member in the cluster membership ring")) + .getOrElse(throw new IllegalStateException("Can't find 'this' Member (" + remoteAddress + ") in the cluster membership ring")) /** * Latest gossip. @@ -437,6 +465,11 @@ class Cluster(system: ExtendedActorSystem) extends Extension { !members.isEmpty && (remoteAddress == members.head.address) } + /** + * Get the address of the current leader. + */ + def leader: Address = latestGossip.members.head.address + /** * Is this node a singleton cluster? */ @@ -464,6 +497,11 @@ class Cluster(system: ExtendedActorSystem) extends Extension { failureDetectorReaperCanceller.cancel() leaderActionsCanceller.cancel() system.stop(clusterDaemons) + try { + mBeanServer.unregisterMBean(clusterMBeanName) + } catch { + case e: InstanceNotFoundException ⇒ // ignore - we are running multiple cluster nodes in the same JVM (probably for testing) + } } } @@ -491,7 +529,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { /** * Try to join this cluster node with the node specified by 'address'. - * A 'Join(thisNodeAddress)'' command is sent to the node to join. + * A 'Join(thisNodeAddress)' command is sent to the node to join. */ def join(address: Address) { val connection = clusterCommandConnectionFor(address) @@ -1005,4 +1043,61 @@ class Cluster(system: ExtendedActorSystem) extends Extension { private def selectRandomNode(addresses: Iterable[Address]): Address = addresses.toSeq(random nextInt addresses.size) private def isSingletonCluster(currentState: State): Boolean = currentState.latestGossip.members.size == 1 + + /** + * Creates the cluster JMX MBean and registers it in the MBean server. + */ + private def createMBean() = { + val mbean = new StandardMBean(classOf[ClusterNodeMBean]) with ClusterNodeMBean { + + // JMX attributes (bean-style) + + /* + * Sends a string to the JMX client that will list all nodes in the node ring as follows: + * {{{ + * Members: + * Member(address = akka://system0@localhost:5550, status = Up) + * Member(address = akka://system1@localhost:5551, status = Up) + * Unreachable: + * Member(address = akka://system2@localhost:5553, status = Down) + * }}} + */ + def getClusterStatus: String = { + val gossip = clusterNode.latestGossip + val unreachable = gossip.overview.unreachable + val metaData = gossip.meta + "\nMembers:\n\t" + gossip.members.mkString("\n\t") + + { if (!unreachable.isEmpty) "\nUnreachable:\n\t" + unreachable.mkString("\n\t") else "" } + + { if (!metaData.isEmpty) "\nMeta Data:\t" + metaData.toString else "" } + } + + def getMemberStatus: String = clusterNode.status.toString + + def getLeader: String = clusterNode.leader.toString + + def isSingleton: Boolean = clusterNode.isSingletonCluster + + def isConvergence: Boolean = clusterNode.convergence.isDefined + + def isAvailable: Boolean = clusterNode.isAvailable + + // JMX commands + + def join(address: String) = clusterNode.join(AddressFromURIString(address)) + + def leave(address: String) = clusterNode.leave(AddressFromURIString(address)) + + def down(address: String) = clusterNode.down(AddressFromURIString(address)) + + def remove(address: String) = clusterNode.remove(AddressFromURIString(address)) + + def shutdown() = clusterNode.shutdown() + } + log.info("Cluster Node [{}] - registering cluster JMX MBean [{}]", remoteAddress, clusterMBeanName) + try { + mBeanServer.registerMBean(mbean, clusterMBeanName) + } catch { + case e: InstanceAlreadyExistsException ⇒ // ignore - we are running multiple cluster nodes in the same JVM (probably for testing) + } + } } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterMBeanSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterMBeanSpec.scala new file mode 100644 index 0000000000..8e4827cc09 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterMBeanSpec.scala @@ -0,0 +1,63 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.cluster + +import java.net.InetSocketAddress + +import akka.testkit._ +import akka.dispatch._ +import akka.actor._ +import akka.remote._ +import akka.util.duration._ + +import com.typesafe.config._ + +/** + * ***************************************************** + * + * Add these options to the sbt script: + * java \ + * -Dcom.sun.management.jmxremote.port=9999 \ + * -Dcom.sun.management.jmxremote.ssl=false \ + * -Dcom.sun.management.jmxremote.authenticate=false \ + * -Dcom.sun.management.jmxremote.password.file= + * + * Use the jmx/cmdline-jmxclient to invoke queries and operations: + * java -jar jmx/cmdline-jmxclient-0.10.3.jar - localhost:9999 akka:type=Cluster join=akka://system0@localhost:5550 + * + * ***************************************************** + */ + +class ClusterMBeanSpec extends ClusterSpec("akka.loglevel = DEBUG") with ImplicitSender { + + var node0: Cluster = _ + var system0: ActorSystemImpl = _ + + try { + "A cluster node" must { + System.setProperty("com.sun.management.jmxremote.port", "9999") + system0 = ActorSystem("system0", ConfigFactory + .parseString("akka.remote.netty.port=5550") + .withFallback(system.settings.config)) + .asInstanceOf[ActorSystemImpl] + val remote0 = system0.provider.asInstanceOf[RemoteActorRefProvider] + node0 = Cluster(system0) + + "be able to communicate over JMX through its ClusterMBean" taggedAs LongRunningTest in { + Thread.sleep(60.seconds.dilated.toMillis) + + // FIXME test JMX API + } + } + } catch { + case e: Exception ⇒ + e.printStackTrace + fail(e.toString) + } + + override def atTermination() { + if (node0 ne null) node0.shutdown() + if (system0 ne null) system0.shutdown() + } +} diff --git a/akka-kernel/src/main/dist/README b/akka-kernel/src/main/dist/README index b531da6164..d15368f696 100644 --- a/akka-kernel/src/main/dist/README +++ b/akka-kernel/src/main/dist/README @@ -13,7 +13,7 @@ Contents -------- - README - this document -- bin - start scripts for the Akka Microkernel +- bin - start scripts for the Akka Microkernel and Akka Cluster admin tool - config - config files for microkernel applications - deploy - deploy dir for microkernel applications - doc - Akka documentation and Scaladoc API @@ -32,3 +32,12 @@ There is a sample microkernel application included in this download. Start this application with the following command: bin/akka sample.kernel.hello.HelloKernel + + +Cluster Administration Tool +--------------------------- + +The 'akka-cluster' tool is an administration tool for managing Akka cluster nodes. +Learn more by invoking: + + bin/akka-cluster --help diff --git a/akka-kernel/src/main/dist/bin/akka-cluster b/akka-kernel/src/main/dist/bin/akka-cluster new file mode 100755 index 0000000000..ecca52fa9b --- /dev/null +++ b/akka-kernel/src/main/dist/bin/akka-cluster @@ -0,0 +1,191 @@ +#!/bin/bash + +# ============== Akka Cluster Administration Tool ============== +# +# This script is meant to be used from within the Akka distribution. +# Requires setting $AKKA_HOME to the root of the distribution. +# +# Add these options to the sbt or startup script: +# java \ +# -Dcom.sun.management.jmxremote.port=9999 \ +# -Dcom.sun.management.jmxremote.ssl=false \ +# -Dcom.sun.management.jmxremote.authenticate=false \ +# ... +# ============================================================== + +# FIXME support authentication? if so add: -Dcom.sun.management.jmxremote.password.file= AND tweak this script to support it (arg need 'user:passwd' instead of '-') + +# NOTE: The 'cmdline-jmxclient' JAR is available as part of the Akka distribution. +# Provided by Typesafe Maven Repository: http://repo.typesafe.com/typesafe/releases/cmdline-jmxclient. +JMX_CLIENT="java -jar $AKKA_HOME/lib/akka/cmdline-jmxclient-0.10.3.jar -" + +SELF=`basename $0` # script name +HOST=$1 # cluster node:port to talk to through JMX + +function ensureNodeIsRunningAndAvailable { + REPLY=$($JMX_CLIENT $HOST akka:type=Cluster Available 2>&1 >/dev/null) # redirects STDERR to STDOUT before capturing it + if [[ "$REPLY" != *true ]]; then + echo "Akka cluster node is not available on $HOST" + exit 1 + fi +} + +# switch on command +case "$2" in + + join) + if [ $# -ne 3 ]; then + echo "Usage: $SELF join " + exit 1 + fi + + ensureNodeIsRunningAndAvailable + shift + + ACTOR_SYSTEM_URL=$2 + echo "$HOST is JOINING cluster node $ACTOR_SYSTEM_URL" + $JMX_CLIENT $HOST akka:type=Cluster join=$ACTOR_SYSTEM_URL + ;; + + leave) + if [ $# -ne 3 ]; then + echo "Usage: $SELF leave " + exit 1 + fi + + ensureNodeIsRunningAndAvailable + shift + + ACTOR_SYSTEM_URL=$2 + echo "Scheduling $ACTOR_SYSTEM_URL to LEAVE cluster" + $JMX_CLIENT $HOST akka:type=Cluster leave=$ACTOR_SYSTEM_URL + ;; + + remove) + if [ $# -ne 3 ]; then + echo "Usage: $SELF remove " + exit 1 + fi + + ensureNodeIsRunningAndAvailable + shift + + ACTOR_SYSTEM_URL=$2 + echo "Scheduling $ACTOR_SYSTEM_URL to REMOVE" + $JMX_CLIENT $HOST akka:type=Cluster remove=$ACTOR_SYSTEM_URL + ;; + + down) + if [ $# -ne 3 ]; then + echo "Usage: $SELF down " + exit 1 + fi + + ensureNodeIsRunningAndAvailable + shift + + ACTOR_SYSTEM_URL=$2 + echo "Marking $ACTOR_SYSTEM_URL as DOWN" + $JMX_CLIENT $HOST akka:type=Cluster down=$ACTOR_SYSTEM_URL + ;; + + member-status) + if [ $# -ne 2 ]; then + echo "Usage: $SELF member-status" + exit 1 + fi + + ensureNodeIsRunningAndAvailable + shift + + echo "Querying member status for $HOST" + $JMX_CLIENT $HOST akka:type=Cluster MemberStatus + ;; + + cluster-status) + if [ $# -ne 2 ]; then + echo "Usage: $SELF cluster-status" + exit 1 + fi + + ensureNodeIsRunningAndAvailable + shift + + echo "Querying cluster status" + $JMX_CLIENT $HOST akka:type=Cluster ClusterStatus + ;; + + leader) + if [ $# -ne 2 ]; then + echo "Usage: $SELF leader" + exit 1 + fi + + ensureNodeIsRunningAndAvailable + shift + + echo "Checking leader status" + $JMX_CLIENT $HOST akka:type=Cluster Leader + ;; + + is-singleton) + if [ $# -ne 2 ]; then + echo "Usage: $SELF is-singleton" + exit 1 + fi + + ensureNodeIsRunningAndAvailable + shift + + echo "Checking for singleton cluster" + $JMX_CLIENT $HOST akka:type=Cluster Singleton + ;; + + has-convergence) + if [ $# -ne 2 ]; then + echo "Usage: $SELF is-convergence" + exit 1 + fi + + ensureNodeIsRunningAndAvailable + shift + + echo "Checking for cluster convergence" + $JMX_CLIENT $HOST akka:type=Cluster Convergence + ;; + + is-available) + if [ $# -ne 2 ]; then + echo "Usage: $SELF is-available" + exit 1 + fi + + ensureNodeIsRunningAndAvailable + shift + + echo "Checking if member node on $HOST is AVAILABLE" + $JMX_CLIENT $HOST akka:type=Cluster Available + ;; + + *) + printf "Usage: $SELF ...\n" + printf "\n" + printf "Supported commands are:\n" + printf "%26s - %s\n" "join " "Sends request a JOIN node with the specified URL" + printf "%26s - %s\n" "leave " "Sends a request for node with URL to LEAVE the cluster" + printf "%26s - %s\n" "remove " "Sends a request for node with URL to be instantly REMOVED from the cluster" + printf "%26s - %s\n" "down " "Sends a request for marking node with URL as DOWN" + printf "%26s - %s\n" member-status "Asks the member node for its current status" + printf "%26s - %s\n" cluster-status "Asks the cluster for its current status (member ring, unavailable nodes, meta data etc.)" + printf "%26s - %s\n" leader "Asks the cluster who the current leader is" + printf "%26s - %s\n" is-singleton "Checks if the cluster is a singleton cluster (single node cluster)" + printf "%26s - %s\n" is-available "Checks if the member node is available" + printf "%26s - %s\n" has-convergence "Checks if there is a cluster convergence" + printf "Where the should be on the format of 'akka://actor-system-name@hostname:port'\n" + printf "\n" + printf "Examples: $SELF localhost:9999 is-available\n" + printf " $SELF localhost:9999 join akka://MySystem@darkstar:2552\n" + printf " $SELF localhost:9999 cluster-status\n" + exit 1 + ;; +esac diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 19248d3a79..c21ca76d73 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -239,7 +239,7 @@ object AkkaBuild extends Build { libraryDependencies ++= Dependencies.kernel ) ) - + lazy val camel = Project( id = "akka-camel", base = file("akka-camel"), @@ -316,13 +316,6 @@ object AkkaBuild extends Build { ) ) - // lazy val secondTutorial = Project( - // id = "akka-tutorial-second", - // base = file("akka-tutorials/akka-tutorial-second"), - // dependencies = Seq(actor), - // settings = defaultSettings - // ) - lazy val docs = Project( id = "akka-docs", base = file("akka-docs"), @@ -439,7 +432,7 @@ object Dependencies { Test.zookeeper, Test.log4j // needed for ZkBarrier in multi-jvm tests ) - val cluster = Seq(Test.junit, Test.scalatest) + val cluster = Seq(Test.junit, Test.scalatest) val slf4j = Seq(slf4jApi, Test.logback) @@ -474,9 +467,9 @@ object Dependencies { val spring = Seq(springBeans, springContext, Test.junit, Test.scalatest) - val kernel = Seq(Test.scalatest, Test.junit) + val kernel = Seq(jmxClient, Test.scalatest, Test.junit) - val camel = Seq(Test.scalatest, Test.junit, Test.mockito, camelCore) + val camel = Seq(camelCore, Test.scalatest, Test.junit, Test.mockito) // TODO: resolve Jetty version conflict // val sampleCamel = Seq(camelCore, camelSpring, commonsCodec, Runtime.camelJms, Runtime.activemq, Runtime.springJms, @@ -486,7 +479,7 @@ object Dependencies { val docs = Seq(Test.scalatest, Test.junit) - val zeroMQ = Seq(Test.scalatest, Test.junit, protobuf, Dependency.zeroMQ) + val zeroMQ = Seq(protobuf, Dependency.zeroMQ, Test.scalatest, Test.junit) } object Dependency { @@ -514,6 +507,7 @@ object Dependency { val commonsCodec = "commons-codec" % "commons-codec" % "1.4" // ApacheV2 val commonsIo = "commons-io" % "commons-io" % "2.0.1" // ApacheV2 val commonsPool = "commons-pool" % "commons-pool" % "1.5.6" // ApacheV2 + val jmxClient = "cmdline-jmxclient" % "cmdline-jmxclient" % "0.10.3" // LGPL val mongoAsync = "com.mongodb.async" % "mongo-driver_2.9.0-1" % "0.2.9-1" // ApacheV2 val netty = "io.netty" % "netty" % V.Netty // ApacheV2 val protobuf = "com.google.protobuf" % "protobuf-java" % V.Protobuf // New BSD