diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index 509536b3d8..8b0e7a4ffc 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -339,7 +339,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto var latestGossip: Gossip = Gossip.empty var latestConvergedGossip: Gossip = Gossip.empty - var memberEvents: immutable.Seq[MemberEvent] = immutable.Seq.empty + var bufferedEvents: immutable.IndexedSeq[ClusterDomainEvent] = Vector.empty def receive = { case PublishChanges(newGossip) ⇒ publishChanges(newGossip) @@ -401,14 +401,16 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto val newMemberEvents = diffMemberEvents(oldGossip, newGossip) convertToInstantMemberEvents(newMemberEvents) foreach publish // buffer up the MemberEvents waiting for convergence - memberEvents ++= newMemberEvents - // if we have convergence then publish the MemberEvents and possibly a LeaderChanged + bufferedEvents ++= newMemberEvents + // buffer up the LeaderChanged waiting for convergence + bufferedEvents ++= diffLeader(oldGossip, newGossip) + // if we have convergence then publish the MemberEvents and LeaderChanged if (newGossip.convergence) { val previousConvergedGossip = latestConvergedGossip latestConvergedGossip = newGossip - memberEvents foreach { event ⇒ + bufferedEvents foreach { event ⇒ event match { - case m @ (MemberDowned(_) | MemberRemoved(_)) ⇒ + case m: MemberEvent if m.isInstanceOf[MemberDowned] || m.isInstanceOf[MemberRemoved] ⇒ // TODO MemberDowned match should probably be covered by MemberRemoved, see ticket #2788 // but right now we don't change Downed to Removed publish(event) @@ -417,8 +419,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto case _ ⇒ publish(event) } } - memberEvents = immutable.Seq.empty - diffLeader(previousConvergedGossip, latestConvergedGossip) foreach publish + bufferedEvents = Vector.empty } // publish internal SeenState for testing purposes diffSeen(oldGossip, newGossip) foreach publish diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala index fe6fbdb8f0..a7c1608b00 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala @@ -23,18 +23,25 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSender { var publisher: ActorRef = _ - val a1 = Member(Address("akka.tcp", "sys", "a", 2552), Up) - val b1 = Member(Address("akka.tcp", "sys", "b", 2552), Up) - val c1 = Member(Address("akka.tcp", "sys", "c", 2552), Joining) - val c2 = Member(Address("akka.tcp", "sys", "c", 2552), Up) - val d1 = Member(Address("akka.tcp", "sys", "a", 2551), Up) + val aUp = Member(Address("akka.tcp", "sys", "a", 2552), Up) + val aLeaving = aUp.copy(status = Leaving) + val aExiting = aUp.copy(status = Exiting) + val aRemoved = aUp.copy(status = Removed) + val bUp = Member(Address("akka.tcp", "sys", "b", 2552), Up) + val bRemoved = bUp.copy(status = Removed) + val cJoining = Member(Address("akka.tcp", "sys", "c", 2552), Joining) + val cUp = cJoining.copy(status = Up) + val cRemoved = cUp.copy(status = Removed) + val dUp = Member(Address("akka.tcp", "sys", "a", 2551), Up) - val g0 = Gossip(members = SortedSet(a1)).seen(a1.address) - val g1 = Gossip(members = SortedSet(a1, b1, c1)).seen(a1.address).seen(b1.address).seen(c1.address) - val g2 = Gossip(members = SortedSet(a1, b1, c2)).seen(a1.address) - val g3 = g2.seen(b1.address).seen(c2.address) - val g4 = Gossip(members = SortedSet(d1, a1, b1, c2)).seen(a1.address) - val g5 = Gossip(members = SortedSet(d1, a1, b1, c2)).seen(a1.address).seen(b1.address).seen(c2.address).seen(d1.address) + val g0 = Gossip(members = SortedSet(aUp)).seen(aUp.address) + val g1 = Gossip(members = SortedSet(aUp, bUp, cJoining)).seen(aUp.address).seen(bUp.address).seen(cJoining.address) + val g2 = Gossip(members = SortedSet(aUp, bUp, cUp)).seen(aUp.address) + val g3 = g2.seen(bUp.address).seen(cUp.address) + val g4 = Gossip(members = SortedSet(dUp, aUp, bUp, cUp)).seen(aUp.address) + val g5 = Gossip(members = SortedSet(dUp, aUp, bUp, cUp)).seen(aUp.address).seen(bUp.address).seen(cUp.address).seen(dUp.address) + val g6 = Gossip(members = SortedSet(aLeaving, bUp, cUp)).seen(aUp.address) + val g7 = Gossip(members = SortedSet(aExiting, bUp, cUp)).seen(aUp.address) // created in beforeEach var memberSubscriber: TestProbe = _ @@ -46,8 +53,8 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec publisher = system.actorOf(Props[ClusterDomainEventPublisher]) publisher ! PublishChanges(g0) - memberSubscriber.expectMsg(MemberUp(a1)) - memberSubscriber.expectMsg(LeaderChanged(Some(a1.address))) + memberSubscriber.expectMsg(MemberUp(aUp)) + memberSubscriber.expectMsg(LeaderChanged(Some(aUp.address))) } override def afterEach(): Unit = { @@ -63,8 +70,8 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec "publish MemberEvents when there is convergence" in { publisher ! PublishChanges(g2) publisher ! PublishChanges(g3) - memberSubscriber.expectMsg(MemberUp(b1)) - memberSubscriber.expectMsg(MemberUp(c2)) + memberSubscriber.expectMsg(MemberUp(bUp)) + memberSubscriber.expectMsg(MemberUp(cUp)) } "publish leader changed when new leader after convergence" in { @@ -72,20 +79,39 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec memberSubscriber.expectNoMsg(1 second) publisher ! PublishChanges(g5) - memberSubscriber.expectMsg(MemberUp(d1)) - memberSubscriber.expectMsg(MemberUp(b1)) - memberSubscriber.expectMsg(MemberUp(c2)) - memberSubscriber.expectMsg(LeaderChanged(Some(d1.address))) + memberSubscriber.expectMsg(MemberUp(dUp)) + memberSubscriber.expectMsg(MemberUp(bUp)) + memberSubscriber.expectMsg(MemberUp(cUp)) + memberSubscriber.expectMsg(LeaderChanged(Some(dUp.address))) } "publish leader changed when new leader and convergence both before and after" in { // convergence both before and after publisher ! PublishChanges(g3) - memberSubscriber.expectMsg(MemberUp(b1)) - memberSubscriber.expectMsg(MemberUp(c2)) + memberSubscriber.expectMsg(MemberUp(bUp)) + memberSubscriber.expectMsg(MemberUp(cUp)) publisher ! PublishChanges(g5) - memberSubscriber.expectMsg(MemberUp(d1)) - memberSubscriber.expectMsg(LeaderChanged(Some(d1.address))) + memberSubscriber.expectMsg(MemberUp(dUp)) + memberSubscriber.expectMsg(LeaderChanged(Some(dUp.address))) + } + + "publish leader changed when old leader leaves and is removed" in { + publisher ! PublishChanges(g3) + memberSubscriber.expectMsg(MemberUp(bUp)) + memberSubscriber.expectMsg(MemberUp(cUp)) + publisher ! PublishChanges(g6) + memberSubscriber.expectNoMsg(1 second) + publisher ! PublishChanges(g7) + memberSubscriber.expectNoMsg(1 second) + // at the removed member a an empty gossip is the last thing + publisher ! PublishChanges(Gossip.empty) + memberSubscriber.expectMsg(MemberLeft(aLeaving)) + memberSubscriber.expectMsg(MemberExited(aExiting)) + memberSubscriber.expectMsg(LeaderChanged(Some(bUp.address))) + memberSubscriber.expectMsg(MemberRemoved(aRemoved)) + memberSubscriber.expectMsg(MemberRemoved(bRemoved)) + memberSubscriber.expectMsg(MemberRemoved(cRemoved)) + memberSubscriber.expectMsg(LeaderChanged(None)) } "not publish leader changed when not convergence" in { @@ -95,10 +121,10 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec "not publish leader changed when changed convergence but still same leader" in { publisher ! PublishChanges(g5) - memberSubscriber.expectMsg(MemberUp(d1)) - memberSubscriber.expectMsg(MemberUp(b1)) - memberSubscriber.expectMsg(MemberUp(c2)) - memberSubscriber.expectMsg(LeaderChanged(Some(d1.address))) + memberSubscriber.expectMsg(MemberUp(dUp)) + memberSubscriber.expectMsg(MemberUp(bUp)) + memberSubscriber.expectMsg(MemberUp(cUp)) + memberSubscriber.expectMsg(LeaderChanged(Some(dUp.address))) publisher ! PublishChanges(g4) memberSubscriber.expectNoMsg(1 second) @@ -124,8 +150,8 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec publisher ! PublishChanges(g3) subscriber.expectNoMsg(1 second) // but memberSubscriber is still subscriber - memberSubscriber.expectMsg(MemberUp(b1)) - memberSubscriber.expectMsg(MemberUp(c2)) + memberSubscriber.expectMsg(MemberUp(bUp)) + memberSubscriber.expectMsg(MemberUp(cUp)) } "publish clean state when PublishStart" in { @@ -134,10 +160,10 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec subscriber.expectMsgType[InstantClusterState] subscriber.expectMsgType[CurrentClusterState] publisher ! PublishChanges(g3) - subscriber.expectMsg(InstantMemberUp(b1)) - subscriber.expectMsg(InstantMemberUp(c2)) - subscriber.expectMsg(MemberUp(b1)) - subscriber.expectMsg(MemberUp(c2)) + subscriber.expectMsg(InstantMemberUp(bUp)) + subscriber.expectMsg(InstantMemberUp(cUp)) + subscriber.expectMsg(MemberUp(bUp)) + subscriber.expectMsg(MemberUp(cUp)) subscriber.expectMsgType[SeenChanged] publisher ! PublishStart @@ -149,8 +175,8 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec publisher ! Subscribe(subscriber.ref, classOf[InstantMemberEvent]) subscriber.expectMsgType[InstantClusterState] publisher ! PublishChanges(g2) - subscriber.expectMsg(InstantMemberUp(b1)) - subscriber.expectMsg(InstantMemberUp(c2)) + subscriber.expectMsg(InstantMemberUp(bUp)) + subscriber.expectMsg(InstantMemberUp(cUp)) subscriber.expectNoMsg(1 second) publisher ! PublishChanges(g3) subscriber.expectNoMsg(1 second) diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala index c641524644..71ed4d4d17 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala @@ -65,7 +65,7 @@ object ClusterSingletonManager { case object TakeOverFromMe case class HandOverRetry(count: Int) - case class TakeOverRetry(leaderPeer: ActorRef, count: Int) + case class TakeOverRetry(count: Int) case object Cleanup case object StartLeaderChangedBuffer @@ -83,7 +83,7 @@ object ClusterSingletonManager { case class LeaderData(singleton: ActorRef, singletonTerminated: Boolean = false, handOverData: Option[Any] = None) extends Data case class WasLeaderData(singleton: ActorRef, singletonTerminated: Boolean, handOverData: Option[Any], - newLeader: Address) extends Data + newLeaderOption: Option[Address]) extends Data case class HandingOverData(singleton: ActorRef, handOverTo: Option[ActorRef], handOverData: Option[Any]) extends Data val HandOverRetryTimer = "hand-over-retry" @@ -475,13 +475,13 @@ class ClusterSingletonManager( gotoHandingOver(singleton, singletonTerminated, handOverData, None) case Some(a) ⇒ // send TakeOver request in case the new leader doesn't know previous leader - val leaderPeer = peer(a) - leaderPeer ! TakeOverFromMe - setTimer(TakeOverRetryTimer, TakeOverRetry(leaderPeer, 1), retryInterval, repeat = false) - goto(WasLeader) using WasLeaderData(singleton, singletonTerminated, handOverData, newLeader = a) - case _ ⇒ + peer(a) ! TakeOverFromMe + setTimer(TakeOverRetryTimer, TakeOverRetry(1), retryInterval, repeat = false) + goto(WasLeader) using WasLeaderData(singleton, singletonTerminated, handOverData, newLeaderOption = Some(a)) + case None ⇒ // new leader will initiate the hand-over - stay + setTimer(TakeOverRetryTimer, TakeOverRetry(1), retryInterval, repeat = false) + goto(WasLeader) using WasLeaderData(singleton, singletonTerminated, handOverData, newLeaderOption = None) } case Event(HandOverToMe, LeaderData(singleton, singletonTerminated, handOverData)) ⇒ @@ -495,20 +495,19 @@ class ClusterSingletonManager( } when(WasLeader) { - case Event(TakeOverRetry(leaderPeer, count), _) ⇒ - val newLeader = leaderPeer.path.address + case Event(TakeOverRetry(count), WasLeaderData(_, _, _, newLeaderOption)) ⇒ if (count <= maxTakeOverRetries) { - logInfo("Retry [{}], sending TakeOverFromMe to [{}]", count, newLeader) - leaderPeer ! TakeOverFromMe - setTimer(TakeOverRetryTimer, TakeOverRetry(leaderPeer, count + 1), retryInterval, repeat = false) + logInfo("Retry [{}], sending TakeOverFromMe to [{}]", count, newLeaderOption) + newLeaderOption foreach { peer(_) ! TakeOverFromMe } + setTimer(TakeOverRetryTimer, TakeOverRetry(count + 1), retryInterval, repeat = false) stay } else - throw new ClusterSingletonManagerIsStuck(s"Expected hand-over to [${newLeader}] never occured") + throw new ClusterSingletonManagerIsStuck(s"Expected hand-over to [${newLeaderOption}] never occured") case Event(HandOverToMe, WasLeaderData(singleton, singletonTerminated, handOverData, _)) ⇒ gotoHandingOver(singleton, singletonTerminated, handOverData, Some(sender)) - case Event(MemberDowned(m), WasLeaderData(singleton, singletonTerminated, handOverData, newLeader)) if m.address == newLeader ⇒ + case Event(MemberDowned(m), WasLeaderData(singleton, singletonTerminated, handOverData, Some(newLeader))) if m.address == newLeader ⇒ addDowned(m.address) gotoHandingOver(singleton, singletonTerminated, handOverData, None) diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala index 672ca7037d..7ecc34327f 100644 --- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala @@ -155,12 +155,12 @@ object ClusterSingletonManagerSpec extends MultiNodeConfig { def receive = { case state: CurrentClusterState ⇒ leaderAddress = state.leader - case LeaderChanged(leader) ⇒ leaderAddress = leader - case other => consumer foreach { _ forward other } + case LeaderChanged(leader) ⇒ leaderAddress = leader + case other ⇒ consumer foreach { _ forward other } } def consumer: Option[ActorRef] = - leaderAddress map (a => context.actorFor(RootActorPath(a) / + leaderAddress map (a ⇒ context.actorFor(RootActorPath(a) / "user" / "singleton" / "consumer")) } //#singleton-proxy @@ -283,8 +283,17 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS verify(newLeaderRole, msg = 3, expectedCurrent = 2) } - "hand over when adding three new potential leaders to 3 nodes cluster" in within(30 seconds) { + "hand over when adding three new potential leaders to 3 nodes cluster" in within(60 seconds) { + // this test will result in restart after retry timeout + // because the new leader will not know about the real previous leader and the + // previous leader sortedClusterRoles(3) will first think that sortedClusterRoles(2) + // is the new leader + runOn(controller) { + queue ! Reset + expectMsg(ResetOk) + } runOn(sortedClusterRoles(2)) { + // previous leader Cluster(system) join node(sortedClusterRoles(3)).address createSingleton() } @@ -297,10 +306,10 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS createSingleton() } - verify(sortedClusterRoles(0), msg = 4, expectedCurrent = 3) + verify(sortedClusterRoles(0), msg = 4, expectedCurrent = 0) } - "hand over when leader leaves in 6 nodes cluster " in within(20 seconds) { + "hand over when leader leaves in 6 nodes cluster " in within(30 seconds) { //#test-leave val leaveRole = sortedClusterRoles(0) val newLeaderRole = sortedClusterRoles(1)