diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index fd7bfa0de9..17988fd7ab 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -174,6 +174,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto var latestGossip: Gossip = Gossip() var stashedLeaderChanged: Option[LeaderChanged] = None + var publishedLeaderChanged: Option[LeaderChanged] = None def receive = { case PublishChanges(oldGossip, newGossip) ⇒ publishChanges(oldGossip, newGossip) @@ -203,16 +204,26 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto latestGossip = newGossip diff(oldGossip, newGossip) foreach { event ⇒ event match { - case LeaderChanged(_) if oldGossip.convergence && newGossip.convergence ⇒ + case x @ LeaderChanged(_) if Some(x) == publishedLeaderChanged ⇒ + // skip, this leader has already been published + + case x @ LeaderChanged(_) if oldGossip.convergence && newGossip.convergence ⇒ stashedLeaderChanged = None - eventStream publish event + publishedLeaderChanged = Some(x) + eventStream publish x + case x: LeaderChanged ⇒ // publish later, when convergence stashedLeaderChanged = Some(x) + case ConvergenceChanged(true) ⇒ - stashedLeaderChanged foreach { eventStream publish _ } - stashedLeaderChanged = None + stashedLeaderChanged foreach { + publishedLeaderChanged = stashedLeaderChanged + stashedLeaderChanged = None + eventStream publish _ + } eventStream publish event + case MemberUnreachable(m) ⇒ eventStream publish event // notify DeathWatch about unreachable node diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala index d7c76270f3..c29f237be7 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala @@ -7,6 +7,7 @@ package akka.cluster import language.postfixOps import scala.collection.immutable.SortedSet import scala.concurrent.util.duration._ +import org.scalatest.BeforeAndAfterEach import akka.actor.Address import akka.actor.Props import akka.cluster.MemberStatus._ @@ -14,6 +15,7 @@ import akka.cluster.InternalClusterAction._ import akka.cluster.ClusterEvent._ import akka.testkit.AkkaSpec import akka.testkit.ImplicitSender +import akka.actor.ActorRef object ClusterDomainEventPublisherSpec { val config = """ @@ -27,10 +29,11 @@ object ClusterDomainEventPublisherSpec { } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublisherSpec.config) with ImplicitSender { +class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublisherSpec.config) + with BeforeAndAfterEach with ImplicitSender { import ClusterDomainEventPublisherSpec._ - val publisher = system.actorOf(Props[ClusterDomainEventPublisher], name = "test-publisher") + var publisher: ActorRef = _ val a1 = Member(Address("akka", "sys", "a", 2552), Up) val b1 = Member(Address("akka", "sys", "b", 2552), Up) val c1 = Member(Address("akka", "sys", "c", 2552), Joining) @@ -44,15 +47,18 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish val g4 = Gossip(members = SortedSet(d1, a1, b1, c2)).seen(a1.address) val g5 = Gossip(members = SortedSet(d1, a1, b1, c2)).seen(a1.address).seen(b1.address).seen(c2.address).seen(d1.address) - "ClusterDomainEventPublisher" must { + override def beforeEach(): Unit = { + publisher = system.actorOf(Props[ClusterDomainEventPublisher]) + publisher ! Subscribe(testActor, classOf[ClusterDomainEvent]) + expectMsgType[CurrentClusterState] + } - "send snapshot when starting subscription" in { - publisher ! PublishChanges(g0, g1) - publisher ! Subscribe(testActor, classOf[ClusterDomainEvent]) - val state = expectMsgType[CurrentClusterState] - state.members must be(g1.members) - state.convergence must be(true) - } + override def afterEach(): Unit = { + publisher ! Unsubscribe(testActor) + system.stop(publisher) + } + + "ClusterDomainEventPublisher" must { "publish MemberUp when member status changed to Up" in { publisher ! PublishChanges(g1, g2) @@ -67,30 +73,49 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish expectMsgType[SeenChanged] } - "publish leader changed when new leader and after convergence" in { + "publish leader changed when new leader after convergence" in { publisher ! PublishChanges(g3, g4) expectMsg(MemberUp(d1)) expectMsg(ConvergenceChanged(false)) expectMsgType[SeenChanged] + expectNoMsg(1 second) publisher ! PublishChanges(g4, g5) expectMsg(LeaderChanged(Some(d1.address))) expectMsg(ConvergenceChanged(true)) expectMsgType[SeenChanged] + } + "publish leader changed when new leader and convergence both before and after" in { // convergence both before and after publisher ! PublishChanges(g3, g5) expectMsg(MemberUp(d1)) expectMsg(LeaderChanged(Some(d1.address))) expectMsgType[SeenChanged] - expectNoMsg(1 second) + } - // not convergence + "not publish leader changed when not convergence" in { publisher ! PublishChanges(g2, g4) expectMsg(MemberUp(d1)) expectNoMsg(1 second) } + "not publish leader changed when changed convergence but still same leader" in { + publisher ! PublishChanges(g2, g5) + expectMsg(MemberUp(d1)) + expectMsg(LeaderChanged(Some(d1.address))) + expectMsg(ConvergenceChanged(true)) + expectMsgType[SeenChanged] + + publisher ! PublishChanges(g5, g4) + expectMsg(ConvergenceChanged(false)) + expectMsgType[SeenChanged] + + publisher ! PublishChanges(g4, g5) + expectMsg(ConvergenceChanged(true)) + expectMsgType[SeenChanged] + } + } }