diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index b60b91ec43..613f6320d8 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -52,6 +52,10 @@ akka { # of the cluster within this deadline. join-timeout = 60s + # The id of the dispatcher to use for cluster actors. If not specified default dispatcher is used. + # If specified you need to define the settings of the actual dispatcher. + use-dispatcher = "" + # Gossip to random node with newer or older state information, if any with some # this probability. Otherwise Gossip to any random live node. # Probability value is between 0.0 and 1.0. 0.0 means never, 1.0 means always. diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 44c646ebe8..b1bf73eddb 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -372,7 +372,8 @@ case class Heartbeat(from: Address) extends ClusterMessage * INTERNAL API. * * Manages routing of the different cluster commands. - * Instantiated as a single instance for each Cluster - e.g. commands are serialized to Cluster message after message. + * Instantiated as a single instance for each Cluster - e.g. commands are serialized + * to Cluster message after message, but concurrent with other types of messages. */ private[cluster] final class ClusterCommandDaemon(cluster: Cluster) extends Actor { import ClusterUserAction._ @@ -398,7 +399,7 @@ private[cluster] final class ClusterCommandDaemon(cluster: Cluster) extends Acto if (seedRoutees.isEmpty) { cluster join cluster.selfAddress } else { - implicit val within = Timeout(cluster.clusterSettings.SeedNodeTimeout) + implicit val within = Timeout(cluster.settings.SeedNodeTimeout) val seedRouter = context.actorOf( Props.empty.withRouter(ScatterGatherFirstCompletedRouter( routees = seedRoutees, within = within.duration))) @@ -415,18 +416,35 @@ private[cluster] final class ClusterCommandDaemon(cluster: Cluster) extends Acto /** * INTERNAL API. * - * Pooled and routed with N number of configurable instances. - * Concurrent access to Cluster. + * Receives Gossip messages and delegates to Cluster. + * Instantiated as a single instance for each Cluster - e.g. gossips are serialized + * to Cluster message after message, but concurrent with other types of messages. */ -private[cluster] final class ClusterGossipDaemon(cluster: Cluster) extends Actor { - val log = Logging(context.system, this) +private[cluster] final class ClusterGossipDaemon(cluster: Cluster) extends Actor with ActorLogging { def receive = { - case Heartbeat(from) ⇒ cluster.receiveHeartbeat(from) case GossipEnvelope(from, gossip) ⇒ cluster.receiveGossip(from, gossip) } - override def unhandled(unknown: Any) = log.error("[/system/cluster/gossip] can not respond to messages - received [{}]", unknown) + override def unhandled(unknown: Any) = log.error("[{}] can not respond to messages - received [{}]", + self.path, unknown) +} + +/** + * INTERNAL API. + * + * Receives Heartbeat messages and delegates to Cluster. + * Instantiated as a single instance for each Cluster - e.g. heartbeats are serialized + * to Cluster message after message, but concurrent with other types of messages. + */ +private[cluster] final class ClusterHeartbeatDaemon(cluster: Cluster) extends Actor with ActorLogging { + + def receive = { + case Heartbeat(from) ⇒ cluster.receiveHeartbeat(from) + } + + override def unhandled(unknown: Any) = log.error("[{}] can not respond to messages - received [{}]", + self.path, unknown) } /** @@ -434,17 +452,22 @@ private[cluster] final class ClusterGossipDaemon(cluster: Cluster) extends Actor * * Supervisor managing the different Cluster daemons. */ -private[cluster] final class ClusterDaemonSupervisor(cluster: Cluster) extends Actor { - val log = Logging(context.system, this) +private[cluster] final class ClusterDaemonSupervisor(cluster: Cluster) extends Actor with ActorLogging { - private val commands = context.actorOf(Props(new ClusterCommandDaemon(cluster)), "commands") - private val gossip = context.actorOf( - Props(new ClusterGossipDaemon(cluster)).withRouter( - RoundRobinRouter(cluster.clusterSettings.NrOfGossipDaemons)), "gossip") + val configuredDispatcher = cluster.settings.UseDispatcher + private val commands = context.actorOf(Props(new ClusterCommandDaemon(cluster)). + withDispatcher(configuredDispatcher), name = "commands") + private val gossip = context.actorOf(Props(new ClusterGossipDaemon(cluster)). + withDispatcher(configuredDispatcher). + withRouter(RoundRobinRouter(cluster.settings.NrOfGossipDaemons)), + name = "gossip") + private val heartbeat = context.actorOf(Props(new ClusterHeartbeatDaemon(cluster)). + withDispatcher(configuredDispatcher), name = "heartbeat") def receive = Actor.emptyBehavior - override def unhandled(unknown: Any): Unit = log.error("[/system/cluster] can not respond to messages - received [{}]", unknown) + override def unhandled(unknown: Any): Unit = log.error("[{}] can not respond to messages - received [{}]", + self.path, unknown) } /** @@ -526,8 +549,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) private val remote: RemoteActorRefProvider = system.provider.asInstanceOf[RemoteActorRefProvider] val remoteSettings = new RemoteSettings(system.settings.config, system.name) - val clusterSettings = new ClusterSettings(system.settings.config, system.name) - import clusterSettings._ + val settings = new ClusterSettings(system.settings.config, system.name) + import settings._ val selfAddress = remote.transport.address private val selfHeartbeat = Heartbeat(selfAddress) @@ -548,7 +571,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) // create supervisor for daemons under path "/system/cluster" private val clusterDaemons = { - val createChild = CreateChild(Props(new ClusterDaemonSupervisor(this)), "cluster") + val createChild = CreateChild(Props(new ClusterDaemonSupervisor(this)). + withDispatcher(UseDispatcher), name = "cluster") Await.result(system.systemGuardian ? createChild, defaultTimeout.duration) match { case a: ActorRef ⇒ a case e: Exception ⇒ throw e @@ -1138,7 +1162,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) val beatTo = localState.latestGossip.members.toSeq.map(_.address) ++ localState.joinInProgress.keys for (address ← beatTo; if address != selfAddress) { - val connection = clusterGossipConnectionFor(address) + val connection = clusterHeartbeatConnectionFor(address) log.debug("Cluster Node [{}] - Heartbeat to [{}]", selfAddress, connection) connection ! selfHeartbeat } @@ -1460,6 +1484,11 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) */ private def clusterGossipConnectionFor(address: Address): ActorRef = system.actorFor(RootActorPath(address) / "system" / "cluster" / "gossip") + /** + * Looks up and returns the remote cluster heartbeat connection for the specific address. + */ + private def clusterHeartbeatConnectionFor(address: Address): ActorRef = system.actorFor(RootActorPath(address) / "system" / "cluster" / "heartbeat") + /** * Gets the addresses of a all the 'deputy' nodes - excluding this node if part of the group. */ diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index 6e4cbc4e60..2a63f32e83 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -10,6 +10,7 @@ import akka.ConfigurationException import scala.collection.JavaConverters._ import akka.actor.Address import akka.actor.AddressFromURIString +import akka.dispatch.Dispatchers class ClusterSettings(val config: Config, val systemName: String) { import config._ @@ -36,6 +37,10 @@ class ClusterSettings(val config: Config, val systemName: String) { final val AutoJoin: Boolean = getBoolean("akka.cluster.auto-join") final val AutoDown: Boolean = getBoolean("akka.cluster.auto-down") final val JoinTimeout: Duration = Duration(getMilliseconds("akka.cluster.join-timeout"), MILLISECONDS) + final val UseDispatcher: String = getString("akka.cluster.use-dispatcher") match { + case "" ⇒ Dispatchers.DefaultDispatcherId + case id ⇒ id + } final val GossipDifferentViewProbability: Double = getDouble("akka.cluster.gossip-different-view-probability") final val SchedulerTickDuration: Duration = Duration(getMilliseconds("akka.cluster.scheduler.tick-duration"), MILLISECONDS) final val SchedulerTicksPerWheel: Int = getInt("akka.cluster.scheduler.ticks-per-wheel") diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 07671c6164..d146e22982 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -7,6 +7,7 @@ package akka.cluster import akka.testkit.AkkaSpec import akka.util.duration._ import akka.util.Duration +import akka.dispatch.Dispatchers @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class ClusterConfigSpec extends AkkaSpec { @@ -32,6 +33,7 @@ class ClusterConfigSpec extends AkkaSpec { NrOfGossipDaemons must be(4) AutoJoin must be(true) AutoDown must be(true) + UseDispatcher must be(Dispatchers.DefaultDispatcherId) GossipDifferentViewProbability must be(0.8 plusOrMinus 0.0001) SchedulerTickDuration must be(33 millis) SchedulerTicksPerWheel must be(512)