From a7c8d7da1042d6ca766bc56e1f8cf815d596d7da Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 20 Jun 2012 19:22:25 +0200 Subject: [PATCH 1/7] Remove unnecessary clock param in one of AccrualFailureDetector's constructors --- .../main/scala/akka/cluster/AccrualFailureDetector.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala index b10962ce11..6962fc10d6 100644 --- a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala @@ -57,7 +57,7 @@ object AccrualFailureDetector { * to this duration, with a with rather high standard deviation (since environment is unknown * in the beginning) * - * @clock The clock, returning current time in milliseconds, but can be faked for testing + * @param clock The clock, returning current time in milliseconds, but can be faked for testing * purposes. It is only used for measuring intervals (duration). * */ @@ -68,7 +68,7 @@ class AccrualFailureDetector( val minStdDeviation: Duration, val acceptableHeartbeatPause: Duration, val firstHeartbeatEstimate: Duration, - val clock: () ⇒ Long) extends FailureDetector { + val clock: () ⇒ Long = AccrualFailureDetector.realClock) extends FailureDetector { import AccrualFailureDetector._ @@ -77,8 +77,7 @@ class AccrualFailureDetector( */ def this( system: ActorSystem, - settings: ClusterSettings, - clock: () ⇒ Long = AccrualFailureDetector.realClock) = + settings: ClusterSettings) = this( system, settings.FailureDetectorThreshold, @@ -90,7 +89,7 @@ class AccrualFailureDetector( // first real heartbeat is sent. Initial heartbeat is added when joining. // FIXME this can be changed to HeartbeatInterval when ticket #2249 is fixed settings.GossipInterval * 3 + settings.HeartbeatInterval, - clock) + AccrualFailureDetector.realClock) private val log = Logging(system, "FailureDetector") From 42078e70836ef593f392324c05211dd14db5a88a Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 21 Jun 2012 10:58:35 +0200 Subject: [PATCH 2/7] Reintroduce 'seed' nodes, see #2219 * Implement the join to seed nodes process When a new node is started started it sends a message to all seed nodes and then sends join command to the one that answers first. 
* Configuration of seed-nodes and auto-join * New JoinSeedNodeSpec that verifies the auto join to seed nodes * In tests seed nodes are configured by overriding seedNodes function, since addresses are not known before start * Deputy nodes are the live members of the seed nodes (not sure if that will be the final solution, see ticket 2252 * Updated cluster.rst with latest info about deputy and seed nodes --- .../src/main/resources/reference.conf | 12 ++- .../src/main/scala/akka/cluster/Cluster.scala | 75 +++++++++++++++---- .../scala/akka/cluster/ClusterSettings.scala | 9 +-- .../scala/akka/cluster/JoinSeedNodeSpec.scala | 46 ++++++++++++ .../akka/cluster/MultiNodeClusterSpec.scala | 15 +++- .../scala/akka/cluster/NodeJoinSpec.scala | 2 + .../scala/akka/cluster/SunnyWeatherSpec.scala | 1 - .../akka/cluster/ClusterConfigSpec.scala | 4 +- .../test/scala/akka/cluster/ClusterSpec.scala | 31 +++++--- akka-docs/cluster/cluster.rst | 16 ++-- 10 files changed, 166 insertions(+), 45 deletions(-) create mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 33616f5812..f75c7f9018 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -8,9 +8,15 @@ akka { cluster { - # node to join - the full URI defined by a string on the form of "akka://system@hostname:port" - # leave as empty string if the node should be a singleton cluster - node-to-join = "" + # Initial contact points of the cluster. Nodes to join at startup if auto-join = on. + # The seed nodes also play the role of deputy nodes (the nodes responsible + # for breaking network partitions). + # Comma separated full URIs defined by a string on the form of "akka://system@hostname:port" + # Leave as empty if the node should be a singleton cluster. + seed-nodes = [] + + # automatic join the seed-nodes at startup + auto-join = on # should the 'leader' in the cluster be allowed to automatically mark unreachable nodes as DOWN? auto-down = on diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 3732019a50..a4765fc2cf 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -11,7 +11,7 @@ import akka.dispatch.Await import akka.dispatch.MonitorableThreadFactory import akka.event.Logging import akka.jsr166y.ThreadLocalRandom -import akka.pattern.ask +import akka.pattern._ import akka.remote._ import akka.routing._ import akka.util._ @@ -55,11 +55,29 @@ sealed trait ClusterMessage extends Serializable object ClusterUserAction { /** - * Command to join the cluster. Sent when a node (reprsesented by 'address') + * Command to join the cluster. Sent when a node (represented by 'address') * wants to join another node (the receiver). */ case class Join(address: Address) extends ClusterMessage + /** + * Start message of the process to join one of the seed nodes. + * The node sends `InitJoin` to all seed nodes, which replies + * with `InitJoinAck`. The first reply is used others are discarded. + * The node sends `Join` command to the seed node that replied first. 
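+ *
+ * A concrete (made-up) example of the exchange: a new node at
+ * akka://sys@newhost:2552 sends InitJoin to akka://sys@seed1:2552 and
+ * akka://sys@seed2:2552; if seed2 answers first, the new node sends
+ * Join(akka://sys@newhost:2552) to seed2 and the later InitJoinAck
+ * from seed1 is discarded.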
+ */ + case object JoinSeedNode extends ClusterMessage + + /** + * @see JoinSeedNode + */ + case object InitJoin extends ClusterMessage + + /** + * @see JoinSeedNode + */ + case class InitJoinAck(address: Address) extends ClusterMessage + /** * Command to leave the cluster. */ @@ -343,11 +361,28 @@ private[cluster] final class ClusterCommandDaemon(cluster: Cluster) extends Acto val log = Logging(context.system, this) def receive = { - case Join(address) ⇒ cluster.joining(address) - case Down(address) ⇒ cluster.downing(address) - case Leave(address) ⇒ cluster.leaving(address) - case Exit(address) ⇒ cluster.exiting(address) - case Remove(address) ⇒ cluster.removing(address) + case JoinSeedNode ⇒ joinSeedNode() + case InitJoin ⇒ sender ! InitJoinAck(cluster.selfAddress) + case InitJoinAck(address) ⇒ cluster.join(address) + case Join(address) ⇒ cluster.joining(address) + case Down(address) ⇒ cluster.downing(address) + case Leave(address) ⇒ cluster.leaving(address) + case Exit(address) ⇒ cluster.exiting(address) + case Remove(address) ⇒ cluster.removing(address) + } + + def joinSeedNode(): Unit = { + val seedRoutees = for (address ← cluster.seedNodes; if address != cluster.selfAddress) + yield self.path.toStringWithAddress(address) + if (seedRoutees.nonEmpty) { + // FIXME config of within (use JoinInProgressTimeout when that is in master) + implicit val within = Timeout(5 seconds) + val seedRouter = context.actorOf( + Props.empty.withRouter(ScatterGatherFirstCompletedRouter( + routees = seedRoutees, within = within.duration))) + seedRouter ? InitJoin pipeTo self + seedRouter ! PoisonPill + } } override def unhandled(unknown: Any) = log.error("Illegal command [{}]", unknown) @@ -479,8 +514,6 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) implicit private val defaultTimeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout) - private val nodeToJoin: Option[Address] = NodeToJoin filter (_ != selfAddress) - private val serialization = remote.serialization private val _isRunning = new AtomicBoolean(true) @@ -507,8 +540,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) new AtomicReference[State](State(seenVersionedGossip)) } - // try to join the node defined in the 'akka.cluster.node-to-join' option - autoJoin() + // try to join one of the nodes defined in the 'akka.cluster.seed-nodes' + if (AutoJoin) joinSeedNode() // ======================================================== // ===================== WORK DAEMONS ===================== @@ -927,9 +960,9 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) private[cluster] def receiveHeartbeat(from: Address): Unit = failureDetector heartbeat from /** - * Joins the pre-configured contact point. + * Joins the pre-configured contact points. */ - private def autoJoin(): Unit = nodeToJoin foreach join + private def joinSeedNode(): Unit = clusterCommandDaemon ! ClusterUserAction.JoinSeedNode /** * INTERNAL API. @@ -999,6 +1032,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) // 1. gossip to alive members val gossipedToAlive = gossipToRandomNodeOf(localMemberAddresses) + // FIXME does this work as intended? See ticket #2252 // 2. 
gossip to unreachable members if (localUnreachableSize > 0) { val probability = gossipToUnreachableProbablity(localMembersSize, localUnreachableSize) @@ -1006,11 +1040,12 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) gossipToRandomNodeOf(localUnreachableMembers.map(_.address)) } + // FIXME does this work as intended? See ticket #2252 // 3. gossip to a deputy nodes for facilitating partition healing val deputies = deputyNodes(localMemberAddresses) val alreadyGossipedToDeputy = gossipedToAlive.map(deputies.contains(_)).getOrElse(false) - if ((!alreadyGossipedToDeputy || localMembersSize < NrOfDeputyNodes) && deputies.nonEmpty) { - val probability = gossipToDeputyProbablity(localMembersSize, localUnreachableSize, NrOfDeputyNodes) + if ((!alreadyGossipedToDeputy || localMembersSize < seedNodes.size) && deputies.nonEmpty) { + val probability = gossipToDeputyProbablity(localMembersSize, localUnreachableSize, seedNodes.size) if (ThreadLocalRandom.current.nextDouble() < probability) gossipToRandomNodeOf(deputies) } @@ -1337,7 +1372,15 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) * Gets the addresses of a all the 'deputy' nodes - excluding this node if part of the group. */ private def deputyNodes(addresses: IndexedSeq[Address]): IndexedSeq[Address] = - addresses drop 1 take NrOfDeputyNodes filterNot (_ == selfAddress) + addresses filterNot (_ == selfAddress) intersect seedNodes + + /** + * INTERNAL API. + * + * Make it possible to override/configure seedNodes from tests without + * specifying in config. Addresses are unknown before startup time. + */ + private[cluster] def seedNodes: IndexedSeq[Address] = SeedNodes /** * INTERNAL API. diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index 64ae1c28cb..12ed666680 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -22,17 +22,16 @@ class ClusterSettings(val config: Config, val systemName: String) { final val FailureDetectorAcceptableHeartbeatPause: Duration = Duration(getMilliseconds("akka.cluster.failure-detector.acceptable-heartbeat-pause"), MILLISECONDS) - final val NodeToJoin: Option[Address] = getString("akka.cluster.node-to-join") match { - case "" ⇒ None - case AddressFromURIString(addr) ⇒ Some(addr) - } + final val SeedNodes: IndexedSeq[Address] = getStringList("akka.cluster.seed-nodes").asScala.map { + case AddressFromURIString(addr) ⇒ addr + }.toIndexedSeq final val PeriodicTasksInitialDelay = Duration(getMilliseconds("akka.cluster.periodic-tasks-initial-delay"), MILLISECONDS) final val GossipInterval = Duration(getMilliseconds("akka.cluster.gossip-interval"), MILLISECONDS) final val HeartbeatInterval = Duration(getMilliseconds("akka.cluster.heartbeat-interval"), MILLISECONDS) final val LeaderActionsInterval = Duration(getMilliseconds("akka.cluster.leader-actions-interval"), MILLISECONDS) final val UnreachableNodesReaperInterval = Duration(getMilliseconds("akka.cluster.unreachable-nodes-reaper-interval"), MILLISECONDS) final val NrOfGossipDaemons = getInt("akka.cluster.nr-of-gossip-daemons") - final val NrOfDeputyNodes = getInt("akka.cluster.nr-of-deputy-nodes") + final val AutoJoin = getBoolean("akka.cluster.auto-join") final val AutoDown = getBoolean("akka.cluster.auto-down") final val SchedulerTickDuration = Duration(getMilliseconds("akka.cluster.scheduler.tick-duration"), 
MILLISECONDS) final val SchedulerTicksPerWheel = getInt("akka.cluster.scheduler.ticks-per-wheel") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala new file mode 100644 index 0000000000..38f03a4e66 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala @@ -0,0 +1,46 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfter +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.util.duration._ + +object JoinSeedNodeMultiJvmSpec extends MultiNodeConfig { + val seed1 = role("seed1") + val seed2 = role("seed2") + val ordinary1 = role("ordinary1") + val ordinary2 = role("ordinary2") + + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) +} + +class JoinSeedNodeMultiJvmNode1 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy +class JoinSeedNodeMultiJvmNode2 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy +class JoinSeedNodeMultiJvmNode3 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy +class JoinSeedNodeMultiJvmNode4 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy + +abstract class JoinSeedNodeSpec + extends MultiNodeSpec(JoinSeedNodeMultiJvmSpec) + with MultiNodeClusterSpec { + + import JoinSeedNodeMultiJvmSpec._ + + override def seedNodes = IndexedSeq(seed1, seed2) + + "A cluster with configured seed nodes" must { + "join the seed nodes at startup" taggedAs LongRunningTest in { + + startClusterNode() + enterBarrier("all-started") + + awaitUpConvergence(4) + + enterBarrier("after") + } + } +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 9fd8746923..79e3a67e1e 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -26,7 +26,6 @@ object MultiNodeClusterSpec { leader-actions-interval = 200 ms unreachable-nodes-reaper-interval = 200 ms periodic-tasks-initial-delay = 300 ms - nr-of-deputy-nodes = 2 } akka.test { single-expect-default = 5 s @@ -77,10 +76,22 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu throw t } + /** + * Make it possible to override/configure seedNodes from tests without + * specifying in config. Addresses are unknown before startup time. + */ + protected def seedNodes: IndexedSeq[RoleName] = IndexedSeq.empty + /** * The cluster node instance. Needs to be lazily created. */ - private lazy val clusterNode = new Cluster(system.asInstanceOf[ExtendedActorSystem], failureDetector) + private lazy val clusterNode = new Cluster(system.asInstanceOf[ExtendedActorSystem], failureDetector) { + override def seedNodes: IndexedSeq[Address] = { + val testSeedNodes = MultiNodeClusterSpec.this.seedNodes + if (testSeedNodes.isEmpty) super.seedNodes + else testSeedNodes map address + } + } /** * Get the cluster node to use. 
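For contrast with the test-only seedNodes override above (where role addresses are only known once the multi-jvm test is running), a regular application would list its seed nodes directly in configuration. A minimal sketch, assuming akka-remote and akka-cluster are on the classpath; the system name, host names and ports are invented for the example:

    import akka.actor.ActorSystem
    import com.typesafe.config.ConfigFactory

    // Normally this lives in application.conf; it is parsed inline here only to
    // keep the sketch self-contained.
    val config = ConfigFactory.parseString("""
      akka.actor.provider = "akka.remote.RemoteActorRefProvider"
      akka.cluster {
        seed-nodes = [
          "akka://ClusterSystem@host1:2552",
          "akka://ClusterSystem@host2:2552"]
        auto-join = on   # default: join one of the seed nodes at startup
      }
      """)
    val system = ActorSystem("ClusterSystem", config)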
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala index cee5efc0db..50656a6a9d 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala @@ -36,6 +36,8 @@ abstract class NodeJoinSpec startClusterNode() } + enterBarrier("first-started") + runOn(second) { cluster.join(first) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala index c283665b30..086c2fb00a 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala @@ -21,7 +21,6 @@ object SunnyWeatherMultiJvmSpec extends MultiNodeConfig { commonConfig(ConfigFactory.parseString(""" akka.cluster { - nr-of-deputy-nodes = 0 # FIXME remove this (use default) when ticket #2239 has been fixed gossip-interval = 400 ms } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 58f0683c25..5e44b0a4bc 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -21,14 +21,14 @@ class ClusterConfigSpec extends AkkaSpec { FailureDetectorImplementationClass must be(classOf[AccrualFailureDetector].getName) FailureDetectorMinStdDeviation must be(100 millis) FailureDetectorAcceptableHeartbeatPause must be(3 seconds) - NodeToJoin must be(None) + SeedNodes must be(Seq.empty[String]) PeriodicTasksInitialDelay must be(1 seconds) GossipInterval must be(1 second) HeartbeatInterval must be(1 second) LeaderActionsInterval must be(1 second) UnreachableNodesReaperInterval must be(1 second) NrOfGossipDaemons must be(4) - NrOfDeputyNodes must be(3) + AutoJoin must be(true) AutoDown must be(true) SchedulerTickDuration must be(33 millis) SchedulerTicksPerWheel must be(512) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index 229ec7137d..a0bc7f6450 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -11,12 +11,13 @@ import akka.actor.ExtendedActorSystem import akka.actor.Address import java.util.concurrent.atomic.AtomicInteger import org.scalatest.BeforeAndAfter +import akka.remote.RemoteActorRefProvider object ClusterSpec { val config = """ akka.cluster { + auto-join = off auto-down = off - nr-of-deputy-nodes = 3 periodic-tasks-initial-delay = 120 seconds // turn off scheduled tasks } akka.actor.provider = "akka.remote.RemoteActorRefProvider" @@ -31,12 +32,24 @@ object ClusterSpec { class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter { import ClusterSpec._ + val selfAddress = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport.address + val addresses = IndexedSeq( + selfAddress, + Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 1), + Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 2), + Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 3), + Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 4), + Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 5)) + 
val deterministicRandom = new AtomicInteger val failureDetector = new FailureDetectorPuppet(system) val cluster = new Cluster(system.asInstanceOf[ExtendedActorSystem], failureDetector) { + // 3 deputy nodes (addresses index 1, 2, 3) + override def seedNodes = addresses.slice(1, 4) + override def selectRandomNode(addresses: IndexedSeq[Address]): Option[Address] = { if (addresses.isEmpty) None else Some(addresses.toSeq(deterministicRandom.getAndIncrement % addresses.size)) @@ -68,15 +81,6 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter { } - val selfAddress = cluster.self.address - val addresses = IndexedSeq( - selfAddress, - Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 1), - Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 2), - Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 3), - Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 4), - Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 5)) - def memberStatus(address: Address): Option[MemberStatus] = cluster.latestGossip.members.collectFirst { case m if m.address == address ⇒ m.status } @@ -89,6 +93,11 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter { "A Cluster" must { + "use the address of the remote transport" in { + cluster.selfAddress must be(selfAddress) + cluster.self.address must be(selfAddress) + } + "initially be singleton cluster and reach convergence immediately" in { cluster.isSingletonCluster must be(true) cluster.latestGossip.members.map(_.address) must be(Set(selfAddress)) @@ -161,7 +170,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter { "gossip to duputy node" in { cluster._gossipToDeputyProbablity = 1.0 // always - // we have configured 2 deputy nodes + // we have configured 3 deputy nodes (seedNodes) cluster.gossip() // 1 is deputy cluster.gossip() // 2 is deputy cluster.gossip() // 3 is deputy diff --git a/akka-docs/cluster/cluster.rst b/akka-docs/cluster/cluster.rst index 0126897dab..a9190420dc 100644 --- a/akka-docs/cluster/cluster.rst +++ b/akka-docs/cluster/cluster.rst @@ -183,14 +183,20 @@ according to the Failure Detector is considered unreachable. This means setting the unreachable node status to ``down`` automatically. +Seed Nodes +^^^^^^^^^^ + +The seed nodes are configured contact points for inital join of the cluster. +When a new node is started started it sends a message to all seed nodes and +then sends join command to the one that answers first. + +It is possible to turn off automatic join. + Deputy Nodes ^^^^^^^^^^^^ -After gossip convergence a set of ``deputy`` nodes for the cluster can be -determined. As with the ``leader``, there is no ``deputy`` election process, -the deputies can always be recognised deterministically by any node whenever there -is gossip convergence. The list of ``deputy`` nodes is simply the N - 1 number -of nodes (e.g. starting with the first node after the ``leader``) in sorted order. +The deputy nodes are the live members of the configured seed nodes. +It is preferred to use deputy nodes in different racks/data centers. 
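To make the deputy selection concrete: deputies are simply the configured seed nodes that are currently members, excluding the local node, exactly as computed by ``deputyNodes`` in ``Cluster.scala`` above. A small sketch with invented addresses:

    import akka.actor.Address

    val selfAddress = Address("akka", "sys", "hostA", 2552)
    val memberAddresses = IndexedSeq(
      selfAddress,
      Address("akka", "sys", "hostB", 2552),
      Address("akka", "sys", "hostC", 2552))
    val seedNodes = IndexedSeq(
      selfAddress,
      Address("akka", "sys", "hostB", 2552),
      Address("akka", "sys", "hostD", 2552)) // configured as seed, not (yet) a member

    // Same expression as deputyNodes: members that are also seed nodes, minus ourselves.
    val deputies = memberAddresses filterNot (_ == selfAddress) intersect seedNodes
    // deputies contains only the hostB address: hostC is a member but no seed,
    // hostD is a seed but not a member yet.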
The nodes defined as ``deputy`` nodes are just regular member nodes whose only "special role" is to help breaking logical partitions as seen in the gossip From 4e49b2c843ab08ddc08ee0ae98b9626b2a88eb02 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Antonsson?= Date: Thu, 21 Jun 2012 13:16:35 +0200 Subject: [PATCH 3/7] Make MultiNodeSpec shut down the conductor after other nodes. See #2230 --- .../akka/remote/testconductor/Conductor.scala | 24 +++++++------ .../remote/testkit/MultiNodeSpecSpec.scala | 36 +++++++++++++++++++ .../remote/testconductor/BarrierSpec.scala | 20 ++++------- .../akka/remote/testkit/MultiNodeSpec.scala | 18 ++++++++-- .../test/scala/akka/testkit/AkkaSpec.scala | 3 ++ 5 files changed, 75 insertions(+), 26 deletions(-) create mode 100644 akka-remote-tests/src/multi-jvm/scala/akka/remote/testkit/MultiNodeSpecSpec.scala diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala index b6265125b1..eba0fffe63 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala @@ -282,6 +282,8 @@ private[akka] class ServerFSM(val controller: ActorRef, val channel: Channel) ex import akka.actor.FSM._ import Controller._ + var roleName: RoleName = null + startWith(Initial, None) whenUnhandled { @@ -292,12 +294,15 @@ private[akka] class ServerFSM(val controller: ActorRef, val channel: Channel) ex } onTermination { - case _ ⇒ controller ! ClientDisconnected + case _ ⇒ + controller ! ClientDisconnected(roleName) + channel.close() } when(Initial, stateTimeout = 10 seconds) { case Event(Hello(name, addr), _) ⇒ - controller ! NodeInfo(RoleName(name), addr, self) + roleName = RoleName(name) + controller ! NodeInfo(roleName, addr, self) goto(Ready) case Event(x: NetworkOp, _) ⇒ log.warning("client {} sent no Hello in first message (instead {}), disconnecting", getAddrString(channel), x) @@ -334,10 +339,6 @@ private[akka] class ServerFSM(val controller: ActorRef, val channel: Channel) ex } initialize - - onTermination { - case _ ⇒ channel.close() - } } /** @@ -517,10 +518,13 @@ private[akka] class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoor if (clients.find(_.name == n.name).isDefined) throw new DuplicateNode(d, n) stay using d.copy(clients = clients + n) case Event(ClientDisconnected(name), d @ Data(clients, _, arrived, _)) ⇒ - if (clients.isEmpty) throw BarrierEmpty(d, "cannot disconnect " + name + ": no client to disconnect") - (clients find (_.name == name)) match { - case None ⇒ stay - case Some(c) ⇒ throw ClientLost(d.copy(clients = clients - c, arrived = arrived filterNot (_ == c.fsm)), name) + if (arrived.isEmpty) + stay using d.copy(clients = clients.filterNot(_.name == name)) + else { + (clients find (_.name == name)) match { + case None ⇒ stay + case Some(c) ⇒ throw ClientLost(d.copy(clients = clients - c, arrived = arrived filterNot (_ == c.fsm)), name) + } } } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/testkit/MultiNodeSpecSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/testkit/MultiNodeSpecSpec.scala new file mode 100644 index 0000000000..2a709a99a7 --- /dev/null +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/testkit/MultiNodeSpecSpec.scala @@ -0,0 +1,36 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. 
+ */ +package akka.remote.testkit + +import akka.testkit.LongRunningTest + +object MultiNodeSpecMultiJvmSpec extends MultiNodeConfig { + commonConfig(debugConfig(on = false)) + + val node1 = role("node1") + val node2 = role("node2") + val node3 = role("node3") + val node4 = role("node4") +} + +class MultiNodeSpecSpecMultiJvmNode1 extends MultiNodeSpecSpec +class MultiNodeSpecSpecMultiJvmNode2 extends MultiNodeSpecSpec +class MultiNodeSpecSpecMultiJvmNode3 extends MultiNodeSpecSpec +class MultiNodeSpecSpecMultiJvmNode4 extends MultiNodeSpecSpec + +class MultiNodeSpecSpec extends MultiNodeSpec(MultiNodeSpecMultiJvmSpec) { + + import MultiNodeSpecMultiJvmSpec._ + + def initialParticipants = 4 + + "A MultiNodeSpec" must { + + "wait for all nodes to remove themselves before we shut the conductor down" taggedAs LongRunningTest in { + enterBarrier("startup") + // this test is empty here since it only exercises the shutdown code in the MultiNodeSpec + } + + } +} diff --git a/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala b/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala index f418f4a717..8ff95d0831 100644 --- a/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala +++ b/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala @@ -59,14 +59,9 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender { val b = getBarrier() b ! NodeInfo(A, AddressFromURIString("akka://sys"), system.deadLetters) b ! ClientDisconnected(B) - EventFilter[ClientLost](occurrences = 1) intercept { - b ! ClientDisconnected(A) - } - expectMsg(Failed(b, ClientLost(Data(Set(), "", Nil, null), A))) - EventFilter[BarrierEmpty](occurrences = 1) intercept { - b ! ClientDisconnected(A) - } - expectMsg(Failed(b, BarrierEmpty(Data(Set(), "", Nil, null), "cannot disconnect RoleName(a): no client to disconnect"))) + expectNoMsg(1 second) + b ! ClientDisconnected(A) + expectNoMsg(1 second) } "fail entering barrier when nobody registered" taggedAs TimingTest in { @@ -264,12 +259,9 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender { b ! NodeInfo(A, AddressFromURIString("akka://sys"), testActor) expectMsg(ToClient(Done)) b ! ClientDisconnected(B) - EventFilter[ClientLost](occurrences = 1) intercept { - b ! ClientDisconnected(A) - } - EventFilter[BarrierEmpty](occurrences = 1) intercept { - b ! ClientDisconnected(A) - } + expectNoMsg(1 second) + b ! 
ClientDisconnected(A) + expectNoMsg(1 second) } "fail entering barrier when nobody registered" taggedAs TimingTest in { diff --git a/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala b/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala index 4d65a2084e..9f88f9e1c8 100644 --- a/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala +++ b/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala @@ -7,12 +7,13 @@ import java.net.InetSocketAddress import com.typesafe.config.{ ConfigObject, ConfigFactory, Config } -import akka.actor.{ RootActorPath, Deploy, ActorPath, ActorSystem, ExtendedActorSystem } +import akka.actor.{ RootActorPath, ActorPath, ActorSystem, ExtendedActorSystem } import akka.dispatch.Await import akka.dispatch.Await.Awaitable import akka.remote.testconductor.{ TestConductorExt, TestConductor, RoleName } import akka.testkit.AkkaSpec -import akka.util.{ Timeout, NonFatal, Duration } +import akka.util.{ Timeout, NonFatal } +import akka.util.duration._ /** * Configure the role names and participants of the test, including configuration settings. @@ -261,4 +262,17 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles: // useful to see which jvm is running which role log.info("Role [{}] started", myself.name) + // wait for all nodes to remove themselves before we shut the conductor down + final override def beforeShutdown() = { + if (selfIndex == 0) { + testConductor.removeNode(myself) + within(testConductor.Settings.BarrierTimeout.duration) { + awaitCond { + val nodes = testConductor.getNodes.await + nodes.size < 1 || (nodes.size == 1 && nodes.head == myself) + } + } + } + } + } diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index 424c913662..f9ee989e1c 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -74,6 +74,7 @@ abstract class AkkaSpec(_system: ActorSystem) } final override def afterAll { + beforeShutdown() system.shutdown() try system.awaitTermination(5 seconds) catch { case _: TimeoutException ⇒ system.log.warning("Failed to stop [{}] within 5 seconds", system.name) @@ -83,6 +84,8 @@ abstract class AkkaSpec(_system: ActorSystem) protected def atStartup() {} + protected def beforeShutdown() {} + protected def atTermination() {} def spawn(dispatcherId: String = Dispatchers.DefaultDispatcherId)(body: ⇒ Unit) { From 9710cceacb810428618c0803fc4f4e816f01b495 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Antonsson?= Date: Thu, 21 Jun 2012 16:10:04 +0200 Subject: [PATCH 4/7] Changes based on review. 
See #2230 --- .../src/test/scala/akka/remote/testkit/MultiNodeSpec.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala b/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala index 9f88f9e1c8..25bb8df7dc 100644 --- a/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala +++ b/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala @@ -268,8 +268,7 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles: testConductor.removeNode(myself) within(testConductor.Settings.BarrierTimeout.duration) { awaitCond { - val nodes = testConductor.getNodes.await - nodes.size < 1 || (nodes.size == 1 && nodes.head == myself) + testConductor.getNodes.await.filterNot(_ == myself).isEmpty } } } From 5fe9dcaf4e126487a8ff5a9f4e416a7b6edf4b3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Sun, 24 Jun 2012 14:43:46 +0200 Subject: [PATCH 5/7] Cleaned up stuff around file-based mailbox --- .../src/main/resources/reference.conf | 38 +++++++++---------- .../akka/actor/mailbox/FileBasedMailbox.scala | 9 ++--- .../mailbox/FileBasedMailboxSettings.scala | 32 ++++++++-------- .../mailbox/filequeue/PersistentQueue.scala | 26 ++++++------- .../akka/actor/mailbox/DurableMailbox.scala | 14 ++++--- 5 files changed, 61 insertions(+), 58 deletions(-) diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf b/akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf index f454716af0..1fb5cceeb1 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf @@ -13,50 +13,50 @@ akka { file-based { # directory below which this queue resides directory-path = "./_mb" - + # attempting to add an item after the queue reaches this size (in items) will fail. max-items = 2147483647 - + # attempting to add an item after the queue reaches this size (in bytes) will fail. max-size = 2147483647 bytes - + # attempting to add an item larger than this size (in bytes) will fail. max-item-size = 2147483647 bytes - + # maximum expiration time for this queue (seconds). max-age = 0s - + # maximum journal size before the journal should be rotated. max-journal-size = 16 MiB - + # maximum size of a queue before it drops into read-behind mode. max-memory-size = 128 MiB - + # maximum overflow (multiplier) of a journal file before we re-create it. max-journal-overflow = 10 - + # absolute maximum size of a journal file until we rebuild it, no matter what. 
max-journal-size-absolute = 9223372036854775807 bytes - + # whether to drop older items (instead of newer) when the queue is full - discard-old-when-full = on - + discard-old-when-full = on + # whether to keep a journal file at all - keep-journal = on - + keep-journal = on + # whether to sync the journal after each transaction sync-journal = off # circuit breaker configuration circuit-breaker { - # maximum number of failures before opening breaker - max-failures = 3 + # maximum number of failures before opening breaker + max-failures = 3 - # duration of time beyond which a call is assumed to be timed out and considered a failure - call-timeout = 3 seconds + # duration of time beyond which a call is assumed to be timed out and considered a failure + call-timeout = 3 seconds - # duration of time to wait until attempting to reset the breaker during which all calls fail-fast - reset-timeout = 30 seconds + # duration of time to wait until attempting to reset the breaker during which all calls fail-fast + reset-timeout = 30 seconds } } } diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala index fccb6b5aea..1416e8f148 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala @@ -21,18 +21,17 @@ class FileBasedMailboxType(systemSettings: ActorSystem.Settings, config: Config) case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)") } } - class FileBasedMessageQueue(_owner: ActorContext, val settings: FileBasedMailboxSettings) extends DurableMessageQueue(_owner) with DurableMessageSerialization { - // TODO Is it reasonable for all FileBasedMailboxes to have their own logger? 
- private val log = Logging(system, "FileBasedMessageQueue") - val breaker = CircuitBreaker(_owner.system.scheduler, settings.CircuitBreakerMaxFailures, settings.CircuitBreakerCallTimeout, settings.CircuitBreakerResetTimeout) + private val breaker = CircuitBreaker(_owner.system.scheduler, settings.CircuitBreakerMaxFailures, settings.CircuitBreakerCallTimeout, settings.CircuitBreakerResetTimeout) + + private val log = Logging(system, "FileBasedMessageQueue") private val queue = try { (new java.io.File(settings.QueuePath)) match { case dir if dir.exists && !dir.isDirectory ⇒ throw new IllegalStateException("Path already occupied by non-directory " + dir) case dir if !dir.exists ⇒ if (!dir.mkdirs() && !dir.isDirectory) throw new IllegalStateException("Creation of directory failed " + dir) - case _ ⇒ //All good + case _ ⇒ // All good } val queue = new filequeue.PersistentQueue(settings.QueuePath, name, settings, log) queue.setup // replays journal diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxSettings.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxSettings.scala index dff4021d96..27088dfc92 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxSettings.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxSettings.scala @@ -16,20 +16,20 @@ class FileBasedMailboxSettings(val systemSettings: ActorSystem.Settings, val use val config = initialize import config._ - val QueuePath: String = getString("directory-path") - val MaxItems: Int = getInt("max-items") - val MaxSize: Long = getBytes("max-size") - val MaxItemSize: Long = getBytes("max-item-size") - val MaxAge: Duration = Duration(getMilliseconds("max-age"), MILLISECONDS) - val MaxJournalSize: Long = getBytes("max-journal-size") - val MaxMemorySize: Long = getBytes("max-memory-size") - val MaxJournalOverflow: Int = getInt("max-journal-overflow") - val MaxJournalSizeAbsolute: Long = getBytes("max-journal-size-absolute") - val DiscardOldWhenFull: Boolean = getBoolean("discard-old-when-full") - val KeepJournal: Boolean = getBoolean("keep-journal") - val SyncJournal: Boolean = getBoolean("sync-journal") + final val QueuePath: String = getString("directory-path") + final val MaxItems: Int = getInt("max-items") + final val MaxSize: Long = getBytes("max-size") + final val MaxItemSize: Long = getBytes("max-item-size") + final val MaxAge: Duration = Duration(getMilliseconds("max-age"), MILLISECONDS) + final val MaxJournalSize: Long = getBytes("max-journal-size") + final val MaxMemorySize: Long = getBytes("max-memory-size") + final val MaxJournalOverflow: Int = getInt("max-journal-overflow") + final val MaxJournalSizeAbsolute: Long = getBytes("max-journal-size-absolute") + final val DiscardOldWhenFull: Boolean = getBoolean("discard-old-when-full") + final val KeepJournal: Boolean = getBoolean("keep-journal") + final val SyncJournal: Boolean = getBoolean("sync-journal") - val CircuitBreakerMaxFailures = getInt("circuit-breaker.max-failures") - val CircuitBreakerCallTimeout = Duration.fromNanos(getNanoseconds("circuit-breaker.call-timeout")) - val CircuitBreakerResetTimeout = Duration.fromNanos(getNanoseconds("circuit-breaker.reset-timeout")) -} \ No newline at end of file + final val CircuitBreakerMaxFailures = getInt("circuit-breaker.max-failures") + final val CircuitBreakerCallTimeout = 
Duration.fromNanos(getNanoseconds("circuit-breaker.call-timeout")) + final val CircuitBreakerResetTimeout = Duration.fromNanos(getNanoseconds("circuit-breaker.reset-timeout")) +} diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/PersistentQueue.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/PersistentQueue.scala index 1a5ddf4a8c..152b29406c 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/PersistentQueue.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/PersistentQueue.scala @@ -68,44 +68,44 @@ class PersistentQueue(persistencePath: String, val name: String, val settings: F def overlay[T](base: ⇒ T) = new OverlaySetting(base) // attempting to add an item after the queue reaches this size (in items) will fail. - val maxItems = overlay(PersistentQueue.maxItems) + final val maxItems = overlay(PersistentQueue.maxItems) // attempting to add an item after the queue reaches this size (in bytes) will fail. - val maxSize = overlay(PersistentQueue.maxSize) + final val maxSize = overlay(PersistentQueue.maxSize) // attempting to add an item larger than this size (in bytes) will fail. - val maxItemSize = overlay(PersistentQueue.maxItemSize) + final val maxItemSize = overlay(PersistentQueue.maxItemSize) // maximum expiration time for this queue (seconds). - val maxAge = overlay(PersistentQueue.maxAge) + final val maxAge = overlay(PersistentQueue.maxAge) // maximum journal size before the journal should be rotated. - val maxJournalSize = overlay(PersistentQueue.maxJournalSize) + final val maxJournalSize = overlay(PersistentQueue.maxJournalSize) // maximum size of a queue before it drops into read-behind mode. - val maxMemorySize = overlay(PersistentQueue.maxMemorySize) + final val maxMemorySize = overlay(PersistentQueue.maxMemorySize) // maximum overflow (multiplier) of a journal file before we re-create it. - val maxJournalOverflow = overlay(PersistentQueue.maxJournalOverflow) + final val maxJournalOverflow = overlay(PersistentQueue.maxJournalOverflow) // absolute maximum size of a journal file until we rebuild it, no matter what. 
- val maxJournalSizeAbsolute = overlay(PersistentQueue.maxJournalSizeAbsolute) + final val maxJournalSizeAbsolute = overlay(PersistentQueue.maxJournalSizeAbsolute) // whether to drop older items (instead of newer) when the queue is full - val discardOldWhenFull = overlay(PersistentQueue.discardOldWhenFull) + final val discardOldWhenFull = overlay(PersistentQueue.discardOldWhenFull) // whether to keep a journal file at all - val keepJournal = overlay(PersistentQueue.keepJournal) + final val keepJournal = overlay(PersistentQueue.keepJournal) // whether to sync the journal after each transaction - val syncJournal = overlay(PersistentQueue.syncJournal) + final val syncJournal = overlay(PersistentQueue.syncJournal) // (optional) move expired items over to this queue - val expiredQueue = overlay(PersistentQueue.expiredQueue) + final val expiredQueue = overlay(PersistentQueue.expiredQueue) private var journal = new Journal(new File(persistencePath, name).getCanonicalPath, syncJournal(), log) - // track tentative removals + // track tentative remofinal vals private var xidCounter: Int = 0 private val openTransactions = new mutable.HashMap[Int, QItem] def openTransactionCount = openTransactions.size diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala index b21878d00e..ff985b44cc 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala @@ -69,11 +69,15 @@ trait DurableMessageSerialization { this: DurableMessageQueue ⇒ * Conventional organization of durable mailbox settings: * * {{{ - * my-durable-dispatcher { - * mailbox-type = "my.durable.mailbox" - * my-durable-mailbox { - * setting1 = 1 - * setting2 = 2 + * akka { + * actor { + * my-durable-dispatcher { + * mailbox-type = "my.durable.mailbox" + * my-durable-mailbox { + * setting1 = 1 + * setting2 = 2 + * } + * } * } * } * }}} From 66bcca8a918839a0f2d7df64d5573f47a4354848 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 25 Jun 2012 15:46:30 +0200 Subject: [PATCH 6/7] Shorter gossip interval still needed --- .../src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala index 3c74bc02e2..0f9a8a8c73 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala @@ -24,6 +24,8 @@ object SunnyWeatherMultiJvmSpec extends MultiNodeConfig { commonConfig(ConfigFactory.parseString(""" akka.cluster { nr-of-deputy-nodes = 0 + # FIXME remove this (use default) when ticket #2239 has been fixed + gossip-interval = 400 ms } akka.loglevel = INFO """)) From 738565883b8602388bcc76a2a363dac50996d5b4 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 25 Jun 2012 20:20:11 +0200 Subject: [PATCH 7/7] Add join-seed-node-timeout config, see #2219 --- akka-cluster/src/main/resources/reference.conf | 3 +++ .../src/main/scala/akka/cluster/Cluster.scala | 17 +++++++---------- .../scala/akka/cluster/ClusterSettings.scala | 1 + .../scala/akka/cluster/ClusterConfigSpec.scala | 1 + 4 files changed, 12 insertions(+), 10 deletions(-) diff --git 
a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index f75c7f9018..60b934a864 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -15,6 +15,9 @@ akka { # Leave as empty if the node should be a singleton cluster. seed-nodes = [] + # how long to wait for one of the seed nodes to reply to initial join request + join-seed-node-timeout = 5s + # automatic join the seed-nodes at startup auto-join = on diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index a4765fc2cf..0cf79d7102 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -375,8 +375,7 @@ private[cluster] final class ClusterCommandDaemon(cluster: Cluster) extends Acto val seedRoutees = for (address ← cluster.seedNodes; if address != cluster.selfAddress) yield self.path.toStringWithAddress(address) if (seedRoutees.nonEmpty) { - // FIXME config of within (use JoinInProgressTimeout when that is in master) - implicit val within = Timeout(5 seconds) + implicit val within = Timeout(cluster.clusterSettings.JoinSeedNodeTimeout) val seedRouter = context.actorOf( Props.empty.withRouter(ScatterGatherFirstCompletedRouter( routees = seedRoutees, within = within.duration))) @@ -679,6 +678,12 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) */ def isAvailable: Boolean = !isUnavailable(state.get) + /** + * Make it possible to override/configure seedNodes from tests without + * specifying in config. Addresses are unknown before startup time. + */ + def seedNodes: IndexedSeq[Address] = SeedNodes + /** * Registers a listener to subscribe to cluster membership changes. */ @@ -1374,14 +1379,6 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) private def deputyNodes(addresses: IndexedSeq[Address]): IndexedSeq[Address] = addresses filterNot (_ == selfAddress) intersect seedNodes - /** - * INTERNAL API. - * - * Make it possible to override/configure seedNodes from tests without - * specifying in config. Addresses are unknown before startup time. - */ - private[cluster] def seedNodes: IndexedSeq[Address] = SeedNodes - /** * INTERNAL API. 
*/ diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index 12ed666680..c026b8c1a0 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -25,6 +25,7 @@ class ClusterSettings(val config: Config, val systemName: String) { final val SeedNodes: IndexedSeq[Address] = getStringList("akka.cluster.seed-nodes").asScala.map { case AddressFromURIString(addr) ⇒ addr }.toIndexedSeq + final val JoinSeedNodeTimeout = Duration(getMilliseconds("akka.cluster.join-seed-node-timeout"), MILLISECONDS) final val PeriodicTasksInitialDelay = Duration(getMilliseconds("akka.cluster.periodic-tasks-initial-delay"), MILLISECONDS) final val GossipInterval = Duration(getMilliseconds("akka.cluster.gossip-interval"), MILLISECONDS) final val HeartbeatInterval = Duration(getMilliseconds("akka.cluster.heartbeat-interval"), MILLISECONDS) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 5e44b0a4bc..d5a9752e5e 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -22,6 +22,7 @@ class ClusterConfigSpec extends AkkaSpec { FailureDetectorMinStdDeviation must be(100 millis) FailureDetectorAcceptableHeartbeatPause must be(3 seconds) SeedNodes must be(Seq.empty[String]) + JoinSeedNodeTimeout must be(5 seconds) PeriodicTasksInitialDelay must be(1 seconds) GossipInterval must be(1 second) HeartbeatInterval must be(1 second)
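Taken together, the join-to-seed-nodes handshake added in this series is a scatter-gather ask whose first reply is piped back to the asking actor. A stripped-down sketch of that pattern, reusing the router and timeout style of ClusterCommandDaemon; the JoinSketch actor and its "join" trigger message are simplified stand-ins, not the real cluster daemon:

    import akka.actor._
    import akka.pattern._
    import akka.routing.ScatterGatherFirstCompletedRouter
    import akka.util.Timeout

    case object InitJoin
    case class InitJoinAck(address: Address)

    // Ask every seed-node path, let the first InitJoinAck win, discard the rest.
    class JoinSketch(seedPaths: Seq[String], joinTimeout: Timeout) extends Actor {
      def receive = {
        case "join" if seedPaths.nonEmpty ⇒
          // in the real code the timeout is akka.cluster.join-seed-node-timeout
          implicit val within = joinTimeout
          val seedRouter = context.actorOf(Props.empty.withRouter(
            ScatterGatherFirstCompletedRouter(routees = seedPaths, within = within.duration)))
          seedRouter ? InitJoin pipeTo self // only the first reply is delivered
          seedRouter ! PoisonPill
        case InitJoinAck(address) ⇒
          // the real daemon would now call cluster.join(address)
          context.system.log.info("would join seed node [{}]", address)
      }
    }

The actual ClusterCommandDaemon builds the routee paths with self.path.toStringWithAddress(seedAddress) for every configured seed node except the local one.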