From 737a50ebf3b3c70b579abbf1735a354b220a3122 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 27 Aug 2015 08:58:52 +0200 Subject: [PATCH] =clu #17253 Improve cluster startup thread usage When using a dispatcher (default or separate cluster dispatcher) with less than 5 threads the Cluster extension initialization could deadlock. It was reproducable by adding a sleep before the Await of GetClusterCoreRef in the Cluster extension constructor. The reason was that other cluster actors were started too early and they also tried to get the Cluster extension and thereby blocking dispatcher threads. Note that the Cluster extension is started via ClusterActorRefProvider before ActorSystem.apply returns. The improvement is to start the cluster child actors lazily when the GetClusterCoreRef is received. --- .../scala/akka/cluster/ClusterDaemon.scala | 43 +++++++--- .../scala/akka/cluster/ClusterHeartbeat.scala | 4 +- .../cluster/StartupWithOneThreadSpec.scala | 80 +++++++++++++++++++ project/MiMa.scala | 7 +- 4 files changed, 120 insertions(+), 14 deletions(-) create mode 100644 akka-cluster/src/test/scala/akka/cluster/StartupWithOneThreadSpec.scala diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 8a84e7644c..01a3cc2bea 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -156,16 +156,24 @@ private[cluster] object InternalClusterAction { private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Actor with ActorLogging with RequiresMessageQueue[UnboundedMessageQueueSemantics] { import InternalClusterAction._ - // Important - don't use Cluster(context.system) here because that would + // Important - don't use Cluster(context.system) in constructor because that would // cause deadlock. The Cluster extension is currently being created and is waiting // for response from GetClusterCoreRef in its constructor. - val coreSupervisor = context.actorOf(Props[ClusterCoreSupervisor]. - withDispatcher(context.props.dispatcher), name = "core") - context.actorOf(Props[ClusterHeartbeatReceiver]. - withDispatcher(context.props.dispatcher), name = "heartbeatReceiver") + // Child actors are therefore created when GetClusterCoreRef is received + var coreSupervisor: Option[ActorRef] = None + + def createChildren(): Unit = { + coreSupervisor = Some(context.actorOf(Props[ClusterCoreSupervisor]. + withDispatcher(context.props.dispatcher), name = "core")) + context.actorOf(Props[ClusterHeartbeatReceiver]. + withDispatcher(context.props.dispatcher), name = "heartbeatReceiver") + } def receive = { - case msg: GetClusterCoreRef.type ⇒ coreSupervisor forward msg + case msg: GetClusterCoreRef.type ⇒ + if (coreSupervisor.isEmpty) + createChildren() + coreSupervisor.foreach(_ forward msg) case AddOnMemberUpListener(code) ⇒ context.actorOf(Props(classOf[OnMemberStatusChangedListener], code, Up).withDeploy(Deploy.local)) case AddOnMemberRemovedListener(code) ⇒ @@ -191,12 +199,20 @@ private[cluster] final class ClusterCoreSupervisor extends Actor with ActorLoggi with RequiresMessageQueue[UnboundedMessageQueueSemantics] { import InternalClusterAction._ - val publisher = context.actorOf(Props[ClusterDomainEventPublisher]. - withDispatcher(context.props.dispatcher), name = "publisher") - val coreDaemon = context.watch(context.actorOf(Props(classOf[ClusterCoreDaemon], publisher). - withDispatcher(context.props.dispatcher), name = "daemon")) + // Important - don't use Cluster(context.system) in constructor because that would + // cause deadlock. The Cluster extension is currently being created and is waiting + // for response from GetClusterCoreRef in its constructor. + // Child actors are therefore created when GetClusterCoreRef is received - context.parent ! PublisherCreated(publisher) + var coreDaemon: Option[ActorRef] = None + + def createChildren(): Unit = { + val publisher = context.actorOf(Props[ClusterDomainEventPublisher]. + withDispatcher(context.props.dispatcher), name = "publisher") + coreDaemon = Some(context.watch(context.actorOf(Props(classOf[ClusterCoreDaemon], publisher). + withDispatcher(context.props.dispatcher), name = "daemon"))) + context.parent ! PublisherCreated(publisher) + } override val supervisorStrategy = OneForOneStrategy() { @@ -209,7 +225,10 @@ private[cluster] final class ClusterCoreSupervisor extends Actor with ActorLoggi override def postStop(): Unit = Cluster(context.system).shutdown() def receive = { - case InternalClusterAction.GetClusterCoreRef ⇒ sender() ! coreDaemon + case InternalClusterAction.GetClusterCoreRef ⇒ + if (coreDaemon.isEmpty) + createChildren() + coreDaemon.foreach(sender() ! _) } } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala index b2271aafb9..a5ac2e2022 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala @@ -22,7 +22,9 @@ import akka.actor.DeadLetterSuppression private[cluster] final class ClusterHeartbeatReceiver extends Actor with ActorLogging { import ClusterHeartbeatSender._ - val selfHeartbeatRsp = HeartbeatRsp(Cluster(context.system).selfUniqueAddress) + // Important - don't use Cluster(context.system) in constructor because that would + // cause deadlock. See startup sequence in ClusterDaemon. + lazy val selfHeartbeatRsp = HeartbeatRsp(Cluster(context.system).selfUniqueAddress) def receive = { case Heartbeat(from) ⇒ sender() ! selfHeartbeatRsp diff --git a/akka-cluster/src/test/scala/akka/cluster/StartupWithOneThreadSpec.scala b/akka-cluster/src/test/scala/akka/cluster/StartupWithOneThreadSpec.scala new file mode 100644 index 0000000000..9f969cd4ed --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/StartupWithOneThreadSpec.scala @@ -0,0 +1,80 @@ +/** + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.cluster + +import language.postfixOps +import language.reflectiveCalls +import scala.concurrent.duration._ +import akka.testkit.AkkaSpec +import akka.testkit.ImplicitSender +import akka.actor.ExtendedActorSystem +import akka.actor.Address +import akka.cluster.InternalClusterAction._ +import java.lang.management.ManagementFactory +import javax.management.ObjectName +import akka.actor.ActorRef +import akka.testkit.TestProbe +import akka.actor.Props +import akka.actor.Actor +import akka.actor.ActorLogging + +object StartupWithOneThreadSpec { + val config = """ + akka.actor.provider = "akka.cluster.ClusterActorRefProvider" + akka.actor.creation-timeout = 10s + akka.remote.netty.tcp.port = 0 + + akka.actor.default-dispatcher { + executor = thread-pool-executor + thread-pool-executor { + core-pool-size-min = 1 + core-pool-size-max = 1 + } + } + """ + + final case class GossipTo(address: Address) + + def testProps = Props(new Actor with ActorLogging { + val cluster = Cluster(context.system) + log.debug(s"started ${cluster.selfAddress} ${Thread.currentThread().getName}") + def receive = { + case msg ⇒ sender() ! msg + } + }) +} + +class StartupWithOneThreadSpec(startTime: Long) extends AkkaSpec(StartupWithOneThreadSpec.config) with ImplicitSender { + import StartupWithOneThreadSpec._ + + def this() = this(System.nanoTime()) + + "A Cluster" must { + + "startup with one dispatcher thread" in { + // This test failed before fixing #17253 when adding a sleep before the + // Await of GetClusterCoreRef in the Cluster extension constructor. + // The reason was that other cluster actors were started too early and + // they also tried to get the Cluster extension and thereby blocking + // dispatcher threads. + // Note that the Cluster extension is started via ClusterActorRefProvider + // before ActorSystem.apply returns, i.e. in the constructor of AkkaSpec. + (System.nanoTime - startTime).nanos.toMillis should be < + (system.settings.CreationTimeout.duration - 2.second).toMillis + system.actorOf(testProps) ! "hello" + system.actorOf(testProps) ! "hello" + system.actorOf(testProps) ! "hello" + + val cluster = Cluster(system) + (System.nanoTime - startTime).nanos.toMillis should be < + (system.settings.CreationTimeout.duration - 2.second).toMillis + + expectMsg("hello") + expectMsg("hello") + expectMsg("hello") + } + + } +} diff --git a/project/MiMa.scala b/project/MiMa.scala index 9101e09714..3bb1ea8e65 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -545,7 +545,12 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[MissingMethodProblem]("akka.japi.Pair.toString"), // #17805 - ProblemFilters.exclude[MissingMethodProblem]("akka.actor.ActorCell.clearActorFields") + ProblemFilters.exclude[MissingMethodProblem]("akka.actor.ActorCell.clearActorFields"), + + // internal changes introduced by #17253 + ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.cluster.ClusterDaemon.coreSupervisor"), + ProblemFilters.exclude[MissingMethodProblem]("akka.cluster.ClusterCoreSupervisor.publisher"), + ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.cluster.ClusterCoreSupervisor.coreDaemon") ) }