diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index e2b5c8e751..4ea43d50e4 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -589,10 +589,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ if (!state.compareAndSet(localState, newState)) joining(node) // recur if we failed update else { if (node != selfAddress) failureDetector heartbeat node - - if (convergence(newState.latestGossip).isDefined) { - newState.memberMembershipChangeListeners foreach { _ notify newMembers } - } + notifyMembershipChangeListeners(localState, newState) } } } @@ -623,10 +620,12 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ } } - private def notifyMembershipChangeListeners(oldState: State, newState: State): Unit = - if (newState.latestGossip != oldState.latestGossip && convergence(newState.latestGossip).isDefined) { + private def notifyMembershipChangeListeners(oldState: State, newState: State): Unit = { + val oldMembersStatus = oldState.latestGossip.members.toSeq.map(m ⇒ (m.address, m.status)) + val newMembersStatus = newState.latestGossip.members.toSeq.map(m ⇒ (m.address, m.status)) + if (newMembersStatus != oldMembersStatus) newState.memberMembershipChangeListeners foreach { _ notify newState.latestGossip.members } - } + } /** * State transition to EXITING. diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala index c87a280e17..9e190050f9 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala @@ -37,7 +37,6 @@ abstract class MembershipChangeListenerSpec extends MultiNodeSpec(MembershipChan awaitClusterUp(first) runOn(first, second) { - cluster.join(firstAddress) val latch = TestLatch() cluster.registerListener(new MembershipChangeListener { def notify(members: SortedSet[Member]) { @@ -45,8 +44,13 @@ abstract class MembershipChangeListenerSpec extends MultiNodeSpec(MembershipChan latch.countDown() } }) + testConductor.enter("listener-1-registered") + cluster.join(firstAddress) latch.await - cluster.convergence.isDefined must be(true) + } + + runOn(third) { + testConductor.enter("listener-1-registered") } testConductor.enter("after-1") @@ -54,10 +58,6 @@ abstract class MembershipChangeListenerSpec extends MultiNodeSpec(MembershipChan "(when three nodes) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in { - runOn(third) { - cluster.join(firstAddress) - } - val latch = TestLatch() cluster.registerListener(new MembershipChangeListener { def notify(members: SortedSet[Member]) { @@ -65,8 +65,13 @@ abstract class MembershipChangeListenerSpec extends MultiNodeSpec(MembershipChan latch.countDown() } }) + testConductor.enter("listener-2-registered") + + runOn(third) { + cluster.join(firstAddress) + } + latch.await - cluster.convergence.isDefined must be(true) testConductor.enter("after-2") } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala index b74fdd09db..f4f42f0117 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala @@ -52,7 +52,7 @@ abstract class SunnyWeatherSpec } // add a few more - awaitClusterUp(first, second, third, fourth, fifth) + awaitClusterUp(roles: _*) log.info("5 joined") val unexpected = new AtomicReference[SortedSet[Member]]