This commit is contained in:
Amir Moulavi 2012-04-24 12:28:19 +02:00
commit ae28ffc56f
5 changed files with 370 additions and 18 deletions

View file

@ -20,6 +20,9 @@ import java.util.concurrent.TimeUnit._
import java.util.concurrent.TimeoutException
import java.security.SecureRandom
import java.lang.management.ManagementFactory
import javax.management._
import scala.collection.immutable.{ Map, SortedSet }
import scala.annotation.tailrec
@ -308,6 +311,26 @@ object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider {
override def createExtension(system: ExtendedActorSystem): Cluster = new Cluster(system)
}
/**
* Interface for the cluster JMX MBean.
*/
trait ClusterNodeMBean {
def getMemberStatus: String
def getClusterStatus: String
def getLeader: String
def isSingleton: Boolean
def isConvergence: Boolean
def isAvailable: Boolean
def join(address: String)
def leave(address: String)
def down(address: String)
def remove(address: String)
def shutdown()
}
/**
* This module is responsible for Gossiping cluster information. The abstraction maintains the list of live
* and dead members. Periodically i.e. every 1 second this module chooses a random member and initiates a round
@ -328,7 +351,7 @@ object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider {
* if (Cluster(system).isLeader) { ... }
* }}}
*/
class Cluster(system: ExtendedActorSystem) extends Extension {
class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode
/**
* Represents the state for this Cluster. Implemented using optimistic lockless concurrency.
@ -370,7 +393,10 @@ class Cluster(system: ExtendedActorSystem) extends Extension {
private val log = Logging(system, "Node")
private val random = SecureRandom.getInstance("SHA1PRNG")
log.info("Cluster Node [{}] - is JOINING cluster...", remoteAddress)
private val mBeanServer = ManagementFactory.getPlatformMBeanServer
private val clusterMBeanName = new ObjectName("akka:type=Cluster")
log.info("Cluster Node [{}] - is starting up...", remoteAddress)
// create superisor for daemons under path "/system/cluster"
private val clusterDaemons = {
@ -409,7 +435,9 @@ class Cluster(system: ExtendedActorSystem) extends Extension {
leaderActions()
}
log.info("Cluster Node [{}] - has JOINED cluster successfully", remoteAddress)
createMBean()
log.info("Cluster Node [{}] - has started up successfully", remoteAddress)
// ======================================================
// ===================== PUBLIC API =====================
@ -417,7 +445,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension {
def self: Member = latestGossip.members
.find(_.address == remoteAddress)
.getOrElse(throw new IllegalStateException("Can't find 'this' Member in the cluster membership ring"))
.getOrElse(throw new IllegalStateException("Can't find 'this' Member (" + remoteAddress + ") in the cluster membership ring"))
/**
* Latest gossip.
@ -437,6 +465,11 @@ class Cluster(system: ExtendedActorSystem) extends Extension {
!members.isEmpty && (remoteAddress == members.head.address)
}
/**
* Get the address of the current leader.
*/
def leader: Address = latestGossip.members.head.address
/**
* Is this node a singleton cluster?
*/
@ -464,6 +497,11 @@ class Cluster(system: ExtendedActorSystem) extends Extension {
failureDetectorReaperCanceller.cancel()
leaderActionsCanceller.cancel()
system.stop(clusterDaemons)
try {
mBeanServer.unregisterMBean(clusterMBeanName)
} catch {
case e: InstanceNotFoundException // ignore - we are running multiple cluster nodes in the same JVM (probably for testing)
}
}
}
@ -491,7 +529,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension {
/**
* Try to join this cluster node with the node specified by 'address'.
* A 'Join(thisNodeAddress)'' command is sent to the node to join.
* A 'Join(thisNodeAddress)' command is sent to the node to join.
*/
def join(address: Address) {
val connection = clusterCommandConnectionFor(address)
@ -1005,4 +1043,61 @@ class Cluster(system: ExtendedActorSystem) extends Extension {
private def selectRandomNode(addresses: Iterable[Address]): Address = addresses.toSeq(random nextInt addresses.size)
private def isSingletonCluster(currentState: State): Boolean = currentState.latestGossip.members.size == 1
/**
* Creates the cluster JMX MBean and registers it in the MBean server.
*/
private def createMBean() = {
val mbean = new StandardMBean(classOf[ClusterNodeMBean]) with ClusterNodeMBean {
// JMX attributes (bean-style)
/*
* Sends a string to the JMX client that will list all nodes in the node ring as follows:
* {{{
* Members:
* Member(address = akka://system0@localhost:5550, status = Up)
* Member(address = akka://system1@localhost:5551, status = Up)
* Unreachable:
* Member(address = akka://system2@localhost:5553, status = Down)
* }}}
*/
def getClusterStatus: String = {
val gossip = clusterNode.latestGossip
val unreachable = gossip.overview.unreachable
val metaData = gossip.meta
"\nMembers:\n\t" + gossip.members.mkString("\n\t") +
{ if (!unreachable.isEmpty) "\nUnreachable:\n\t" + unreachable.mkString("\n\t") else "" } +
{ if (!metaData.isEmpty) "\nMeta Data:\t" + metaData.toString else "" }
}
def getMemberStatus: String = clusterNode.status.toString
def getLeader: String = clusterNode.leader.toString
def isSingleton: Boolean = clusterNode.isSingletonCluster
def isConvergence: Boolean = clusterNode.convergence.isDefined
def isAvailable: Boolean = clusterNode.isAvailable
// JMX commands
def join(address: String) = clusterNode.join(AddressFromURIString(address))
def leave(address: String) = clusterNode.leave(AddressFromURIString(address))
def down(address: String) = clusterNode.down(AddressFromURIString(address))
def remove(address: String) = clusterNode.remove(AddressFromURIString(address))
def shutdown() = clusterNode.shutdown()
}
log.info("Cluster Node [{}] - registering cluster JMX MBean [{}]", remoteAddress, clusterMBeanName)
try {
mBeanServer.registerMBean(mbean, clusterMBeanName)
} catch {
case e: InstanceAlreadyExistsException // ignore - we are running multiple cluster nodes in the same JVM (probably for testing)
}
}
}

View file

@ -0,0 +1,63 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import java.net.InetSocketAddress
import akka.testkit._
import akka.dispatch._
import akka.actor._
import akka.remote._
import akka.util.duration._
import com.typesafe.config._
/**
* *****************************************************
*
* Add these options to the sbt script:
* java \
* -Dcom.sun.management.jmxremote.port=9999 \
* -Dcom.sun.management.jmxremote.ssl=false \
* -Dcom.sun.management.jmxremote.authenticate=false \
* -Dcom.sun.management.jmxremote.password.file=<path to file>
*
* Use the jmx/cmdline-jmxclient to invoke queries and operations:
* java -jar jmx/cmdline-jmxclient-0.10.3.jar - localhost:9999 akka:type=Cluster join=akka://system0@localhost:5550
*
* *****************************************************
*/
class ClusterMBeanSpec extends ClusterSpec("akka.loglevel = DEBUG") with ImplicitSender {
var node0: Cluster = _
var system0: ActorSystemImpl = _
try {
"A cluster node" must {
System.setProperty("com.sun.management.jmxremote.port", "9999")
system0 = ActorSystem("system0", ConfigFactory
.parseString("akka.remote.netty.port=5550")
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
val remote0 = system0.provider.asInstanceOf[RemoteActorRefProvider]
node0 = Cluster(system0)
"be able to communicate over JMX through its ClusterMBean" taggedAs LongRunningTest in {
Thread.sleep(60.seconds.dilated.toMillis)
// FIXME test JMX API
}
}
} catch {
case e: Exception
e.printStackTrace
fail(e.toString)
}
override def atTermination() {
if (node0 ne null) node0.shutdown()
if (system0 ne null) system0.shutdown()
}
}

View file

@ -13,7 +13,7 @@ Contents
--------
- README - this document
- bin - start scripts for the Akka Microkernel
- bin - start scripts for the Akka Microkernel and Akka Cluster admin tool
- config - config files for microkernel applications
- deploy - deploy dir for microkernel applications
- doc - Akka documentation and Scaladoc API
@ -32,3 +32,12 @@ There is a sample microkernel application included in this download.
Start this application with the following command:
bin/akka sample.kernel.hello.HelloKernel
Cluster Administration Tool
---------------------------
The 'akka-cluster' tool is an administration tool for managing Akka cluster nodes.
Learn more by invoking:
bin/akka-cluster --help

191
akka-kernel/src/main/dist/bin/akka-cluster vendored Executable file
View file

@ -0,0 +1,191 @@
#!/bin/bash
# ============== Akka Cluster Administration Tool ==============
#
# This script is meant to be used from within the Akka distribution.
# Requires setting $AKKA_HOME to the root of the distribution.
#
# Add these options to the sbt or startup script:
# java \
# -Dcom.sun.management.jmxremote.port=9999 \
# -Dcom.sun.management.jmxremote.ssl=false \
# -Dcom.sun.management.jmxremote.authenticate=false \
# ...
# ==============================================================
# FIXME support authentication? if so add: -Dcom.sun.management.jmxremote.password.file=<path to file> AND tweak this script to support it (arg need 'user:passwd' instead of '-')
# NOTE: The 'cmdline-jmxclient' JAR is available as part of the Akka distribution.
# Provided by Typesafe Maven Repository: http://repo.typesafe.com/typesafe/releases/cmdline-jmxclient.
JMX_CLIENT="java -jar $AKKA_HOME/lib/akka/cmdline-jmxclient-0.10.3.jar -"
SELF=`basename $0` # script name
HOST=$1 # cluster node:port to talk to through JMX
function ensureNodeIsRunningAndAvailable {
REPLY=$($JMX_CLIENT $HOST akka:type=Cluster Available 2>&1 >/dev/null) # redirects STDERR to STDOUT before capturing it
if [[ "$REPLY" != *true ]]; then
echo "Akka cluster node is not available on $HOST"
exit 1
fi
}
# switch on command
case "$2" in
join)
if [ $# -ne 3 ]; then
echo "Usage: $SELF <node-hostname:jmx-port> join <actor-system-url-to-join>"
exit 1
fi
ensureNodeIsRunningAndAvailable
shift
ACTOR_SYSTEM_URL=$2
echo "$HOST is JOINING cluster node $ACTOR_SYSTEM_URL"
$JMX_CLIENT $HOST akka:type=Cluster join=$ACTOR_SYSTEM_URL
;;
leave)
if [ $# -ne 3 ]; then
echo "Usage: $SELF <node-hostname:jmx-port> leave <actor-system-url-to-join>"
exit 1
fi
ensureNodeIsRunningAndAvailable
shift
ACTOR_SYSTEM_URL=$2
echo "Scheduling $ACTOR_SYSTEM_URL to LEAVE cluster"
$JMX_CLIENT $HOST akka:type=Cluster leave=$ACTOR_SYSTEM_URL
;;
remove)
if [ $# -ne 3 ]; then
echo "Usage: $SELF <node-hostname:jmx-port> remove <actor-system-url-to-join>"
exit 1
fi
ensureNodeIsRunningAndAvailable
shift
ACTOR_SYSTEM_URL=$2
echo "Scheduling $ACTOR_SYSTEM_URL to REMOVE"
$JMX_CLIENT $HOST akka:type=Cluster remove=$ACTOR_SYSTEM_URL
;;
down)
if [ $# -ne 3 ]; then
echo "Usage: $SELF <node-hostname:jmx-port> down <actor-system-url-to-join>"
exit 1
fi
ensureNodeIsRunningAndAvailable
shift
ACTOR_SYSTEM_URL=$2
echo "Marking $ACTOR_SYSTEM_URL as DOWN"
$JMX_CLIENT $HOST akka:type=Cluster down=$ACTOR_SYSTEM_URL
;;
member-status)
if [ $# -ne 2 ]; then
echo "Usage: $SELF <node-hostname:jmx-port> member-status"
exit 1
fi
ensureNodeIsRunningAndAvailable
shift
echo "Querying member status for $HOST"
$JMX_CLIENT $HOST akka:type=Cluster MemberStatus
;;
cluster-status)
if [ $# -ne 2 ]; then
echo "Usage: $SELF <node-hostname:jmx-port> cluster-status"
exit 1
fi
ensureNodeIsRunningAndAvailable
shift
echo "Querying cluster status"
$JMX_CLIENT $HOST akka:type=Cluster ClusterStatus
;;
leader)
if [ $# -ne 2 ]; then
echo "Usage: $SELF <node-hostname:jmx-port> leader"
exit 1
fi
ensureNodeIsRunningAndAvailable
shift
echo "Checking leader status"
$JMX_CLIENT $HOST akka:type=Cluster Leader
;;
is-singleton)
if [ $# -ne 2 ]; then
echo "Usage: $SELF <node-hostname:jmx-port> is-singleton"
exit 1
fi
ensureNodeIsRunningAndAvailable
shift
echo "Checking for singleton cluster"
$JMX_CLIENT $HOST akka:type=Cluster Singleton
;;
has-convergence)
if [ $# -ne 2 ]; then
echo "Usage: $SELF <node-hostname:jmx-port> is-convergence"
exit 1
fi
ensureNodeIsRunningAndAvailable
shift
echo "Checking for cluster convergence"
$JMX_CLIENT $HOST akka:type=Cluster Convergence
;;
is-available)
if [ $# -ne 2 ]; then
echo "Usage: $SELF <node-hostname:jmx-port> is-available"
exit 1
fi
ensureNodeIsRunningAndAvailable
shift
echo "Checking if member node on $HOST is AVAILABLE"
$JMX_CLIENT $HOST akka:type=Cluster Available
;;
*)
printf "Usage: $SELF <node-hostname:jmx-port> <command> ...\n"
printf "\n"
printf "Supported commands are:\n"
printf "%26s - %s\n" "join <actor-system-url>" "Sends request a JOIN node with the specified URL"
printf "%26s - %s\n" "leave <actor-system-url>" "Sends a request for node with URL to LEAVE the cluster"
printf "%26s - %s\n" "remove <actor-system-url>" "Sends a request for node with URL to be instantly REMOVED from the cluster"
printf "%26s - %s\n" "down <actor-system-url>" "Sends a request for marking node with URL as DOWN"
printf "%26s - %s\n" member-status "Asks the member node for its current status"
printf "%26s - %s\n" cluster-status "Asks the cluster for its current status (member ring, unavailable nodes, meta data etc.)"
printf "%26s - %s\n" leader "Asks the cluster who the current leader is"
printf "%26s - %s\n" is-singleton "Checks if the cluster is a singleton cluster (single node cluster)"
printf "%26s - %s\n" is-available "Checks if the member node is available"
printf "%26s - %s\n" has-convergence "Checks if there is a cluster convergence"
printf "Where the <actor-system-url> should be on the format of 'akka://actor-system-name@hostname:port'\n"
printf "\n"
printf "Examples: $SELF localhost:9999 is-available\n"
printf " $SELF localhost:9999 join akka://MySystem@darkstar:2552\n"
printf " $SELF localhost:9999 cluster-status\n"
exit 1
;;
esac

View file

@ -239,7 +239,7 @@ object AkkaBuild extends Build {
libraryDependencies ++= Dependencies.kernel
)
)
lazy val camel = Project(
id = "akka-camel",
base = file("akka-camel"),
@ -316,13 +316,6 @@ object AkkaBuild extends Build {
)
)
// lazy val secondTutorial = Project(
// id = "akka-tutorial-second",
// base = file("akka-tutorials/akka-tutorial-second"),
// dependencies = Seq(actor),
// settings = defaultSettings
// )
lazy val docs = Project(
id = "akka-docs",
base = file("akka-docs"),
@ -439,7 +432,7 @@ object Dependencies {
Test.zookeeper, Test.log4j // needed for ZkBarrier in multi-jvm tests
)
val cluster = Seq(Test.junit, Test.scalatest)
val cluster = Seq(Test.junit, Test.scalatest)
val slf4j = Seq(slf4jApi, Test.logback)
@ -474,9 +467,9 @@ object Dependencies {
val spring = Seq(springBeans, springContext, Test.junit, Test.scalatest)
val kernel = Seq(Test.scalatest, Test.junit)
val kernel = Seq(jmxClient, Test.scalatest, Test.junit)
val camel = Seq(Test.scalatest, Test.junit, Test.mockito, camelCore)
val camel = Seq(camelCore, Test.scalatest, Test.junit, Test.mockito)
// TODO: resolve Jetty version conflict
// val sampleCamel = Seq(camelCore, camelSpring, commonsCodec, Runtime.camelJms, Runtime.activemq, Runtime.springJms,
@ -486,7 +479,7 @@ object Dependencies {
val docs = Seq(Test.scalatest, Test.junit)
val zeroMQ = Seq(Test.scalatest, Test.junit, protobuf, Dependency.zeroMQ)
val zeroMQ = Seq(protobuf, Dependency.zeroMQ, Test.scalatest, Test.junit)
}
object Dependency {
@ -514,6 +507,7 @@ object Dependency {
val commonsCodec = "commons-codec" % "commons-codec" % "1.4" // ApacheV2
val commonsIo = "commons-io" % "commons-io" % "2.0.1" // ApacheV2
val commonsPool = "commons-pool" % "commons-pool" % "1.5.6" // ApacheV2
val jmxClient = "cmdline-jmxclient" % "cmdline-jmxclient" % "0.10.3" // LGPL
val mongoAsync = "com.mongodb.async" % "mongo-driver_2.9.0-1" % "0.2.9-1" // ApacheV2
val netty = "io.netty" % "netty" % V.Netty // ApacheV2
val protobuf = "com.google.protobuf" % "protobuf-java" % V.Protobuf // New BSD