diff --git a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala index 8d1d2fa965..86cde2fb47 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala @@ -214,5 +214,30 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout assert(elapsedTimeMs < 2000) // the precision is not ms exact cancellable.cancel() } + + "adjust for scheduler inaccuracy" taggedAs TimingTest in { + val startTime = System.nanoTime + val n = 33 + val latch = new TestLatch(n) + system.scheduler.schedule(150.millis, 150.millis) { + latch.countDown() + } + Await.ready(latch, 6.seconds) + val rate = n * 1000.0 / (System.nanoTime - startTime).nanos.toMillis + rate must be(6.66 plusOrMinus (0.4)) + } + + "not be affected by long running task" taggedAs TimingTest in { + val startTime = System.nanoTime + val n = 22 + val latch = new TestLatch(n) + system.scheduler.schedule(225.millis, 225.millis) { + Thread.sleep(80) + latch.countDown() + } + Await.ready(latch, 6.seconds) + val rate = n * 1000.0 / (System.nanoTime - startTime).nanos.toMillis + rate must be(4.4 plusOrMinus (0.3)) + } } } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 1b4398feeb..897985f464 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -194,29 +194,22 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto import context.dispatcher // start periodic gossip to random nodes in cluster - val gossipTask = - FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(GossipInterval).asInstanceOf[FiniteDuration], GossipInterval) { - self ! GossipTick - } + val gossipTask = scheduler.schedule(PeriodicTasksInitialDelay.max(GossipInterval).asInstanceOf[FiniteDuration], + GossipInterval, self, GossipTick) // start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list) - val failureDetectorReaperTask = - FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(UnreachableNodesReaperInterval).asInstanceOf[FiniteDuration], UnreachableNodesReaperInterval) { - self ! ReapUnreachableTick - } + val failureDetectorReaperTask = scheduler.schedule(PeriodicTasksInitialDelay.max(UnreachableNodesReaperInterval).asInstanceOf[FiniteDuration], + UnreachableNodesReaperInterval, self, ReapUnreachableTick) // start periodic leader action management (only applies for the current leader) - private val leaderActionsTask = - FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(LeaderActionsInterval).asInstanceOf[FiniteDuration], LeaderActionsInterval) { - self ! LeaderActionsTick - } + val leaderActionsTask = scheduler.schedule(PeriodicTasksInitialDelay.max(LeaderActionsInterval).asInstanceOf[FiniteDuration], + LeaderActionsInterval, self, LeaderActionsTick) // start periodic publish of current stats - private val publishStatsTask: Option[Cancellable] = + val publishStatsTask: Option[Cancellable] = if (PublishStatsInterval == Duration.Zero) None - else Some(FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(PublishStatsInterval).asInstanceOf[FiniteDuration], PublishStatsInterval) { - self ! PublishStatsTick - }) + else Some(scheduler.schedule(PeriodicTasksInitialDelay.max(PublishStatsInterval).asInstanceOf[FiniteDuration], + PublishStatsInterval, self, PublishStatsTick)) override def preStart(): Unit = { if (AutoJoin) self ! JoinSeedNodes(SeedNodes) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala index 232a890910..561f3c85ba 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala @@ -98,10 +98,8 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg var consistentHash = ConsistentHash(Seq.empty[Address], HeartbeatConsistentHashingVirtualNodesFactor) // start periodic heartbeat to other nodes in cluster - val heartbeatTask = - FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(HeartbeatInterval).asInstanceOf[FiniteDuration], HeartbeatInterval) { - self ! HeartbeatTick - } + val heartbeatTask = scheduler.schedule(PeriodicTasksInitialDelay.max(HeartbeatInterval).asInstanceOf[FiniteDuration], + HeartbeatInterval, self, HeartbeatTick) override def preStart(): Unit = cluster.subscribe(self, classOf[MemberEvent]) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala index 87bb15450b..7040c322fd 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala @@ -62,16 +62,14 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto /** * Start periodic gossip to random nodes in cluster */ - val gossipTask = FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(MetricsGossipInterval).asInstanceOf[FiniteDuration], MetricsGossipInterval) { - self ! GossipTick - } + val gossipTask = scheduler.schedule(PeriodicTasksInitialDelay.max(MetricsGossipInterval).asInstanceOf[FiniteDuration], + MetricsGossipInterval, self, GossipTick) /** * Start periodic metrics collection */ - val metricsTask = FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(MetricsInterval).asInstanceOf[FiniteDuration], MetricsInterval) { - self ! MetricsTick - } + val metricsTask = scheduler.schedule(PeriodicTasksInitialDelay.max(MetricsInterval).asInstanceOf[FiniteDuration], + MetricsInterval, self, MetricsTick) override def preStart(): Unit = { cluster.subscribe(self, classOf[MemberEvent]) diff --git a/akka-cluster/src/main/scala/akka/cluster/FixedRateTask.scala b/akka-cluster/src/main/scala/akka/cluster/FixedRateTask.scala deleted file mode 100644 index 9e6eedf659..0000000000 --- a/akka-cluster/src/main/scala/akka/cluster/FixedRateTask.scala +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ - -package akka.cluster - -import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.{ AtomicBoolean, AtomicLong } -import akka.actor.{ Scheduler, Cancellable } -import scala.concurrent.util.Duration -import concurrent.ExecutionContext -import scala.concurrent.util.FiniteDuration - -/** - * INTERNAL API - */ -private[akka] object FixedRateTask { - def apply(scheduler: Scheduler, - initalDelay: FiniteDuration, - delay: FiniteDuration)(f: ⇒ Unit)(implicit executor: ExecutionContext): FixedRateTask = - new FixedRateTask(scheduler, initalDelay, delay, new Runnable { def run(): Unit = f }) -} - -/** - * INTERNAL API - * - * Task to be scheduled periodically at a fixed rate, compensating, on average, - * for inaccuracy in scheduler. It will start when constructed, using the - * initialDelay. - */ -private[akka] class FixedRateTask(scheduler: Scheduler, - initalDelay: FiniteDuration, - delay: FiniteDuration, - task: Runnable)(implicit executor: ExecutionContext) - extends Runnable with Cancellable { - - private val delayNanos = delay.toNanos - private val cancelled = new AtomicBoolean(false) - private val counter = new AtomicLong(0L) - private val startTime = System.nanoTime + initalDelay.toNanos - scheduler.scheduleOnce(initalDelay, this) - - def cancel(): Unit = cancelled.set(true) - - def isCancelled: Boolean = cancelled.get - - override final def run(): Unit = if (!isCancelled) try { - task.run() - } finally if (!isCancelled) { - val nextTime = startTime + delayNanos * counter.incrementAndGet - // it's ok to schedule with negative duration, will run asap - val nextDelay = Duration(nextTime - System.nanoTime, TimeUnit.NANOSECONDS) - try { - scheduler.scheduleOnce(nextDelay, this) - } catch { case e: IllegalStateException ⇒ /* will happen when scheduler is closed, nothing wrong */ } - } - -} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala index 1bde3bfd3d..55b44a3c76 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala @@ -67,7 +67,7 @@ abstract class SingletonClusterSpec(multiNodeConfig: SingletonClusterMultiNodeCo awaitUpConvergence(numberOfMembers = 1, canNotBePartOfMemberRing = Seq(secondAddress), 30.seconds) clusterView.isSingletonCluster must be(true) - assertLeader(first) + awaitCond(clusterView.isLeader) } enterBarrier("after-3") diff --git a/akka-cluster/src/test/scala/akka/cluster/FixedRateTaskSpec.scala b/akka-cluster/src/test/scala/akka/cluster/FixedRateTaskSpec.scala deleted file mode 100644 index e6590cf9c3..0000000000 --- a/akka-cluster/src/test/scala/akka/cluster/FixedRateTaskSpec.scala +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ - -package akka.cluster - -import akka.testkit.AkkaSpec -import scala.concurrent.util.duration._ -import akka.testkit.TimingTest -import akka.testkit.TestLatch -import scala.concurrent.Await - -@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class FixedRateTaskSpec extends AkkaSpec { - import system.dispatcher - "Task scheduled at fixed rate" must { - "adjust for scheduler inaccuracy" taggedAs TimingTest in { - val startTime = System.nanoTime - val n = 33 - val latch = new TestLatch(n) - FixedRateTask(system.scheduler, 150.millis, 150.millis) { - latch.countDown() - } - Await.ready(latch, 6.seconds) - val rate = n * 1000.0 / (System.nanoTime - startTime).nanos.toMillis - rate must be(6.66 plusOrMinus (0.4)) - } - - "compensate for long running task" taggedAs TimingTest in { - val startTime = System.nanoTime - val n = 22 - val latch = new TestLatch(n) - FixedRateTask(system.scheduler, 225.millis, 225.millis) { - Thread.sleep(80) - latch.countDown() - } - Await.ready(latch, 6.seconds) - val rate = n * 1000.0 / (System.nanoTime - startTime).nanos.toMillis - rate must be(4.4 plusOrMinus (0.3)) - } - } -} - diff --git a/akka-cluster/src/test/scala/akka/cluster/MetricsCollectorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/MetricsCollectorSpec.scala index 2288279a03..ea881f5a71 100644 --- a/akka-cluster/src/test/scala/akka/cluster/MetricsCollectorSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/MetricsCollectorSpec.scala @@ -119,7 +119,7 @@ class MetricsCollectorSpec extends AkkaSpec(MetricsEnabledSpec.config) with Impl "collect [" + samples + "] node metrics samples in an acceptable duration" taggedAs LongRunningTest in { val latch = TestLatch(samples) - val task = FixedRateTask(system.scheduler, 0 seconds, interval) { + val task = system.scheduler.schedule(0 seconds, interval) { val sample = collector.sample assertCreatedUninitialized(sample.metrics) assertExpectedSampleSize(collector.isSigar, window, sample) diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index adf1b3c9d6..ff1085c32e 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -51,7 +51,7 @@ object AkkaBuild extends Build { |import scala.concurrent.util.duration._ |import akka.util.Timeout |val config = ConfigFactory.parseString("akka.stdout-loglevel=INFO,akka.loglevel=DEBUG") - |val remoteConfig = ConfigFactory.parseString("akka.remote.netty{port=0,use-dispatcher-for-io=akka.actor.default-dispatcher,execution-pool-size=0},akka.actor.provider=RemoteActorRefProvider").withFallback(config) + |val remoteConfig = ConfigFactory.parseString("akka.remote.netty{port=0,use-dispatcher-for-io=akka.actor.default-dispatcher,execution-pool-size=0},akka.actor.provider=akka.remote.RemoteActorRefProvider").withFallback(config) |var system: ActorSystem = null |implicit def _system = system |def startSystem(remoting: Boolean = false) { system = ActorSystem("repl", if(remoting) remoteConfig else config); println("don’t forget to system.shutdown()!") } @@ -436,11 +436,8 @@ object AkkaBuild extends Build { val excludeTestNames = SettingKey[Seq[String]]("exclude-test-names") val excludeTestTags = SettingKey[Set[String]]("exclude-test-tags") - val includeTestTags = SettingKey[Set[String]]("include-test-tags") val onlyTestTags = SettingKey[Set[String]]("only-test-tags") - val defaultExcludedTags = Set("timing", "long-running") - lazy val defaultMultiJvmOptions: Seq[String] = { import scala.collection.JavaConverters._ val akkaProperties = System.getProperties.propertyNames.asScala.toList.collect { @@ -457,14 +454,7 @@ object AkkaBuild extends Build { // for excluding tests by tag use system property: -Dakka.test.tags.exclude= // note that it will not be used if you specify -Dakka.test.tags.only lazy val useExcludeTestTags: Set[String] = { - if (useOnlyTestTags.isEmpty) defaultExcludedTags ++ systemPropertyAsSeq("akka.test.tags.exclude").toSet - else Set.empty - } - - // for including tests by tag use system property: -Dakka.test.tags.include= - // note that it will not be used if you specify -Dakka.test.tags.only - lazy val useIncludeTestTags: Set[String] = { - if (useOnlyTestTags.isEmpty) systemPropertyAsSeq("akka.test.tags.include").toSet + if (useOnlyTestTags.isEmpty) systemPropertyAsSeq("akka.test.tags.exclude").toSet else Set.empty } @@ -472,8 +462,7 @@ object AkkaBuild extends Build { lazy val useOnlyTestTags: Set[String] = systemPropertyAsSeq("akka.test.tags.only").toSet def executeMultiJvmTests: Boolean = { - useOnlyTestTags.contains("long-running") || - !(useExcludeTestTags -- useIncludeTestTags).contains("long-running") + useOnlyTestTags.contains("long-running") || !useExcludeTestTags.contains("long-running") } def systemPropertyAsSeq(name: String): Seq[String] = { @@ -484,7 +473,7 @@ object AkkaBuild extends Build { val multiNodeEnabled = java.lang.Boolean.getBoolean("akka.test.multi-node") lazy val defaultMultiJvmScalatestOptions: Seq[String] = { - val excludeTags = (useExcludeTestTags -- useIncludeTestTags).toSeq + val excludeTags = useExcludeTestTags.toSeq Seq("-C", "org.scalatest.akka.QuietReporter") ++ (if (excludeTags.isEmpty) Seq.empty else Seq("-l", if (multiNodeEnabled) excludeTags.mkString("\"", " ", "\"") else excludeTags.mkString(" "))) ++ (if (useOnlyTestTags.isEmpty) Seq.empty else Seq("-n", if (multiNodeEnabled) useOnlyTestTags.mkString("\"", " ", "\"") else useOnlyTestTags.mkString(" "))) @@ -515,15 +504,13 @@ object AkkaBuild extends Build { excludeTestNames := useExcludeTestNames, excludeTestTags := useExcludeTestTags, - includeTestTags := useIncludeTestTags, onlyTestTags := useOnlyTestTags, // add filters for tests excluded by name testOptions in Test <++= excludeTestNames map { _.map(exclude => Tests.Filter(test => !test.contains(exclude))) }, - // add arguments for tests excluded by tag - includes override excludes (opposite to scalatest) - testOptions in Test <++= (excludeTestTags, includeTestTags) map { (excludes, includes) => - val tags = (excludes -- includes) + // add arguments for tests excluded by tag + testOptions in Test <++= excludeTestTags map { tags => if (tags.isEmpty) Seq.empty else Seq(Tests.Argument("-l", tags.mkString(" "))) },