Merge branch 'master' of github.com:akka/akka

Viktor Klang 2012-05-31 22:00:32 +02:00
commit 3b06c0a7c2
5 changed files with 152 additions and 106 deletions


@ -203,7 +203,7 @@ case class Gossip(
}
/**
* Marks the gossip as seen by this node (remoteAddress) by updating the address entry in the 'gossip.overview.seen'
* Marks the gossip as seen by this node (selfAddress) by updating the address entry in the 'gossip.overview.seen'
* Map with the VectorClock for the new gossip.
*/
def seen(address: Address): Gossip = {
@ -380,11 +380,11 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
val remoteSettings = new RemoteSettings(system.settings.config, system.name)
val clusterSettings = new ClusterSettings(system.settings.config, system.name)
val remoteAddress = remote.transport.address
val selfAddress = remote.transport.address
val failureDetector = new AccrualFailureDetector(
system, remoteAddress, clusterSettings.FailureDetectorThreshold, clusterSettings.FailureDetectorMaxSampleSize)
system, selfAddress, clusterSettings.FailureDetectorThreshold, clusterSettings.FailureDetectorMaxSampleSize)
private val vclockNode = VectorClock.Node(remoteAddress.toString)
private val vclockNode = VectorClock.Node(selfAddress.toString)
private val periodicTasksInitialDelay = clusterSettings.PeriodicTasksInitialDelay
private val gossipFrequency = clusterSettings.GossipFrequency
@ -396,7 +396,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
private val autoDown = clusterSettings.AutoDown
private val nrOfDeputyNodes = clusterSettings.NrOfDeputyNodes
private val nrOfGossipDaemons = clusterSettings.NrOfGossipDaemons
private val nodeToJoin: Option[Address] = clusterSettings.NodeToJoin filter (_ != remoteAddress)
private val nodeToJoin: Option[Address] = clusterSettings.NodeToJoin filter (_ != selfAddress)
private val serialization = remote.serialization
@ -406,7 +406,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
private val mBeanServer = ManagementFactory.getPlatformMBeanServer
private val clusterMBeanName = new ObjectName("akka:type=Cluster")
log.info("Cluster Node [{}] - is starting up...", remoteAddress)
log.info("Cluster Node [{}] - is starting up...", selfAddress)
// create supervisor for daemons under path "/system/cluster"
private val clusterDaemons = {
@ -418,7 +418,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
}
private val state = {
val member = Member(remoteAddress, MemberStatus.Joining)
val member = Member(selfAddress, MemberStatus.Joining)
val gossip = Gossip(members = SortedSet.empty[Member] + member) + vclockNode // add me as member and update my vector clock
new AtomicReference[State](State(gossip))
}
@ -447,15 +447,15 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
createMBean()
log.info("Cluster Node [{}] - has started up successfully", remoteAddress)
log.info("Cluster Node [{}] - has started up successfully", selfAddress)
// ======================================================
// ===================== PUBLIC API =====================
// ======================================================
def self: Member = latestGossip.members
.find(_.address == remoteAddress)
.getOrElse(throw new IllegalStateException("Can't find 'this' Member (" + remoteAddress + ") in the cluster membership ring"))
.find(_.address == selfAddress)
.getOrElse(throw new IllegalStateException("Can't find 'this' Member (" + selfAddress + ") in the cluster membership ring"))
/**
* Latest gossip.
@ -472,7 +472,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
*/
def isLeader: Boolean = {
val members = latestGossip.members
!members.isEmpty && (remoteAddress == members.head.address)
members.nonEmpty && (selfAddress == members.head.address)
}
/**
@ -500,9 +500,9 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
/**
* Shuts down all connections to other members, the cluster daemon and the periodic gossip and cleanup tasks.
*/
def shutdown() {
def shutdown(): Unit = {
if (isRunning.compareAndSet(true, false)) {
log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", remoteAddress)
log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", selfAddress)
gossipCanceller.cancel()
failureDetectorReaperCanceller.cancel()
leaderActionsCanceller.cancel()
@ -519,7 +519,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
* Registers a listener to subscribe to cluster membership changes.
*/
@tailrec
final def registerListener(listener: MembershipChangeListener) {
final def registerListener(listener: MembershipChangeListener): Unit = {
val localState = state.get
val newListeners = localState.memberMembershipChangeListeners + listener
val newState = localState copy (memberMembershipChangeListeners = newListeners)
@ -530,7 +530,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
* Unsubscribes to cluster membership changes.
*/
@tailrec
final def unregisterListener(listener: MembershipChangeListener) {
final def unregisterListener(listener: MembershipChangeListener): Unit = {
val localState = state.get
val newListeners = localState.memberMembershipChangeListeners - listener
val newState = localState copy (memberMembershipChangeListeners = newListeners)
@ -541,31 +541,31 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
* Try to join this cluster node with the node specified by 'address'.
* A 'Join(thisNodeAddress)' command is sent to the node to join.
*/
def join(address: Address) {
def join(address: Address): Unit = {
val connection = clusterCommandConnectionFor(address)
val command = ClusterAction.Join(remoteAddress)
log.info("Cluster Node [{}] - Trying to send JOIN to [{}] through connection [{}]", remoteAddress, address, connection)
val command = ClusterAction.Join(selfAddress)
log.info("Cluster Node [{}] - Trying to send JOIN to [{}] through connection [{}]", selfAddress, address, connection)
connection ! command
}
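For illustration, a minimal sketch of driving this public API from user code; obtaining the extension via a Cluster companion object and the concrete protocol/host/port are assumptions, not part of this diff:

    // illustrative sketch, not part of this commit
    val cluster = Cluster(system)                                 // assumed ExtensionId-style lookup
    cluster.join(Address("akka", system.name, "10.0.0.1", 2552))  // placeholder address of the node to join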
/**
* Send command to issue state transition to LEAVING for the node specified by 'address'.
*/
def leave(address: Address) {
def leave(address: Address): Unit = {
clusterCommandDaemon ! ClusterAction.Leave(address)
}
/**
* Send command to issue state transition from DOWN to EXITING for the node specified by 'address'.
*/
def down(address: Address) {
def down(address: Address): Unit = {
clusterCommandDaemon ! ClusterAction.Down(address)
}
/**
* Send command to issue state transition to REMOVED for the node specified by 'address'.
*/
def remove(address: Address) {
def remove(address: Address): Unit = {
clusterCommandDaemon ! ClusterAction.Remove(address)
}
@ -578,8 +578,8 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
* New node joining.
*/
@tailrec
private[cluster] final def joining(node: Address) {
log.info("Cluster Node [{}] - Node [{}] is JOINING", remoteAddress, node)
private[cluster] final def joining(node: Address): Unit = {
log.info("Cluster Node [{}] - Node [{}] is JOINING", selfAddress, node)
val localState = state.get
val localGossip = localState.latestGossip
@ -595,13 +595,14 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
val newGossip = localGossip copy (overview = newOverview, members = newMembers)
val versionedGossip = newGossip + vclockNode
val seenVersionedGossip = versionedGossip seen remoteAddress
val seenVersionedGossip = versionedGossip seen selfAddress
val newState = localState copy (latestGossip = seenVersionedGossip)
if (!state.compareAndSet(localState, newState)) joining(node) // recur if we failed update
else {
failureDetector heartbeat node // update heartbeat in failure detector
if (node != selfAddress) failureDetector heartbeat node
if (convergence(newState.latestGossip).isDefined) {
newState.memberMembershipChangeListeners foreach { _ notify newMembers }
}
@ -611,29 +612,29 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
/**
* State transition to UP.
*/
private[cluster] final def up(address: Address) {
log.info("Cluster Node [{}] - Marking node [{}] as UP", remoteAddress, address)
private[cluster] final def up(address: Address): Unit = {
log.info("Cluster Node [{}] - Marking node [{}] as UP", selfAddress, address)
}
/**
* State transition to LEAVING.
*/
private[cluster] final def leaving(address: Address) {
log.info("Cluster Node [{}] - Marking node [{}] as LEAVING", remoteAddress, address)
private[cluster] final def leaving(address: Address): Unit = {
log.info("Cluster Node [{}] - Marking node [{}] as LEAVING", selfAddress, address)
}
/**
* State transition to EXITING.
*/
private[cluster] final def exiting(address: Address) {
log.info("Cluster Node [{}] - Marking node [{}] as EXITING", remoteAddress, address)
private[cluster] final def exiting(address: Address): Unit = {
log.info("Cluster Node [{}] - Marking node [{}] as EXITING", selfAddress, address)
}
/**
* State transition to REMOVED.
*/
private[cluster] final def removing(address: Address) {
log.info("Cluster Node [{}] - Marking node [{}] as REMOVED", remoteAddress, address)
private[cluster] final def removing(address: Address): Unit = {
log.info("Cluster Node [{}] - Marking node [{}] as REMOVED", selfAddress, address)
}
/**
@ -644,7 +645,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
* to this node and it will then go through the normal JOINING procedure.
*/
@tailrec
final private[cluster] def downing(address: Address) {
final private[cluster] def downing(address: Address): Unit = {
val localState = state.get
val localGossip = localState.latestGossip
val localMembers = localGossip.members
@ -658,7 +659,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
localMembers
.map { member ⇒
if (member.address == address) {
log.info("Cluster Node [{}] - Marking node [{}] as DOWN", remoteAddress, member.address)
log.info("Cluster Node [{}] - Marking node [{}] as DOWN", selfAddress, member.address)
val newMember = member copy (status = MemberStatus.Down)
downedMember = Some(newMember)
newMember
@ -672,7 +673,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
.filter(_.status != MemberStatus.Down) // no need to DOWN members already DOWN
.map { member ⇒
if (member.address == address) {
log.info("Cluster Node [{}] - Marking unreachable node [{}] as DOWN", remoteAddress, member.address)
log.info("Cluster Node [{}] - Marking unreachable node [{}] as DOWN", selfAddress, member.address)
member copy (status = MemberStatus.Down)
} else member
}
@ -691,7 +692,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachablePlusNewlyDownedMembers) // update gossip overview
val newGossip = localGossip copy (overview = newOverview, members = newMembers) // update gossip
val versionedGossip = newGossip + vclockNode
val newState = localState copy (latestGossip = versionedGossip seen remoteAddress)
val newState = localState copy (latestGossip = versionedGossip seen selfAddress)
if (!state.compareAndSet(localState, newState)) downing(address) // recur if we fail the update
else {
@ -705,7 +706,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
* Receive new gossip.
*/
@tailrec
final private[cluster] def receive(sender: Member, remoteGossip: Gossip) {
final private[cluster] def receive(sender: Member, remoteGossip: Gossip): Unit = {
val localState = state.get
val localGossip = localState.latestGossip
@ -730,14 +731,14 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
remoteGossip
}
val newState = localState copy (latestGossip = winningGossip seen remoteAddress)
val newState = localState copy (latestGossip = winningGossip seen selfAddress)
// if we won the race then update else try again
if (!state.compareAndSet(localState, newState)) receive(sender, remoteGossip) // recur if we fail the update
else {
log.debug("Cluster Node [{}] - Receiving gossip from [{}]", remoteAddress, sender.address)
log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, sender.address)
failureDetector heartbeat sender.address // update heartbeat in failure detector
if (sender.address != selfAddress) failureDetector heartbeat sender.address
if (convergence(newState.latestGossip).isDefined) {
newState.memberMembershipChangeListeners foreach { _ notify newState.latestGossip.members }
@ -746,14 +747,9 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
}
/**
* Joins the pre-configured contact point and retrieves current gossip state.
* Joins the pre-configured contact point.
*/
private def autoJoin() = nodeToJoin foreach { address ⇒
val connection = clusterCommandConnectionFor(address)
val command = ClusterAction.Join(remoteAddress)
log.info("Cluster Node [{}] - Sending [{}] to [{}] through connection [{}]", remoteAddress, command, address, connection)
connection ! command
}
private def autoJoin(): Unit = nodeToJoin foreach join
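For context, a sketch of the configuration that would populate nodeToJoin through ClusterSettings; the exact akka.cluster.node-to-join key and the address format are assumptions:

    // illustrative sketch, not part of this commit
    import com.typesafe.config.ConfigFactory
    // when set (and different from selfAddress), autoJoin() sends a Join to this node at startup
    val config = ConfigFactory.parseString("""
      akka.cluster.node-to-join = "akka://MySystem@10.0.0.1:2552"
      """)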
/**
* Switches the member status.
@ -763,7 +759,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
* @return the updated new state with the new member status
*/
private def switchMemberStatusTo(newStatus: MemberStatus, state: State): State = {
log.info("Cluster Node [{}] - Switching membership status to [{}]", remoteAddress, newStatus)
log.info("Cluster Node [{}] - Switching membership status to [{}]", selfAddress, newStatus)
val localSelf = self
@ -775,7 +771,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
// change my state in 'gossip.members'
val newMembersSet = localMembers map { member ⇒
if (member.address == remoteAddress) newSelf
if (member.address == selfAddress) newSelf
else member
}
@ -785,7 +781,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
// version my changes
val versionedGossip = newGossip + vclockNode
val seenVersionedGossip = versionedGossip seen remoteAddress
val seenVersionedGossip = versionedGossip seen selfAddress
state copy (latestGossip = seenVersionedGossip)
}
@ -793,9 +789,9 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
/**
* Gossips latest gossip to an address.
*/
private def gossipTo(address: Address) {
private def gossipTo(address: Address): Unit = {
val connection = clusterGossipConnectionFor(address)
log.debug("Cluster Node [{}] - Gossiping to [{}]", remoteAddress, connection)
log.debug("Cluster Node [{}] - Gossiping to [{}]", selfAddress, connection)
connection ! GossipEnvelope(self, latestGossip)
}
@ -805,10 +801,10 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
* @return 'true' if it gossiped to a "deputy" member.
*/
private def gossipToRandomNodeOf(addresses: Iterable[Address]): Boolean = {
log.debug("Cluster Node [{}] - Selecting random node to gossip to [{}]", remoteAddress, addresses.mkString(", "))
log.debug("Cluster Node [{}] - Selecting random node to gossip to [{}]", selfAddress, addresses.mkString(", "))
if (addresses.isEmpty) false
else {
val peers = addresses filter (_ != remoteAddress) // filter out myself
val peers = addresses filter (_ != selfAddress) // filter out myself
val peer = selectRandomNode(peers)
gossipTo(peer)
deputyNodes exists (peer == _)
@ -818,15 +814,16 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
/**
* Initiates a new round of gossip.
*/
private def gossip() {
private def gossip(): Unit = {
val localState = state.get
val localGossip = localState.latestGossip
val localMembers = localGossip.members
if (!isSingletonCluster(localState) && isAvailable(localState)) {
// only gossip if we are a non-singleton cluster and available
if (isSingletonCluster(localState)) {
// gossip to myself
// TODO could perhaps be optimized, no need to gossip to myself when Up?
gossipTo(selfAddress)
log.debug("Cluster Node [{}] - Initiating new round of gossip", remoteAddress)
} else if (isAvailable(localState)) {
log.debug("Cluster Node [{}] - Initiating new round of gossip", selfAddress)
val localGossip = localState.latestGossip
val localMembers = localGossip.members
@ -846,7 +843,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
// 3. gossip to deputy nodes to facilitate partition healing
val deputies = deputyNodes
if ((!gossipedToDeputy || localMembersSize < 1) && !deputies.isEmpty) {
if ((!gossipedToDeputy || localMembersSize < 1) && deputies.nonEmpty) {
if (localMembersSize == 0) gossipToRandomNodeOf(deputies)
else {
val probability = 1.0 / (localMembersSize + localUnreachableSize) // parenthesized so the divisor covers both counts
@ -860,7 +857,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
* Reaps the unreachable members (moves them to the 'unreachable' list in the cluster overview) according to the failure detector's verdict.
*/
@tailrec
final private def reapUnreachableMembers() {
final private def reapUnreachableMembers(): Unit = {
val localState = state.get
if (!isSingletonCluster(localState) && isAvailable(localState)) {
@ -874,7 +871,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
val newlyDetectedUnreachableMembers = localMembers filterNot { member ⇒ failureDetector.isAvailable(member.address) }
if (!newlyDetectedUnreachableMembers.isEmpty) { // we have newly detected members marked as unavailable
if (newlyDetectedUnreachableMembers.nonEmpty) { // we have newly detected members marked as unavailable
val newMembers = localMembers diff newlyDetectedUnreachableMembers
val newUnreachableMembers: Set[Member] = localUnreachableMembers ++ newlyDetectedUnreachableMembers
@ -884,14 +881,14 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
// updating vclock and 'seen' table
val versionedGossip = newGossip + vclockNode
val seenVersionedGossip = versionedGossip seen remoteAddress
val seenVersionedGossip = versionedGossip seen selfAddress
val newState = localState copy (latestGossip = seenVersionedGossip)
// if we won the race then update else try again
if (!state.compareAndSet(localState, newState)) reapUnreachableMembers() // recur
else {
log.info("Cluster Node [{}] - Marking node(s) as UNREACHABLE [{}]", remoteAddress, newlyDetectedUnreachableMembers.mkString(", "))
log.info("Cluster Node [{}] - Marking node(s) as UNREACHABLE [{}]", selfAddress, newlyDetectedUnreachableMembers.mkString(", "))
if (convergence(newState.latestGossip).isDefined) {
newState.memberMembershipChangeListeners foreach { _ notify newMembers }
@ -905,19 +902,19 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
* Runs periodic leader actions, such as auto-downing unreachable nodes, assigning partitions etc.
*/
@tailrec
final private def leaderActions() {
final private def leaderActions(): Unit = {
val localState = state.get
val localGossip = localState.latestGossip
val localMembers = localGossip.members
val isLeader = !localMembers.isEmpty && (remoteAddress == localMembers.head.address)
val isLeader = localMembers.nonEmpty && (selfAddress == localMembers.head.address)
if (isLeader && isAvailable(localState)) {
// only run the leader actions if we are the LEADER and available
val localOverview = localGossip.overview
val localSeen = localOverview.seen
val localUnreachableMembers = localGossip.overview.unreachable
val localUnreachableMembers = localOverview.unreachable
// Leader actions are as follows:
// 1. Move JOINING => UP
@ -936,14 +933,14 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
localMembers map { member ⇒
// 1. Move JOINING => UP
if (member.status == MemberStatus.Joining) {
log.info("Cluster Node [{}] - Leader is moving node [{}] from JOINING to UP", remoteAddress, member.address)
log.info("Cluster Node [{}] - Leader is moving node [{}] from JOINING to UP", selfAddress, member.address)
hasChangedState = true
member copy (status = MemberStatus.Up)
} else member
} map { member ⇒
// 2. Move EXITING => REMOVED
if (member.status == MemberStatus.Exiting) {
log.info("Cluster Node [{}] - Leader is moving node [{}] from EXITING to REMOVED", remoteAddress, member.address)
log.info("Cluster Node [{}] - Leader is moving node [{}] from EXITING to REMOVED", selfAddress, member.address)
hasChangedState = true
member copy (status = MemberStatus.Removed)
} else member
@ -959,7 +956,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
localUnreachableMembers
.filter(_.status != MemberStatus.Down) // no need to DOWN members already DOWN
.map { member ⇒
log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as DOWN", remoteAddress, member.address)
log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as DOWN", selfAddress, member.address)
hasChangedState = true
member copy (status = MemberStatus.Down)
}
@ -978,7 +975,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
val versionedGossip = newGossip + vclockNode
// 5. Updating the 'seen' table
val seenVersionedGossip = versionedGossip seen remoteAddress
val seenVersionedGossip = versionedGossip seen selfAddress
val newState = localState copy (latestGossip = seenVersionedGossip)
@ -986,7 +983,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
if (!state.compareAndSet(localState, newState)) leaderActions() // recur
else {
if (convergence(newState.latestGossip).isDefined) {
newState.memberMembershipChangeListeners map { _ notify newGossip.members }
newState.memberMembershipChangeListeners foreach { _ notify newGossip.members }
}
}
}
@ -1013,7 +1010,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
val views = Set.empty[VectorClock] ++ seen.values
if (views.size == 1) {
log.debug("Cluster Node [{}] - Cluster convergence reached", remoteAddress)
log.debug("Cluster Node [{}] - Cluster convergence reached", selfAddress)
Some(gossip)
} else None
} else None
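In other words, convergence of the seen table means every address maps to the same vector clock; a standalone restatement of that part of the check (illustrative only, not the actual Cluster API):

    // illustrative sketch, not part of this commit
    def allViewsAgree(seen: Map[Address, VectorClock]): Boolean =
      seen.nonEmpty && seen.values.toSet.size == 1 // all observers have seen an identical gossip version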
@ -1026,7 +1023,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
val localOverview = localGossip.overview
val localMembers = localGossip.members
val localUnreachableMembers = localOverview.unreachable
val isUnreachable = localUnreachableMembers exists { _.address == remoteAddress }
val isUnreachable = localUnreachableMembers exists { _.address == selfAddress }
val hasUnavailableMemberStatus = localMembers exists { m ⇒ (m == self) && MemberStatus.isUnavailable(m.status) }
isUnreachable || hasUnavailableMemberStatus
}
@ -1034,7 +1031,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
/**
* Looks up and returns the local cluster command connection.
*/
private def clusterCommandDaemon = system.actorFor(RootActorPath(remoteAddress) / "system" / "cluster" / "commands")
private def clusterCommandDaemon = system.actorFor(RootActorPath(selfAddress) / "system" / "cluster" / "commands")
/**
* Looks up and returns the remote cluster command connection for the specific address.
@ -1049,7 +1046,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
/**
* Gets an Iterable with the addresses of all the 'deputy' nodes - excluding this node if part of the group.
*/
private def deputyNodes: Iterable[Address] = state.get.latestGossip.members.toIterable map (_.address) drop 1 take nrOfDeputyNodes filter (_ != remoteAddress)
private def deputyNodes: Iterable[Address] = state.get.latestGossip.members.toIterable map (_.address) drop 1 take nrOfDeputyNodes filter (_ != selfAddress)
private def selectRandomNode(addresses: Iterable[Address]): Address = addresses.toSeq(ThreadLocalRandom.current nextInt addresses.size)
@ -1078,8 +1075,8 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
val unreachable = gossip.overview.unreachable
val metaData = gossip.meta
"\nMembers:\n\t" + gossip.members.mkString("\n\t") +
{ if (!unreachable.isEmpty) "\nUnreachable:\n\t" + unreachable.mkString("\n\t") else "" } +
{ if (!metaData.isEmpty) "\nMeta Data:\t" + metaData.toString else "" }
{ if (unreachable.nonEmpty) "\nUnreachable:\n\t" + unreachable.mkString("\n\t") else "" } +
{ if (metaData.nonEmpty) "\nMeta Data:\t" + metaData.toString else "" }
}
def getMemberStatus: String = clusterNode.status.toString
@ -1104,7 +1101,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
def shutdown() = clusterNode.shutdown()
}
log.info("Cluster Node [{}] - registering cluster JMX MBean [{}]", remoteAddress, clusterMBeanName)
log.info("Cluster Node [{}] - registering cluster JMX MBean [{}]", selfAddress, clusterMBeanName)
try {
mBeanServer.registerMBean(mbean, clusterMBeanName)
} catch {


@ -0,0 +1,69 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfter
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import akka.util.duration._
object NodeShutdownMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
commonConfig(debugConfig(on = false).
withFallback(ConfigFactory.parseString("""
akka.cluster {
auto-down = on
failure-detector.threshold = 4
}
""")).
withFallback(MultiNodeClusterSpec.clusterConfig))
}
class NodeShutdownMultiJvmNode1 extends NodeShutdownSpec
class NodeShutdownMultiJvmNode2 extends NodeShutdownSpec
abstract class NodeShutdownSpec extends MultiNodeSpec(NodeShutdownMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter {
import NodeShutdownMultiJvmSpec._
override def initialParticipants = 2
after {
testConductor.enter("after")
}
"A cluster of 2 nodes" must {
"not be singleton cluster when joined" taggedAs LongRunningTest in {
// make sure that the node-to-join is started before the others join
runOn(first) {
cluster.self
}
testConductor.enter("first-started")
runOn(second) {
cluster.join(node(first).address)
}
awaitUpConvergence(numberOfMembers = 2)
cluster.isSingletonCluster must be(false)
}
"become singleton cluster when one node is shutdown" in {
runOn(first) {
val secondAddress = node(second).address
testConductor.shutdown(second, 0)
testConductor.removeNode(second)
awaitUpConvergence(numberOfMembers = 1, canNotBePartOfMemberRing = Seq(secondAddress), 30.seconds)
cluster.isSingletonCluster must be(true)
cluster.isLeader must be(true)
}
}
}
}


@ -37,19 +37,8 @@ abstract class NodeStartupSpec extends MultiNodeSpec(NodeStartupMultiJvmSpec) wi
"be a singleton cluster when started up" taggedAs LongRunningTest in {
runOn(first) {
awaitCond(cluster.isSingletonCluster)
// FIXME #2117 singletonCluster should reach convergence
//awaitCond(cluster.convergence.isDefined)
}
}
"be in 'Joining' phase when started up" taggedAs LongRunningTest in {
runOn(first) {
val members = cluster.latestGossip.members
members.size must be(1)
val joiningMember = members find (_.address == firstAddress)
joiningMember must not be (None)
joiningMember.get.status must be(MemberStatus.Joining)
awaitUpConvergence(numberOfMembers = 1)
cluster.isLeader must be(true)
}
}
}


@ -81,16 +81,6 @@ can later explicitly send a ``Join`` message to another node to form an N-node
cluster. It is also possible to link multiple N-node clusters by ``joining`` them.
Singleton Cluster
-----------------
If a node does not have a preconfigured contact point to join in the Akka
configuration, then it is considered a singleton cluster (single node cluster)
and will automatically transition from ``joining`` to ``up``. Singleton clusters
can later explicitly send a ``Join`` message to another node to form an N-node
cluster. It is also possible to link multiple N-node clusters by ``joining`` them.
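A hedged sketch of what such a later ``Join`` looks like in code; the system name, host and port are placeholders and Cluster(system) is an assumed ExtensionId-style lookup:

    // illustrative sketch, not part of this commit
    // node B started as a singleton cluster and now joins node A to form a 2-node cluster
    Cluster(systemB).join(Address("akka", "ClusterSystem", "host-a", 2552))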
Gossip
------


@ -44,7 +44,7 @@ abstract class MultiNodeConfig {
/**
* Include for verbose debug logging
* @param on when `true` debug Config is returned, otherwise empty Config
* @param on when `true` debug Config is returned, otherwise config with info logging
*/
def debugConfig(on: Boolean): Config =
if (on)
@ -59,7 +59,8 @@ abstract class MultiNodeConfig {
fsm = on
}
""")
else ConfigFactory.empty
else
ConfigFactory.parseString("akka.loglevel = INFO")
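For reference, a sketch of how a multi-node test config combines this with its own settings, mirroring the NodeShutdownMultiJvmSpec above:

    // illustrative sketch, not part of this commit
    commonConfig(debugConfig(on = false) // INFO-level logging comes from the fallback branch
      .withFallback(ConfigFactory.parseString("akka.cluster.auto-down = on")))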
/**
* Construct a RoleName and return it, to be used as an identifier in the