From ba3587de64995433f0c50e0b051b906b9ae554ac Mon Sep 17 00:00:00 2001
From: Patrik Nordwall
Date: Mon, 20 Jan 2014 11:01:07 +0100
Subject: [PATCH] =clu #3826 Harden cluster.StressSpec

* The Identify message didn't get through to the master, which was
  stopping at the same time, and it wasn't redirected to dead letters,
  i.e. the "termination race"
---
 .../scala/akka/cluster/StressSpec.scala       | 48 +++++++++----------
 1 file changed, 24 insertions(+), 24 deletions(-)

diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala
index d78b01928f..fb54470109 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala
@@ -1027,13 +1027,13 @@ abstract class StressSpec
       runOn(masterRoles: _*) {
         reportResult {
           val m = system.actorOf(Props(classOf[Master], settings, batchInterval, tree).withDeploy(Deploy.local),
-            name = "master-" + myself.name)
+            name = masterName)
           m ! Begin
           import system.dispatcher
           system.scheduler.scheduleOnce(duration) {
             m.tell(End, testActor)
           }
-          val workResult = awaitWorkResult
+          val workResult = awaitWorkResult(m)
           workResult.sendCount should be > (0L)
           workResult.ackCount should be > (0L)
           if (!expectDroppedMessages)
@@ -1051,18 +1051,14 @@ abstract class StressSpec
       awaitClusterResult()
     }

-    def awaitWorkResult: WorkResult = {
+    def awaitWorkResult(m: ActorRef): WorkResult = {
       val workResult = expectMsgType[WorkResult]
       if (settings.infolog)
         log.info("{} result, [{}] jobs/s, retried [{}] of [{}] msg",
           masterName, workResult.jobsPerSecond.form,
           workResult.retryCount, workResult.sendCount)
-      master match {
-        case Some(m) ⇒
-          watch(m)
-          expectMsgPF(remaining) { case Terminated(a) if a.path == m.path ⇒ true }
-        case None ⇒ // ok, already terminated
-      }
+      watch(m)
+      expectTerminated(m)
       workResult
     }
@@ -1159,7 +1155,7 @@ abstract class StressSpec
     "start routers that are running while nodes are joining" taggedAs LongRunningTest in {
       runOn(roles.take(3): _*) {
         system.actorOf(Props(classOf[Master], settings, settings.workBatchInterval, false).withDeploy(Deploy.local),
-          name = "master-" + myself.name) ! Begin
+          name = masterName) ! Begin
       }
     }
@@ -1190,13 +1186,15 @@ abstract class StressSpec
     "end routers that are running while nodes are joining" taggedAs LongRunningTest in within(30.seconds) {
       if (exerciseActors) {
         runOn(roles.take(3): _*) {
-          val m = master
-          m should not be (None)
-          m.get.tell(End, testActor)
-          val workResult = awaitWorkResult
-          workResult.retryCount should be(0)
-          workResult.sendCount should be > (0L)
-          workResult.ackCount should be > (0L)
+          master match {
+            case Some(m) ⇒
+              m.tell(End, testActor)
+              val workResult = awaitWorkResult(m)
+              workResult.retryCount should be(0)
+              workResult.sendCount should be > (0L)
+              workResult.ackCount should be > (0L)
+            case None ⇒ fail("master not running")
+          }
         }
       }
       enterBarrier("after-" + step)
@@ -1255,7 +1253,7 @@ abstract class StressSpec
       if (exerciseActors) {
         runOn(roles.take(3): _*) {
           system.actorOf(Props(classOf[Master], settings, settings.workBatchInterval, false).withDeploy(Deploy.local),
-            name = "master-" + myself.name) ! Begin
+            name = masterName) ! Begin
         }
       }
       enterBarrier("after-" + step)
@@ -1296,12 +1294,14 @@ abstract class StressSpec
     "end routers that are running while nodes are removed" taggedAs LongRunningTest in within(30.seconds) {
       if (exerciseActors) {
         runOn(roles.take(3): _*) {
-          val m = master
-          m should not be (None)
-          m.get.tell(End, testActor)
-          val workResult = awaitWorkResult
-          workResult.sendCount should be > (0L)
-          workResult.ackCount should be > (0L)
+          master match {
+            case Some(m) ⇒
+              m.tell(End, testActor)
+              val workResult = awaitWorkResult(m)
+              workResult.sendCount should be > (0L)
+              workResult.ackCount should be > (0L)
+            case None ⇒ fail("master not running")
+          }
        }
       }
       enterBarrier("after-" + step)
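
The core of the hardening is in awaitWorkResult: instead of locating the master through the master lookup (which relied on Identify/ActorIdentity and could race with the master stopping), the caller now passes the known ActorRef and the test observes termination via death watch. Below is a minimal, self-contained sketch of that watch + expectTerminated pattern with Akka TestKit; the actor system name, object name, and the stub actor are illustrative, not taken from StressSpec.

import akka.actor.{ Actor, ActorSystem, PoisonPill, Props }
import akka.testkit.{ ImplicitSender, TestKit }

object WatchTerminationExample extends App {
  val system = ActorSystem("watch-example")
  new TestKit(system) with ImplicitSender {
    // Stand-in for the Master actor; it does nothing and is simply stopped below.
    val subject = system.actorOf(Props(new Actor {
      def receive = Actor.emptyBehavior
    }), "subject")

    // Register death watch on the ActorRef we already hold *before* it stops,
    // so termination is observed directly via a Terminated message rather than
    // inferred through Identify, which can race with the actor stopping.
    watch(subject)
    subject ! PoisonPill
    expectTerminated(subject)

    TestKit.shutdownActorSystem(system)
  }
}

Because the Terminated message is delivered to the watcher even if the watch is registered after the actor has already stopped, this avoids the "termination race" described in the commit message.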