Merge pull request #1112 from akka/wip-3017-singleton-bug-patriknw

Hardening of another corner case in cluster singleton, see #3017
This commit is contained in:
Patrik Nordwall 2013-02-10 08:07:32 -08:00
commit af7ca554c9
4 changed files with 98 additions and 63 deletions

View file

@ -339,7 +339,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
var latestGossip: Gossip = Gossip.empty
var latestConvergedGossip: Gossip = Gossip.empty
var memberEvents: immutable.Seq[MemberEvent] = immutable.Seq.empty
var bufferedEvents: immutable.IndexedSeq[ClusterDomainEvent] = Vector.empty
def receive = {
case PublishChanges(newGossip) publishChanges(newGossip)
@ -401,14 +401,16 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
val newMemberEvents = diffMemberEvents(oldGossip, newGossip)
convertToInstantMemberEvents(newMemberEvents) foreach publish
// buffer up the MemberEvents waiting for convergence
memberEvents ++= newMemberEvents
// if we have convergence then publish the MemberEvents and possibly a LeaderChanged
bufferedEvents ++= newMemberEvents
// buffer up the LeaderChanged waiting for convergence
bufferedEvents ++= diffLeader(oldGossip, newGossip)
// if we have convergence then publish the MemberEvents and LeaderChanged
if (newGossip.convergence) {
val previousConvergedGossip = latestConvergedGossip
latestConvergedGossip = newGossip
memberEvents foreach { event
bufferedEvents foreach { event
event match {
case m @ (MemberDowned(_) | MemberRemoved(_))
case m: MemberEvent if m.isInstanceOf[MemberDowned] || m.isInstanceOf[MemberRemoved]
// TODO MemberDowned match should probably be covered by MemberRemoved, see ticket #2788
// but right now we don't change Downed to Removed
publish(event)
@ -417,8 +419,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
case _ publish(event)
}
}
memberEvents = immutable.Seq.empty
diffLeader(previousConvergedGossip, latestConvergedGossip) foreach publish
bufferedEvents = Vector.empty
}
// publish internal SeenState for testing purposes
diffSeen(oldGossip, newGossip) foreach publish

View file

@ -23,18 +23,25 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec
with BeforeAndAfterEach with ImplicitSender {
var publisher: ActorRef = _
val a1 = Member(Address("akka.tcp", "sys", "a", 2552), Up)
val b1 = Member(Address("akka.tcp", "sys", "b", 2552), Up)
val c1 = Member(Address("akka.tcp", "sys", "c", 2552), Joining)
val c2 = Member(Address("akka.tcp", "sys", "c", 2552), Up)
val d1 = Member(Address("akka.tcp", "sys", "a", 2551), Up)
val aUp = Member(Address("akka.tcp", "sys", "a", 2552), Up)
val aLeaving = aUp.copy(status = Leaving)
val aExiting = aUp.copy(status = Exiting)
val aRemoved = aUp.copy(status = Removed)
val bUp = Member(Address("akka.tcp", "sys", "b", 2552), Up)
val bRemoved = bUp.copy(status = Removed)
val cJoining = Member(Address("akka.tcp", "sys", "c", 2552), Joining)
val cUp = cJoining.copy(status = Up)
val cRemoved = cUp.copy(status = Removed)
val dUp = Member(Address("akka.tcp", "sys", "a", 2551), Up)
val g0 = Gossip(members = SortedSet(a1)).seen(a1.address)
val g1 = Gossip(members = SortedSet(a1, b1, c1)).seen(a1.address).seen(b1.address).seen(c1.address)
val g2 = Gossip(members = SortedSet(a1, b1, c2)).seen(a1.address)
val g3 = g2.seen(b1.address).seen(c2.address)
val g4 = Gossip(members = SortedSet(d1, a1, b1, c2)).seen(a1.address)
val g5 = Gossip(members = SortedSet(d1, a1, b1, c2)).seen(a1.address).seen(b1.address).seen(c2.address).seen(d1.address)
val g0 = Gossip(members = SortedSet(aUp)).seen(aUp.address)
val g1 = Gossip(members = SortedSet(aUp, bUp, cJoining)).seen(aUp.address).seen(bUp.address).seen(cJoining.address)
val g2 = Gossip(members = SortedSet(aUp, bUp, cUp)).seen(aUp.address)
val g3 = g2.seen(bUp.address).seen(cUp.address)
val g4 = Gossip(members = SortedSet(dUp, aUp, bUp, cUp)).seen(aUp.address)
val g5 = Gossip(members = SortedSet(dUp, aUp, bUp, cUp)).seen(aUp.address).seen(bUp.address).seen(cUp.address).seen(dUp.address)
val g6 = Gossip(members = SortedSet(aLeaving, bUp, cUp)).seen(aUp.address)
val g7 = Gossip(members = SortedSet(aExiting, bUp, cUp)).seen(aUp.address)
// created in beforeEach
var memberSubscriber: TestProbe = _
@ -46,8 +53,8 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec
publisher = system.actorOf(Props[ClusterDomainEventPublisher])
publisher ! PublishChanges(g0)
memberSubscriber.expectMsg(MemberUp(a1))
memberSubscriber.expectMsg(LeaderChanged(Some(a1.address)))
memberSubscriber.expectMsg(MemberUp(aUp))
memberSubscriber.expectMsg(LeaderChanged(Some(aUp.address)))
}
override def afterEach(): Unit = {
@ -63,8 +70,8 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec
"publish MemberEvents when there is convergence" in {
publisher ! PublishChanges(g2)
publisher ! PublishChanges(g3)
memberSubscriber.expectMsg(MemberUp(b1))
memberSubscriber.expectMsg(MemberUp(c2))
memberSubscriber.expectMsg(MemberUp(bUp))
memberSubscriber.expectMsg(MemberUp(cUp))
}
"publish leader changed when new leader after convergence" in {
@ -72,20 +79,39 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec
memberSubscriber.expectNoMsg(1 second)
publisher ! PublishChanges(g5)
memberSubscriber.expectMsg(MemberUp(d1))
memberSubscriber.expectMsg(MemberUp(b1))
memberSubscriber.expectMsg(MemberUp(c2))
memberSubscriber.expectMsg(LeaderChanged(Some(d1.address)))
memberSubscriber.expectMsg(MemberUp(dUp))
memberSubscriber.expectMsg(MemberUp(bUp))
memberSubscriber.expectMsg(MemberUp(cUp))
memberSubscriber.expectMsg(LeaderChanged(Some(dUp.address)))
}
"publish leader changed when new leader and convergence both before and after" in {
// convergence both before and after
publisher ! PublishChanges(g3)
memberSubscriber.expectMsg(MemberUp(b1))
memberSubscriber.expectMsg(MemberUp(c2))
memberSubscriber.expectMsg(MemberUp(bUp))
memberSubscriber.expectMsg(MemberUp(cUp))
publisher ! PublishChanges(g5)
memberSubscriber.expectMsg(MemberUp(d1))
memberSubscriber.expectMsg(LeaderChanged(Some(d1.address)))
memberSubscriber.expectMsg(MemberUp(dUp))
memberSubscriber.expectMsg(LeaderChanged(Some(dUp.address)))
}
"publish leader changed when old leader leaves and is removed" in {
publisher ! PublishChanges(g3)
memberSubscriber.expectMsg(MemberUp(bUp))
memberSubscriber.expectMsg(MemberUp(cUp))
publisher ! PublishChanges(g6)
memberSubscriber.expectNoMsg(1 second)
publisher ! PublishChanges(g7)
memberSubscriber.expectNoMsg(1 second)
// at the removed member 'a', an empty gossip is the last thing it observes
publisher ! PublishChanges(Gossip.empty)
memberSubscriber.expectMsg(MemberLeft(aLeaving))
memberSubscriber.expectMsg(MemberExited(aExiting))
memberSubscriber.expectMsg(LeaderChanged(Some(bUp.address)))
memberSubscriber.expectMsg(MemberRemoved(aRemoved))
memberSubscriber.expectMsg(MemberRemoved(bRemoved))
memberSubscriber.expectMsg(MemberRemoved(cRemoved))
memberSubscriber.expectMsg(LeaderChanged(None))
}
"not publish leader changed when not convergence" in {
@ -95,10 +121,10 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec
"not publish leader changed when changed convergence but still same leader" in {
publisher ! PublishChanges(g5)
memberSubscriber.expectMsg(MemberUp(d1))
memberSubscriber.expectMsg(MemberUp(b1))
memberSubscriber.expectMsg(MemberUp(c2))
memberSubscriber.expectMsg(LeaderChanged(Some(d1.address)))
memberSubscriber.expectMsg(MemberUp(dUp))
memberSubscriber.expectMsg(MemberUp(bUp))
memberSubscriber.expectMsg(MemberUp(cUp))
memberSubscriber.expectMsg(LeaderChanged(Some(dUp.address)))
publisher ! PublishChanges(g4)
memberSubscriber.expectNoMsg(1 second)
@ -124,8 +150,8 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec
publisher ! PublishChanges(g3)
subscriber.expectNoMsg(1 second)
// but memberSubscriber is still subscriber
memberSubscriber.expectMsg(MemberUp(b1))
memberSubscriber.expectMsg(MemberUp(c2))
memberSubscriber.expectMsg(MemberUp(bUp))
memberSubscriber.expectMsg(MemberUp(cUp))
}
"publish clean state when PublishStart" in {
@ -134,10 +160,10 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec
subscriber.expectMsgType[InstantClusterState]
subscriber.expectMsgType[CurrentClusterState]
publisher ! PublishChanges(g3)
subscriber.expectMsg(InstantMemberUp(b1))
subscriber.expectMsg(InstantMemberUp(c2))
subscriber.expectMsg(MemberUp(b1))
subscriber.expectMsg(MemberUp(c2))
subscriber.expectMsg(InstantMemberUp(bUp))
subscriber.expectMsg(InstantMemberUp(cUp))
subscriber.expectMsg(MemberUp(bUp))
subscriber.expectMsg(MemberUp(cUp))
subscriber.expectMsgType[SeenChanged]
publisher ! PublishStart
@ -149,8 +175,8 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec
publisher ! Subscribe(subscriber.ref, classOf[InstantMemberEvent])
subscriber.expectMsgType[InstantClusterState]
publisher ! PublishChanges(g2)
subscriber.expectMsg(InstantMemberUp(b1))
subscriber.expectMsg(InstantMemberUp(c2))
subscriber.expectMsg(InstantMemberUp(bUp))
subscriber.expectMsg(InstantMemberUp(cUp))
subscriber.expectNoMsg(1 second)
publisher ! PublishChanges(g3)
subscriber.expectNoMsg(1 second)

View file

@ -65,7 +65,7 @@ object ClusterSingletonManager {
case object TakeOverFromMe
case class HandOverRetry(count: Int)
case class TakeOverRetry(leaderPeer: ActorRef, count: Int)
case class TakeOverRetry(count: Int)
case object Cleanup
case object StartLeaderChangedBuffer
@ -83,7 +83,7 @@ object ClusterSingletonManager {
case class LeaderData(singleton: ActorRef, singletonTerminated: Boolean = false,
handOverData: Option[Any] = None) extends Data
case class WasLeaderData(singleton: ActorRef, singletonTerminated: Boolean, handOverData: Option[Any],
newLeader: Address) extends Data
newLeaderOption: Option[Address]) extends Data
case class HandingOverData(singleton: ActorRef, handOverTo: Option[ActorRef], handOverData: Option[Any]) extends Data
val HandOverRetryTimer = "hand-over-retry"
@ -475,13 +475,13 @@ class ClusterSingletonManager(
gotoHandingOver(singleton, singletonTerminated, handOverData, None)
case Some(a)
// send TakeOver request in case the new leader doesn't know previous leader
val leaderPeer = peer(a)
leaderPeer ! TakeOverFromMe
setTimer(TakeOverRetryTimer, TakeOverRetry(leaderPeer, 1), retryInterval, repeat = false)
goto(WasLeader) using WasLeaderData(singleton, singletonTerminated, handOverData, newLeader = a)
case _
peer(a) ! TakeOverFromMe
setTimer(TakeOverRetryTimer, TakeOverRetry(1), retryInterval, repeat = false)
goto(WasLeader) using WasLeaderData(singleton, singletonTerminated, handOverData, newLeaderOption = Some(a))
case None
// new leader will initiate the hand-over
stay
setTimer(TakeOverRetryTimer, TakeOverRetry(1), retryInterval, repeat = false)
goto(WasLeader) using WasLeaderData(singleton, singletonTerminated, handOverData, newLeaderOption = None)
}
case Event(HandOverToMe, LeaderData(singleton, singletonTerminated, handOverData))
@ -495,20 +495,19 @@ class ClusterSingletonManager(
}
when(WasLeader) {
case Event(TakeOverRetry(leaderPeer, count), _)
val newLeader = leaderPeer.path.address
case Event(TakeOverRetry(count), WasLeaderData(_, _, _, newLeaderOption))
if (count <= maxTakeOverRetries) {
logInfo("Retry [{}], sending TakeOverFromMe to [{}]", count, newLeader)
leaderPeer ! TakeOverFromMe
setTimer(TakeOverRetryTimer, TakeOverRetry(leaderPeer, count + 1), retryInterval, repeat = false)
logInfo("Retry [{}], sending TakeOverFromMe to [{}]", count, newLeaderOption)
newLeaderOption foreach { peer(_) ! TakeOverFromMe }
setTimer(TakeOverRetryTimer, TakeOverRetry(count + 1), retryInterval, repeat = false)
stay
} else
throw new ClusterSingletonManagerIsStuck(s"Expected hand-over to [${newLeader}] never occured")
throw new ClusterSingletonManagerIsStuck(s"Expected hand-over to [${newLeaderOption}] never occured")
case Event(HandOverToMe, WasLeaderData(singleton, singletonTerminated, handOverData, _))
gotoHandingOver(singleton, singletonTerminated, handOverData, Some(sender))
case Event(MemberDowned(m), WasLeaderData(singleton, singletonTerminated, handOverData, newLeader)) if m.address == newLeader
case Event(MemberDowned(m), WasLeaderData(singleton, singletonTerminated, handOverData, Some(newLeader))) if m.address == newLeader
addDowned(m.address)
gotoHandingOver(singleton, singletonTerminated, handOverData, None)

View file

@ -155,12 +155,12 @@ object ClusterSingletonManagerSpec extends MultiNodeConfig {
def receive = {
case state: CurrentClusterState leaderAddress = state.leader
case LeaderChanged(leader) leaderAddress = leader
case other => consumer foreach { _ forward other }
case LeaderChanged(leader) leaderAddress = leader
case other consumer foreach { _ forward other }
}
def consumer: Option[ActorRef] =
leaderAddress map (a => context.actorFor(RootActorPath(a) /
leaderAddress map (a context.actorFor(RootActorPath(a) /
"user" / "singleton" / "consumer"))
}
//#singleton-proxy
@ -283,8 +283,17 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS
verify(newLeaderRole, msg = 3, expectedCurrent = 2)
}
"hand over when adding three new potential leaders to 3 nodes cluster" in within(30 seconds) {
"hand over when adding three new potential leaders to 3 nodes cluster" in within(60 seconds) {
// this test will result in restart after retry timeout
// because the new leader will not know about the real previous leader and the
// previous leader sortedClusterRoles(3) will first think that sortedClusterRoles(2)
// is the new leader
runOn(controller) {
queue ! Reset
expectMsg(ResetOk)
}
runOn(sortedClusterRoles(2)) {
// previous leader
Cluster(system) join node(sortedClusterRoles(3)).address
createSingleton()
}
@ -297,10 +306,10 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS
createSingleton()
}
verify(sortedClusterRoles(0), msg = 4, expectedCurrent = 3)
verify(sortedClusterRoles(0), msg = 4, expectedCurrent = 0)
}
"hand over when leader leaves in 6 nodes cluster " in within(20 seconds) {
"hand over when leader leaves in 6 nodes cluster " in within(30 seconds) {
//#test-leave
val leaveRole = sortedClusterRoles(0)
val newLeaderRole = sortedClusterRoles(1)