Notify MembershipChangeListeners when 'members' change
This commit is contained in:
parent
fcd08ed2b9
commit
6a380550f9
3 changed files with 19 additions and 15 deletions
|
|
@ -589,10 +589,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
|
|||
if (!state.compareAndSet(localState, newState)) joining(node) // recur if we failed update
|
||||
else {
|
||||
if (node != selfAddress) failureDetector heartbeat node
|
||||
|
||||
if (convergence(newState.latestGossip).isDefined) {
|
||||
newState.memberMembershipChangeListeners foreach { _ notify newMembers }
|
||||
}
|
||||
notifyMembershipChangeListeners(localState, newState)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -623,10 +620,12 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
|
|||
}
|
||||
}
|
||||
|
||||
private def notifyMembershipChangeListeners(oldState: State, newState: State): Unit =
|
||||
if (newState.latestGossip != oldState.latestGossip && convergence(newState.latestGossip).isDefined) {
|
||||
private def notifyMembershipChangeListeners(oldState: State, newState: State): Unit = {
|
||||
val oldMembersStatus = oldState.latestGossip.members.toSeq.map(m ⇒ (m.address, m.status))
|
||||
val newMembersStatus = newState.latestGossip.members.toSeq.map(m ⇒ (m.address, m.status))
|
||||
if (newMembersStatus != oldMembersStatus)
|
||||
newState.memberMembershipChangeListeners foreach { _ notify newState.latestGossip.members }
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* State transition to EXITING.
|
||||
|
|
|
|||
|
|
@ -37,7 +37,6 @@ abstract class MembershipChangeListenerSpec extends MultiNodeSpec(MembershipChan
|
|||
awaitClusterUp(first)
|
||||
|
||||
runOn(first, second) {
|
||||
cluster.join(firstAddress)
|
||||
val latch = TestLatch()
|
||||
cluster.registerListener(new MembershipChangeListener {
|
||||
def notify(members: SortedSet[Member]) {
|
||||
|
|
@ -45,8 +44,13 @@ abstract class MembershipChangeListenerSpec extends MultiNodeSpec(MembershipChan
|
|||
latch.countDown()
|
||||
}
|
||||
})
|
||||
testConductor.enter("listener-1-registered")
|
||||
cluster.join(firstAddress)
|
||||
latch.await
|
||||
cluster.convergence.isDefined must be(true)
|
||||
}
|
||||
|
||||
runOn(third) {
|
||||
testConductor.enter("listener-1-registered")
|
||||
}
|
||||
|
||||
testConductor.enter("after-1")
|
||||
|
|
@ -54,10 +58,6 @@ abstract class MembershipChangeListenerSpec extends MultiNodeSpec(MembershipChan
|
|||
|
||||
"(when three nodes) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in {
|
||||
|
||||
runOn(third) {
|
||||
cluster.join(firstAddress)
|
||||
}
|
||||
|
||||
val latch = TestLatch()
|
||||
cluster.registerListener(new MembershipChangeListener {
|
||||
def notify(members: SortedSet[Member]) {
|
||||
|
|
@ -65,8 +65,13 @@ abstract class MembershipChangeListenerSpec extends MultiNodeSpec(MembershipChan
|
|||
latch.countDown()
|
||||
}
|
||||
})
|
||||
testConductor.enter("listener-2-registered")
|
||||
|
||||
runOn(third) {
|
||||
cluster.join(firstAddress)
|
||||
}
|
||||
|
||||
latch.await
|
||||
cluster.convergence.isDefined must be(true)
|
||||
|
||||
testConductor.enter("after-2")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -52,7 +52,7 @@ abstract class SunnyWeatherSpec
|
|||
}
|
||||
|
||||
// add a few more
|
||||
awaitClusterUp(first, second, third, fourth, fifth)
|
||||
awaitClusterUp(roles: _*)
|
||||
log.info("5 joined")
|
||||
|
||||
val unexpected = new AtomicReference[SortedSet[Member]]
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue