Boy scouting
This commit is contained in:
parent
5bc4391e04
commit
dfcdbc5221
1 changed files with 22 additions and 29 deletions
|
|
@ -500,7 +500,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
|
||||||
/**
|
/**
|
||||||
* Shuts down all connections to other members, the cluster daemon and the periodic gossip and cleanup tasks.
|
* Shuts down all connections to other members, the cluster daemon and the periodic gossip and cleanup tasks.
|
||||||
*/
|
*/
|
||||||
def shutdown() {
|
def shutdown(): Unit = {
|
||||||
if (isRunning.compareAndSet(true, false)) {
|
if (isRunning.compareAndSet(true, false)) {
|
||||||
log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", remoteAddress)
|
log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", remoteAddress)
|
||||||
gossipCanceller.cancel()
|
gossipCanceller.cancel()
|
||||||
|
|
@ -519,7 +519,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
|
||||||
* Registers a listener to subscribe to cluster membership changes.
|
* Registers a listener to subscribe to cluster membership changes.
|
||||||
*/
|
*/
|
||||||
@tailrec
|
@tailrec
|
||||||
final def registerListener(listener: MembershipChangeListener) {
|
final def registerListener(listener: MembershipChangeListener): Unit = {
|
||||||
val localState = state.get
|
val localState = state.get
|
||||||
val newListeners = localState.memberMembershipChangeListeners + listener
|
val newListeners = localState.memberMembershipChangeListeners + listener
|
||||||
val newState = localState copy (memberMembershipChangeListeners = newListeners)
|
val newState = localState copy (memberMembershipChangeListeners = newListeners)
|
||||||
|
|
@ -530,7 +530,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
|
||||||
* Unsubscribes to cluster membership changes.
|
* Unsubscribes to cluster membership changes.
|
||||||
*/
|
*/
|
||||||
@tailrec
|
@tailrec
|
||||||
final def unregisterListener(listener: MembershipChangeListener) {
|
final def unregisterListener(listener: MembershipChangeListener): Unit = {
|
||||||
val localState = state.get
|
val localState = state.get
|
||||||
val newListeners = localState.memberMembershipChangeListeners - listener
|
val newListeners = localState.memberMembershipChangeListeners - listener
|
||||||
val newState = localState copy (memberMembershipChangeListeners = newListeners)
|
val newState = localState copy (memberMembershipChangeListeners = newListeners)
|
||||||
|
|
@ -541,7 +541,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
|
||||||
* Try to join this cluster node with the node specified by 'address'.
|
* Try to join this cluster node with the node specified by 'address'.
|
||||||
* A 'Join(thisNodeAddress)' command is sent to the node to join.
|
* A 'Join(thisNodeAddress)' command is sent to the node to join.
|
||||||
*/
|
*/
|
||||||
def join(address: Address) {
|
def join(address: Address): Unit = {
|
||||||
val connection = clusterCommandConnectionFor(address)
|
val connection = clusterCommandConnectionFor(address)
|
||||||
val command = ClusterAction.Join(remoteAddress)
|
val command = ClusterAction.Join(remoteAddress)
|
||||||
log.info("Cluster Node [{}] - Trying to send JOIN to [{}] through connection [{}]", remoteAddress, address, connection)
|
log.info("Cluster Node [{}] - Trying to send JOIN to [{}] through connection [{}]", remoteAddress, address, connection)
|
||||||
|
|
@ -551,21 +551,21 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
|
||||||
/**
|
/**
|
||||||
* Send command to issue state transition to LEAVING for the node specified by 'address'.
|
* Send command to issue state transition to LEAVING for the node specified by 'address'.
|
||||||
*/
|
*/
|
||||||
def leave(address: Address) {
|
def leave(address: Address): Unit = {
|
||||||
clusterCommandDaemon ! ClusterAction.Leave(address)
|
clusterCommandDaemon ! ClusterAction.Leave(address)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send command to issue state transition to from DOWN to EXITING for the node specified by 'address'.
|
* Send command to issue state transition to from DOWN to EXITING for the node specified by 'address'.
|
||||||
*/
|
*/
|
||||||
def down(address: Address) {
|
def down(address: Address): Unit = {
|
||||||
clusterCommandDaemon ! ClusterAction.Down(address)
|
clusterCommandDaemon ! ClusterAction.Down(address)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send command to issue state transition to REMOVED for the node specified by 'address'.
|
* Send command to issue state transition to REMOVED for the node specified by 'address'.
|
||||||
*/
|
*/
|
||||||
def remove(address: Address) {
|
def remove(address: Address): Unit = {
|
||||||
clusterCommandDaemon ! ClusterAction.Remove(address)
|
clusterCommandDaemon ! ClusterAction.Remove(address)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -578,7 +578,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
|
||||||
* New node joining.
|
* New node joining.
|
||||||
*/
|
*/
|
||||||
@tailrec
|
@tailrec
|
||||||
private[cluster] final def joining(node: Address) {
|
private[cluster] final def joining(node: Address): Unit = {
|
||||||
log.info("Cluster Node [{}] - Node [{}] is JOINING", remoteAddress, node)
|
log.info("Cluster Node [{}] - Node [{}] is JOINING", remoteAddress, node)
|
||||||
|
|
||||||
val localState = state.get
|
val localState = state.get
|
||||||
|
|
@ -611,28 +611,28 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
|
||||||
/**
|
/**
|
||||||
* State transition to UP.
|
* State transition to UP.
|
||||||
*/
|
*/
|
||||||
private[cluster] final def up(address: Address) {
|
private[cluster] final def up(address: Address): Unit = {
|
||||||
log.info("Cluster Node [{}] - Marking node [{}] as UP", remoteAddress, address)
|
log.info("Cluster Node [{}] - Marking node [{}] as UP", remoteAddress, address)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* State transition to LEAVING.
|
* State transition to LEAVING.
|
||||||
*/
|
*/
|
||||||
private[cluster] final def leaving(address: Address) {
|
private[cluster] final def leaving(address: Address): Unit = {
|
||||||
log.info("Cluster Node [{}] - Marking node [{}] as LEAVING", remoteAddress, address)
|
log.info("Cluster Node [{}] - Marking node [{}] as LEAVING", remoteAddress, address)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* State transition to EXITING.
|
* State transition to EXITING.
|
||||||
*/
|
*/
|
||||||
private[cluster] final def exiting(address: Address) {
|
private[cluster] final def exiting(address: Address): Unit = {
|
||||||
log.info("Cluster Node [{}] - Marking node [{}] as EXITING", remoteAddress, address)
|
log.info("Cluster Node [{}] - Marking node [{}] as EXITING", remoteAddress, address)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* State transition to REMOVED.
|
* State transition to REMOVED.
|
||||||
*/
|
*/
|
||||||
private[cluster] final def removing(address: Address) {
|
private[cluster] final def removing(address: Address): Unit = {
|
||||||
log.info("Cluster Node [{}] - Marking node [{}] as REMOVED", remoteAddress, address)
|
log.info("Cluster Node [{}] - Marking node [{}] as REMOVED", remoteAddress, address)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -644,7 +644,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
|
||||||
* to this node and it will then go through the normal JOINING procedure.
|
* to this node and it will then go through the normal JOINING procedure.
|
||||||
*/
|
*/
|
||||||
@tailrec
|
@tailrec
|
||||||
final private[cluster] def downing(address: Address) {
|
final private[cluster] def downing(address: Address): Unit = {
|
||||||
val localState = state.get
|
val localState = state.get
|
||||||
val localGossip = localState.latestGossip
|
val localGossip = localState.latestGossip
|
||||||
val localMembers = localGossip.members
|
val localMembers = localGossip.members
|
||||||
|
|
@ -705,7 +705,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
|
||||||
* Receive new gossip.
|
* Receive new gossip.
|
||||||
*/
|
*/
|
||||||
@tailrec
|
@tailrec
|
||||||
final private[cluster] def receive(sender: Member, remoteGossip: Gossip) {
|
final private[cluster] def receive(sender: Member, remoteGossip: Gossip): Unit = {
|
||||||
val localState = state.get
|
val localState = state.get
|
||||||
val localGossip = localState.latestGossip
|
val localGossip = localState.latestGossip
|
||||||
|
|
||||||
|
|
@ -746,14 +746,9 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Joins the pre-configured contact point and retrieves current gossip state.
|
* Joins the pre-configured contact point.
|
||||||
*/
|
*/
|
||||||
private def autoJoin() = nodeToJoin foreach { address ⇒
|
private def autoJoin(): Unit = nodeToJoin foreach join
|
||||||
val connection = clusterCommandConnectionFor(address)
|
|
||||||
val command = ClusterAction.Join(remoteAddress)
|
|
||||||
log.info("Cluster Node [{}] - Sending [{}] to [{}] through connection [{}]", remoteAddress, command, address, connection)
|
|
||||||
connection ! command
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Switches the member status.
|
* Switches the member status.
|
||||||
|
|
@ -793,7 +788,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
|
||||||
/**
|
/**
|
||||||
* Gossips latest gossip to an address.
|
* Gossips latest gossip to an address.
|
||||||
*/
|
*/
|
||||||
private def gossipTo(address: Address) {
|
private def gossipTo(address: Address): Unit = {
|
||||||
val connection = clusterGossipConnectionFor(address)
|
val connection = clusterGossipConnectionFor(address)
|
||||||
log.debug("Cluster Node [{}] - Gossiping to [{}]", remoteAddress, connection)
|
log.debug("Cluster Node [{}] - Gossiping to [{}]", remoteAddress, connection)
|
||||||
connection ! GossipEnvelope(self, latestGossip)
|
connection ! GossipEnvelope(self, latestGossip)
|
||||||
|
|
@ -818,10 +813,8 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
|
||||||
/**
|
/**
|
||||||
* Initates a new round of gossip.
|
* Initates a new round of gossip.
|
||||||
*/
|
*/
|
||||||
private def gossip() {
|
private def gossip(): Unit = {
|
||||||
val localState = state.get
|
val localState = state.get
|
||||||
val localGossip = localState.latestGossip
|
|
||||||
val localMembers = localGossip.members
|
|
||||||
|
|
||||||
if (!isSingletonCluster(localState) && isAvailable(localState)) {
|
if (!isSingletonCluster(localState) && isAvailable(localState)) {
|
||||||
// only gossip if we are a non-singleton cluster and available
|
// only gossip if we are a non-singleton cluster and available
|
||||||
|
|
@ -860,7 +853,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
|
||||||
* Reaps the unreachable members (moves them to the 'unreachable' list in the cluster overview) according to the failure detector's verdict.
|
* Reaps the unreachable members (moves them to the 'unreachable' list in the cluster overview) according to the failure detector's verdict.
|
||||||
*/
|
*/
|
||||||
@tailrec
|
@tailrec
|
||||||
final private def reapUnreachableMembers() {
|
final private def reapUnreachableMembers(): Unit = {
|
||||||
val localState = state.get
|
val localState = state.get
|
||||||
|
|
||||||
if (!isSingletonCluster(localState) && isAvailable(localState)) {
|
if (!isSingletonCluster(localState) && isAvailable(localState)) {
|
||||||
|
|
@ -905,7 +898,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
|
||||||
* Runs periodic leader actions, such as auto-downing unreachable nodes, assigning partitions etc.
|
* Runs periodic leader actions, such as auto-downing unreachable nodes, assigning partitions etc.
|
||||||
*/
|
*/
|
||||||
@tailrec
|
@tailrec
|
||||||
final private def leaderActions() {
|
final private def leaderActions(): Unit = {
|
||||||
val localState = state.get
|
val localState = state.get
|
||||||
val localGossip = localState.latestGossip
|
val localGossip = localState.latestGossip
|
||||||
val localMembers = localGossip.members
|
val localMembers = localGossip.members
|
||||||
|
|
@ -917,7 +910,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
|
||||||
|
|
||||||
val localOverview = localGossip.overview
|
val localOverview = localGossip.overview
|
||||||
val localSeen = localOverview.seen
|
val localSeen = localOverview.seen
|
||||||
val localUnreachableMembers = localGossip.overview.unreachable
|
val localUnreachableMembers = localOverview.unreachable
|
||||||
|
|
||||||
// Leader actions are as follows:
|
// Leader actions are as follows:
|
||||||
// 1. Move JOINING => UP
|
// 1. Move JOINING => UP
|
||||||
|
|
@ -986,7 +979,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
|
||||||
if (!state.compareAndSet(localState, newState)) leaderActions() // recur
|
if (!state.compareAndSet(localState, newState)) leaderActions() // recur
|
||||||
else {
|
else {
|
||||||
if (convergence(newState.latestGossip).isDefined) {
|
if (convergence(newState.latestGossip).isDefined) {
|
||||||
newState.memberMembershipChangeListeners map { _ notify newGossip.members }
|
newState.memberMembershipChangeListeners foreach { _ notify newGossip.members }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue