From a7c8d7da1042d6ca766bc56e1f8cf815d596d7da Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 20 Jun 2012 19:22:25 +0200 Subject: [PATCH 01/12] Remove unnecessary clock param in one of AccrualFailureDetector's constructors --- .../main/scala/akka/cluster/AccrualFailureDetector.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala index b10962ce11..6962fc10d6 100644 --- a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala @@ -57,7 +57,7 @@ object AccrualFailureDetector { * to this duration, with a with rather high standard deviation (since environment is unknown * in the beginning) * - * @clock The clock, returning current time in milliseconds, but can be faked for testing + * @param clock The clock, returning current time in milliseconds, but can be faked for testing * purposes. It is only used for measuring intervals (duration). * */ @@ -68,7 +68,7 @@ class AccrualFailureDetector( val minStdDeviation: Duration, val acceptableHeartbeatPause: Duration, val firstHeartbeatEstimate: Duration, - val clock: () ⇒ Long) extends FailureDetector { + val clock: () ⇒ Long = AccrualFailureDetector.realClock) extends FailureDetector { import AccrualFailureDetector._ @@ -77,8 +77,7 @@ class AccrualFailureDetector( */ def this( system: ActorSystem, - settings: ClusterSettings, - clock: () ⇒ Long = AccrualFailureDetector.realClock) = + settings: ClusterSettings) = this( system, settings.FailureDetectorThreshold, @@ -90,7 +89,7 @@ class AccrualFailureDetector( // first real heartbeat is sent. Initial heartbeat is added when joining. 
// FIXME this can be changed to HeartbeatInterval when ticket #2249 is fixed settings.GossipInterval * 3 + settings.HeartbeatInterval, - clock) + AccrualFailureDetector.realClock) private val log = Logging(system, "FailureDetector") From 42078e70836ef593f392324c05211dd14db5a88a Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 21 Jun 2012 10:58:35 +0200 Subject: [PATCH 02/12] Reintroduce 'seed' nodes, see #2219 * Implement the join to seed nodes process When a new node is started it sends a message to all seed nodes and then sends a join command to the one that answers first. * Configuration of seed-nodes and auto-join * New JoinSeedNodeSpec that verifies the auto join to seed nodes * In tests seed nodes are configured by overriding seedNodes function, since addresses are not known before start * Deputy nodes are the live members of the seed nodes (not sure if that will be the final solution, see ticket 2252) * Updated cluster.rst with latest info about deputy and seed nodes --- .../src/main/resources/reference.conf | 12 ++- .../src/main/scala/akka/cluster/Cluster.scala | 75 +++++++++++++++---- .../scala/akka/cluster/ClusterSettings.scala | 9 +-- .../scala/akka/cluster/JoinSeedNodeSpec.scala | 46 ++++++++++++ .../akka/cluster/MultiNodeClusterSpec.scala | 15 +++- .../scala/akka/cluster/NodeJoinSpec.scala | 2 + .../scala/akka/cluster/SunnyWeatherSpec.scala | 1 - .../akka/cluster/ClusterConfigSpec.scala | 4 +- .../test/scala/akka/cluster/ClusterSpec.scala | 31 +++++--- akka-docs/cluster/cluster.rst | 16 ++-- 10 files changed, 166 insertions(+), 45 deletions(-) create mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 33616f5812..f75c7f9018 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -8,9 +8,15 @@ akka { cluster { - # node to join - the full 
URI defined by a string on the form of "akka://system@hostname:port" - # leave as empty string if the node should be a singleton cluster - node-to-join = "" + # Initial contact points of the cluster. Nodes to join at startup if auto-join = on. + # The seed nodes also play the role of deputy nodes (the nodes responsible + # for breaking network partitions). + # Comma separated full URIs defined by a string on the form of "akka://system@hostname:port" + # Leave empty if the node should be a singleton cluster. + seed-nodes = [] + + # automatically join the seed nodes at startup + auto-join = on # should the 'leader' in the cluster be allowed to automatically mark unreachable nodes as DOWN? auto-down = on diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 3732019a50..a4765fc2cf 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -11,7 +11,7 @@ import akka.dispatch.Await import akka.dispatch.MonitorableThreadFactory import akka.event.Logging import akka.jsr166y.ThreadLocalRandom -import akka.pattern.ask +import akka.pattern._ import akka.remote._ import akka.routing._ import akka.util._ @@ -55,11 +55,29 @@ sealed trait ClusterMessage extends Serializable object ClusterUserAction { /** - * Command to join the cluster. Sent when a node (reprsesented by 'address') + * Command to join the cluster. Sent when a node (represented by 'address') * wants to join another node (the receiver). */ case class Join(address: Address) extends ClusterMessage + /** + * Start message of the process to join one of the seed nodes. + * The node sends `InitJoin` to all seed nodes, which reply + * with `InitJoinAck`. The first reply is used, the others are discarded. + * The node then sends a `Join` command to the seed node that replied first. 
+ */ + case object JoinSeedNode extends ClusterMessage + + /** + * @see JoinSeedNode + */ + case object InitJoin extends ClusterMessage + + /** + * @see JoinSeedNode + */ + case class InitJoinAck(address: Address) extends ClusterMessage + /** * Command to leave the cluster. */ @@ -343,11 +361,28 @@ private[cluster] final class ClusterCommandDaemon(cluster: Cluster) extends Acto val log = Logging(context.system, this) def receive = { - case Join(address) ⇒ cluster.joining(address) - case Down(address) ⇒ cluster.downing(address) - case Leave(address) ⇒ cluster.leaving(address) - case Exit(address) ⇒ cluster.exiting(address) - case Remove(address) ⇒ cluster.removing(address) + case JoinSeedNode ⇒ joinSeedNode() + case InitJoin ⇒ sender ! InitJoinAck(cluster.selfAddress) + case InitJoinAck(address) ⇒ cluster.join(address) + case Join(address) ⇒ cluster.joining(address) + case Down(address) ⇒ cluster.downing(address) + case Leave(address) ⇒ cluster.leaving(address) + case Exit(address) ⇒ cluster.exiting(address) + case Remove(address) ⇒ cluster.removing(address) + } + + def joinSeedNode(): Unit = { + val seedRoutees = for (address ← cluster.seedNodes; if address != cluster.selfAddress) + yield self.path.toStringWithAddress(address) + if (seedRoutees.nonEmpty) { + // FIXME config of within (use JoinInProgressTimeout when that is in master) + implicit val within = Timeout(5 seconds) + val seedRouter = context.actorOf( + Props.empty.withRouter(ScatterGatherFirstCompletedRouter( + routees = seedRoutees, within = within.duration))) + seedRouter ? InitJoin pipeTo self + seedRouter ! 
PoisonPill + } } override def unhandled(unknown: Any) = log.error("Illegal command [{}]", unknown) @@ -479,8 +514,6 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) implicit private val defaultTimeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout) - private val nodeToJoin: Option[Address] = NodeToJoin filter (_ != selfAddress) - private val serialization = remote.serialization private val _isRunning = new AtomicBoolean(true) @@ -507,8 +540,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) new AtomicReference[State](State(seenVersionedGossip)) } - // try to join the node defined in the 'akka.cluster.node-to-join' option - autoJoin() + // try to join one of the nodes defined in the 'akka.cluster.seed-nodes' + if (AutoJoin) joinSeedNode() // ======================================================== // ===================== WORK DAEMONS ===================== @@ -927,9 +960,9 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) private[cluster] def receiveHeartbeat(from: Address): Unit = failureDetector heartbeat from /** - * Joins the pre-configured contact point. + * Joins the pre-configured contact points. */ - private def autoJoin(): Unit = nodeToJoin foreach join + private def joinSeedNode(): Unit = clusterCommandDaemon ! ClusterUserAction.JoinSeedNode /** * INTERNAL API. @@ -999,6 +1032,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) // 1. gossip to alive members val gossipedToAlive = gossipToRandomNodeOf(localMemberAddresses) + // FIXME does this work as intended? See ticket #2252 // 2. 
gossip to unreachable members if (localUnreachableSize > 0) { val probability = gossipToUnreachableProbablity(localMembersSize, localUnreachableSize) @@ -1006,11 +1040,12 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) gossipToRandomNodeOf(localUnreachableMembers.map(_.address)) } + // FIXME does this work as intended? See ticket #2252 // 3. gossip to a deputy nodes for facilitating partition healing val deputies = deputyNodes(localMemberAddresses) val alreadyGossipedToDeputy = gossipedToAlive.map(deputies.contains(_)).getOrElse(false) - if ((!alreadyGossipedToDeputy || localMembersSize < NrOfDeputyNodes) && deputies.nonEmpty) { - val probability = gossipToDeputyProbablity(localMembersSize, localUnreachableSize, NrOfDeputyNodes) + if ((!alreadyGossipedToDeputy || localMembersSize < seedNodes.size) && deputies.nonEmpty) { + val probability = gossipToDeputyProbablity(localMembersSize, localUnreachableSize, seedNodes.size) if (ThreadLocalRandom.current.nextDouble() < probability) gossipToRandomNodeOf(deputies) } @@ -1337,7 +1372,15 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) * Gets the addresses of a all the 'deputy' nodes - excluding this node if part of the group. */ private def deputyNodes(addresses: IndexedSeq[Address]): IndexedSeq[Address] = - addresses drop 1 take NrOfDeputyNodes filterNot (_ == selfAddress) + addresses filterNot (_ == selfAddress) intersect seedNodes + + /** + * INTERNAL API. + * + * Make it possible to override/configure seedNodes from tests without + * specifying in config. Addresses are unknown before startup time. + */ + private[cluster] def seedNodes: IndexedSeq[Address] = SeedNodes /** * INTERNAL API. 
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index 64ae1c28cb..12ed666680 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -22,17 +22,16 @@ class ClusterSettings(val config: Config, val systemName: String) { final val FailureDetectorAcceptableHeartbeatPause: Duration = Duration(getMilliseconds("akka.cluster.failure-detector.acceptable-heartbeat-pause"), MILLISECONDS) - final val NodeToJoin: Option[Address] = getString("akka.cluster.node-to-join") match { - case "" ⇒ None - case AddressFromURIString(addr) ⇒ Some(addr) - } + final val SeedNodes: IndexedSeq[Address] = getStringList("akka.cluster.seed-nodes").asScala.map { + case AddressFromURIString(addr) ⇒ addr + }.toIndexedSeq final val PeriodicTasksInitialDelay = Duration(getMilliseconds("akka.cluster.periodic-tasks-initial-delay"), MILLISECONDS) final val GossipInterval = Duration(getMilliseconds("akka.cluster.gossip-interval"), MILLISECONDS) final val HeartbeatInterval = Duration(getMilliseconds("akka.cluster.heartbeat-interval"), MILLISECONDS) final val LeaderActionsInterval = Duration(getMilliseconds("akka.cluster.leader-actions-interval"), MILLISECONDS) final val UnreachableNodesReaperInterval = Duration(getMilliseconds("akka.cluster.unreachable-nodes-reaper-interval"), MILLISECONDS) final val NrOfGossipDaemons = getInt("akka.cluster.nr-of-gossip-daemons") - final val NrOfDeputyNodes = getInt("akka.cluster.nr-of-deputy-nodes") + final val AutoJoin = getBoolean("akka.cluster.auto-join") final val AutoDown = getBoolean("akka.cluster.auto-down") final val SchedulerTickDuration = Duration(getMilliseconds("akka.cluster.scheduler.tick-duration"), MILLISECONDS) final val SchedulerTicksPerWheel = getInt("akka.cluster.scheduler.ticks-per-wheel") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala 
b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala new file mode 100644 index 0000000000..38f03a4e66 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala @@ -0,0 +1,46 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfter +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.util.duration._ + +object JoinSeedNodeMultiJvmSpec extends MultiNodeConfig { + val seed1 = role("seed1") + val seed2 = role("seed2") + val ordinary1 = role("ordinary1") + val ordinary2 = role("ordinary2") + + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) +} + +class JoinSeedNodeMultiJvmNode1 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy +class JoinSeedNodeMultiJvmNode2 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy +class JoinSeedNodeMultiJvmNode3 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy +class JoinSeedNodeMultiJvmNode4 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy + +abstract class JoinSeedNodeSpec + extends MultiNodeSpec(JoinSeedNodeMultiJvmSpec) + with MultiNodeClusterSpec { + + import JoinSeedNodeMultiJvmSpec._ + + override def seedNodes = IndexedSeq(seed1, seed2) + + "A cluster with configured seed nodes" must { + "join the seed nodes at startup" taggedAs LongRunningTest in { + + startClusterNode() + enterBarrier("all-started") + + awaitUpConvergence(4) + + enterBarrier("after") + } + } +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 9fd8746923..79e3a67e1e 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -26,7 +26,6 @@ object 
MultiNodeClusterSpec { leader-actions-interval = 200 ms unreachable-nodes-reaper-interval = 200 ms periodic-tasks-initial-delay = 300 ms - nr-of-deputy-nodes = 2 } akka.test { single-expect-default = 5 s @@ -77,10 +76,22 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu throw t } + /** + * Make it possible to override/configure seedNodes from tests without + * specifying in config. Addresses are unknown before startup time. + */ + protected def seedNodes: IndexedSeq[RoleName] = IndexedSeq.empty + /** * The cluster node instance. Needs to be lazily created. */ - private lazy val clusterNode = new Cluster(system.asInstanceOf[ExtendedActorSystem], failureDetector) + private lazy val clusterNode = new Cluster(system.asInstanceOf[ExtendedActorSystem], failureDetector) { + override def seedNodes: IndexedSeq[Address] = { + val testSeedNodes = MultiNodeClusterSpec.this.seedNodes + if (testSeedNodes.isEmpty) super.seedNodes + else testSeedNodes map address + } + } /** * Get the cluster node to use. 
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala index cee5efc0db..50656a6a9d 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala @@ -36,6 +36,8 @@ abstract class NodeJoinSpec startClusterNode() } + enterBarrier("first-started") + runOn(second) { cluster.join(first) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala index c283665b30..086c2fb00a 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala @@ -21,7 +21,6 @@ object SunnyWeatherMultiJvmSpec extends MultiNodeConfig { commonConfig(ConfigFactory.parseString(""" akka.cluster { - nr-of-deputy-nodes = 0 # FIXME remove this (use default) when ticket #2239 has been fixed gossip-interval = 400 ms } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 58f0683c25..5e44b0a4bc 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -21,14 +21,14 @@ class ClusterConfigSpec extends AkkaSpec { FailureDetectorImplementationClass must be(classOf[AccrualFailureDetector].getName) FailureDetectorMinStdDeviation must be(100 millis) FailureDetectorAcceptableHeartbeatPause must be(3 seconds) - NodeToJoin must be(None) + SeedNodes must be(Seq.empty[String]) PeriodicTasksInitialDelay must be(1 seconds) GossipInterval must be(1 second) HeartbeatInterval must be(1 second) LeaderActionsInterval must be(1 second) UnreachableNodesReaperInterval must be(1 second) NrOfGossipDaemons must be(4) - NrOfDeputyNodes must be(3) + AutoJoin must be(true) AutoDown must 
be(true) SchedulerTickDuration must be(33 millis) SchedulerTicksPerWheel must be(512) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index 229ec7137d..a0bc7f6450 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -11,12 +11,13 @@ import akka.actor.ExtendedActorSystem import akka.actor.Address import java.util.concurrent.atomic.AtomicInteger import org.scalatest.BeforeAndAfter +import akka.remote.RemoteActorRefProvider object ClusterSpec { val config = """ akka.cluster { + auto-join = off auto-down = off - nr-of-deputy-nodes = 3 periodic-tasks-initial-delay = 120 seconds // turn off scheduled tasks } akka.actor.provider = "akka.remote.RemoteActorRefProvider" @@ -31,12 +32,24 @@ object ClusterSpec { class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter { import ClusterSpec._ + val selfAddress = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport.address + val addresses = IndexedSeq( + selfAddress, + Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 1), + Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 2), + Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 3), + Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 4), + Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 5)) + val deterministicRandom = new AtomicInteger val failureDetector = new FailureDetectorPuppet(system) val cluster = new Cluster(system.asInstanceOf[ExtendedActorSystem], failureDetector) { + // 3 deputy nodes (addresses index 1, 2, 3) + override def seedNodes = addresses.slice(1, 4) + override def selectRandomNode(addresses: IndexedSeq[Address]): Option[Address] = { if (addresses.isEmpty) None else 
Some(addresses.toSeq(deterministicRandom.getAndIncrement % addresses.size)) @@ -68,15 +81,6 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter { } - val selfAddress = cluster.self.address - val addresses = IndexedSeq( - selfAddress, - Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 1), - Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 2), - Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 3), - Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 4), - Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 5)) - def memberStatus(address: Address): Option[MemberStatus] = cluster.latestGossip.members.collectFirst { case m if m.address == address ⇒ m.status } @@ -89,6 +93,11 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter { "A Cluster" must { + "use the address of the remote transport" in { + cluster.selfAddress must be(selfAddress) + cluster.self.address must be(selfAddress) + } + "initially be singleton cluster and reach convergence immediately" in { cluster.isSingletonCluster must be(true) cluster.latestGossip.members.map(_.address) must be(Set(selfAddress)) @@ -161,7 +170,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter { "gossip to duputy node" in { cluster._gossipToDeputyProbablity = 1.0 // always - // we have configured 2 deputy nodes + // we have configured 3 deputy nodes (seedNodes) cluster.gossip() // 1 is deputy cluster.gossip() // 2 is deputy cluster.gossip() // 3 is deputy diff --git a/akka-docs/cluster/cluster.rst b/akka-docs/cluster/cluster.rst index 0126897dab..a9190420dc 100644 --- a/akka-docs/cluster/cluster.rst +++ b/akka-docs/cluster/cluster.rst @@ -183,14 +183,20 @@ according to the Failure Detector is considered unreachable. This means setting the unreachable node status to ``down`` automatically. 
+Seed Nodes +^^^^^^^^^^ + +The seed nodes are configured contact points for the initial join of the cluster. +When a new node is started it sends a message to all seed nodes and +then sends a join command to the one that answers first. + +It is possible to turn off automatic join. + Deputy Nodes ^^^^^^^^^^^^ -After gossip convergence a set of ``deputy`` nodes for the cluster can be -determined. As with the ``leader``, there is no ``deputy`` election process, -the deputies can always be recognised deterministically by any node whenever there -is gossip convergence. The list of ``deputy`` nodes is simply the N - 1 number -of nodes (e.g. starting with the first node after the ``leader``) in sorted order. +The deputy nodes are the live members of the configured seed nodes. +It is preferred to use deputy nodes in different racks/data centers. The nodes defined as ``deputy`` nodes are just regular member nodes whose only "special role" is to help breaking logical partitions as seen in the gossip From 4e49b2c843ab08ddc08ee0ae98b9626b2a88eb02 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Antonsson?= Date: Thu, 21 Jun 2012 13:16:35 +0200 Subject: [PATCH 03/12] Make MultiNodeSpec shut down the conductor after other nodes. 
See #2230 --- .../akka/remote/testconductor/Conductor.scala | 24 +++++++------ .../remote/testkit/MultiNodeSpecSpec.scala | 36 +++++++++++++++++++ .../remote/testconductor/BarrierSpec.scala | 20 ++++------- .../akka/remote/testkit/MultiNodeSpec.scala | 18 ++++++++-- .../test/scala/akka/testkit/AkkaSpec.scala | 3 ++ 5 files changed, 75 insertions(+), 26 deletions(-) create mode 100644 akka-remote-tests/src/multi-jvm/scala/akka/remote/testkit/MultiNodeSpecSpec.scala diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala index b6265125b1..eba0fffe63 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala @@ -282,6 +282,8 @@ private[akka] class ServerFSM(val controller: ActorRef, val channel: Channel) ex import akka.actor.FSM._ import Controller._ + var roleName: RoleName = null + startWith(Initial, None) whenUnhandled { @@ -292,12 +294,15 @@ private[akka] class ServerFSM(val controller: ActorRef, val channel: Channel) ex } onTermination { - case _ ⇒ controller ! ClientDisconnected + case _ ⇒ + controller ! ClientDisconnected(roleName) + channel.close() } when(Initial, stateTimeout = 10 seconds) { case Event(Hello(name, addr), _) ⇒ - controller ! NodeInfo(RoleName(name), addr, self) + roleName = RoleName(name) + controller ! 
NodeInfo(roleName, addr, self) goto(Ready) case Event(x: NetworkOp, _) ⇒ log.warning("client {} sent no Hello in first message (instead {}), disconnecting", getAddrString(channel), x) @@ -334,10 +339,6 @@ private[akka] class ServerFSM(val controller: ActorRef, val channel: Channel) ex } initialize - - onTermination { - case _ ⇒ channel.close() - } } /** @@ -517,10 +518,13 @@ private[akka] class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoor if (clients.find(_.name == n.name).isDefined) throw new DuplicateNode(d, n) stay using d.copy(clients = clients + n) case Event(ClientDisconnected(name), d @ Data(clients, _, arrived, _)) ⇒ - if (clients.isEmpty) throw BarrierEmpty(d, "cannot disconnect " + name + ": no client to disconnect") - (clients find (_.name == name)) match { - case None ⇒ stay - case Some(c) ⇒ throw ClientLost(d.copy(clients = clients - c, arrived = arrived filterNot (_ == c.fsm)), name) + if (arrived.isEmpty) + stay using d.copy(clients = clients.filterNot(_.name == name)) + else { + (clients find (_.name == name)) match { + case None ⇒ stay + case Some(c) ⇒ throw ClientLost(d.copy(clients = clients - c, arrived = arrived filterNot (_ == c.fsm)), name) + } } } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/testkit/MultiNodeSpecSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/testkit/MultiNodeSpecSpec.scala new file mode 100644 index 0000000000..2a709a99a7 --- /dev/null +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/testkit/MultiNodeSpecSpec.scala @@ -0,0 +1,36 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. 
+ */ +package akka.remote.testkit + +import akka.testkit.LongRunningTest + +object MultiNodeSpecMultiJvmSpec extends MultiNodeConfig { + commonConfig(debugConfig(on = false)) + + val node1 = role("node1") + val node2 = role("node2") + val node3 = role("node3") + val node4 = role("node4") +} + +class MultiNodeSpecSpecMultiJvmNode1 extends MultiNodeSpecSpec +class MultiNodeSpecSpecMultiJvmNode2 extends MultiNodeSpecSpec +class MultiNodeSpecSpecMultiJvmNode3 extends MultiNodeSpecSpec +class MultiNodeSpecSpecMultiJvmNode4 extends MultiNodeSpecSpec + +class MultiNodeSpecSpec extends MultiNodeSpec(MultiNodeSpecMultiJvmSpec) { + + import MultiNodeSpecMultiJvmSpec._ + + def initialParticipants = 4 + + "A MultiNodeSpec" must { + + "wait for all nodes to remove themselves before we shut the conductor down" taggedAs LongRunningTest in { + enterBarrier("startup") + // this test is empty here since it only exercises the shutdown code in the MultiNodeSpec + } + + } +} diff --git a/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala b/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala index f418f4a717..8ff95d0831 100644 --- a/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala +++ b/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala @@ -59,14 +59,9 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender { val b = getBarrier() b ! NodeInfo(A, AddressFromURIString("akka://sys"), system.deadLetters) b ! ClientDisconnected(B) - EventFilter[ClientLost](occurrences = 1) intercept { - b ! ClientDisconnected(A) - } - expectMsg(Failed(b, ClientLost(Data(Set(), "", Nil, null), A))) - EventFilter[BarrierEmpty](occurrences = 1) intercept { - b ! ClientDisconnected(A) - } - expectMsg(Failed(b, BarrierEmpty(Data(Set(), "", Nil, null), "cannot disconnect RoleName(a): no client to disconnect"))) + expectNoMsg(1 second) + b ! 
ClientDisconnected(A) + expectNoMsg(1 second) } "fail entering barrier when nobody registered" taggedAs TimingTest in { @@ -264,12 +259,9 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender { b ! NodeInfo(A, AddressFromURIString("akka://sys"), testActor) expectMsg(ToClient(Done)) b ! ClientDisconnected(B) - EventFilter[ClientLost](occurrences = 1) intercept { - b ! ClientDisconnected(A) - } - EventFilter[BarrierEmpty](occurrences = 1) intercept { - b ! ClientDisconnected(A) - } + expectNoMsg(1 second) + b ! ClientDisconnected(A) + expectNoMsg(1 second) } "fail entering barrier when nobody registered" taggedAs TimingTest in { diff --git a/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala b/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala index 4d65a2084e..9f88f9e1c8 100644 --- a/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala +++ b/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala @@ -7,12 +7,13 @@ import java.net.InetSocketAddress import com.typesafe.config.{ ConfigObject, ConfigFactory, Config } -import akka.actor.{ RootActorPath, Deploy, ActorPath, ActorSystem, ExtendedActorSystem } +import akka.actor.{ RootActorPath, ActorPath, ActorSystem, ExtendedActorSystem } import akka.dispatch.Await import akka.dispatch.Await.Awaitable import akka.remote.testconductor.{ TestConductorExt, TestConductor, RoleName } import akka.testkit.AkkaSpec -import akka.util.{ Timeout, NonFatal, Duration } +import akka.util.{ Timeout, NonFatal } +import akka.util.duration._ /** * Configure the role names and participants of the test, including configuration settings. 
@@ -261,4 +262,17 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles: // useful to see which jvm is running which role log.info("Role [{}] started", myself.name) + // wait for all nodes to remove themselves before we shut the conductor down + final override def beforeShutdown() = { + if (selfIndex == 0) { + testConductor.removeNode(myself) + within(testConductor.Settings.BarrierTimeout.duration) { + awaitCond { + val nodes = testConductor.getNodes.await + nodes.size < 1 || (nodes.size == 1 && nodes.head == myself) + } + } + } + } + } diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index 424c913662..f9ee989e1c 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -74,6 +74,7 @@ abstract class AkkaSpec(_system: ActorSystem) } final override def afterAll { + beforeShutdown() system.shutdown() try system.awaitTermination(5 seconds) catch { case _: TimeoutException ⇒ system.log.warning("Failed to stop [{}] within 5 seconds", system.name) @@ -83,6 +84,8 @@ abstract class AkkaSpec(_system: ActorSystem) protected def atStartup() {} + protected def beforeShutdown() {} + protected def atTermination() {} def spawn(dispatcherId: String = Dispatchers.DefaultDispatcherId)(body: ⇒ Unit) { From 9710cceacb810428618c0803fc4f4e816f01b495 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Antonsson?= Date: Thu, 21 Jun 2012 16:10:04 +0200 Subject: [PATCH 04/12] Changes based on review. 
See #2230 --- .../src/test/scala/akka/remote/testkit/MultiNodeSpec.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala b/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala index 9f88f9e1c8..25bb8df7dc 100644 --- a/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala +++ b/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala @@ -268,8 +268,7 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles: testConductor.removeNode(myself) within(testConductor.Settings.BarrierTimeout.duration) { awaitCond { - val nodes = testConductor.getNodes.await - nodes.size < 1 || (nodes.size == 1 && nodes.head == myself) + testConductor.getNodes.await.filterNot(_ == myself).isEmpty } } } From 5fe9dcaf4e126487a8ff5a9f4e416a7b6edf4b3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Sun, 24 Jun 2012 14:43:46 +0200 Subject: [PATCH 05/12] Cleaned up stuff around file-based mailbox --- .../src/main/resources/reference.conf | 38 +++++++++---------- .../akka/actor/mailbox/FileBasedMailbox.scala | 9 ++--- .../mailbox/FileBasedMailboxSettings.scala | 32 ++++++++-------- .../mailbox/filequeue/PersistentQueue.scala | 26 ++++++------- .../akka/actor/mailbox/DurableMailbox.scala | 14 ++++--- 5 files changed, 61 insertions(+), 58 deletions(-) diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf b/akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf index f454716af0..1fb5cceeb1 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf @@ -13,50 +13,50 @@ akka { file-based { # directory below which this queue resides directory-path = "./_mb" - + # attempting to add an item after the queue reaches this size (in items) will fail. 
max-items = 2147483647 - + # attempting to add an item after the queue reaches this size (in bytes) will fail. max-size = 2147483647 bytes - + # attempting to add an item larger than this size (in bytes) will fail. max-item-size = 2147483647 bytes - + # maximum expiration time for this queue (seconds). max-age = 0s - + # maximum journal size before the journal should be rotated. max-journal-size = 16 MiB - + # maximum size of a queue before it drops into read-behind mode. max-memory-size = 128 MiB - + # maximum overflow (multiplier) of a journal file before we re-create it. max-journal-overflow = 10 - + # absolute maximum size of a journal file until we rebuild it, no matter what. max-journal-size-absolute = 9223372036854775807 bytes - + # whether to drop older items (instead of newer) when the queue is full - discard-old-when-full = on - + discard-old-when-full = on + # whether to keep a journal file at all - keep-journal = on - + keep-journal = on + # whether to sync the journal after each transaction sync-journal = off # circuit breaker configuration circuit-breaker { - # maximum number of failures before opening breaker - max-failures = 3 + # maximum number of failures before opening breaker + max-failures = 3 - # duration of time beyond which a call is assumed to be timed out and considered a failure - call-timeout = 3 seconds + # duration of time beyond which a call is assumed to be timed out and considered a failure + call-timeout = 3 seconds - # duration of time to wait until attempting to reset the breaker during which all calls fail-fast - reset-timeout = 30 seconds + # duration of time to wait until attempting to reset the breaker during which all calls fail-fast + reset-timeout = 30 seconds } } } diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala index fccb6b5aea..1416e8f148 100644 --- 
a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala @@ -21,18 +21,17 @@ class FileBasedMailboxType(systemSettings: ActorSystem.Settings, config: Config) case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)") } } - class FileBasedMessageQueue(_owner: ActorContext, val settings: FileBasedMailboxSettings) extends DurableMessageQueue(_owner) with DurableMessageSerialization { - // TODO Is it reasonable for all FileBasedMailboxes to have their own logger? - private val log = Logging(system, "FileBasedMessageQueue") - val breaker = CircuitBreaker(_owner.system.scheduler, settings.CircuitBreakerMaxFailures, settings.CircuitBreakerCallTimeout, settings.CircuitBreakerResetTimeout) + private val breaker = CircuitBreaker(_owner.system.scheduler, settings.CircuitBreakerMaxFailures, settings.CircuitBreakerCallTimeout, settings.CircuitBreakerResetTimeout) + + private val log = Logging(system, "FileBasedMessageQueue") private val queue = try { (new java.io.File(settings.QueuePath)) match { case dir if dir.exists && !dir.isDirectory ⇒ throw new IllegalStateException("Path already occupied by non-directory " + dir) case dir if !dir.exists ⇒ if (!dir.mkdirs() && !dir.isDirectory) throw new IllegalStateException("Creation of directory failed " + dir) - case _ ⇒ //All good + case _ ⇒ // All good } val queue = new filequeue.PersistentQueue(settings.QueuePath, name, settings, log) queue.setup // replays journal diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxSettings.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxSettings.scala index dff4021d96..27088dfc92 100644 --- 
a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxSettings.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxSettings.scala @@ -16,20 +16,20 @@ class FileBasedMailboxSettings(val systemSettings: ActorSystem.Settings, val use val config = initialize import config._ - val QueuePath: String = getString("directory-path") - val MaxItems: Int = getInt("max-items") - val MaxSize: Long = getBytes("max-size") - val MaxItemSize: Long = getBytes("max-item-size") - val MaxAge: Duration = Duration(getMilliseconds("max-age"), MILLISECONDS) - val MaxJournalSize: Long = getBytes("max-journal-size") - val MaxMemorySize: Long = getBytes("max-memory-size") - val MaxJournalOverflow: Int = getInt("max-journal-overflow") - val MaxJournalSizeAbsolute: Long = getBytes("max-journal-size-absolute") - val DiscardOldWhenFull: Boolean = getBoolean("discard-old-when-full") - val KeepJournal: Boolean = getBoolean("keep-journal") - val SyncJournal: Boolean = getBoolean("sync-journal") + final val QueuePath: String = getString("directory-path") + final val MaxItems: Int = getInt("max-items") + final val MaxSize: Long = getBytes("max-size") + final val MaxItemSize: Long = getBytes("max-item-size") + final val MaxAge: Duration = Duration(getMilliseconds("max-age"), MILLISECONDS) + final val MaxJournalSize: Long = getBytes("max-journal-size") + final val MaxMemorySize: Long = getBytes("max-memory-size") + final val MaxJournalOverflow: Int = getInt("max-journal-overflow") + final val MaxJournalSizeAbsolute: Long = getBytes("max-journal-size-absolute") + final val DiscardOldWhenFull: Boolean = getBoolean("discard-old-when-full") + final val KeepJournal: Boolean = getBoolean("keep-journal") + final val SyncJournal: Boolean = getBoolean("sync-journal") - val CircuitBreakerMaxFailures = getInt("circuit-breaker.max-failures") - val CircuitBreakerCallTimeout = 
Duration.fromNanos(getNanoseconds("circuit-breaker.call-timeout")) - val CircuitBreakerResetTimeout = Duration.fromNanos(getNanoseconds("circuit-breaker.reset-timeout")) -} \ No newline at end of file + final val CircuitBreakerMaxFailures = getInt("circuit-breaker.max-failures") + final val CircuitBreakerCallTimeout = Duration.fromNanos(getNanoseconds("circuit-breaker.call-timeout")) + final val CircuitBreakerResetTimeout = Duration.fromNanos(getNanoseconds("circuit-breaker.reset-timeout")) +} diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/PersistentQueue.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/PersistentQueue.scala index 1a5ddf4a8c..152b29406c 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/PersistentQueue.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/PersistentQueue.scala @@ -68,44 +68,44 @@ class PersistentQueue(persistencePath: String, val name: String, val settings: F def overlay[T](base: ⇒ T) = new OverlaySetting(base) // attempting to add an item after the queue reaches this size (in items) will fail. - val maxItems = overlay(PersistentQueue.maxItems) + final val maxItems = overlay(PersistentQueue.maxItems) // attempting to add an item after the queue reaches this size (in bytes) will fail. - val maxSize = overlay(PersistentQueue.maxSize) + final val maxSize = overlay(PersistentQueue.maxSize) // attempting to add an item larger than this size (in bytes) will fail. - val maxItemSize = overlay(PersistentQueue.maxItemSize) + final val maxItemSize = overlay(PersistentQueue.maxItemSize) // maximum expiration time for this queue (seconds). - val maxAge = overlay(PersistentQueue.maxAge) + final val maxAge = overlay(PersistentQueue.maxAge) // maximum journal size before the journal should be rotated. 
- val maxJournalSize = overlay(PersistentQueue.maxJournalSize) + final val maxJournalSize = overlay(PersistentQueue.maxJournalSize) // maximum size of a queue before it drops into read-behind mode. - val maxMemorySize = overlay(PersistentQueue.maxMemorySize) + final val maxMemorySize = overlay(PersistentQueue.maxMemorySize) // maximum overflow (multiplier) of a journal file before we re-create it. - val maxJournalOverflow = overlay(PersistentQueue.maxJournalOverflow) + final val maxJournalOverflow = overlay(PersistentQueue.maxJournalOverflow) // absolute maximum size of a journal file until we rebuild it, no matter what. - val maxJournalSizeAbsolute = overlay(PersistentQueue.maxJournalSizeAbsolute) + final val maxJournalSizeAbsolute = overlay(PersistentQueue.maxJournalSizeAbsolute) // whether to drop older items (instead of newer) when the queue is full - val discardOldWhenFull = overlay(PersistentQueue.discardOldWhenFull) + final val discardOldWhenFull = overlay(PersistentQueue.discardOldWhenFull) // whether to keep a journal file at all - val keepJournal = overlay(PersistentQueue.keepJournal) + final val keepJournal = overlay(PersistentQueue.keepJournal) // whether to sync the journal after each transaction - val syncJournal = overlay(PersistentQueue.syncJournal) + final val syncJournal = overlay(PersistentQueue.syncJournal) // (optional) move expired items over to this queue - val expiredQueue = overlay(PersistentQueue.expiredQueue) + final val expiredQueue = overlay(PersistentQueue.expiredQueue) private var journal = new Journal(new File(persistencePath, name).getCanonicalPath, syncJournal(), log) - // track tentative removals + // track tentative removals private var xidCounter: Int = 0 private val openTransactions = new mutable.HashMap[Int, QItem] def openTransactionCount = openTransactions.size diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala 
b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala index b21878d00e..ff985b44cc 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala @@ -69,11 +69,15 @@ trait DurableMessageSerialization { this: DurableMessageQueue ⇒ * Conventional organization of durable mailbox settings: * * {{{ - * my-durable-dispatcher { - * mailbox-type = "my.durable.mailbox" - * my-durable-mailbox { - * setting1 = 1 - * setting2 = 2 + * akka { + * actor { + * my-durable-dispatcher { + * mailbox-type = "my.durable.mailbox" + * my-durable-mailbox { + * setting1 = 1 + * setting2 = 2 + * } + * } * } * } * }}} From cba64403a70ab8cbc47c32cf868f1ffdd79cd284 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 25 Jun 2012 15:23:15 +0200 Subject: [PATCH 06/12] Don't gossip to unreachable, see #2263 * Also, ignore gossip from unreachable, see #2264 * Update gossip protocol in cluster doc --- .../src/main/scala/akka/cluster/Cluster.scala | 92 +++++++++---------- .../test/scala/akka/cluster/ClusterSpec.scala | 36 -------- akka-docs/cluster/cluster.rst | 9 +- 3 files changed, 44 insertions(+), 93 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 7593245587..55b3311dee 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -197,6 +197,9 @@ case class GossipOverview( seen: Map[Address, VectorClock] = Map.empty, unreachable: Set[Member] = Set.empty) { + def isNonDownUnreachable(address: Address): Boolean = + unreachable.exists { m ⇒ m.address == address && m.status != Down } + override def toString = "GossipOverview(seen = [" + seen.mkString(", ") + "], unreachable = [" + unreachable.mkString(", ") + @@ -751,7 +754,7 @@ 
class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) val localUnreachable = localGossip.overview.unreachable val alreadyMember = localMembers.exists(_.address == node) - val isUnreachable = localUnreachable.exists { m ⇒ m.address == node && m.status != Down } + val isUnreachable = localGossip.overview.isNonDownUnreachable(node) if (!alreadyMember && !isUnreachable) { @@ -898,46 +901,49 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) val localState = state.get val localGossip = localState.latestGossip - val winningGossip = - if (isSingletonCluster(localState) && localGossip.overview.unreachable.isEmpty && remoteGossip.members.contains(self)) { - // a fresh singleton cluster that is joining, no need to merge, use received gossip - remoteGossip + if (!localGossip.overview.isNonDownUnreachable(from)) { - } else if (remoteGossip.version <> localGossip.version) { - // concurrent - val mergedGossip = remoteGossip merge localGossip - val versionedMergedGossip = mergedGossip :+ vclockNode + val winningGossip = + if (isSingletonCluster(localState) && localGossip.overview.unreachable.isEmpty && remoteGossip.members.contains(self)) { + // a fresh singleton cluster that is joining, no need to merge, use received gossip + remoteGossip - log.debug( - """Can't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merging them into [{}]""", - remoteGossip, localGossip, versionedMergedGossip) + } else if (remoteGossip.version <> localGossip.version) { + // concurrent + val mergedGossip = remoteGossip merge localGossip + val versionedMergedGossip = mergedGossip :+ vclockNode - versionedMergedGossip + log.debug( + """Can't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merging them into [{}]""", + remoteGossip, localGossip, versionedMergedGossip) - } else if (remoteGossip.version < localGossip.version) { - // local 
gossip is newer - localGossip + versionedMergedGossip - } else { - // remote gossip is newer - remoteGossip + } else if (remoteGossip.version < localGossip.version) { + // local gossip is newer + localGossip + + } else { + // remote gossip is newer + remoteGossip + } + + val newJoinInProgress = + if (localState.joinInProgress.isEmpty) localState.joinInProgress + else localState.joinInProgress -- + winningGossip.members.map(_.address) -- + winningGossip.overview.unreachable.map(_.address) + + val newState = localState copy ( + latestGossip = winningGossip seen selfAddress, + joinInProgress = newJoinInProgress) + + // if we won the race then update else try again + if (!state.compareAndSet(localState, newState)) receiveGossip(from, remoteGossip) // recur if we fail the update + else { + log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from) + notifyMembershipChangeListeners(localState, newState) } - - val newJoinInProgress = - if (localState.joinInProgress.isEmpty) localState.joinInProgress - else localState.joinInProgress -- - winningGossip.members.map(_.address) -- - winningGossip.overview.unreachable.map(_.address) - - val newState = localState copy ( - latestGossip = winningGossip seen selfAddress, - joinInProgress = newJoinInProgress) - - // if we won the race then update else try again - if (!state.compareAndSet(localState, newState)) receiveGossip(from, remoteGossip) // recur if we fail the update - else { - log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from) - notifyMembershipChangeListeners(localState, newState) } } @@ -975,15 +981,6 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) peer } - /** - * INTERNAL API. - */ - private[cluster] def gossipToUnreachableProbablity(membersSize: Int, unreachableSize: Int): Double = - (membersSize + unreachableSize) match { - case 0 ⇒ 0.0 - case sum ⇒ unreachableSize.toDouble / sum - } - /** * INTERNAL API. 
*/ @@ -1019,13 +1016,6 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) // 1. gossip to alive members val gossipedToAlive = gossipToRandomNodeOf(localMemberAddresses) - // 2. gossip to unreachable members - if (localUnreachableSize > 0) { - val probability = gossipToUnreachableProbablity(localMembersSize, localUnreachableSize) - if (ThreadLocalRandom.current.nextDouble() < probability) - gossipToRandomNodeOf(localUnreachableMembers.map(_.address)) - } - // 3. gossip to a deputy nodes for facilitating partition healing val deputies = deputyNodes(localMemberAddresses) val alreadyGossipedToDeputy = gossipedToAlive.map(deputies.contains(_)).getOrElse(false) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index 229ec7137d..3abdf2bf9b 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -50,14 +50,6 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter { testActor ! 
GossipTo(address) } - @volatile - var _gossipToUnreachableProbablity = 0.0 - - override def gossipToUnreachableProbablity(membersSize: Int, unreachableSize: Int): Double = { - if (_gossipToUnreachableProbablity < 0.0) super.gossipToUnreachableProbablity(membersSize, unreachableSize) - else _gossipToUnreachableProbablity - } - @volatile var _gossipToDeputyProbablity = 0.0 @@ -81,7 +73,6 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter { cluster.latestGossip.members.collectFirst { case m if m.address == address ⇒ m.status } before { - cluster._gossipToUnreachableProbablity = 0.0 cluster._gossipToDeputyProbablity = 0.0 addresses foreach failureDetector.remove deterministicRandom.set(0) @@ -133,17 +124,6 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter { expectNoMsg(1 second) } - "use certain probability for gossiping to unreachable node depending on the number of unreachable and live nodes" in { - cluster._gossipToUnreachableProbablity = -1.0 // use real impl - cluster.gossipToUnreachableProbablity(10, 1) must be < (cluster.gossipToUnreachableProbablity(9, 1)) - cluster.gossipToUnreachableProbablity(10, 1) must be < (cluster.gossipToUnreachableProbablity(10, 2)) - cluster.gossipToUnreachableProbablity(10, 5) must be < (cluster.gossipToUnreachableProbablity(10, 9)) - cluster.gossipToUnreachableProbablity(0, 10) must be <= (1.0) - cluster.gossipToUnreachableProbablity(1, 10) must be <= (1.0) - cluster.gossipToUnreachableProbablity(10, 0) must be(0.0 plusOrMinus (0.0001)) - cluster.gossipToUnreachableProbablity(0, 0) must be(0.0 plusOrMinus (0.0001)) - } - "use certain probability for gossiping to deputy node depending on the number of unreachable and live nodes" in { cluster._gossipToDeputyProbablity = -1.0 // use real impl cluster.gossipToDeputyProbablity(10, 1, 2) must be < (cluster.gossipToDeputyProbablity(9, 1, 2)) @@ -178,22 +158,6 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter 
{ } - "gossip to random unreachable node" in { - val dead = Set(addresses(1)) - dead foreach failureDetector.markNodeAsUnavailable - cluster._gossipToUnreachableProbablity = 1.0 // always - - cluster.reapUnreachableMembers() - cluster.latestGossip.overview.unreachable.map(_.address) must be(dead) - - cluster.gossip() - - expectMsg(GossipTo(addresses(2))) // first available - expectMsg(GossipTo(addresses(1))) // the unavailable - - expectNoMsg(1 second) - } - "gossip to random deputy node if number of live nodes is less than number of deputy nodes" in { cluster._gossipToDeputyProbablity = -1.0 // real impl // 0 and 2 still alive diff --git a/akka-docs/cluster/cluster.rst b/akka-docs/cluster/cluster.rst index 0126897dab..833d56f51c 100644 --- a/akka-docs/cluster/cluster.rst +++ b/akka-docs/cluster/cluster.rst @@ -213,7 +213,7 @@ nodes involved in a gossip exchange. Periodically, the default is every 1 second, each node chooses another random node to initiate a round of gossip with. The choice of node is random but can -also include extra gossiping for unreachable nodes, ``deputy`` nodes, and nodes with +also include extra gossiping for ``deputy`` nodes, and nodes with either newer or older state versions. The gossip overview contains the current state version for all nodes and also a @@ -228,14 +228,11 @@ During each round of gossip exchange the following process is used: 1. Gossip to random live node (if any) -2. Gossip to random unreachable node with certain probability depending on the - number of unreachable and live nodes - -3. If the node gossiped to at (1) was not a ``deputy`` node, or the number of live +2. If the node gossiped to at (1) was not a ``deputy`` node, or the number of live nodes is less than number of ``deputy`` nodes, gossip to random ``deputy`` node with certain probability depending on number of unreachable, ``deputy``, and live nodes. -4. Gossip to random node with newer or older state information, based on the +3. 
Gossip to random node with newer or older state information, based on the current gossip overview, with some probability (?) The gossiper only sends the gossip overview to the chosen node. The recipient of From 66bcca8a918839a0f2d7df64d5573f47a4354848 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 25 Jun 2012 15:46:30 +0200 Subject: [PATCH 07/12] Shorter gossip interval still needed --- .../src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala index 3c74bc02e2..0f9a8a8c73 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala @@ -24,6 +24,8 @@ object SunnyWeatherMultiJvmSpec extends MultiNodeConfig { commonConfig(ConfigFactory.parseString(""" akka.cluster { nr-of-deputy-nodes = 0 + # FIXME remove this (use default) when ticket #2239 has been fixed + gossip-interval = 400 ms } akka.loglevel = INFO """)) From 738565883b8602388bcc76a2a363dac50996d5b4 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 25 Jun 2012 20:20:11 +0200 Subject: [PATCH 08/12] Add join-seed-node-timeout config, see #2219 --- akka-cluster/src/main/resources/reference.conf | 3 +++ .../src/main/scala/akka/cluster/Cluster.scala | 17 +++++++---------- .../scala/akka/cluster/ClusterSettings.scala | 1 + .../scala/akka/cluster/ClusterConfigSpec.scala | 1 + 4 files changed, 12 insertions(+), 10 deletions(-) diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index f75c7f9018..60b934a864 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -15,6 +15,9 @@ akka { # Leave as empty if the node should be a singleton cluster. 
seed-nodes = [] + # how long to wait for one of the seed nodes to reply to initial join request + join-seed-node-timeout = 5s + # automatic join the seed-nodes at startup auto-join = on diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index a4765fc2cf..0cf79d7102 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -375,8 +375,7 @@ private[cluster] final class ClusterCommandDaemon(cluster: Cluster) extends Acto val seedRoutees = for (address ← cluster.seedNodes; if address != cluster.selfAddress) yield self.path.toStringWithAddress(address) if (seedRoutees.nonEmpty) { - // FIXME config of within (use JoinInProgressTimeout when that is in master) - implicit val within = Timeout(5 seconds) + implicit val within = Timeout(cluster.clusterSettings.JoinSeedNodeTimeout) val seedRouter = context.actorOf( Props.empty.withRouter(ScatterGatherFirstCompletedRouter( routees = seedRoutees, within = within.duration))) @@ -679,6 +678,12 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) */ def isAvailable: Boolean = !isUnavailable(state.get) + /** + * Make it possible to override/configure seedNodes from tests without + * specifying in config. Addresses are unknown before startup time. + */ + def seedNodes: IndexedSeq[Address] = SeedNodes + /** * Registers a listener to subscribe to cluster membership changes. */ @@ -1374,14 +1379,6 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) private def deputyNodes(addresses: IndexedSeq[Address]): IndexedSeq[Address] = addresses filterNot (_ == selfAddress) intersect seedNodes - /** - * INTERNAL API. - * - * Make it possible to override/configure seedNodes from tests without - * specifying in config. Addresses are unknown before startup time. 
- */ - private[cluster] def seedNodes: IndexedSeq[Address] = SeedNodes - /** * INTERNAL API. */ diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index 12ed666680..c026b8c1a0 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -25,6 +25,7 @@ class ClusterSettings(val config: Config, val systemName: String) { final val SeedNodes: IndexedSeq[Address] = getStringList("akka.cluster.seed-nodes").asScala.map { case AddressFromURIString(addr) ⇒ addr }.toIndexedSeq + final val JoinSeedNodeTimeout = Duration(getMilliseconds("akka.cluster.join-seed-node-timeout"), MILLISECONDS) final val PeriodicTasksInitialDelay = Duration(getMilliseconds("akka.cluster.periodic-tasks-initial-delay"), MILLISECONDS) final val GossipInterval = Duration(getMilliseconds("akka.cluster.gossip-interval"), MILLISECONDS) final val HeartbeatInterval = Duration(getMilliseconds("akka.cluster.heartbeat-interval"), MILLISECONDS) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 5e44b0a4bc..d5a9752e5e 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -22,6 +22,7 @@ class ClusterConfigSpec extends AkkaSpec { FailureDetectorMinStdDeviation must be(100 millis) FailureDetectorAcceptableHeartbeatPause must be(3 seconds) SeedNodes must be(Seq.empty[String]) + JoinSeedNodeTimeout must be(5 seconds) PeriodicTasksInitialDelay must be(1 seconds) GossipInterval must be(1 second) HeartbeatInterval must be(1 second) From 97bf8c4bb527bd917c39c36b6dce4f9adcbd5642 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 25 Jun 2012 20:46:48 +0200 Subject: [PATCH 09/12] Cleanup of comments, see #2263 --- 
.../src/main/scala/akka/cluster/Cluster.scala | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 55b3311dee..8232a762cf 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -439,15 +439,12 @@ trait ClusterNodeMBean { /** * This module is responsible for Gossiping cluster information. The abstraction maintains the list of live * and dead members. Periodically i.e. every 1 second this module chooses a random member and initiates a round - * of Gossip with it. Whenever it gets gossip updates it updates the Failure Detector with the liveness - * information. + * of Gossip with it. *

- * During each of these runs the member initiates gossip exchange according to following rules (as defined in the - * Cassandra documentation [http://wiki.apache.org/cassandra/ArchitectureGossip]: + * During each of these runs the member initiates gossip exchange according to the following rules: *

  *   1) Gossip to random live member (if any)
- *   2) Gossip to random unreachable member with certain probability depending on number of unreachable and live members
- *   3) If the member gossiped to at (1) was not deputy, or the number of live members is less than number of deputy list,
+ *   2) If the member gossiped to at (1) was not a deputy, or the number of live members is less than the number of deputies,
  *       gossip to random deputy with certain probability depending on number of unreachable, deputy and live members.
  * 
* @@ -1016,7 +1013,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) // 1. gossip to alive members val gossipedToAlive = gossipToRandomNodeOf(localMemberAddresses) - // 3. gossip to a deputy nodes for facilitating partition healing + // 2. gossip to a deputy nodes for facilitating partition healing val deputies = deputyNodes(localMemberAddresses) val alreadyGossipedToDeputy = gossipedToAlive.map(deputies.contains(_)).getOrElse(false) if ((!alreadyGossipedToDeputy || localMembersSize < NrOfDeputyNodes) && deputies.nonEmpty) { From 86449fd12d9e3503ae09c3b423360a9b20ad84b3 Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Tue, 26 Jun 2012 12:09:59 +1200 Subject: [PATCH 10/12] Explicitly name Akka.pdf in sphinx doc generation --- project/Sphinx.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/project/Sphinx.scala b/project/Sphinx.scala index 43b7e60358..4707215875 100644 --- a/project/Sphinx.scala +++ b/project/Sphinx.scala @@ -87,16 +87,15 @@ object Sphinx { def pdfTask = (sphinxLatex, streams) map { (latex, s) => { - val empty = (latex * "*.pdf").get.isEmpty + val pdf = latex / "Akka.pdf" def failed = sys.error("Failed to build Sphinx pdf documentation.") - if (empty) { + if (!pdf.exists) { s.log.info("Building Sphinx pdf documentation...") val logger = newLogger(s) val exitCode = Process(Seq("make", "all-pdf"), latex) ! 
logger if (exitCode != 0) failed + s.log.info("Sphinx pdf documentation created: %s" format pdf) } - val pdf = (latex * "*.pdf").get.headOption.getOrElse(failed) - if (empty) s.log.info("Sphinx pdf documentation created: %s" format pdf) pdf } } From 370c07b438a4e7193e759db56b12c13915d15ceb Mon Sep 17 00:00:00 2001 From: phaller Date: Thu, 31 May 2012 13:53:03 +0200 Subject: [PATCH 11/12] Correcting example in ScalaDoc for Stash --- akka-actor/src/main/scala/akka/actor/Stash.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/akka-actor/src/main/scala/akka/actor/Stash.scala b/akka-actor/src/main/scala/akka/actor/Stash.scala index 386bc0f070..2415b38618 100644 --- a/akka-actor/src/main/scala/akka/actor/Stash.scala +++ b/akka-actor/src/main/scala/akka/actor/Stash.scala @@ -15,7 +15,8 @@ import akka.AkkaException * class ActorWithProtocol extends Actor with Stash { * def receive = { * case "open" ⇒ - * unstashAll { + * unstashAll() + * context.become { * case "write" ⇒ // do writing... * case "close" ⇒ * unstashAll() From 8b17099f5000d2439706ffaa778b68f3284a5982 Mon Sep 17 00:00:00 2001 From: phaller Date: Tue, 12 Jun 2012 15:51:54 +0200 Subject: [PATCH 12/12] Adding Stash section in Actors docs (including docs for the Java API). Example code added to ActorDocSpec/UntypedActorDocTestBase. 
--- .../docs/actor/UntypedActorDocTestBase.java | 29 +++++++++++ akka-docs/java/untyped-actors.rst | 42 ++++++++++++++++ akka-docs/scala/actors.rst | 48 +++++++++++++++++++ .../scala/code/docs/actor/ActorDocSpec.scala | 20 ++++++++ 4 files changed, 139 insertions(+) diff --git a/akka-docs/java/code/docs/actor/UntypedActorDocTestBase.java b/akka-docs/java/code/docs/actor/UntypedActorDocTestBase.java index c82ce30661..c2fb455cfb 100644 --- a/akka-docs/java/code/docs/actor/UntypedActorDocTestBase.java +++ b/akka-docs/java/code/docs/actor/UntypedActorDocTestBase.java @@ -50,6 +50,10 @@ import java.util.concurrent.TimeUnit; import java.util.ArrayList; //#import-askPipe +//#import-stash +import akka.actor.UntypedActorWithStash; +//#import-stash + import akka.actor.Props; import akka.actor.UntypedActor; import akka.actor.UntypedActorFactory; @@ -346,6 +350,31 @@ public class UntypedActorDocTestBase { //#hot-swap-actor + //#stash + public static class ActorWithProtocol extends UntypedActorWithStash { + private Boolean isOpen = false; + public void onReceive(Object msg) { + if (isOpen) { + if (msg.equals("write")) { + // do writing... + } else if (msg.equals("close")) { + unstashAll(); + isOpen = false; + } else { + stash(); + } + } else { + if (msg.equals("open")) { + unstashAll(); + isOpen = true; + } else { + stash(); + } + } + } + } + //#stash + //#watch public static class WatchActor extends UntypedActor { final ActorRef child = this.getContext().actorOf(Props.empty(), "child"); diff --git a/akka-docs/java/untyped-actors.rst b/akka-docs/java/untyped-actors.rst index 57dbaa5604..a699cb7145 100644 --- a/akka-docs/java/untyped-actors.rst +++ b/akka-docs/java/untyped-actors.rst @@ -558,6 +558,48 @@ well. Use the ``getContext().unbecome`` method from within the Actor. 
if (message.equals("revert")) getContext().unbecome(); } + +Stash +===== + +The ``UntypedActorWithStash`` class enables an actor to temporarily stash away messages +that can not or should not be handled using the actor's current +behavior. Upon changing the actor's message handler, i.e., right +before invoking ``getContext().become()`` or ``getContext().unbecome()``, all +stashed messages can be "unstashed", thereby prepending them to the actor's +mailbox. This way, the stashed messages can be processed in the same +order as they have been received originally. + +.. warning:: + + Please note that the stash can only be used together with actors + that have a deque-based mailbox. For this, configure the + ``mailbox-type`` of the dispatcher to be a deque-based mailbox, such as + ``akka.dispatch.UnboundedDequeBasedMailbox`` (see :ref:`dispatchers-java`). + +Here is an example of the ``UntypedActorWithStash`` class in action: + +.. includecode:: code/docs/actor/UntypedActorDocTestBase.java#stash + +Invoking ``stash()`` adds the current message (the message that the +actor received last) to the actor's stash. It is typically invoked +when handling the default case in the actor's message handler to stash +messages that aren't handled by the other cases. It is illegal to +stash the same message twice; to do so results in an +``IllegalStateException`` being thrown. The stash may also be bounded +in which case invoking ``stash()`` may lead to a capacity violation, +which results in a ``StashOverflowException``. The capacity of the +stash can be configured using the ``stash-capacity`` setting (an ``Int``) of the +dispatcher's configuration. + +Invoking ``unstashAll()`` enqueues messages from the stash to the +actor's mailbox until the capacity of the mailbox (if any) has been +reached (note that messages from the stash are prepended to the +mailbox). In case a bounded mailbox overflows, a +``MessageQueueAppendFailedException`` is thrown. 
+The stash is guaranteed to be empty after calling ``unstashAll()``. + + Killing an Actor ================ diff --git a/akka-docs/scala/actors.rst b/akka-docs/scala/actors.rst index 47a2318e53..d3a53408e2 100644 --- a/akka-docs/scala/actors.rst +++ b/akka-docs/scala/actors.rst @@ -620,6 +620,54 @@ Here's how you use the ``unbecome`` method: } +Stash +===== + +The ``Stash`` trait enables an actor to temporarily stash away messages +that can not or should not be handled using the actor's current +behavior. Upon changing the actor's message handler, i.e., right +before invoking ``context.become`` or ``context.unbecome``, all +stashed messages can be "unstashed", thereby prepending them to the actor's +mailbox. This way, the stashed messages can be processed in the same +order as they have been received originally. + +.. warning:: + + Please note that the ``Stash`` can only be used together with actors + that have a deque-based mailbox. For this, configure the + ``mailbox-type`` of the dispatcher to be a deque-based mailbox, such as + ``akka.dispatch.UnboundedDequeBasedMailbox`` (see :ref:`dispatchers-scala`). + +Here is an example of the ``Stash`` in action: + +.. includecode:: code/docs/actor/ActorDocSpec.scala#stash + +Invoking ``stash()`` adds the current message (the message that the +actor received last) to the actor's stash. It is typically invoked +when handling the default case in the actor's message handler to stash +messages that aren't handled by the other cases. It is illegal to +stash the same message twice; to do so results in an +``IllegalStateException`` being thrown. The stash may also be bounded +in which case invoking ``stash()`` may lead to a capacity violation, +which results in a ``StashOverflowException``. The capacity of the +stash can be configured using the ``stash-capacity`` setting (an ``Int``) of the +dispatcher's configuration.
+ +Invoking ``unstashAll()`` enqueues messages from the stash to the +actor's mailbox until the capacity of the mailbox (if any) has been +reached (note that messages from the stash are prepended to the +mailbox). In case a bounded mailbox overflows, a +``MessageQueueAppendFailedException`` is thrown. +The stash is guaranteed to be empty after calling ``unstashAll()``. + +.. warning:: + + Note that the ``Stash`` trait must be mixed into (a subclass of) the + ``Actor`` trait before any trait/class that overrides the ``preRestart`` + callback. This means it's not possible to write + ``Actor with MyActor with Stash`` if ``MyActor`` overrides ``preRestart``. + + Killing an Actor ================ diff --git a/akka-docs/scala/code/docs/actor/ActorDocSpec.scala b/akka-docs/scala/code/docs/actor/ActorDocSpec.scala index ee05e95d42..108aba33b2 100644 --- a/akka-docs/scala/code/docs/actor/ActorDocSpec.scala +++ b/akka-docs/scala/code/docs/actor/ActorDocSpec.scala @@ -300,6 +300,26 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { val actor = system.actorOf(Props(new HotSwapActor), name = "hot") } + "using Stash" in { + //#stash + import akka.actor.Stash + class ActorWithProtocol extends Actor with Stash { + def receive = { + case "open" ⇒ + unstashAll() + context.become { + case "write" ⇒ // do writing... + case "close" ⇒ + unstashAll() + context.unbecome() + case msg ⇒ stash() + } + case msg ⇒ stash() + } + } + //#stash + } + "using watch" in { //#watch import akka.actor.{ Actor, Props, Terminated }