From ac03696d88bf76eae5c760582b29f37f180493de Mon Sep 17 00:00:00 2001 From: Henrik Engstrom Date: Tue, 22 Nov 2011 15:26:21 +0100 Subject: [PATCH 1/8] Added test of HWT and parameterized HWT constructor arguments (used in ActorSystem), see #1291 --- .../test/scala/akka/actor/SchedulerSpec.scala | 23 +++++++++++++++++++ .../main/resources/akka-actor-reference.conf | 10 ++++++++ .../main/scala/akka/actor/ActorSystem.scala | 6 +++-- 3 files changed, 37 insertions(+), 2 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala index b7c0ba6c8a..e1023a20e1 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala @@ -124,5 +124,28 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { // should be enough time for the ping countdown to recover and reach 6 pings assert(pingLatch.await(4, TimeUnit.SECONDS)) } + + "never fire prematurely" in { + val ticks = new CountDownLatch(300) + + case class Msg(ts: Long) + + val actor = actorOf(new Actor { + def receive = { + case Msg(ts) ⇒ + val now = System.currentTimeMillis + // Make sure that no message has been dispatched before the scheduled time (10ms) has occurred + if (now - ts < 10) throw new RuntimeException("Interval is too small: " + (now - ts)) + ticks.countDown() + } + }) + + (1 to 300).foreach { i ⇒ + collectCancellable(system.scheduler.scheduleOnce(actor, Msg(System.currentTimeMillis()), 10, TimeUnit.MILLISECONDS)) + Thread.sleep(5) + } + + assert(ticks.await(2, TimeUnit.SECONDS) == true) + } } } diff --git a/akka-actor/src/main/resources/akka-actor-reference.conf b/akka-actor/src/main/resources/akka-actor-reference.conf index f6640073eb..6503d73195 100644 --- a/akka-actor/src/main/resources/akka-actor-reference.conf +++ b/akka-actor/src/main/resources/akka-actor-reference.conf @@ -214,6 +214,16 @@ akka { # } } + # Used to set the behavior of the scheduler. + # Changing the default values may change the system behavior drastically so make sure you know what you're doing! + # + scheduler { + # The HashedWheelTimer implementation from Jetty is used as the default scheduler in the system. + # See http://www.jboss.org/netty/ + tickDuration = 100 # In milliseconds + ticksPerWheel = 512 + } + remote { # FIXME rename to transport layer = "akka.cluster.netty.NettyRemoteSupport" diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 0073176d32..db1998d738 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -93,6 +93,9 @@ object ActorSystem { val EnabledModules: Seq[String] = getStringList("akka.enabled-modules").asScala + val SchedulerTickDuration = getInt("akka.scheduler.tickDuration") + val SchedulerTicksPerWheel = getInt("akka.scheduler.ticksPerWheel") + // FIXME move to cluster extension val ClusterEnabled = EnabledModules exists (_ == "cluster") val ClusterName = getString("akka.cluster.name") @@ -334,8 +337,7 @@ class ActorSystemImpl(val name: String, _config: Config) extends ActorSystem { override def numberOfMessages = 0 } - // FIXME make this configurable - val scheduler = new DefaultScheduler(new HashedWheelTimer(log, Executors.defaultThreadFactory, 100, MILLISECONDS, 512)) + val scheduler = new DefaultScheduler(new HashedWheelTimer(log, Executors.defaultThreadFactory, settings.SchedulerTickDuration, MILLISECONDS, settings.SchedulerTicksPerWheel)) // TODO correctly pull its config from the config val dispatcherFactory = new Dispatchers(settings, DefaultDispatcherPrerequisites(eventStream, deadLetterMailbox, scheduler)) From 7ca5a4161bb279d94dc1de1750cdfb966ca39151 Mon Sep 17 00:00:00 2001 From: Henrik Engstrom Date: Wed, 23 Nov 2011 11:07:16 +0100 Subject: [PATCH 2/8] Introduced Duration instead of explicit value + time unit in HWT, Scheduler and users of the schedule functionality. See #1291 --- .../test/scala/akka/actor/SchedulerSpec.scala | 25 ++++++------ .../netty/akka/util/HashedWheelTimer.java | 30 +++++++-------- .../java/org/jboss/netty/akka/util/Timer.java | 3 +- .../main/resources/akka-actor-reference.conf | 2 +- .../src/main/scala/akka/actor/ActorCell.scala | 4 +- .../scala/akka/actor/ActorRefProvider.scala | 30 +++++++-------- .../main/scala/akka/actor/ActorSystem.scala | 7 ++-- .../src/main/scala/akka/actor/FSM.scala | 7 ++-- .../src/main/scala/akka/actor/Scheduler.scala | 38 +++---------------- .../akka/dispatch/AbstractDispatcher.scala | 4 +- .../src/main/scala/akka/dispatch/Future.scala | 8 ++-- .../src/main/scala/akka/remote/Gossiper.scala | 5 ++- .../main/scala/DiningHakkersOnBecome.scala | 7 ++-- 13 files changed, 74 insertions(+), 96 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala index e1023a20e1..f1d2ff5655 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala @@ -5,6 +5,7 @@ import org.multiverse.api.latches.StandardLatch import java.util.concurrent.{ ConcurrentLinkedQueue, CountDownLatch, TimeUnit } import akka.testkit.AkkaSpec import akka.testkit.EventFilter +import akka.util.Duration @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { @@ -28,14 +29,14 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { def receive = { case Tick ⇒ countDownLatch.countDown() } }) // run every 50 millisec - collectCancellable(system.scheduler.schedule(tickActor, Tick, 0, 50, TimeUnit.MILLISECONDS)) + collectCancellable(system.scheduler.schedule(tickActor, Tick, Duration(0, TimeUnit.MILLISECONDS), Duration(50, TimeUnit.MILLISECONDS))) // after max 1 second it should be executed at least the 3 times already assert(countDownLatch.await(1, TimeUnit.SECONDS)) val countDownLatch2 = new CountDownLatch(3) - collectCancellable(system.scheduler.schedule(() ⇒ countDownLatch2.countDown(), 0, 50, TimeUnit.MILLISECONDS)) + collectCancellable(system.scheduler.schedule(() ⇒ countDownLatch2.countDown(), Duration(0, TimeUnit.MILLISECONDS), Duration(50, TimeUnit.MILLISECONDS))) // after max 1 second it should be executed at least the 3 times already assert(countDownLatch2.await(2, TimeUnit.SECONDS)) @@ -49,8 +50,8 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { }) // run every 50 millisec - collectCancellable(system.scheduler.scheduleOnce(tickActor, Tick, 50, TimeUnit.MILLISECONDS)) - collectCancellable(system.scheduler.scheduleOnce(() ⇒ countDownLatch.countDown(), 50, TimeUnit.MILLISECONDS)) + collectCancellable(system.scheduler.scheduleOnce(tickActor, Tick, Duration(50, TimeUnit.MILLISECONDS))) + collectCancellable(system.scheduler.scheduleOnce(() ⇒ countDownLatch.countDown(), Duration(50, TimeUnit.MILLISECONDS))) // after 1 second the wait should fail assert(countDownLatch.await(2, TimeUnit.SECONDS) == false) @@ -86,7 +87,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { }) (1 to 10).foreach { i ⇒ - val timeout = collectCancellable(system.scheduler.scheduleOnce(actor, Ping, 1, TimeUnit.SECONDS)) + val timeout = collectCancellable(system.scheduler.scheduleOnce(actor, Ping, Duration(1, TimeUnit.SECONDS))) timeout.cancel() } @@ -114,10 +115,10 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { }) val actor = (supervisor ? props).as[ActorRef].get - collectCancellable(system.scheduler.schedule(actor, Ping, 500, 500, TimeUnit.MILLISECONDS)) + collectCancellable(system.scheduler.schedule(actor, Ping, Duration(500, TimeUnit.MILLISECONDS), Duration(500, TimeUnit.MILLISECONDS))) // appx 2 pings before crash EventFilter[Exception]("CRASH", occurrences = 1) intercept { - collectCancellable(system.scheduler.scheduleOnce(actor, Crash, 1000, TimeUnit.MILLISECONDS)) + collectCancellable(system.scheduler.scheduleOnce(actor, Crash, Duration(1000, TimeUnit.MILLISECONDS))) } assert(restartLatch.tryAwait(2, TimeUnit.SECONDS)) @@ -133,19 +134,19 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { val actor = actorOf(new Actor { def receive = { case Msg(ts) ⇒ - val now = System.currentTimeMillis - // Make sure that no message has been dispatched before the scheduled time (10ms) has occurred - if (now - ts < 10) throw new RuntimeException("Interval is too small: " + (now - ts)) + val now = System.nanoTime() + // Make sure that no message has been dispatched before the scheduled time (10ms = 10000000ns) has occurred + if (now - ts < 10000000) throw new RuntimeException("Interval is too small: " + (now - ts)) ticks.countDown() } }) (1 to 300).foreach { i ⇒ - collectCancellable(system.scheduler.scheduleOnce(actor, Msg(System.currentTimeMillis()), 10, TimeUnit.MILLISECONDS)) + collectCancellable(system.scheduler.scheduleOnce(actor, Msg(System.nanoTime()), Duration(10, TimeUnit.MILLISECONDS))) Thread.sleep(5) } - assert(ticks.await(2, TimeUnit.SECONDS) == true) + assert(ticks.await(3, TimeUnit.SECONDS) == true) } } } diff --git a/akka-actor/src/main/java/org/jboss/netty/akka/util/HashedWheelTimer.java b/akka-actor/src/main/java/org/jboss/netty/akka/util/HashedWheelTimer.java index 328d2dc39f..5dc9a064f6 100644 --- a/akka-actor/src/main/java/org/jboss/netty/akka/util/HashedWheelTimer.java +++ b/akka-actor/src/main/java/org/jboss/netty/akka/util/HashedWheelTimer.java @@ -16,6 +16,7 @@ package org.jboss.netty.akka.util; import akka.event.LoggingAdapter; +import akka.util.Duration; import org.jboss.netty.akka.util.internal.ConcurrentIdentityHashMap; import org.jboss.netty.akka.util.internal.ReusableIterator; @@ -96,24 +97,24 @@ public class HashedWheelTimer implements Timer { * @param threadFactory a {@link java.util.concurrent.ThreadFactory} that creates a * background {@link Thread} which is dedicated to * {@link TimerTask} execution. - * @param tickDuration the duration between tick - * @param unit the time unit of the {@code tickDuration} + * @param duration the duration between ticks * @param ticksPerWheel the size of the wheel */ public HashedWheelTimer( LoggingAdapter logger, ThreadFactory threadFactory, - long tickDuration, TimeUnit unit, int ticksPerWheel) { + Duration duration, + int ticksPerWheel) { if (threadFactory == null) { throw new NullPointerException("threadFactory"); } - if (unit == null) { - throw new NullPointerException("unit"); + if (duration == null) { + throw new NullPointerException("duration"); } - if (tickDuration <= 0) { + if (duration.toNanos() <= 0) { throw new IllegalArgumentException( - "tickDuration must be greater than 0: " + tickDuration); + "duration must be greater than 0 ns: " + duration.toNanos()); } if (ticksPerWheel <= 0) { throw new IllegalArgumentException( @@ -128,14 +129,14 @@ public class HashedWheelTimer implements Timer { mask = wheel.length - 1; // Convert tickDuration to milliseconds. - this.tickDuration = tickDuration = unit.toMillis(tickDuration); + this.tickDuration = duration.toMillis(); // Prevent overflow. if (tickDuration == Long.MAX_VALUE || tickDuration >= Long.MAX_VALUE / wheel.length) { throw new IllegalArgumentException( "tickDuration is too long: " + - tickDuration + ' ' + unit); + tickDuration + ' ' + duration.unit()); } roundDuration = tickDuration * wheel.length; @@ -231,23 +232,22 @@ public class HashedWheelTimer implements Timer { return Collections.unmodifiableSet(unprocessedTimeouts); } - public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { + public Timeout newTimeout(TimerTask task, Duration delay) { final long currentTime = System.currentTimeMillis(); if (task == null) { throw new NullPointerException("task"); } - if (unit == null) { - throw new NullPointerException("unit"); + if (delay == null) { + throw new NullPointerException("delay"); } if (!workerThread.isAlive()) { start(); } - delay = unit.toMillis(delay); - HashedWheelTimeout timeout = new HashedWheelTimeout(task, currentTime + delay); - scheduleTimeout(timeout, delay); + HashedWheelTimeout timeout = new HashedWheelTimeout(task, currentTime + delay.toMillis()); + scheduleTimeout(timeout, delay.toMillis()); return timeout; } diff --git a/akka-actor/src/main/java/org/jboss/netty/akka/util/Timer.java b/akka-actor/src/main/java/org/jboss/netty/akka/util/Timer.java index 43ddec9604..b5bd8c6a7c 100644 --- a/akka-actor/src/main/java/org/jboss/netty/akka/util/Timer.java +++ b/akka-actor/src/main/java/org/jboss/netty/akka/util/Timer.java @@ -15,6 +15,7 @@ */ package org.jboss.netty.akka.util; +import akka.util.Duration; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -41,7 +42,7 @@ public interface Timer { * @throws IllegalStateException if this timer has been * {@linkplain #stop() stopped} already */ - Timeout newTimeout(TimerTask task, long delay, TimeUnit unit); + Timeout newTimeout(TimerTask task, Duration delay); /** * Releases all resources acquired by this {@link org.jboss.netty.akka.util.Timer} and cancels all diff --git a/akka-actor/src/main/resources/akka-actor-reference.conf b/akka-actor/src/main/resources/akka-actor-reference.conf index 6503d73195..9bc6fd77ce 100644 --- a/akka-actor/src/main/resources/akka-actor-reference.conf +++ b/akka-actor/src/main/resources/akka-actor-reference.conf @@ -220,7 +220,7 @@ akka { scheduler { # The HashedWheelTimer implementation from Jetty is used as the default scheduler in the system. # See http://www.jboss.org/netty/ - tickDuration = 100 # In milliseconds + tickDuration = 100ms # tick duration in milliseconds (should always be defined in milliseconds) ticksPerWheel = 512 } diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 1a1b2b2d13..18915e2fbd 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -9,7 +9,7 @@ import scala.annotation.tailrec import scala.collection.immutable.{ Stack, TreeMap } import java.util.concurrent.TimeUnit import akka.event.Logging.{ Debug, Warning, Error } -import akka.util.Helpers +import akka.util.{ Duration, Helpers } /** * The actor context - the view of the actor cell from the actor. @@ -395,7 +395,7 @@ private[akka] class ActorCell( val recvtimeout = receiveTimeout if (recvtimeout.isDefined && dispatcher.mailboxIsEmpty(this)) { //Only reschedule if desired and there are currently no more messages to be processed - futureTimeout = Some(system.scheduler.scheduleOnce(self, ReceiveTimeout, recvtimeout.get, TimeUnit.MILLISECONDS)) + futureTimeout = Some(system.scheduler.scheduleOnce(self, ReceiveTimeout, Duration(recvtimeout.get, TimeUnit.MILLISECONDS))) } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index d20f4a070e..2b93450a44 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -15,7 +15,7 @@ import akka.event.{ Logging, DeathWatch, ActorClassification, EventStream } import akka.routing.{ ScatterGatherFirstCompletedRouter, Routing, RouterType, Router, RoutedProps, RoutedActorRef, RoundRobinRouter, RandomRouter, LocalConnectionManager, DirectRouter } import akka.AkkaException import com.eaio.uuid.UUID -import akka.util.{ Switch, Helpers } +import akka.util.{ Duration, Switch, Helpers } /** * Interface for all ActorRef providers to implement. @@ -390,20 +390,20 @@ class LocalDeathWatch extends DeathWatch with ActorClassification { class DefaultScheduler(hashedWheelTimer: HashedWheelTimer) extends Scheduler { - def schedule(receiver: ActorRef, message: Any, initialDelay: Long, delay: Long, timeUnit: TimeUnit): Cancellable = - new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(receiver, message, delay, timeUnit), initialDelay, timeUnit)) + def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, delay: Duration): Cancellable = + new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(receiver, message, delay), initialDelay)) - def scheduleOnce(runnable: Runnable, delay: Long, timeUnit: TimeUnit): Cancellable = - new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(runnable), delay, timeUnit)) + def schedule(f: () ⇒ Unit, initialDelay: Duration, delay: Duration): Cancellable = + new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(f, delay), initialDelay)) - def scheduleOnce(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit): Cancellable = - new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(receiver, message), delay, timeUnit)) + def scheduleOnce(runnable: Runnable, delay: Duration): Cancellable = + new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(runnable), delay)) - def schedule(f: () ⇒ Unit, initialDelay: Long, delay: Long, timeUnit: TimeUnit): Cancellable = - new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(f, delay, timeUnit), initialDelay, timeUnit)) + def scheduleOnce(receiver: ActorRef, message: Any, delay: Duration): Cancellable = + new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(receiver, message), delay)) - def scheduleOnce(f: () ⇒ Unit, delay: Long, timeUnit: TimeUnit): Cancellable = - new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(f), delay, timeUnit)) + def scheduleOnce(f: () ⇒ Unit, delay: Duration): Cancellable = + new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(f), delay)) private def createSingleTask(runnable: Runnable): TimerTask = new TimerTask() { def run(timeout: org.jboss.netty.akka.util.Timeout) { runnable.run() } } @@ -411,11 +411,11 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer) extends Scheduler { private def createSingleTask(receiver: ActorRef, message: Any): TimerTask = new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { receiver ! message } } - private def createContinuousTask(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit): TimerTask = { + private def createContinuousTask(receiver: ActorRef, message: Any, delay: Duration): TimerTask = { new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { receiver ! message - timeout.getTimer.newTimeout(this, delay, timeUnit) + timeout.getTimer.newTimeout(this, delay) } } } @@ -423,11 +423,11 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer) extends Scheduler { private def createSingleTask(f: () ⇒ Unit): TimerTask = new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { f() } } - private def createContinuousTask(f: () ⇒ Unit, delay: Long, timeUnit: TimeUnit): TimerTask = { + private def createContinuousTask(f: () ⇒ Unit, delay: Duration): TimerTask = { new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { f() - timeout.getTimer.newTimeout(this, delay, timeUnit) + timeout.getTimer.newTimeout(this, delay) } } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index db1998d738..0914634ee9 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -13,7 +13,6 @@ import com.eaio.uuid.UUID import akka.serialization.Serialization import akka.remote.RemoteAddress import org.jboss.netty.akka.util.HashedWheelTimer -import java.util.concurrent.Executors import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.TimeUnit.NANOSECONDS import java.io.File @@ -22,9 +21,9 @@ import com.typesafe.config.ConfigParseOptions import com.typesafe.config.ConfigRoot import com.typesafe.config.ConfigFactory import java.lang.reflect.InvocationTargetException -import java.util.concurrent.ConcurrentHashMap import akka.util.{ Helpers, Duration, ReflectiveAccess } import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.{ TimeUnit, Executors, ConcurrentHashMap } object ActorSystem { @@ -93,7 +92,7 @@ object ActorSystem { val EnabledModules: Seq[String] = getStringList("akka.enabled-modules").asScala - val SchedulerTickDuration = getInt("akka.scheduler.tickDuration") + val SchedulerTickDuration = Duration(getMilliseconds("akka.scheduler.tickDuration"), TimeUnit.MILLISECONDS) val SchedulerTicksPerWheel = getInt("akka.scheduler.ticksPerWheel") // FIXME move to cluster extension @@ -337,7 +336,7 @@ class ActorSystemImpl(val name: String, _config: Config) extends ActorSystem { override def numberOfMessages = 0 } - val scheduler = new DefaultScheduler(new HashedWheelTimer(log, Executors.defaultThreadFactory, settings.SchedulerTickDuration, MILLISECONDS, settings.SchedulerTicksPerWheel)) + val scheduler = new DefaultScheduler(new HashedWheelTimer(log, Executors.defaultThreadFactory, settings.SchedulerTickDuration, settings.SchedulerTicksPerWheel)) // TODO correctly pull its config from the config val dispatcherFactory = new Dispatchers(settings, DefaultDispatcherPrerequisites(eventStream, deadLetterMailbox, scheduler)) diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala index cfd9d511ed..d94bf45707 100644 --- a/akka-actor/src/main/scala/akka/actor/FSM.scala +++ b/akka-actor/src/main/scala/akka/actor/FSM.scala @@ -7,6 +7,7 @@ import akka.util._ import scala.collection.mutable import akka.event.Logging +import akka.util.Duration._ object FSM { @@ -33,9 +34,9 @@ object FSM { def schedule(actor: ActorRef, timeout: Duration) { if (repeat) { - ref = Some(system.scheduler.schedule(actor, this, timeout.length, timeout.length, timeout.unit)) + ref = Some(system.scheduler.schedule(actor, this, Duration(timeout.length, timeout.unit), Duration(timeout.length, timeout.unit))) } else { - ref = Some(system.scheduler.scheduleOnce(actor, this, timeout.length, timeout.unit)) + ref = Some(system.scheduler.scheduleOnce(actor, this, Duration(timeout.length, timeout.unit))) } } @@ -522,7 +523,7 @@ trait FSM[S, D] extends ListenerManagement { if (timeout.isDefined) { val t = timeout.get if (t.finite_? && t.length >= 0) { - timeoutFuture = Some(system.scheduler.scheduleOnce(self, TimeoutMarker(generation), t.length, t.unit)) + timeoutFuture = Some(system.scheduler.scheduleOnce(self, TimeoutMarker(generation), Duration(t.length, t.unit))) } } } diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index d12dcb6329..3263f7a5c9 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -9,46 +9,20 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * - * Rework of David Pollak's ActorPing class in the Lift Project - * which is licensed under the Apache 2 License. */ package akka.actor -import java.util.concurrent._ import akka.util.Duration -import akka.AkkaException -case class SchedulerException(msg: String, e: Throwable) extends AkkaException(msg, e) { - def this(msg: String) = this(msg, null) -} - -trait JScheduler { - def schedule(receiver: ActorRef, message: Any, initialDelay: Long, delay: Long, timeUnit: TimeUnit): Cancellable - def scheduleOnce(runnable: Runnable, delay: Long, timeUnit: TimeUnit): Cancellable - def scheduleOnce(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit): Cancellable -} - -abstract class Scheduler extends JScheduler { - def schedule(f: () ⇒ Unit, initialDelay: Long, delay: Long, timeUnit: TimeUnit): Cancellable - - def scheduleOnce(f: () ⇒ Unit, delay: Long, timeUnit: TimeUnit): Cancellable - - def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, delay: Duration): Cancellable = - schedule(receiver, message, initialDelay.toNanos, delay.toNanos, TimeUnit.NANOSECONDS) - - def schedule(f: () ⇒ Unit, initialDelay: Duration, delay: Duration): Cancellable = - schedule(f, initialDelay.toNanos, delay.toNanos, TimeUnit.NANOSECONDS) - - def scheduleOnce(receiver: ActorRef, message: Any, delay: Duration): Cancellable = - scheduleOnce(receiver, message, delay.length, delay.unit) - - def scheduleOnce(f: () ⇒ Unit, delay: Duration): Cancellable = - scheduleOnce(f, delay.length, delay.unit) +trait Scheduler { + def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, delay: Duration): Cancellable + def schedule(f: () ⇒ Unit, initialDelay: Duration, delay: Duration): Cancellable + def scheduleOnce(runnable: Runnable, delay: Duration): Cancellable + def scheduleOnce(receiver: ActorRef, message: Any, delay: Duration): Cancellable + def scheduleOnce(f: () ⇒ Unit, delay: Duration): Cancellable } trait Cancellable { def cancel(): Unit - def isCancelled: Boolean } \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 7998a514f8..208eae51ca 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -135,7 +135,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext shutdownScheduleUpdater.get(this) match { case UNSCHEDULED ⇒ if (shutdownScheduleUpdater.compareAndSet(this, UNSCHEDULED, SCHEDULED)) { - scheduler.scheduleOnce(shutdownAction, shutdownTimeout.toMillis, TimeUnit.MILLISECONDS) + scheduler.scheduleOnce(shutdownAction, Duration(shutdownTimeout.toMillis, TimeUnit.MILLISECONDS)) () } else ifSensibleToDoSoThenScheduleShutdown() case SCHEDULED ⇒ @@ -210,7 +210,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext } case RESCHEDULED ⇒ if (shutdownScheduleUpdater.compareAndSet(MessageDispatcher.this, RESCHEDULED, SCHEDULED)) - scheduler.scheduleOnce(this, shutdownTimeout.toMillis, TimeUnit.MILLISECONDS) + scheduler.scheduleOnce(this, Duration(shutdownTimeout.toMillis, TimeUnit.MILLISECONDS)) else run() } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index b57ff39512..e9a3035ea8 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -956,12 +956,12 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi val runnable = new Runnable { def run() { if (!isCompleted) { - if (!isExpired) dispatcher.prerequisites.scheduler.scheduleOnce(this, timeLeftNoinline(), NANOS) + if (!isExpired) dispatcher.prerequisites.scheduler.scheduleOnce(this, Duration(timeLeftNoinline(), TimeUnit.NANOSECONDS)) else func(DefaultPromise.this) } } } - val timeoutFuture = dispatcher.prerequisites.scheduler.scheduleOnce(runnable, timeLeft(), NANOS) + val timeoutFuture = dispatcher.prerequisites.scheduler.scheduleOnce(runnable, Duration(timeLeft(), TimeUnit.NANOSECONDS)) onComplete(_ ⇒ timeoutFuture.cancel()) false } else true @@ -983,12 +983,12 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi val runnable = new Runnable { def run() { if (!isCompleted) { - if (!isExpired) dispatcher.prerequisites.scheduler.scheduleOnce(this, timeLeftNoinline(), NANOS) + if (!isExpired) dispatcher.prerequisites.scheduler.scheduleOnce(this, Duration(timeLeftNoinline(), TimeUnit.NANOSECONDS)) else promise complete (try { Right(fallback) } catch { case e ⇒ Left(e) }) } } } - dispatcher.prerequisites.scheduler.scheduleOnce(runnable, timeLeft(), NANOS) + dispatcher.prerequisites.scheduler.scheduleOnce(runnable, Duration(timeLeft(), TimeUnit.NANOSECONDS)) promise } } else this diff --git a/akka-remote/src/main/scala/akka/remote/Gossiper.scala b/akka-remote/src/main/scala/akka/remote/Gossiper.scala index 636d8a67ec..24e6c15fab 100644 --- a/akka-remote/src/main/scala/akka/remote/Gossiper.scala +++ b/akka-remote/src/main/scala/akka/remote/Gossiper.scala @@ -19,6 +19,7 @@ import scala.collection.immutable.Map import scala.annotation.tailrec import com.google.protobuf.ByteString +import akka.util.Duration /** * Interface for node membership change listener. @@ -122,8 +123,8 @@ class Gossiper(remote: Remote) { { // start periodic gossip and cluster scrutinization - default is run them every second with 1/2 second in between - system.scheduler schedule (() ⇒ initateGossip(), initalDelayForGossip.toSeconds, gossipFrequency.toSeconds, timeUnit) - system.scheduler schedule (() ⇒ scrutinize(), initalDelayForGossip.toSeconds, gossipFrequency.toSeconds, timeUnit) + system.scheduler schedule (() ⇒ initateGossip(), Duration(initalDelayForGossip.toSeconds, timeUnit), Duration(gossipFrequency.toSeconds, timeUnit)) + system.scheduler schedule (() ⇒ scrutinize(), Duration(initalDelayForGossip.toSeconds, timeUnit), Duration(gossipFrequency.toSeconds, timeUnit)) } /** diff --git a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala index 1c86015df4..6c0b2c80be 100644 --- a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala +++ b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala @@ -5,6 +5,7 @@ package sample.fsm.dining.become import akka.actor.{ ActorRef, Actor, ActorSystem } import java.util.concurrent.TimeUnit +import akka.util.Duration /* * First we define our messages, they basically speak for themselves @@ -77,7 +78,7 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor { case Taken(`chopstickToWaitFor`) ⇒ println("%s has picked up %s and %s, and starts to eat", name, left.address, right.address) become(eating) - system.scheduler.scheduleOnce(self, Think, 5, TimeUnit.SECONDS) + system.scheduler.scheduleOnce(self, Think, Duration(5, TimeUnit.SECONDS)) case Busy(chopstick) ⇒ become(thinking) @@ -106,7 +107,7 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor { left ! Put(self) right ! Put(self) println("%s puts down his chopsticks and starts to think", name) - system.scheduler.scheduleOnce(self, Eat, 5, TimeUnit.SECONDS) + system.scheduler.scheduleOnce(self, Eat, Duration(5, TimeUnit.SECONDS)) } //All hakkers start in a non-eating state @@ -114,7 +115,7 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor { case Think ⇒ println("%s starts to think", name) become(thinking) - system.scheduler.scheduleOnce(self, Eat, 5, TimeUnit.SECONDS) + system.scheduler.scheduleOnce(self, Eat, Duration(5, TimeUnit.SECONDS)) } } From e2ad1088b012d4919d38de9c5ed8f8c8f6ac214e Mon Sep 17 00:00:00 2001 From: Henrik Engstrom Date: Wed, 23 Nov 2011 15:15:44 +0100 Subject: [PATCH 3/8] Updated the scheduler implementation after feedback; changed Duration(x, timeunit) to more fluent 'x timeunit' and added ScalaDoc --- .../test/scala/akka/actor/SchedulerSpec.scala | 41 ++++++++++++++----- .../test/scala/akka/config/ConfigSpec.scala | 6 +-- .../akka/dispatch/MailboxConfigSpec.scala | 20 ++++----- .../TellThroughputPerformanceSpec.scala | 5 +-- .../main/resources/akka-actor-reference.conf | 11 +++-- .../scala/akka/actor/ActorRefProvider.scala | 6 +-- .../src/main/scala/akka/actor/FSM.scala | 6 +-- .../src/main/scala/akka/actor/Scheduler.scala | 34 ++++++++++++++- 8 files changed, 88 insertions(+), 41 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala index f1d2ff5655..c9a8a60138 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala @@ -2,10 +2,10 @@ package akka.actor import org.scalatest.BeforeAndAfterEach import org.multiverse.api.latches.StandardLatch -import java.util.concurrent.{ ConcurrentLinkedQueue, CountDownLatch, TimeUnit } import akka.testkit.AkkaSpec import akka.testkit.EventFilter -import akka.util.Duration +import akka.util.duration._ +import java.util.concurrent.{ CountDownLatch, ConcurrentLinkedQueue, TimeUnit } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { @@ -29,14 +29,14 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { def receive = { case Tick ⇒ countDownLatch.countDown() } }) // run every 50 millisec - collectCancellable(system.scheduler.schedule(tickActor, Tick, Duration(0, TimeUnit.MILLISECONDS), Duration(50, TimeUnit.MILLISECONDS))) + collectCancellable(system.scheduler.schedule(tickActor, Tick, 0 milliseconds, 50 milliseconds)) // after max 1 second it should be executed at least the 3 times already assert(countDownLatch.await(1, TimeUnit.SECONDS)) val countDownLatch2 = new CountDownLatch(3) - collectCancellable(system.scheduler.schedule(() ⇒ countDownLatch2.countDown(), Duration(0, TimeUnit.MILLISECONDS), Duration(50, TimeUnit.MILLISECONDS))) + collectCancellable(system.scheduler.schedule(() ⇒ countDownLatch2.countDown(), 0 milliseconds, 50 milliseconds)) // after max 1 second it should be executed at least the 3 times already assert(countDownLatch2.await(2, TimeUnit.SECONDS)) @@ -50,8 +50,8 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { }) // run every 50 millisec - collectCancellable(system.scheduler.scheduleOnce(tickActor, Tick, Duration(50, TimeUnit.MILLISECONDS))) - collectCancellable(system.scheduler.scheduleOnce(() ⇒ countDownLatch.countDown(), Duration(50, TimeUnit.MILLISECONDS))) + collectCancellable(system.scheduler.scheduleOnce(tickActor, Tick, 50 milliseconds)) + collectCancellable(system.scheduler.scheduleOnce(() ⇒ countDownLatch.countDown(), 50 milliseconds)) // after 1 second the wait should fail assert(countDownLatch.await(2, TimeUnit.SECONDS) == false) @@ -87,7 +87,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { }) (1 to 10).foreach { i ⇒ - val timeout = collectCancellable(system.scheduler.scheduleOnce(actor, Ping, Duration(1, TimeUnit.SECONDS))) + val timeout = collectCancellable(system.scheduler.scheduleOnce(actor, Ping, 1 second)) timeout.cancel() } @@ -115,10 +115,10 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { }) val actor = (supervisor ? props).as[ActorRef].get - collectCancellable(system.scheduler.schedule(actor, Ping, Duration(500, TimeUnit.MILLISECONDS), Duration(500, TimeUnit.MILLISECONDS))) + collectCancellable(system.scheduler.schedule(actor, Ping, 500 milliseconds, 500 milliseconds)) // appx 2 pings before crash EventFilter[Exception]("CRASH", occurrences = 1) intercept { - collectCancellable(system.scheduler.scheduleOnce(actor, Crash, Duration(1000, TimeUnit.MILLISECONDS))) + collectCancellable(system.scheduler.scheduleOnce(actor, Crash, 1000 milliseconds)) } assert(restartLatch.tryAwait(2, TimeUnit.SECONDS)) @@ -142,11 +142,32 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { }) (1 to 300).foreach { i ⇒ - collectCancellable(system.scheduler.scheduleOnce(actor, Msg(System.nanoTime()), Duration(10, TimeUnit.MILLISECONDS))) + collectCancellable(system.scheduler.scheduleOnce(actor, Msg(System.nanoTime()), 10 milliseconds)) Thread.sleep(5) } assert(ticks.await(3, TimeUnit.SECONDS) == true) } + + "schedule with different initial delay and frequency" in { + val ticks = new CountDownLatch(3) + + case object Msg + + val actor = actorOf(new Actor { + def receive = { + case Msg ⇒ ticks.countDown() + } + }) + + val startTime = System.nanoTime() + val cancellable = system.scheduler.schedule(actor, Msg, 1 second, 100 milliseconds) + ticks.await(3, TimeUnit.SECONDS) + val elapsedTimeMs = (System.nanoTime() - startTime) / 1000000 + + assert(elapsedTimeMs > 1200) + assert(elapsedTimeMs < 1500) // the precision is not ms exact + cancellable.cancel() + } } } diff --git a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala index 17ed0112f0..8744e8a6d9 100644 --- a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala @@ -5,12 +5,10 @@ package akka.config import akka.testkit.AkkaSpec -import akka.actor.ActorSystem -import java.io.File import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigParseOptions import scala.collection.JavaConverters._ -import java.util.concurrent.TimeUnit +import akka.util.duration._ import akka.util.Duration @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @@ -37,7 +35,7 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.parseResource(classOf[ConfigSpec getInt("akka.actor.default-dispatcher.mailbox-capacity") must equal(-1) getMilliseconds("akka.actor.default-dispatcher.mailbox-push-timeout-time") must equal(10 * 1000) getMilliseconds("akka.actor.dispatcher-shutdown-timeout") must equal(1 * 1000) - settings.DispatcherDefaultShutdown must equal(Duration(1, TimeUnit.SECONDS)) + settings.DispatcherDefaultShutdown must equal(1 second) getInt("akka.actor.default-dispatcher.throughput") must equal(5) settings.DispatcherThroughput must equal(5) getMilliseconds("akka.actor.default-dispatcher.throughput-deadline-time") must equal(0) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index 49fec62639..7af8f057d8 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -1,14 +1,8 @@ package akka.dispatch -import org.scalatest.WordSpec -import org.scalatest.matchers.MustMatchers import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } -import org.scalatest.junit.JUnitRunner -import org.junit.runner.RunWith -import java.util.concurrent.{ TimeUnit, CountDownLatch, BlockingQueue } -import java.util.{ Queue } +import java.util.concurrent.{ TimeUnit, BlockingQueue } import akka.util._ -import akka.util.Duration._ -import akka.actor.{ LocalActorRef, Actor } +import akka.util.duration._ import akka.testkit.AkkaSpec @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @@ -23,7 +17,7 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn val q = factory(config) ensureInitialMailboxState(config, q) - implicit val within = Duration(1, TimeUnit.SECONDS) + implicit val within = 1 second val f = spawn { q.dequeue @@ -33,7 +27,7 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn } "create a bounded mailbox with 10 capacity and with push timeout" in { - val config = BoundedMailbox(10, Duration(10, TimeUnit.MILLISECONDS)) + val config = BoundedMailbox(10, 10 milliseconds) val q = factory(config) ensureInitialMailboxState(config, q) @@ -58,11 +52,11 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn } "dequeue what was enqueued properly for bounded mailboxes" in { - testEnqueueDequeue(BoundedMailbox(10000, Duration(-1, TimeUnit.MILLISECONDS))) + testEnqueueDequeue(BoundedMailbox(10000, -1 millisecond)) } "dequeue what was enqueued properly for bounded mailboxes with pushTimeout" in { - testEnqueueDequeue(BoundedMailbox(10000, Duration(100, TimeUnit.MILLISECONDS))) + testEnqueueDequeue(BoundedMailbox(10000, 100 milliseconds)) } } @@ -97,7 +91,7 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn } def testEnqueueDequeue(config: MailboxType) { - implicit val within = Duration(10, TimeUnit.SECONDS) + implicit val within = 10 seconds val q = factory(config) ensureInitialMailboxState(config, q) diff --git a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala index 75aef7708f..a49e837ac4 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala @@ -1,12 +1,11 @@ package akka.performance.microbench import akka.performance.workbench.PerformanceSpec -import org.apache.commons.math.stat.descriptive.DescriptiveStatistics import akka.actor._ import java.util.concurrent.{ ThreadPoolExecutor, CountDownLatch, TimeUnit } import akka.dispatch._ -import java.util.concurrent.ThreadPoolExecutor.AbortPolicy import akka.util.Duration +import akka.util.duration._ // -server -Xms512M -Xmx1024M -XX:+UseParallelGC -Dbenchmark=true -Dbenchmark.repeatFactor=500 @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @@ -15,7 +14,7 @@ class TellThroughputPerformanceSpec extends PerformanceSpec { def createDispatcher(name: String) = ThreadPoolConfigDispatcherBuilder(config ⇒ new Dispatcher(system.dispatcherFactory.prerequisites, name, 5, - Duration.Zero, UnboundedMailbox(), config, Duration(60, TimeUnit.SECONDS)), ThreadPoolConfig()) + Duration.Zero, UnboundedMailbox(), config, 60 seconds), ThreadPoolConfig()) .withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity .setCorePoolSize(maxClients) .build diff --git a/akka-actor/src/main/resources/akka-actor-reference.conf b/akka-actor/src/main/resources/akka-actor-reference.conf index 9bc6fd77ce..b82ab3e4b3 100644 --- a/akka-actor/src/main/resources/akka-actor-reference.conf +++ b/akka-actor/src/main/resources/akka-actor-reference.conf @@ -218,9 +218,14 @@ akka { # Changing the default values may change the system behavior drastically so make sure you know what you're doing! # scheduler { - # The HashedWheelTimer implementation from Jetty is used as the default scheduler in the system. - # See http://www.jboss.org/netty/ - tickDuration = 100ms # tick duration in milliseconds (should always be defined in milliseconds) + # The HashedWheelTimer (HWT) implementation from Jetty is used as the default scheduler in the system. + # + # HWT does not execute the scheduled tasks on exact time. + # It will, on every tick, check if there are any tasks behind the schedule and execute them. + # You can increase or decrease the accuracy of the execution timing by specifying smaller or larger tick duration. + # If you are scheduling a lot of tasks you should consider increasing the ticks per wheel. + # For more information see: http://www.jboss.org/netty/ + tickDuration = 100ms ticksPerWheel = 512 } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 2b93450a44..a69c08c4e2 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -411,6 +411,9 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer) extends Scheduler { private def createSingleTask(receiver: ActorRef, message: Any): TimerTask = new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { receiver ! message } } + private def createSingleTask(f: () ⇒ Unit): TimerTask = + new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { f() } } + private def createContinuousTask(receiver: ActorRef, message: Any, delay: Duration): TimerTask = { new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { @@ -420,9 +423,6 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer) extends Scheduler { } } - private def createSingleTask(f: () ⇒ Unit): TimerTask = - new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { f() } } - private def createContinuousTask(f: () ⇒ Unit, delay: Duration): TimerTask = { new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala index d94bf45707..76495843fc 100644 --- a/akka-actor/src/main/scala/akka/actor/FSM.scala +++ b/akka-actor/src/main/scala/akka/actor/FSM.scala @@ -34,9 +34,9 @@ object FSM { def schedule(actor: ActorRef, timeout: Duration) { if (repeat) { - ref = Some(system.scheduler.schedule(actor, this, Duration(timeout.length, timeout.unit), Duration(timeout.length, timeout.unit))) + ref = Some(system.scheduler.schedule(actor, this, timeout, timeout)) } else { - ref = Some(system.scheduler.scheduleOnce(actor, this, Duration(timeout.length, timeout.unit))) + ref = Some(system.scheduler.scheduleOnce(actor, this, timeout)) } } @@ -523,7 +523,7 @@ trait FSM[S, D] extends ListenerManagement { if (timeout.isDefined) { val t = timeout.get if (t.finite_? && t.length >= 0) { - timeoutFuture = Some(system.scheduler.scheduleOnce(self, TimeoutMarker(generation), Duration(t.length, t.unit))) + timeoutFuture = Some(system.scheduler.scheduleOnce(self, TimeoutMarker(generation), t)) } } } diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index 3263f7a5c9..52a63f1730 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -15,14 +15,44 @@ package akka.actor import akka.util.Duration trait Scheduler { - def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, delay: Duration): Cancellable - def schedule(f: () ⇒ Unit, initialDelay: Duration, delay: Duration): Cancellable + /** + * Schedules a message to be sent repeatedly with an initial delay and frequency. + * E.g. if you would like a message to be sent immediately and thereafter every 500ms you would set + * delay = Duration.Zero and frequency = Duration(500, TimeUnit.MILLISECONDS) + */ + def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, frequency: Duration): Cancellable + + /** + * Schedules a function to be run repeatedly with an initial delay and a frequency. + * E.g. if you would like the function to be run after 2 seconds and thereafter every 100ms you would set + * delay = Duration(2, TimeUnit.SECONDS) and frequency = Duration(100, TimeUnit.MILLISECONDS) + */ + def schedule(f: () ⇒ Unit, initialDelay: Duration, frequency: Duration): Cancellable + + /** + * Schedules a Runnable to be run once with a delay, i.e. a time period that has to pass before the runnable is executed. + */ def scheduleOnce(runnable: Runnable, delay: Duration): Cancellable + + /** + * Schedules a message to be sent once with a delay, i.e. a time period that has to pass before the message is sent. + */ def scheduleOnce(receiver: ActorRef, message: Any, delay: Duration): Cancellable + + /** + * Schedules a function to be run once with a delay, i.e. a time period that has to pass before the function is run. + */ def scheduleOnce(f: () ⇒ Unit, delay: Duration): Cancellable } trait Cancellable { + /** + * Cancels the underlying scheduled task. + */ def cancel(): Unit + + /** + * Checks if the underlying scheduled task has been cancelled. + */ def isCancelled: Boolean } \ No newline at end of file From ddb7b578e290c4400956feaf1f1cd092881dda59 Mon Sep 17 00:00:00 2001 From: Henrik Engstrom Date: Wed, 23 Nov 2011 16:52:05 +0100 Subject: [PATCH 4/8] Fixed some typos, see #1291 --- akka-actor/src/main/resources/akka-actor-reference.conf | 2 +- .../src/main/scala/DiningHakkersOnBecome.scala | 9 ++++----- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/akka-actor/src/main/resources/akka-actor-reference.conf b/akka-actor/src/main/resources/akka-actor-reference.conf index b82ab3e4b3..d078db91fa 100644 --- a/akka-actor/src/main/resources/akka-actor-reference.conf +++ b/akka-actor/src/main/resources/akka-actor-reference.conf @@ -218,7 +218,7 @@ akka { # Changing the default values may change the system behavior drastically so make sure you know what you're doing! # scheduler { - # The HashedWheelTimer (HWT) implementation from Jetty is used as the default scheduler in the system. + # The HashedWheelTimer (HWT) implementation from Netty is used as the default scheduler in the system. # # HWT does not execute the scheduled tasks on exact time. # It will, on every tick, check if there are any tasks behind the schedule and execute them. diff --git a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala index 6c0b2c80be..2c23940c9f 100644 --- a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala +++ b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala @@ -4,8 +4,7 @@ package sample.fsm.dining.become //http://www.dalnefre.com/wp/2010/08/dining-philosophers-in-humus/ import akka.actor.{ ActorRef, Actor, ActorSystem } -import java.util.concurrent.TimeUnit -import akka.util.Duration +import akka.util.duration._ /* * First we define our messages, they basically speak for themselves @@ -78,7 +77,7 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor { case Taken(`chopstickToWaitFor`) ⇒ println("%s has picked up %s and %s, and starts to eat", name, left.address, right.address) become(eating) - system.scheduler.scheduleOnce(self, Think, Duration(5, TimeUnit.SECONDS)) + system.scheduler.scheduleOnce(self, Think, 5 seconds) case Busy(chopstick) ⇒ become(thinking) @@ -107,7 +106,7 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor { left ! Put(self) right ! Put(self) println("%s puts down his chopsticks and starts to think", name) - system.scheduler.scheduleOnce(self, Eat, Duration(5, TimeUnit.SECONDS)) + system.scheduler.scheduleOnce(self, Eat, 5 seconds) } //All hakkers start in a non-eating state @@ -115,7 +114,7 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor { case Think ⇒ println("%s starts to think", name) become(thinking) - system.scheduler.scheduleOnce(self, Eat, Duration(5, TimeUnit.SECONDS)) + system.scheduler.scheduleOnce(self, Eat, 5 seconds) } } From abcaf01525779f0e2d44b09756ff220132905949 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 23 Nov 2011 19:19:08 +0100 Subject: [PATCH 5/8] Removing legacy comment --- .../test/scala/akka/actor/SchedulerSpec.scala | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala index b7c0ba6c8a..6edbe64108 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala @@ -58,22 +58,6 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { assert(countDownLatch.getCount == 1) } - /** - * ticket #372 - * FIXME rewrite the test so that registry is not used - */ - // "not create actors" in { - // object Ping - // val ticks = new CountDownLatch(1000) - // val actor = actorOf(new Actor { - // def receive = { case Ping ⇒ ticks.countDown } - // }) - // val numActors = system.registry.local.actors.length - // (1 to 1000).foreach(_ ⇒ collectFuture(Scheduler.scheduleOnce(actor, Ping, 1, TimeUnit.MILLISECONDS))) - // assert(ticks.await(10, TimeUnit.SECONDS)) - // assert(system.registry.local.actors.length === numActors) - // } - /** * ticket #372 */ From 463c6921d6da67c41e789aa144ae7fdd95da1992 Mon Sep 17 00:00:00 2001 From: Henrik Engstrom Date: Thu, 24 Nov 2011 09:23:28 +0100 Subject: [PATCH 6/8] Fixed failing test, see #1291 --- .../src/test/scala/akka/actor/SchedulerSpec.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala index c9a8a60138..5830207c48 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala @@ -127,26 +127,26 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { } "never fire prematurely" in { - val ticks = new CountDownLatch(300) + val ticks = new CountDownLatch(5000) case class Msg(ts: Long) val actor = actorOf(new Actor { def receive = { case Msg(ts) ⇒ - val now = System.nanoTime() - // Make sure that no message has been dispatched before the scheduled time (10ms = 10000000ns) has occurred - if (now - ts < 10000000) throw new RuntimeException("Interval is too small: " + (now - ts)) + val now = System.currentTimeMillis + // Make sure that no message has been dispatched before the scheduled time (10ms) has occurred + if (now - ts < 10) throw new RuntimeException("Interval is too small: " + (now - ts)) ticks.countDown() } }) - (1 to 300).foreach { i ⇒ - collectCancellable(system.scheduler.scheduleOnce(actor, Msg(System.nanoTime()), 10 milliseconds)) + (1 to 5000).foreach { i ⇒ + collectCancellable(system.scheduler.scheduleOnce(actor, Msg(System.currentTimeMillis), 10 milliseconds)) Thread.sleep(5) } - assert(ticks.await(3, TimeUnit.SECONDS) == true) + assert(ticks.await(20, TimeUnit.SECONDS) == true) } "schedule with different initial delay and frequency" in { From 35d4d0456d280e416e44a81699d8d17b948ebd4e Mon Sep 17 00:00:00 2001 From: Henrik Engstrom Date: Thu, 24 Nov 2011 09:54:08 +0100 Subject: [PATCH 7/8] Decreased the time to wait in SchedulerSpec to make tests run a wee bit faster, see #1291 --- .../src/test/scala/akka/actor/SchedulerSpec.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala index 5830207c48..124cc196e9 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala @@ -127,7 +127,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { } "never fire prematurely" in { - val ticks = new CountDownLatch(5000) + val ticks = new CountDownLatch(300) case class Msg(ts: Long) @@ -141,12 +141,12 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { } }) - (1 to 5000).foreach { i ⇒ + (1 to 300).foreach { i ⇒ collectCancellable(system.scheduler.scheduleOnce(actor, Msg(System.currentTimeMillis), 10 milliseconds)) Thread.sleep(5) } - assert(ticks.await(20, TimeUnit.SECONDS) == true) + assert(ticks.await(3, TimeUnit.SECONDS) == true) } "schedule with different initial delay and frequency" in { From 4a64428e480024c8a8b1882b9a9541b79bbf534f Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 24 Nov 2011 10:38:36 +0100 Subject: [PATCH 8/8] Moving the untrustedMode setting into the marshalling ops --- akka-remote/src/main/scala/akka/remote/Remote.scala | 10 ++++++---- .../scala/akka/remote/netty/NettyRemoteSupport.scala | 6 ++++-- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/Remote.scala b/akka-remote/src/main/scala/akka/remote/Remote.scala index 3e2cc979ab..4e96394141 100644 --- a/akka-remote/src/main/scala/akka/remote/Remote.scala +++ b/akka-remote/src/main/scala/akka/remote/Remote.scala @@ -277,6 +277,8 @@ trait RemoteMarshallingOps { def system: ActorSystem + protected def useUntrustedMode: Boolean + def createMessageSendEnvelope(rmp: RemoteMessageProtocol): AkkaRemoteProtocol = { val arp = AkkaRemoteProtocol.newBuilder arp.setMessage(rmp) @@ -319,15 +321,15 @@ trait RemoteMarshallingOps { messageBuilder } - def receiveMessage(remoteMessage: RemoteMessage, untrustedMode: Boolean) { + def receiveMessage(remoteMessage: RemoteMessage) { val recipient = remoteMessage.recipient remoteMessage.payload match { case Left(t) ⇒ throw t case Right(r) ⇒ r match { - case _: Terminate ⇒ if (untrustedMode) throw new SecurityException("RemoteModule server is operating is untrusted mode, can not stop the actor") else recipient.stop() - case _: AutoReceivedMessage if (untrustedMode) ⇒ throw new SecurityException("RemoteModule server is operating is untrusted mode, can not pass on a AutoReceivedMessage to the remote actor") - case m ⇒ recipient.!(m)(remoteMessage.sender) + case _: Terminate ⇒ if (useUntrustedMode) throw new SecurityException("RemoteModule server is operating is untrusted mode, can not stop the actor") else recipient.stop() + case _: AutoReceivedMessage if (useUntrustedMode) ⇒ throw new SecurityException("RemoteModule server is operating is untrusted mode, can not pass on a AutoReceivedMessage to the remote actor") + case m ⇒ recipient.!(m)(remoteMessage.sender) } } } diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 914a1e4a71..2aaf195b76 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -292,7 +292,7 @@ class ActiveRemoteClientHandler( } case arp: AkkaRemoteProtocol if arp.hasMessage ⇒ - client.remoteSupport.receiveMessage(new RemoteMessage(arp.getMessage, client.remoteSupport, client.loader), untrustedMode = false) //TODO FIXME Sensible or not? + client.remoteSupport.receiveMessage(new RemoteMessage(arp.getMessage, client.remoteSupport, client.loader)) case other ⇒ throw new RemoteClientException("Unknown message received in remote client handler: " + other, client.remoteSupport, client.remoteAddress) @@ -359,6 +359,8 @@ class NettyRemoteSupport(_system: ActorSystem) extends RemoteSupport(_system) wi private val remoteClients = new HashMap[RemoteAddress, RemoteClient] private val clientsLock = new ReentrantReadWriteLock + override protected def useUntrustedMode = serverSettings.UNTRUSTED_MODE + protected[akka] def send(message: Any, senderOption: Option[ActorRef], recipientAddress: RemoteAddress, @@ -629,7 +631,7 @@ class RemoteServerHandler( override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = try { event.getMessage match { case remote: AkkaRemoteProtocol if remote.hasMessage ⇒ - remoteSupport.receiveMessage(new RemoteMessage(remote.getMessage, remoteSupport, applicationLoader), UNTRUSTED_MODE) + remoteSupport.receiveMessage(new RemoteMessage(remote.getMessage, remoteSupport, applicationLoader)) case remote: AkkaRemoteProtocol if remote.hasInstruction ⇒ val instruction = remote.getInstruction