diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf
index 73cb24e92c..4da4dd6620 100644
--- a/akka-cluster/src/main/resources/reference.conf
+++ b/akka-cluster/src/main/resources/reference.conf
@@ -8,9 +8,18 @@
akka {
cluster {
- # node to join - the full URI defined by a string on the form of "akka://system@hostname:port"
- # leave as empty string if the node should be a singleton cluster
- node-to-join = ""
+ # Initial contact points of the cluster. Nodes to join at startup if auto-join = on.
+ # The seed nodes also play the role of deputy nodes (the nodes responsible
+ # for breaking network partitions).
+ # Comma separated full URIs defined by a string on the form of "akka://system@hostname:port"
+ # Leave as empty if the node should be a singleton cluster.
+ seed-nodes = []
+
+ # how long to wait for one of the seed nodes to reply to initial join request
+ seed-node-timeout = 5s
+
+ # automatic join the seed-nodes at startup
+ auto-join = on
# should the 'leader' in the cluster be allowed to automatically mark unreachable nodes as DOWN?
auto-down = on
diff --git a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala
index db5f21607b..c397d065e5 100644
--- a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala
@@ -57,7 +57,7 @@ object AccrualFailureDetector {
* to this duration, with a with rather high standard deviation (since environment is unknown
* in the beginning)
*
- * @clock The clock, returning current time in milliseconds, but can be faked for testing
+ * @param clock The clock, returning current time in milliseconds, but can be faked for testing
* purposes. It is only used for measuring intervals (duration).
*
*/
@@ -68,7 +68,7 @@ class AccrualFailureDetector(
val minStdDeviation: Duration,
val acceptableHeartbeatPause: Duration,
val firstHeartbeatEstimate: Duration,
- val clock: () ⇒ Long) extends FailureDetector {
+ val clock: () ⇒ Long = AccrualFailureDetector.realClock) extends FailureDetector {
import AccrualFailureDetector._
@@ -77,8 +77,7 @@ class AccrualFailureDetector(
*/
def this(
system: ActorSystem,
- settings: ClusterSettings,
- clock: () ⇒ Long = AccrualFailureDetector.realClock) =
+ settings: ClusterSettings) =
this(
system,
settings.FailureDetectorThreshold,
@@ -86,7 +85,7 @@ class AccrualFailureDetector(
settings.FailureDetectorAcceptableHeartbeatPause,
settings.FailureDetectorMinStdDeviation,
settings.HeartbeatInterval,
- clock)
+ AccrualFailureDetector.realClock)
private val log = Logging(system, "FailureDetector")
diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
index 7593245587..caecf3906b 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
@@ -11,7 +11,7 @@ import akka.dispatch.Await
import akka.dispatch.MonitorableThreadFactory
import akka.event.Logging
import akka.jsr166y.ThreadLocalRandom
-import akka.pattern.ask
+import akka.pattern._
import akka.remote._
import akka.routing._
import akka.util._
@@ -55,11 +55,29 @@ sealed trait ClusterMessage extends Serializable
object ClusterUserAction {
/**
- * Command to join the cluster. Sent when a node (reprsesented by 'address')
+ * Command to join the cluster. Sent when a node (represented by 'address')
* wants to join another node (the receiver).
*/
case class Join(address: Address) extends ClusterMessage
+ /**
+ * Start message of the process to join one of the seed nodes.
+ * The node sends `InitJoin` to all seed nodes, which replies
+ * with `InitJoinAck`. The first reply is used others are discarded.
+ * The node sends `Join` command to the seed node that replied first.
+ */
+ case object JoinSeedNode extends ClusterMessage
+
+ /**
+ * @see JoinSeedNode
+ */
+ case object InitJoin extends ClusterMessage
+
+ /**
+ * @see JoinSeedNode
+ */
+ case class InitJoinAck(address: Address) extends ClusterMessage
+
/**
* Command to leave the cluster.
*/
@@ -197,6 +215,9 @@ case class GossipOverview(
seen: Map[Address, VectorClock] = Map.empty,
unreachable: Set[Member] = Set.empty) {
+ def isNonDownUnreachable(address: Address): Boolean =
+ unreachable.exists { m ⇒ m.address == address && m.status != Down }
+
override def toString =
"GossipOverview(seen = [" + seen.mkString(", ") +
"], unreachable = [" + unreachable.mkString(", ") +
@@ -343,11 +364,27 @@ private[cluster] final class ClusterCommandDaemon(cluster: Cluster) extends Acto
val log = Logging(context.system, this)
def receive = {
- case Join(address) ⇒ cluster.joining(address)
- case Down(address) ⇒ cluster.downing(address)
- case Leave(address) ⇒ cluster.leaving(address)
- case Exit(address) ⇒ cluster.exiting(address)
- case Remove(address) ⇒ cluster.removing(address)
+ case JoinSeedNode ⇒ joinSeedNode()
+ case InitJoin ⇒ sender ! InitJoinAck(cluster.selfAddress)
+ case InitJoinAck(address) ⇒ cluster.join(address)
+ case Join(address) ⇒ cluster.joining(address)
+ case Down(address) ⇒ cluster.downing(address)
+ case Leave(address) ⇒ cluster.leaving(address)
+ case Exit(address) ⇒ cluster.exiting(address)
+ case Remove(address) ⇒ cluster.removing(address)
+ }
+
+ def joinSeedNode(): Unit = {
+ val seedRoutees = for (address ← cluster.seedNodes; if address != cluster.selfAddress)
+ yield self.path.toStringWithAddress(address)
+ if (seedRoutees.nonEmpty) {
+ implicit val within = Timeout(cluster.clusterSettings.SeedNodeTimeout)
+ val seedRouter = context.actorOf(
+ Props.empty.withRouter(ScatterGatherFirstCompletedRouter(
+ routees = seedRoutees, within = within.duration)))
+ seedRouter ? InitJoin pipeTo self
+ seedRouter ! PoisonPill
+ }
}
override def unhandled(unknown: Any) = log.error("Illegal command [{}]", unknown)
@@ -436,15 +473,12 @@ trait ClusterNodeMBean {
/**
* This module is responsible for Gossiping cluster information. The abstraction maintains the list of live
* and dead members. Periodically i.e. every 1 second this module chooses a random member and initiates a round
- * of Gossip with it. Whenever it gets gossip updates it updates the Failure Detector with the liveness
- * information.
+ * of Gossip with it.
*
- * During each of these runs the member initiates gossip exchange according to following rules (as defined in the
- * Cassandra documentation [http://wiki.apache.org/cassandra/ArchitectureGossip]:
+ * During each of these runs the member initiates gossip exchange according to following rules:
*
* 1) Gossip to random live member (if any)
- * 2) Gossip to random unreachable member with certain probability depending on number of unreachable and live members
- * 3) If the member gossiped to at (1) was not deputy, or the number of live members is less than number of deputy list,
+ * 2) If the member gossiped to at (1) was not deputy, or the number of live members is less than number of deputy list,
* gossip to random deputy with certain probability depending on number of unreachable, deputy and live members.
*
*
@@ -480,8 +514,6 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
implicit private val defaultTimeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout)
- private val nodeToJoin: Option[Address] = NodeToJoin filter (_ != selfAddress)
-
private val serialization = remote.serialization
private val _isRunning = new AtomicBoolean(true)
@@ -508,8 +540,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
new AtomicReference[State](State(seenVersionedGossip))
}
- // try to join the node defined in the 'akka.cluster.node-to-join' option
- autoJoin()
+ // try to join one of the nodes defined in the 'akka.cluster.seed-nodes'
+ if (AutoJoin) joinSeedNode()
// ========================================================
// ===================== WORK DAEMONS =====================
@@ -647,6 +679,12 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
*/
def isAvailable: Boolean = !isUnavailable(state.get)
+ /**
+ * Make it possible to override/configure seedNodes from tests without
+ * specifying in config. Addresses are unknown before startup time.
+ */
+ def seedNodes: IndexedSeq[Address] = SeedNodes
+
/**
* Registers a listener to subscribe to cluster membership changes.
*/
@@ -751,7 +789,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
val localUnreachable = localGossip.overview.unreachable
val alreadyMember = localMembers.exists(_.address == node)
- val isUnreachable = localUnreachable.exists { m ⇒ m.address == node && m.status != Down }
+ val isUnreachable = localGossip.overview.isNonDownUnreachable(node)
if (!alreadyMember && !isUnreachable) {
@@ -898,46 +936,49 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
val localState = state.get
val localGossip = localState.latestGossip
- val winningGossip =
- if (isSingletonCluster(localState) && localGossip.overview.unreachable.isEmpty && remoteGossip.members.contains(self)) {
- // a fresh singleton cluster that is joining, no need to merge, use received gossip
- remoteGossip
+ if (!localGossip.overview.isNonDownUnreachable(from)) {
- } else if (remoteGossip.version <> localGossip.version) {
- // concurrent
- val mergedGossip = remoteGossip merge localGossip
- val versionedMergedGossip = mergedGossip :+ vclockNode
+ val winningGossip =
+ if (isSingletonCluster(localState) && localGossip.overview.unreachable.isEmpty && remoteGossip.members.contains(self)) {
+ // a fresh singleton cluster that is joining, no need to merge, use received gossip
+ remoteGossip
- log.debug(
- """Can't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merging them into [{}]""",
- remoteGossip, localGossip, versionedMergedGossip)
+ } else if (remoteGossip.version <> localGossip.version) {
+ // concurrent
+ val mergedGossip = remoteGossip merge localGossip
+ val versionedMergedGossip = mergedGossip :+ vclockNode
- versionedMergedGossip
+ log.debug(
+ """Can't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merging them into [{}]""",
+ remoteGossip, localGossip, versionedMergedGossip)
- } else if (remoteGossip.version < localGossip.version) {
- // local gossip is newer
- localGossip
+ versionedMergedGossip
- } else {
- // remote gossip is newer
- remoteGossip
+ } else if (remoteGossip.version < localGossip.version) {
+ // local gossip is newer
+ localGossip
+
+ } else {
+ // remote gossip is newer
+ remoteGossip
+ }
+
+ val newJoinInProgress =
+ if (localState.joinInProgress.isEmpty) localState.joinInProgress
+ else localState.joinInProgress --
+ winningGossip.members.map(_.address) --
+ winningGossip.overview.unreachable.map(_.address)
+
+ val newState = localState copy (
+ latestGossip = winningGossip seen selfAddress,
+ joinInProgress = newJoinInProgress)
+
+ // if we won the race then update else try again
+ if (!state.compareAndSet(localState, newState)) receiveGossip(from, remoteGossip) // recur if we fail the update
+ else {
+ log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from)
+ notifyMembershipChangeListeners(localState, newState)
}
-
- val newJoinInProgress =
- if (localState.joinInProgress.isEmpty) localState.joinInProgress
- else localState.joinInProgress --
- winningGossip.members.map(_.address) --
- winningGossip.overview.unreachable.map(_.address)
-
- val newState = localState copy (
- latestGossip = winningGossip seen selfAddress,
- joinInProgress = newJoinInProgress)
-
- // if we won the race then update else try again
- if (!state.compareAndSet(localState, newState)) receiveGossip(from, remoteGossip) // recur if we fail the update
- else {
- log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from)
- notifyMembershipChangeListeners(localState, newState)
}
}
@@ -947,9 +988,9 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
private[cluster] def receiveHeartbeat(from: Address): Unit = failureDetector heartbeat from
/**
- * Joins the pre-configured contact point.
+ * Joins the pre-configured contact points.
*/
- private def autoJoin(): Unit = nodeToJoin foreach join
+ private def joinSeedNode(): Unit = clusterCommandDaemon ! ClusterUserAction.JoinSeedNode
/**
* INTERNAL API.
@@ -975,15 +1016,6 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
peer
}
- /**
- * INTERNAL API.
- */
- private[cluster] def gossipToUnreachableProbablity(membersSize: Int, unreachableSize: Int): Double =
- (membersSize + unreachableSize) match {
- case 0 ⇒ 0.0
- case sum ⇒ unreachableSize.toDouble / sum
- }
-
/**
* INTERNAL API.
*/
@@ -1019,18 +1051,11 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
// 1. gossip to alive members
val gossipedToAlive = gossipToRandomNodeOf(localMemberAddresses)
- // 2. gossip to unreachable members
- if (localUnreachableSize > 0) {
- val probability = gossipToUnreachableProbablity(localMembersSize, localUnreachableSize)
- if (ThreadLocalRandom.current.nextDouble() < probability)
- gossipToRandomNodeOf(localUnreachableMembers.map(_.address))
- }
-
- // 3. gossip to a deputy nodes for facilitating partition healing
+ // 2. gossip to a deputy nodes for facilitating partition healing
val deputies = deputyNodes(localMemberAddresses)
val alreadyGossipedToDeputy = gossipedToAlive.map(deputies.contains(_)).getOrElse(false)
- if ((!alreadyGossipedToDeputy || localMembersSize < NrOfDeputyNodes) && deputies.nonEmpty) {
- val probability = gossipToDeputyProbablity(localMembersSize, localUnreachableSize, NrOfDeputyNodes)
+ if ((!alreadyGossipedToDeputy || localMembersSize < seedNodes.size) && deputies.nonEmpty) {
+ val probability = gossipToDeputyProbablity(localMembersSize, localUnreachableSize, seedNodes.size)
if (ThreadLocalRandom.current.nextDouble() < probability)
gossipToRandomNodeOf(deputies)
}
@@ -1373,7 +1398,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
* Gets the addresses of a all the 'deputy' nodes - excluding this node if part of the group.
*/
private def deputyNodes(addresses: IndexedSeq[Address]): IndexedSeq[Address] =
- addresses drop 1 take NrOfDeputyNodes filterNot (_ == selfAddress)
+ addresses filterNot (_ == selfAddress) intersect seedNodes
/**
* INTERNAL API.
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
index ba5d2a0b03..08a9b5160d 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
@@ -22,10 +22,10 @@ class ClusterSettings(val config: Config, val systemName: String) {
final val FailureDetectorAcceptableHeartbeatPause: Duration =
Duration(getMilliseconds("akka.cluster.failure-detector.acceptable-heartbeat-pause"), MILLISECONDS)
- final val NodeToJoin: Option[Address] = getString("akka.cluster.node-to-join") match {
- case "" ⇒ None
- case AddressFromURIString(addr) ⇒ Some(addr)
- }
+ final val SeedNodes: IndexedSeq[Address] = getStringList("akka.cluster.seed-nodes").asScala.map {
+ case AddressFromURIString(addr) ⇒ addr
+ }.toIndexedSeq
+ final val SeedNodeTimeout: Duration = Duration(getMilliseconds("akka.cluster.seed-node-timeout"), MILLISECONDS)
final val PeriodicTasksInitialDelay: Duration = Duration(getMilliseconds("akka.cluster.periodic-tasks-initial-delay"), MILLISECONDS)
final val GossipInterval: Duration = Duration(getMilliseconds("akka.cluster.gossip-interval"), MILLISECONDS)
final val HeartbeatInterval: Duration = Duration(getMilliseconds("akka.cluster.heartbeat-interval"), MILLISECONDS)
@@ -33,6 +33,7 @@ class ClusterSettings(val config: Config, val systemName: String) {
final val UnreachableNodesReaperInterval: Duration = Duration(getMilliseconds("akka.cluster.unreachable-nodes-reaper-interval"), MILLISECONDS)
final val NrOfGossipDaemons: Int = getInt("akka.cluster.nr-of-gossip-daemons")
final val NrOfDeputyNodes: Int = getInt("akka.cluster.nr-of-deputy-nodes")
+ final val AutoJoin: Boolean = getBoolean("akka.cluster.auto-join")
final val AutoDown: Boolean = getBoolean("akka.cluster.auto-down")
final val JoinTimeout: Duration = Duration(getMilliseconds("akka.cluster.join-timeout"), MILLISECONDS)
final val SchedulerTickDuration: Duration = Duration(getMilliseconds("akka.cluster.scheduler.tick-duration"), MILLISECONDS)
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala
new file mode 100644
index 0000000000..38f03a4e66
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala
@@ -0,0 +1,46 @@
+/**
+ * Copyright (C) 2009-2012 Typesafe Inc.
+ */
+package akka.cluster
+
+import com.typesafe.config.ConfigFactory
+import org.scalatest.BeforeAndAfter
+import akka.remote.testkit.MultiNodeConfig
+import akka.remote.testkit.MultiNodeSpec
+import akka.testkit._
+import akka.util.duration._
+
+object JoinSeedNodeMultiJvmSpec extends MultiNodeConfig {
+ val seed1 = role("seed1")
+ val seed2 = role("seed2")
+ val ordinary1 = role("ordinary1")
+ val ordinary2 = role("ordinary2")
+
+ commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
+}
+
+class JoinSeedNodeMultiJvmNode1 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy
+class JoinSeedNodeMultiJvmNode2 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy
+class JoinSeedNodeMultiJvmNode3 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy
+class JoinSeedNodeMultiJvmNode4 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy
+
+abstract class JoinSeedNodeSpec
+ extends MultiNodeSpec(JoinSeedNodeMultiJvmSpec)
+ with MultiNodeClusterSpec {
+
+ import JoinSeedNodeMultiJvmSpec._
+
+ override def seedNodes = IndexedSeq(seed1, seed2)
+
+ "A cluster with configured seed nodes" must {
+ "join the seed nodes at startup" taggedAs LongRunningTest in {
+
+ startClusterNode()
+ enterBarrier("all-started")
+
+ awaitUpConvergence(4)
+
+ enterBarrier("after")
+ }
+ }
+}
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala
index 9fd8746923..79e3a67e1e 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala
@@ -26,7 +26,6 @@ object MultiNodeClusterSpec {
leader-actions-interval = 200 ms
unreachable-nodes-reaper-interval = 200 ms
periodic-tasks-initial-delay = 300 ms
- nr-of-deputy-nodes = 2
}
akka.test {
single-expect-default = 5 s
@@ -77,10 +76,22 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu
throw t
}
+ /**
+ * Make it possible to override/configure seedNodes from tests without
+ * specifying in config. Addresses are unknown before startup time.
+ */
+ protected def seedNodes: IndexedSeq[RoleName] = IndexedSeq.empty
+
/**
* The cluster node instance. Needs to be lazily created.
*/
- private lazy val clusterNode = new Cluster(system.asInstanceOf[ExtendedActorSystem], failureDetector)
+ private lazy val clusterNode = new Cluster(system.asInstanceOf[ExtendedActorSystem], failureDetector) {
+ override def seedNodes: IndexedSeq[Address] = {
+ val testSeedNodes = MultiNodeClusterSpec.this.seedNodes
+ if (testSeedNodes.isEmpty) super.seedNodes
+ else testSeedNodes map address
+ }
+ }
/**
* Get the cluster node to use.
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala
index cee5efc0db..50656a6a9d 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala
@@ -36,6 +36,8 @@ abstract class NodeJoinSpec
startClusterNode()
}
+ enterBarrier("first-started")
+
runOn(second) {
cluster.join(first)
}
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala
index 3c74bc02e2..d661f0cc51 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala
@@ -23,7 +23,8 @@ object SunnyWeatherMultiJvmSpec extends MultiNodeConfig {
// not MultiNodeClusterSpec.clusterConfig
commonConfig(ConfigFactory.parseString("""
akka.cluster {
- nr-of-deputy-nodes = 0
+ # FIXME remove this (use default) when ticket #2239 has been fixed
+ gossip-interval = 400 ms
}
akka.loglevel = INFO
"""))
diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala
index e8d68303a0..92e219a540 100644
--- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala
@@ -21,7 +21,8 @@ class ClusterConfigSpec extends AkkaSpec {
FailureDetectorImplementationClass must be(classOf[AccrualFailureDetector].getName)
FailureDetectorMinStdDeviation must be(100 millis)
FailureDetectorAcceptableHeartbeatPause must be(3 seconds)
- NodeToJoin must be(None)
+ SeedNodes must be(Seq.empty[String])
+ SeedNodeTimeout must be(5 seconds)
PeriodicTasksInitialDelay must be(1 seconds)
GossipInterval must be(1 second)
HeartbeatInterval must be(1 second)
@@ -29,7 +30,7 @@ class ClusterConfigSpec extends AkkaSpec {
UnreachableNodesReaperInterval must be(1 second)
JoinTimeout must be(60 seconds)
NrOfGossipDaemons must be(4)
- NrOfDeputyNodes must be(3)
+ AutoJoin must be(true)
AutoDown must be(true)
SchedulerTickDuration must be(33 millis)
SchedulerTicksPerWheel must be(512)
diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala
index 229ec7137d..6f70193715 100644
--- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala
@@ -11,12 +11,13 @@ import akka.actor.ExtendedActorSystem
import akka.actor.Address
import java.util.concurrent.atomic.AtomicInteger
import org.scalatest.BeforeAndAfter
+import akka.remote.RemoteActorRefProvider
object ClusterSpec {
val config = """
akka.cluster {
+ auto-join = off
auto-down = off
- nr-of-deputy-nodes = 3
periodic-tasks-initial-delay = 120 seconds // turn off scheduled tasks
}
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
@@ -31,12 +32,24 @@ object ClusterSpec {
class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter {
import ClusterSpec._
+ val selfAddress = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport.address
+ val addresses = IndexedSeq(
+ selfAddress,
+ Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 1),
+ Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 2),
+ Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 3),
+ Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 4),
+ Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 5))
+
val deterministicRandom = new AtomicInteger
val failureDetector = new FailureDetectorPuppet(system)
val cluster = new Cluster(system.asInstanceOf[ExtendedActorSystem], failureDetector) {
+ // 3 deputy nodes (addresses index 1, 2, 3)
+ override def seedNodes = addresses.slice(1, 4)
+
override def selectRandomNode(addresses: IndexedSeq[Address]): Option[Address] = {
if (addresses.isEmpty) None
else Some(addresses.toSeq(deterministicRandom.getAndIncrement % addresses.size))
@@ -50,14 +63,6 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter {
testActor ! GossipTo(address)
}
- @volatile
- var _gossipToUnreachableProbablity = 0.0
-
- override def gossipToUnreachableProbablity(membersSize: Int, unreachableSize: Int): Double = {
- if (_gossipToUnreachableProbablity < 0.0) super.gossipToUnreachableProbablity(membersSize, unreachableSize)
- else _gossipToUnreachableProbablity
- }
-
@volatile
var _gossipToDeputyProbablity = 0.0
@@ -68,20 +73,10 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter {
}
- val selfAddress = cluster.self.address
- val addresses = IndexedSeq(
- selfAddress,
- Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 1),
- Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 2),
- Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 3),
- Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 4),
- Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 5))
-
def memberStatus(address: Address): Option[MemberStatus] =
cluster.latestGossip.members.collectFirst { case m if m.address == address ⇒ m.status }
before {
- cluster._gossipToUnreachableProbablity = 0.0
cluster._gossipToDeputyProbablity = 0.0
addresses foreach failureDetector.remove
deterministicRandom.set(0)
@@ -89,6 +84,11 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter {
"A Cluster" must {
+ "use the address of the remote transport" in {
+ cluster.selfAddress must be(selfAddress)
+ cluster.self.address must be(selfAddress)
+ }
+
"initially be singleton cluster and reach convergence immediately" in {
cluster.isSingletonCluster must be(true)
cluster.latestGossip.members.map(_.address) must be(Set(selfAddress))
@@ -133,17 +133,6 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter {
expectNoMsg(1 second)
}
- "use certain probability for gossiping to unreachable node depending on the number of unreachable and live nodes" in {
- cluster._gossipToUnreachableProbablity = -1.0 // use real impl
- cluster.gossipToUnreachableProbablity(10, 1) must be < (cluster.gossipToUnreachableProbablity(9, 1))
- cluster.gossipToUnreachableProbablity(10, 1) must be < (cluster.gossipToUnreachableProbablity(10, 2))
- cluster.gossipToUnreachableProbablity(10, 5) must be < (cluster.gossipToUnreachableProbablity(10, 9))
- cluster.gossipToUnreachableProbablity(0, 10) must be <= (1.0)
- cluster.gossipToUnreachableProbablity(1, 10) must be <= (1.0)
- cluster.gossipToUnreachableProbablity(10, 0) must be(0.0 plusOrMinus (0.0001))
- cluster.gossipToUnreachableProbablity(0, 0) must be(0.0 plusOrMinus (0.0001))
- }
-
"use certain probability for gossiping to deputy node depending on the number of unreachable and live nodes" in {
cluster._gossipToDeputyProbablity = -1.0 // use real impl
cluster.gossipToDeputyProbablity(10, 1, 2) must be < (cluster.gossipToDeputyProbablity(9, 1, 2))
@@ -161,7 +150,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter {
"gossip to duputy node" in {
cluster._gossipToDeputyProbablity = 1.0 // always
- // we have configured 2 deputy nodes
+ // we have configured 3 deputy nodes (seedNodes)
cluster.gossip() // 1 is deputy
cluster.gossip() // 2 is deputy
cluster.gossip() // 3 is deputy
@@ -178,22 +167,6 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter {
}
- "gossip to random unreachable node" in {
- val dead = Set(addresses(1))
- dead foreach failureDetector.markNodeAsUnavailable
- cluster._gossipToUnreachableProbablity = 1.0 // always
-
- cluster.reapUnreachableMembers()
- cluster.latestGossip.overview.unreachable.map(_.address) must be(dead)
-
- cluster.gossip()
-
- expectMsg(GossipTo(addresses(2))) // first available
- expectMsg(GossipTo(addresses(1))) // the unavailable
-
- expectNoMsg(1 second)
- }
-
"gossip to random deputy node if number of live nodes is less than number of deputy nodes" in {
cluster._gossipToDeputyProbablity = -1.0 // real impl
// 0 and 2 still alive
diff --git a/akka-docs/cluster/cluster.rst b/akka-docs/cluster/cluster.rst
index 0126897dab..1812c33561 100644
--- a/akka-docs/cluster/cluster.rst
+++ b/akka-docs/cluster/cluster.rst
@@ -183,14 +183,20 @@ according to the Failure Detector is considered unreachable. This means setting
the unreachable node status to ``down`` automatically.
+Seed Nodes
+^^^^^^^^^^
+
+The seed nodes are configured contact points for inital join of the cluster.
+When a new node is started started it sends a message to all seed nodes and
+then sends join command to the one that answers first.
+
+It is possible to turn off automatic join.
+
Deputy Nodes
^^^^^^^^^^^^
-After gossip convergence a set of ``deputy`` nodes for the cluster can be
-determined. As with the ``leader``, there is no ``deputy`` election process,
-the deputies can always be recognised deterministically by any node whenever there
-is gossip convergence. The list of ``deputy`` nodes is simply the N - 1 number
-of nodes (e.g. starting with the first node after the ``leader``) in sorted order.
+The deputy nodes are the live members of the configured seed nodes.
+It is preferred to use deputy nodes in different racks/data centers.
The nodes defined as ``deputy`` nodes are just regular member nodes whose only
"special role" is to help breaking logical partitions as seen in the gossip
@@ -213,7 +219,7 @@ nodes involved in a gossip exchange.
Periodically, the default is every 1 second, each node chooses another random
node to initiate a round of gossip with. The choice of node is random but can
-also include extra gossiping for unreachable nodes, ``deputy`` nodes, and nodes with
+also include extra gossiping for ``deputy`` nodes, and nodes with
either newer or older state versions.
The gossip overview contains the current state version for all nodes and also a
@@ -228,14 +234,11 @@ During each round of gossip exchange the following process is used:
1. Gossip to random live node (if any)
-2. Gossip to random unreachable node with certain probability depending on the
- number of unreachable and live nodes
-
-3. If the node gossiped to at (1) was not a ``deputy`` node, or the number of live
+2. If the node gossiped to at (1) was not a ``deputy`` node, or the number of live
nodes is less than number of ``deputy`` nodes, gossip to random ``deputy`` node with
certain probability depending on number of unreachable, ``deputy``, and live nodes.
-4. Gossip to random node with newer or older state information, based on the
+3. Gossip to random node with newer or older state information, based on the
current gossip overview, with some probability (?)
The gossiper only sends the gossip overview to the chosen node. The recipient of
diff --git a/akka-docs/java/untyped-actors.rst b/akka-docs/java/untyped-actors.rst
index 8a35a09092..a699cb7145 100644
--- a/akka-docs/java/untyped-actors.rst
+++ b/akka-docs/java/untyped-actors.rst
@@ -575,7 +575,7 @@ order as they have been received originally.
Please note that the stash can only be used together with actors
that have a deque-based mailbox. For this, configure the
``mailbox-type`` of the dispatcher to be a deque-based mailbox, such as
- ``akka.dispatch.UnboundedDequeBasedMailbox``.
+ ``akka.dispatch.UnboundedDequeBasedMailbox`` (see :ref:`dispatchers-java`).
Here is an example of the ``UntypedActorWithStash`` class in action:
@@ -585,7 +585,7 @@ Invoking ``stash()`` adds the current message (the message that the
actor received last) to the actor's stash. It is typically invoked
when handling the default case in the actor's message handler to stash
messages that aren't handled by the other cases. It is illegal to
-stash the same message twice; to do so results in a
+stash the same message twice; to do so results in an
``IllegalStateException`` being thrown. The stash may also be bounded
in which case invoking ``stash()`` may lead to a capacity violation,
which results in a ``StashOverflowException``. The capacity of the
diff --git a/akka-docs/scala/actors.rst b/akka-docs/scala/actors.rst
index 725f1383dc..d3a53408e2 100644
--- a/akka-docs/scala/actors.rst
+++ b/akka-docs/scala/actors.rst
@@ -636,7 +636,7 @@ order as they have been received originally.
Please note that the ``Stash`` can only be used together with actors
that have a deque-based mailbox. For this, configure the
``mailbox-type`` of the dispatcher to be a deque-based mailbox, such as
- ``akka.dispatch.UnboundedDequeBasedMailbox``.
+ ``akka.dispatch.UnboundedDequeBasedMailbox`` (see :ref:`dispatchers-scala`).
Here is an example of the ``Stash`` in action:
@@ -646,7 +646,7 @@ Invoking ``stash()`` adds the current message (the message that the
actor received last) to the actor's stash. It is typically invoked
when handling the default case in the actor's message handler to stash
messages that aren't handled by the other cases. It is illegal to
-stash the same message twice; to do so results in a
+stash the same message twice; to do so results in an
``IllegalStateException`` being thrown. The stash may also be bounded
in which case invoking ``stash()`` may lead to a capacity violation,
which results in a ``StashOverflowException``. The capacity of the
diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf b/akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf
index f454716af0..1fb5cceeb1 100644
--- a/akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf
+++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf
@@ -13,50 +13,50 @@ akka {
file-based {
# directory below which this queue resides
directory-path = "./_mb"
-
+
# attempting to add an item after the queue reaches this size (in items) will fail.
max-items = 2147483647
-
+
# attempting to add an item after the queue reaches this size (in bytes) will fail.
max-size = 2147483647 bytes
-
+
# attempting to add an item larger than this size (in bytes) will fail.
max-item-size = 2147483647 bytes
-
+
# maximum expiration time for this queue (seconds).
max-age = 0s
-
+
# maximum journal size before the journal should be rotated.
max-journal-size = 16 MiB
-
+
# maximum size of a queue before it drops into read-behind mode.
max-memory-size = 128 MiB
-
+
# maximum overflow (multiplier) of a journal file before we re-create it.
max-journal-overflow = 10
-
+
# absolute maximum size of a journal file until we rebuild it, no matter what.
max-journal-size-absolute = 9223372036854775807 bytes
-
+
# whether to drop older items (instead of newer) when the queue is full
- discard-old-when-full = on
-
+ discard-old-when-full = on
+
# whether to keep a journal file at all
- keep-journal = on
-
+ keep-journal = on
+
# whether to sync the journal after each transaction
sync-journal = off
# circuit breaker configuration
circuit-breaker {
- # maximum number of failures before opening breaker
- max-failures = 3
+ # maximum number of failures before opening breaker
+ max-failures = 3
- # duration of time beyond which a call is assumed to be timed out and considered a failure
- call-timeout = 3 seconds
+ # duration of time beyond which a call is assumed to be timed out and considered a failure
+ call-timeout = 3 seconds
- # duration of time to wait until attempting to reset the breaker during which all calls fail-fast
- reset-timeout = 30 seconds
+ # duration of time to wait until attempting to reset the breaker during which all calls fail-fast
+ reset-timeout = 30 seconds
}
}
}
diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala
index 8d2ce5b897..c703bf0b49 100644
--- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala
+++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala
@@ -34,7 +34,7 @@ class FileBasedMessageQueue(_owner: ActorRef, _system: ExtendedActorSystem, val
(new java.io.File(settings.QueuePath)) match {
case dir if dir.exists && !dir.isDirectory ⇒ throw new IllegalStateException("Path already occupied by non-directory " + dir)
case dir if !dir.exists ⇒ if (!dir.mkdirs() && !dir.isDirectory) throw new IllegalStateException("Creation of directory failed " + dir)
- case _ ⇒ //All good
+ case _ ⇒ // All good
}
val queue = new filequeue.PersistentQueue(settings.QueuePath, name, settings, log)
queue.setup // replays journal
diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxSettings.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxSettings.scala
index dff4021d96..27088dfc92 100644
--- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxSettings.scala
+++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxSettings.scala
@@ -16,20 +16,20 @@ class FileBasedMailboxSettings(val systemSettings: ActorSystem.Settings, val use
val config = initialize
import config._
- val QueuePath: String = getString("directory-path")
- val MaxItems: Int = getInt("max-items")
- val MaxSize: Long = getBytes("max-size")
- val MaxItemSize: Long = getBytes("max-item-size")
- val MaxAge: Duration = Duration(getMilliseconds("max-age"), MILLISECONDS)
- val MaxJournalSize: Long = getBytes("max-journal-size")
- val MaxMemorySize: Long = getBytes("max-memory-size")
- val MaxJournalOverflow: Int = getInt("max-journal-overflow")
- val MaxJournalSizeAbsolute: Long = getBytes("max-journal-size-absolute")
- val DiscardOldWhenFull: Boolean = getBoolean("discard-old-when-full")
- val KeepJournal: Boolean = getBoolean("keep-journal")
- val SyncJournal: Boolean = getBoolean("sync-journal")
+ final val QueuePath: String = getString("directory-path")
+ final val MaxItems: Int = getInt("max-items")
+ final val MaxSize: Long = getBytes("max-size")
+ final val MaxItemSize: Long = getBytes("max-item-size")
+ final val MaxAge: Duration = Duration(getMilliseconds("max-age"), MILLISECONDS)
+ final val MaxJournalSize: Long = getBytes("max-journal-size")
+ final val MaxMemorySize: Long = getBytes("max-memory-size")
+ final val MaxJournalOverflow: Int = getInt("max-journal-overflow")
+ final val MaxJournalSizeAbsolute: Long = getBytes("max-journal-size-absolute")
+ final val DiscardOldWhenFull: Boolean = getBoolean("discard-old-when-full")
+ final val KeepJournal: Boolean = getBoolean("keep-journal")
+ final val SyncJournal: Boolean = getBoolean("sync-journal")
- val CircuitBreakerMaxFailures = getInt("circuit-breaker.max-failures")
- val CircuitBreakerCallTimeout = Duration.fromNanos(getNanoseconds("circuit-breaker.call-timeout"))
- val CircuitBreakerResetTimeout = Duration.fromNanos(getNanoseconds("circuit-breaker.reset-timeout"))
-}
\ No newline at end of file
+ final val CircuitBreakerMaxFailures = getInt("circuit-breaker.max-failures")
+ final val CircuitBreakerCallTimeout = Duration.fromNanos(getNanoseconds("circuit-breaker.call-timeout"))
+ final val CircuitBreakerResetTimeout = Duration.fromNanos(getNanoseconds("circuit-breaker.reset-timeout"))
+}
diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/PersistentQueue.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/PersistentQueue.scala
index 1a5ddf4a8c..152b29406c 100644
--- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/PersistentQueue.scala
+++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/PersistentQueue.scala
@@ -68,44 +68,44 @@ class PersistentQueue(persistencePath: String, val name: String, val settings: F
def overlay[T](base: ⇒ T) = new OverlaySetting(base)
// attempting to add an item after the queue reaches this size (in items) will fail.
- val maxItems = overlay(PersistentQueue.maxItems)
+ final val maxItems = overlay(PersistentQueue.maxItems)
// attempting to add an item after the queue reaches this size (in bytes) will fail.
- val maxSize = overlay(PersistentQueue.maxSize)
+ final val maxSize = overlay(PersistentQueue.maxSize)
// attempting to add an item larger than this size (in bytes) will fail.
- val maxItemSize = overlay(PersistentQueue.maxItemSize)
+ final val maxItemSize = overlay(PersistentQueue.maxItemSize)
// maximum expiration time for this queue (seconds).
- val maxAge = overlay(PersistentQueue.maxAge)
+ final val maxAge = overlay(PersistentQueue.maxAge)
// maximum journal size before the journal should be rotated.
- val maxJournalSize = overlay(PersistentQueue.maxJournalSize)
+ final val maxJournalSize = overlay(PersistentQueue.maxJournalSize)
// maximum size of a queue before it drops into read-behind mode.
- val maxMemorySize = overlay(PersistentQueue.maxMemorySize)
+ final val maxMemorySize = overlay(PersistentQueue.maxMemorySize)
// maximum overflow (multiplier) of a journal file before we re-create it.
- val maxJournalOverflow = overlay(PersistentQueue.maxJournalOverflow)
+ final val maxJournalOverflow = overlay(PersistentQueue.maxJournalOverflow)
// absolute maximum size of a journal file until we rebuild it, no matter what.
- val maxJournalSizeAbsolute = overlay(PersistentQueue.maxJournalSizeAbsolute)
+ final val maxJournalSizeAbsolute = overlay(PersistentQueue.maxJournalSizeAbsolute)
// whether to drop older items (instead of newer) when the queue is full
- val discardOldWhenFull = overlay(PersistentQueue.discardOldWhenFull)
+ final val discardOldWhenFull = overlay(PersistentQueue.discardOldWhenFull)
// whether to keep a journal file at all
- val keepJournal = overlay(PersistentQueue.keepJournal)
+ final val keepJournal = overlay(PersistentQueue.keepJournal)
// whether to sync the journal after each transaction
- val syncJournal = overlay(PersistentQueue.syncJournal)
+ final val syncJournal = overlay(PersistentQueue.syncJournal)
// (optional) move expired items over to this queue
- val expiredQueue = overlay(PersistentQueue.expiredQueue)
+ final val expiredQueue = overlay(PersistentQueue.expiredQueue)
private var journal = new Journal(new File(persistencePath, name).getCanonicalPath, syncJournal(), log)
- // track tentative removals
+ // track tentative remofinal vals
private var xidCounter: Int = 0
private val openTransactions = new mutable.HashMap[Int, QItem]
def openTransactionCount = openTransactions.size
diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala
index e3bb5858f7..79ece7625d 100644
--- a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala
+++ b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala
@@ -68,11 +68,15 @@ trait DurableMessageSerialization { this: DurableMessageQueue ⇒
* Conventional organization of durable mailbox settings:
*
* {{{
- * my-durable-dispatcher {
- * mailbox-type = "my.durable.mailbox"
- * my-durable-mailbox {
- * setting1 = 1
- * setting2 = 2
+ * akka {
+ * actor {
+ * my-durable-dispatcher {
+ * mailbox-type = "my.durable.mailbox"
+ * my-durable-mailbox {
+ * setting1 = 1
+ * setting2 = 2
+ * }
+ * }
* }
* }
* }}}
diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala
index b6265125b1..eba0fffe63 100644
--- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala
+++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala
@@ -282,6 +282,8 @@ private[akka] class ServerFSM(val controller: ActorRef, val channel: Channel) ex
import akka.actor.FSM._
import Controller._
+ var roleName: RoleName = null
+
startWith(Initial, None)
whenUnhandled {
@@ -292,12 +294,15 @@ private[akka] class ServerFSM(val controller: ActorRef, val channel: Channel) ex
}
onTermination {
- case _ ⇒ controller ! ClientDisconnected
+ case _ ⇒
+ controller ! ClientDisconnected(roleName)
+ channel.close()
}
when(Initial, stateTimeout = 10 seconds) {
case Event(Hello(name, addr), _) ⇒
- controller ! NodeInfo(RoleName(name), addr, self)
+ roleName = RoleName(name)
+ controller ! NodeInfo(roleName, addr, self)
goto(Ready)
case Event(x: NetworkOp, _) ⇒
log.warning("client {} sent no Hello in first message (instead {}), disconnecting", getAddrString(channel), x)
@@ -334,10 +339,6 @@ private[akka] class ServerFSM(val controller: ActorRef, val channel: Channel) ex
}
initialize
-
- onTermination {
- case _ ⇒ channel.close()
- }
}
/**
@@ -517,10 +518,13 @@ private[akka] class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoor
if (clients.find(_.name == n.name).isDefined) throw new DuplicateNode(d, n)
stay using d.copy(clients = clients + n)
case Event(ClientDisconnected(name), d @ Data(clients, _, arrived, _)) ⇒
- if (clients.isEmpty) throw BarrierEmpty(d, "cannot disconnect " + name + ": no client to disconnect")
- (clients find (_.name == name)) match {
- case None ⇒ stay
- case Some(c) ⇒ throw ClientLost(d.copy(clients = clients - c, arrived = arrived filterNot (_ == c.fsm)), name)
+ if (arrived.isEmpty)
+ stay using d.copy(clients = clients.filterNot(_.name == name))
+ else {
+ (clients find (_.name == name)) match {
+ case None ⇒ stay
+ case Some(c) ⇒ throw ClientLost(d.copy(clients = clients - c, arrived = arrived filterNot (_ == c.fsm)), name)
+ }
}
}
diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/testkit/MultiNodeSpecSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/testkit/MultiNodeSpecSpec.scala
new file mode 100644
index 0000000000..2a709a99a7
--- /dev/null
+++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/testkit/MultiNodeSpecSpec.scala
@@ -0,0 +1,36 @@
+/**
+ * Copyright (C) 2009-2012 Typesafe Inc.
+ */
+package akka.remote.testkit
+
+import akka.testkit.LongRunningTest
+
+object MultiNodeSpecMultiJvmSpec extends MultiNodeConfig {
+ commonConfig(debugConfig(on = false))
+
+ val node1 = role("node1")
+ val node2 = role("node2")
+ val node3 = role("node3")
+ val node4 = role("node4")
+}
+
+class MultiNodeSpecSpecMultiJvmNode1 extends MultiNodeSpecSpec
+class MultiNodeSpecSpecMultiJvmNode2 extends MultiNodeSpecSpec
+class MultiNodeSpecSpecMultiJvmNode3 extends MultiNodeSpecSpec
+class MultiNodeSpecSpecMultiJvmNode4 extends MultiNodeSpecSpec
+
+class MultiNodeSpecSpec extends MultiNodeSpec(MultiNodeSpecMultiJvmSpec) {
+
+ import MultiNodeSpecMultiJvmSpec._
+
+ def initialParticipants = 4
+
+ "A MultiNodeSpec" must {
+
+ "wait for all nodes to remove themselves before we shut the conductor down" taggedAs LongRunningTest in {
+ enterBarrier("startup")
+ // this test is empty here since it only exercises the shutdown code in the MultiNodeSpec
+ }
+
+ }
+}
diff --git a/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala b/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala
index f418f4a717..8ff95d0831 100644
--- a/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala
+++ b/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala
@@ -59,14 +59,9 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender {
val b = getBarrier()
b ! NodeInfo(A, AddressFromURIString("akka://sys"), system.deadLetters)
b ! ClientDisconnected(B)
- EventFilter[ClientLost](occurrences = 1) intercept {
- b ! ClientDisconnected(A)
- }
- expectMsg(Failed(b, ClientLost(Data(Set(), "", Nil, null), A)))
- EventFilter[BarrierEmpty](occurrences = 1) intercept {
- b ! ClientDisconnected(A)
- }
- expectMsg(Failed(b, BarrierEmpty(Data(Set(), "", Nil, null), "cannot disconnect RoleName(a): no client to disconnect")))
+ expectNoMsg(1 second)
+ b ! ClientDisconnected(A)
+ expectNoMsg(1 second)
}
"fail entering barrier when nobody registered" taggedAs TimingTest in {
@@ -264,12 +259,9 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender {
b ! NodeInfo(A, AddressFromURIString("akka://sys"), testActor)
expectMsg(ToClient(Done))
b ! ClientDisconnected(B)
- EventFilter[ClientLost](occurrences = 1) intercept {
- b ! ClientDisconnected(A)
- }
- EventFilter[BarrierEmpty](occurrences = 1) intercept {
- b ! ClientDisconnected(A)
- }
+ expectNoMsg(1 second)
+ b ! ClientDisconnected(A)
+ expectNoMsg(1 second)
}
"fail entering barrier when nobody registered" taggedAs TimingTest in {
diff --git a/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala b/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala
index 4d65a2084e..25bb8df7dc 100644
--- a/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala
+++ b/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala
@@ -7,12 +7,13 @@ import java.net.InetSocketAddress
import com.typesafe.config.{ ConfigObject, ConfigFactory, Config }
-import akka.actor.{ RootActorPath, Deploy, ActorPath, ActorSystem, ExtendedActorSystem }
+import akka.actor.{ RootActorPath, ActorPath, ActorSystem, ExtendedActorSystem }
import akka.dispatch.Await
import akka.dispatch.Await.Awaitable
import akka.remote.testconductor.{ TestConductorExt, TestConductor, RoleName }
import akka.testkit.AkkaSpec
-import akka.util.{ Timeout, NonFatal, Duration }
+import akka.util.{ Timeout, NonFatal }
+import akka.util.duration._
/**
* Configure the role names and participants of the test, including configuration settings.
@@ -261,4 +262,16 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles:
// useful to see which jvm is running which role
log.info("Role [{}] started", myself.name)
+ // wait for all nodes to remove themselves before we shut the conductor down
+ final override def beforeShutdown() = {
+ if (selfIndex == 0) {
+ testConductor.removeNode(myself)
+ within(testConductor.Settings.BarrierTimeout.duration) {
+ awaitCond {
+ testConductor.getNodes.await.filterNot(_ == myself).isEmpty
+ }
+ }
+ }
+ }
+
}
diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala
index 424c913662..f9ee989e1c 100644
--- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala
+++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala
@@ -74,6 +74,7 @@ abstract class AkkaSpec(_system: ActorSystem)
}
final override def afterAll {
+ beforeShutdown()
system.shutdown()
try system.awaitTermination(5 seconds) catch {
case _: TimeoutException ⇒ system.log.warning("Failed to stop [{}] within 5 seconds", system.name)
@@ -83,6 +84,8 @@ abstract class AkkaSpec(_system: ActorSystem)
protected def atStartup() {}
+ protected def beforeShutdown() {}
+
protected def atTermination() {}
def spawn(dispatcherId: String = Dispatchers.DefaultDispatcherId)(body: ⇒ Unit) {
diff --git a/project/Sphinx.scala b/project/Sphinx.scala
index 43b7e60358..4707215875 100644
--- a/project/Sphinx.scala
+++ b/project/Sphinx.scala
@@ -87,16 +87,15 @@ object Sphinx {
def pdfTask = (sphinxLatex, streams) map {
(latex, s) => {
- val empty = (latex * "*.pdf").get.isEmpty
+ val pdf = latex / "Akka.pdf"
def failed = sys.error("Failed to build Sphinx pdf documentation.")
- if (empty) {
+ if (!pdf.exists) {
s.log.info("Building Sphinx pdf documentation...")
val logger = newLogger(s)
val exitCode = Process(Seq("make", "all-pdf"), latex) ! logger
if (exitCode != 0) failed
+ s.log.info("Sphinx pdf documentation created: %s" format pdf)
}
- val pdf = (latex * "*.pdf").get.headOption.getOrElse(failed)
- if (empty) s.log.info("Sphinx pdf documentation created: %s" format pdf)
pdf
}
}
diff --git a/repl b/repl
deleted file mode 100644
index 701b021b35..0000000000
--- a/repl
+++ /dev/null
@@ -1,9 +0,0 @@
-import akka.actor._
-import akka.dispatch.{ Future, Promise }
-import com.typesafe.config.ConfigFactory
-val config=ConfigFactory.parseString("akka.daemonic=on")
-val sys=ActorSystem("repl", config.withFallback(ConfigFactory.load())).asInstanceOf[ExtendedActorSystem]
-implicit val ec=sys.dispatcher
-import akka.util.duration._
-import akka.util.Timeout
-implicit val timeout=Timeout(5 seconds)