diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 73cb24e92c..4da4dd6620 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -8,9 +8,18 @@ akka { cluster { - # node to join - the full URI defined by a string on the form of "akka://system@hostname:port" - # leave as empty string if the node should be a singleton cluster - node-to-join = "" + # Initial contact points of the cluster. Nodes to join at startup if auto-join = on. + # The seed nodes also play the role of deputy nodes (the nodes responsible + # for breaking network partitions). + # Comma separated full URIs defined by a string on the form of "akka://system@hostname:port" + # Leave empty if the node should be a singleton cluster. + seed-nodes = [] + + # how long to wait for one of the seed nodes to reply to initial join request + seed-node-timeout = 5s + + # automatically join the seed-nodes at startup + auto-join = on # should the 'leader' in the cluster be allowed to automatically mark unreachable nodes as DOWN? auto-down = on diff --git a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala index db5f21607b..c397d065e5 100644 --- a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala @@ -57,7 +57,7 @@ object AccrualFailureDetector { * to this duration, with a with rather high standard deviation (since environment is unknown * in the beginning) * - * @clock The clock, returning current time in milliseconds, but can be faked for testing + * @param clock The clock, returning current time in milliseconds, but can be faked for testing * purposes. It is only used for measuring intervals (duration). 
* */ @@ -68,7 +68,7 @@ class AccrualFailureDetector( val minStdDeviation: Duration, val acceptableHeartbeatPause: Duration, val firstHeartbeatEstimate: Duration, - val clock: () ⇒ Long) extends FailureDetector { + val clock: () ⇒ Long = AccrualFailureDetector.realClock) extends FailureDetector { import AccrualFailureDetector._ @@ -77,8 +77,7 @@ class AccrualFailureDetector( */ def this( system: ActorSystem, - settings: ClusterSettings, - clock: () ⇒ Long = AccrualFailureDetector.realClock) = + settings: ClusterSettings) = this( system, settings.FailureDetectorThreshold, @@ -86,7 +85,7 @@ class AccrualFailureDetector( settings.FailureDetectorAcceptableHeartbeatPause, settings.FailureDetectorMinStdDeviation, settings.HeartbeatInterval, - clock) + AccrualFailureDetector.realClock) private val log = Logging(system, "FailureDetector") diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 8232a762cf..caecf3906b 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -11,7 +11,7 @@ import akka.dispatch.Await import akka.dispatch.MonitorableThreadFactory import akka.event.Logging import akka.jsr166y.ThreadLocalRandom -import akka.pattern.ask +import akka.pattern._ import akka.remote._ import akka.routing._ import akka.util._ @@ -55,11 +55,29 @@ sealed trait ClusterMessage extends Serializable object ClusterUserAction { /** - * Command to join the cluster. Sent when a node (reprsesented by 'address') + * Command to join the cluster. Sent when a node (represented by 'address') * wants to join another node (the receiver). */ case class Join(address: Address) extends ClusterMessage + /** + * Start message of the process to join one of the seed nodes. + * The node sends `InitJoin` to all seed nodes, which reply + * with `InitJoinAck`. The first reply is used; the others are discarded. 
+ * The node sends `Join` command to the seed node that replied first. + */ + case object JoinSeedNode extends ClusterMessage + + /** + * @see JoinSeedNode + */ + case object InitJoin extends ClusterMessage + + /** + * @see JoinSeedNode + */ + case class InitJoinAck(address: Address) extends ClusterMessage + /** * Command to leave the cluster. */ @@ -346,11 +364,27 @@ private[cluster] final class ClusterCommandDaemon(cluster: Cluster) extends Acto val log = Logging(context.system, this) def receive = { - case Join(address) ⇒ cluster.joining(address) - case Down(address) ⇒ cluster.downing(address) - case Leave(address) ⇒ cluster.leaving(address) - case Exit(address) ⇒ cluster.exiting(address) - case Remove(address) ⇒ cluster.removing(address) + case JoinSeedNode ⇒ joinSeedNode() + case InitJoin ⇒ sender ! InitJoinAck(cluster.selfAddress) + case InitJoinAck(address) ⇒ cluster.join(address) + case Join(address) ⇒ cluster.joining(address) + case Down(address) ⇒ cluster.downing(address) + case Leave(address) ⇒ cluster.leaving(address) + case Exit(address) ⇒ cluster.exiting(address) + case Remove(address) ⇒ cluster.removing(address) + } + + def joinSeedNode(): Unit = { + val seedRoutees = for (address ← cluster.seedNodes; if address != cluster.selfAddress) + yield self.path.toStringWithAddress(address) + if (seedRoutees.nonEmpty) { + implicit val within = Timeout(cluster.clusterSettings.SeedNodeTimeout) + val seedRouter = context.actorOf( + Props.empty.withRouter(ScatterGatherFirstCompletedRouter( + routees = seedRoutees, within = within.duration))) + seedRouter ? InitJoin pipeTo self + seedRouter ! 
PoisonPill + } } override def unhandled(unknown: Any) = log.error("Illegal command [{}]", unknown) @@ -480,8 +514,6 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) implicit private val defaultTimeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout) - private val nodeToJoin: Option[Address] = NodeToJoin filter (_ != selfAddress) - private val serialization = remote.serialization private val _isRunning = new AtomicBoolean(true) @@ -508,8 +540,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) new AtomicReference[State](State(seenVersionedGossip)) } - // try to join the node defined in the 'akka.cluster.node-to-join' option - autoJoin() + // try to join one of the nodes defined in the 'akka.cluster.seed-nodes' + if (AutoJoin) joinSeedNode() // ======================================================== // ===================== WORK DAEMONS ===================== @@ -647,6 +679,12 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) */ def isAvailable: Boolean = !isUnavailable(state.get) + /** + * Make it possible to override/configure seedNodes from tests without + * specifying in config. Addresses are unknown before startup time. + */ + def seedNodes: IndexedSeq[Address] = SeedNodes + /** * Registers a listener to subscribe to cluster membership changes. */ @@ -950,9 +988,9 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) private[cluster] def receiveHeartbeat(from: Address): Unit = failureDetector heartbeat from /** - * Joins the pre-configured contact point. + * Joins the pre-configured contact points. */ - private def autoJoin(): Unit = nodeToJoin foreach join + private def joinSeedNode(): Unit = clusterCommandDaemon ! ClusterUserAction.JoinSeedNode /** * INTERNAL API. @@ -1016,8 +1054,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) // 2. 
gossip to a deputy nodes for facilitating partition healing val deputies = deputyNodes(localMemberAddresses) val alreadyGossipedToDeputy = gossipedToAlive.map(deputies.contains(_)).getOrElse(false) - if ((!alreadyGossipedToDeputy || localMembersSize < NrOfDeputyNodes) && deputies.nonEmpty) { - val probability = gossipToDeputyProbablity(localMembersSize, localUnreachableSize, NrOfDeputyNodes) + if ((!alreadyGossipedToDeputy || localMembersSize < seedNodes.size) && deputies.nonEmpty) { + val probability = gossipToDeputyProbablity(localMembersSize, localUnreachableSize, seedNodes.size) if (ThreadLocalRandom.current.nextDouble() < probability) gossipToRandomNodeOf(deputies) } @@ -1360,7 +1398,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) * Gets the addresses of a all the 'deputy' nodes - excluding this node if part of the group. */ private def deputyNodes(addresses: IndexedSeq[Address]): IndexedSeq[Address] = - addresses drop 1 take NrOfDeputyNodes filterNot (_ == selfAddress) + addresses filterNot (_ == selfAddress) intersect seedNodes /** * INTERNAL API. 
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index ba5d2a0b03..08a9b5160d 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -22,10 +22,10 @@ class ClusterSettings(val config: Config, val systemName: String) { final val FailureDetectorAcceptableHeartbeatPause: Duration = Duration(getMilliseconds("akka.cluster.failure-detector.acceptable-heartbeat-pause"), MILLISECONDS) - final val NodeToJoin: Option[Address] = getString("akka.cluster.node-to-join") match { - case "" ⇒ None - case AddressFromURIString(addr) ⇒ Some(addr) - } + final val SeedNodes: IndexedSeq[Address] = getStringList("akka.cluster.seed-nodes").asScala.map { + case AddressFromURIString(addr) ⇒ addr + }.toIndexedSeq + final val SeedNodeTimeout: Duration = Duration(getMilliseconds("akka.cluster.seed-node-timeout"), MILLISECONDS) final val PeriodicTasksInitialDelay: Duration = Duration(getMilliseconds("akka.cluster.periodic-tasks-initial-delay"), MILLISECONDS) final val GossipInterval: Duration = Duration(getMilliseconds("akka.cluster.gossip-interval"), MILLISECONDS) final val HeartbeatInterval: Duration = Duration(getMilliseconds("akka.cluster.heartbeat-interval"), MILLISECONDS) @@ -33,6 +33,7 @@ class ClusterSettings(val config: Config, val systemName: String) { final val UnreachableNodesReaperInterval: Duration = Duration(getMilliseconds("akka.cluster.unreachable-nodes-reaper-interval"), MILLISECONDS) final val NrOfGossipDaemons: Int = getInt("akka.cluster.nr-of-gossip-daemons") final val NrOfDeputyNodes: Int = getInt("akka.cluster.nr-of-deputy-nodes") + final val AutoJoin: Boolean = getBoolean("akka.cluster.auto-join") final val AutoDown: Boolean = getBoolean("akka.cluster.auto-down") final val JoinTimeout: Duration = Duration(getMilliseconds("akka.cluster.join-timeout"), MILLISECONDS) final val 
SchedulerTickDuration: Duration = Duration(getMilliseconds("akka.cluster.scheduler.tick-duration"), MILLISECONDS) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala new file mode 100644 index 0000000000..38f03a4e66 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala @@ -0,0 +1,46 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfter +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.util.duration._ + +object JoinSeedNodeMultiJvmSpec extends MultiNodeConfig { + val seed1 = role("seed1") + val seed2 = role("seed2") + val ordinary1 = role("ordinary1") + val ordinary2 = role("ordinary2") + + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) +} + +class JoinSeedNodeMultiJvmNode1 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy +class JoinSeedNodeMultiJvmNode2 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy +class JoinSeedNodeMultiJvmNode3 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy +class JoinSeedNodeMultiJvmNode4 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy + +abstract class JoinSeedNodeSpec + extends MultiNodeSpec(JoinSeedNodeMultiJvmSpec) + with MultiNodeClusterSpec { + + import JoinSeedNodeMultiJvmSpec._ + + override def seedNodes = IndexedSeq(seed1, seed2) + + "A cluster with configured seed nodes" must { + "join the seed nodes at startup" taggedAs LongRunningTest in { + + startClusterNode() + enterBarrier("all-started") + + awaitUpConvergence(4) + + enterBarrier("after") + } + } +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 
9fd8746923..79e3a67e1e 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -26,7 +26,6 @@ object MultiNodeClusterSpec { leader-actions-interval = 200 ms unreachable-nodes-reaper-interval = 200 ms periodic-tasks-initial-delay = 300 ms - nr-of-deputy-nodes = 2 } akka.test { single-expect-default = 5 s @@ -77,10 +76,22 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu throw t } + /** + * Make it possible to override/configure seedNodes from tests without + * specifying in config. Addresses are unknown before startup time. + */ + protected def seedNodes: IndexedSeq[RoleName] = IndexedSeq.empty + /** * The cluster node instance. Needs to be lazily created. */ - private lazy val clusterNode = new Cluster(system.asInstanceOf[ExtendedActorSystem], failureDetector) + private lazy val clusterNode = new Cluster(system.asInstanceOf[ExtendedActorSystem], failureDetector) { + override def seedNodes: IndexedSeq[Address] = { + val testSeedNodes = MultiNodeClusterSpec.this.seedNodes + if (testSeedNodes.isEmpty) super.seedNodes + else testSeedNodes map address + } + } /** * Get the cluster node to use. 
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala index cee5efc0db..50656a6a9d 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala @@ -36,6 +36,8 @@ abstract class NodeJoinSpec startClusterNode() } + enterBarrier("first-started") + runOn(second) { cluster.join(first) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala index 3c74bc02e2..d661f0cc51 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala @@ -23,7 +23,8 @@ object SunnyWeatherMultiJvmSpec extends MultiNodeConfig { // not MultiNodeClusterSpec.clusterConfig commonConfig(ConfigFactory.parseString(""" akka.cluster { - nr-of-deputy-nodes = 0 + # FIXME remove this (use default) when ticket #2239 has been fixed + gossip-interval = 400 ms } akka.loglevel = INFO """)) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index e8d68303a0..92e219a540 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -21,7 +21,8 @@ class ClusterConfigSpec extends AkkaSpec { FailureDetectorImplementationClass must be(classOf[AccrualFailureDetector].getName) FailureDetectorMinStdDeviation must be(100 millis) FailureDetectorAcceptableHeartbeatPause must be(3 seconds) - NodeToJoin must be(None) + SeedNodes must be(Seq.empty[String]) + SeedNodeTimeout must be(5 seconds) PeriodicTasksInitialDelay must be(1 seconds) GossipInterval must be(1 second) HeartbeatInterval must be(1 second) @@ -29,7 +30,7 @@ class ClusterConfigSpec extends AkkaSpec { 
UnreachableNodesReaperInterval must be(1 second) JoinTimeout must be(60 seconds) NrOfGossipDaemons must be(4) - NrOfDeputyNodes must be(3) + AutoJoin must be(true) AutoDown must be(true) SchedulerTickDuration must be(33 millis) SchedulerTicksPerWheel must be(512) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index 3abdf2bf9b..6f70193715 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -11,12 +11,13 @@ import akka.actor.ExtendedActorSystem import akka.actor.Address import java.util.concurrent.atomic.AtomicInteger import org.scalatest.BeforeAndAfter +import akka.remote.RemoteActorRefProvider object ClusterSpec { val config = """ akka.cluster { + auto-join = off auto-down = off - nr-of-deputy-nodes = 3 periodic-tasks-initial-delay = 120 seconds // turn off scheduled tasks } akka.actor.provider = "akka.remote.RemoteActorRefProvider" @@ -31,12 +32,24 @@ object ClusterSpec { class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter { import ClusterSpec._ + val selfAddress = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport.address + val addresses = IndexedSeq( + selfAddress, + Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 1), + Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 2), + Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 3), + Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 4), + Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 5)) + val deterministicRandom = new AtomicInteger val failureDetector = new FailureDetectorPuppet(system) val cluster = new Cluster(system.asInstanceOf[ExtendedActorSystem], failureDetector) { + // 3 deputy nodes (addresses index 1, 2, 3) + override def seedNodes = 
addresses.slice(1, 4) + override def selectRandomNode(addresses: IndexedSeq[Address]): Option[Address] = { if (addresses.isEmpty) None else Some(addresses.toSeq(deterministicRandom.getAndIncrement % addresses.size)) @@ -60,15 +73,6 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter { } - val selfAddress = cluster.self.address - val addresses = IndexedSeq( - selfAddress, - Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 1), - Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 2), - Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 3), - Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 4), - Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 5)) - def memberStatus(address: Address): Option[MemberStatus] = cluster.latestGossip.members.collectFirst { case m if m.address == address ⇒ m.status } @@ -80,6 +84,11 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter { "A Cluster" must { + "use the address of the remote transport" in { + cluster.selfAddress must be(selfAddress) + cluster.self.address must be(selfAddress) + } + "initially be singleton cluster and reach convergence immediately" in { cluster.isSingletonCluster must be(true) cluster.latestGossip.members.map(_.address) must be(Set(selfAddress)) @@ -141,7 +150,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter { "gossip to duputy node" in { cluster._gossipToDeputyProbablity = 1.0 // always - // we have configured 2 deputy nodes + // we have configured 3 deputy nodes (seedNodes) cluster.gossip() // 1 is deputy cluster.gossip() // 2 is deputy cluster.gossip() // 3 is deputy diff --git a/akka-docs/cluster/cluster.rst b/akka-docs/cluster/cluster.rst index 833d56f51c..1812c33561 100644 --- a/akka-docs/cluster/cluster.rst +++ b/akka-docs/cluster/cluster.rst @@ -183,14 +183,20 @@ according to the Failure Detector is 
considered unreachable. This means setting the unreachable node status to ``down`` automatically. +Seed Nodes +^^^^^^^^^^ + +The seed nodes are configured contact points for the initial join of the cluster. +When a new node is started it sends a message to all seed nodes and +then sends a join command to the one that answers first. + +It is possible to turn off automatic join. + Deputy Nodes ^^^^^^^^^^^^ -After gossip convergence a set of ``deputy`` nodes for the cluster can be -determined. As with the ``leader``, there is no ``deputy`` election process, -the deputies can always be recognised deterministically by any node whenever there -is gossip convergence. The list of ``deputy`` nodes is simply the N - 1 number -of nodes (e.g. starting with the first node after the ``leader``) in sorted order. +The deputy nodes are the live members of the configured seed nodes. +It is preferred to use deputy nodes in different racks/data centers. The nodes defined as ``deputy`` nodes are just regular member nodes whose only "special role" is to help breaking logical partitions as seen in the gossip diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf b/akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf index f454716af0..1fb5cceeb1 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf @@ -13,50 +13,50 @@ akka { file-based { # directory below which this queue resides directory-path = "./_mb" - + # attempting to add an item after the queue reaches this size (in items) will fail. max-items = 2147483647 - + # attempting to add an item after the queue reaches this size (in bytes) will fail. max-size = 2147483647 bytes - + # attempting to add an item larger than this size (in bytes) will fail. max-item-size = 2147483647 bytes - + # maximum expiration time for this queue (seconds). 
max-age = 0s - + # maximum journal size before the journal should be rotated. max-journal-size = 16 MiB - + # maximum size of a queue before it drops into read-behind mode. max-memory-size = 128 MiB - + # maximum overflow (multiplier) of a journal file before we re-create it. max-journal-overflow = 10 - + # absolute maximum size of a journal file until we rebuild it, no matter what. max-journal-size-absolute = 9223372036854775807 bytes - + # whether to drop older items (instead of newer) when the queue is full - discard-old-when-full = on - + discard-old-when-full = on + # whether to keep a journal file at all - keep-journal = on - + keep-journal = on + # whether to sync the journal after each transaction sync-journal = off # circuit breaker configuration circuit-breaker { - # maximum number of failures before opening breaker - max-failures = 3 + # maximum number of failures before opening breaker + max-failures = 3 - # duration of time beyond which a call is assumed to be timed out and considered a failure - call-timeout = 3 seconds + # duration of time beyond which a call is assumed to be timed out and considered a failure + call-timeout = 3 seconds - # duration of time to wait until attempting to reset the breaker during which all calls fail-fast - reset-timeout = 30 seconds + # duration of time to wait until attempting to reset the breaker during which all calls fail-fast + reset-timeout = 30 seconds } } } diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala index 8d2ce5b897..c703bf0b49 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala @@ -34,7 +34,7 @@ class FileBasedMessageQueue(_owner: ActorRef, _system: ExtendedActorSystem, val (new 
java.io.File(settings.QueuePath)) match { case dir if dir.exists && !dir.isDirectory ⇒ throw new IllegalStateException("Path already occupied by non-directory " + dir) case dir if !dir.exists ⇒ if (!dir.mkdirs() && !dir.isDirectory) throw new IllegalStateException("Creation of directory failed " + dir) - case _ ⇒ //All good + case _ ⇒ // All good } val queue = new filequeue.PersistentQueue(settings.QueuePath, name, settings, log) queue.setup // replays journal diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxSettings.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxSettings.scala index dff4021d96..27088dfc92 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxSettings.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxSettings.scala @@ -16,20 +16,20 @@ class FileBasedMailboxSettings(val systemSettings: ActorSystem.Settings, val use val config = initialize import config._ - val QueuePath: String = getString("directory-path") - val MaxItems: Int = getInt("max-items") - val MaxSize: Long = getBytes("max-size") - val MaxItemSize: Long = getBytes("max-item-size") - val MaxAge: Duration = Duration(getMilliseconds("max-age"), MILLISECONDS) - val MaxJournalSize: Long = getBytes("max-journal-size") - val MaxMemorySize: Long = getBytes("max-memory-size") - val MaxJournalOverflow: Int = getInt("max-journal-overflow") - val MaxJournalSizeAbsolute: Long = getBytes("max-journal-size-absolute") - val DiscardOldWhenFull: Boolean = getBoolean("discard-old-when-full") - val KeepJournal: Boolean = getBoolean("keep-journal") - val SyncJournal: Boolean = getBoolean("sync-journal") + final val QueuePath: String = getString("directory-path") + final val MaxItems: Int = getInt("max-items") + final val MaxSize: Long = getBytes("max-size") + final val MaxItemSize: Long = 
getBytes("max-item-size") + final val MaxAge: Duration = Duration(getMilliseconds("max-age"), MILLISECONDS) + final val MaxJournalSize: Long = getBytes("max-journal-size") + final val MaxMemorySize: Long = getBytes("max-memory-size") + final val MaxJournalOverflow: Int = getInt("max-journal-overflow") + final val MaxJournalSizeAbsolute: Long = getBytes("max-journal-size-absolute") + final val DiscardOldWhenFull: Boolean = getBoolean("discard-old-when-full") + final val KeepJournal: Boolean = getBoolean("keep-journal") + final val SyncJournal: Boolean = getBoolean("sync-journal") - val CircuitBreakerMaxFailures = getInt("circuit-breaker.max-failures") - val CircuitBreakerCallTimeout = Duration.fromNanos(getNanoseconds("circuit-breaker.call-timeout")) - val CircuitBreakerResetTimeout = Duration.fromNanos(getNanoseconds("circuit-breaker.reset-timeout")) -} \ No newline at end of file + final val CircuitBreakerMaxFailures = getInt("circuit-breaker.max-failures") + final val CircuitBreakerCallTimeout = Duration.fromNanos(getNanoseconds("circuit-breaker.call-timeout")) + final val CircuitBreakerResetTimeout = Duration.fromNanos(getNanoseconds("circuit-breaker.reset-timeout")) +} diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/PersistentQueue.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/PersistentQueue.scala index 1a5ddf4a8c..152b29406c 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/PersistentQueue.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/PersistentQueue.scala @@ -68,44 +68,44 @@ class PersistentQueue(persistencePath: String, val name: String, val settings: F def overlay[T](base: ⇒ T) = new OverlaySetting(base) // attempting to add an item after the queue reaches this size (in items) will fail. 
- val maxItems = overlay(PersistentQueue.maxItems) + final val maxItems = overlay(PersistentQueue.maxItems) // attempting to add an item after the queue reaches this size (in bytes) will fail. - val maxSize = overlay(PersistentQueue.maxSize) + final val maxSize = overlay(PersistentQueue.maxSize) // attempting to add an item larger than this size (in bytes) will fail. - val maxItemSize = overlay(PersistentQueue.maxItemSize) + final val maxItemSize = overlay(PersistentQueue.maxItemSize) // maximum expiration time for this queue (seconds). - val maxAge = overlay(PersistentQueue.maxAge) + final val maxAge = overlay(PersistentQueue.maxAge) // maximum journal size before the journal should be rotated. - val maxJournalSize = overlay(PersistentQueue.maxJournalSize) + final val maxJournalSize = overlay(PersistentQueue.maxJournalSize) // maximum size of a queue before it drops into read-behind mode. - val maxMemorySize = overlay(PersistentQueue.maxMemorySize) + final val maxMemorySize = overlay(PersistentQueue.maxMemorySize) // maximum overflow (multiplier) of a journal file before we re-create it. - val maxJournalOverflow = overlay(PersistentQueue.maxJournalOverflow) + final val maxJournalOverflow = overlay(PersistentQueue.maxJournalOverflow) // absolute maximum size of a journal file until we rebuild it, no matter what. 
- val maxJournalSizeAbsolute = overlay(PersistentQueue.maxJournalSizeAbsolute) + final val maxJournalSizeAbsolute = overlay(PersistentQueue.maxJournalSizeAbsolute) // whether to drop older items (instead of newer) when the queue is full - val discardOldWhenFull = overlay(PersistentQueue.discardOldWhenFull) + final val discardOldWhenFull = overlay(PersistentQueue.discardOldWhenFull) // whether to keep a journal file at all - val keepJournal = overlay(PersistentQueue.keepJournal) + final val keepJournal = overlay(PersistentQueue.keepJournal) // whether to sync the journal after each transaction - val syncJournal = overlay(PersistentQueue.syncJournal) + final val syncJournal = overlay(PersistentQueue.syncJournal) // (optional) move expired items over to this queue - val expiredQueue = overlay(PersistentQueue.expiredQueue) + final val expiredQueue = overlay(PersistentQueue.expiredQueue) private var journal = new Journal(new File(persistencePath, name).getCanonicalPath, syncJournal(), log) - // track tentative removals + // track tentative removals private var xidCounter: Int = 0 private val openTransactions = new mutable.HashMap[Int, QItem] def openTransactionCount = openTransactions.size diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala index e3bb5858f7..79ece7625d 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala @@ -68,11 +68,15 @@ trait DurableMessageSerialization { this: DurableMessageQueue ⇒ * Conventional organization of durable mailbox settings: * * {{{ - * my-durable-dispatcher { - * mailbox-type = "my.durable.mailbox" - * my-durable-mailbox { - * setting1 = 1 - * setting2 = 2 + * akka { + * actor { + * my-durable-dispatcher { 
+ * mailbox-type = "my.durable.mailbox" + * my-durable-mailbox { + * setting1 = 1 + * setting2 = 2 + * } + * } * } * } * }}} diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala index b6265125b1..eba0fffe63 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala @@ -282,6 +282,8 @@ private[akka] class ServerFSM(val controller: ActorRef, val channel: Channel) ex import akka.actor.FSM._ import Controller._ + var roleName: RoleName = null + startWith(Initial, None) whenUnhandled { @@ -292,12 +294,15 @@ private[akka] class ServerFSM(val controller: ActorRef, val channel: Channel) ex } onTermination { - case _ ⇒ controller ! ClientDisconnected + case _ ⇒ + controller ! ClientDisconnected(roleName) + channel.close() } when(Initial, stateTimeout = 10 seconds) { case Event(Hello(name, addr), _) ⇒ - controller ! NodeInfo(RoleName(name), addr, self) + roleName = RoleName(name) + controller ! 
NodeInfo(roleName, addr, self) goto(Ready) case Event(x: NetworkOp, _) ⇒ log.warning("client {} sent no Hello in first message (instead {}), disconnecting", getAddrString(channel), x) @@ -334,10 +339,6 @@ private[akka] class ServerFSM(val controller: ActorRef, val channel: Channel) ex } initialize - - onTermination { - case _ ⇒ channel.close() - } } /** @@ -517,10 +518,13 @@ private[akka] class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoor if (clients.find(_.name == n.name).isDefined) throw new DuplicateNode(d, n) stay using d.copy(clients = clients + n) case Event(ClientDisconnected(name), d @ Data(clients, _, arrived, _)) ⇒ - if (clients.isEmpty) throw BarrierEmpty(d, "cannot disconnect " + name + ": no client to disconnect") - (clients find (_.name == name)) match { - case None ⇒ stay - case Some(c) ⇒ throw ClientLost(d.copy(clients = clients - c, arrived = arrived filterNot (_ == c.fsm)), name) + if (arrived.isEmpty) + stay using d.copy(clients = clients.filterNot(_.name == name)) + else { + (clients find (_.name == name)) match { + case None ⇒ stay + case Some(c) ⇒ throw ClientLost(d.copy(clients = clients - c, arrived = arrived filterNot (_ == c.fsm)), name) + } } } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/testkit/MultiNodeSpecSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/testkit/MultiNodeSpecSpec.scala new file mode 100644 index 0000000000..2a709a99a7 --- /dev/null +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/testkit/MultiNodeSpecSpec.scala @@ -0,0 +1,36 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. 
+ */ +package akka.remote.testkit + +import akka.testkit.LongRunningTest + +object MultiNodeSpecMultiJvmSpec extends MultiNodeConfig { + commonConfig(debugConfig(on = false)) + + val node1 = role("node1") + val node2 = role("node2") + val node3 = role("node3") + val node4 = role("node4") +} + +class MultiNodeSpecSpecMultiJvmNode1 extends MultiNodeSpecSpec +class MultiNodeSpecSpecMultiJvmNode2 extends MultiNodeSpecSpec +class MultiNodeSpecSpecMultiJvmNode3 extends MultiNodeSpecSpec +class MultiNodeSpecSpecMultiJvmNode4 extends MultiNodeSpecSpec + +class MultiNodeSpecSpec extends MultiNodeSpec(MultiNodeSpecMultiJvmSpec) { + + import MultiNodeSpecMultiJvmSpec._ + + def initialParticipants = 4 + + "A MultiNodeSpec" must { + + "wait for all nodes to remove themselves before we shut the conductor down" taggedAs LongRunningTest in { + enterBarrier("startup") + // this test is empty here since it only exercises the shutdown code in the MultiNodeSpec + } + + } +} diff --git a/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala b/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala index f418f4a717..8ff95d0831 100644 --- a/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala +++ b/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala @@ -59,14 +59,9 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender { val b = getBarrier() b ! NodeInfo(A, AddressFromURIString("akka://sys"), system.deadLetters) b ! ClientDisconnected(B) - EventFilter[ClientLost](occurrences = 1) intercept { - b ! ClientDisconnected(A) - } - expectMsg(Failed(b, ClientLost(Data(Set(), "", Nil, null), A))) - EventFilter[BarrierEmpty](occurrences = 1) intercept { - b ! ClientDisconnected(A) - } - expectMsg(Failed(b, BarrierEmpty(Data(Set(), "", Nil, null), "cannot disconnect RoleName(a): no client to disconnect"))) + expectNoMsg(1 second) + b ! 
ClientDisconnected(A) + expectNoMsg(1 second) } "fail entering barrier when nobody registered" taggedAs TimingTest in { @@ -264,12 +259,9 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender { b ! NodeInfo(A, AddressFromURIString("akka://sys"), testActor) expectMsg(ToClient(Done)) b ! ClientDisconnected(B) - EventFilter[ClientLost](occurrences = 1) intercept { - b ! ClientDisconnected(A) - } - EventFilter[BarrierEmpty](occurrences = 1) intercept { - b ! ClientDisconnected(A) - } + expectNoMsg(1 second) + b ! ClientDisconnected(A) + expectNoMsg(1 second) } "fail entering barrier when nobody registered" taggedAs TimingTest in { diff --git a/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala b/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala index 4d65a2084e..25bb8df7dc 100644 --- a/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala +++ b/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala @@ -7,12 +7,13 @@ import java.net.InetSocketAddress import com.typesafe.config.{ ConfigObject, ConfigFactory, Config } -import akka.actor.{ RootActorPath, Deploy, ActorPath, ActorSystem, ExtendedActorSystem } +import akka.actor.{ RootActorPath, ActorPath, ActorSystem, ExtendedActorSystem } import akka.dispatch.Await import akka.dispatch.Await.Awaitable import akka.remote.testconductor.{ TestConductorExt, TestConductor, RoleName } import akka.testkit.AkkaSpec -import akka.util.{ Timeout, NonFatal, Duration } +import akka.util.{ Timeout, NonFatal } +import akka.util.duration._ /** * Configure the role names and participants of the test, including configuration settings. 
@@ -261,4 +262,16 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles: // useful to see which jvm is running which role log.info("Role [{}] started", myself.name) + // wait for all nodes to remove themselves before we shut the conductor down + final override def beforeShutdown() = { + if (selfIndex == 0) { + testConductor.removeNode(myself) + within(testConductor.Settings.BarrierTimeout.duration) { + awaitCond { + testConductor.getNodes.await.filterNot(_ == myself).isEmpty + } + } + } + } + } diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index 424c913662..f9ee989e1c 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -74,6 +74,7 @@ abstract class AkkaSpec(_system: ActorSystem) } final override def afterAll { + beforeShutdown() system.shutdown() try system.awaitTermination(5 seconds) catch { case _: TimeoutException ⇒ system.log.warning("Failed to stop [{}] within 5 seconds", system.name) @@ -83,6 +84,8 @@ abstract class AkkaSpec(_system: ActorSystem) protected def atStartup() {} + protected def beforeShutdown() {} + protected def atTermination() {} def spawn(dispatcherId: String = Dispatchers.DefaultDispatcherId)(body: ⇒ Unit) { diff --git a/repl b/repl deleted file mode 100644 index 701b021b35..0000000000 --- a/repl +++ /dev/null @@ -1,9 +0,0 @@ -import akka.actor._ -import akka.dispatch.{ Future, Promise } -import com.typesafe.config.ConfigFactory -val config=ConfigFactory.parseString("akka.daemonic=on") -val sys=ActorSystem("repl", config.withFallback(ConfigFactory.load())).asInstanceOf[ExtendedActorSystem] -implicit val ec=sys.dispatcher -import akka.util.duration._ -import akka.util.Timeout -implicit val timeout=Timeout(5 seconds)