diff --git a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala index e1023a20e1..f1d2ff5655 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala @@ -5,6 +5,7 @@ import org.multiverse.api.latches.StandardLatch import java.util.concurrent.{ ConcurrentLinkedQueue, CountDownLatch, TimeUnit } import akka.testkit.AkkaSpec import akka.testkit.EventFilter +import akka.util.Duration @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { @@ -28,14 +29,14 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { def receive = { case Tick ⇒ countDownLatch.countDown() } }) // run every 50 millisec - collectCancellable(system.scheduler.schedule(tickActor, Tick, 0, 50, TimeUnit.MILLISECONDS)) + collectCancellable(system.scheduler.schedule(tickActor, Tick, Duration(0, TimeUnit.MILLISECONDS), Duration(50, TimeUnit.MILLISECONDS))) // after max 1 second it should be executed at least the 3 times already assert(countDownLatch.await(1, TimeUnit.SECONDS)) val countDownLatch2 = new CountDownLatch(3) - collectCancellable(system.scheduler.schedule(() ⇒ countDownLatch2.countDown(), 0, 50, TimeUnit.MILLISECONDS)) + collectCancellable(system.scheduler.schedule(() ⇒ countDownLatch2.countDown(), Duration(0, TimeUnit.MILLISECONDS), Duration(50, TimeUnit.MILLISECONDS))) // after max 1 second it should be executed at least the 3 times already assert(countDownLatch2.await(2, TimeUnit.SECONDS)) @@ -49,8 +50,8 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { }) // run every 50 millisec - collectCancellable(system.scheduler.scheduleOnce(tickActor, Tick, 50, TimeUnit.MILLISECONDS)) - collectCancellable(system.scheduler.scheduleOnce(() ⇒ countDownLatch.countDown(), 50, TimeUnit.MILLISECONDS)) + collectCancellable(system.scheduler.scheduleOnce(tickActor, Tick, Duration(50, TimeUnit.MILLISECONDS))) + collectCancellable(system.scheduler.scheduleOnce(() ⇒ countDownLatch.countDown(), Duration(50, TimeUnit.MILLISECONDS))) // after 1 second the wait should fail assert(countDownLatch.await(2, TimeUnit.SECONDS) == false) @@ -86,7 +87,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { }) (1 to 10).foreach { i ⇒ - val timeout = collectCancellable(system.scheduler.scheduleOnce(actor, Ping, 1, TimeUnit.SECONDS)) + val timeout = collectCancellable(system.scheduler.scheduleOnce(actor, Ping, Duration(1, TimeUnit.SECONDS))) timeout.cancel() } @@ -114,10 +115,10 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { }) val actor = (supervisor ? props).as[ActorRef].get - collectCancellable(system.scheduler.schedule(actor, Ping, 500, 500, TimeUnit.MILLISECONDS)) + collectCancellable(system.scheduler.schedule(actor, Ping, Duration(500, TimeUnit.MILLISECONDS), Duration(500, TimeUnit.MILLISECONDS))) // appx 2 pings before crash EventFilter[Exception]("CRASH", occurrences = 1) intercept { - collectCancellable(system.scheduler.scheduleOnce(actor, Crash, 1000, TimeUnit.MILLISECONDS)) + collectCancellable(system.scheduler.scheduleOnce(actor, Crash, Duration(1000, TimeUnit.MILLISECONDS))) } assert(restartLatch.tryAwait(2, TimeUnit.SECONDS)) @@ -133,19 +134,19 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { val actor = actorOf(new Actor { def receive = { case Msg(ts) ⇒ - val now = System.currentTimeMillis - // Make sure that no message has been dispatched before the scheduled time (10ms) has occurred - if (now - ts < 10) throw new RuntimeException("Interval is too small: " + (now - ts)) + val now = System.nanoTime() + // Make sure that no message has been dispatched before the scheduled time (10ms = 10000000ns) has occurred + if (now - ts < 10000000) throw new RuntimeException("Interval is too small: " + (now - ts)) ticks.countDown() } }) (1 to 300).foreach { i ⇒ - collectCancellable(system.scheduler.scheduleOnce(actor, Msg(System.currentTimeMillis()), 10, TimeUnit.MILLISECONDS)) + collectCancellable(system.scheduler.scheduleOnce(actor, Msg(System.nanoTime()), Duration(10, TimeUnit.MILLISECONDS))) Thread.sleep(5) } - assert(ticks.await(2, TimeUnit.SECONDS) == true) + assert(ticks.await(3, TimeUnit.SECONDS) == true) } } } diff --git a/akka-actor/src/main/java/org/jboss/netty/akka/util/HashedWheelTimer.java b/akka-actor/src/main/java/org/jboss/netty/akka/util/HashedWheelTimer.java index 328d2dc39f..5dc9a064f6 100644 --- a/akka-actor/src/main/java/org/jboss/netty/akka/util/HashedWheelTimer.java +++ b/akka-actor/src/main/java/org/jboss/netty/akka/util/HashedWheelTimer.java @@ -16,6 +16,7 @@ package org.jboss.netty.akka.util; import akka.event.LoggingAdapter; +import akka.util.Duration; import org.jboss.netty.akka.util.internal.ConcurrentIdentityHashMap; import org.jboss.netty.akka.util.internal.ReusableIterator; @@ -96,24 +97,24 @@ public class HashedWheelTimer implements Timer { * @param threadFactory a {@link java.util.concurrent.ThreadFactory} that creates a * background {@link Thread} which is dedicated to * {@link TimerTask} execution. - * @param tickDuration the duration between tick - * @param unit the time unit of the {@code tickDuration} + * @param duration the duration between ticks * @param ticksPerWheel the size of the wheel */ public HashedWheelTimer( LoggingAdapter logger, ThreadFactory threadFactory, - long tickDuration, TimeUnit unit, int ticksPerWheel) { + Duration duration, + int ticksPerWheel) { if (threadFactory == null) { throw new NullPointerException("threadFactory"); } - if (unit == null) { - throw new NullPointerException("unit"); + if (duration == null) { + throw new NullPointerException("duration"); } - if (tickDuration <= 0) { + if (duration.toNanos() <= 0) { throw new IllegalArgumentException( - "tickDuration must be greater than 0: " + tickDuration); + "duration must be greater than 0 ns: " + duration.toNanos()); } if (ticksPerWheel <= 0) { throw new IllegalArgumentException( @@ -128,14 +129,14 @@ public class HashedWheelTimer implements Timer { mask = wheel.length - 1; // Convert tickDuration to milliseconds. - this.tickDuration = tickDuration = unit.toMillis(tickDuration); + this.tickDuration = duration.toMillis(); // Prevent overflow. if (tickDuration == Long.MAX_VALUE || tickDuration >= Long.MAX_VALUE / wheel.length) { throw new IllegalArgumentException( "tickDuration is too long: " + - tickDuration + ' ' + unit); + tickDuration + ' ' + duration.unit()); } roundDuration = tickDuration * wheel.length; @@ -231,23 +232,22 @@ public class HashedWheelTimer implements Timer { return Collections.unmodifiableSet(unprocessedTimeouts); } - public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { + public Timeout newTimeout(TimerTask task, Duration delay) { final long currentTime = System.currentTimeMillis(); if (task == null) { throw new NullPointerException("task"); } - if (unit == null) { - throw new NullPointerException("unit"); + if (delay == null) { + throw new NullPointerException("delay"); } if (!workerThread.isAlive()) { start(); } - delay = unit.toMillis(delay); - HashedWheelTimeout timeout = new HashedWheelTimeout(task, currentTime + delay); - scheduleTimeout(timeout, delay); + HashedWheelTimeout timeout = new HashedWheelTimeout(task, currentTime + delay.toMillis()); + scheduleTimeout(timeout, delay.toMillis()); return timeout; } diff --git a/akka-actor/src/main/java/org/jboss/netty/akka/util/Timer.java b/akka-actor/src/main/java/org/jboss/netty/akka/util/Timer.java index 43ddec9604..b5bd8c6a7c 100644 --- a/akka-actor/src/main/java/org/jboss/netty/akka/util/Timer.java +++ b/akka-actor/src/main/java/org/jboss/netty/akka/util/Timer.java @@ -15,6 +15,7 @@ */ package org.jboss.netty.akka.util; +import akka.util.Duration; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -41,7 +42,7 @@ public interface Timer { * @throws IllegalStateException if this timer has been * {@linkplain #stop() stopped} already */ - Timeout newTimeout(TimerTask task, long delay, TimeUnit unit); + Timeout newTimeout(TimerTask task, Duration delay); /** * Releases all resources acquired by this {@link org.jboss.netty.akka.util.Timer} and cancels all diff --git a/akka-actor/src/main/resources/akka-actor-reference.conf b/akka-actor/src/main/resources/akka-actor-reference.conf index 6503d73195..9bc6fd77ce 100644 --- a/akka-actor/src/main/resources/akka-actor-reference.conf +++ b/akka-actor/src/main/resources/akka-actor-reference.conf @@ -220,7 +220,7 @@ akka { scheduler { # The HashedWheelTimer implementation from Jetty is used as the default scheduler in the system. # See http://www.jboss.org/netty/ - tickDuration = 100 # In milliseconds + tickDuration = 100ms # tick duration in milliseconds (should always be defined in milliseconds) ticksPerWheel = 512 } diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 1a1b2b2d13..18915e2fbd 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -9,7 +9,7 @@ import scala.annotation.tailrec import scala.collection.immutable.{ Stack, TreeMap } import java.util.concurrent.TimeUnit import akka.event.Logging.{ Debug, Warning, Error } -import akka.util.Helpers +import akka.util.{ Duration, Helpers } /** * The actor context - the view of the actor cell from the actor. @@ -395,7 +395,7 @@ private[akka] class ActorCell( val recvtimeout = receiveTimeout if (recvtimeout.isDefined && dispatcher.mailboxIsEmpty(this)) { //Only reschedule if desired and there are currently no more messages to be processed - futureTimeout = Some(system.scheduler.scheduleOnce(self, ReceiveTimeout, recvtimeout.get, TimeUnit.MILLISECONDS)) + futureTimeout = Some(system.scheduler.scheduleOnce(self, ReceiveTimeout, Duration(recvtimeout.get, TimeUnit.MILLISECONDS))) } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index d20f4a070e..2b93450a44 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -15,7 +15,7 @@ import akka.event.{ Logging, DeathWatch, ActorClassification, EventStream } import akka.routing.{ ScatterGatherFirstCompletedRouter, Routing, RouterType, Router, RoutedProps, RoutedActorRef, RoundRobinRouter, RandomRouter, LocalConnectionManager, DirectRouter } import akka.AkkaException import com.eaio.uuid.UUID -import akka.util.{ Switch, Helpers } +import akka.util.{ Duration, Switch, Helpers } /** * Interface for all ActorRef providers to implement. @@ -390,20 +390,20 @@ class LocalDeathWatch extends DeathWatch with ActorClassification { class DefaultScheduler(hashedWheelTimer: HashedWheelTimer) extends Scheduler { - def schedule(receiver: ActorRef, message: Any, initialDelay: Long, delay: Long, timeUnit: TimeUnit): Cancellable = - new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(receiver, message, delay, timeUnit), initialDelay, timeUnit)) + def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, delay: Duration): Cancellable = + new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(receiver, message, delay), initialDelay)) - def scheduleOnce(runnable: Runnable, delay: Long, timeUnit: TimeUnit): Cancellable = - new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(runnable), delay, timeUnit)) + def schedule(f: () ⇒ Unit, initialDelay: Duration, delay: Duration): Cancellable = + new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(f, delay), initialDelay)) - def scheduleOnce(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit): Cancellable = - new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(receiver, message), delay, timeUnit)) + def scheduleOnce(runnable: Runnable, delay: Duration): Cancellable = + new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(runnable), delay)) - def schedule(f: () ⇒ Unit, initialDelay: Long, delay: Long, timeUnit: TimeUnit): Cancellable = - new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(f, delay, timeUnit), initialDelay, timeUnit)) + def scheduleOnce(receiver: ActorRef, message: Any, delay: Duration): Cancellable = + new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(receiver, message), delay)) - def scheduleOnce(f: () ⇒ Unit, delay: Long, timeUnit: TimeUnit): Cancellable = - new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(f), delay, timeUnit)) + def scheduleOnce(f: () ⇒ Unit, delay: Duration): Cancellable = + new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(f), delay)) private def createSingleTask(runnable: Runnable): TimerTask = new TimerTask() { def run(timeout: org.jboss.netty.akka.util.Timeout) { runnable.run() } } @@ -411,11 +411,11 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer) extends Scheduler { private def createSingleTask(receiver: ActorRef, message: Any): TimerTask = new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { receiver ! message } } - private def createContinuousTask(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit): TimerTask = { + private def createContinuousTask(receiver: ActorRef, message: Any, delay: Duration): TimerTask = { new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { receiver ! message - timeout.getTimer.newTimeout(this, delay, timeUnit) + timeout.getTimer.newTimeout(this, delay) } } } @@ -423,11 +423,11 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer) extends Scheduler { private def createSingleTask(f: () ⇒ Unit): TimerTask = new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { f() } } - private def createContinuousTask(f: () ⇒ Unit, delay: Long, timeUnit: TimeUnit): TimerTask = { + private def createContinuousTask(f: () ⇒ Unit, delay: Duration): TimerTask = { new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { f() - timeout.getTimer.newTimeout(this, delay, timeUnit) + timeout.getTimer.newTimeout(this, delay) } } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index db1998d738..0914634ee9 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -13,7 +13,6 @@ import com.eaio.uuid.UUID import akka.serialization.Serialization import akka.remote.RemoteAddress import org.jboss.netty.akka.util.HashedWheelTimer -import java.util.concurrent.Executors import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.TimeUnit.NANOSECONDS import java.io.File @@ -22,9 +21,9 @@ import com.typesafe.config.ConfigParseOptions import com.typesafe.config.ConfigRoot import com.typesafe.config.ConfigFactory import java.lang.reflect.InvocationTargetException -import java.util.concurrent.ConcurrentHashMap import akka.util.{ Helpers, Duration, ReflectiveAccess } import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.{ TimeUnit, Executors, ConcurrentHashMap } object ActorSystem { @@ -93,7 +92,7 @@ object ActorSystem { val EnabledModules: Seq[String] = getStringList("akka.enabled-modules").asScala - val SchedulerTickDuration = getInt("akka.scheduler.tickDuration") + val SchedulerTickDuration = Duration(getMilliseconds("akka.scheduler.tickDuration"), TimeUnit.MILLISECONDS) val SchedulerTicksPerWheel = getInt("akka.scheduler.ticksPerWheel") // FIXME move to cluster extension @@ -337,7 +336,7 @@ class ActorSystemImpl(val name: String, _config: Config) extends ActorSystem { override def numberOfMessages = 0 } - val scheduler = new DefaultScheduler(new HashedWheelTimer(log, Executors.defaultThreadFactory, settings.SchedulerTickDuration, MILLISECONDS, settings.SchedulerTicksPerWheel)) + val scheduler = new DefaultScheduler(new HashedWheelTimer(log, Executors.defaultThreadFactory, settings.SchedulerTickDuration, settings.SchedulerTicksPerWheel)) // TODO correctly pull its config from the config val dispatcherFactory = new Dispatchers(settings, DefaultDispatcherPrerequisites(eventStream, deadLetterMailbox, scheduler)) diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala index cfd9d511ed..d94bf45707 100644 --- a/akka-actor/src/main/scala/akka/actor/FSM.scala +++ b/akka-actor/src/main/scala/akka/actor/FSM.scala @@ -7,6 +7,7 @@ import akka.util._ import scala.collection.mutable import akka.event.Logging +import akka.util.Duration._ object FSM { @@ -33,9 +34,9 @@ object FSM { def schedule(actor: ActorRef, timeout: Duration) { if (repeat) { - ref = Some(system.scheduler.schedule(actor, this, timeout.length, timeout.length, timeout.unit)) + ref = Some(system.scheduler.schedule(actor, this, Duration(timeout.length, timeout.unit), Duration(timeout.length, timeout.unit))) } else { - ref = Some(system.scheduler.scheduleOnce(actor, this, timeout.length, timeout.unit)) + ref = Some(system.scheduler.scheduleOnce(actor, this, Duration(timeout.length, timeout.unit))) } } @@ -522,7 +523,7 @@ trait FSM[S, D] extends ListenerManagement { if (timeout.isDefined) { val t = timeout.get if (t.finite_? && t.length >= 0) { - timeoutFuture = Some(system.scheduler.scheduleOnce(self, TimeoutMarker(generation), t.length, t.unit)) + timeoutFuture = Some(system.scheduler.scheduleOnce(self, TimeoutMarker(generation), Duration(t.length, t.unit))) } } } diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index d12dcb6329..3263f7a5c9 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -9,46 +9,20 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * - * Rework of David Pollak's ActorPing class in the Lift Project - * which is licensed under the Apache 2 License. */ package akka.actor -import java.util.concurrent._ import akka.util.Duration -import akka.AkkaException -case class SchedulerException(msg: String, e: Throwable) extends AkkaException(msg, e) { - def this(msg: String) = this(msg, null) -} - -trait JScheduler { - def schedule(receiver: ActorRef, message: Any, initialDelay: Long, delay: Long, timeUnit: TimeUnit): Cancellable - def scheduleOnce(runnable: Runnable, delay: Long, timeUnit: TimeUnit): Cancellable - def scheduleOnce(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit): Cancellable -} - -abstract class Scheduler extends JScheduler { - def schedule(f: () ⇒ Unit, initialDelay: Long, delay: Long, timeUnit: TimeUnit): Cancellable - - def scheduleOnce(f: () ⇒ Unit, delay: Long, timeUnit: TimeUnit): Cancellable - - def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, delay: Duration): Cancellable = - schedule(receiver, message, initialDelay.toNanos, delay.toNanos, TimeUnit.NANOSECONDS) - - def schedule(f: () ⇒ Unit, initialDelay: Duration, delay: Duration): Cancellable = - schedule(f, initialDelay.toNanos, delay.toNanos, TimeUnit.NANOSECONDS) - - def scheduleOnce(receiver: ActorRef, message: Any, delay: Duration): Cancellable = - scheduleOnce(receiver, message, delay.length, delay.unit) - - def scheduleOnce(f: () ⇒ Unit, delay: Duration): Cancellable = - scheduleOnce(f, delay.length, delay.unit) +trait Scheduler { + def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, delay: Duration): Cancellable + def schedule(f: () ⇒ Unit, initialDelay: Duration, delay: Duration): Cancellable + def scheduleOnce(runnable: Runnable, delay: Duration): Cancellable + def scheduleOnce(receiver: ActorRef, message: Any, delay: Duration): Cancellable + def scheduleOnce(f: () ⇒ Unit, delay: Duration): Cancellable } trait Cancellable { def cancel(): Unit - def isCancelled: Boolean } \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 7998a514f8..208eae51ca 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -135,7 +135,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext shutdownScheduleUpdater.get(this) match { case UNSCHEDULED ⇒ if (shutdownScheduleUpdater.compareAndSet(this, UNSCHEDULED, SCHEDULED)) { - scheduler.scheduleOnce(shutdownAction, shutdownTimeout.toMillis, TimeUnit.MILLISECONDS) + scheduler.scheduleOnce(shutdownAction, Duration(shutdownTimeout.toMillis, TimeUnit.MILLISECONDS)) () } else ifSensibleToDoSoThenScheduleShutdown() case SCHEDULED ⇒ @@ -210,7 +210,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext } case RESCHEDULED ⇒ if (shutdownScheduleUpdater.compareAndSet(MessageDispatcher.this, RESCHEDULED, SCHEDULED)) - scheduler.scheduleOnce(this, shutdownTimeout.toMillis, TimeUnit.MILLISECONDS) + scheduler.scheduleOnce(this, Duration(shutdownTimeout.toMillis, TimeUnit.MILLISECONDS)) else run() } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index b57ff39512..e9a3035ea8 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -956,12 +956,12 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi val runnable = new Runnable { def run() { if (!isCompleted) { - if (!isExpired) dispatcher.prerequisites.scheduler.scheduleOnce(this, timeLeftNoinline(), NANOS) + if (!isExpired) dispatcher.prerequisites.scheduler.scheduleOnce(this, Duration(timeLeftNoinline(), TimeUnit.NANOSECONDS)) else func(DefaultPromise.this) } } } - val timeoutFuture = dispatcher.prerequisites.scheduler.scheduleOnce(runnable, timeLeft(), NANOS) + val timeoutFuture = dispatcher.prerequisites.scheduler.scheduleOnce(runnable, Duration(timeLeft(), TimeUnit.NANOSECONDS)) onComplete(_ ⇒ timeoutFuture.cancel()) false } else true @@ -983,12 +983,12 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi val runnable = new Runnable { def run() { if (!isCompleted) { - if (!isExpired) dispatcher.prerequisites.scheduler.scheduleOnce(this, timeLeftNoinline(), NANOS) + if (!isExpired) dispatcher.prerequisites.scheduler.scheduleOnce(this, Duration(timeLeftNoinline(), TimeUnit.NANOSECONDS)) else promise complete (try { Right(fallback) } catch { case e ⇒ Left(e) }) } } } - dispatcher.prerequisites.scheduler.scheduleOnce(runnable, timeLeft(), NANOS) + dispatcher.prerequisites.scheduler.scheduleOnce(runnable, Duration(timeLeft(), TimeUnit.NANOSECONDS)) promise } } else this diff --git a/akka-remote/src/main/scala/akka/remote/Gossiper.scala b/akka-remote/src/main/scala/akka/remote/Gossiper.scala index 636d8a67ec..24e6c15fab 100644 --- a/akka-remote/src/main/scala/akka/remote/Gossiper.scala +++ b/akka-remote/src/main/scala/akka/remote/Gossiper.scala @@ -19,6 +19,7 @@ import scala.collection.immutable.Map import scala.annotation.tailrec import com.google.protobuf.ByteString +import akka.util.Duration /** * Interface for node membership change listener. @@ -122,8 +123,8 @@ class Gossiper(remote: Remote) { { // start periodic gossip and cluster scrutinization - default is run them every second with 1/2 second in between - system.scheduler schedule (() ⇒ initateGossip(), initalDelayForGossip.toSeconds, gossipFrequency.toSeconds, timeUnit) - system.scheduler schedule (() ⇒ scrutinize(), initalDelayForGossip.toSeconds, gossipFrequency.toSeconds, timeUnit) + system.scheduler schedule (() ⇒ initateGossip(), Duration(initalDelayForGossip.toSeconds, timeUnit), Duration(gossipFrequency.toSeconds, timeUnit)) + system.scheduler schedule (() ⇒ scrutinize(), Duration(initalDelayForGossip.toSeconds, timeUnit), Duration(gossipFrequency.toSeconds, timeUnit)) } /** diff --git a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala index 1c86015df4..6c0b2c80be 100644 --- a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala +++ b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala @@ -5,6 +5,7 @@ package sample.fsm.dining.become import akka.actor.{ ActorRef, Actor, ActorSystem } import java.util.concurrent.TimeUnit +import akka.util.Duration /* * First we define our messages, they basically speak for themselves @@ -77,7 +78,7 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor { case Taken(`chopstickToWaitFor`) ⇒ println("%s has picked up %s and %s, and starts to eat", name, left.address, right.address) become(eating) - system.scheduler.scheduleOnce(self, Think, 5, TimeUnit.SECONDS) + system.scheduler.scheduleOnce(self, Think, Duration(5, TimeUnit.SECONDS)) case Busy(chopstick) ⇒ become(thinking) @@ -106,7 +107,7 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor { left ! Put(self) right ! Put(self) println("%s puts down his chopsticks and starts to think", name) - system.scheduler.scheduleOnce(self, Eat, 5, TimeUnit.SECONDS) + system.scheduler.scheduleOnce(self, Eat, Duration(5, TimeUnit.SECONDS)) } //All hakkers start in a non-eating state @@ -114,7 +115,7 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor { case Think ⇒ println("%s starts to think", name) become(thinking) - system.scheduler.scheduleOnce(self, Eat, 5, TimeUnit.SECONDS) + system.scheduler.scheduleOnce(self, Eat, Duration(5, TimeUnit.SECONDS)) } }