From edf1c83839d0e3803ab2f0c64403a428b908d02e Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 7 Sep 2016 08:27:33 +0200 Subject: [PATCH] abort streams on shutdown, #21388 * otherwise AeronSink will continue sending outstanding messages before completing * this was noticed by RemoteDeathWatchSpec couldn't shutdown, since it was trying to send to unknown --- .../main/scala/akka/remote/artery/ArteryTransport.scala | 8 +++++++- .../src/main/scala/akka/remote/artery/Association.scala | 1 + 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index ba5fd1f4b1..bc8d27df53 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -69,6 +69,7 @@ import org.agrona.ErrorHandler import org.agrona.IoUtil import org.agrona.concurrent.BackoffIdleStrategy import akka.stream.scaladsl.BroadcastHub +import scala.util.control.NoStackTrace /** * INTERNAL API @@ -242,6 +243,7 @@ private[remote] class FlushOnShutdown(done: Promise[Done], timeout: FiniteDurati val timeoutTask = context.system.scheduler.scheduleOnce(timeout, self, FlushOnShutdown.Timeout)(context.dispatcher) override def preStart(): Unit = { + // FIXME shall we also try to flush the ordinary message stream, not only control stream? val msg = ActorSystemTerminating(inboundContext.localAddress) associations.foreach { a ⇒ a.send(msg, OptionVal.Some(self), OptionVal.None) } } @@ -268,6 +270,7 @@ private[remote] class FlushOnShutdown(done: Promise[Done], timeout: FiniteDurati private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider) extends RemoteTransport(_system, _provider) with InboundContext { import FlightRecorderEvents._ + import ArteryTransport.ShutdownSignal // these vars are initialized once in the start method @volatile private[this] var _localAddress: UniqueAddress = _ @@ -666,6 +669,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R implicit val ec = materializer.executionContext updateStreamCompletion(streamName, streamCompleted.recover { case _ ⇒ Done }) streamCompleted.onFailure { + case ShutdownSignal ⇒ // shutdown as expected case cause if isShutdown ⇒ // don't restart after shutdown, but log some details so we notice log.error(cause, s"{} failed after shutdown. {}", streamName, cause.getMessage) @@ -697,7 +701,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R for { _ ← flushing.recover { case _ ⇒ Done } - _ = killSwitch.shutdown() + _ = killSwitch.abort(ShutdownSignal) _ ← streamsCompleted } yield { topLevelFREvents.loFreq(Transport_KillSwitchPulled, NoMetaData) @@ -966,4 +970,6 @@ private[remote] object ArteryTransport { port } + object ShutdownSignal extends RuntimeException with NoStackTrace + } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index cd361840d0..0456ceb62b 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -537,6 +537,7 @@ private[remote] class Association( implicit val ec = materializer.executionContext updateStreamCompletion(streamName, streamCompleted.recover { case _ ⇒ Done }) streamCompleted.onFailure { + case ArteryTransport.ShutdownSignal ⇒ // shutdown as expected case cause if transport.isShutdown ⇒ // don't restart after shutdown, but log some details so we notice log.error(cause, s"{} to {} failed after shutdown. {}", streamName, remoteAddress, cause.getMessage)