2012-06-05 15:46:26 +02:00
|
|
|
/**
|
2013-01-09 01:47:48 +01:00
|
|
|
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
2012-06-05 15:46:26 +02:00
|
|
|
*/
|
|
|
|
|
package akka.cluster
|
|
|
|
|
|
2012-07-26 14:47:21 +02:00
|
|
|
import language.postfixOps
|
2013-04-11 09:18:12 +02:00
|
|
|
import scala.collection.immutable
|
|
|
|
|
import scala.concurrent.duration._
|
2012-10-30 15:08:41 +01:00
|
|
|
import com.typesafe.config.ConfigFactory
|
2013-04-11 09:18:12 +02:00
|
|
|
import akka.actor.ActorSystem
|
|
|
|
|
import akka.actor.ExtendedActorSystem
|
|
|
|
|
import akka.remote.testconductor.RoleName
|
2012-06-05 15:46:26 +02:00
|
|
|
import akka.remote.testkit.MultiNodeConfig
|
|
|
|
|
import akka.remote.testkit.MultiNodeSpec
|
2012-12-12 12:20:54 +01:00
|
|
|
import akka.remote.transport.ThrottlerTransportAdapter.Direction
|
2013-04-11 09:18:12 +02:00
|
|
|
import akka.testkit._
|
|
|
|
|
import akka.actor.Actor
|
|
|
|
|
import akka.actor.ActorRef
|
|
|
|
|
import akka.actor.Props
|
|
|
|
|
import akka.actor.RootActorPath
|
2012-06-05 15:46:26 +02:00
|
|
|
|
2013-04-11 09:18:12 +02:00
|
|
|
/**
 * Multi-node configuration used by [[UnreachableNodeJoinsAgainSpec]].
 *
 * Declares the four roles taking part in the test, installs the common
 * configuration (test specific overrides on top of the debug config and the
 * shared cluster config) and switches on the test transport.
 */
object UnreachableNodeJoinsAgainMultiNodeConfig extends MultiNodeConfig {
  val first = role("first")
  val second = role("second")
  val third = role("third")
  val fourth = role("fourth")

  commonConfig {
    // settings specific to this test, they take precedence over the fallbacks below
    val overrides = ConfigFactory.parseString(
      """
      # this setting is here to limit the number of retries and failures while the
      # node is being blackholed
      akka.remote.failure-detector.retry-gate-closed-for = 500 ms

      akka.remote.log-remote-lifecycle-events = off
      akka.cluster.publish-stats-interval = 0s
      """)
    val defaults = debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)
    overrides.withFallback(defaults)
  }

  testTransport(on = true)

  /**
   * Actor that forwards every message it receives to the given `testActor`.
   */
  class EndActor(testActor: ActorRef) extends Actor {
    def receive = {
      case message ⇒ testActor.forward(message)
    }
  }
}
|
2012-06-05 15:46:26 +02:00
|
|
|
|
2013-04-11 09:18:12 +02:00
|
|
|
// Concrete spec classes, one per node. Each one only inherits the scenario
// defined in UnreachableNodeJoinsAgainSpec and adds nothing of its own.
// NOTE(review): presumably discovered by the multi-JVM test runner through
// the MultiJvmNodeN naming convention; confirm against the build setup.
class UnreachableNodeJoinsAgainMultiJvmNode1 extends UnreachableNodeJoinsAgainSpec

class UnreachableNodeJoinsAgainMultiJvmNode2 extends UnreachableNodeJoinsAgainSpec

class UnreachableNodeJoinsAgainMultiJvmNode3 extends UnreachableNodeJoinsAgainSpec

class UnreachableNodeJoinsAgainMultiJvmNode4 extends UnreachableNodeJoinsAgainSpec
|
2012-06-27 13:54:43 +02:00
|
|
|
|
2013-04-11 09:18:12 +02:00
|
|
|
/**
 * Scenario: a cluster of all configured roles converges, the `victim` node is
 * cut off from the network and marked as unreachable, the `master` node downs
 * it, and finally a brand new actor system is started on the victim's
 * host:port and joins the cluster again.
 */
