Merge pull request #21390 from akka/wip-21388-shutdown-killSwitch-patriknw
abort streams on shutdown, #21388
This commit is contained in:
commit
72cf185686
2 changed files with 8 additions and 1 deletions
|
|
@ -69,6 +69,7 @@ import org.agrona.ErrorHandler
|
|||
import org.agrona.IoUtil
|
||||
import org.agrona.concurrent.BackoffIdleStrategy
|
||||
import akka.stream.scaladsl.BroadcastHub
|
||||
import scala.util.control.NoStackTrace
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -242,6 +243,7 @@ private[remote] class FlushOnShutdown(done: Promise[Done], timeout: FiniteDurati
|
|||
val timeoutTask = context.system.scheduler.scheduleOnce(timeout, self, FlushOnShutdown.Timeout)(context.dispatcher)
|
||||
|
||||
override def preStart(): Unit = {
|
||||
// FIXME shall we also try to flush the ordinary message stream, not only control stream?
|
||||
val msg = ActorSystemTerminating(inboundContext.localAddress)
|
||||
associations.foreach { a ⇒ a.send(msg, OptionVal.Some(self), OptionVal.None) }
|
||||
}
|
||||
|
|
@ -268,6 +270,7 @@ private[remote] class FlushOnShutdown(done: Promise[Done], timeout: FiniteDurati
|
|||
private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider)
|
||||
extends RemoteTransport(_system, _provider) with InboundContext {
|
||||
import FlightRecorderEvents._
|
||||
import ArteryTransport.ShutdownSignal
|
||||
|
||||
// these vars are initialized once in the start method
|
||||
@volatile private[this] var _localAddress: UniqueAddress = _
|
||||
|
|
@ -666,6 +669,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
|
|||
implicit val ec = materializer.executionContext
|
||||
updateStreamCompletion(streamName, streamCompleted.recover { case _ ⇒ Done })
|
||||
streamCompleted.onFailure {
|
||||
case ShutdownSignal ⇒ // shutdown as expected
|
||||
case cause if isShutdown ⇒
|
||||
// don't restart after shutdown, but log some details so we notice
|
||||
log.error(cause, s"{} failed after shutdown. {}", streamName, cause.getMessage)
|
||||
|
|
@ -697,7 +701,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
|
|||
|
||||
for {
|
||||
_ ← flushing.recover { case _ ⇒ Done }
|
||||
_ = killSwitch.shutdown()
|
||||
_ = killSwitch.abort(ShutdownSignal)
|
||||
_ ← streamsCompleted
|
||||
} yield {
|
||||
topLevelFREvents.loFreq(Transport_KillSwitchPulled, NoMetaData)
|
||||
|
|
@ -966,4 +970,6 @@ private[remote] object ArteryTransport {
|
|||
port
|
||||
}
|
||||
|
||||
object ShutdownSignal extends RuntimeException with NoStackTrace
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -537,6 +537,7 @@ private[remote] class Association(
|
|||
implicit val ec = materializer.executionContext
|
||||
updateStreamCompletion(streamName, streamCompleted.recover { case _ ⇒ Done })
|
||||
streamCompleted.onFailure {
|
||||
case ArteryTransport.ShutdownSignal ⇒ // shutdown as expected
|
||||
case cause if transport.isShutdown ⇒
|
||||
// don't restart after shutdown, but log some details so we notice
|
||||
log.error(cause, s"{} to {} failed after shutdown. {}", streamName, remoteAddress, cause.getMessage)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue