diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index 17988fd7ab..60172ed6a4 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -173,8 +173,14 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto import InternalClusterAction._ var latestGossip: Gossip = Gossip() - var stashedLeaderChanged: Option[LeaderChanged] = None - var publishedLeaderChanged: Option[LeaderChanged] = None + + // Keep track of LeaderChanged event. Should not be published until + // convergence, and it should only be published when leader actually + // changed to another node. 3 states: + // - None: No LeaderChanged detected yet, nothing published yet + // - Some(Left): Stashed LeaderChanged to be published later, when convergence + // - Some(Right): Latest published LeaderChanged + var leaderChangedState: Option[Either[LeaderChanged, LeaderChanged]] = None def receive = { case PublishChanges(oldGossip, newGossip) ⇒ publishChanges(oldGossip, newGossip) @@ -204,23 +210,26 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto latestGossip = newGossip diff(oldGossip, newGossip) foreach { event ⇒ event match { - case x @ LeaderChanged(_) if Some(x) == publishedLeaderChanged ⇒ + case x @ LeaderChanged(_) if leaderChangedState == Some(Right(x)) ⇒ // skip, this leader has already been published case x @ LeaderChanged(_) if oldGossip.convergence && newGossip.convergence ⇒ - stashedLeaderChanged = None - publishedLeaderChanged = Some(x) + // leader changed and immediate convergence + leaderChangedState = Some(Right(x)) eventStream publish x case x: LeaderChanged ⇒ // publish later, when convergence - stashedLeaderChanged = Some(x) + leaderChangedState = Some(Left(x)) case ConvergenceChanged(true) ⇒ - stashedLeaderChanged foreach { - publishedLeaderChanged = stashedLeaderChanged - stashedLeaderChanged = None - eventStream publish _ + // now it's convergence, publish eventual stashed LeaderChanged event + leaderChangedState match { + case Some(Left(x)) ⇒ + leaderChangedState = Some(Right(x)) + eventStream publish x + + case _ ⇒ // nothing stashed } eventStream publish event @@ -228,7 +237,10 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto eventStream publish event // notify DeathWatch about unreachable node eventStream publish AddressTerminated(m.address) - case _ ⇒ eventStream publish event + + case _ ⇒ + // all other events + eventStream publish event } } }