diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 33616f5812..f75c7f9018 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -8,9 +8,15 @@ akka { cluster { - # node to join - the full URI defined by a string on the form of "akka://system@hostname:port" - # leave as empty string if the node should be a singleton cluster - node-to-join = "" + # Initial contact points of the cluster. Nodes to join at startup if auto-join = on. + # The seed nodes also play the role of deputy nodes (the nodes responsible + # for breaking network partitions). + # Comma separated full URIs defined by a string on the form of "akka://system@hostname:port" + # Leave as empty if the node should be a singleton cluster. + seed-nodes = [] + + # automatic join the seed-nodes at startup + auto-join = on # should the 'leader' in the cluster be allowed to automatically mark unreachable nodes as DOWN? auto-down = on diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 3732019a50..a4765fc2cf 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -11,7 +11,7 @@ import akka.dispatch.Await import akka.dispatch.MonitorableThreadFactory import akka.event.Logging import akka.jsr166y.ThreadLocalRandom -import akka.pattern.ask +import akka.pattern._ import akka.remote._ import akka.routing._ import akka.util._ @@ -55,11 +55,29 @@ sealed trait ClusterMessage extends Serializable object ClusterUserAction { /** - * Command to join the cluster. Sent when a node (reprsesented by 'address') + * Command to join the cluster. Sent when a node (represented by 'address') * wants to join another node (the receiver). */ case class Join(address: Address) extends ClusterMessage + /** + * Start message of the process to join one of the seed nodes. + * The node sends `InitJoin` to all seed nodes, which replies + * with `InitJoinAck`. The first reply is used others are discarded. + * The node sends `Join` command to the seed node that replied first. + */ + case object JoinSeedNode extends ClusterMessage + + /** + * @see JoinSeedNode + */ + case object InitJoin extends ClusterMessage + + /** + * @see JoinSeedNode + */ + case class InitJoinAck(address: Address) extends ClusterMessage + /** * Command to leave the cluster. */ @@ -343,11 +361,28 @@ private[cluster] final class ClusterCommandDaemon(cluster: Cluster) extends Acto val log = Logging(context.system, this) def receive = { - case Join(address) ⇒ cluster.joining(address) - case Down(address) ⇒ cluster.downing(address) - case Leave(address) ⇒ cluster.leaving(address) - case Exit(address) ⇒ cluster.exiting(address) - case Remove(address) ⇒ cluster.removing(address) + case JoinSeedNode ⇒ joinSeedNode() + case InitJoin ⇒ sender ! InitJoinAck(cluster.selfAddress) + case InitJoinAck(address) ⇒ cluster.join(address) + case Join(address) ⇒ cluster.joining(address) + case Down(address) ⇒ cluster.downing(address) + case Leave(address) ⇒ cluster.leaving(address) + case Exit(address) ⇒ cluster.exiting(address) + case Remove(address) ⇒ cluster.removing(address) + } + + def joinSeedNode(): Unit = { + val seedRoutees = for (address ← cluster.seedNodes; if address != cluster.selfAddress) + yield self.path.toStringWithAddress(address) + if (seedRoutees.nonEmpty) { + // FIXME config of within (use JoinInProgressTimeout when that is in master) + implicit val within = Timeout(5 seconds) + val seedRouter = context.actorOf( + Props.empty.withRouter(ScatterGatherFirstCompletedRouter( + routees = seedRoutees, within = within.duration))) + seedRouter ? InitJoin pipeTo self + seedRouter ! PoisonPill + } } override def unhandled(unknown: Any) = log.error("Illegal command [{}]", unknown) @@ -479,8 +514,6 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) implicit private val defaultTimeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout) - private val nodeToJoin: Option[Address] = NodeToJoin filter (_ != selfAddress) - private val serialization = remote.serialization private val _isRunning = new AtomicBoolean(true) @@ -507,8 +540,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) new AtomicReference[State](State(seenVersionedGossip)) } - // try to join the node defined in the 'akka.cluster.node-to-join' option - autoJoin() + // try to join one of the nodes defined in the 'akka.cluster.seed-nodes' + if (AutoJoin) joinSeedNode() // ======================================================== // ===================== WORK DAEMONS ===================== @@ -927,9 +960,9 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) private[cluster] def receiveHeartbeat(from: Address): Unit = failureDetector heartbeat from /** - * Joins the pre-configured contact point. + * Joins the pre-configured contact points. */ - private def autoJoin(): Unit = nodeToJoin foreach join + private def joinSeedNode(): Unit = clusterCommandDaemon ! ClusterUserAction.JoinSeedNode /** * INTERNAL API. @@ -999,6 +1032,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) // 1. gossip to alive members val gossipedToAlive = gossipToRandomNodeOf(localMemberAddresses) + // FIXME does this work as intended? See ticket #2252 // 2. gossip to unreachable members if (localUnreachableSize > 0) { val probability = gossipToUnreachableProbablity(localMembersSize, localUnreachableSize) @@ -1006,11 +1040,12 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) gossipToRandomNodeOf(localUnreachableMembers.map(_.address)) } + // FIXME does this work as intended? See ticket #2252 // 3. gossip to a deputy nodes for facilitating partition healing val deputies = deputyNodes(localMemberAddresses) val alreadyGossipedToDeputy = gossipedToAlive.map(deputies.contains(_)).getOrElse(false) - if ((!alreadyGossipedToDeputy || localMembersSize < NrOfDeputyNodes) && deputies.nonEmpty) { - val probability = gossipToDeputyProbablity(localMembersSize, localUnreachableSize, NrOfDeputyNodes) + if ((!alreadyGossipedToDeputy || localMembersSize < seedNodes.size) && deputies.nonEmpty) { + val probability = gossipToDeputyProbablity(localMembersSize, localUnreachableSize, seedNodes.size) if (ThreadLocalRandom.current.nextDouble() < probability) gossipToRandomNodeOf(deputies) } @@ -1337,7 +1372,15 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) * Gets the addresses of a all the 'deputy' nodes - excluding this node if part of the group. */ private def deputyNodes(addresses: IndexedSeq[Address]): IndexedSeq[Address] = - addresses drop 1 take NrOfDeputyNodes filterNot (_ == selfAddress) + addresses filterNot (_ == selfAddress) intersect seedNodes + + /** + * INTERNAL API. + * + * Make it possible to override/configure seedNodes from tests without + * specifying in config. Addresses are unknown before startup time. + */ + private[cluster] def seedNodes: IndexedSeq[Address] = SeedNodes /** * INTERNAL API. diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index 64ae1c28cb..12ed666680 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -22,17 +22,16 @@ class ClusterSettings(val config: Config, val systemName: String) { final val FailureDetectorAcceptableHeartbeatPause: Duration = Duration(getMilliseconds("akka.cluster.failure-detector.acceptable-heartbeat-pause"), MILLISECONDS) - final val NodeToJoin: Option[Address] = getString("akka.cluster.node-to-join") match { - case "" ⇒ None - case AddressFromURIString(addr) ⇒ Some(addr) - } + final val SeedNodes: IndexedSeq[Address] = getStringList("akka.cluster.seed-nodes").asScala.map { + case AddressFromURIString(addr) ⇒ addr + }.toIndexedSeq final val PeriodicTasksInitialDelay = Duration(getMilliseconds("akka.cluster.periodic-tasks-initial-delay"), MILLISECONDS) final val GossipInterval = Duration(getMilliseconds("akka.cluster.gossip-interval"), MILLISECONDS) final val HeartbeatInterval = Duration(getMilliseconds("akka.cluster.heartbeat-interval"), MILLISECONDS) final val LeaderActionsInterval = Duration(getMilliseconds("akka.cluster.leader-actions-interval"), MILLISECONDS) final val UnreachableNodesReaperInterval = Duration(getMilliseconds("akka.cluster.unreachable-nodes-reaper-interval"), MILLISECONDS) final val NrOfGossipDaemons = getInt("akka.cluster.nr-of-gossip-daemons") - final val NrOfDeputyNodes = getInt("akka.cluster.nr-of-deputy-nodes") + final val AutoJoin = getBoolean("akka.cluster.auto-join") final val AutoDown = getBoolean("akka.cluster.auto-down") final val SchedulerTickDuration = Duration(getMilliseconds("akka.cluster.scheduler.tick-duration"), MILLISECONDS) final val SchedulerTicksPerWheel = getInt("akka.cluster.scheduler.ticks-per-wheel") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala new file mode 100644 index 0000000000..38f03a4e66 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala @@ -0,0 +1,46 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfter +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.util.duration._ + +object JoinSeedNodeMultiJvmSpec extends MultiNodeConfig { + val seed1 = role("seed1") + val seed2 = role("seed2") + val ordinary1 = role("ordinary1") + val ordinary2 = role("ordinary2") + + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) +} + +class JoinSeedNodeMultiJvmNode1 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy +class JoinSeedNodeMultiJvmNode2 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy +class JoinSeedNodeMultiJvmNode3 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy +class JoinSeedNodeMultiJvmNode4 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy + +abstract class JoinSeedNodeSpec + extends MultiNodeSpec(JoinSeedNodeMultiJvmSpec) + with MultiNodeClusterSpec { + + import JoinSeedNodeMultiJvmSpec._ + + override def seedNodes = IndexedSeq(seed1, seed2) + + "A cluster with configured seed nodes" must { + "join the seed nodes at startup" taggedAs LongRunningTest in { + + startClusterNode() + enterBarrier("all-started") + + awaitUpConvergence(4) + + enterBarrier("after") + } + } +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 9fd8746923..79e3a67e1e 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -26,7 +26,6 @@ object MultiNodeClusterSpec { leader-actions-interval = 200 ms unreachable-nodes-reaper-interval = 200 ms periodic-tasks-initial-delay = 300 ms - nr-of-deputy-nodes = 2 } akka.test { single-expect-default = 5 s @@ -77,10 +76,22 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu throw t } + /** + * Make it possible to override/configure seedNodes from tests without + * specifying in config. Addresses are unknown before startup time. + */ + protected def seedNodes: IndexedSeq[RoleName] = IndexedSeq.empty + /** * The cluster node instance. Needs to be lazily created. */ - private lazy val clusterNode = new Cluster(system.asInstanceOf[ExtendedActorSystem], failureDetector) + private lazy val clusterNode = new Cluster(system.asInstanceOf[ExtendedActorSystem], failureDetector) { + override def seedNodes: IndexedSeq[Address] = { + val testSeedNodes = MultiNodeClusterSpec.this.seedNodes + if (testSeedNodes.isEmpty) super.seedNodes + else testSeedNodes map address + } + } /** * Get the cluster node to use. diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala index cee5efc0db..50656a6a9d 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala @@ -36,6 +36,8 @@ abstract class NodeJoinSpec startClusterNode() } + enterBarrier("first-started") + runOn(second) { cluster.join(first) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala index c283665b30..086c2fb00a 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala @@ -21,7 +21,6 @@ object SunnyWeatherMultiJvmSpec extends MultiNodeConfig { commonConfig(ConfigFactory.parseString(""" akka.cluster { - nr-of-deputy-nodes = 0 # FIXME remove this (use default) when ticket #2239 has been fixed gossip-interval = 400 ms } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 58f0683c25..5e44b0a4bc 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -21,14 +21,14 @@ class ClusterConfigSpec extends AkkaSpec { FailureDetectorImplementationClass must be(classOf[AccrualFailureDetector].getName) FailureDetectorMinStdDeviation must be(100 millis) FailureDetectorAcceptableHeartbeatPause must be(3 seconds) - NodeToJoin must be(None) + SeedNodes must be(Seq.empty[String]) PeriodicTasksInitialDelay must be(1 seconds) GossipInterval must be(1 second) HeartbeatInterval must be(1 second) LeaderActionsInterval must be(1 second) UnreachableNodesReaperInterval must be(1 second) NrOfGossipDaemons must be(4) - NrOfDeputyNodes must be(3) + AutoJoin must be(true) AutoDown must be(true) SchedulerTickDuration must be(33 millis) SchedulerTicksPerWheel must be(512) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index 229ec7137d..a0bc7f6450 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -11,12 +11,13 @@ import akka.actor.ExtendedActorSystem import akka.actor.Address import java.util.concurrent.atomic.AtomicInteger import org.scalatest.BeforeAndAfter +import akka.remote.RemoteActorRefProvider object ClusterSpec { val config = """ akka.cluster { + auto-join = off auto-down = off - nr-of-deputy-nodes = 3 periodic-tasks-initial-delay = 120 seconds // turn off scheduled tasks } akka.actor.provider = "akka.remote.RemoteActorRefProvider" @@ -31,12 +32,24 @@ object ClusterSpec { class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter { import ClusterSpec._ + val selfAddress = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport.address + val addresses = IndexedSeq( + selfAddress, + Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 1), + Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 2), + Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 3), + Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 4), + Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 5)) + val deterministicRandom = new AtomicInteger val failureDetector = new FailureDetectorPuppet(system) val cluster = new Cluster(system.asInstanceOf[ExtendedActorSystem], failureDetector) { + // 3 deputy nodes (addresses index 1, 2, 3) + override def seedNodes = addresses.slice(1, 4) + override def selectRandomNode(addresses: IndexedSeq[Address]): Option[Address] = { if (addresses.isEmpty) None else Some(addresses.toSeq(deterministicRandom.getAndIncrement % addresses.size)) @@ -68,15 +81,6 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter { } - val selfAddress = cluster.self.address - val addresses = IndexedSeq( - selfAddress, - Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 1), - Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 2), - Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 3), - Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 4), - Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 5)) - def memberStatus(address: Address): Option[MemberStatus] = cluster.latestGossip.members.collectFirst { case m if m.address == address ⇒ m.status } @@ -89,6 +93,11 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter { "A Cluster" must { + "use the address of the remote transport" in { + cluster.selfAddress must be(selfAddress) + cluster.self.address must be(selfAddress) + } + "initially be singleton cluster and reach convergence immediately" in { cluster.isSingletonCluster must be(true) cluster.latestGossip.members.map(_.address) must be(Set(selfAddress)) @@ -161,7 +170,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter { "gossip to duputy node" in { cluster._gossipToDeputyProbablity = 1.0 // always - // we have configured 2 deputy nodes + // we have configured 3 deputy nodes (seedNodes) cluster.gossip() // 1 is deputy cluster.gossip() // 2 is deputy cluster.gossip() // 3 is deputy diff --git a/akka-docs/cluster/cluster.rst b/akka-docs/cluster/cluster.rst index 0126897dab..a9190420dc 100644 --- a/akka-docs/cluster/cluster.rst +++ b/akka-docs/cluster/cluster.rst @@ -183,14 +183,20 @@ according to the Failure Detector is considered unreachable. This means setting the unreachable node status to ``down`` automatically. +Seed Nodes +^^^^^^^^^^ + +The seed nodes are configured contact points for inital join of the cluster. +When a new node is started started it sends a message to all seed nodes and +then sends join command to the one that answers first. + +It is possible to turn off automatic join. + Deputy Nodes ^^^^^^^^^^^^ -After gossip convergence a set of ``deputy`` nodes for the cluster can be -determined. As with the ``leader``, there is no ``deputy`` election process, -the deputies can always be recognised deterministically by any node whenever there -is gossip convergence. The list of ``deputy`` nodes is simply the N - 1 number -of nodes (e.g. starting with the first node after the ``leader``) in sorted order. +The deputy nodes are the live members of the configured seed nodes. +It is preferred to use deputy nodes in different racks/data centers. The nodes defined as ``deputy`` nodes are just regular member nodes whose only "special role" is to help breaking logical partitions as seen in the gossip