From 1e4b2585c7fceae8b4c409dfae0faa0b9c37fffc Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 11 Mar 2013 12:41:15 +0100 Subject: [PATCH] Publish LeaderChanged when first seen, see #3131 * The problem in ClusterSingletonManagerChaosSpec was that node 4 doesn't publish LeaderChanged, because there is never convergence on node 4 of the new Up state for the three new nodes before they are shutdown. When it becomes convergence on node 4 prevConvergedGossip and newGossip have same leader (i.e. no change). * LeaderChanged is now published when the new leader is first seen, i.e. same as member events. This makes sense now when leader can't be in Joining state. --- .../scala/akka/cluster/ClusterEvent.scala | 20 +++--------- .../ClusterDomainEventPublisherSpec.scala | 31 +++---------------- 2 files changed, 9 insertions(+), 42 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index bf27cb186f..6b61a5c7d8 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -96,7 +96,8 @@ object ClusterEvent { } /** - * Leader of the cluster members changed. Only published after convergence. + * Leader of the cluster members changed. Published when the state change + * is first seen on a node. */ case class LeaderChanged(leader: Option[Address]) extends ClusterDomainEvent { /** @@ -210,8 +211,6 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto import InternalClusterAction._ var latestGossip: Gossip = Gossip.empty - var latestConvergedGossip: Gossip = Gossip.empty - var bufferedEvents: immutable.IndexedSeq[ClusterDomainEvent] = Vector.empty override def preRestart(reason: Throwable, message: Option[Any]) { // don't postStop when restarted, no children to stop @@ -243,7 +242,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto members = latestGossip.members, unreachable = latestGossip.overview.unreachable, seenBy = latestGossip.seenBy, - leader = latestConvergedGossip.leader) + leader = latestGossip.leader) receiver match { case Some(ref) ⇒ ref ! state case None ⇒ publish(state) @@ -275,15 +274,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto case _ ⇒ publish(event) } } - // buffer up the LeaderChanged waiting for convergence - bufferedEvents ++= diffLeader(oldGossip, newGossip) - // if we have convergence then publish the MemberEvents and LeaderChanged - if (newGossip.convergence) { - val previousConvergedGossip = latestConvergedGossip - latestConvergedGossip = newGossip - bufferedEvents foreach publish - bufferedEvents = Vector.empty - } + diffLeader(oldGossip, newGossip) foreach publish // publish internal SeenState for testing purposes diffSeen(oldGossip, newGossip) foreach publish } @@ -293,13 +284,12 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto def publish(event: AnyRef): Unit = eventStream publish event def publishStart(): Unit = - if ((latestGossip ne Gossip.empty) || (latestConvergedGossip ne Gossip.empty)) { + if (latestGossip ne Gossip.empty) { clearState() publishCurrentClusterState(None) } def clearState(): Unit = { latestGossip = Gossip.empty - latestConvergedGossip = Gossip.empty } } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala index 196308e2e6..3d3ce0ac3e 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala @@ -67,25 +67,13 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec memberSubscriber.expectMsg(MemberUp(cUp)) } - "publish leader changed when new leader after convergence" in { + "publish leader changed" in { publisher ! PublishChanges(g4) memberSubscriber.expectMsg(MemberUp(dUp)) memberSubscriber.expectMsg(MemberUp(bUp)) memberSubscriber.expectMsg(MemberUp(cUp)) + memberSubscriber.expectMsg(LeaderChanged(Some(dUp.address))) memberSubscriber.expectNoMsg(1 second) - - publisher ! PublishChanges(g5) - memberSubscriber.expectMsg(LeaderChanged(Some(dUp.address))) - } - - "publish leader changed when new leader and convergence both before and after" in { - // convergence both before and after - publisher ! PublishChanges(g3) - memberSubscriber.expectMsg(MemberUp(bUp)) - memberSubscriber.expectMsg(MemberUp(cUp)) - publisher ! PublishChanges(g5) - memberSubscriber.expectMsg(MemberUp(dUp)) - memberSubscriber.expectMsg(LeaderChanged(Some(dUp.address))) } "publish leader changed when old leader leaves and is removed" in { @@ -96,34 +84,23 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec memberSubscriber.expectNoMsg(1 second) publisher ! PublishChanges(g7) memberSubscriber.expectMsg(MemberExited(aExiting)) + memberSubscriber.expectMsg(LeaderChanged(Some(bUp.address))) memberSubscriber.expectNoMsg(1 second) // at the removed member a an empty gossip is the last thing publisher ! PublishChanges(Gossip.empty) memberSubscriber.expectMsg(MemberRemoved(aRemoved)) memberSubscriber.expectMsg(MemberRemoved(bRemoved)) memberSubscriber.expectMsg(MemberRemoved(cRemoved)) - memberSubscriber.expectMsg(LeaderChanged(Some(bUp.address))) memberSubscriber.expectMsg(LeaderChanged(None)) } - "not publish leader changed when not convergence" in { + "not publish leader changed when same leader" in { publisher ! PublishChanges(g4) memberSubscriber.expectMsg(MemberUp(dUp)) memberSubscriber.expectMsg(MemberUp(bUp)) memberSubscriber.expectMsg(MemberUp(cUp)) - memberSubscriber.expectNoMsg(1 second) - } - - "not publish leader changed when changed convergence but still same leader" in { - publisher ! PublishChanges(g5) - memberSubscriber.expectMsg(MemberUp(dUp)) - memberSubscriber.expectMsg(MemberUp(bUp)) - memberSubscriber.expectMsg(MemberUp(cUp)) memberSubscriber.expectMsg(LeaderChanged(Some(dUp.address))) - publisher ! PublishChanges(g4) - memberSubscriber.expectNoMsg(1 second) - publisher ! PublishChanges(g5) memberSubscriber.expectNoMsg(1 second) }