abstract class UnreachableNodeJoinsAgainSpec
  extends MultiNodeSpec(UnreachableNodeJoinsAgainMultiNodeConfig)
  with MultiNodeClusterSpec {

  import UnreachableNodeJoinsAgainMultiNodeConfig._

  muteMarkingAsUnreachable()

  /**
   * Returns the given `roles` with `role` filtered out.
   *
   * @param role  the role to exclude
   * @param roles the roles to filter, defaults to the `roles` in scope of this spec
   */
  def allBut(role: RoleName, roles: immutable.Seq[RoleName] = roles): immutable.Seq[RoleName] = {
    roles.filterNot(_ == role)
  }

  // node that downs the victim and hosts the "end" actor
  lazy val master = second
  // node that is blackholed, downed and later restarted
  lazy val victim = fourth

  // counter used to give every end-of-step barrier a unique name
  var endBarrierNumber = 0

  /** Enters the next numbered "after_N" barrier. */
  def endBarrier(): Unit = {
    endBarrierNumber += 1
    enterBarrier("after_" + endBarrierNumber)
  }

  "A cluster of " + roles.size + " members" must {

    "reach initial convergence" taggedAs LongRunningTest in {
      awaitClusterUp(roles: _*)
      endBarrier()
    }

    "mark a node as UNREACHABLE when we pull the network" taggedAs LongRunningTest in {
      // let them send at least one heartbeat to each other after the gossip convergence
      // because for new joining nodes we remove them from the failure detector when
      // receive gossip
      Thread.sleep(2.seconds.dilated.toMillis)

      runOn(first) {
        // pull network for victim node from all nodes
        allBut(victim).foreach { roleName ⇒
          testConductor.blackhole(victim, roleName, Direction.Both).await
        }
      }

      enterBarrier("unplug_victim")

      val allButVictim = allBut(victim, roles)
      runOn(victim) {
        allButVictim.foreach(markNodeAsUnavailable(_))
        within(30.seconds) {
          // victim becomes all alone
          awaitAssert {
            val members = clusterView.members
            clusterView.unreachableMembers.size must be(roles.size - 1)
            members.size must be(1)
            members.map(_.status) must be(Set(MemberStatus.Up))
          }
          clusterView.unreachableMembers.map(_.address) must be((allButVictim map address).toSet)
        }
      }

      runOn(allButVictim: _*) {
        markNodeAsUnavailable(victim)
        within(30.seconds) {
          // victim becomes unreachable
          awaitAssert {
            val members = clusterView.members
            clusterView.unreachableMembers.size must be(1)
            members.size must be(roles.size - 1)
            members.map(_.status) must be(Set(MemberStatus.Up))
          }
          awaitSeenSameState(allButVictim map address: _*)
          // still one unreachable
          clusterView.unreachableMembers.size must be(1)
          clusterView.unreachableMembers.head.address must be(node(victim).address)
        }
      }

      endBarrier()
    }

    "mark the node as DOWN" taggedAs LongRunningTest in {
      runOn(master) {
        cluster.down(victim)
      }

      runOn(allBut(victim): _*) {
        awaitMembersUp(roles.size - 1, Set(victim))
        // eventually removed
        awaitAssert(clusterView.unreachableMembers must be(Set.empty), 15.seconds)
      }

      endBarrier()
    }

    "allow fresh node with same host:port to join again when the network is plugged back in" taggedAs LongRunningTest in {
      val expectedNumberOfMembers = roles.size

      // victim actor system will be shutdown, not part of testConductor any more
      // so we can't use barriers to synchronize with it
      val masterAddress = address(master)
      runOn(master) {
        system.actorOf(Props(classOf[EndActor], testActor), "end")
      }
      enterBarrier("end-actor-created")

      runOn(first) {
        // put the network back in
        allBut(victim).foreach { roleName ⇒
          testConductor.passThrough(victim, roleName, Direction.Both).await
        }
      }

      enterBarrier("plug_in_victim")

      runOn(first) {
        // will shutdown ActorSystem of victim
        testConductor.removeNode(victim)
      }

      runOn(victim) {
        // remember the address before the original system goes away
        val victimAddress = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
        system.shutdown()
        system.awaitTermination(10.seconds)
        // create new ActorSystem with same host:port
        val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s"""
            akka.remote.netty.tcp {
              hostname = ${victimAddress.host.get}
              port = ${victimAddress.port.get}
            }
            """).withFallback(system.settings.config))

        try {
          val freshCluster = Cluster(freshSystem)
          freshCluster.join(masterAddress)
          // NOTE(review): fixed pause kept from the original to preserve timing; the
          // awaitAssert calls below already poll, so this may be removable - confirm
          Thread.sleep(5000)
          within(15.seconds) {
            awaitAssert(freshCluster.readView.members.map(_.address) must contain(victimAddress))
            awaitAssert(freshCluster.readView.members.size must be(expectedNumberOfMembers))
            // must look at the fresh system: clusterView belongs to the original
            // system, which has been shut down above
            awaitAssert(freshCluster.readView.members.map(_.status) must be(Set(MemberStatus.Up)))
          }
          // use the address captured before the shutdown instead of resolving
          // the role again on this node
          freshSystem.actorSelection(RootActorPath(masterAddress) / "user" / "end") ! "done"
        } finally {
          freshSystem.shutdown()
          freshSystem.awaitTermination(10.seconds)
        }
        // no barrier here, because it is not part of testConductor roles any more
      }

      runOn(allBut(victim): _*) {
        awaitMembersUp(expectedNumberOfMembers)
        // don't end the test until the freshSystem is done
        runOn(master) {
          expectMsg("done")
        }
        endBarrier()
      }
    }
  }
}
|