Initial commit of 'akka-cluster' CLI tool working together with the new Cluster JMX API

Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
Jonas Bonér 2012-04-14 20:06:03 +02:00
parent e19b258ce0
commit ecc949441a
4 changed files with 329 additions and 3 deletions

View file

@ -20,6 +20,9 @@ import java.util.concurrent.TimeUnit._
import java.util.concurrent.TimeoutException
import java.security.SecureRandom
import java.lang.management.ManagementFactory
import javax.management._
import scala.collection.immutable.{ Map, SortedSet }
import scala.annotation.tailrec
@ -308,6 +311,27 @@ object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider {
override def createExtension(system: ExtendedActorSystem): Cluster = new Cluster(system)
}
/**
* Interface for the cluster JMX MBean.
*/
trait ClusterNodeMBean {
def getMemberStatus: String
def getClusterStatus: String
def isLeader: Boolean
def isSingleton: Boolean
def isConvergence: Boolean
def isAvailable: Boolean
def ping(): String
def join(address: String)
def leave(address: String)
def down(address: String)
def remove(address: String)
def shutdown()
}
/**
* This module is responsible for Gossiping cluster information. The abstraction maintains the list of live
* and dead members. Periodically i.e. every 1 second this module chooses a random member and initiates a round
@ -328,7 +352,7 @@ object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider {
* if (Cluster(system).isLeader) { ... }
* }}}
*/
class Cluster(system: ExtendedActorSystem) extends Extension {
class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode
/**
* Represents the state for this Cluster. Implemented using optimistic lockless concurrency.
@ -370,7 +394,10 @@ class Cluster(system: ExtendedActorSystem) extends Extension {
private val log = Logging(system, "Node")
private val random = SecureRandom.getInstance("SHA1PRNG")
log.info("Cluster Node [{}] - is JOINING cluster...", remoteAddress)
private val mBeanServer = ManagementFactory.getPlatformMBeanServer
private val clusterMBeanName = new ObjectName("akka:type=Cluster")
log.info("Cluster Node [{}] - is starting up...", remoteAddress)
// create superisor for daemons under path "/system/cluster"
private val clusterDaemons = {
@ -409,12 +436,16 @@ class Cluster(system: ExtendedActorSystem) extends Extension {
leaderActions()
}
log.info("Cluster Node [{}] - has JOINED cluster successfully", remoteAddress)
createMBean()
log.info("Cluster Node [{}] - has started up successfully", remoteAddress)
// ======================================================
// ===================== PUBLIC API =====================
// ======================================================
def ping = "pong"
def self: Member = latestGossip.members
.find(_.address == remoteAddress)
.getOrElse(throw new IllegalStateException("Can't find 'this' Member in the cluster membership ring"))
@ -464,6 +495,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension {
failureDetectorReaperCanceller.cancel()
leaderActionsCanceller.cancel()
system.stop(clusterDaemons)
mBeanServer.unregisterMBean(clusterMBeanName)
}
}
@ -1005,4 +1037,32 @@ class Cluster(system: ExtendedActorSystem) extends Extension {
private def selectRandomNode(addresses: Iterable[Address]): Address = addresses.toSeq(random nextInt addresses.size)
private def isSingletonCluster(currentState: State): Boolean = currentState.latestGossip.members.size == 1
/**
* Creates the cluster JMX MBean and registers it in the MBean server.
*/
private def createMBean() = {
val mbean = new StandardMBean(classOf[ClusterNodeMBean]) with ClusterNodeMBean {
def getMemberStatus: String = clusterNode.status.toString
// FIXME clean up: Gossip(overview = GossipOverview(seen = [], unreachable = []), members = [Member(address = akka://system0@localhost:5550, status = Joining)], meta = [], version = VectorClock(Node(df2691d6cc6779dc2555316f557b5fa4) -> 00000136b164746d))
def getClusterStatus: String = clusterNode.latestGossip.toString
def isLeader: Boolean = clusterNode.isLeader
def isSingleton: Boolean = clusterNode.isSingletonCluster
def isConvergence: Boolean = clusterNode.convergence.isDefined
def isAvailable: Boolean = clusterNode.isAvailable
def ping(): String = clusterNode.ping
// FIXME return error message if failure
def join(address: String) = clusterNode.join(AddressFromURIString(address))
def leave(address: String) = clusterNode.leave(AddressFromURIString(address))
def down(address: String) = clusterNode.down(AddressFromURIString(address))
def remove(address: String) = clusterNode.remove(AddressFromURIString(address))
def shutdown() = clusterNode.shutdown()
}
log.info("Cluster Node [{}] - registering cluster JMX MBean [{}]", remoteAddress, clusterMBeanName)
mBeanServer.registerMBean(mbean, clusterMBeanName)
}
}