diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index 4bb0105413..fd7bfa0de9 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -94,9 +94,9 @@ object ClusterEvent { case class ConvergenceChanged(convergence: Boolean) extends ClusterDomainEvent /** - * Leader of the cluster members changed, and/or convergence status. + * Leader of the cluster members changed. Only published after convergence. */ - case class LeaderChanged(leader: Option[Address], convergence: Boolean) extends ClusterDomainEvent + case class LeaderChanged(leader: Option[Address]) extends ClusterDomainEvent /** * INTERNAL API @@ -150,7 +150,7 @@ object ClusterEvent { val convergenceEvents = if (convergenceChanged) Seq(ConvergenceChanged(newConvergence)) else Seq.empty val leaderEvents = - if (convergenceChanged || newGossip.leader != oldGossip.leader) Seq(LeaderChanged(newGossip.leader, newConvergence)) + if (newGossip.leader != oldGossip.leader) Seq(LeaderChanged(newGossip.leader)) else Seq.empty val newSeenBy = newGossip.seenBy @@ -159,7 +159,7 @@ object ClusterEvent { else Seq.empty memberEvents.toIndexedSeq ++ unreachableEvents ++ downedEvents ++ unreachableDownedEvents ++ removedEvents ++ - convergenceEvents ++ leaderEvents ++ seenEvents + leaderEvents ++ convergenceEvents ++ seenEvents } } @@ -173,6 +173,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto import InternalClusterAction._ var latestGossip: Gossip = Gossip() + var stashedLeaderChanged: Option[LeaderChanged] = None def receive = { case PublishChanges(oldGossip, newGossip) ⇒ publishChanges(oldGossip, newGossip) @@ -201,11 +202,22 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto // keep the latestGossip to be sent to new subscribers latestGossip = newGossip diff(oldGossip, newGossip) foreach { event ⇒ - eventStream publish event - // notify DeathWatch about unreachable node event match { - case MemberUnreachable(m) ⇒ eventStream publish AddressTerminated(m.address) - case _ ⇒ + case LeaderChanged(_) if oldGossip.convergence && newGossip.convergence ⇒ + stashedLeaderChanged = None + eventStream publish event + case x: LeaderChanged ⇒ + // publish later, when convergence + stashedLeaderChanged = Some(x) + case ConvergenceChanged(true) ⇒ + stashedLeaderChanged foreach { eventStream publish _ } + stashedLeaderChanged = None + eventStream publish event + case MemberUnreachable(m) ⇒ + eventStream publish event + // notify DeathWatch about unreachable node + eventStream publish AddressTerminated(m.address) + case _ ⇒ eventStream publish event } } } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala index ff827574d7..0aa9e6997e 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala @@ -52,11 +52,11 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { case event: MemberEvent ⇒ // replace current member with new member (might have different status, only address is used in equals) state = state.copy(members = state.members - event.member + event.member) - case LeaderChanged(leader, convergence) ⇒ state = state.copy(leader = leader, convergence = convergence) - case ConvergenceChanged(convergence) ⇒ state = state.copy(convergence = convergence) - case s: CurrentClusterState ⇒ state = s - case CurrentInternalStats(stats) ⇒ _latestStats = stats - case _ ⇒ // ignore, not interesting + case LeaderChanged(leader) ⇒ state = state.copy(leader = leader) + case ConvergenceChanged(convergence) ⇒ state = state.copy(convergence = convergence) + case s: CurrentClusterState ⇒ state = s + case CurrentInternalStats(stats) ⇒ _latestStats = stats + case _ ⇒ // ignore, not interesting } }).withDispatcher(cluster.settings.UseDispatcher), name = "clusterEventBusListener") } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala new file mode 100644 index 0000000000..d7c76270f3 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala @@ -0,0 +1,96 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.cluster + +import language.postfixOps +import scala.collection.immutable.SortedSet +import scala.concurrent.util.duration._ +import akka.actor.Address +import akka.actor.Props +import akka.cluster.MemberStatus._ +import akka.cluster.InternalClusterAction._ +import akka.cluster.ClusterEvent._ +import akka.testkit.AkkaSpec +import akka.testkit.ImplicitSender + +object ClusterDomainEventPublisherSpec { + val config = """ + akka.cluster.auto-join = off + akka.actor.provider = "akka.cluster.ClusterActorRefProvider" + akka.remote.log-remote-lifecycle-events = off + akka.remote.netty.port = 0 + """ + + case class GossipTo(address: Address) +} + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublisherSpec.config) with ImplicitSender { + import ClusterDomainEventPublisherSpec._ + + val publisher = system.actorOf(Props[ClusterDomainEventPublisher], name = "test-publisher") + val a1 = Member(Address("akka", "sys", "a", 2552), Up) + val b1 = Member(Address("akka", "sys", "b", 2552), Up) + val c1 = Member(Address("akka", "sys", "c", 2552), Joining) + val c2 = Member(Address("akka", "sys", "c", 2552), Up) + val d1 = Member(Address("akka", "sys", "a", 2551), Up) + + val g0 = Gossip(members = SortedSet(a1)).seen(a1.address) + val g1 = Gossip(members = SortedSet(a1, b1, c1)).seen(a1.address).seen(b1.address).seen(c1.address) + val g2 = Gossip(members = SortedSet(a1, b1, c2)).seen(a1.address) + val g3 = g2.seen(b1.address).seen(c2.address) + val g4 = Gossip(members = SortedSet(d1, a1, b1, c2)).seen(a1.address) + val g5 = Gossip(members = SortedSet(d1, a1, b1, c2)).seen(a1.address).seen(b1.address).seen(c2.address).seen(d1.address) + + "ClusterDomainEventPublisher" must { + + "send snapshot when starting subscription" in { + publisher ! PublishChanges(g0, g1) + publisher ! Subscribe(testActor, classOf[ClusterDomainEvent]) + val state = expectMsgType[CurrentClusterState] + state.members must be(g1.members) + state.convergence must be(true) + } + + "publish MemberUp when member status changed to Up" in { + publisher ! PublishChanges(g1, g2) + expectMsg(MemberUp(c2)) + expectMsg(ConvergenceChanged(false)) + expectMsgType[SeenChanged] + } + + "publish convergence true when all seen it" in { + publisher ! PublishChanges(g2, g3) + expectMsg(ConvergenceChanged(true)) + expectMsgType[SeenChanged] + } + + "publish leader changed when new leader and after convergence" in { + publisher ! PublishChanges(g3, g4) + expectMsg(MemberUp(d1)) + expectMsg(ConvergenceChanged(false)) + expectMsgType[SeenChanged] + + publisher ! PublishChanges(g4, g5) + expectMsg(LeaderChanged(Some(d1.address))) + expectMsg(ConvergenceChanged(true)) + expectMsgType[SeenChanged] + + // convergence both before and after + publisher ! PublishChanges(g3, g5) + expectMsg(MemberUp(d1)) + expectMsg(LeaderChanged(Some(d1.address))) + expectMsgType[SeenChanged] + expectNoMsg(1 second) + + // not convergence + publisher ! PublishChanges(g2, g4) + expectMsg(MemberUp(d1)) + expectNoMsg(1 second) + } + + } + +} diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala index 1bbffca3c2..3a4e3ee3a4 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala @@ -69,10 +69,9 @@ class ClusterDomainEventSpec extends WordSpec with MustMatchers { val g1 = Gossip(members = SortedSet(a1, b1, e1)).seen(a1.address).seen(b1.address).seen(e1.address) val g2 = Gossip(members = SortedSet(a1, b1, e1)).seen(a1.address).seen(b1.address) - // LeaderChanged is also published when convergence changed - diff(g1, g2) must be(Seq(ConvergenceChanged(false), LeaderChanged(Some(a1.address), convergence = false), + diff(g1, g2) must be(Seq(ConvergenceChanged(false), SeenChanged(convergence = false, seenBy = Set(a1.address, b1.address)))) - diff(g2, g1) must be(Seq(ConvergenceChanged(true), LeaderChanged(Some(a1.address), convergence = true), + diff(g2, g1) must be(Seq(ConvergenceChanged(true), SeenChanged(convergence = true, seenBy = Set(a1.address, b1.address, e1.address)))) } @@ -81,8 +80,8 @@ class ClusterDomainEventSpec extends WordSpec with MustMatchers { val g2 = Gossip(members = SortedSet(b1, e1), overview = GossipOverview(unreachable = Set(a1))) val g3 = g2.copy(overview = GossipOverview()).seen(b1.address).seen(e1.address) - diff(g1, g2) must be(Seq(MemberUnreachable(a1), LeaderChanged(Some(b1.address), convergence = false))) - diff(g2, g3) must be(Seq(ConvergenceChanged(true), LeaderChanged(Some(b1.address), convergence = true), + diff(g1, g2) must be(Seq(MemberUnreachable(a1), LeaderChanged(Some(b1.address)))) + diff(g2, g3) must be(Seq(ConvergenceChanged(true), SeenChanged(convergence = true, seenBy = Set(b1.address, e1.address)))) } } diff --git a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala index 0ba5a19b3e..351b5d0a48 100644 --- a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala +++ b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala @@ -89,11 +89,9 @@ class StatsFacade extends Actor with ActorLogging { case job: StatsJob ⇒ currentMaster foreach { _ forward job } case state: CurrentClusterState ⇒ - if (state.convergence) - state.leader foreach updateCurrentMaster - case LeaderChanged(Some(leaderAddress), true) ⇒ + state.leader foreach updateCurrentMaster + case LeaderChanged(Some(leaderAddress)) ⇒ updateCurrentMaster(leaderAddress) - case other: LeaderChanged ⇒ // ignore, not convergence } def updateCurrentMaster(leaderAddress: Address): Unit = {