diff --git a/akka-actor/src/main/scala/akka/util/Duration.scala b/akka-actor/src/main/scala/akka/util/Duration.scala
index 312d733904..b276e4873c 100644
--- a/akka-actor/src/main/scala/akka/util/Duration.scala
+++ b/akka-actor/src/main/scala/akka/util/Duration.scala
@@ -42,10 +42,10 @@ case class Timer(timeout: Duration, throwExceptionOnTimeout: Boolean = false) {
}
}
-case class Deadline(d: Duration) {
- def +(other: Duration): Deadline = copy(d = d + other)
- def -(other: Duration): Deadline = copy(d = d - other)
- def -(other: Deadline): Duration = d - other.d
+case class Deadline(time: Duration) {
+ def +(other: Duration): Deadline = copy(time = time + other)
+ def -(other: Duration): Deadline = copy(time = time - other)
+ def -(other: Deadline): Duration = time - other.time
def timeLeft: Duration = this - Deadline.now
}
object Deadline {
diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf
new file mode 100644
index 0000000000..749c138a26
--- /dev/null
+++ b/akka-cluster/src/main/resources/reference.conf
@@ -0,0 +1,33 @@
+######################################
+# Akka Cluster Reference Config File #
+######################################
+
+# This the reference config file has all the default settings.
+# Make your edits/overrides in your application.conf.
+
+akka {
+
+ cluster {
+ seed-nodes = []
+ seed-node-connection-timeout = 30s
+ max-time-to-retry-joining-cluster = 30s
+
+ # accrual failure detection config
+ failure-detector {
+
+ # defines the failure detector threshold
+ # A low threshold is prone to generate many wrong suspicions but ensures
+ # a quick detection in the event of a real crash. Conversely, a high
+ # threshold generates fewer mistakes but needs more time to detect
+ # actual crashes
+ threshold = 8
+
+ max-sample-size = 1000
+ }
+
+ gossip {
+ initialDelay = 5s
+ frequency = 1s
+ }
+ }
+}
diff --git a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala
index 892f7a026d..379bf98a6b 100644
--- a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala
@@ -23,7 +23,7 @@ import System.{ currentTimeMillis ⇒ newTimestamp }
*
* Default threshold is 8, but can be configured in the Akka config.
*/
-class AccrualFailureDetector(val threshold: Int = 8, val maxSampleSize: Int = 1000, system: ActorSystem) {
+class AccrualFailureDetector(system: ActorSystem, val threshold: Int = 8, val maxSampleSize: Int = 1000) {
private final val PhiFactor = 1.0 / math.log(10.0)
@@ -54,7 +54,7 @@ class AccrualFailureDetector(val threshold: Int = 8, val maxSampleSize: Int = 10
*/
@tailrec
final def heartbeat(connection: Address) {
- log.info("Heartbeat from connection [{}] ", connection)
+ log.debug("Heartbeat from connection [{}] ", connection)
val oldState = state.get
val latestTimestamp = oldState.timestamps.get(connection)
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
new file mode 100644
index 0000000000..820290ea14
--- /dev/null
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
@@ -0,0 +1,26 @@
+/**
+ * Copyright (C) 2009-2012 Typesafe Inc.
+ */
+package akka.cluster
+
+import com.typesafe.config.Config
+import akka.util.Duration
+import java.util.concurrent.TimeUnit.MILLISECONDS
+import akka.config.ConfigurationException
+import scala.collection.JavaConverters._
+import akka.actor.Address
+import akka.actor.AddressExtractor
+
+class ClusterSettings(val config: Config, val systemName: String) {
+ import config._
+ // cluster config section
+ val FailureDetectorThreshold = getInt("akka.cluster.failure-detector.threshold")
+ val FailureDetectorMaxSampleSize = getInt("akka.cluster.failure-detector.max-sample-size")
+ val SeedNodeConnectionTimeout = Duration(config.getMilliseconds("akka.cluster.seed-node-connection-timeout"), MILLISECONDS)
+ val MaxTimeToRetryJoiningCluster = Duration(config.getMilliseconds("akka.cluster.max-time-to-retry-joining-cluster"), MILLISECONDS)
+ val InitialDelayForGossip = Duration(getMilliseconds("akka.cluster.gossip.initialDelay"), MILLISECONDS)
+ val GossipFrequency = Duration(getMilliseconds("akka.cluster.gossip.frequency"), MILLISECONDS)
+ val SeedNodes = Set.empty[Address] ++ getStringList("akka.cluster.seed-nodes").asScala.collect {
+ case AddressExtractor(addr) ⇒ addr
+ }
+}
diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossiper.scala b/akka-cluster/src/main/scala/akka/cluster/Gossiper.scala
index e234d6e158..c7b4e21773 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Gossiper.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Gossiper.scala
@@ -127,29 +127,28 @@ case class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) {
currentGossip: Gossip,
memberMembershipChangeListeners: Set[NodeMembershipChangeListener] = Set.empty[NodeMembershipChangeListener])
- // configuration
- private val remoteSettings = remote.remoteSettings
+ val remoteSettings = new RemoteSettings(system.settings.config, system.name)
+ val clusterSettings = new ClusterSettings(system.settings.config, system.name)
- private val protocol = "akka" // TODO should this be hardcoded?
- private val address = remote.transport.address
- private val memberFingerprint = address.##
+ val protocol = "akka" // TODO should this be hardcoded?
+ val address = remote.transport.address
- private val serialization = remote.serialization
- private val failureDetector = new AccrualFailureDetector(remoteSettings.FailureDetectorThreshold, remoteSettings.FailureDetectorMaxSampleSize, system)
-
- private val initialDelayForGossip = remoteSettings.InitialDelayForGossip
- private val gossipFrequency = remoteSettings.GossipFrequency
-
- implicit val seedNodeConnectionTimeout = remoteSettings.SeedNodeConnectionTimeout
+ val memberFingerprint = address.##
+ val initialDelayForGossip = clusterSettings.InitialDelayForGossip
+ val gossipFrequency = clusterSettings.GossipFrequency
+ implicit val seedNodeConnectionTimeout = clusterSettings.SeedNodeConnectionTimeout
implicit val defaultTimeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout)
// seed members
private val seeds: Set[Member] = {
- if (remoteSettings.SeedNodes.isEmpty) throw new ConfigurationException(
+ if (clusterSettings.SeedNodes.isEmpty) throw new ConfigurationException(
"At least one seed member must be defined in the configuration [akka.cluster.seed-members]")
- else remoteSettings.SeedNodes map (address ⇒ Member(address, MemberStatus.Up()))
+ else clusterSettings.SeedNodes map (address ⇒ Member(address, MemberStatus.Up()))
}
+ private val serialization = remote.serialization
+ private val failureDetector = new AccrualFailureDetector(system, clusterSettings.FailureDetectorThreshold, clusterSettings.FailureDetectorMaxSampleSize)
+
private val isRunning = new AtomicBoolean(true)
private val log = Logging(system, "Gossiper")
private val random = SecureRandom.getInstance("SHA1PRNG")
@@ -164,14 +163,11 @@ case class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) {
log.info("Starting cluster Gossiper...")
// join the cluster by connecting to one of the seed members and retrieve current cluster state (Gossip)
- joinCluster(Timer(remoteSettings.MaxTimeToRetryJoiningCluster))
+ joinCluster(Deadline(clusterSettings.MaxTimeToRetryJoiningCluster))
// start periodic gossip and cluster scrutinization
- val initateGossipCanceller = system.scheduler.schedule(
- Duration(initialDelayForGossip.toSeconds, SECONDS), Duration(gossipFrequency.toSeconds, SECONDS))(initateGossip())
-
- val scrutinizeCanceller = system.scheduler.schedule(
- Duration(initialDelayForGossip.toSeconds, SECONDS), Duration(gossipFrequency.toSeconds, SECONDS))(scrutinize())
+ val initateGossipCanceller = system.scheduler.schedule(initialDelayForGossip, gossipFrequency)(initateGossip())
+ val scrutinizeCanceller = system.scheduler.schedule(initialDelayForGossip, gossipFrequency)(scrutinize())
/**
* Shuts down all connections to other members, the cluster daemon and the periodic gossip and cleanup tasks.
@@ -293,7 +289,7 @@ case class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) {
/**
* Joins the cluster by connecting to one of the seed members and retrieve current cluster state (Gossip).
*/
- private def joinCluster(timer: Timer) {
+ private def joinCluster(deadline: Deadline) {
val seedNodes = seedNodesWithoutMyself // filter out myself
if (!seedNodes.isEmpty) { // if we have seed members to contact
@@ -316,16 +312,16 @@ case class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) {
case e: Exception ⇒
log.error(
"Could not join cluster through any of the seed members - retrying for another {} seconds",
- timer.timeLeft.toSeconds)
+ deadline.timeLeft.toSeconds)
// retry joining the cluster unless
// 1. Gossiper is shut down
// 2. The connection time window has expired
if (isRunning.get) {
- if (timer.timeLeft.toMillis > 0) joinCluster(timer) // recur
+ if (deadline.timeLeft.toMillis > 0) joinCluster(deadline) // recur
else throw new RemoteConnectionException(
"Could not join cluster (any of the seed members) - giving up after trying for " +
- timer.timeout.toSeconds + " seconds")
+ deadline.time.toSeconds + " seconds")
}
}
}
diff --git a/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala b/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala
index a6a54de1d9..ef1f1be490 100644
--- a/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala
@@ -8,10 +8,16 @@ import akka.AkkaException
class VectorClockException(message: String) extends AkkaException(message)
+/**
+ * Trait to be extended by classes that wants to be versioned using a VectorClock.
+ */
trait Versioned {
def version: VectorClock
}
+/**
+ * Utility methods for comparing Versioned instances.
+ */
object Versioned {
def latestVersionOf[T <: Versioned](versioned1: T, versioned2: T): T = {
(versioned1.version compare versioned2.version) match {
@@ -24,10 +30,11 @@ object Versioned {
/**
* Representation of a Vector-based clock (counting clock), inspired by Lamport logical clocks.
- *
+ * {{
* Reference:
* 1) Leslie Lamport (1978). "Time, clocks, and the ordering of events in a distributed system". Communications of the ACM 21 (7): 558-565.
* 2) Friedemann Mattern (1988). "Virtual Time and Global States of Distributed Systems". Workshop on Parallel and Distributed Algorithms: pp. 215-226
+ * }}
*/
case class VectorClock(
versions: Vector[VectorClock.Entry] = Vector.empty[VectorClock.Entry],
@@ -90,9 +97,11 @@ object VectorClock {
/**
* Compare two vector clocks. The outcomes will be one of the following:
*
- * 1. Clock 1 is BEFORE clock 2 if there exists an i such that c1(i) <= c(2) and there does not exist a j such that c1(j) > c2(j).
- * 2. Clock 1 is CONCURRENT to clock 2 if there exists an i, j such that c1(i) < c2(i) and c1(j) > c2(j).
- * 3. Clock 1 is AFTER clock 2 otherwise.
+ * {{
+ * 1. Clock 1 is BEFORE clock 2 if there exists an i such that c1(i) <= c(2) and there does not exist a j such that c1(j) > c2(j).
+ * 2. Clock 1 is CONCURRENT to clock 2 if there exists an i, j such that c1(i) < c2(i) and c1(j) > c2(j).
+ * 3. Clock 1 is AFTER clock 2 otherwise.
+ * }}
*
* @param v1 The first VectorClock
* @param v2 The second VectorClock
diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala
new file mode 100644
index 0000000000..240d1ad3ff
--- /dev/null
+++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala
@@ -0,0 +1,35 @@
+/**
+ * Copyright (C) 2009-2012 Typesafe Inc.
+ */
+
+package akka.cluster
+
+import akka.testkit.AkkaSpec
+import akka.util.duration._
+import akka.util.Duration
+
+@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
+class ClusterConfigSpec extends AkkaSpec(
+ """
+ akka {
+ actor {
+ provider = "akka.remote.RemoteActorRefProvider"
+ }
+ }
+ """) {
+
+ "Clustering" must {
+
+ "be able to parse generic cluster config elements" in {
+ val settings = new ClusterSettings(system.settings.config, system.name)
+ import settings._
+ FailureDetectorThreshold must be(8)
+ FailureDetectorMaxSampleSize must be(1000)
+ SeedNodeConnectionTimeout must be(30 seconds)
+ MaxTimeToRetryJoiningCluster must be(30 seconds)
+ InitialDelayForGossip must be(5 seconds)
+ GossipFrequency must be(1 second)
+ SeedNodes must be(Set())
+ }
+ }
+}
diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf
index 76f1980615..943b0d7122 100644
--- a/akka-remote/src/main/resources/reference.conf
+++ b/akka-remote/src/main/resources/reference.conf
@@ -118,42 +118,9 @@ akka {
reconnection-time-window = 600s
}
- # The dispatcher used for remote system messages
- compute-grid-dispatcher {
- # defaults to same settings as default-dispatcher
- name = ComputeGridDispatcher
- }
-
# The dispatcher used for the system actor "network-event-sender"
network-event-sender-dispatcher {
type = PinnedDispatcher
}
-
- }
-
- cluster {
- use-cluster = off
-
- seed-nodes = []
- seed-node-connection-timeout = 30s
- max-time-to-retry-joining-cluster = 30s
-
- # accrual failure detection config
- failure-detector {
-
- # defines the failure detector threshold
- # A low threshold is prone to generate many wrong suspicions but ensures
- # a quick detection in the event of a real crash. Conversely, a high
- # threshold generates fewer mistakes but needs more time to detect
- # actual crashes
- threshold = 8
-
- max-sample-size = 1000
- }
-
- gossip {
- initialDelay = 5s
- frequency = 1s
- }
}
}
diff --git a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala
index 0060233246..5c29d22161 100644
--- a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala
+++ b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala
@@ -13,26 +13,10 @@ import akka.actor.Address
import akka.actor.AddressExtractor
class RemoteSettings(val config: Config, val systemName: String) {
-
import config._
-
val RemoteTransport = getString("akka.remote.transport")
val LogReceive = getBoolean("akka.remote.log-received-messages")
val LogSend = getBoolean("akka.remote.log-sent-messages")
-
- // TODO cluster config will go into akka-cluster/reference.conf when we enable that module
- // cluster config section
- val UseCluster = getBoolean("akka.cluster.use-cluster")
- val FailureDetectorThreshold = getInt("akka.cluster.failure-detector.threshold")
- val FailureDetectorMaxSampleSize = getInt("akka.cluster.failure-detector.max-sample-size")
- val SeedNodeConnectionTimeout = Duration(config.getMilliseconds("akka.cluster.seed-node-connection-timeout"), MILLISECONDS)
- val MaxTimeToRetryJoiningCluster = Duration(config.getMilliseconds("akka.cluster.max-time-to-retry-joining-cluster"), MILLISECONDS)
- val InitialDelayForGossip = Duration(getMilliseconds("akka.cluster.gossip.initialDelay"), MILLISECONDS)
- val GossipFrequency = Duration(getMilliseconds("akka.cluster.gossip.frequency"), MILLISECONDS)
- val SeedNodes = Set.empty[Address] ++ getStringList("akka.cluster.seed-nodes").asScala.collect {
- case AddressExtractor(addr) ⇒ addr
- }
-
val RemoteSystemDaemonAckTimeout = Duration(getMilliseconds("akka.remote.remote-daemon-ack-timeout"), MILLISECONDS)
val UntrustedMode = getBoolean("akka.remote.untrusted-mode")
}
diff --git a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala
index b60b90b900..fbeaff5b6b 100644
--- a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala
+++ b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala
@@ -28,13 +28,6 @@ class RemoteConfigSpec extends AkkaSpec(
RemoteTransport must be("akka.remote.netty.NettyRemoteTransport")
UntrustedMode must be(false)
RemoteSystemDaemonAckTimeout must be(30 seconds)
-
- FailureDetectorThreshold must be(8)
- FailureDetectorMaxSampleSize must be(1000)
-
- InitialDelayForGossip must be(5 seconds)
- GossipFrequency must be(1 second)
- SeedNodes must be(Set())
}
"be able to parse Netty config elements" in {