diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 6f7fce6cbb..8e85475a7d 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -1038,7 +1038,8 @@ private[akka] class ActorSystemImpl( logDeadLetterListener = Some(systemActorOf(Props[DeadLetterListener], "deadLetterListener")) eventStream.startUnsubscriber() ManifestInfo(this).checkSameVersion("Akka", allModules, logWarning = true) - loadExtensions() + if (!terminating) + loadExtensions() if (LogConfigOnStart) logConfiguration() this } catch { @@ -1052,7 +1053,10 @@ private[akka] class ActorSystemImpl( def registerOnTermination[T](code: => T): Unit = { registerOnTermination(new Runnable { def run = code }) } def registerOnTermination(code: Runnable): Unit = { terminationCallbacks.add(code) } + @volatile private var terminating = false + override def terminate(): Future[Terminated] = { + terminating = true if (settings.CoordinatedShutdownRunByActorSystemTerminate && !aborting) { // Note that the combination CoordinatedShutdownRunByActorSystemTerminate==true && // CoordinatedShutdownTerminateActorSystem==false is disallowed, checked in Settings. @@ -1069,6 +1073,7 @@ private[akka] class ActorSystemImpl( } override private[akka] def finalTerminate(): Unit = { + terminating = true // these actions are idempotent if (!settings.LogDeadLettersDuringShutdown) logDeadLetterListener.foreach(stop) guardian.stop() @@ -1177,18 +1182,26 @@ private[akka] class ActorSystemImpl( * @param throwOnLoadFail Throw exception when an extension fails to load (needed for backwards compatibility) */ def loadExtensions(key: String, throwOnLoadFail: Boolean): Unit = { + def handleException(fqcn: String, problem: Throwable): Unit = { + if (!throwOnLoadFail) log.error(problem, "While trying to load extension [{}], skipping...", fqcn) + else throw new RuntimeException(s"While trying to load extension [$fqcn]", problem) + } + immutableSeq(settings.config.getStringList(key)).foreach { fqcn => dynamicAccess.getObjectFor[AnyRef](fqcn).recoverWith { case _ => dynamicAccess.createInstanceFor[AnyRef](fqcn, Nil) } match { - case Success(p: ExtensionIdProvider) => registerExtension(p.lookup()) - case Success(p: ExtensionId[_]) => registerExtension(p) + case Success(p: ExtensionIdProvider) => + try registerExtension(p.lookup()) + catch { case NonFatal(e) => handleException(fqcn, e) } + case Success(p: ExtensionId[_]) => + try registerExtension(p) + catch { case NonFatal(e) => handleException(fqcn, e) } case Success(_) => if (!throwOnLoadFail) log.error("[{}] is not an 'ExtensionIdProvider' or 'ExtensionId', skipping...", fqcn) else throw new RuntimeException(s"[$fqcn] is not an 'ExtensionIdProvider' or 'ExtensionId'") case Failure(problem) => - if (!throwOnLoadFail) log.error(problem, "While trying to load extension [{}], skipping...", fqcn) - else throw new RuntimeException(s"While trying to load extension [$fqcn]", problem) + handleException(fqcn, problem) } } } diff --git a/akka-cluster/src/test/scala/akka/cluster/DowningProviderSpec.scala b/akka-cluster/src/test/scala/akka/cluster/DowningProviderSpec.scala index 29a5688d40..a35b5c83a4 100644 --- a/akka-cluster/src/test/scala/akka/cluster/DowningProviderSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/DowningProviderSpec.scala @@ -7,7 +7,6 @@ package akka.cluster import java.util.concurrent.atomic.AtomicBoolean import scala.concurrent.duration._ -import scala.util.control.NonFatal import akka.ConfigurationException import akka.actor.ActorSystem @@ -76,25 +75,17 @@ class DowningProviderSpec extends WordSpec with Matchers { } "stop the cluster if the downing provider throws exception in props method" in { - try { - val system = ActorSystem( - "auto-downing", - ConfigFactory.parseString(""" + val system = ActorSystem( + "auto-downing", + ConfigFactory.parseString(""" akka.cluster.downing-provider-class="akka.cluster.FailingDowningProvider" """).withFallback(baseConf)) - val cluster = Cluster(system) - cluster.join(cluster.selfAddress) + val cluster = Cluster(system) + cluster.join(cluster.selfAddress) - awaitCond(cluster.isTerminated, 3.seconds) - shutdownActorSystem(system) - } catch { - case NonFatal(e) if e.getMessage.contains("cannot create children while terminating") => - // FIXME #27840 - // cannot create children while terminating or terminated - // thrown from loadExtension SystemMaterializer - pending - } + awaitCond(cluster.isTerminated, 3.seconds) + shutdownActorSystem(system) } }