diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala
index 3264c661b0..8e0f781ceb 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala
@@ -203,6 +203,10 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu
     nodesInCluster.sorted.head
   }
 
+  def clusterSortedRoles(nodesInCluster: Seq[RoleName]): Seq[RoleName] = {
+    nodesInCluster.sorted
+  }
+
   /**
    * Sort the roles in the order used by the cluster.
    */
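For orientation: the new helper delegates to the same implicit `Ordering[RoleName]` that the leader-selection code above (`nodesInCluster.sorted.head`) already uses, so the head of the sorted sequence is the expected leader. A minimal usage sketch, with illustrative role names:

    val sortedRoles = clusterSortedRoles(Seq(first, second, third, fourth))
    val master = sortedRoles(0) // the role expected to become cluster leader
    val victim = sortedRoles(1) // a deterministic non-leader, identical on every JVM

The spec below relies on this to compute the same master/victim pair independently on every participating node, without any cross-node coordination.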
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala
new file mode 100644
index 0000000000..6ce00687bf
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala
@@ -0,0 +1,154 @@
+/**
+ * Copyright (C) 2009-2012 Typesafe Inc.
+ */
+package akka.cluster
+
+import org.scalatest.BeforeAndAfter
+import akka.remote.testkit.MultiNodeConfig
+import akka.remote.testkit.MultiNodeSpec
+import akka.testkit._
+import com.typesafe.config.ConfigFactory
+import akka.actor.Address
+import akka.remote.testconductor.{RoleName, Direction}
+import akka.util.duration._
+
+object UnreachableNodeRejoinsClusterMultiJvmSpec extends MultiNodeConfig {
+  val first = role("first")
+  val second = role("second")
+  val third = role("third")
+  val fourth = role("fourth")
+
+  val allRoles = Seq(first, second, third, fourth)
+
+  def allBut(role: RoleName, roles: Seq[RoleName] = allRoles): Seq[RoleName] = {
+    roles.filter(_ != role)
+  }
+
+  commonConfig(debugConfig(on = false).
+    withFallback(ConfigFactory.parseString("""
+      akka.cluster {
+        failure-detector.threshold = 5
+      }""")).
+    withFallback(MultiNodeClusterSpec.clusterConfig))
+}
+
+class UnreachableNodeRejoinsClusterMultiJvmNode1 extends UnreachableNodeRejoinsClusterSpec
+class UnreachableNodeRejoinsClusterMultiJvmNode2 extends UnreachableNodeRejoinsClusterSpec
+class UnreachableNodeRejoinsClusterMultiJvmNode3 extends UnreachableNodeRejoinsClusterSpec
+class UnreachableNodeRejoinsClusterMultiJvmNode4 extends UnreachableNodeRejoinsClusterSpec
+
+class UnreachableNodeRejoinsClusterSpec
+  extends MultiNodeSpec(UnreachableNodeRejoinsClusterMultiJvmSpec)
+  with MultiNodeClusterSpec
+  with ImplicitSender with BeforeAndAfter {
+  import UnreachableNodeRejoinsClusterMultiJvmSpec._
+
+  override def initialParticipants = allRoles.size
+
+  val sortedRoles = clusterSortedRoles(allRoles)
+  val master = sortedRoles(0)
+  val victim = sortedRoles(1)
+
+  var endBarrierNumber = 0
+  def endBarrier = {
+    endBarrierNumber += 1
+    testConductor.enter("after_" + endBarrierNumber)
+  }
+
+  "A cluster of " + allRoles.size + " members" must {
+
+    "reach initial convergence" taggedAs LongRunningTest in {
+      runOn(master) {
+        cluster.self
+        awaitUpConvergence(numberOfMembers = allRoles.size)
+      }
+
+      runOn(allBut(master): _*) {
+        cluster.join(node(master).address)
+        awaitUpConvergence(numberOfMembers = allRoles.size)
+      }
+
+      endBarrier
+    }
+
+    "mark a node as UNREACHABLE when we pull the network" taggedAs LongRunningTest in {
+      runOn(first) {
+        // pull network for victim node from all nodes
+        allBut(victim).foreach { roleName =>
+          testConductor.blackhole(victim, roleName, Direction.Both).await
+        }
+        testConductor.enter("unplug_victim")
+      }
+
+      runOn(allBut(first): _*) {
+        testConductor.enter("unplug_victim")
+      }
+
+      runOn(victim) {
+        val otherAddresses = sortedRoles.filter(_ != victim).map(node(_).address)
+        within(30 seconds) {
+          awaitCond(cluster.latestGossip.overview.unreachable.size == (allRoles.size - 1))
+          awaitCond(cluster.latestGossip.members.size == 1)
+          awaitCond(cluster.latestGossip.members.forall(_.status == MemberStatus.Up))
+          cluster.latestGossip.overview.unreachable.map(_.address) must be(otherAddresses.toSet)
+          cluster.convergence.isDefined must be(false)
+        }
+      }
+
+      val allButVictim = allBut(victim)
+      runOn(allButVictim: _*) {
+        val victimAddress = node(victim).address
+        val otherAddresses = allButVictim.map(node(_).address)
+        within(30 seconds) {
+          // victim becomes unreachable
+          awaitCond(cluster.latestGossip.overview.unreachable.size == 1)
+          awaitCond(cluster.latestGossip.members.size == (allRoles.size - 1))
+          awaitCond(cluster.latestGossip.members.forall(_.status == MemberStatus.Up))
+          awaitSeenSameState(otherAddresses)
+          // still one unreachable
+          cluster.latestGossip.overview.unreachable.size must be(1)
+          cluster.latestGossip.overview.unreachable.head.address must be(victimAddress)
+          // and therefore no convergence
+          cluster.convergence.isDefined must be(false)
+        }
+      }
+
+      endBarrier
+    }
+
+    "mark the node as DOWN" taggedAs LongRunningTest in {
+      val victimAddress = node(victim).address
+      runOn(master) {
+        cluster.down(victimAddress)
+      }
+
+      runOn(allBut(victim): _*) {
+        awaitUpConvergence(allRoles.size - 1, Seq(victimAddress))
+      }
+
+      endBarrier
+    }
+
+    "allow node to REJOIN when the network is plugged back in" taggedAs LongRunningTest in {
+      runOn(first) {
+        // put the network back in
+        allBut(victim).foreach { roleName =>
+          testConductor.passThrough(victim, roleName, Direction.Both).await
+        }
+        testConductor.enter("plug_in_victim")
+      }
+
+      runOn(allBut(first): _*) {
+        testConductor.enter("plug_in_victim")
+      }
+
+      runOn(victim) {
+        cluster.join(node(master).address)
+      }
+
+      awaitUpConvergence(allRoles.size)
+
+      endBarrier
+    }
+  }
+}
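Side note on running the new spec: the sbt multi-jvm plugin groups the four `*MultiJvmNode1..4` classes under their shared prefix, so an invocation along these lines should start all four JVMs together (exact task syntax depends on the build configuration):

    multi-jvm:test-only akka.cluster.UnreachableNodeRejoinsCluster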
testConductor.enter("plug_in_victim") + } + + runOn(allBut(first):_*) { + testConductor.enter("plug_in_victim") + } + + runOn(victim) { + cluster.join(node(master).address) + } + + awaitUpConvergence(allRoles.size) + + endBarrier + } + } +} diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala index eba0fffe63..24377d54a1 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala @@ -139,6 +139,19 @@ trait Conductor { this: TestConductorExt ⇒ controller ? Throttle(node, target, direction, 0f) mapTo } + /** + * Switch the Netty pipeline of the remote support into pass through mode for + * sending and/or receiving. + * + * @param node is the symbolic name of the node which is to be affected + * @param target is the symbolic name of the other node to which connectivity shall be impeded + * @param direction can be either `Direction.Send`, `Direction.Receive` or `Direction.Both` + */ + def passThrough(node: RoleName, target: RoleName, direction: Direction): Future[Done] = { + import Settings.QueryTimeout + controller ? Throttle(node, target, direction, -1f) mapTo + } + /** * Tell the remote support to shutdown the connection to the given remote * peer. It works regardless of whether the recipient was initiator or