Merge pull request #522 from akka/wip-2184-join-again-patriknw

Node that joins again should be ignored, see #2184
This commit is contained in:
patriknw 2012-06-07 08:14:28 -07:00
commit bd979a8845
2 changed files with 45 additions and 16 deletions

View file

@ -571,27 +571,28 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
val localState = state.get val localState = state.get
val localGossip = localState.latestGossip val localGossip = localState.latestGossip
val localMembers = localGossip.members val localMembers = localGossip.members
val localOverview = localGossip.overview
val localUnreachableMembers = localOverview.unreachable
// remove the node from the 'unreachable' set in case it is a DOWN node that is rejoining cluster if (!localMembers.exists(_.address == node)) {
val newUnreachableMembers = localUnreachableMembers filterNot { _.address == node }
val newOverview = localOverview copy (unreachable = newUnreachableMembers)
val newMembers = localMembers + Member(node, MemberStatus.Joining) // add joining node as Joining // remove the node from the 'unreachable' set in case it is a DOWN node that is rejoining cluster
val newGossip = localGossip copy (overview = newOverview, members = newMembers) val newUnreachableMembers = localGossip.overview.unreachable filterNot { _.address == node }
val newOverview = localGossip.overview copy (unreachable = newUnreachableMembers)
val versionedGossip = newGossip + vclockNode val newMembers = localMembers + Member(node, MemberStatus.Joining) // add joining node as Joining
val seenVersionedGossip = versionedGossip seen selfAddress val newGossip = localGossip copy (overview = newOverview, members = newMembers)
val newState = localState copy (latestGossip = seenVersionedGossip) val versionedGossip = newGossip + vclockNode
val seenVersionedGossip = versionedGossip seen selfAddress
if (!state.compareAndSet(localState, newState)) joining(node) // recur if we failed update val newState = localState copy (latestGossip = seenVersionedGossip)
else {
if (node != selfAddress) failureDetector heartbeat node
if (convergence(newState.latestGossip).isDefined) { if (!state.compareAndSet(localState, newState)) joining(node) // recur if we failed update
newState.memberMembershipChangeListeners foreach { _ notify newMembers } else {
if (node != selfAddress) failureDetector heartbeat node
if (convergence(newState.latestGossip).isDefined) {
newState.memberMembershipChangeListeners foreach { _ notify newMembers }
}
} }
} }
} }

View file

@ -9,6 +9,8 @@ import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec import akka.remote.testkit.MultiNodeSpec
import akka.testkit._ import akka.testkit._
import akka.util.duration._ import akka.util.duration._
import scala.collection.immutable.SortedSet
import java.util.concurrent.atomic.AtomicReference
object NodeUpMultiJvmSpec extends MultiNodeConfig { object NodeUpMultiJvmSpec extends MultiNodeConfig {
val first = role("first") val first = role("first")
@ -33,7 +35,33 @@ abstract class NodeUpSpec
awaitClusterUp(first, second) awaitClusterUp(first, second)
testConductor.enter("after") testConductor.enter("after-1")
}
"be unaffected when joining again" taggedAs LongRunningTest in {
val unexpected = new AtomicReference[SortedSet[Member]]
cluster.registerListener(new MembershipChangeListener {
def notify(members: SortedSet[Member]) {
if (members.size != 2 || members.exists(_.status != MemberStatus.Up))
unexpected.set(members)
}
})
testConductor.enter("listener-registered")
runOn(second) {
cluster.join(node(first).address)
}
testConductor.enter("joined-again")
// let it run for a while to make sure that nothing bad happens
for (n 1 to 20) {
100.millis.dilated.sleep()
unexpected.get must be(null)
cluster.latestGossip.members.forall(_.status == MemberStatus.Up) must be(true)
}
testConductor.enter("after-2")
} }
} }
} }