diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcSplitBrainSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcSplitBrainSpec.scala index 33f4097222..292169699d 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcSplitBrainSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcSplitBrainSpec.scala @@ -73,11 +73,6 @@ abstract class MultiDcSplitBrainSpec extends MultiNodeSpec(MultiDcSplitBrainMult def splitDataCenters(doNotVerify: Set[RoleName]): Unit = { splits += 1 val memberNodes = (dc1 ++ dc2).filterNot(doNotVerify) - val probe = TestProbe() - runOn(memberNodes: _*) { - cluster.subscribe(probe.ref, classOf[UnreachableDataCenter]) - probe.expectMsgType[CurrentClusterState] - } enterBarrier(s"split-$splits") runOn(first) { @@ -89,18 +84,19 @@ abstract class MultiDcSplitBrainSpec extends MultiNodeSpec(MultiDcSplitBrainMult enterBarrier(s"after-split-$splits") runOn(memberNodes: _*) { - probe.expectMsgType[UnreachableDataCenter](15.seconds) - cluster.unsubscribe(probe.ref) - runOn(dc1: _*) { - awaitAssert { - cluster.state.unreachableDataCenters should ===(Set("dc2")) - } - } - runOn(dc2: _*) { - awaitAssert { - cluster.state.unreachableDataCenters should ===(Set("dc1")) + within(15.seconds) { + runOn(dc1: _*) { + awaitAssert { + cluster.state.unreachableDataCenters should ===(Set("dc2")) + } + } + runOn(dc2: _*) { + awaitAssert { + cluster.state.unreachableDataCenters should ===(Set("dc1")) + } } } + system.log.debug("Cluster state after split: {}", cluster.state) cluster.state.unreachable should ===(Set.empty) } enterBarrier(s"after-split-verified-$splits") @@ -109,11 +105,6 @@ abstract class MultiDcSplitBrainSpec extends MultiNodeSpec(MultiDcSplitBrainMult def unsplitDataCenters(notMembers: Set[RoleName]): Unit = { unsplits += 1 val memberNodes = (dc1 ++ dc2).filterNot(notMembers) - val probe = TestProbe() - runOn(memberNodes: _*) { - cluster.subscribe(probe.ref, classOf[ReachableDataCenter]) - probe.expectMsgType[CurrentClusterState] - } enterBarrier(s"unsplit-$unsplits") runOn(first) { @@ -125,12 +116,11 @@ abstract class MultiDcSplitBrainSpec extends MultiNodeSpec(MultiDcSplitBrainMult enterBarrier(s"after-unsplit-$unsplits") runOn(memberNodes: _*) { - probe.expectMsgType[ReachableDataCenter](25.seconds) - system.log.debug("Reachable data center received") - cluster.unsubscribe(probe.ref) - awaitAssert { - cluster.state.unreachableDataCenters should ===(Set.empty) - system.log.debug("Cluster state: {}", cluster.state) + within(15.seconds) { + awaitAssert { + cluster.state.unreachableDataCenters should ===(Set.empty) + } + system.log.debug("Cluster state after unsplit: {}", cluster.state) } } enterBarrier(s"after-unsplit-verified-$unsplits") @@ -141,6 +131,28 @@ abstract class MultiDcSplitBrainSpec extends MultiNodeSpec(MultiDcSplitBrainMult awaitClusterUp(first, second, third) } + "publish DataCenterReachabilityEvent" in { + val dcReachabilityProbe = TestProbe() + Cluster(system).subscribe( + dcReachabilityProbe.ref, + ClusterEvent.InitialStateAsSnapshot, + classOf[DataCenterReachabilityEvent]) + dcReachabilityProbe.expectMsgType[CurrentClusterState].unreachableDataCenters should ===(Set.empty) + + splitDataCenters(doNotVerify = Set(fourth, fifth)) + runOn(first, second, third) { + dcReachabilityProbe.expectMsgType[UnreachableDataCenter] + } + enterBarrier("unreachable-verified") + + unsplitDataCenters(notMembers = Set(fourth, fifth)) + runOn(first, second, third) { + dcReachabilityProbe.expectMsgType[ReachableDataCenter] + } + + enterBarrier("publish-event-verified") + } + "be able to have a data center member join while there is inter data center split" in within(20.seconds) { // introduce a split between data centers splitDataCenters(doNotVerify = Set(fourth, fifth)) @@ -189,6 +201,7 @@ abstract class MultiDcSplitBrainSpec extends MultiNodeSpec(MultiDcSplitBrainMult runOn(first, second) { awaitAssert(clusterView.members.filter(_.address == address(fourth)) should ===(Set.empty)) } + enterBarrier("inter-data-center-split-2-done") }