diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index 509536b3d8..8b0e7a4ffc 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -339,7 +339,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto var latestGossip: Gossip = Gossip.empty var latestConvergedGossip: Gossip = Gossip.empty - var memberEvents: immutable.Seq[MemberEvent] = immutable.Seq.empty + var bufferedEvents: immutable.IndexedSeq[ClusterDomainEvent] = Vector.empty def receive = { case PublishChanges(newGossip) ⇒ publishChanges(newGossip) @@ -401,14 +401,16 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto val newMemberEvents = diffMemberEvents(oldGossip, newGossip) convertToInstantMemberEvents(newMemberEvents) foreach publish // buffer up the MemberEvents waiting for convergence - memberEvents ++= newMemberEvents - // if we have convergence then publish the MemberEvents and possibly a LeaderChanged + bufferedEvents ++= newMemberEvents + // buffer up the LeaderChanged waiting for convergence + bufferedEvents ++= diffLeader(oldGossip, newGossip) + // if we have convergence then publish the MemberEvents and LeaderChanged if (newGossip.convergence) { val previousConvergedGossip = latestConvergedGossip latestConvergedGossip = newGossip - memberEvents foreach { event ⇒ + bufferedEvents foreach { event ⇒ event match { - case m @ (MemberDowned(_) | MemberRemoved(_)) ⇒ + case m: MemberEvent if m.isInstanceOf[MemberDowned] || m.isInstanceOf[MemberRemoved] ⇒ // TODO MemberDowned match should probably be covered by MemberRemoved, see ticket #2788 // but right now we don't change Downed to Removed publish(event) @@ -417,8 +419,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto case _ ⇒ publish(event) } } - memberEvents = immutable.Seq.empty - diffLeader(previousConvergedGossip, latestConvergedGossip) foreach publish + bufferedEvents = Vector.empty } // publish internal SeenState for testing purposes diffSeen(oldGossip, newGossip) foreach publish diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala index fe6fbdb8f0..a7c1608b00 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala @@ -23,18 +23,25 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSender { var publisher: ActorRef = _ - val a1 = Member(Address("akka.tcp", "sys", "a", 2552), Up) - val b1 = Member(Address("akka.tcp", "sys", "b", 2552), Up) - val c1 = Member(Address("akka.tcp", "sys", "c", 2552), Joining) - val c2 = Member(Address("akka.tcp", "sys", "c", 2552), Up) - val d1 = Member(Address("akka.tcp", "sys", "a", 2551), Up) + val aUp = Member(Address("akka.tcp", "sys", "a", 2552), Up) + val aLeaving = aUp.copy(status = Leaving) + val aExiting = aUp.copy(status = Exiting) + val aRemoved = aUp.copy(status = Removed) + val bUp = Member(Address("akka.tcp", "sys", "b", 2552), Up) + val bRemoved = bUp.copy(status = Removed) + val cJoining = Member(Address("akka.tcp", "sys", "c", 2552), Joining) + val cUp = cJoining.copy(status = Up) + val cRemoved = cUp.copy(status = Removed) + val dUp = Member(Address("akka.tcp", "sys", "a", 2551), Up) - val g0 = Gossip(members = SortedSet(a1)).seen(a1.address) - val g1 = Gossip(members = SortedSet(a1, b1, c1)).seen(a1.address).seen(b1.address).seen(c1.address) - val g2 = Gossip(members = SortedSet(a1, b1, c2)).seen(a1.address) - val g3 = g2.seen(b1.address).seen(c2.address) - val g4 = Gossip(members = SortedSet(d1, a1, b1, c2)).seen(a1.address) - val g5 = Gossip(members = SortedSet(d1, a1, b1, c2)).seen(a1.address).seen(b1.address).seen(c2.address).seen(d1.address) + val g0 = Gossip(members = SortedSet(aUp)).seen(aUp.address) + val g1 = Gossip(members = SortedSet(aUp, bUp, cJoining)).seen(aUp.address).seen(bUp.address).seen(cJoining.address) + val g2 = Gossip(members = SortedSet(aUp, bUp, cUp)).seen(aUp.address) + val g3 = g2.seen(bUp.address).seen(cUp.address) + val g4 = Gossip(members = SortedSet(dUp, aUp, bUp, cUp)).seen(aUp.address) + val g5 = Gossip(members = SortedSet(dUp, aUp, bUp, cUp)).seen(aUp.address).seen(bUp.address).seen(cUp.address).seen(dUp.address) + val g6 = Gossip(members = SortedSet(aLeaving, bUp, cUp)).seen(aUp.address) + val g7 = Gossip(members = SortedSet(aExiting, bUp, cUp)).seen(aUp.address) // created in beforeEach var memberSubscriber: TestProbe = _ @@ -46,8 +53,8 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec publisher = system.actorOf(Props[ClusterDomainEventPublisher]) publisher ! PublishChanges(g0) - memberSubscriber.expectMsg(MemberUp(a1)) - memberSubscriber.expectMsg(LeaderChanged(Some(a1.address))) + memberSubscriber.expectMsg(MemberUp(aUp)) + memberSubscriber.expectMsg(LeaderChanged(Some(aUp.address))) } override def afterEach(): Unit = { @@ -63,8 +70,8 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec "publish MemberEvents when there is convergence" in { publisher ! PublishChanges(g2) publisher ! PublishChanges(g3) - memberSubscriber.expectMsg(MemberUp(b1)) - memberSubscriber.expectMsg(MemberUp(c2)) + memberSubscriber.expectMsg(MemberUp(bUp)) + memberSubscriber.expectMsg(MemberUp(cUp)) } "publish leader changed when new leader after convergence" in { @@ -72,20 +79,39 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec memberSubscriber.expectNoMsg(1 second) publisher ! PublishChanges(g5) - memberSubscriber.expectMsg(MemberUp(d1)) - memberSubscriber.expectMsg(MemberUp(b1)) - memberSubscriber.expectMsg(MemberUp(c2)) - memberSubscriber.expectMsg(LeaderChanged(Some(d1.address))) + memberSubscriber.expectMsg(MemberUp(dUp)) + memberSubscriber.expectMsg(MemberUp(bUp)) + memberSubscriber.expectMsg(MemberUp(cUp)) + memberSubscriber.expectMsg(LeaderChanged(Some(dUp.address))) } "publish leader changed when new leader and convergence both before and after" in { // convergence both before and after publisher ! PublishChanges(g3) - memberSubscriber.expectMsg(MemberUp(b1)) - memberSubscriber.expectMsg(MemberUp(c2)) + memberSubscriber.expectMsg(MemberUp(bUp)) + memberSubscriber.expectMsg(MemberUp(cUp)) publisher ! PublishChanges(g5) - memberSubscriber.expectMsg(MemberUp(d1)) - memberSubscriber.expectMsg(LeaderChanged(Some(d1.address))) + memberSubscriber.expectMsg(MemberUp(dUp)) + memberSubscriber.expectMsg(LeaderChanged(Some(dUp.address))) + } + + "publish leader changed when old leader leaves and is removed" in { + publisher ! PublishChanges(g3) + memberSubscriber.expectMsg(MemberUp(bUp)) + memberSubscriber.expectMsg(MemberUp(cUp)) + publisher ! PublishChanges(g6) + memberSubscriber.expectNoMsg(1 second) + publisher ! PublishChanges(g7) + memberSubscriber.expectNoMsg(1 second) + // at the removed member a an empty gossip is the last thing + publisher ! PublishChanges(Gossip.empty) + memberSubscriber.expectMsg(MemberLeft(aLeaving)) + memberSubscriber.expectMsg(MemberExited(aExiting)) + memberSubscriber.expectMsg(LeaderChanged(Some(bUp.address))) + memberSubscriber.expectMsg(MemberRemoved(aRemoved)) + memberSubscriber.expectMsg(MemberRemoved(bRemoved)) + memberSubscriber.expectMsg(MemberRemoved(cRemoved)) + memberSubscriber.expectMsg(LeaderChanged(None)) } "not publish leader changed when not convergence" in { @@ -95,10 +121,10 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec "not publish leader changed when changed convergence but still same leader" in { publisher ! PublishChanges(g5) - memberSubscriber.expectMsg(MemberUp(d1)) - memberSubscriber.expectMsg(MemberUp(b1)) - memberSubscriber.expectMsg(MemberUp(c2)) - memberSubscriber.expectMsg(LeaderChanged(Some(d1.address))) + memberSubscriber.expectMsg(MemberUp(dUp)) + memberSubscriber.expectMsg(MemberUp(bUp)) + memberSubscriber.expectMsg(MemberUp(cUp)) + memberSubscriber.expectMsg(LeaderChanged(Some(dUp.address))) publisher ! PublishChanges(g4) memberSubscriber.expectNoMsg(1 second) @@ -124,8 +150,8 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec publisher ! PublishChanges(g3) subscriber.expectNoMsg(1 second) // but memberSubscriber is still subscriber - memberSubscriber.expectMsg(MemberUp(b1)) - memberSubscriber.expectMsg(MemberUp(c2)) + memberSubscriber.expectMsg(MemberUp(bUp)) + memberSubscriber.expectMsg(MemberUp(cUp)) } "publish clean state when PublishStart" in { @@ -134,10 +160,10 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec subscriber.expectMsgType[InstantClusterState] subscriber.expectMsgType[CurrentClusterState] publisher ! PublishChanges(g3) - subscriber.expectMsg(InstantMemberUp(b1)) - subscriber.expectMsg(InstantMemberUp(c2)) - subscriber.expectMsg(MemberUp(b1)) - subscriber.expectMsg(MemberUp(c2)) + subscriber.expectMsg(InstantMemberUp(bUp)) + subscriber.expectMsg(InstantMemberUp(cUp)) + subscriber.expectMsg(MemberUp(bUp)) + subscriber.expectMsg(MemberUp(cUp)) subscriber.expectMsgType[SeenChanged] publisher ! PublishStart @@ -149,8 +175,8 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec publisher ! Subscribe(subscriber.ref, classOf[InstantMemberEvent]) subscriber.expectMsgType[InstantClusterState] publisher ! PublishChanges(g2) - subscriber.expectMsg(InstantMemberUp(b1)) - subscriber.expectMsg(InstantMemberUp(c2)) + subscriber.expectMsg(InstantMemberUp(bUp)) + subscriber.expectMsg(InstantMemberUp(cUp)) subscriber.expectNoMsg(1 second) publisher ! PublishChanges(g3) subscriber.expectNoMsg(1 second) diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala index 0ceea4cddf..7ecc34327f 100644 --- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala @@ -283,8 +283,17 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS verify(newLeaderRole, msg = 3, expectedCurrent = 2) } - "hand over when adding three new potential leaders to 3 nodes cluster" in within(30 seconds) { + "hand over when adding three new potential leaders to 3 nodes cluster" in within(60 seconds) { + // this test will result in restart after retry timeout + // because the new leader will not know about the real previous leader and the + // previous leader sortedClusterRoles(3) will first think that sortedClusterRoles(2) + // is the new leader + runOn(controller) { + queue ! Reset + expectMsg(ResetOk) + } runOn(sortedClusterRoles(2)) { + // previous leader Cluster(system) join node(sortedClusterRoles(3)).address createSingleton() } @@ -297,7 +306,7 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS createSingleton() } - verify(sortedClusterRoles(0), msg = 4, expectedCurrent = 3) + verify(sortedClusterRoles(0), msg = 4, expectedCurrent = 0) } "hand over when leader leaves in 6 nodes cluster " in within(30 seconds) {