Incorporate review comments and cleanup isAvailable, see #2018
* Renamed isRunning to isTerminated (with negation of course) * Removed Running from JMX API, since the mbean is deregistered anyway * Cleanup isAvailable, isUnavailbe * Misc minor
This commit is contained in:
parent
a7b7ab040d
commit
1df787d0c5
12 changed files with 38 additions and 59 deletions
|
|
@ -67,7 +67,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
|
||||||
format(system, other.getClass.getName))
|
format(system, other.getClass.getName))
|
||||||
}
|
}
|
||||||
|
|
||||||
private val _isRunning = new AtomicBoolean(true)
|
private val _isTerminated = new AtomicBoolean(false)
|
||||||
private val log = Logging(system, "Cluster")
|
private val log = Logging(system, "Cluster")
|
||||||
|
|
||||||
log.info("Cluster Node [{}] - is starting up...", selfAddress)
|
log.info("Cluster Node [{}] - is starting up...", selfAddress)
|
||||||
|
|
@ -169,9 +169,9 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
|
||||||
// ======================================================
|
// ======================================================
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns true if the cluster node is up and running, false if it is shut down.
|
* Returns true if this cluster instance has be shutdown.
|
||||||
*/
|
*/
|
||||||
def isRunning: Boolean = _isRunning.get
|
def isTerminated: Boolean = _isTerminated.get
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Subscribe to cluster domain events.
|
* Subscribe to cluster domain events.
|
||||||
|
|
@ -253,7 +253,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
|
||||||
* to go through graceful handoff process `LEAVE -> EXITING -> REMOVED -> SHUTDOWN`.
|
* to go through graceful handoff process `LEAVE -> EXITING -> REMOVED -> SHUTDOWN`.
|
||||||
*/
|
*/
|
||||||
private[cluster] def shutdown(): Unit = {
|
private[cluster] def shutdown(): Unit = {
|
||||||
if (_isRunning.compareAndSet(true, false)) {
|
if (_isTerminated.compareAndSet(false, true)) {
|
||||||
log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", selfAddress)
|
log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", selfAddress)
|
||||||
|
|
||||||
system.stop(clusterDaemons)
|
system.stop(clusterDaemons)
|
||||||
|
|
|
||||||
|
|
@ -763,7 +763,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
|
||||||
|
|
||||||
def isSingletonCluster: Boolean = latestGossip.isSingletonCluster
|
def isSingletonCluster: Boolean = latestGossip.isSingletonCluster
|
||||||
|
|
||||||
def isAvailable: Boolean = latestGossip.isAvailable(selfAddress)
|
def isAvailable: Boolean = !latestGossip.isUnreachable(selfAddress)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gossips latest gossip to a random member in the set of members passed in as argument.
|
* Gossips latest gossip to a random member in the set of members passed in as argument.
|
||||||
|
|
|
||||||
|
|
@ -24,11 +24,13 @@ trait ClusterNodeMBean {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Comma separated addresses of member nodes, sorted in the cluster ring order.
|
* Comma separated addresses of member nodes, sorted in the cluster ring order.
|
||||||
|
* The address format is `akka://actor-system-name@hostname:port`
|
||||||
*/
|
*/
|
||||||
def getMembers: String
|
def getMembers: String
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Comma separated addresses of unreachable member nodes.
|
* Comma separated addresses of unreachable member nodes.
|
||||||
|
* The address format is `akka://actor-system-name@hostname:port`
|
||||||
*/
|
*/
|
||||||
def getUnreachable: String
|
def getUnreachable: String
|
||||||
|
|
||||||
|
|
@ -46,6 +48,7 @@ trait ClusterNodeMBean {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the address of the current leader.
|
* Get the address of the current leader.
|
||||||
|
* The address format is `akka://actor-system-name@hostname:port`
|
||||||
*/
|
*/
|
||||||
def getLeader: String
|
def getLeader: String
|
||||||
|
|
||||||
|
|
@ -55,25 +58,27 @@ trait ClusterNodeMBean {
|
||||||
def isSingleton: Boolean
|
def isSingleton: Boolean
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns true if the node is UP or JOINING.
|
* Returns true if the node is not unreachable and not `Down`
|
||||||
|
* and not `Removed`.
|
||||||
*/
|
*/
|
||||||
def isAvailable: Boolean
|
def isAvailable: Boolean
|
||||||
/**
|
|
||||||
* Returns true if the cluster node is up and running, false if it is shut down.
|
|
||||||
*/
|
|
||||||
def isRunning: Boolean
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Try to join this cluster node with the node specified by 'address'.
|
* Try to join this cluster node with the node specified by 'address'.
|
||||||
|
* The address format is `akka://actor-system-name@hostname:port`.
|
||||||
* A 'Join(thisNodeAddress)' command is sent to the node to join.
|
* A 'Join(thisNodeAddress)' command is sent to the node to join.
|
||||||
*/
|
*/
|
||||||
def join(address: String)
|
def join(address: String)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send command to issue state transition to LEAVING for the node specified by 'address'.
|
* Send command to issue state transition to LEAVING for the node specified by 'address'.
|
||||||
|
* The address format is `akka://actor-system-name@hostname:port`
|
||||||
*/
|
*/
|
||||||
def leave(address: String)
|
def leave(address: String)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send command to DOWN the node specified by 'address'.
|
* Send command to DOWN the node specified by 'address'.
|
||||||
|
* The address format is `akka://actor-system-name@hostname:port`
|
||||||
*/
|
*/
|
||||||
def down(address: String)
|
def down(address: String)
|
||||||
}
|
}
|
||||||
|
|
@ -102,21 +107,19 @@ private[akka] class ClusterJmx(cluster: Cluster, log: LoggingAdapter) {
|
||||||
}
|
}
|
||||||
|
|
||||||
def getMembers: String =
|
def getMembers: String =
|
||||||
clusterView.members.toSeq.map(_.address).mkString(", ")
|
clusterView.members.toSeq.map(_.address).mkString(",")
|
||||||
|
|
||||||
def getUnreachable: String =
|
def getUnreachable: String =
|
||||||
clusterView.unreachableMembers.map(_.address).mkString(", ")
|
clusterView.unreachableMembers.map(_.address).mkString(",")
|
||||||
|
|
||||||
def getMemberStatus: String = clusterView.status.toString
|
def getMemberStatus: String = clusterView.status.toString
|
||||||
|
|
||||||
def getLeader: String = clusterView.leader.getOrElse("").toString
|
def getLeader: String = clusterView.leader.fold("")(_.toString)
|
||||||
|
|
||||||
def isSingleton: Boolean = clusterView.isSingletonCluster
|
def isSingleton: Boolean = clusterView.isSingletonCluster
|
||||||
|
|
||||||
def isAvailable: Boolean = clusterView.isAvailable
|
def isAvailable: Boolean = clusterView.isAvailable
|
||||||
|
|
||||||
def isRunning: Boolean = clusterView.isRunning
|
|
||||||
|
|
||||||
// JMX commands
|
// JMX commands
|
||||||
|
|
||||||
def join(address: String) = cluster.join(AddressFromURIString(address))
|
def join(address: String) = cluster.join(AddressFromURIString(address))
|
||||||
|
|
|
||||||
|
|
@ -74,9 +74,9 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns true if the cluster node is up and running, false if it is shut down.
|
* Returns true if this cluster instance has be shutdown.
|
||||||
*/
|
*/
|
||||||
def isRunning: Boolean = cluster.isRunning
|
def isTerminated: Boolean = cluster.isTerminated
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Current cluster members, sorted by address.
|
* Current cluster members, sorted by address.
|
||||||
|
|
@ -118,11 +118,14 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
|
||||||
def convergence: Boolean = state.convergence
|
def convergence: Boolean = state.convergence
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns true if the node is UP or JOINING.
|
* Returns true if the node is not unreachable and not `Down`
|
||||||
|
* and not `Removed`.
|
||||||
*/
|
*/
|
||||||
def isAvailable: Boolean = {
|
def isAvailable: Boolean = {
|
||||||
val myself = self
|
val myself = self
|
||||||
!unreachableMembers.contains(myself) && !myself.status.isUnavailable
|
!unreachableMembers.contains(myself) &&
|
||||||
|
myself.status != MemberStatus.Down &&
|
||||||
|
myself.status != MemberStatus.Removed
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -168,15 +168,10 @@ private[cluster] case class Gossip(
|
||||||
def isSingletonCluster: Boolean = members.size == 1
|
def isSingletonCluster: Boolean = members.size == 1
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns true if the node is UP or JOINING.
|
* Returns true if the node is in the unreachable set
|
||||||
*/
|
*/
|
||||||
def isAvailable(address: Address): Boolean = !isUnavailable(address)
|
def isUnreachable(address: Address): Boolean =
|
||||||
|
overview.unreachable exists { _.address == address }
|
||||||
def isUnavailable(address: Address): Boolean = {
|
|
||||||
val isUnreachable = overview.unreachable exists { _.address == address }
|
|
||||||
val hasUnavailableMemberStatus = members exists { m ⇒ m.status.isUnavailable && m.address == address }
|
|
||||||
isUnreachable || hasUnavailableMemberStatus
|
|
||||||
}
|
|
||||||
|
|
||||||
def member(address: Address): Member = {
|
def member(address: Address): Member = {
|
||||||
members.find(_.address == address).orElse(overview.unreachable.find(_.address == address)).
|
members.find(_.address == address).orElse(overview.unreachable.find(_.address == address)).
|
||||||
|
|
|
||||||
|
|
@ -87,13 +87,7 @@ object Member {
|
||||||
*
|
*
|
||||||
* Can be one of: Joining, Up, Leaving, Exiting and Down.
|
* Can be one of: Joining, Up, Leaving, Exiting and Down.
|
||||||
*/
|
*/
|
||||||
abstract class MemberStatus extends ClusterMessage {
|
abstract class MemberStatus extends ClusterMessage
|
||||||
|
|
||||||
/**
|
|
||||||
* Using the same notion for 'unavailable' as 'non-convergence': DOWN
|
|
||||||
*/
|
|
||||||
def isUnavailable: Boolean = this == Down
|
|
||||||
}
|
|
||||||
|
|
||||||
object MemberStatus {
|
object MemberStatus {
|
||||||
case object Joining extends MemberStatus
|
case object Joining extends MemberStatus
|
||||||
|
|
|
||||||
|
|
@ -57,7 +57,7 @@ abstract class LeaderLeavingSpec
|
||||||
enterBarrier("leader-left")
|
enterBarrier("leader-left")
|
||||||
|
|
||||||
// verify that the LEADER is shut down
|
// verify that the LEADER is shut down
|
||||||
awaitCond(!cluster.isRunning)
|
awaitCond(cluster.isTerminated)
|
||||||
|
|
||||||
// verify that the LEADER is REMOVED
|
// verify that the LEADER is REMOVED
|
||||||
awaitCond(clusterView.status == Removed)
|
awaitCond(clusterView.status == Removed)
|
||||||
|
|
|
||||||
|
|
@ -45,7 +45,7 @@ abstract class MBeanSpec
|
||||||
"expose attributes" taggedAs LongRunningTest in {
|
"expose attributes" taggedAs LongRunningTest in {
|
||||||
val info = mbeanServer.getMBeanInfo(mbeanName)
|
val info = mbeanServer.getMBeanInfo(mbeanName)
|
||||||
info.getAttributes.map(_.getName).toSet must be(Set(
|
info.getAttributes.map(_.getName).toSet must be(Set(
|
||||||
"ClusterStatus", "Members", "Unreachable", "MemberStatus", "Leader", "Singleton", "Available", "Running"))
|
"ClusterStatus", "Members", "Unreachable", "MemberStatus", "Leader", "Singleton", "Available"))
|
||||||
enterBarrier("after-1")
|
enterBarrier("after-1")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -58,8 +58,7 @@ abstract class MBeanSpec
|
||||||
|
|
||||||
"change attributes after startup" taggedAs LongRunningTest in {
|
"change attributes after startup" taggedAs LongRunningTest in {
|
||||||
runOn(first) {
|
runOn(first) {
|
||||||
mbeanServer.getAttribute(mbeanName, "Running").asInstanceOf[Boolean] must be(true)
|
mbeanServer.getAttribute(mbeanName, "Available").asInstanceOf[Boolean] must be(false)
|
||||||
mbeanServer.getAttribute(mbeanName, "Available").asInstanceOf[Boolean] must be(true)
|
|
||||||
mbeanServer.getAttribute(mbeanName, "Singleton").asInstanceOf[Boolean] must be(false)
|
mbeanServer.getAttribute(mbeanName, "Singleton").asInstanceOf[Boolean] must be(false)
|
||||||
mbeanServer.getAttribute(mbeanName, "Leader") must be("")
|
mbeanServer.getAttribute(mbeanName, "Leader") must be("")
|
||||||
mbeanServer.getAttribute(mbeanName, "Members") must be("")
|
mbeanServer.getAttribute(mbeanName, "Members") must be("")
|
||||||
|
|
@ -73,6 +72,7 @@ abstract class MBeanSpec
|
||||||
mbeanServer.getAttribute(mbeanName, "Singleton").asInstanceOf[Boolean] must be(true)
|
mbeanServer.getAttribute(mbeanName, "Singleton").asInstanceOf[Boolean] must be(true)
|
||||||
mbeanServer.getAttribute(mbeanName, "Members") must be(address(first).toString)
|
mbeanServer.getAttribute(mbeanName, "Members") must be(address(first).toString)
|
||||||
mbeanServer.getAttribute(mbeanName, "Unreachable") must be("")
|
mbeanServer.getAttribute(mbeanName, "Unreachable") must be("")
|
||||||
|
mbeanServer.getAttribute(mbeanName, "Available").asInstanceOf[Boolean] must be(true)
|
||||||
}
|
}
|
||||||
enterBarrier("after-3")
|
enterBarrier("after-3")
|
||||||
}
|
}
|
||||||
|
|
@ -86,7 +86,7 @@ abstract class MBeanSpec
|
||||||
awaitUpConvergence(4)
|
awaitUpConvergence(4)
|
||||||
assertMembers(clusterView.members, roles.map(address(_)): _*)
|
assertMembers(clusterView.members, roles.map(address(_)): _*)
|
||||||
awaitCond(mbeanServer.getAttribute(mbeanName, "MemberStatus") == "Up")
|
awaitCond(mbeanServer.getAttribute(mbeanName, "MemberStatus") == "Up")
|
||||||
val expectedMembers = roles.sorted.map(address(_)).mkString(", ")
|
val expectedMembers = roles.sorted.map(address(_)).mkString(",")
|
||||||
awaitCond(mbeanServer.getAttribute(mbeanName, "Members") == expectedMembers)
|
awaitCond(mbeanServer.getAttribute(mbeanName, "Members") == expectedMembers)
|
||||||
val expectedLeader = address(roleOfLeader())
|
val expectedLeader = address(roleOfLeader())
|
||||||
awaitCond(mbeanServer.getAttribute(mbeanName, "Leader") == expectedLeader.toString)
|
awaitCond(mbeanServer.getAttribute(mbeanName, "Leader") == expectedLeader.toString)
|
||||||
|
|
@ -104,7 +104,7 @@ abstract class MBeanSpec
|
||||||
|
|
||||||
runOn(first, second, third) {
|
runOn(first, second, third) {
|
||||||
awaitCond(mbeanServer.getAttribute(mbeanName, "Unreachable") == fourthAddress.toString)
|
awaitCond(mbeanServer.getAttribute(mbeanName, "Unreachable") == fourthAddress.toString)
|
||||||
val expectedMembers = Seq(first, second, third).sorted.map(address(_)).mkString(", ")
|
val expectedMembers = Seq(first, second, third).sorted.map(address(_)).mkString(",")
|
||||||
awaitCond(mbeanServer.getAttribute(mbeanName, "Members") == expectedMembers)
|
awaitCond(mbeanServer.getAttribute(mbeanName, "Members") == expectedMembers)
|
||||||
}
|
}
|
||||||
enterBarrier("fourth-unreachable")
|
enterBarrier("fourth-unreachable")
|
||||||
|
|
@ -130,11 +130,11 @@ abstract class MBeanSpec
|
||||||
runOn(first, second) {
|
runOn(first, second) {
|
||||||
awaitUpConvergence(2)
|
awaitUpConvergence(2)
|
||||||
assertMembers(clusterView.members, first, second)
|
assertMembers(clusterView.members, first, second)
|
||||||
val expectedMembers = Seq(first, second).sorted.map(address(_)).mkString(", ")
|
val expectedMembers = Seq(first, second).sorted.map(address(_)).mkString(",")
|
||||||
awaitCond(mbeanServer.getAttribute(mbeanName, "Members") == expectedMembers)
|
awaitCond(mbeanServer.getAttribute(mbeanName, "Members") == expectedMembers)
|
||||||
}
|
}
|
||||||
runOn(third) {
|
runOn(third) {
|
||||||
awaitCond(!cluster.isRunning)
|
awaitCond(cluster.isTerminated)
|
||||||
// mbean should be unregistered, i.e. throw InstanceNotFoundException
|
// mbean should be unregistered, i.e. throw InstanceNotFoundException
|
||||||
awaitCond(Try { mbeanServer.getMBeanInfo(mbeanName); false } recover {
|
awaitCond(Try { mbeanServer.getMBeanInfo(mbeanName); false } recover {
|
||||||
case e: InstanceNotFoundException ⇒ true
|
case e: InstanceNotFoundException ⇒ true
|
||||||
|
|
|
||||||
|
|
@ -51,7 +51,7 @@ abstract class NodeLeavingAndExitingAndBeingRemovedSpec
|
||||||
|
|
||||||
runOn(second) {
|
runOn(second) {
|
||||||
// verify that the second node is shut down and has status REMOVED
|
// verify that the second node is shut down and has status REMOVED
|
||||||
awaitCond(!cluster.isRunning, reaperWaitingTime)
|
awaitCond(cluster.isTerminated, reaperWaitingTime)
|
||||||
awaitCond(clusterView.status == MemberStatus.Removed, reaperWaitingTime)
|
awaitCond(clusterView.status == MemberStatus.Removed, reaperWaitingTime)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -581,7 +581,6 @@ Run it without parameters to see instructions about how to use the script::
|
||||||
is-singleton - Checks if the cluster is a singleton cluster (single
|
is-singleton - Checks if the cluster is a singleton cluster (single
|
||||||
node cluster)
|
node cluster)
|
||||||
is-available - Checks if the member node is available
|
is-available - Checks if the member node is available
|
||||||
is-running - Checks if the member node is running
|
|
||||||
Where the <node-url> should be on the format of
|
Where the <node-url> should be on the format of
|
||||||
'akka://actor-system-name@hostname:port'
|
'akka://actor-system-name@hostname:port'
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -587,7 +587,6 @@ Run it without parameters to see instructions about how to use the script::
|
||||||
is-singleton - Checks if the cluster is a singleton cluster (single
|
is-singleton - Checks if the cluster is a singleton cluster (single
|
||||||
node cluster)
|
node cluster)
|
||||||
is-available - Checks if the member node is available
|
is-available - Checks if the member node is available
|
||||||
is-running - Checks if the member node is running
|
|
||||||
Where the <node-url> should be on the format of
|
Where the <node-url> should be on the format of
|
||||||
'akka://actor-system-name@hostname:port'
|
'akka://actor-system-name@hostname:port'
|
||||||
|
|
||||||
|
|
|
||||||
14
akka-kernel/src/main/dist/bin/akka-cluster
vendored
14
akka-kernel/src/main/dist/bin/akka-cluster
vendored
|
|
@ -168,19 +168,6 @@ case "$2" in
|
||||||
$JMX_CLIENT $HOST akka:type=Cluster Available
|
$JMX_CLIENT $HOST akka:type=Cluster Available
|
||||||
;;
|
;;
|
||||||
|
|
||||||
is-running)
|
|
||||||
if [ $# -ne 2 ]; then
|
|
||||||
echo "Usage: $SELF <node-hostname:jmx-port> is-running"
|
|
||||||
exit 1
|
|
||||||
fi
|
|
||||||
|
|
||||||
ensureNodeIsRunningAndAvailable
|
|
||||||
shift
|
|
||||||
|
|
||||||
echo "Checking if member node on $HOST is RUNNING"
|
|
||||||
$JMX_CLIENT $HOST akka:type=Cluster Running
|
|
||||||
;;
|
|
||||||
|
|
||||||
*)
|
*)
|
||||||
printf "Usage: bin/$SELF <node-hostname:jmx-port> <command> ...\n"
|
printf "Usage: bin/$SELF <node-hostname:jmx-port> <command> ...\n"
|
||||||
printf "\n"
|
printf "\n"
|
||||||
|
|
@ -195,7 +182,6 @@ case "$2" in
|
||||||
printf "%26s - %s\n" leader "Asks the cluster who the current leader is"
|
printf "%26s - %s\n" leader "Asks the cluster who the current leader is"
|
||||||
printf "%26s - %s\n" is-singleton "Checks if the cluster is a singleton cluster (single node cluster)"
|
printf "%26s - %s\n" is-singleton "Checks if the cluster is a singleton cluster (single node cluster)"
|
||||||
printf "%26s - %s\n" is-available "Checks if the member node is available"
|
printf "%26s - %s\n" is-available "Checks if the member node is available"
|
||||||
printf "%26s - %s\n" is-running "Checks if the member node is running"
|
|
||||||
printf "Where the <node-url> should be on the format of 'akka://actor-system-name@hostname:port'\n"
|
printf "Where the <node-url> should be on the format of 'akka://actor-system-name@hostname:port'\n"
|
||||||
printf "\n"
|
printf "\n"
|
||||||
printf "Examples: bin/$SELF localhost:9999 is-available\n"
|
printf "Examples: bin/$SELF localhost:9999 is-available\n"
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue