fix shutdown race in sendControl, #21514 (#21517)

* fix shutdown race in sendControl, #21514

* the stack trace showed "IllegalStateException: outboundControlIngress not initialized yet",
  thrown via the call to sendControl
* that could happen if there is a shutdown at the same time, which is exactly what the test does
* it was actually caused by a merge mistake, but with this fix the shutdown handling is even better than before

* count down the materializing latch on shutdown, so that threads waiting on it in outboundControlIngress are released
This commit is contained in:
Patrik Nordwall 2016-09-22 11:07:17 +02:00 committed by GitHub
parent 1d3661b556
commit 455d6a45cc
3 changed files with 18 additions and 5 deletions

View file

@ -59,6 +59,10 @@ abstract class MultiNodeConfig {
log-received-messages = on
log-sent-messages = on
}
akka.remote.artery {
log-received-messages = on
log-sent-messages = on
}
akka.actor.debug {
receive = on
fsm = on

View file

@ -426,6 +426,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
private lazy val shutdownHook = new Thread {
override def run(): Unit = {
if (hasBeenShutdown.compareAndSet(false, true)) {
log.debug("Shutting down [{}] via shutdownHook", localAddress)
Await.result(internalShutdown(), 20.seconds)
}
}

View file

@ -191,7 +191,9 @@ private[remote] class Association(
materializing.await(10, TimeUnit.SECONDS)
_outboundControlIngress match {
case OptionVal.Some(o) o
case OptionVal.None throw new IllegalStateException("outboundControlIngress not initialized yet")
case OptionVal.None
if (transport.isShutdown) throw ShuttingDown
else throw new IllegalStateException("outboundControlIngress not initialized yet")
}
}
}
@ -265,13 +267,14 @@ private[remote] class Association(
// OutboundContext
override def sendControl(message: ControlMessage): Unit = {
try {
if (!transport.isShutdown)
if (!transport.isShutdown) {
if (associationState.isQuarantined()) {
log.debug("Send control message [{}] to quarantined [{}]", Logging.messageClassName(message),
remoteAddress)
startIdleTimer()
}
outboundControlIngress.sendControlMessage(message)
outboundControlIngress.sendControlMessage(message)
}
} catch {
case ShuttingDown // silence it
}
@ -624,11 +627,16 @@ private[remote] class Association(
implicit val ec = materializer.executionContext
updateStreamCompletion(streamName, (streamKillSwitch, streamCompleted.recover { case _ Done }))
streamCompleted.onFailure {
case ArteryTransport.ShutdownSignal // shutdown as expected
case _: AeronTerminated // shutdown already in progress
case ArteryTransport.ShutdownSignal
// shutdown as expected
// countDown the latch in case threads are waiting on the latch in outboundControlIngress method
materializing.countDown()
case _: AeronTerminated // shutdown already in progress
case cause if transport.isShutdown
// don't restart after shutdown, but log some details so we notice
log.error(cause, s"{} to [{}] failed after shutdown. {}", streamName, remoteAddress, cause.getMessage)
// countDown the latch in case threads are waiting on the latch in outboundControlIngress method
materializing.countDown()
case _: AbruptTerminationException // ActorSystem shutdown
case OutboundStreamStopSignal
// stop as expected due to quarantine