diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 50644e431c..6db32a7316 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -191,7 +191,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto // note that self is not initially member, // and the Gossip is not versioned for this 'Node' yet - var latestGossip: Gossip = Gossip() + var latestGossip: Gossip = Gossip.empty var stats = ClusterStats() @@ -283,7 +283,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto def join(address: Address): Unit = { if (!latestGossip.members.exists(_.address == address)) { // wipe our state since a node that joins a cluster must be empty - latestGossip = Gossip() + latestGossip = Gossip.empty // wipe the failure detector since we are starting fresh and shouldn't care about the past failureDetector.reset() // wipe the publisher since we are starting fresh @@ -377,7 +377,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto def removing(address: Address): Unit = { log.info("Cluster Node [{}] - Node has been REMOVED by the leader - shutting down...", selfAddress) // just cleaning up the gossip state - latestGossip = Gossip() + latestGossip = Gossip.empty publish(latestGossip) context.become(removed) // make sure the final (removed) state is published diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index 7d1997649d..7e7befeb4e 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -238,8 +238,8 @@ object ClusterEvent { private[cluster] final class ClusterDomainEventPublisher extends Actor with ActorLogging { import InternalClusterAction._ - var latestGossip: Gossip = Gossip() - var latestConvergedGossip: Gossip = Gossip() + var latestGossip: Gossip = Gossip.empty + var latestConvergedGossip: Gossip = Gossip.empty var memberEvents: immutable.Seq[MemberEvent] = immutable.Seq.empty def receive = { @@ -313,7 +313,11 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto def publish(event: AnyRef): Unit = eventStream publish event - def publishStart(): Unit = clearState() + def publishStart(): Unit = + if ((latestGossip ne Gossip.empty) || (latestConvergedGossip ne Gossip.empty)) { + clearState() + publishCurrentClusterState(None) + } def publishDone(receiver: ActorRef): Unit = { clearState() @@ -321,7 +325,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto } def clearState(): Unit = { - latestGossip = Gossip() - latestConvergedGossip = Gossip() + latestGossip = Gossip.empty + latestConvergedGossip = Gossip.empty } } diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala index 1f96434995..9173e977a5 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala @@ -13,6 +13,10 @@ import MemberStatus._ */ private[cluster] object Gossip { val emptyMembers: immutable.SortedSet[Member] = immutable.SortedSet.empty + val empty: Gossip = new Gossip(Gossip.emptyMembers) + + def apply(members: immutable.SortedSet[Member]) = + if (members.isEmpty) empty else empty.copy(members = members) } /** @@ -49,8 +53,8 @@ private[cluster] object Gossip { * removed node telling it to shut itself down. */ private[cluster] case class Gossip( + members: immutable.SortedSet[Member], // sorted set of members with their status, sorted by address overview: GossipOverview = GossipOverview(), - members: immutable.SortedSet[Member] = Gossip.emptyMembers, // sorted set of members with their status, sorted by address version: VectorClock = VectorClock()) // vector clock version extends ClusterMessage // is a serializable cluster message with Versioned[Gossip] { @@ -128,7 +132,7 @@ private[cluster] case class Gossip( // 4. fresh seen table val mergedSeen = Map.empty[Address, VectorClock] - Gossip(GossipOverview(mergedSeen, mergedUnreachable), mergedMembers, mergedVClock) + Gossip(mergedMembers, GossipOverview(mergedSeen, mergedUnreachable), mergedVClock) } /** diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala index 6c13c79f23..37412db930 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala @@ -134,5 +134,17 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec expectMsg(MemberUp(c2)) expectMsgType[SeenChanged] } + + "publish clean state when PublishStart" in { + publisher ! PublishChanges(g3) + expectMsg(MemberUp(b1)) + expectMsg(MemberUp(c2)) + expectMsgType[SeenChanged] + + publisher ! PublishStart + expectMsgType[CurrentClusterState] must be(CurrentClusterState()) + + } + } } diff --git a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala index 704b12dac0..63707558ce 100644 --- a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala @@ -29,7 +29,7 @@ class GossipSpec extends WordSpec with MustMatchers { "A Gossip" must { "reach convergence when it's empty" in { - Gossip().convergence must be(true) + Gossip.empty.convergence must be(true) } "merge members by status priority" in {