diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 8a84e7644c..01a3cc2bea 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -156,16 +156,24 @@ private[cluster] object InternalClusterAction { private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Actor with ActorLogging with RequiresMessageQueue[UnboundedMessageQueueSemantics] { import InternalClusterAction._ - // Important - don't use Cluster(context.system) here because that would + // Important - don't use Cluster(context.system) in constructor because that would // cause deadlock. The Cluster extension is currently being created and is waiting // for response from GetClusterCoreRef in its constructor. - val coreSupervisor = context.actorOf(Props[ClusterCoreSupervisor]. - withDispatcher(context.props.dispatcher), name = "core") - context.actorOf(Props[ClusterHeartbeatReceiver]. - withDispatcher(context.props.dispatcher), name = "heartbeatReceiver") + // Child actors are therefore created when GetClusterCoreRef is received + var coreSupervisor: Option[ActorRef] = None + + def createChildren(): Unit = { + coreSupervisor = Some(context.actorOf(Props[ClusterCoreSupervisor]. + withDispatcher(context.props.dispatcher), name = "core")) + context.actorOf(Props[ClusterHeartbeatReceiver]. + withDispatcher(context.props.dispatcher), name = "heartbeatReceiver") + } def receive = { - case msg: GetClusterCoreRef.type ⇒ coreSupervisor forward msg + case msg: GetClusterCoreRef.type ⇒ + if (coreSupervisor.isEmpty) + createChildren() + coreSupervisor.foreach(_ forward msg) case AddOnMemberUpListener(code) ⇒ context.actorOf(Props(classOf[OnMemberStatusChangedListener], code, Up).withDeploy(Deploy.local)) case AddOnMemberRemovedListener(code) ⇒ @@ -191,12 +199,20 @@ private[cluster] final class ClusterCoreSupervisor extends Actor with ActorLoggi with RequiresMessageQueue[UnboundedMessageQueueSemantics] { import InternalClusterAction._ - val publisher = context.actorOf(Props[ClusterDomainEventPublisher]. - withDispatcher(context.props.dispatcher), name = "publisher") - val coreDaemon = context.watch(context.actorOf(Props(classOf[ClusterCoreDaemon], publisher). - withDispatcher(context.props.dispatcher), name = "daemon")) + // Important - don't use Cluster(context.system) in constructor because that would + // cause deadlock. The Cluster extension is currently being created and is waiting + // for response from GetClusterCoreRef in its constructor. + // Child actors are therefore created when GetClusterCoreRef is received - context.parent ! PublisherCreated(publisher) + var coreDaemon: Option[ActorRef] = None + + def createChildren(): Unit = { + val publisher = context.actorOf(Props[ClusterDomainEventPublisher]. + withDispatcher(context.props.dispatcher), name = "publisher") + coreDaemon = Some(context.watch(context.actorOf(Props(classOf[ClusterCoreDaemon], publisher). + withDispatcher(context.props.dispatcher), name = "daemon"))) + context.parent ! PublisherCreated(publisher) + } override val supervisorStrategy = OneForOneStrategy() { @@ -209,7 +225,10 @@ private[cluster] final class ClusterCoreSupervisor extends Actor with ActorLoggi override def postStop(): Unit = Cluster(context.system).shutdown() def receive = { - case InternalClusterAction.GetClusterCoreRef ⇒ sender() ! coreDaemon + case InternalClusterAction.GetClusterCoreRef ⇒ + if (coreDaemon.isEmpty) + createChildren() + coreDaemon.foreach(sender() ! _) } } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala index b2271aafb9..a5ac2e2022 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala @@ -22,7 +22,9 @@ import akka.actor.DeadLetterSuppression private[cluster] final class ClusterHeartbeatReceiver extends Actor with ActorLogging { import ClusterHeartbeatSender._ - val selfHeartbeatRsp = HeartbeatRsp(Cluster(context.system).selfUniqueAddress) + // Important - don't use Cluster(context.system) in constructor because that would + // cause deadlock. See startup sequence in ClusterDaemon. + lazy val selfHeartbeatRsp = HeartbeatRsp(Cluster(context.system).selfUniqueAddress) def receive = { case Heartbeat(from) ⇒ sender() ! selfHeartbeatRsp diff --git a/akka-cluster/src/test/scala/akka/cluster/StartupWithOneThreadSpec.scala b/akka-cluster/src/test/scala/akka/cluster/StartupWithOneThreadSpec.scala new file mode 100644 index 0000000000..9f969cd4ed --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/StartupWithOneThreadSpec.scala @@ -0,0 +1,80 @@ +/** + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.cluster + +import language.postfixOps +import language.reflectiveCalls +import scala.concurrent.duration._ +import akka.testkit.AkkaSpec +import akka.testkit.ImplicitSender +import akka.actor.ExtendedActorSystem +import akka.actor.Address +import akka.cluster.InternalClusterAction._ +import java.lang.management.ManagementFactory +import javax.management.ObjectName +import akka.actor.ActorRef +import akka.testkit.TestProbe +import akka.actor.Props +import akka.actor.Actor +import akka.actor.ActorLogging + +object StartupWithOneThreadSpec { + val config = """ + akka.actor.provider = "akka.cluster.ClusterActorRefProvider" + akka.actor.creation-timeout = 10s + akka.remote.netty.tcp.port = 0 + + akka.actor.default-dispatcher { + executor = thread-pool-executor + thread-pool-executor { + core-pool-size-min = 1 + core-pool-size-max = 1 + } + } + """ + + final case class GossipTo(address: Address) + + def testProps = Props(new Actor with ActorLogging { + val cluster = Cluster(context.system) + log.debug(s"started ${cluster.selfAddress} ${Thread.currentThread().getName}") + def receive = { + case msg ⇒ sender() ! msg + } + }) +} + +class StartupWithOneThreadSpec(startTime: Long) extends AkkaSpec(StartupWithOneThreadSpec.config) with ImplicitSender { + import StartupWithOneThreadSpec._ + + def this() = this(System.nanoTime()) + + "A Cluster" must { + + "startup with one dispatcher thread" in { + // This test failed before fixing #17253 when adding a sleep before the + // Await of GetClusterCoreRef in the Cluster extension constructor. + // The reason was that other cluster actors were started too early and + // they also tried to get the Cluster extension and thereby blocking + // dispatcher threads. + // Note that the Cluster extension is started via ClusterActorRefProvider + // before ActorSystem.apply returns, i.e. in the constructor of AkkaSpec. + (System.nanoTime - startTime).nanos.toMillis should be < + (system.settings.CreationTimeout.duration - 2.second).toMillis + system.actorOf(testProps) ! "hello" + system.actorOf(testProps) ! "hello" + system.actorOf(testProps) ! "hello" + + val cluster = Cluster(system) + (System.nanoTime - startTime).nanos.toMillis should be < + (system.settings.CreationTimeout.duration - 2.second).toMillis + + expectMsg("hello") + expectMsg("hello") + expectMsg("hello") + } + + } +} diff --git a/project/MiMa.scala b/project/MiMa.scala index 9101e09714..3bb1ea8e65 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -545,7 +545,12 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[MissingMethodProblem]("akka.japi.Pair.toString"), // #17805 - ProblemFilters.exclude[MissingMethodProblem]("akka.actor.ActorCell.clearActorFields") + ProblemFilters.exclude[MissingMethodProblem]("akka.actor.ActorCell.clearActorFields"), + + // internal changes introduced by #17253 + ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.cluster.ClusterDaemon.coreSupervisor"), + ProblemFilters.exclude[MissingMethodProblem]("akka.cluster.ClusterCoreSupervisor.publisher"), + ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.cluster.ClusterCoreSupervisor.coreDaemon") ) }