Add another test case for publish of LeaderChanged, see #2518
* It didn't handle convergence changes with same leader correctly
This commit is contained in:
parent
c0c6cc3931
commit
718686e2f2
2 changed files with 53 additions and 17 deletions
|
|
@ -174,6 +174,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
|
|||
|
||||
var latestGossip: Gossip = Gossip()
|
||||
var stashedLeaderChanged: Option[LeaderChanged] = None
|
||||
var publishedLeaderChanged: Option[LeaderChanged] = None
|
||||
|
||||
def receive = {
|
||||
case PublishChanges(oldGossip, newGossip) ⇒ publishChanges(oldGossip, newGossip)
|
||||
|
|
@ -203,16 +204,26 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
|
|||
latestGossip = newGossip
|
||||
diff(oldGossip, newGossip) foreach { event ⇒
|
||||
event match {
|
||||
case LeaderChanged(_) if oldGossip.convergence && newGossip.convergence ⇒
|
||||
case x @ LeaderChanged(_) if Some(x) == publishedLeaderChanged ⇒
|
||||
// skip, this leader has already been published
|
||||
|
||||
case x @ LeaderChanged(_) if oldGossip.convergence && newGossip.convergence ⇒
|
||||
stashedLeaderChanged = None
|
||||
eventStream publish event
|
||||
publishedLeaderChanged = Some(x)
|
||||
eventStream publish x
|
||||
|
||||
case x: LeaderChanged ⇒
|
||||
// publish later, when convergence
|
||||
stashedLeaderChanged = Some(x)
|
||||
|
||||
case ConvergenceChanged(true) ⇒
|
||||
stashedLeaderChanged foreach { eventStream publish _ }
|
||||
stashedLeaderChanged = None
|
||||
stashedLeaderChanged foreach {
|
||||
publishedLeaderChanged = stashedLeaderChanged
|
||||
stashedLeaderChanged = None
|
||||
eventStream publish _
|
||||
}
|
||||
eventStream publish event
|
||||
|
||||
case MemberUnreachable(m) ⇒
|
||||
eventStream publish event
|
||||
// notify DeathWatch about unreachable node
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ package akka.cluster
|
|||
import language.postfixOps
|
||||
import scala.collection.immutable.SortedSet
|
||||
import scala.concurrent.util.duration._
|
||||
import org.scalatest.BeforeAndAfterEach
|
||||
import akka.actor.Address
|
||||
import akka.actor.Props
|
||||
import akka.cluster.MemberStatus._
|
||||
|
|
@ -14,6 +15,7 @@ import akka.cluster.InternalClusterAction._
|
|||
import akka.cluster.ClusterEvent._
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.testkit.ImplicitSender
|
||||
import akka.actor.ActorRef
|
||||
|
||||
object ClusterDomainEventPublisherSpec {
|
||||
val config = """
|
||||
|
|
@ -27,10 +29,11 @@ object ClusterDomainEventPublisherSpec {
|
|||
}
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublisherSpec.config) with ImplicitSender {
|
||||
class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublisherSpec.config)
|
||||
with BeforeAndAfterEach with ImplicitSender {
|
||||
import ClusterDomainEventPublisherSpec._
|
||||
|
||||
val publisher = system.actorOf(Props[ClusterDomainEventPublisher], name = "test-publisher")
|
||||
var publisher: ActorRef = _
|
||||
val a1 = Member(Address("akka", "sys", "a", 2552), Up)
|
||||
val b1 = Member(Address("akka", "sys", "b", 2552), Up)
|
||||
val c1 = Member(Address("akka", "sys", "c", 2552), Joining)
|
||||
|
|
@ -44,15 +47,18 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish
|
|||
val g4 = Gossip(members = SortedSet(d1, a1, b1, c2)).seen(a1.address)
|
||||
val g5 = Gossip(members = SortedSet(d1, a1, b1, c2)).seen(a1.address).seen(b1.address).seen(c2.address).seen(d1.address)
|
||||
|
||||
"ClusterDomainEventPublisher" must {
|
||||
override def beforeEach(): Unit = {
|
||||
publisher = system.actorOf(Props[ClusterDomainEventPublisher])
|
||||
publisher ! Subscribe(testActor, classOf[ClusterDomainEvent])
|
||||
expectMsgType[CurrentClusterState]
|
||||
}
|
||||
|
||||
"send snapshot when starting subscription" in {
|
||||
publisher ! PublishChanges(g0, g1)
|
||||
publisher ! Subscribe(testActor, classOf[ClusterDomainEvent])
|
||||
val state = expectMsgType[CurrentClusterState]
|
||||
state.members must be(g1.members)
|
||||
state.convergence must be(true)
|
||||
}
|
||||
override def afterEach(): Unit = {
|
||||
publisher ! Unsubscribe(testActor)
|
||||
system.stop(publisher)
|
||||
}
|
||||
|
||||
"ClusterDomainEventPublisher" must {
|
||||
|
||||
"publish MemberUp when member status changed to Up" in {
|
||||
publisher ! PublishChanges(g1, g2)
|
||||
|
|
@ -67,30 +73,49 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish
|
|||
expectMsgType[SeenChanged]
|
||||
}
|
||||
|
||||
"publish leader changed when new leader and after convergence" in {
|
||||
"publish leader changed when new leader after convergence" in {
|
||||
publisher ! PublishChanges(g3, g4)
|
||||
expectMsg(MemberUp(d1))
|
||||
expectMsg(ConvergenceChanged(false))
|
||||
expectMsgType[SeenChanged]
|
||||
expectNoMsg(1 second)
|
||||
|
||||
publisher ! PublishChanges(g4, g5)
|
||||
expectMsg(LeaderChanged(Some(d1.address)))
|
||||
expectMsg(ConvergenceChanged(true))
|
||||
expectMsgType[SeenChanged]
|
||||
}
|
||||
|
||||
"publish leader changed when new leader and convergence both before and after" in {
|
||||
// convergence both before and after
|
||||
publisher ! PublishChanges(g3, g5)
|
||||
expectMsg(MemberUp(d1))
|
||||
expectMsg(LeaderChanged(Some(d1.address)))
|
||||
expectMsgType[SeenChanged]
|
||||
expectNoMsg(1 second)
|
||||
}
|
||||
|
||||
// not convergence
|
||||
"not publish leader changed when not convergence" in {
|
||||
publisher ! PublishChanges(g2, g4)
|
||||
expectMsg(MemberUp(d1))
|
||||
expectNoMsg(1 second)
|
||||
}
|
||||
|
||||
"not publish leader changed when changed convergence but still same leader" in {
|
||||
publisher ! PublishChanges(g2, g5)
|
||||
expectMsg(MemberUp(d1))
|
||||
expectMsg(LeaderChanged(Some(d1.address)))
|
||||
expectMsg(ConvergenceChanged(true))
|
||||
expectMsgType[SeenChanged]
|
||||
|
||||
publisher ! PublishChanges(g5, g4)
|
||||
expectMsg(ConvergenceChanged(false))
|
||||
expectMsgType[SeenChanged]
|
||||
|
||||
publisher ! PublishChanges(g4, g5)
|
||||
expectMsg(ConvergenceChanged(true))
|
||||
expectMsgType[SeenChanged]
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue