Additions for Java API of cluster, see #2502

This commit is contained in:
Patrik Nordwall 2012-10-04 10:55:22 +02:00
parent af184250cc
commit acdafa0cd3
3 changed files with 92 additions and 17 deletions

View file

@ -32,7 +32,38 @@ object ClusterEvent {
unreachable: Set[Member] = Set.empty,
convergence: Boolean = false,
seenBy: Set[Address] = Set.empty,
leader: Option[Address] = None) extends ClusterDomainEvent
leader: Option[Address] = None) extends ClusterDomainEvent {
/**
* Java API
*/
def getMembers: java.lang.Iterable[Member] = {
import scala.collection.JavaConverters._
members.asJava
}
/**
* Java API
*/
def getUnreachable: java.util.Set[Member] = {
import scala.collection.JavaConverters._
unreachable.asJava
}
/**
* Java API
*/
def getSeenBy: java.util.Set[Address] = {
import scala.collection.JavaConverters._
seenBy.asJava
}
/**
* Java API
* @return address of current leader, or null if none
*/
def getLeader: Address = leader orNull
}
/**
* Marker interface for member related events.
@ -88,11 +119,6 @@ object ClusterEvent {
if (member.status != Removed) throw new IllegalArgumentException("Expected Removed status, got: " + member)
}
/**
* Current snapshot of cluster member metrics. Published to subscribers.
*/
case class ClusterMetricsChanged(nodes: Set[NodeMetrics]) extends ClusterDomainEvent
/**
* Cluster convergence state changed.
*/
@ -101,7 +127,20 @@ object ClusterEvent {
/**
* Leader of the cluster members changed. Only published after convergence.
*/
case class LeaderChanged(leader: Option[Address]) extends ClusterDomainEvent
case class LeaderChanged(leader: Option[Address]) extends ClusterDomainEvent {
/**
* Java API
* @return address of current leader, or null if none
*/
def getLeader: Address = leader orNull
}
/**
* INTERNAL API
*
* Current snapshot of cluster member metrics. Published to subscribers.
*/
case class ClusterMetricsChanged(nodes: Set[NodeMetrics]) extends ClusterDomainEvent
/**
* INTERNAL API

View file

@ -22,6 +22,8 @@ import java.lang.reflect.Method
import java.lang.System.{ currentTimeMillis newTimestamp }
/**
* INTERNAL API.
*
* This strategy is primarily for load-balancing of nodes. It controls metrics sampling
* at a regular frequency, prepares highly variable data for further analysis by other entities,
* and publishes the latest cluster metrics data around the node ring to assist in determining
@ -32,8 +34,6 @@ import java.lang.System.{ currentTimeMillis ⇒ newTimestamp }
* Calculation of statistical data for each monitored process is delegated to the
* [[akka.cluster.DataStream]] for exponential smoothing, with additional decay factor.
*
* INTERNAL API.
*
* @author Helena Edelson
*/
private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Actor with ActorLogging {
@ -119,8 +119,6 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
* Samples the latest metrics for the node, updates metrics statistics in
* [[akka.cluster.MetricsGossip]], and publishes the change to the event bus.
*
* INTERNAL API
*
* @see [[akka.cluster.ClusterMetricsCollector.collect( )]]
*/
def collect(): Unit = {
@ -220,8 +218,8 @@ private[cluster] case class MetricsGossip(rateOfDecay: Int, nodes: Set[NodeMetri
}
/**
* Envelope adding a sender address to the gossip.
* INTERNAL API
* Envelope adding a sender address to the gossip.
*/
private[cluster] case class MetricsGossipEnvelope(from: Address, gossip: MetricsGossip) extends ClusterMessage
@ -277,6 +275,8 @@ private[cluster] case class DataStream(decay: Int, ewma: ScalaNumber, startTime:
}
/**
* INTERNAL API
*
* Companion object of DataStream class.
*
* @author Helena Edelson
@ -348,6 +348,8 @@ private[cluster] case class Metric(name: String, value: Option[ScalaNumber], ave
}
/**
* INTERNAL API
*
* Companion object of Metric class.
*
* @author Helena Edelson
@ -372,6 +374,8 @@ private[cluster] object Metric extends MetricNumericConverter {
}
/**
* INTERNAL API
*
* The snapshot of current sampled health metrics for any monitored process.
* Collected and gossipped at regular intervals for dynamic cluster management strategies.
*
@ -409,11 +413,11 @@ private[cluster] case class NodeMetrics(address: Address, timestamp: Long, metri
}
/**
* INTERNAL API
*
* Encapsulates evaluation of validity of metric values, conversion of an actual metric value to
* a [[akka.cluster.Metric]] for consumption by subscribed cluster entities.
*
* INTERNAL API
*
* @author Helena Edelson
*/
private[cluster] trait MetricNumericConverter {
@ -439,13 +443,13 @@ private[cluster] trait MetricNumericConverter {
}
/**
* INTERNAL API
*
* Loads JVM metrics through JMX monitoring beans. If Hyperic SIGAR is on the classpath, this
* loads wider and more accurate range of metrics in combination with SIGAR's native OS library.
*
* FIXME switch to Scala reflection
*
* INTERNAL API
*
* @param sigar the optional org.hyperic.Sigar instance
*
* @param address The [[akka.actor.Address]] of the node being sampled
@ -562,6 +566,8 @@ private[cluster] class MetricsCollector private (private val sigar: Option[AnyRe
}
/**
* INTERNAL API
*
* Companion object of MetricsCollector class.
*
* @author Helena Edelson

View file

@ -87,7 +87,7 @@ object Member {
*
* Can be one of: Joining, Up, Leaving, Exiting and Down.
*/
sealed trait MemberStatus extends ClusterMessage {
abstract class MemberStatus extends ClusterMessage {
/**
* Using the same notion for 'unavailable' as 'non-convergence': DOWN
@ -102,4 +102,34 @@ object MemberStatus {
case object Exiting extends MemberStatus
case object Down extends MemberStatus
case object Removed extends MemberStatus
/**
* JAVA API
*/
def joining: Object = Joining
/**
* JAVA API
*/
def up: Object = Up
/**
* JAVA API
*/
def leaving: Object = Leaving
/**
* JAVA API
*/
def exiting: Object = Exiting
/**
* JAVA API
*/
def down: Object = Down
/**
* JAVA API
*/
def removed: Object = Removed
}