From dce668771ed4f60bedd201df307150a9645997f5 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 7 Dec 2016 15:38:11 +0100 Subject: [PATCH] fix shutdown of pending StressSpec, #21960 (#21963) --- .../scala/akka/cluster/NodeChurnSpec.scala | 8 +++++--- .../multi-jvm/scala/akka/cluster/StressSpec.scala | 7 ++++--- .../scala/akka/remote/artery/AeronSource.scala | 14 +++++++++++--- .../scala/akka/remote/artery/ArteryTransport.scala | 2 +- .../src/main/scala/akka/stream/Materializer.scala | 10 +++++----- 5 files changed, 26 insertions(+), 15 deletions(-) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeChurnSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeChurnSpec.scala index f4af16fc2c..07de9cb29a 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeChurnSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeChurnSpec.scala @@ -107,10 +107,8 @@ abstract class NodeChurnSpec def isArteryEnabled: Boolean = RARP(system).provider.remoteSettings.Artery.Enabled - // FIXME issue #21483 - if (isArteryEnabled) pending - "Cluster with short lived members" must { + "setup stable nodes" taggedAs LongRunningTest in within(15.seconds) { val logListener = system.actorOf(Props(classOf[LogListener], testActor), "logListener") system.eventStream.subscribe(logListener, classOf[Info]) @@ -119,6 +117,10 @@ abstract class NodeChurnSpec enterBarrier("stable") } + // FIXME issue #21483 + // note: there must be one test step before pending, otherwise afterTermination will not run + if (isArteryEnabled) pending + "join and remove transient nodes without growing gossip payload" taggedAs LongRunningTest in { // This test is configured with log-frame-size-exceeding and the LogListener // will send to the testActor if unexpected increase in message payload size. diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala index 111eb231e3..09309656a1 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala @@ -1153,8 +1153,6 @@ abstract class StressSpec "A cluster under stress" must { - if (isArteryEnabled) pending - "log settings" taggedAs LongRunningTest in { if (infolog) { log.info("StressSpec JVM:\n{}", jvmInfo) @@ -1165,6 +1163,10 @@ abstract class StressSpec enterBarrier("after-" + step) } + // FIXME issue #21810 + // note: there must be one test step before pending, otherwise afterTermination will not run + if (isArteryEnabled) pending + "join seed nodes" taggedAs LongRunningTest in within(30 seconds) { val otherNodesJoiningSeedNodes = roles.slice(numberOfSeedNodes, numberOfSeedNodes + numberOfNodesJoiningToSeedNodesInitially) @@ -1349,5 +1351,4 @@ abstract class StressSpec enterBarrier("after-" + step) } } - } diff --git a/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala b/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala index 931998a8f1..befc797868 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala @@ -23,6 +23,8 @@ import org.agrona.concurrent.BackoffIdleStrategy import org.agrona.hints.ThreadHints import akka.stream.stage.GraphStageWithMaterializedValue import scala.util.control.NonFatal +import akka.stream.stage.StageLogging +import io.aeron.exceptions.DriverTimeoutException /** * INTERNAL API @@ -86,7 +88,7 @@ private[remote] class AeronSource( override val shape: SourceShape[EnvelopeBuffer] = SourceShape(out) override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { - val logic = new GraphStageLogic(shape) with OutHandler with ResourceLifecycle { + val logic = new GraphStageLogic(shape) with OutHandler with ResourceLifecycle with StageLogging { private val sub = aeron.addSubscription(channel, streamId) // spin between 100 to 10000 depending on idleCpuLevel @@ -109,14 +111,20 @@ private[remote] class AeronSource( freeSessionBuffers() } + override protected def logSource = classOf[AeronSource] + override def preStart(): Unit = { flightRecorder.loFreq(AeronSource_Started, channelMetadata) } override def postStop(): Unit = { - sub.close() taskRunner.command(Remove(addPollTask.task)) - flightRecorder.loFreq(AeronSource_Stopped, channelMetadata) + try sub.close() catch { + case e: DriverTimeoutException ⇒ + // media driver was shutdown + log.debug("DriverTimeout when closing subscription. {}", e.getMessage) + } finally + flightRecorder.loFreq(AeronSource_Stopped, channelMetadata) } // OutHandler diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 21470a31eb..0ad8b9abfa 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -443,7 +443,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R override def run(): Unit = { if (hasBeenShutdown.compareAndSet(false, true)) { log.debug("Shutting down [{}] via shutdownHook", localAddress) - Await.result(internalShutdown(), 20.seconds) + Await.result(internalShutdown(), settings.Advanced.DriverTimeout + 3.seconds) } } } diff --git a/akka-stream/src/main/scala/akka/stream/Materializer.scala b/akka-stream/src/main/scala/akka/stream/Materializer.scala index d172023d21..36c9e7df5f 100644 --- a/akka-stream/src/main/scala/akka/stream/Materializer.scala +++ b/akka-stream/src/main/scala/akka/stream/Materializer.scala @@ -9,16 +9,16 @@ import scala.concurrent.ExecutionContextExecutor import scala.concurrent.duration.FiniteDuration -/** - * Materializer SPI (Service Provider Interface) - * +/** + * Materializer SPI (Service Provider Interface) + * * Binary compatibility is NOT guaranteed on materializer internals. - * + * * Custom materializer implementations should be aware that the materializer SPI * is not yet final and may change in patch releases of Akka. Please note that this * does not impact end-users of Akka streams, only implementors of custom materializers, * with whom the Akka team co-ordinates such changes. - * + * * Once the SPI is final this notice will be removed. */ abstract class Materializer {