harden CrossDcHeartbeatSenderSpec - wait until active (#31219)
This commit is contained in:
parent
1371cb6a35
commit
ad5864c6d8
1 changed files with 20 additions and 8 deletions
|
|
@ -6,20 +6,25 @@ package akka.cluster
|
||||||
|
|
||||||
import scala.collection.immutable.SortedSet
|
import scala.collection.immutable.SortedSet
|
||||||
|
|
||||||
import akka.actor.{ ActorSelection, Address, Props }
|
import akka.actor.ActorSelection
|
||||||
|
import akka.actor.Address
|
||||||
|
import akka.actor.Props
|
||||||
import akka.cluster.ClusterEvent.CurrentClusterState
|
import akka.cluster.ClusterEvent.CurrentClusterState
|
||||||
import akka.cluster.ClusterHeartbeatSender.Heartbeat
|
import akka.cluster.ClusterHeartbeatSender.Heartbeat
|
||||||
|
import akka.cluster.CrossDcHeartbeatSender.ReportStatus
|
||||||
import akka.cluster.CrossDcHeartbeatSenderSpec.TestCrossDcHeartbeatSender
|
import akka.cluster.CrossDcHeartbeatSenderSpec.TestCrossDcHeartbeatSender
|
||||||
import akka.testkit.{ AkkaSpec, ImplicitSender, TestProbe }
|
import akka.testkit.AkkaSpec
|
||||||
|
import akka.testkit.ImplicitSender
|
||||||
|
import akka.testkit.TestProbe
|
||||||
import akka.util.Version
|
import akka.util.Version
|
||||||
|
|
||||||
object CrossDcHeartbeatSenderSpec {
|
object CrossDcHeartbeatSenderSpec {
|
||||||
class TestCrossDcHeartbeatSender(probe: TestProbe) extends CrossDcHeartbeatSender {
|
class TestCrossDcHeartbeatSender(heartbeatProbe: TestProbe) extends CrossDcHeartbeatSender {
|
||||||
// disable register for cluster events
|
// disable register for cluster events
|
||||||
override def preStart(): Unit = {}
|
override def preStart(): Unit = {}
|
||||||
|
|
||||||
override def heartbeatReceiver(address: Address): ActorSelection = {
|
override def heartbeatReceiver(address: Address): ActorSelection = {
|
||||||
context.actorSelection(probe.ref.path)
|
context.actorSelection(heartbeatProbe.ref.path)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -36,18 +41,25 @@ class CrossDcHeartbeatSenderSpec extends AkkaSpec("""
|
||||||
""") with ImplicitSender {
|
""") with ImplicitSender {
|
||||||
"CrossDcHeartBeatSender" should {
|
"CrossDcHeartBeatSender" should {
|
||||||
"increment heart beat sequence nr" in {
|
"increment heart beat sequence nr" in {
|
||||||
val probe = TestProbe()
|
|
||||||
|
val heartbeatProbe = TestProbe()
|
||||||
Cluster(system).join(Cluster(system).selfMember.address)
|
Cluster(system).join(Cluster(system).selfMember.address)
|
||||||
awaitAssert(Cluster(system).selfMember.status == MemberStatus.Up)
|
awaitAssert(Cluster(system).selfMember.status == MemberStatus.Up)
|
||||||
val underTest = system.actorOf(Props(new TestCrossDcHeartbeatSender(probe)))
|
val underTest = system.actorOf(Props(new TestCrossDcHeartbeatSender(heartbeatProbe)))
|
||||||
|
|
||||||
underTest ! CurrentClusterState(
|
underTest ! CurrentClusterState(
|
||||||
members = SortedSet(
|
members = SortedSet(
|
||||||
Cluster(system).selfMember,
|
Cluster(system).selfMember,
|
||||||
Member(UniqueAddress(Address("akka", system.name), 2L), Set("dc-dc2"), Version.Zero)
|
Member(UniqueAddress(Address("akka", system.name), 2L), Set("dc-dc2"), Version.Zero)
|
||||||
.copy(status = MemberStatus.Up)))
|
.copy(status = MemberStatus.Up)))
|
||||||
|
|
||||||
probe.expectMsgType[Heartbeat].sequenceNr shouldEqual 1
|
awaitAssert {
|
||||||
probe.expectMsgType[Heartbeat].sequenceNr shouldEqual 2
|
underTest ! ReportStatus()
|
||||||
|
expectMsgType[CrossDcHeartbeatSender.MonitoringActive]
|
||||||
|
}
|
||||||
|
|
||||||
|
heartbeatProbe.expectMsgType[Heartbeat].sequenceNr shouldEqual 1
|
||||||
|
heartbeatProbe.expectMsgType[Heartbeat].sequenceNr shouldEqual 2
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue