/**
 * Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
 */
package akka.cluster

import akka.actor.ActorSystem
import akka.cluster.ClusterEvent._
import akka.remote.testconductor.RoleName
import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec }
import akka.remote.transport.ThrottlerTransportAdapter.Direction
import akka.testkit.TestProbe
import com.typesafe.config.ConfigFactory

import scala.concurrent.Await
import scala.concurrent.duration._

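// Five roles spread over two data centers: first and second are configured as "dc1",
// third, fourth and fifth as "dc2". The throttler test transport is enabled so that
// links between the data centers can be blackholed to simulate an inter-DC split.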
object MultiDcSplitBrainMultiJvmSpec extends MultiNodeConfig {
  val first = role("first")
  val second = role("second")
  val third = role("third")
  val fourth = role("fourth")
  val fifth = role("fifth")

  commonConfig(ConfigFactory.parseString(
    """
      akka.loglevel = DEBUG
      akka.cluster.debug.verbose-heartbeat-logging = on
      akka.remote.netty.tcp.connection-timeout = 5 s # speedup in case of connection issue
      akka.remote.retry-gate-closed-for = 1 s
      akka.cluster.multi-data-center {
        failure-detector {
          acceptable-heartbeat-pause = 4s
          heartbeat-interval = 1s
        }
      }
      akka.cluster {
        gossip-interval = 1s
        leader-actions-interval = 1s
        auto-down-unreachable-after = 1s
      }
    """).withFallback(MultiNodeClusterSpec.clusterConfig))

  nodeConfig(first, second)(ConfigFactory.parseString(
    """
      akka.cluster.multi-data-center.self-data-center = "dc1"
    """))

  nodeConfig(third, fourth, fifth)(ConfigFactory.parseString(
    """
      akka.cluster.multi-data-center.self-data-center = "dc2"
    """))

  testTransport(on = true)
}

class MultiDcSplitBrainMultiJvmNode1 extends MultiDcSplitBrainSpec
class MultiDcSplitBrainMultiJvmNode2 extends MultiDcSplitBrainSpec
class MultiDcSplitBrainMultiJvmNode3 extends MultiDcSplitBrainSpec
class MultiDcSplitBrainMultiJvmNode4 extends MultiDcSplitBrainSpec
class MultiDcSplitBrainMultiJvmNode5 extends MultiDcSplitBrainSpec

abstract class MultiDcSplitBrainSpec
  extends MultiNodeSpec(MultiDcSplitBrainMultiJvmSpec)
  with MultiNodeClusterSpec {

  import MultiDcSplitBrainMultiJvmSpec._

  val dc1 = List(first, second)
  val dc2 = List(third, fourth, fifth)
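  // gives each split/unsplit barrier a distinct name across repeated invocations of the helpers below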
  var barrierCounter = 0

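  // Blackholes all links between dc1 and dc2 and waits until the member nodes on both
  // sides have observed the other data center as unreachable. Nodes in `notMembers`
  // are not (yet) part of the cluster and are excluded from the verification.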
  def splitDataCenters(notMembers: Set[RoleName]): Unit = {
    val memberNodes = (dc1 ++ dc2).filterNot(notMembers)
    val probe = TestProbe()
    runOn(memberNodes: _*) {
      cluster.subscribe(probe.ref, classOf[DataCenterReachabilityEvent])
      probe.expectMsgType[CurrentClusterState]
    }
    enterBarrier(s"split-$barrierCounter")
    barrierCounter += 1

    runOn(first) {
      for (dc1Node ← dc1; dc2Node ← dc2) {
        testConductor.blackhole(dc1Node, dc2Node, Direction.Both).await
      }
    }

    enterBarrier(s"after-split-$barrierCounter")
    barrierCounter += 1

    runOn(memberNodes: _*) {
      probe.expectMsgType[UnreachableDataCenter](15.seconds)
      cluster.unsubscribe(probe.ref)
      runOn(dc1: _*) {
        awaitAssert {
          cluster.state.unreachableDataCenters should ===(Set("dc2"))
        }
      }
      runOn(dc2: _*) {
        awaitAssert {
          cluster.state.unreachableDataCenters should ===(Set("dc1"))
        }
      }
      cluster.state.unreachable should ===(Set.empty)
    }
    enterBarrier(s"after-split-verified-$barrierCounter")
    barrierCounter += 1
  }

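  // Restores the links between dc1 and dc2 and waits until the member nodes have
  // observed the other data center as reachable again.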
  def unsplitDataCenters(notMembers: Set[RoleName]): Unit = {
    val memberNodes = (dc1 ++ dc2).filterNot(notMembers)
    val probe = TestProbe()
    runOn(memberNodes: _*) {
      cluster.subscribe(probe.ref, classOf[ReachableDataCenter])
      probe.expectMsgType[CurrentClusterState]
    }
    enterBarrier(s"unsplit-$barrierCounter")
    barrierCounter += 1

    runOn(first) {
      for (dc1Node ← dc1; dc2Node ← dc2) {
        testConductor.passThrough(dc1Node, dc2Node, Direction.Both).await
      }
    }

    enterBarrier(s"after-unsplit-$barrierCounter")
    barrierCounter += 1

    runOn(memberNodes: _*) {
      probe.expectMsgType[ReachableDataCenter](25.seconds)
      cluster.unsubscribe(probe.ref)
      awaitAssert {
        cluster.state.unreachableDataCenters should ===(Set.empty)
      }
    }
    enterBarrier(s"after-unsplit-verified-$barrierCounter")
    barrierCounter += 1
  }

  "A cluster with multiple data centers" must {
    "be able to form two data centers" in {
      awaitClusterUp(first, second, third)
    }

    "be able to have a data center member join while there is inter data center split" in within(20.seconds) {
      // introduce a split between data centers
      splitDataCenters(notMembers = Set(fourth, fifth))

      runOn(fourth) {
        cluster.join(third)
      }
      enterBarrier("inter-data-center unreachability")

      // should be able to join and become up since the
      // split is between dc1 and dc2
      runOn(third, fourth) {
        awaitAssert(clusterView.members.collect {
          case m if m.dataCenter == "dc2" && m.status == MemberStatus.Up ⇒ m.address
        } should ===(Set(address(third), address(fourth))))
      }
      enterBarrier("dc2-join-completed")

      unsplitDataCenters(notMembers = Set(fifth))

      runOn(dc1: _*) {
        awaitAssert(clusterView.members.collect {
          case m if m.dataCenter == "dc2" && m.status == MemberStatus.Up ⇒ m.address
        } should ===(Set(address(third), address(fourth))))
      }

      enterBarrier("inter-data-center-split-1-done")
    }

    "be able to have data center member leave while there is inter data center split" in within(20.seconds) {
      splitDataCenters(notMembers = Set(fifth))

      runOn(fourth) {
        cluster.leave(fourth)
      }

      runOn(third) {
        awaitAssert(clusterView.members.filter(_.address == address(fourth)) should ===(Set.empty))
      }
      enterBarrier("node-4-left")

      unsplitDataCenters(notMembers = Set(fourth, fifth))

      runOn(first, second) {
        awaitAssert(clusterView.members.filter(_.address == address(fourth)) should ===(Set.empty))
      }
      enterBarrier("inter-data-center-split-2-done")
    }

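    // The restarted node reuses the same host:port but gets a new UID, so the surviving
    // nodes must remove the old incarnation and then welcome the re-joined member.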
    "be able to have data center member restart (same host:port) while there is inter data center split" in within(60.seconds) {
      val subscribeProbe = TestProbe()
      runOn(first, second, third, fifth) {
        Cluster(system).subscribe(subscribeProbe.ref, InitialStateAsSnapshot, classOf[MemberUp], classOf[MemberRemoved])
        subscribeProbe.expectMsgType[CurrentClusterState]
      }
      enterBarrier("subscribed")

      runOn(fifth) {
        Cluster(system).join(third)
      }

      var fifthOriginalUniqueAddress: Option[UniqueAddress] = None
      runOn(first, second, third, fifth) {
        awaitAssert(clusterView.members.collect {
          case m if m.dataCenter == "dc2" && m.status == MemberStatus.Up ⇒ m.address
        } should ===(Set(address(third), address(fifth))))
        fifthOriginalUniqueAddress = clusterView.members.collectFirst { case m if m.address == address(fifth) ⇒ m.uniqueAddress }
      }
      enterBarrier("fifth-joined")

      splitDataCenters(notMembers = Set(fourth))

      runOn(fifth) {
        Cluster(system).shutdown()
      }

      runOn(third) {
        awaitAssert(clusterView.members.collect {
          case m if m.dataCenter == "dc2" ⇒ m.address
        } should ===(Set(address(third))))
      }
      enterBarrier("fifth-removed")

      runOn(fifth) {
        // we can't use any multi-jvm test facilities on fifth after this, because we have to
        // shut down the actor system to be able to start a new one with the same port
        val thirdAddress = address(third)
        enterBarrier("fifth-waiting-for-termination")
        Await.ready(system.whenTerminated, remaining)

        val restartedSystem = ActorSystem(
          system.name,
          ConfigFactory.parseString(s"""
            akka.remote.netty.tcp.port = ${Cluster(system).selfAddress.port.get}
            akka.remote.artery.canonical.port = ${Cluster(system).selfAddress.port.get}
            akka.coordinated-shutdown.terminate-actor-system = on
            """).withFallback(system.settings.config))
        Cluster(restartedSystem).join(thirdAddress)
        Await.ready(restartedSystem.whenTerminated, remaining)
      }

      // no multi-jvm test facilities on fifth after this
      val remainingRoles = roles.filterNot(_ == fifth)

      runOn(remainingRoles: _*) {
        enterBarrier("fifth-waiting-for-termination")
      }

      runOn(first) {
        for (dc1Node ← dc1; dc2Node ← dc2) {
          testConductor.passThrough(dc1Node, dc2Node, Direction.Both).await
        }
        testConductor.shutdown(fifth)
      }

      runOn(remainingRoles: _*) {
        enterBarrier("fifth-restarted")
      }

      runOn(first, second, third) {
        awaitAssert(clusterView.members.collectFirst {
          case m if m.dataCenter == "dc2" && m.address == fifthOriginalUniqueAddress.get.address ⇒ m.uniqueAddress
        } should not be fifthOriginalUniqueAddress) // different uid

        subscribeProbe.expectMsgType[MemberUp].member.uniqueAddress should ===(fifthOriginalUniqueAddress.get)
        subscribeProbe.expectMsgType[MemberRemoved].member.uniqueAddress should ===(fifthOriginalUniqueAddress.get)
        subscribeProbe.expectMsgType[MemberUp].member.address should ===(fifthOriginalUniqueAddress.get.address)
      }
      runOn(remainingRoles: _*) {
        enterBarrier("fifth-re-joined")
      }
      runOn(first) {
        // to shut down the restartedSystem on fifth
        Cluster(system).leave(fifthOriginalUniqueAddress.get.address)
      }
      runOn(first, second, third) {
        awaitAssert({
          clusterView.members.map(_.address) should ===(Set(address(first), address(second), address(third)))
        })
      }
      runOn(remainingRoles: _*) {
        enterBarrier("restarted-fifth-removed")
      }
    }
  }
}