Buffer LeaderChanged events and publish all on convergence, see #3017

* Otherwise some changes might never be published, since convergence does
  not have to occur on all nodes in between all transitions.
* Detected by a failure in ClusterSingletonManagerSpec.
* Added a test to simulate the failure scenario.
This commit is contained in:
Patrik Nordwall 2013-02-08 09:17:55 +01:00
parent 4ee299c729
commit d32a2edc51
3 changed files with 80 additions and 44 deletions

View file

@@ -339,7 +339,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
var latestGossip: Gossip = Gossip.empty
var latestConvergedGossip: Gossip = Gossip.empty
var memberEvents: immutable.Seq[MemberEvent] = immutable.Seq.empty
var bufferedEvents: immutable.IndexedSeq[ClusterDomainEvent] = Vector.empty
def receive = {
case PublishChanges(newGossip) publishChanges(newGossip)
@@ -401,14 +401,16 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
val newMemberEvents = diffMemberEvents(oldGossip, newGossip)
convertToInstantMemberEvents(newMemberEvents) foreach publish
// buffer up the MemberEvents waiting for convergence
memberEvents ++= newMemberEvents
// if we have convergence then publish the MemberEvents and possibly a LeaderChanged
bufferedEvents ++= newMemberEvents
// buffer up the LeaderChanged waiting for convergence
bufferedEvents ++= diffLeader(oldGossip, newGossip)
// if we have convergence then publish the MemberEvents and LeaderChanged
if (newGossip.convergence) {
val previousConvergedGossip = latestConvergedGossip
latestConvergedGossip = newGossip
memberEvents foreach { event
bufferedEvents foreach { event
event match {
case m @ (MemberDowned(_) | MemberRemoved(_))
case m: MemberEvent if m.isInstanceOf[MemberDowned] || m.isInstanceOf[MemberRemoved]
// TODO MemberDowned match should probably be covered by MemberRemoved, see ticket #2788
// but right now we don't change Downed to Removed
publish(event)
@@ -417,8 +419,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
case _ publish(event)
}
}
memberEvents = immutable.Seq.empty
diffLeader(previousConvergedGossip, latestConvergedGossip) foreach publish
bufferedEvents = Vector.empty
}
// publish internal SeenState for testing purposes
diffSeen(oldGossip, newGossip) foreach publish

View file

@@ -23,18 +23,25 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec
with BeforeAndAfterEach with ImplicitSender {
var publisher: ActorRef = _
val a1 = Member(Address("akka.tcp", "sys", "a", 2552), Up)
val b1 = Member(Address("akka.tcp", "sys", "b", 2552), Up)
val c1 = Member(Address("akka.tcp", "sys", "c", 2552), Joining)
val c2 = Member(Address("akka.tcp", "sys", "c", 2552), Up)
val d1 = Member(Address("akka.tcp", "sys", "a", 2551), Up)
val aUp = Member(Address("akka.tcp", "sys", "a", 2552), Up)
val aLeaving = aUp.copy(status = Leaving)
val aExiting = aUp.copy(status = Exiting)
val aRemoved = aUp.copy(status = Removed)
val bUp = Member(Address("akka.tcp", "sys", "b", 2552), Up)
val bRemoved = bUp.copy(status = Removed)
val cJoining = Member(Address("akka.tcp", "sys", "c", 2552), Joining)
val cUp = cJoining.copy(status = Up)
val cRemoved = cUp.copy(status = Removed)
val dUp = Member(Address("akka.tcp", "sys", "a", 2551), Up)
val g0 = Gossip(members = SortedSet(a1)).seen(a1.address)
val g1 = Gossip(members = SortedSet(a1, b1, c1)).seen(a1.address).seen(b1.address).seen(c1.address)
val g2 = Gossip(members = SortedSet(a1, b1, c2)).seen(a1.address)
val g3 = g2.seen(b1.address).seen(c2.address)
val g4 = Gossip(members = SortedSet(d1, a1, b1, c2)).seen(a1.address)
val g5 = Gossip(members = SortedSet(d1, a1, b1, c2)).seen(a1.address).seen(b1.address).seen(c2.address).seen(d1.address)
val g0 = Gossip(members = SortedSet(aUp)).seen(aUp.address)
val g1 = Gossip(members = SortedSet(aUp, bUp, cJoining)).seen(aUp.address).seen(bUp.address).seen(cJoining.address)
val g2 = Gossip(members = SortedSet(aUp, bUp, cUp)).seen(aUp.address)
val g3 = g2.seen(bUp.address).seen(cUp.address)
val g4 = Gossip(members = SortedSet(dUp, aUp, bUp, cUp)).seen(aUp.address)
val g5 = Gossip(members = SortedSet(dUp, aUp, bUp, cUp)).seen(aUp.address).seen(bUp.address).seen(cUp.address).seen(dUp.address)
val g6 = Gossip(members = SortedSet(aLeaving, bUp, cUp)).seen(aUp.address)
val g7 = Gossip(members = SortedSet(aExiting, bUp, cUp)).seen(aUp.address)
// created in beforeEach
var memberSubscriber: TestProbe = _
@@ -46,8 +53,8 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec
publisher = system.actorOf(Props[ClusterDomainEventPublisher])
publisher ! PublishChanges(g0)
memberSubscriber.expectMsg(MemberUp(a1))
memberSubscriber.expectMsg(LeaderChanged(Some(a1.address)))
memberSubscriber.expectMsg(MemberUp(aUp))
memberSubscriber.expectMsg(LeaderChanged(Some(aUp.address)))
}
override def afterEach(): Unit = {
@@ -63,8 +70,8 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec
"publish MemberEvents when there is convergence" in {
publisher ! PublishChanges(g2)
publisher ! PublishChanges(g3)
memberSubscriber.expectMsg(MemberUp(b1))
memberSubscriber.expectMsg(MemberUp(c2))
memberSubscriber.expectMsg(MemberUp(bUp))
memberSubscriber.expectMsg(MemberUp(cUp))
}
"publish leader changed when new leader after convergence" in {
@@ -72,20 +79,39 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec
memberSubscriber.expectNoMsg(1 second)
publisher ! PublishChanges(g5)
memberSubscriber.expectMsg(MemberUp(d1))
memberSubscriber.expectMsg(MemberUp(b1))
memberSubscriber.expectMsg(MemberUp(c2))
memberSubscriber.expectMsg(LeaderChanged(Some(d1.address)))
memberSubscriber.expectMsg(MemberUp(dUp))
memberSubscriber.expectMsg(MemberUp(bUp))
memberSubscriber.expectMsg(MemberUp(cUp))
memberSubscriber.expectMsg(LeaderChanged(Some(dUp.address)))
}
"publish leader changed when new leader and convergence both before and after" in {
// convergence both before and after
publisher ! PublishChanges(g3)
memberSubscriber.expectMsg(MemberUp(b1))
memberSubscriber.expectMsg(MemberUp(c2))
memberSubscriber.expectMsg(MemberUp(bUp))
memberSubscriber.expectMsg(MemberUp(cUp))
publisher ! PublishChanges(g5)
memberSubscriber.expectMsg(MemberUp(d1))
memberSubscriber.expectMsg(LeaderChanged(Some(d1.address)))
memberSubscriber.expectMsg(MemberUp(dUp))
memberSubscriber.expectMsg(LeaderChanged(Some(dUp.address)))
}
"publish leader changed when old leader leaves and is removed" in {
publisher ! PublishChanges(g3)
memberSubscriber.expectMsg(MemberUp(bUp))
memberSubscriber.expectMsg(MemberUp(cUp))
publisher ! PublishChanges(g6)
memberSubscriber.expectNoMsg(1 second)
publisher ! PublishChanges(g7)
memberSubscriber.expectNoMsg(1 second)
// at the removed member a an empty gossip is the last thing
publisher ! PublishChanges(Gossip.empty)
memberSubscriber.expectMsg(MemberLeft(aLeaving))
memberSubscriber.expectMsg(MemberExited(aExiting))
memberSubscriber.expectMsg(LeaderChanged(Some(bUp.address)))
memberSubscriber.expectMsg(MemberRemoved(aRemoved))
memberSubscriber.expectMsg(MemberRemoved(bRemoved))
memberSubscriber.expectMsg(MemberRemoved(cRemoved))
memberSubscriber.expectMsg(LeaderChanged(None))
}
"not publish leader changed when not convergence" in {
@@ -95,10 +121,10 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec
"not publish leader changed when changed convergence but still same leader" in {
publisher ! PublishChanges(g5)
memberSubscriber.expectMsg(MemberUp(d1))
memberSubscriber.expectMsg(MemberUp(b1))
memberSubscriber.expectMsg(MemberUp(c2))
memberSubscriber.expectMsg(LeaderChanged(Some(d1.address)))
memberSubscriber.expectMsg(MemberUp(dUp))
memberSubscriber.expectMsg(MemberUp(bUp))
memberSubscriber.expectMsg(MemberUp(cUp))
memberSubscriber.expectMsg(LeaderChanged(Some(dUp.address)))
publisher ! PublishChanges(g4)
memberSubscriber.expectNoMsg(1 second)
@@ -124,8 +150,8 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec
publisher ! PublishChanges(g3)
subscriber.expectNoMsg(1 second)
// but memberSubscriber is still subscriber
memberSubscriber.expectMsg(MemberUp(b1))
memberSubscriber.expectMsg(MemberUp(c2))
memberSubscriber.expectMsg(MemberUp(bUp))
memberSubscriber.expectMsg(MemberUp(cUp))
}
"publish clean state when PublishStart" in {
@@ -134,10 +160,10 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec
subscriber.expectMsgType[InstantClusterState]
subscriber.expectMsgType[CurrentClusterState]
publisher ! PublishChanges(g3)
subscriber.expectMsg(InstantMemberUp(b1))
subscriber.expectMsg(InstantMemberUp(c2))
subscriber.expectMsg(MemberUp(b1))
subscriber.expectMsg(MemberUp(c2))
subscriber.expectMsg(InstantMemberUp(bUp))
subscriber.expectMsg(InstantMemberUp(cUp))
subscriber.expectMsg(MemberUp(bUp))
subscriber.expectMsg(MemberUp(cUp))
subscriber.expectMsgType[SeenChanged]
publisher ! PublishStart
@@ -149,8 +175,8 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec
publisher ! Subscribe(subscriber.ref, classOf[InstantMemberEvent])
subscriber.expectMsgType[InstantClusterState]
publisher ! PublishChanges(g2)
subscriber.expectMsg(InstantMemberUp(b1))
subscriber.expectMsg(InstantMemberUp(c2))
subscriber.expectMsg(InstantMemberUp(bUp))
subscriber.expectMsg(InstantMemberUp(cUp))
subscriber.expectNoMsg(1 second)
publisher ! PublishChanges(g3)
subscriber.expectNoMsg(1 second)

View file

@@ -283,8 +283,17 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS
verify(newLeaderRole, msg = 3, expectedCurrent = 2)
}
"hand over when adding three new potential leaders to 3 nodes cluster" in within(30 seconds) {
"hand over when adding three new potential leaders to 3 nodes cluster" in within(60 seconds) {
// this test will result in restart after retry timeout
// because the new leader will not know about the real previous leader and the
// previous leader sortedClusterRoles(3) will first think that sortedClusterRoles(2)
// is the new leader
runOn(controller) {
queue ! Reset
expectMsg(ResetOk)
}
runOn(sortedClusterRoles(2)) {
// previous leader
Cluster(system) join node(sortedClusterRoles(3)).address
createSingleton()
}
@@ -297,7 +306,7 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS
createSingleton()
}
verify(sortedClusterRoles(0), msg = 4, expectedCurrent = 3)
verify(sortedClusterRoles(0), msg = 4, expectedCurrent = 0)
}
"hand over when leader leaves in 6 nodes cluster " in within(30 seconds) {