Merge branch 'master' into wip-2284-heartbeat-scalability-patriknw
Conflicts: akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
commit 668d5a5013
9 changed files with 48 additions and 148 deletions
@@ -214,5 +214,30 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout
       assert(elapsedTimeMs < 2000) // the precision is not ms exact
       cancellable.cancel()
     }
+
+    "adjust for scheduler inaccuracy" taggedAs TimingTest in {
+      val startTime = System.nanoTime
+      val n = 33
+      val latch = new TestLatch(n)
+      system.scheduler.schedule(150.millis, 150.millis) {
+        latch.countDown()
+      }
+      Await.ready(latch, 6.seconds)
+      val rate = n * 1000.0 / (System.nanoTime - startTime).nanos.toMillis
+      rate must be(6.66 plusOrMinus (0.4))
+    }
+
+    "not be affected by long running task" taggedAs TimingTest in {
+      val startTime = System.nanoTime
+      val n = 22
+      val latch = new TestLatch(n)
+      system.scheduler.schedule(225.millis, 225.millis) {
+        Thread.sleep(80)
+        latch.countDown()
+      }
+      Await.ready(latch, 6.seconds)
+      val rate = n * 1000.0 / (System.nanoTime - startTime).nanos.toMillis
+      rate must be(4.4 plusOrMinus (0.3))
+    }
   }
 }
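
Note on the added tests: the expected rates are plain arithmetic. A 150 ms fixed-rate schedule should tick about 1000 / 150 ≈ 6.66 times per second, and a 225 ms one about 1000 / 225 ≈ 4.44, even when each run sleeps for 80 ms, because a fixed-rate scheduler measures the next run against the original start time rather than the end of the previous run. A trivial sanity check of those numbers (plain Scala; the object name is illustrative only):

    // expected ticks per second for the two new tests
    object ExpectedRates extends App {
      val expected150 = 1000.0 / 150 // ≈ 6.67, asserted as 6.66 plusOrMinus 0.4
      val expected225 = 1000.0 / 225 // ≈ 4.44, asserted as 4.4 plusOrMinus 0.3
      println(f"$expected150%.2f $expected225%.2f")
    }
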
@@ -194,29 +194,22 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
   import context.dispatcher
 
   // start periodic gossip to random nodes in cluster
-  val gossipTask =
-    FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(GossipInterval).asInstanceOf[FiniteDuration], GossipInterval) {
-      self ! GossipTick
-    }
+  val gossipTask = scheduler.schedule(PeriodicTasksInitialDelay.max(GossipInterval).asInstanceOf[FiniteDuration],
+    GossipInterval, self, GossipTick)
 
   // start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list)
-  val failureDetectorReaperTask =
-    FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(UnreachableNodesReaperInterval).asInstanceOf[FiniteDuration], UnreachableNodesReaperInterval) {
-      self ! ReapUnreachableTick
-    }
+  val failureDetectorReaperTask = scheduler.schedule(PeriodicTasksInitialDelay.max(UnreachableNodesReaperInterval).asInstanceOf[FiniteDuration],
+    UnreachableNodesReaperInterval, self, ReapUnreachableTick)
 
   // start periodic leader action management (only applies for the current leader)
-  private val leaderActionsTask =
-    FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(LeaderActionsInterval).asInstanceOf[FiniteDuration], LeaderActionsInterval) {
-      self ! LeaderActionsTick
-    }
+  val leaderActionsTask = scheduler.schedule(PeriodicTasksInitialDelay.max(LeaderActionsInterval).asInstanceOf[FiniteDuration],
+    LeaderActionsInterval, self, LeaderActionsTick)
 
   // start periodic publish of current stats
-  private val publishStatsTask: Option[Cancellable] =
+  val publishStatsTask: Option[Cancellable] =
     if (PublishStatsInterval == Duration.Zero) None
-    else Some(FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(PublishStatsInterval).asInstanceOf[FiniteDuration], PublishStatsInterval) {
-      self ! PublishStatsTick
-    })
+    else Some(scheduler.schedule(PeriodicTasksInitialDelay.max(PublishStatsInterval).asInstanceOf[FiniteDuration],
+      PublishStatsInterval, self, PublishStatsTick))
 
   override def preStart(): Unit = {
     if (AutoJoin) self ! JoinSeedNodes(SeedNodes)
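
Note: the pattern is the same in each of the cluster actors touched by this commit. The hand-rolled FixedRateTask (deleted further down) is replaced by a plain scheduler.schedule call, which, with the scheduler behaviour covered by the new SchedulerSpec tests above, already runs at a fixed, drift-compensated rate and returns a Cancellable. A minimal sketch of the resulting tick pattern, assuming the classic Akka scheduler API and the current scala.concurrent.duration package (Tick, PeriodicActor and the one-second interval are illustrative, not the actual cluster code):

    import akka.actor.{ Actor, ActorLogging, Cancellable }
    import scala.concurrent.duration._

    case object Tick

    class PeriodicActor extends Actor with ActorLogging {
      import context.dispatcher // ExecutionContext required by the scheduler

      // send Tick to self at a fixed rate; schedule returns a Cancellable
      val tickTask: Cancellable =
        context.system.scheduler.schedule(1.second, 1.second, self, Tick)

      // stop the ticks together with the actor
      override def postStop(): Unit = tickTask.cancel()

      def receive = {
        case Tick => log.debug("periodic tick")
      }
    }
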
@@ -98,10 +98,8 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
   var consistentHash = ConsistentHash(Seq.empty[Address], HeartbeatConsistentHashingVirtualNodesFactor)
 
   // start periodic heartbeat to other nodes in cluster
-  val heartbeatTask =
-    FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(HeartbeatInterval).asInstanceOf[FiniteDuration], HeartbeatInterval) {
-      self ! HeartbeatTick
-    }
+  val heartbeatTask = scheduler.schedule(PeriodicTasksInitialDelay.max(HeartbeatInterval).asInstanceOf[FiniteDuration],
+    HeartbeatInterval, self, HeartbeatTick)
 
   override def preStart(): Unit = cluster.subscribe(self, classOf[MemberEvent])
 
@@ -62,16 +62,14 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
   /**
    * Start periodic gossip to random nodes in cluster
    */
-  val gossipTask = FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(MetricsGossipInterval).asInstanceOf[FiniteDuration], MetricsGossipInterval) {
-    self ! GossipTick
-  }
+  val gossipTask = scheduler.schedule(PeriodicTasksInitialDelay.max(MetricsGossipInterval).asInstanceOf[FiniteDuration],
+    MetricsGossipInterval, self, GossipTick)
 
   /**
    * Start periodic metrics collection
    */
-  val metricsTask = FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(MetricsInterval).asInstanceOf[FiniteDuration], MetricsInterval) {
-    self ! MetricsTick
-  }
+  val metricsTask = scheduler.schedule(PeriodicTasksInitialDelay.max(MetricsInterval).asInstanceOf[FiniteDuration],
+    MetricsInterval, self, MetricsTick)
 
   override def preStart(): Unit = {
     cluster.subscribe(self, classOf[MemberEvent])
@@ -1,58 +0,0 @@
-/**
- * Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
- */
-
-package akka.cluster
-
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.{ AtomicBoolean, AtomicLong }
-import akka.actor.{ Scheduler, Cancellable }
-import scala.concurrent.util.Duration
-import concurrent.ExecutionContext
-import scala.concurrent.util.FiniteDuration
-
-/**
- * INTERNAL API
- */
-private[akka] object FixedRateTask {
-  def apply(scheduler: Scheduler,
-            initalDelay: FiniteDuration,
-            delay: FiniteDuration)(f: ⇒ Unit)(implicit executor: ExecutionContext): FixedRateTask =
-    new FixedRateTask(scheduler, initalDelay, delay, new Runnable { def run(): Unit = f })
-}
-
-/**
- * INTERNAL API
- *
- * Task to be scheduled periodically at a fixed rate, compensating, on average,
- * for inaccuracy in scheduler. It will start when constructed, using the
- * initialDelay.
- */
-private[akka] class FixedRateTask(scheduler: Scheduler,
-                                  initalDelay: FiniteDuration,
-                                  delay: FiniteDuration,
-                                  task: Runnable)(implicit executor: ExecutionContext)
-  extends Runnable with Cancellable {
-
-  private val delayNanos = delay.toNanos
-  private val cancelled = new AtomicBoolean(false)
-  private val counter = new AtomicLong(0L)
-  private val startTime = System.nanoTime + initalDelay.toNanos
-  scheduler.scheduleOnce(initalDelay, this)
-
-  def cancel(): Unit = cancelled.set(true)
-
-  def isCancelled: Boolean = cancelled.get
-
-  override final def run(): Unit = if (!isCancelled) try {
-    task.run()
-  } finally if (!isCancelled) {
-    val nextTime = startTime + delayNanos * counter.incrementAndGet
-    // it's ok to schedule with negative duration, will run asap
-    val nextDelay = Duration(nextTime - System.nanoTime, TimeUnit.NANOSECONDS)
-    try {
-      scheduler.scheduleOnce(nextDelay, this)
-    } catch { case e: IllegalStateException ⇒ /* will happen when scheduler is closed, nothing wrong */ }
-  }
-
-}
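
Note: the deleted helper compensated for scheduler drift by aiming run number n at startTime + delay * n, measured from the original start rather than from the end of the previous run, and rescheduling itself one shot at a time. The same idea as a standalone sketch on top of the JDK scheduler (illustrative names; this is not the removed class and not the Akka scheduler internals):

    import java.util.concurrent.{ Executors, TimeUnit }
    import java.util.concurrent.atomic.AtomicLong

    object DriftCompensatingTick extends App {
      val scheduler   = Executors.newSingleThreadScheduledExecutor()
      val periodNanos = TimeUnit.MILLISECONDS.toNanos(150)
      val counter     = new AtomicLong(0L)
      val startTime   = System.nanoTime + periodNanos

      val tick: Runnable = new Runnable {
        def run(): Unit = {
          println(s"tick ${counter.get}")
          // aim the next run at startTime + n * period so late runs do not accumulate drift
          val nextTime  = startTime + periodNanos * counter.incrementAndGet()
          val nextDelay = nextTime - System.nanoTime // may be negative: runs as soon as possible
          scheduler.schedule(this, nextDelay, TimeUnit.NANOSECONDS)
        }
      }

      scheduler.schedule(tick, periodNanos, TimeUnit.NANOSECONDS)
      Thread.sleep(2000) // let it tick for a while, then stop
      scheduler.shutdownNow()
    }
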
@@ -67,7 +67,7 @@ abstract class SingletonClusterSpec(multiNodeConfig: SingletonClusterMultiNodeCo
 
       awaitUpConvergence(numberOfMembers = 1, canNotBePartOfMemberRing = Seq(secondAddress), 30.seconds)
       clusterView.isSingletonCluster must be(true)
-      assertLeader(first)
+      awaitCond(clusterView.isLeader)
     }
 
     enterBarrier("after-3")
@@ -1,43 +0,0 @@
-/**
- * Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
- */
-
-package akka.cluster
-
-import akka.testkit.AkkaSpec
-import scala.concurrent.util.duration._
-import akka.testkit.TimingTest
-import akka.testkit.TestLatch
-import scala.concurrent.Await
-
-@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
-class FixedRateTaskSpec extends AkkaSpec {
-  import system.dispatcher
-  "Task scheduled at fixed rate" must {
-    "adjust for scheduler inaccuracy" taggedAs TimingTest in {
-      val startTime = System.nanoTime
-      val n = 33
-      val latch = new TestLatch(n)
-      FixedRateTask(system.scheduler, 150.millis, 150.millis) {
-        latch.countDown()
-      }
-      Await.ready(latch, 6.seconds)
-      val rate = n * 1000.0 / (System.nanoTime - startTime).nanos.toMillis
-      rate must be(6.66 plusOrMinus (0.4))
-    }
-
-    "compensate for long running task" taggedAs TimingTest in {
-      val startTime = System.nanoTime
-      val n = 22
-      val latch = new TestLatch(n)
-      FixedRateTask(system.scheduler, 225.millis, 225.millis) {
-        Thread.sleep(80)
-        latch.countDown()
-      }
-      Await.ready(latch, 6.seconds)
-      val rate = n * 1000.0 / (System.nanoTime - startTime).nanos.toMillis
-      rate must be(4.4 plusOrMinus (0.3))
-    }
-  }
-}
-
@@ -119,7 +119,7 @@ class MetricsCollectorSpec extends AkkaSpec(MetricsEnabledSpec.config) with Impl
 
     "collect [" + samples + "] node metrics samples in an acceptable duration" taggedAs LongRunningTest in {
       val latch = TestLatch(samples)
-      val task = FixedRateTask(system.scheduler, 0 seconds, interval) {
+      val task = system.scheduler.schedule(0 seconds, interval) {
         val sample = collector.sample
         assertCreatedUninitialized(sample.metrics)
         assertExpectedSampleSize(collector.isSigar, window, sample)
@@ -51,7 +51,7 @@ object AkkaBuild extends Build {
       |import scala.concurrent.util.duration._
       |import akka.util.Timeout
       |val config = ConfigFactory.parseString("akka.stdout-loglevel=INFO,akka.loglevel=DEBUG")
-      |val remoteConfig = ConfigFactory.parseString("akka.remote.netty{port=0,use-dispatcher-for-io=akka.actor.default-dispatcher,execution-pool-size=0},akka.actor.provider=RemoteActorRefProvider").withFallback(config)
+      |val remoteConfig = ConfigFactory.parseString("akka.remote.netty{port=0,use-dispatcher-for-io=akka.actor.default-dispatcher,execution-pool-size=0},akka.actor.provider=akka.remote.RemoteActorRefProvider").withFallback(config)
      |var system: ActorSystem = null
      |implicit def _system = system
      |def startSystem(remoting: Boolean = false) { system = ActorSystem("repl", if(remoting) remoteConfig else config); println("don’t forget to system.shutdown()!") }
@@ -436,11 +436,8 @@ object AkkaBuild extends Build {
 
   val excludeTestNames = SettingKey[Seq[String]]("exclude-test-names")
   val excludeTestTags = SettingKey[Set[String]]("exclude-test-tags")
-  val includeTestTags = SettingKey[Set[String]]("include-test-tags")
   val onlyTestTags = SettingKey[Set[String]]("only-test-tags")
 
-  val defaultExcludedTags = Set("timing", "long-running")
-
   lazy val defaultMultiJvmOptions: Seq[String] = {
     import scala.collection.JavaConverters._
     val akkaProperties = System.getProperties.propertyNames.asScala.toList.collect {
@@ -457,14 +454,7 @@
   // for excluding tests by tag use system property: -Dakka.test.tags.exclude=<tag name>
   // note that it will not be used if you specify -Dakka.test.tags.only
   lazy val useExcludeTestTags: Set[String] = {
-    if (useOnlyTestTags.isEmpty) defaultExcludedTags ++ systemPropertyAsSeq("akka.test.tags.exclude").toSet
-    else Set.empty
-  }
-
-  // for including tests by tag use system property: -Dakka.test.tags.include=<tag name>
-  // note that it will not be used if you specify -Dakka.test.tags.only
-  lazy val useIncludeTestTags: Set[String] = {
-    if (useOnlyTestTags.isEmpty) systemPropertyAsSeq("akka.test.tags.include").toSet
+    if (useOnlyTestTags.isEmpty) systemPropertyAsSeq("akka.test.tags.exclude").toSet
     else Set.empty
   }
 
@@ -472,8 +462,7 @@ object AkkaBuild extends Build {
   lazy val useOnlyTestTags: Set[String] = systemPropertyAsSeq("akka.test.tags.only").toSet
 
   def executeMultiJvmTests: Boolean = {
-    useOnlyTestTags.contains("long-running") ||
-      !(useExcludeTestTags -- useIncludeTestTags).contains("long-running")
+    useOnlyTestTags.contains("long-running") || !useExcludeTestTags.contains("long-running")
   }
 
   def systemPropertyAsSeq(name: String): Seq[String] = {
@@ -484,7 +473,7 @@ object AkkaBuild extends Build {
   val multiNodeEnabled = java.lang.Boolean.getBoolean("akka.test.multi-node")
 
   lazy val defaultMultiJvmScalatestOptions: Seq[String] = {
-    val excludeTags = (useExcludeTestTags -- useIncludeTestTags).toSeq
+    val excludeTags = useExcludeTestTags.toSeq
     Seq("-C", "org.scalatest.akka.QuietReporter") ++
       (if (excludeTags.isEmpty) Seq.empty else Seq("-l", if (multiNodeEnabled) excludeTags.mkString("\"", " ", "\"") else excludeTags.mkString(" "))) ++
       (if (useOnlyTestTags.isEmpty) Seq.empty else Seq("-n", if (multiNodeEnabled) useOnlyTestTags.mkString("\"", " ", "\"") else useOnlyTestTags.mkString(" ")))
@@ -515,15 +504,13 @@ object AkkaBuild extends Build {
 
     excludeTestNames := useExcludeTestNames,
     excludeTestTags := useExcludeTestTags,
-    includeTestTags := useIncludeTestTags,
     onlyTestTags := useOnlyTestTags,
 
     // add filters for tests excluded by name
     testOptions in Test <++= excludeTestNames map { _.map(exclude => Tests.Filter(test => !test.contains(exclude))) },
 
-    // add arguments for tests excluded by tag - includes override excludes (opposite to scalatest)
-    testOptions in Test <++= (excludeTestTags, includeTestTags) map { (excludes, includes) =>
-      val tags = (excludes -- includes)
+    // add arguments for tests excluded by tag
+    testOptions in Test <++= excludeTestTags map { tags =>
       if (tags.isEmpty) Seq.empty else Seq(Tests.Argument("-l", tags.mkString(" ")))
     },
 
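
Note: after this change the build selects tests with only the exclude and "only" tag properties described in the comments above (-Dakka.test.tags.exclude=<tag name>, -Dakka.test.tags.only=<tag name>), plus name-based exclusion; the separate include-tags mechanism is gone. Below is a minimal sketch of the remaining selection logic, runnable outside sbt; the body of systemPropertyAsSeq is not shown in this diff, so the comma-splitting in the sketch is an assumption, and the object name is illustrative:

    object TestTagSelection extends App {
      // assumed helper: split a comma-separated system property into non-empty entries
      def systemPropertyAsSeq(name: String): Seq[String] =
        sys.props.get(name).toSeq.flatMap(_.split(",")).filter(_.nonEmpty)

      val useOnlyTestTags: Set[String] = systemPropertyAsSeq("akka.test.tags.only").toSet

      // excludes are only honoured when no "only" tags are given
      val useExcludeTestTags: Set[String] =
        if (useOnlyTestTags.isEmpty) systemPropertyAsSeq("akka.test.tags.exclude").toSet
        else Set.empty

      // ScalaTest receives excluded tags via "-l" and restricted ("only") tags via "-n"
      val scalatestArgs: Seq[String] =
        (if (useExcludeTestTags.isEmpty) Seq.empty else Seq("-l", useExcludeTestTags.mkString(" "))) ++
          (if (useOnlyTestTags.isEmpty) Seq.empty else Seq("-n", useOnlyTestTags.mkString(" ")))

      println(scalatestArgs.mkString(" "))
    }

For example, running the sketch with -Dakka.test.tags.only=long-running prints "-n long-running".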