Publish cluster LeaderChanged only when convergence, see #2518

This commit is contained in:
Patrik Nordwall 2012-09-18 14:19:38 +02:00
parent 175dd4c547
commit c0c6cc3931
5 changed files with 127 additions and 22 deletions

View file

@ -94,9 +94,9 @@ object ClusterEvent {
case class ConvergenceChanged(convergence: Boolean) extends ClusterDomainEvent
/**
* Leader of the cluster members changed, and/or convergence status.
* Leader of the cluster members changed. Only published after convergence.
*/
case class LeaderChanged(leader: Option[Address], convergence: Boolean) extends ClusterDomainEvent
case class LeaderChanged(leader: Option[Address]) extends ClusterDomainEvent
/**
* INTERNAL API
@ -150,7 +150,7 @@ object ClusterEvent {
val convergenceEvents = if (convergenceChanged) Seq(ConvergenceChanged(newConvergence)) else Seq.empty
val leaderEvents =
if (convergenceChanged || newGossip.leader != oldGossip.leader) Seq(LeaderChanged(newGossip.leader, newConvergence))
if (newGossip.leader != oldGossip.leader) Seq(LeaderChanged(newGossip.leader))
else Seq.empty
val newSeenBy = newGossip.seenBy
@ -159,7 +159,7 @@ object ClusterEvent {
else Seq.empty
memberEvents.toIndexedSeq ++ unreachableEvents ++ downedEvents ++ unreachableDownedEvents ++ removedEvents ++
convergenceEvents ++ leaderEvents ++ seenEvents
leaderEvents ++ convergenceEvents ++ seenEvents
}
}
@ -173,6 +173,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
import InternalClusterAction._
var latestGossip: Gossip = Gossip()
var stashedLeaderChanged: Option[LeaderChanged] = None
def receive = {
case PublishChanges(oldGossip, newGossip) publishChanges(oldGossip, newGossip)
@ -201,11 +202,22 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
// keep the latestGossip to be sent to new subscribers
latestGossip = newGossip
diff(oldGossip, newGossip) foreach { event
eventStream publish event
// notify DeathWatch about unreachable node
event match {
case MemberUnreachable(m) eventStream publish AddressTerminated(m.address)
case _
case LeaderChanged(_) if oldGossip.convergence && newGossip.convergence
stashedLeaderChanged = None
eventStream publish event
case x: LeaderChanged
// publish later, when convergence
stashedLeaderChanged = Some(x)
case ConvergenceChanged(true)
stashedLeaderChanged foreach { eventStream publish _ }
stashedLeaderChanged = None
eventStream publish event
case MemberUnreachable(m)
eventStream publish event
// notify DeathWatch about unreachable node
eventStream publish AddressTerminated(m.address)
case _ eventStream publish event
}
}
}