diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 78bd91c8e1..a26befb875 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -500,7 +500,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ /** * Shuts down all connections to other members, the cluster daemon and the periodic gossip and cleanup tasks. */ - def shutdown() { + def shutdown(): Unit = { if (isRunning.compareAndSet(true, false)) { log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", remoteAddress) gossipCanceller.cancel() @@ -519,7 +519,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ * Registers a listener to subscribe to cluster membership changes. */ @tailrec - final def registerListener(listener: MembershipChangeListener) { + final def registerListener(listener: MembershipChangeListener): Unit = { val localState = state.get val newListeners = localState.memberMembershipChangeListeners + listener val newState = localState copy (memberMembershipChangeListeners = newListeners) @@ -530,7 +530,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ * Unsubscribes to cluster membership changes. */ @tailrec - final def unregisterListener(listener: MembershipChangeListener) { + final def unregisterListener(listener: MembershipChangeListener): Unit = { val localState = state.get val newListeners = localState.memberMembershipChangeListeners - listener val newState = localState copy (memberMembershipChangeListeners = newListeners) @@ -541,7 +541,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ * Try to join this cluster node with the node specified by 'address'. * A 'Join(thisNodeAddress)' command is sent to the node to join. */ - def join(address: Address) { + def join(address: Address): Unit = { val connection = clusterCommandConnectionFor(address) val command = ClusterAction.Join(remoteAddress) log.info("Cluster Node [{}] - Trying to send JOIN to [{}] through connection [{}]", remoteAddress, address, connection) @@ -551,21 +551,21 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ /** * Send command to issue state transition to LEAVING for the node specified by 'address'. */ - def leave(address: Address) { + def leave(address: Address): Unit = { clusterCommandDaemon ! ClusterAction.Leave(address) } /** * Send command to issue state transition to from DOWN to EXITING for the node specified by 'address'. */ - def down(address: Address) { + def down(address: Address): Unit = { clusterCommandDaemon ! ClusterAction.Down(address) } /** * Send command to issue state transition to REMOVED for the node specified by 'address'. */ - def remove(address: Address) { + def remove(address: Address): Unit = { clusterCommandDaemon ! ClusterAction.Remove(address) } @@ -578,7 +578,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ * New node joining. */ @tailrec - private[cluster] final def joining(node: Address) { + private[cluster] final def joining(node: Address): Unit = { log.info("Cluster Node [{}] - Node [{}] is JOINING", remoteAddress, node) val localState = state.get @@ -611,28 +611,28 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ /** * State transition to UP. */ - private[cluster] final def up(address: Address) { + private[cluster] final def up(address: Address): Unit = { log.info("Cluster Node [{}] - Marking node [{}] as UP", remoteAddress, address) } /** * State transition to LEAVING. */ - private[cluster] final def leaving(address: Address) { + private[cluster] final def leaving(address: Address): Unit = { log.info("Cluster Node [{}] - Marking node [{}] as LEAVING", remoteAddress, address) } /** * State transition to EXITING. */ - private[cluster] final def exiting(address: Address) { + private[cluster] final def exiting(address: Address): Unit = { log.info("Cluster Node [{}] - Marking node [{}] as EXITING", remoteAddress, address) } /** * State transition to REMOVED. */ - private[cluster] final def removing(address: Address) { + private[cluster] final def removing(address: Address): Unit = { log.info("Cluster Node [{}] - Marking node [{}] as REMOVED", remoteAddress, address) } @@ -644,7 +644,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ * to this node and it will then go through the normal JOINING procedure. */ @tailrec - final private[cluster] def downing(address: Address) { + final private[cluster] def downing(address: Address): Unit = { val localState = state.get val localGossip = localState.latestGossip val localMembers = localGossip.members @@ -705,7 +705,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ * Receive new gossip. */ @tailrec - final private[cluster] def receive(sender: Member, remoteGossip: Gossip) { + final private[cluster] def receive(sender: Member, remoteGossip: Gossip): Unit = { val localState = state.get val localGossip = localState.latestGossip @@ -746,14 +746,9 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ } /** - * Joins the pre-configured contact point and retrieves current gossip state. + * Joins the pre-configured contact point. */ - private def autoJoin() = nodeToJoin foreach { address ⇒ - val connection = clusterCommandConnectionFor(address) - val command = ClusterAction.Join(remoteAddress) - log.info("Cluster Node [{}] - Sending [{}] to [{}] through connection [{}]", remoteAddress, command, address, connection) - connection ! command - } + private def autoJoin(): Unit = nodeToJoin foreach join /** * Switches the member status. @@ -793,7 +788,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ /** * Gossips latest gossip to an address. */ - private def gossipTo(address: Address) { + private def gossipTo(address: Address): Unit = { val connection = clusterGossipConnectionFor(address) log.debug("Cluster Node [{}] - Gossiping to [{}]", remoteAddress, connection) connection ! GossipEnvelope(self, latestGossip) @@ -818,10 +813,8 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ /** * Initates a new round of gossip. */ - private def gossip() { + private def gossip(): Unit = { val localState = state.get - val localGossip = localState.latestGossip - val localMembers = localGossip.members if (!isSingletonCluster(localState) && isAvailable(localState)) { // only gossip if we are a non-singleton cluster and available @@ -860,7 +853,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ * Reaps the unreachable members (moves them to the 'unreachable' list in the cluster overview) according to the failure detector's verdict. */ @tailrec - final private def reapUnreachableMembers() { + final private def reapUnreachableMembers(): Unit = { val localState = state.get if (!isSingletonCluster(localState) && isAvailable(localState)) { @@ -905,7 +898,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ * Runs periodic leader actions, such as auto-downing unreachable nodes, assigning partitions etc. */ @tailrec - final private def leaderActions() { + final private def leaderActions(): Unit = { val localState = state.get val localGossip = localState.latestGossip val localMembers = localGossip.members @@ -917,7 +910,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ val localOverview = localGossip.overview val localSeen = localOverview.seen - val localUnreachableMembers = localGossip.overview.unreachable + val localUnreachableMembers = localOverview.unreachable // Leader actions are as follows: // 1. Move JOINING => UP @@ -986,7 +979,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ if (!state.compareAndSet(localState, newState)) leaderActions() // recur else { if (convergence(newState.latestGossip).isDefined) { - newState.memberMembershipChangeListeners map { _ notify newGossip.members } + newState.memberMembershipChangeListeners foreach { _ notify newGossip.members } } } }