=clu #15439 Harden SurviveNetworkInstabilitySpec
* The problem was that the system message buffer filled up during the deploy phase and triggered quarantine too early, and therefore the "hello" reply was lost. The "hello" ping-pong was not good enough for deploying the children one-by-one.
(cherry picked from commit f729afe1fa5401e562655e5a0aaab3f9789e4df6)
Conflicts: akka-cluster/src/multi-jvm/scala/akka/cluster/SurviveNetworkInstabilitySpec.scala
This commit is contained in:
parent
953a316563
commit
8e6d81242f
1 changed file with 34 additions and 56 deletions
|
|
@@ -21,6 +21,7 @@ import akka.remote.RemoteActorRefProvider
|
|||
import akka.actor.ActorRef
|
||||
import akka.dispatch.sysmsg.Failed
|
||||
import akka.actor.PoisonPill
|
||||
import akka.actor.Terminated
|
||||
|
||||
object SurviveNetworkInstabilityMultiJvmSpec extends MultiNodeConfig {
|
||||
val first = role("first")
|
||||
|
|
@@ -33,43 +34,30 @@ object SurviveNetworkInstabilityMultiJvmSpec extends MultiNodeConfig {
|
|||
val eighth = role("eighth")
|
||||
|
||||
commonConfig(debugConfig(on = false).withFallback(
|
||||
ConfigFactory.parseString("akka.remote.system-message-buffer-size=20")).
|
||||
ConfigFactory.parseString("akka.remote.system-message-buffer-size=100")).
|
||||
withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
|
||||
testTransport(on = true)
|
||||
|
||||
deployOn(second, """"/parent/*" {
|
||||
remote = "@third@"
|
||||
}""")
|
||||
|
||||
class Parent extends Actor {
|
||||
def receive = {
|
||||
case p: Props ⇒ sender() ! context.actorOf(p)
|
||||
}
|
||||
}
|
||||
|
||||
class RemoteChild extends Actor {
|
||||
import context.dispatcher
|
||||
|
||||
def receive = {
|
||||
case "hello" ⇒
|
||||
context.actorSelection("/user/bad") ! self
|
||||
sender() ! "hello"
|
||||
case "boom" ⇒ throw new SimulatedException
|
||||
}
|
||||
}
|
||||
|
||||
class BadGuy extends Actor {
|
||||
var victims = Vector.empty[ActorRef]
|
||||
def receive = {
|
||||
case ref: ActorRef ⇒ victims :+= ref
|
||||
case "boom" ⇒ victims foreach { _ ! "boom" }
|
||||
}
|
||||
}
|
||||
|
||||
class Echo extends Actor {
|
||||
def receive = {
|
||||
case m ⇒ sender() ! m
|
||||
case m ⇒ sender ! m
|
||||
}
|
||||
}
|
||||
|
||||
case class Targets(refs: Set[ActorRef])
|
||||
case object TargetsRegistered
|
||||
|
||||
class Watcher extends Actor {
|
||||
var targets = Set.empty[ActorRef]
|
||||
|
||||
def receive = {
|
||||
case Targets(refs) ⇒
|
||||
targets = refs
|
||||
sender() ! TargetsRegistered
|
||||
case "boom" ⇒
|
||||
targets.foreach(context.watch)
|
||||
case Terminated(_) ⇒
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@@ -103,7 +91,6 @@ abstract class SurviveNetworkInstabilitySpec
|
|||
}
|
||||
|
||||
system.actorOf(Props[Echo], "echo")
|
||||
val bad = system.actorOf(Props[BadGuy], "bad")
|
||||
|
||||
def assertCanTalk(alive: RoleName*): Unit = {
|
||||
runOn(alive: _*) {
|
||||
|
|
@@ -265,31 +252,23 @@ abstract class SurviveNetworkInstabilitySpec
|
|||
"down and remove quarantined node" taggedAs LongRunningTest in within(60.seconds) {
|
||||
val others = Vector(first, third, fourth, fifth, sixth, seventh)
|
||||
|
||||
runOn(third) {
|
||||
system.actorOf(Props[Watcher], "watcher")
|
||||
|
||||
// undelivered system messages in RemoteChild on third should trigger QuarantinedEvent
|
||||
system.eventStream.subscribe(testActor, classOf[QuarantinedEvent])
|
||||
}
|
||||
enterBarrier("watcher-created")
|
||||
|
||||
runOn(second) {
|
||||
val sysMsgBufferSize = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].
|
||||
remoteSettings.SysMsgBufferSize
|
||||
val parent = system.actorOf(Props[Parent], "parent")
|
||||
// fill up the system message redeliver buffer with many failing actors
|
||||
for (_ ← 1 to sysMsgBufferSize + 1) {
|
||||
// remote deployment to third
|
||||
parent ! Props[RemoteChild]
|
||||
val child = receiveOne(remainingOrDefault) match {
|
||||
case a: ActorRef ⇒ a
|
||||
case other ⇒ fail(s"expected ActorRef, got $other")
|
||||
}
|
||||
child ! "hello"
|
||||
expectMsg("hello")
|
||||
lastSender.path.address should be(address(third))
|
||||
}
|
||||
val refs = Vector.fill(sysMsgBufferSize + 1)(system.actorOf(Props[Echo])).toSet
|
||||
system.actorSelection(node(third) / "user" / "watcher") ! Targets(refs)
|
||||
expectMsg(TargetsRegistered)
|
||||
}
|
||||
runOn(third) {
|
||||
// undelivered system messages in RemoteChild on third should trigger QuarantinedEvent
|
||||
system.eventStream.subscribe(testActor, classOf[QuarantinedEvent])
|
||||
|
||||
// after quarantined it will drop the Failed messages to deadLetters
|
||||
muteDeadLetters(classOf[Failed])(system)
|
||||
}
|
||||
enterBarrier("children-deployed")
|
||||
enterBarrier("targets-registered")
|
||||
|
||||
runOn(first) {
|
||||
for (role ← others)
|
||||
|
|
@@ -298,10 +277,9 @@ abstract class SurviveNetworkInstabilitySpec
|
|||
enterBarrier("blackhole-6")
|
||||
|
||||
runOn(third) {
|
||||
// this will trigger Exception in RemoteChild on third, and the failures
|
||||
// can't be reported to parent on second, resulting in too many outstanding
|
||||
// this will trigger watch of targets on second, resulting in too many outstanding
|
||||
// system messages and quarantine
|
||||
bad ! "boom"
|
||||
system.actorSelection("/user/watcher") ! "boom"
|
||||
within(10.seconds) {
|
||||
expectMsgType[QuarantinedEvent].address should be(address(second))
|
||||
}
|
||||
|
|
@@ -383,4 +361,4 @@ abstract class SurviveNetworkInstabilitySpec
|
|||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue