From 0b59640820ae1586fe80248b8826abe242585521 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Tue, 31 Jan 2012 15:00:46 +0100 Subject: [PATCH] Fixed bunch of stuff based on feedback on pull request. Moved all cluster config to akka-cluster (and added test). MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jonas Bonér --- .../src/main/scala/akka/util/Duration.scala | 8 ++-- .../src/main/resources/reference.conf | 33 ++++++++++++++ .../akka/cluster/AccrualFailureDetector.scala | 4 +- .../scala/akka/cluster/ClusterSettings.scala | 26 +++++++++++ .../main/scala/akka/cluster/Gossiper.scala | 44 +++++++++---------- .../main/scala/akka/cluster/VectorClock.scala | 17 +++++-- .../akka/cluster/ClusterConfigSpec.scala | 35 +++++++++++++++ akka-remote/src/main/resources/reference.conf | 33 -------------- .../scala/akka/remote/RemoteSettings.scala | 16 ------- .../scala/akka/remote/RemoteConfigSpec.scala | 7 --- 10 files changed, 133 insertions(+), 90 deletions(-) create mode 100644 akka-cluster/src/main/resources/reference.conf create mode 100644 akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala create mode 100644 akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala diff --git a/akka-actor/src/main/scala/akka/util/Duration.scala b/akka-actor/src/main/scala/akka/util/Duration.scala index 312d733904..b276e4873c 100644 --- a/akka-actor/src/main/scala/akka/util/Duration.scala +++ b/akka-actor/src/main/scala/akka/util/Duration.scala @@ -42,10 +42,10 @@ case class Timer(timeout: Duration, throwExceptionOnTimeout: Boolean = false) { } } -case class Deadline(d: Duration) { - def +(other: Duration): Deadline = copy(d = d + other) - def -(other: Duration): Deadline = copy(d = d - other) - def -(other: Deadline): Duration = d - other.d +case class Deadline(time: Duration) { + def +(other: Duration): Deadline = copy(time = time + other) + def -(other: Duration): Deadline = copy(time = time - other) + def -(other: Deadline): Duration = time - other.time def timeLeft: Duration = this - Deadline.now } object Deadline { diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf new file mode 100644 index 0000000000..749c138a26 --- /dev/null +++ b/akka-cluster/src/main/resources/reference.conf @@ -0,0 +1,33 @@ +###################################### +# Akka Cluster Reference Config File # +###################################### + +# This the reference config file has all the default settings. +# Make your edits/overrides in your application.conf. + +akka { + + cluster { + seed-nodes = [] + seed-node-connection-timeout = 30s + max-time-to-retry-joining-cluster = 30s + + # accrual failure detection config + failure-detector { + + # defines the failure detector threshold + # A low threshold is prone to generate many wrong suspicions but ensures + # a quick detection in the event of a real crash. Conversely, a high + # threshold generates fewer mistakes but needs more time to detect + # actual crashes + threshold = 8 + + max-sample-size = 1000 + } + + gossip { + initialDelay = 5s + frequency = 1s + } + } +} diff --git a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala index 892f7a026d..379bf98a6b 100644 --- a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala @@ -23,7 +23,7 @@ import System.{ currentTimeMillis ⇒ newTimestamp } *

* Default threshold is 8, but can be configured in the Akka config. */ -class AccrualFailureDetector(val threshold: Int = 8, val maxSampleSize: Int = 1000, system: ActorSystem) { +class AccrualFailureDetector(system: ActorSystem, val threshold: Int = 8, val maxSampleSize: Int = 1000) { private final val PhiFactor = 1.0 / math.log(10.0) @@ -54,7 +54,7 @@ class AccrualFailureDetector(val threshold: Int = 8, val maxSampleSize: Int = 10 */ @tailrec final def heartbeat(connection: Address) { - log.info("Heartbeat from connection [{}] ", connection) + log.debug("Heartbeat from connection [{}] ", connection) val oldState = state.get val latestTimestamp = oldState.timestamps.get(connection) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala new file mode 100644 index 0000000000..820290ea14 --- /dev/null +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -0,0 +1,26 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import com.typesafe.config.Config +import akka.util.Duration +import java.util.concurrent.TimeUnit.MILLISECONDS +import akka.config.ConfigurationException +import scala.collection.JavaConverters._ +import akka.actor.Address +import akka.actor.AddressExtractor + +class ClusterSettings(val config: Config, val systemName: String) { + import config._ + // cluster config section + val FailureDetectorThreshold = getInt("akka.cluster.failure-detector.threshold") + val FailureDetectorMaxSampleSize = getInt("akka.cluster.failure-detector.max-sample-size") + val SeedNodeConnectionTimeout = Duration(config.getMilliseconds("akka.cluster.seed-node-connection-timeout"), MILLISECONDS) + val MaxTimeToRetryJoiningCluster = Duration(config.getMilliseconds("akka.cluster.max-time-to-retry-joining-cluster"), MILLISECONDS) + val InitialDelayForGossip = Duration(getMilliseconds("akka.cluster.gossip.initialDelay"), MILLISECONDS) + val GossipFrequency = Duration(getMilliseconds("akka.cluster.gossip.frequency"), MILLISECONDS) + val SeedNodes = Set.empty[Address] ++ getStringList("akka.cluster.seed-nodes").asScala.collect { + case AddressExtractor(addr) ⇒ addr + } +} diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossiper.scala b/akka-cluster/src/main/scala/akka/cluster/Gossiper.scala index e234d6e158..c7b4e21773 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Gossiper.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Gossiper.scala @@ -127,29 +127,28 @@ case class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) { currentGossip: Gossip, memberMembershipChangeListeners: Set[NodeMembershipChangeListener] = Set.empty[NodeMembershipChangeListener]) - // configuration - private val remoteSettings = remote.remoteSettings + val remoteSettings = new RemoteSettings(system.settings.config, system.name) + val clusterSettings = new ClusterSettings(system.settings.config, system.name) - private val protocol = "akka" // TODO should this be hardcoded? - private val address = remote.transport.address - private val memberFingerprint = address.## + val protocol = "akka" // TODO should this be hardcoded? + val address = remote.transport.address - private val serialization = remote.serialization - private val failureDetector = new AccrualFailureDetector(remoteSettings.FailureDetectorThreshold, remoteSettings.FailureDetectorMaxSampleSize, system) - - private val initialDelayForGossip = remoteSettings.InitialDelayForGossip - private val gossipFrequency = remoteSettings.GossipFrequency - - implicit val seedNodeConnectionTimeout = remoteSettings.SeedNodeConnectionTimeout + val memberFingerprint = address.## + val initialDelayForGossip = clusterSettings.InitialDelayForGossip + val gossipFrequency = clusterSettings.GossipFrequency + implicit val seedNodeConnectionTimeout = clusterSettings.SeedNodeConnectionTimeout implicit val defaultTimeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout) // seed members private val seeds: Set[Member] = { - if (remoteSettings.SeedNodes.isEmpty) throw new ConfigurationException( + if (clusterSettings.SeedNodes.isEmpty) throw new ConfigurationException( "At least one seed member must be defined in the configuration [akka.cluster.seed-members]") - else remoteSettings.SeedNodes map (address ⇒ Member(address, MemberStatus.Up())) + else clusterSettings.SeedNodes map (address ⇒ Member(address, MemberStatus.Up())) } + private val serialization = remote.serialization + private val failureDetector = new AccrualFailureDetector(system, clusterSettings.FailureDetectorThreshold, clusterSettings.FailureDetectorMaxSampleSize) + private val isRunning = new AtomicBoolean(true) private val log = Logging(system, "Gossiper") private val random = SecureRandom.getInstance("SHA1PRNG") @@ -164,14 +163,11 @@ case class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) { log.info("Starting cluster Gossiper...") // join the cluster by connecting to one of the seed members and retrieve current cluster state (Gossip) - joinCluster(Timer(remoteSettings.MaxTimeToRetryJoiningCluster)) + joinCluster(Deadline(clusterSettings.MaxTimeToRetryJoiningCluster)) // start periodic gossip and cluster scrutinization - val initateGossipCanceller = system.scheduler.schedule( - Duration(initialDelayForGossip.toSeconds, SECONDS), Duration(gossipFrequency.toSeconds, SECONDS))(initateGossip()) - - val scrutinizeCanceller = system.scheduler.schedule( - Duration(initialDelayForGossip.toSeconds, SECONDS), Duration(gossipFrequency.toSeconds, SECONDS))(scrutinize()) + val initateGossipCanceller = system.scheduler.schedule(initialDelayForGossip, gossipFrequency)(initateGossip()) + val scrutinizeCanceller = system.scheduler.schedule(initialDelayForGossip, gossipFrequency)(scrutinize()) /** * Shuts down all connections to other members, the cluster daemon and the periodic gossip and cleanup tasks. @@ -293,7 +289,7 @@ case class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) { /** * Joins the cluster by connecting to one of the seed members and retrieve current cluster state (Gossip). */ - private def joinCluster(timer: Timer) { + private def joinCluster(deadline: Deadline) { val seedNodes = seedNodesWithoutMyself // filter out myself if (!seedNodes.isEmpty) { // if we have seed members to contact @@ -316,16 +312,16 @@ case class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) { case e: Exception ⇒ log.error( "Could not join cluster through any of the seed members - retrying for another {} seconds", - timer.timeLeft.toSeconds) + deadline.timeLeft.toSeconds) // retry joining the cluster unless // 1. Gossiper is shut down // 2. The connection time window has expired if (isRunning.get) { - if (timer.timeLeft.toMillis > 0) joinCluster(timer) // recur + if (deadline.timeLeft.toMillis > 0) joinCluster(deadline) // recur else throw new RemoteConnectionException( "Could not join cluster (any of the seed members) - giving up after trying for " + - timer.timeout.toSeconds + " seconds") + deadline.time.toSeconds + " seconds") } } } diff --git a/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala b/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala index a6a54de1d9..ef1f1be490 100644 --- a/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala +++ b/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala @@ -8,10 +8,16 @@ import akka.AkkaException class VectorClockException(message: String) extends AkkaException(message) +/** + * Trait to be extended by classes that wants to be versioned using a VectorClock. + */ trait Versioned { def version: VectorClock } +/** + * Utility methods for comparing Versioned instances. + */ object Versioned { def latestVersionOf[T <: Versioned](versioned1: T, versioned2: T): T = { (versioned1.version compare versioned2.version) match { @@ -24,10 +30,11 @@ object Versioned { /** * Representation of a Vector-based clock (counting clock), inspired by Lamport logical clocks. - * + * {{ * Reference: * 1) Leslie Lamport (1978). "Time, clocks, and the ordering of events in a distributed system". Communications of the ACM 21 (7): 558-565. * 2) Friedemann Mattern (1988). "Virtual Time and Global States of Distributed Systems". Workshop on Parallel and Distributed Algorithms: pp. 215-226 + * }} */ case class VectorClock( versions: Vector[VectorClock.Entry] = Vector.empty[VectorClock.Entry], @@ -90,9 +97,11 @@ object VectorClock { /** * Compare two vector clocks. The outcomes will be one of the following: *

- * 1. Clock 1 is BEFORE clock 2 if there exists an i such that c1(i) <= c(2) and there does not exist a j such that c1(j) > c2(j). - * 2. Clock 1 is CONCURRENT to clock 2 if there exists an i, j such that c1(i) < c2(i) and c1(j) > c2(j). - * 3. Clock 1 is AFTER clock 2 otherwise. + * {{ + * 1. Clock 1 is BEFORE clock 2 if there exists an i such that c1(i) <= c(2) and there does not exist a j such that c1(j) > c2(j). + * 2. Clock 1 is CONCURRENT to clock 2 if there exists an i, j such that c1(i) < c2(i) and c1(j) > c2(j). + * 3. Clock 1 is AFTER clock 2 otherwise. + * }} * * @param v1 The first VectorClock * @param v2 The second VectorClock diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala new file mode 100644 index 0000000000..240d1ad3ff --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -0,0 +1,35 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.cluster + +import akka.testkit.AkkaSpec +import akka.util.duration._ +import akka.util.Duration + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class ClusterConfigSpec extends AkkaSpec( + """ + akka { + actor { + provider = "akka.remote.RemoteActorRefProvider" + } + } + """) { + + "Clustering" must { + + "be able to parse generic cluster config elements" in { + val settings = new ClusterSettings(system.settings.config, system.name) + import settings._ + FailureDetectorThreshold must be(8) + FailureDetectorMaxSampleSize must be(1000) + SeedNodeConnectionTimeout must be(30 seconds) + MaxTimeToRetryJoiningCluster must be(30 seconds) + InitialDelayForGossip must be(5 seconds) + GossipFrequency must be(1 second) + SeedNodes must be(Set()) + } + } +} diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 76f1980615..943b0d7122 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -118,42 +118,9 @@ akka { reconnection-time-window = 600s } - # The dispatcher used for remote system messages - compute-grid-dispatcher { - # defaults to same settings as default-dispatcher - name = ComputeGridDispatcher - } - # The dispatcher used for the system actor "network-event-sender" network-event-sender-dispatcher { type = PinnedDispatcher } - - } - - cluster { - use-cluster = off - - seed-nodes = [] - seed-node-connection-timeout = 30s - max-time-to-retry-joining-cluster = 30s - - # accrual failure detection config - failure-detector { - - # defines the failure detector threshold - # A low threshold is prone to generate many wrong suspicions but ensures - # a quick detection in the event of a real crash. Conversely, a high - # threshold generates fewer mistakes but needs more time to detect - # actual crashes - threshold = 8 - - max-sample-size = 1000 - } - - gossip { - initialDelay = 5s - frequency = 1s - } } } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala index 0060233246..5c29d22161 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala @@ -13,26 +13,10 @@ import akka.actor.Address import akka.actor.AddressExtractor class RemoteSettings(val config: Config, val systemName: String) { - import config._ - val RemoteTransport = getString("akka.remote.transport") val LogReceive = getBoolean("akka.remote.log-received-messages") val LogSend = getBoolean("akka.remote.log-sent-messages") - - // TODO cluster config will go into akka-cluster/reference.conf when we enable that module - // cluster config section - val UseCluster = getBoolean("akka.cluster.use-cluster") - val FailureDetectorThreshold = getInt("akka.cluster.failure-detector.threshold") - val FailureDetectorMaxSampleSize = getInt("akka.cluster.failure-detector.max-sample-size") - val SeedNodeConnectionTimeout = Duration(config.getMilliseconds("akka.cluster.seed-node-connection-timeout"), MILLISECONDS) - val MaxTimeToRetryJoiningCluster = Duration(config.getMilliseconds("akka.cluster.max-time-to-retry-joining-cluster"), MILLISECONDS) - val InitialDelayForGossip = Duration(getMilliseconds("akka.cluster.gossip.initialDelay"), MILLISECONDS) - val GossipFrequency = Duration(getMilliseconds("akka.cluster.gossip.frequency"), MILLISECONDS) - val SeedNodes = Set.empty[Address] ++ getStringList("akka.cluster.seed-nodes").asScala.collect { - case AddressExtractor(addr) ⇒ addr - } - val RemoteSystemDaemonAckTimeout = Duration(getMilliseconds("akka.remote.remote-daemon-ack-timeout"), MILLISECONDS) val UntrustedMode = getBoolean("akka.remote.untrusted-mode") } diff --git a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala index b60b90b900..fbeaff5b6b 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala @@ -28,13 +28,6 @@ class RemoteConfigSpec extends AkkaSpec( RemoteTransport must be("akka.remote.netty.NettyRemoteTransport") UntrustedMode must be(false) RemoteSystemDaemonAckTimeout must be(30 seconds) - - FailureDetectorThreshold must be(8) - FailureDetectorMaxSampleSize must be(1000) - - InitialDelayForGossip must be(5 seconds) - GossipFrequency must be(1 second) - SeedNodes must be(Set()) } "be able to parse Netty config elements" in {