diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 78bd91c8e1..98d0a3f11e 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -203,7 +203,7 @@ case class Gossip( } /** - * Marks the gossip as seen by this node (remoteAddress) by updating the address entry in the 'gossip.overview.seen' + * Marks the gossip as seen by this node (selfAddress) by updating the address entry in the 'gossip.overview.seen' * Map with the VectorClock for the new gossip. */ def seen(address: Address): Gossip = { @@ -380,11 +380,11 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ val remoteSettings = new RemoteSettings(system.settings.config, system.name) val clusterSettings = new ClusterSettings(system.settings.config, system.name) - val remoteAddress = remote.transport.address + val selfAddress = remote.transport.address val failureDetector = new AccrualFailureDetector( - system, remoteAddress, clusterSettings.FailureDetectorThreshold, clusterSettings.FailureDetectorMaxSampleSize) + system, selfAddress, clusterSettings.FailureDetectorThreshold, clusterSettings.FailureDetectorMaxSampleSize) - private val vclockNode = VectorClock.Node(remoteAddress.toString) + private val vclockNode = VectorClock.Node(selfAddress.toString) private val periodicTasksInitialDelay = clusterSettings.PeriodicTasksInitialDelay private val gossipFrequency = clusterSettings.GossipFrequency @@ -396,7 +396,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ private val autoDown = clusterSettings.AutoDown private val nrOfDeputyNodes = clusterSettings.NrOfDeputyNodes private val nrOfGossipDaemons = clusterSettings.NrOfGossipDaemons - private val nodeToJoin: Option[Address] = clusterSettings.NodeToJoin filter (_ != remoteAddress) + private val nodeToJoin: Option[Address] = clusterSettings.NodeToJoin filter (_ != selfAddress) private val serialization = remote.serialization @@ -406,7 +406,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ private val mBeanServer = ManagementFactory.getPlatformMBeanServer private val clusterMBeanName = new ObjectName("akka:type=Cluster") - log.info("Cluster Node [{}] - is starting up...", remoteAddress) + log.info("Cluster Node [{}] - is starting up...", selfAddress) // create superisor for daemons under path "/system/cluster" private val clusterDaemons = { @@ -418,7 +418,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ } private val state = { - val member = Member(remoteAddress, MemberStatus.Joining) + val member = Member(selfAddress, MemberStatus.Joining) val gossip = Gossip(members = SortedSet.empty[Member] + member) + vclockNode // add me as member and update my vector clock new AtomicReference[State](State(gossip)) } @@ -447,15 +447,15 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ createMBean() - log.info("Cluster Node [{}] - has started up successfully", remoteAddress) + log.info("Cluster Node [{}] - has started up successfully", selfAddress) // ====================================================== // ===================== PUBLIC API ===================== // ====================================================== def self: Member = latestGossip.members - .find(_.address == remoteAddress) - .getOrElse(throw new IllegalStateException("Can't find 'this' Member (" + remoteAddress + ") in the cluster membership ring")) + .find(_.address == selfAddress) + .getOrElse(throw new IllegalStateException("Can't find 'this' Member (" + selfAddress + ") in the cluster membership ring")) /** * Latest gossip. @@ -472,7 +472,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ */ def isLeader: Boolean = { val members = latestGossip.members - !members.isEmpty && (remoteAddress == members.head.address) + members.nonEmpty && (selfAddress == members.head.address) } /** @@ -500,9 +500,9 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ /** * Shuts down all connections to other members, the cluster daemon and the periodic gossip and cleanup tasks. */ - def shutdown() { + def shutdown(): Unit = { if (isRunning.compareAndSet(true, false)) { - log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", remoteAddress) + log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", selfAddress) gossipCanceller.cancel() failureDetectorReaperCanceller.cancel() leaderActionsCanceller.cancel() @@ -519,7 +519,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ * Registers a listener to subscribe to cluster membership changes. */ @tailrec - final def registerListener(listener: MembershipChangeListener) { + final def registerListener(listener: MembershipChangeListener): Unit = { val localState = state.get val newListeners = localState.memberMembershipChangeListeners + listener val newState = localState copy (memberMembershipChangeListeners = newListeners) @@ -530,7 +530,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ * Unsubscribes to cluster membership changes. */ @tailrec - final def unregisterListener(listener: MembershipChangeListener) { + final def unregisterListener(listener: MembershipChangeListener): Unit = { val localState = state.get val newListeners = localState.memberMembershipChangeListeners - listener val newState = localState copy (memberMembershipChangeListeners = newListeners) @@ -541,31 +541,31 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ * Try to join this cluster node with the node specified by 'address'. * A 'Join(thisNodeAddress)' command is sent to the node to join. */ - def join(address: Address) { + def join(address: Address): Unit = { val connection = clusterCommandConnectionFor(address) - val command = ClusterAction.Join(remoteAddress) - log.info("Cluster Node [{}] - Trying to send JOIN to [{}] through connection [{}]", remoteAddress, address, connection) + val command = ClusterAction.Join(selfAddress) + log.info("Cluster Node [{}] - Trying to send JOIN to [{}] through connection [{}]", selfAddress, address, connection) connection ! command } /** * Send command to issue state transition to LEAVING for the node specified by 'address'. */ - def leave(address: Address) { + def leave(address: Address): Unit = { clusterCommandDaemon ! ClusterAction.Leave(address) } /** * Send command to issue state transition to from DOWN to EXITING for the node specified by 'address'. */ - def down(address: Address) { + def down(address: Address): Unit = { clusterCommandDaemon ! ClusterAction.Down(address) } /** * Send command to issue state transition to REMOVED for the node specified by 'address'. */ - def remove(address: Address) { + def remove(address: Address): Unit = { clusterCommandDaemon ! ClusterAction.Remove(address) } @@ -578,8 +578,8 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ * New node joining. */ @tailrec - private[cluster] final def joining(node: Address) { - log.info("Cluster Node [{}] - Node [{}] is JOINING", remoteAddress, node) + private[cluster] final def joining(node: Address): Unit = { + log.info("Cluster Node [{}] - Node [{}] is JOINING", selfAddress, node) val localState = state.get val localGossip = localState.latestGossip @@ -595,13 +595,14 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ val newGossip = localGossip copy (overview = newOverview, members = newMembers) val versionedGossip = newGossip + vclockNode - val seenVersionedGossip = versionedGossip seen remoteAddress + val seenVersionedGossip = versionedGossip seen selfAddress val newState = localState copy (latestGossip = seenVersionedGossip) if (!state.compareAndSet(localState, newState)) joining(node) // recur if we failed update else { - failureDetector heartbeat node // update heartbeat in failure detector + if (node != selfAddress) failureDetector heartbeat node + if (convergence(newState.latestGossip).isDefined) { newState.memberMembershipChangeListeners foreach { _ notify newMembers } } @@ -611,29 +612,29 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ /** * State transition to UP. */ - private[cluster] final def up(address: Address) { - log.info("Cluster Node [{}] - Marking node [{}] as UP", remoteAddress, address) + private[cluster] final def up(address: Address): Unit = { + log.info("Cluster Node [{}] - Marking node [{}] as UP", selfAddress, address) } /** * State transition to LEAVING. */ - private[cluster] final def leaving(address: Address) { - log.info("Cluster Node [{}] - Marking node [{}] as LEAVING", remoteAddress, address) + private[cluster] final def leaving(address: Address): Unit = { + log.info("Cluster Node [{}] - Marking node [{}] as LEAVING", selfAddress, address) } /** * State transition to EXITING. */ - private[cluster] final def exiting(address: Address) { - log.info("Cluster Node [{}] - Marking node [{}] as EXITING", remoteAddress, address) + private[cluster] final def exiting(address: Address): Unit = { + log.info("Cluster Node [{}] - Marking node [{}] as EXITING", selfAddress, address) } /** * State transition to REMOVED. */ - private[cluster] final def removing(address: Address) { - log.info("Cluster Node [{}] - Marking node [{}] as REMOVED", remoteAddress, address) + private[cluster] final def removing(address: Address): Unit = { + log.info("Cluster Node [{}] - Marking node [{}] as REMOVED", selfAddress, address) } /** @@ -644,7 +645,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ * to this node and it will then go through the normal JOINING procedure. */ @tailrec - final private[cluster] def downing(address: Address) { + final private[cluster] def downing(address: Address): Unit = { val localState = state.get val localGossip = localState.latestGossip val localMembers = localGossip.members @@ -658,7 +659,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ localMembers .map { member ⇒ if (member.address == address) { - log.info("Cluster Node [{}] - Marking node [{}] as DOWN", remoteAddress, member.address) + log.info("Cluster Node [{}] - Marking node [{}] as DOWN", selfAddress, member.address) val newMember = member copy (status = MemberStatus.Down) downedMember = Some(newMember) newMember @@ -672,7 +673,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ .filter(_.status != MemberStatus.Down) // no need to DOWN members already DOWN .map { member ⇒ if (member.address == address) { - log.info("Cluster Node [{}] - Marking unreachable node [{}] as DOWN", remoteAddress, member.address) + log.info("Cluster Node [{}] - Marking unreachable node [{}] as DOWN", selfAddress, member.address) member copy (status = MemberStatus.Down) } else member } @@ -691,7 +692,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachablePlusNewlyDownedMembers) // update gossip overview val newGossip = localGossip copy (overview = newOverview, members = newMembers) // update gossip val versionedGossip = newGossip + vclockNode - val newState = localState copy (latestGossip = versionedGossip seen remoteAddress) + val newState = localState copy (latestGossip = versionedGossip seen selfAddress) if (!state.compareAndSet(localState, newState)) downing(address) // recur if we fail the update else { @@ -705,7 +706,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ * Receive new gossip. */ @tailrec - final private[cluster] def receive(sender: Member, remoteGossip: Gossip) { + final private[cluster] def receive(sender: Member, remoteGossip: Gossip): Unit = { val localState = state.get val localGossip = localState.latestGossip @@ -730,14 +731,14 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ remoteGossip } - val newState = localState copy (latestGossip = winningGossip seen remoteAddress) + val newState = localState copy (latestGossip = winningGossip seen selfAddress) // if we won the race then update else try again if (!state.compareAndSet(localState, newState)) receive(sender, remoteGossip) // recur if we fail the update else { - log.debug("Cluster Node [{}] - Receiving gossip from [{}]", remoteAddress, sender.address) + log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, sender.address) - failureDetector heartbeat sender.address // update heartbeat in failure detector + if (sender.address != selfAddress) failureDetector heartbeat sender.address if (convergence(newState.latestGossip).isDefined) { newState.memberMembershipChangeListeners foreach { _ notify newState.latestGossip.members } @@ -746,14 +747,9 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ } /** - * Joins the pre-configured contact point and retrieves current gossip state. + * Joins the pre-configured contact point. */ - private def autoJoin() = nodeToJoin foreach { address ⇒ - val connection = clusterCommandConnectionFor(address) - val command = ClusterAction.Join(remoteAddress) - log.info("Cluster Node [{}] - Sending [{}] to [{}] through connection [{}]", remoteAddress, command, address, connection) - connection ! command - } + private def autoJoin(): Unit = nodeToJoin foreach join /** * Switches the member status. @@ -763,7 +759,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ * @return the updated new state with the new member status */ private def switchMemberStatusTo(newStatus: MemberStatus, state: State): State = { - log.info("Cluster Node [{}] - Switching membership status to [{}]", remoteAddress, newStatus) + log.info("Cluster Node [{}] - Switching membership status to [{}]", selfAddress, newStatus) val localSelf = self @@ -775,7 +771,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ // change my state in 'gossip.members' val newMembersSet = localMembers map { member ⇒ - if (member.address == remoteAddress) newSelf + if (member.address == selfAddress) newSelf else member } @@ -785,7 +781,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ // version my changes val versionedGossip = newGossip + vclockNode - val seenVersionedGossip = versionedGossip seen remoteAddress + val seenVersionedGossip = versionedGossip seen selfAddress state copy (latestGossip = seenVersionedGossip) } @@ -793,9 +789,9 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ /** * Gossips latest gossip to an address. */ - private def gossipTo(address: Address) { + private def gossipTo(address: Address): Unit = { val connection = clusterGossipConnectionFor(address) - log.debug("Cluster Node [{}] - Gossiping to [{}]", remoteAddress, connection) + log.debug("Cluster Node [{}] - Gossiping to [{}]", selfAddress, connection) connection ! GossipEnvelope(self, latestGossip) } @@ -805,10 +801,10 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ * @return 'true' if it gossiped to a "deputy" member. */ private def gossipToRandomNodeOf(addresses: Iterable[Address]): Boolean = { - log.debug("Cluster Node [{}] - Selecting random node to gossip to [{}]", remoteAddress, addresses.mkString(", ")) + log.debug("Cluster Node [{}] - Selecting random node to gossip to [{}]", selfAddress, addresses.mkString(", ")) if (addresses.isEmpty) false else { - val peers = addresses filter (_ != remoteAddress) // filter out myself + val peers = addresses filter (_ != selfAddress) // filter out myself val peer = selectRandomNode(peers) gossipTo(peer) deputyNodes exists (peer == _) @@ -818,15 +814,16 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ /** * Initates a new round of gossip. */ - private def gossip() { + private def gossip(): Unit = { val localState = state.get - val localGossip = localState.latestGossip - val localMembers = localGossip.members - if (!isSingletonCluster(localState) && isAvailable(localState)) { - // only gossip if we are a non-singleton cluster and available + if (isSingletonCluster(localState)) { + // gossip to myself + // TODO could perhaps be optimized, no need to gossip to myself when Up? + gossipTo(selfAddress) - log.debug("Cluster Node [{}] - Initiating new round of gossip", remoteAddress) + } else if (isAvailable(localState)) { + log.debug("Cluster Node [{}] - Initiating new round of gossip", selfAddress) val localGossip = localState.latestGossip val localMembers = localGossip.members @@ -846,7 +843,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ // 3. gossip to a deputy nodes for facilitating partition healing val deputies = deputyNodes - if ((!gossipedToDeputy || localMembersSize < 1) && !deputies.isEmpty) { + if ((!gossipedToDeputy || localMembersSize < 1) && deputies.nonEmpty) { if (localMembersSize == 0) gossipToRandomNodeOf(deputies) else { val probability = 1.0 / localMembersSize + localUnreachableSize @@ -860,7 +857,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ * Reaps the unreachable members (moves them to the 'unreachable' list in the cluster overview) according to the failure detector's verdict. */ @tailrec - final private def reapUnreachableMembers() { + final private def reapUnreachableMembers(): Unit = { val localState = state.get if (!isSingletonCluster(localState) && isAvailable(localState)) { @@ -874,7 +871,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ val newlyDetectedUnreachableMembers = localMembers filterNot { member ⇒ failureDetector.isAvailable(member.address) } - if (!newlyDetectedUnreachableMembers.isEmpty) { // we have newly detected members marked as unavailable + if (newlyDetectedUnreachableMembers.nonEmpty) { // we have newly detected members marked as unavailable val newMembers = localMembers diff newlyDetectedUnreachableMembers val newUnreachableMembers: Set[Member] = localUnreachableMembers ++ newlyDetectedUnreachableMembers @@ -884,14 +881,14 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ // updating vclock and 'seen' table val versionedGossip = newGossip + vclockNode - val seenVersionedGossip = versionedGossip seen remoteAddress + val seenVersionedGossip = versionedGossip seen selfAddress val newState = localState copy (latestGossip = seenVersionedGossip) // if we won the race then update else try again if (!state.compareAndSet(localState, newState)) reapUnreachableMembers() // recur else { - log.info("Cluster Node [{}] - Marking node(s) as UNREACHABLE [{}]", remoteAddress, newlyDetectedUnreachableMembers.mkString(", ")) + log.info("Cluster Node [{}] - Marking node(s) as UNREACHABLE [{}]", selfAddress, newlyDetectedUnreachableMembers.mkString(", ")) if (convergence(newState.latestGossip).isDefined) { newState.memberMembershipChangeListeners foreach { _ notify newMembers } @@ -905,19 +902,19 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ * Runs periodic leader actions, such as auto-downing unreachable nodes, assigning partitions etc. */ @tailrec - final private def leaderActions() { + final private def leaderActions(): Unit = { val localState = state.get val localGossip = localState.latestGossip val localMembers = localGossip.members - val isLeader = !localMembers.isEmpty && (remoteAddress == localMembers.head.address) + val isLeader = localMembers.nonEmpty && (selfAddress == localMembers.head.address) if (isLeader && isAvailable(localState)) { // only run the leader actions if we are the LEADER and available val localOverview = localGossip.overview val localSeen = localOverview.seen - val localUnreachableMembers = localGossip.overview.unreachable + val localUnreachableMembers = localOverview.unreachable // Leader actions are as follows: // 1. Move JOINING => UP @@ -936,14 +933,14 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ localMembers map { member ⇒ // 1. Move JOINING => UP if (member.status == MemberStatus.Joining) { - log.info("Cluster Node [{}] - Leader is moving node [{}] from JOINING to UP", remoteAddress, member.address) + log.info("Cluster Node [{}] - Leader is moving node [{}] from JOINING to UP", selfAddress, member.address) hasChangedState = true member copy (status = MemberStatus.Up) } else member } map { member ⇒ // 2. Move EXITING => REMOVED if (member.status == MemberStatus.Exiting) { - log.info("Cluster Node [{}] - Leader is moving node [{}] from EXITING to REMOVED", remoteAddress, member.address) + log.info("Cluster Node [{}] - Leader is moving node [{}] from EXITING to REMOVED", selfAddress, member.address) hasChangedState = true member copy (status = MemberStatus.Removed) } else member @@ -959,7 +956,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ localUnreachableMembers .filter(_.status != MemberStatus.Down) // no need to DOWN members already DOWN .map { member ⇒ - log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as DOWN", remoteAddress, member.address) + log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as DOWN", selfAddress, member.address) hasChangedState = true member copy (status = MemberStatus.Down) } @@ -978,7 +975,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ val versionedGossip = newGossip + vclockNode // 5. Updating the 'seen' table - val seenVersionedGossip = versionedGossip seen remoteAddress + val seenVersionedGossip = versionedGossip seen selfAddress val newState = localState copy (latestGossip = seenVersionedGossip) @@ -986,7 +983,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ if (!state.compareAndSet(localState, newState)) leaderActions() // recur else { if (convergence(newState.latestGossip).isDefined) { - newState.memberMembershipChangeListeners map { _ notify newGossip.members } + newState.memberMembershipChangeListeners foreach { _ notify newGossip.members } } } } @@ -1013,7 +1010,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ val views = Set.empty[VectorClock] ++ seen.values if (views.size == 1) { - log.debug("Cluster Node [{}] - Cluster convergence reached", remoteAddress) + log.debug("Cluster Node [{}] - Cluster convergence reached", selfAddress) Some(gossip) } else None } else None @@ -1026,7 +1023,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ val localOverview = localGossip.overview val localMembers = localGossip.members val localUnreachableMembers = localOverview.unreachable - val isUnreachable = localUnreachableMembers exists { _.address == remoteAddress } + val isUnreachable = localUnreachableMembers exists { _.address == selfAddress } val hasUnavailableMemberStatus = localMembers exists { m ⇒ (m == self) && MemberStatus.isUnavailable(m.status) } isUnreachable || hasUnavailableMemberStatus } @@ -1034,7 +1031,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ /** * Looks up and returns the local cluster command connection. */ - private def clusterCommandDaemon = system.actorFor(RootActorPath(remoteAddress) / "system" / "cluster" / "commands") + private def clusterCommandDaemon = system.actorFor(RootActorPath(selfAddress) / "system" / "cluster" / "commands") /** * Looks up and returns the remote cluster command connection for the specific address. @@ -1049,7 +1046,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ /** * Gets an Iterable with the addresses of a all the 'deputy' nodes - excluding this node if part of the group. */ - private def deputyNodes: Iterable[Address] = state.get.latestGossip.members.toIterable map (_.address) drop 1 take nrOfDeputyNodes filter (_ != remoteAddress) + private def deputyNodes: Iterable[Address] = state.get.latestGossip.members.toIterable map (_.address) drop 1 take nrOfDeputyNodes filter (_ != selfAddress) private def selectRandomNode(addresses: Iterable[Address]): Address = addresses.toSeq(ThreadLocalRandom.current nextInt addresses.size) @@ -1078,8 +1075,8 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ val unreachable = gossip.overview.unreachable val metaData = gossip.meta "\nMembers:\n\t" + gossip.members.mkString("\n\t") + - { if (!unreachable.isEmpty) "\nUnreachable:\n\t" + unreachable.mkString("\n\t") else "" } + - { if (!metaData.isEmpty) "\nMeta Data:\t" + metaData.toString else "" } + { if (unreachable.nonEmpty) "\nUnreachable:\n\t" + unreachable.mkString("\n\t") else "" } + + { if (metaData.nonEmpty) "\nMeta Data:\t" + metaData.toString else "" } } def getMemberStatus: String = clusterNode.status.toString @@ -1104,7 +1101,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ def shutdown() = clusterNode.shutdown() } - log.info("Cluster Node [{}] - registering cluster JMX MBean [{}]", remoteAddress, clusterMBeanName) + log.info("Cluster Node [{}] - registering cluster JMX MBean [{}]", selfAddress, clusterMBeanName) try { mBeanServer.registerMBean(mbean, clusterMBeanName) } catch { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeShutdownSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeShutdownSpec.scala new file mode 100644 index 0000000000..61a9c08ceb --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeShutdownSpec.scala @@ -0,0 +1,69 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfter +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.util.duration._ + +object NodeShutdownMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + commonConfig(debugConfig(on = false). + withFallback(ConfigFactory.parseString(""" + akka.cluster { + auto-down = on + failure-detector.threshold = 4 + } + """)). + withFallback(MultiNodeClusterSpec.clusterConfig)) + +} + +class NodeShutdownMultiJvmNode1 extends NodeShutdownSpec +class NodeShutdownMultiJvmNode2 extends NodeShutdownSpec + +abstract class NodeShutdownSpec extends MultiNodeSpec(NodeShutdownMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { + import NodeShutdownMultiJvmSpec._ + + override def initialParticipants = 2 + + after { + testConductor.enter("after") + } + + "A cluster of 2 nodes" must { + + "not be singleton cluster when joined" taggedAs LongRunningTest in { + // make sure that the node-to-join is started before other join + runOn(first) { + cluster.self + } + testConductor.enter("first-started") + + runOn(second) { + cluster.join(node(first).address) + } + awaitUpConvergence(numberOfMembers = 2) + cluster.isSingletonCluster must be(false) + } + + "become singleton cluster when one node is shutdown" in { + runOn(first) { + val secondAddress = node(second).address + testConductor.shutdown(first, 0) + testConductor.removeNode(first) + awaitUpConvergence(numberOfMembers = 1, canNotBePartOfMemberRing = Seq(secondAddress), 30.seconds) + cluster.isSingletonCluster must be(true) + cluster.isLeader must be(true) + } + + } + } + +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeStartupSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeStartupSpec.scala index fcbcce746f..44682b81f7 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeStartupSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeStartupSpec.scala @@ -37,19 +37,8 @@ abstract class NodeStartupSpec extends MultiNodeSpec(NodeStartupMultiJvmSpec) wi "be a singleton cluster when started up" taggedAs LongRunningTest in { runOn(first) { awaitCond(cluster.isSingletonCluster) - // FIXME #2117 singletonCluster should reach convergence - //awaitCond(cluster.convergence.isDefined) - } - } - - "be in 'Joining' phase when started up" taggedAs LongRunningTest in { - runOn(first) { - val members = cluster.latestGossip.members - members.size must be(1) - - val joiningMember = members find (_.address == firstAddress) - joiningMember must not be (None) - joiningMember.get.status must be(MemberStatus.Joining) + awaitUpConvergence(numberOfMembers = 1) + cluster.isLeader must be(true) } } } diff --git a/akka-docs/cluster/cluster.rst b/akka-docs/cluster/cluster.rst index 231830cecb..a0aca11114 100644 --- a/akka-docs/cluster/cluster.rst +++ b/akka-docs/cluster/cluster.rst @@ -81,16 +81,6 @@ can later explicitly send a ``Join`` message to another node to form a N-node cluster. It is also possible to link multiple N-node clusters by ``joining`` them. -Singleton Cluster ------------------ - -If a node does not have a preconfigured contact point to join in the Akka -configuration, then it is considered a singleton cluster (single node cluster) -and will automatically transition from ``joining`` to ``up``. Singleton clusters -can later explicitly send a ``Join`` message to another node to form a N-node -cluster. It is also possible to link multiple N-node clusters by ``joining`` them. - - Gossip ------ diff --git a/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala b/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala index 1745d15b61..35a9cc14e7 100644 --- a/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala +++ b/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala @@ -44,7 +44,7 @@ abstract class MultiNodeConfig { /** * Include for verbose debug logging - * @param on when `true` debug Config is returned, otherwise empty Config + * @param on when `true` debug Config is returned, otherwise config with info logging */ def debugConfig(on: Boolean): Config = if (on) @@ -59,7 +59,8 @@ abstract class MultiNodeConfig { fsm = on } """) - else ConfigFactory.empty + else + ConfigFactory.parseString("akka.loglevel = INFO") /** * Construct a RoleName and return it, to be used as an identifier in the