Don't loadExtensions at startup if already terminated, #27840 (#28248)

* Problem could be recreated with DowningProviderSpec when running
  with -Dakka.remote.artery.enabled=off
* The exception from the downing provider caused cluster shutdown
  followed by CoordinatedShutdown and ActorSystem.finalTerminate
* The ActorSystem had still not fully initialized so it tried to
  load the configured extensions and the SystemMaterializer extension
* SystemMaterializer can't create child actor
* The thrown exception isn't wrong but might be confusing
* Skip loading extensions if ActorSystem already terminated
* The reason for different behavior with Artery and classic remoting
  is that Artery inits the SystemMaterializer earlier
This commit is contained in:
Patrik Nordwall 2019-11-26 11:44:37 +01:00 committed by Arnout Engelen
parent 8febed2aed
commit 82db446bb7
2 changed files with 25 additions and 21 deletions

View file

@ -1038,7 +1038,8 @@ private[akka] class ActorSystemImpl(
logDeadLetterListener = Some(systemActorOf(Props[DeadLetterListener], "deadLetterListener")) logDeadLetterListener = Some(systemActorOf(Props[DeadLetterListener], "deadLetterListener"))
eventStream.startUnsubscriber() eventStream.startUnsubscriber()
ManifestInfo(this).checkSameVersion("Akka", allModules, logWarning = true) ManifestInfo(this).checkSameVersion("Akka", allModules, logWarning = true)
loadExtensions() if (!terminating)
loadExtensions()
if (LogConfigOnStart) logConfiguration() if (LogConfigOnStart) logConfiguration()
this this
} catch { } catch {
@ -1052,7 +1053,10 @@ private[akka] class ActorSystemImpl(
def registerOnTermination[T](code: => T): Unit = { registerOnTermination(new Runnable { def run = code }) } def registerOnTermination[T](code: => T): Unit = { registerOnTermination(new Runnable { def run = code }) }
def registerOnTermination(code: Runnable): Unit = { terminationCallbacks.add(code) } def registerOnTermination(code: Runnable): Unit = { terminationCallbacks.add(code) }
@volatile private var terminating = false
override def terminate(): Future[Terminated] = { override def terminate(): Future[Terminated] = {
terminating = true
if (settings.CoordinatedShutdownRunByActorSystemTerminate && !aborting) { if (settings.CoordinatedShutdownRunByActorSystemTerminate && !aborting) {
// Note that the combination CoordinatedShutdownRunByActorSystemTerminate==true && // Note that the combination CoordinatedShutdownRunByActorSystemTerminate==true &&
// CoordinatedShutdownTerminateActorSystem==false is disallowed, checked in Settings. // CoordinatedShutdownTerminateActorSystem==false is disallowed, checked in Settings.
@ -1069,6 +1073,7 @@ private[akka] class ActorSystemImpl(
} }
override private[akka] def finalTerminate(): Unit = { override private[akka] def finalTerminate(): Unit = {
terminating = true
// these actions are idempotent // these actions are idempotent
if (!settings.LogDeadLettersDuringShutdown) logDeadLetterListener.foreach(stop) if (!settings.LogDeadLettersDuringShutdown) logDeadLetterListener.foreach(stop)
guardian.stop() guardian.stop()
@ -1177,18 +1182,26 @@ private[akka] class ActorSystemImpl(
* @param throwOnLoadFail Throw exception when an extension fails to load (needed for backwards compatibility) * @param throwOnLoadFail Throw exception when an extension fails to load (needed for backwards compatibility)
*/ */
def loadExtensions(key: String, throwOnLoadFail: Boolean): Unit = { def loadExtensions(key: String, throwOnLoadFail: Boolean): Unit = {
def handleException(fqcn: String, problem: Throwable): Unit = {
if (!throwOnLoadFail) log.error(problem, "While trying to load extension [{}], skipping...", fqcn)
else throw new RuntimeException(s"While trying to load extension [$fqcn]", problem)
}
immutableSeq(settings.config.getStringList(key)).foreach { fqcn => immutableSeq(settings.config.getStringList(key)).foreach { fqcn =>
dynamicAccess.getObjectFor[AnyRef](fqcn).recoverWith { dynamicAccess.getObjectFor[AnyRef](fqcn).recoverWith {
case _ => dynamicAccess.createInstanceFor[AnyRef](fqcn, Nil) case _ => dynamicAccess.createInstanceFor[AnyRef](fqcn, Nil)
} match { } match {
case Success(p: ExtensionIdProvider) => registerExtension(p.lookup()) case Success(p: ExtensionIdProvider) =>
case Success(p: ExtensionId[_]) => registerExtension(p) try registerExtension(p.lookup())
catch { case NonFatal(e) => handleException(fqcn, e) }
case Success(p: ExtensionId[_]) =>
try registerExtension(p)
catch { case NonFatal(e) => handleException(fqcn, e) }
case Success(_) => case Success(_) =>
if (!throwOnLoadFail) log.error("[{}] is not an 'ExtensionIdProvider' or 'ExtensionId', skipping...", fqcn) if (!throwOnLoadFail) log.error("[{}] is not an 'ExtensionIdProvider' or 'ExtensionId', skipping...", fqcn)
else throw new RuntimeException(s"[$fqcn] is not an 'ExtensionIdProvider' or 'ExtensionId'") else throw new RuntimeException(s"[$fqcn] is not an 'ExtensionIdProvider' or 'ExtensionId'")
case Failure(problem) => case Failure(problem) =>
if (!throwOnLoadFail) log.error(problem, "While trying to load extension [{}], skipping...", fqcn) handleException(fqcn, problem)
else throw new RuntimeException(s"While trying to load extension [$fqcn]", problem)
} }
} }
} }

View file

@ -7,7 +7,6 @@ package akka.cluster
import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.util.control.NonFatal
import akka.ConfigurationException import akka.ConfigurationException
import akka.actor.ActorSystem import akka.actor.ActorSystem
@ -76,25 +75,17 @@ class DowningProviderSpec extends WordSpec with Matchers {
} }
"stop the cluster if the downing provider throws exception in props method" in { "stop the cluster if the downing provider throws exception in props method" in {
try { val system = ActorSystem(
val system = ActorSystem( "auto-downing",
"auto-downing", ConfigFactory.parseString("""
ConfigFactory.parseString("""
akka.cluster.downing-provider-class="akka.cluster.FailingDowningProvider" akka.cluster.downing-provider-class="akka.cluster.FailingDowningProvider"
""").withFallback(baseConf)) """).withFallback(baseConf))
val cluster = Cluster(system) val cluster = Cluster(system)
cluster.join(cluster.selfAddress) cluster.join(cluster.selfAddress)
awaitCond(cluster.isTerminated, 3.seconds) awaitCond(cluster.isTerminated, 3.seconds)
shutdownActorSystem(system) shutdownActorSystem(system)
} catch {
case NonFatal(e) if e.getMessage.contains("cannot create children while terminating") =>
// FIXME #27840
// cannot create children while terminating or terminated
// thrown from loadExtension SystemMaterializer
pending
}
} }
} }