=clu #3826 Harden cluster.StressSpec
* The Identify message didn't get through to the master, which was stopping at the same time, and it didn't got redirected to deadletters, i.e. the "termination race"
This commit is contained in:
parent
34f9f4cfb9
commit
ba3587de64
1 changed files with 24 additions and 24 deletions
|
|
@ -1027,13 +1027,13 @@ abstract class StressSpec
|
|||
runOn(masterRoles: _*) {
|
||||
reportResult {
|
||||
val m = system.actorOf(Props(classOf[Master], settings, batchInterval, tree).withDeploy(Deploy.local),
|
||||
name = "master-" + myself.name)
|
||||
name = masterName)
|
||||
m ! Begin
|
||||
import system.dispatcher
|
||||
system.scheduler.scheduleOnce(duration) {
|
||||
m.tell(End, testActor)
|
||||
}
|
||||
val workResult = awaitWorkResult
|
||||
val workResult = awaitWorkResult(m)
|
||||
workResult.sendCount should be > (0L)
|
||||
workResult.ackCount should be > (0L)
|
||||
if (!expectDroppedMessages)
|
||||
|
|
@ -1051,18 +1051,14 @@ abstract class StressSpec
|
|||
awaitClusterResult()
|
||||
}
|
||||
|
||||
def awaitWorkResult: WorkResult = {
|
||||
def awaitWorkResult(m: ActorRef): WorkResult = {
|
||||
val workResult = expectMsgType[WorkResult]
|
||||
if (settings.infolog)
|
||||
log.info("{} result, [{}] jobs/s, retried [{}] of [{}] msg", masterName,
|
||||
workResult.jobsPerSecond.form,
|
||||
workResult.retryCount, workResult.sendCount)
|
||||
master match {
|
||||
case Some(m) ⇒
|
||||
watch(m)
|
||||
expectMsgPF(remaining) { case Terminated(a) if a.path == m.path ⇒ true }
|
||||
case None ⇒ // ok, already terminated
|
||||
}
|
||||
watch(m)
|
||||
expectTerminated(m)
|
||||
workResult
|
||||
}
|
||||
|
||||
|
|
@ -1159,7 +1155,7 @@ abstract class StressSpec
|
|||
"start routers that are running while nodes are joining" taggedAs LongRunningTest in {
|
||||
runOn(roles.take(3): _*) {
|
||||
system.actorOf(Props(classOf[Master], settings, settings.workBatchInterval, false).withDeploy(Deploy.local),
|
||||
name = "master-" + myself.name) ! Begin
|
||||
name = masterName) ! Begin
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1190,13 +1186,15 @@ abstract class StressSpec
|
|||
"end routers that are running while nodes are joining" taggedAs LongRunningTest in within(30.seconds) {
|
||||
if (exerciseActors) {
|
||||
runOn(roles.take(3): _*) {
|
||||
val m = master
|
||||
m should not be (None)
|
||||
m.get.tell(End, testActor)
|
||||
val workResult = awaitWorkResult
|
||||
workResult.retryCount should be(0)
|
||||
workResult.sendCount should be > (0L)
|
||||
workResult.ackCount should be > (0L)
|
||||
master match {
|
||||
case Some(m) ⇒
|
||||
m.tell(End, testActor)
|
||||
val workResult = awaitWorkResult(m)
|
||||
workResult.retryCount should be(0)
|
||||
workResult.sendCount should be > (0L)
|
||||
workResult.ackCount should be > (0L)
|
||||
case None ⇒ fail("master not running")
|
||||
}
|
||||
}
|
||||
}
|
||||
enterBarrier("after-" + step)
|
||||
|
|
@ -1255,7 +1253,7 @@ abstract class StressSpec
|
|||
if (exerciseActors) {
|
||||
runOn(roles.take(3): _*) {
|
||||
system.actorOf(Props(classOf[Master], settings, settings.workBatchInterval, false).withDeploy(Deploy.local),
|
||||
name = "master-" + myself.name) ! Begin
|
||||
name = masterName) ! Begin
|
||||
}
|
||||
}
|
||||
enterBarrier("after-" + step)
|
||||
|
|
@ -1296,12 +1294,14 @@ abstract class StressSpec
|
|||
"end routers that are running while nodes are removed" taggedAs LongRunningTest in within(30.seconds) {
|
||||
if (exerciseActors) {
|
||||
runOn(roles.take(3): _*) {
|
||||
val m = master
|
||||
m should not be (None)
|
||||
m.get.tell(End, testActor)
|
||||
val workResult = awaitWorkResult
|
||||
workResult.sendCount should be > (0L)
|
||||
workResult.ackCount should be > (0L)
|
||||
master match {
|
||||
case Some(m) ⇒
|
||||
m.tell(End, testActor)
|
||||
val workResult = awaitWorkResult(m)
|
||||
workResult.sendCount should be > (0L)
|
||||
workResult.ackCount should be > (0L)
|
||||
case None ⇒ fail("master not running")
|
||||
}
|
||||
}
|
||||
}
|
||||
enterBarrier("after-" + step)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue