fix shutdown of pending StressSpec, #21960 (#21963)

This commit is contained in:
Patrik Nordwall 2016-12-07 15:38:11 +01:00 committed by Konrad Malawski
parent 12536a3537
commit dce668771e
5 changed files with 26 additions and 15 deletions

View file

@ -107,10 +107,8 @@ abstract class NodeChurnSpec
def isArteryEnabled: Boolean = RARP(system).provider.remoteSettings.Artery.Enabled def isArteryEnabled: Boolean = RARP(system).provider.remoteSettings.Artery.Enabled
// FIXME issue #21483
if (isArteryEnabled) pending
"Cluster with short lived members" must { "Cluster with short lived members" must {
"setup stable nodes" taggedAs LongRunningTest in within(15.seconds) { "setup stable nodes" taggedAs LongRunningTest in within(15.seconds) {
val logListener = system.actorOf(Props(classOf[LogListener], testActor), "logListener") val logListener = system.actorOf(Props(classOf[LogListener], testActor), "logListener")
system.eventStream.subscribe(logListener, classOf[Info]) system.eventStream.subscribe(logListener, classOf[Info])
@ -119,6 +117,10 @@ abstract class NodeChurnSpec
enterBarrier("stable") enterBarrier("stable")
} }
// FIXME issue #21483
// note: there must be one test step before pending, otherwise afterTermination will not run
if (isArteryEnabled) pending
"join and remove transient nodes without growing gossip payload" taggedAs LongRunningTest in { "join and remove transient nodes without growing gossip payload" taggedAs LongRunningTest in {
// This test is configured with log-frame-size-exceeding and the LogListener // This test is configured with log-frame-size-exceeding and the LogListener
// will send to the testActor if unexpected increase in message payload size. // will send to the testActor if unexpected increase in message payload size.

View file

@ -1153,8 +1153,6 @@ abstract class StressSpec
"A cluster under stress" must { "A cluster under stress" must {
if (isArteryEnabled) pending
"log settings" taggedAs LongRunningTest in { "log settings" taggedAs LongRunningTest in {
if (infolog) { if (infolog) {
log.info("StressSpec JVM:\n{}", jvmInfo) log.info("StressSpec JVM:\n{}", jvmInfo)
@ -1165,6 +1163,10 @@ abstract class StressSpec
enterBarrier("after-" + step) enterBarrier("after-" + step)
} }
// FIXME issue #21810
// note: there must be one test step before pending, otherwise afterTermination will not run
if (isArteryEnabled) pending
"join seed nodes" taggedAs LongRunningTest in within(30 seconds) { "join seed nodes" taggedAs LongRunningTest in within(30 seconds) {
val otherNodesJoiningSeedNodes = roles.slice(numberOfSeedNodes, numberOfSeedNodes + numberOfNodesJoiningToSeedNodesInitially) val otherNodesJoiningSeedNodes = roles.slice(numberOfSeedNodes, numberOfSeedNodes + numberOfNodesJoiningToSeedNodesInitially)
@ -1349,5 +1351,4 @@ abstract class StressSpec
enterBarrier("after-" + step) enterBarrier("after-" + step)
} }
} }
} }

View file

@ -23,6 +23,8 @@ import org.agrona.concurrent.BackoffIdleStrategy
import org.agrona.hints.ThreadHints import org.agrona.hints.ThreadHints
import akka.stream.stage.GraphStageWithMaterializedValue import akka.stream.stage.GraphStageWithMaterializedValue
import scala.util.control.NonFatal import scala.util.control.NonFatal
import akka.stream.stage.StageLogging
import io.aeron.exceptions.DriverTimeoutException
/** /**
* INTERNAL API * INTERNAL API
@ -86,7 +88,7 @@ private[remote] class AeronSource(
override val shape: SourceShape[EnvelopeBuffer] = SourceShape(out) override val shape: SourceShape[EnvelopeBuffer] = SourceShape(out)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
val logic = new GraphStageLogic(shape) with OutHandler with ResourceLifecycle { val logic = new GraphStageLogic(shape) with OutHandler with ResourceLifecycle with StageLogging {
private val sub = aeron.addSubscription(channel, streamId) private val sub = aeron.addSubscription(channel, streamId)
// spin between 100 to 10000 depending on idleCpuLevel // spin between 100 to 10000 depending on idleCpuLevel
@ -109,13 +111,19 @@ private[remote] class AeronSource(
freeSessionBuffers() freeSessionBuffers()
} }
override protected def logSource = classOf[AeronSource]
override def preStart(): Unit = { override def preStart(): Unit = {
flightRecorder.loFreq(AeronSource_Started, channelMetadata) flightRecorder.loFreq(AeronSource_Started, channelMetadata)
} }
override def postStop(): Unit = { override def postStop(): Unit = {
sub.close()
taskRunner.command(Remove(addPollTask.task)) taskRunner.command(Remove(addPollTask.task))
try sub.close() catch {
case e: DriverTimeoutException
// media driver was shutdown
log.debug("DriverTimeout when closing subscription. {}", e.getMessage)
} finally
flightRecorder.loFreq(AeronSource_Stopped, channelMetadata) flightRecorder.loFreq(AeronSource_Stopped, channelMetadata)
} }

View file

@ -443,7 +443,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
override def run(): Unit = { override def run(): Unit = {
if (hasBeenShutdown.compareAndSet(false, true)) { if (hasBeenShutdown.compareAndSet(false, true)) {
log.debug("Shutting down [{}] via shutdownHook", localAddress) log.debug("Shutting down [{}] via shutdownHook", localAddress)
Await.result(internalShutdown(), 20.seconds) Await.result(internalShutdown(), settings.Advanced.DriverTimeout + 3.seconds)
} }
} }
} }