Introduced Duration instead of explicit value + time unit in HWT, Scheduler and users of the schedule functionality. See #1291

This commit is contained in:
Henrik Engstrom 2011-11-23 11:07:16 +01:00
parent ac03696d88
commit 7ca5a4161b
13 changed files with 74 additions and 96 deletions

View file

@ -5,6 +5,7 @@ import org.multiverse.api.latches.StandardLatch
import java.util.concurrent.{ ConcurrentLinkedQueue, CountDownLatch, TimeUnit } import java.util.concurrent.{ ConcurrentLinkedQueue, CountDownLatch, TimeUnit }
import akka.testkit.AkkaSpec import akka.testkit.AkkaSpec
import akka.testkit.EventFilter import akka.testkit.EventFilter
import akka.util.Duration
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach {
@ -28,14 +29,14 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach {
def receive = { case Tick countDownLatch.countDown() } def receive = { case Tick countDownLatch.countDown() }
}) })
// run every 50 millisec // run every 50 millisec
collectCancellable(system.scheduler.schedule(tickActor, Tick, 0, 50, TimeUnit.MILLISECONDS)) collectCancellable(system.scheduler.schedule(tickActor, Tick, Duration(0, TimeUnit.MILLISECONDS), Duration(50, TimeUnit.MILLISECONDS)))
// after max 1 second it should be executed at least the 3 times already // after max 1 second it should be executed at least the 3 times already
assert(countDownLatch.await(1, TimeUnit.SECONDS)) assert(countDownLatch.await(1, TimeUnit.SECONDS))
val countDownLatch2 = new CountDownLatch(3) val countDownLatch2 = new CountDownLatch(3)
collectCancellable(system.scheduler.schedule(() countDownLatch2.countDown(), 0, 50, TimeUnit.MILLISECONDS)) collectCancellable(system.scheduler.schedule(() countDownLatch2.countDown(), Duration(0, TimeUnit.MILLISECONDS), Duration(50, TimeUnit.MILLISECONDS)))
// after max 1 second it should be executed at least the 3 times already // after max 1 second it should be executed at least the 3 times already
assert(countDownLatch2.await(2, TimeUnit.SECONDS)) assert(countDownLatch2.await(2, TimeUnit.SECONDS))
@ -49,8 +50,8 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach {
}) })
// run every 50 millisec // run every 50 millisec
collectCancellable(system.scheduler.scheduleOnce(tickActor, Tick, 50, TimeUnit.MILLISECONDS)) collectCancellable(system.scheduler.scheduleOnce(tickActor, Tick, Duration(50, TimeUnit.MILLISECONDS)))
collectCancellable(system.scheduler.scheduleOnce(() countDownLatch.countDown(), 50, TimeUnit.MILLISECONDS)) collectCancellable(system.scheduler.scheduleOnce(() countDownLatch.countDown(), Duration(50, TimeUnit.MILLISECONDS)))
// after 1 second the wait should fail // after 1 second the wait should fail
assert(countDownLatch.await(2, TimeUnit.SECONDS) == false) assert(countDownLatch.await(2, TimeUnit.SECONDS) == false)
@ -86,7 +87,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach {
}) })
(1 to 10).foreach { i (1 to 10).foreach { i
val timeout = collectCancellable(system.scheduler.scheduleOnce(actor, Ping, 1, TimeUnit.SECONDS)) val timeout = collectCancellable(system.scheduler.scheduleOnce(actor, Ping, Duration(1, TimeUnit.SECONDS)))
timeout.cancel() timeout.cancel()
} }
@ -114,10 +115,10 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach {
}) })
val actor = (supervisor ? props).as[ActorRef].get val actor = (supervisor ? props).as[ActorRef].get
collectCancellable(system.scheduler.schedule(actor, Ping, 500, 500, TimeUnit.MILLISECONDS)) collectCancellable(system.scheduler.schedule(actor, Ping, Duration(500, TimeUnit.MILLISECONDS), Duration(500, TimeUnit.MILLISECONDS)))
// appx 2 pings before crash // appx 2 pings before crash
EventFilter[Exception]("CRASH", occurrences = 1) intercept { EventFilter[Exception]("CRASH", occurrences = 1) intercept {
collectCancellable(system.scheduler.scheduleOnce(actor, Crash, 1000, TimeUnit.MILLISECONDS)) collectCancellable(system.scheduler.scheduleOnce(actor, Crash, Duration(1000, TimeUnit.MILLISECONDS)))
} }
assert(restartLatch.tryAwait(2, TimeUnit.SECONDS)) assert(restartLatch.tryAwait(2, TimeUnit.SECONDS))
@ -133,19 +134,19 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach {
val actor = actorOf(new Actor { val actor = actorOf(new Actor {
def receive = { def receive = {
case Msg(ts) case Msg(ts)
val now = System.currentTimeMillis val now = System.nanoTime()
// Make sure that no message has been dispatched before the scheduled time (10ms) has occurred // Make sure that no message has been dispatched before the scheduled time (10ms = 10000000ns) has occurred
if (now - ts < 10) throw new RuntimeException("Interval is too small: " + (now - ts)) if (now - ts < 10000000) throw new RuntimeException("Interval is too small: " + (now - ts))
ticks.countDown() ticks.countDown()
} }
}) })
(1 to 300).foreach { i (1 to 300).foreach { i
collectCancellable(system.scheduler.scheduleOnce(actor, Msg(System.currentTimeMillis()), 10, TimeUnit.MILLISECONDS)) collectCancellable(system.scheduler.scheduleOnce(actor, Msg(System.nanoTime()), Duration(10, TimeUnit.MILLISECONDS)))
Thread.sleep(5) Thread.sleep(5)
} }
assert(ticks.await(2, TimeUnit.SECONDS) == true) assert(ticks.await(3, TimeUnit.SECONDS) == true)
} }
} }
} }

View file

@ -16,6 +16,7 @@
package org.jboss.netty.akka.util; package org.jboss.netty.akka.util;
import akka.event.LoggingAdapter; import akka.event.LoggingAdapter;
import akka.util.Duration;
import org.jboss.netty.akka.util.internal.ConcurrentIdentityHashMap; import org.jboss.netty.akka.util.internal.ConcurrentIdentityHashMap;
import org.jboss.netty.akka.util.internal.ReusableIterator; import org.jboss.netty.akka.util.internal.ReusableIterator;
@ -96,24 +97,24 @@ public class HashedWheelTimer implements Timer {
* @param threadFactory a {@link java.util.concurrent.ThreadFactory} that creates a * @param threadFactory a {@link java.util.concurrent.ThreadFactory} that creates a
* background {@link Thread} which is dedicated to * background {@link Thread} which is dedicated to
* {@link TimerTask} execution. * {@link TimerTask} execution.
* @param tickDuration the duration between tick * @param duration the duration between ticks
* @param unit the time unit of the {@code tickDuration}
* @param ticksPerWheel the size of the wheel * @param ticksPerWheel the size of the wheel
*/ */
public HashedWheelTimer( public HashedWheelTimer(
LoggingAdapter logger, LoggingAdapter logger,
ThreadFactory threadFactory, ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel) { Duration duration,
int ticksPerWheel) {
if (threadFactory == null) { if (threadFactory == null) {
throw new NullPointerException("threadFactory"); throw new NullPointerException("threadFactory");
} }
if (unit == null) { if (duration == null) {
throw new NullPointerException("unit"); throw new NullPointerException("duration");
} }
if (tickDuration <= 0) { if (duration.toNanos() <= 0) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"tickDuration must be greater than 0: " + tickDuration); "duration must be greater than 0 ns: " + duration.toNanos());
} }
if (ticksPerWheel <= 0) { if (ticksPerWheel <= 0) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
@ -128,14 +129,14 @@ public class HashedWheelTimer implements Timer {
mask = wheel.length - 1; mask = wheel.length - 1;
// Convert tickDuration to milliseconds. // Convert tickDuration to milliseconds.
this.tickDuration = tickDuration = unit.toMillis(tickDuration); this.tickDuration = duration.toMillis();
// Prevent overflow. // Prevent overflow.
if (tickDuration == Long.MAX_VALUE || if (tickDuration == Long.MAX_VALUE ||
tickDuration >= Long.MAX_VALUE / wheel.length) { tickDuration >= Long.MAX_VALUE / wheel.length) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"tickDuration is too long: " + "tickDuration is too long: " +
tickDuration + ' ' + unit); tickDuration + ' ' + duration.unit());
} }
roundDuration = tickDuration * wheel.length; roundDuration = tickDuration * wheel.length;
@ -231,23 +232,22 @@ public class HashedWheelTimer implements Timer {
return Collections.unmodifiableSet(unprocessedTimeouts); return Collections.unmodifiableSet(unprocessedTimeouts);
} }
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { public Timeout newTimeout(TimerTask task, Duration delay) {
final long currentTime = System.currentTimeMillis(); final long currentTime = System.currentTimeMillis();
if (task == null) { if (task == null) {
throw new NullPointerException("task"); throw new NullPointerException("task");
} }
if (unit == null) { if (delay == null) {
throw new NullPointerException("unit"); throw new NullPointerException("delay");
} }
if (!workerThread.isAlive()) { if (!workerThread.isAlive()) {
start(); start();
} }
delay = unit.toMillis(delay); HashedWheelTimeout timeout = new HashedWheelTimeout(task, currentTime + delay.toMillis());
HashedWheelTimeout timeout = new HashedWheelTimeout(task, currentTime + delay); scheduleTimeout(timeout, delay.toMillis());
scheduleTimeout(timeout, delay);
return timeout; return timeout;
} }

View file

@ -15,6 +15,7 @@
*/ */
package org.jboss.netty.akka.util; package org.jboss.netty.akka.util;
import akka.util.Duration;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -41,7 +42,7 @@ public interface Timer {
* @throws IllegalStateException if this timer has been * @throws IllegalStateException if this timer has been
* {@linkplain #stop() stopped} already * {@linkplain #stop() stopped} already
*/ */
Timeout newTimeout(TimerTask task, long delay, TimeUnit unit); Timeout newTimeout(TimerTask task, Duration delay);
/** /**
* Releases all resources acquired by this {@link org.jboss.netty.akka.util.Timer} and cancels all * Releases all resources acquired by this {@link org.jboss.netty.akka.util.Timer} and cancels all

View file

@ -220,7 +220,7 @@ akka {
scheduler { scheduler {
# The HashedWheelTimer implementation from Jetty is used as the default scheduler in the system. # The HashedWheelTimer implementation from Jetty is used as the default scheduler in the system.
# See http://www.jboss.org/netty/ # See http://www.jboss.org/netty/
tickDuration = 100 # In milliseconds tickDuration = 100ms # tick duration in milliseconds (should always be defined in milliseconds)
ticksPerWheel = 512 ticksPerWheel = 512
} }

View file

@ -9,7 +9,7 @@ import scala.annotation.tailrec
import scala.collection.immutable.{ Stack, TreeMap } import scala.collection.immutable.{ Stack, TreeMap }
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import akka.event.Logging.{ Debug, Warning, Error } import akka.event.Logging.{ Debug, Warning, Error }
import akka.util.Helpers import akka.util.{ Duration, Helpers }
/** /**
* The actor context - the view of the actor cell from the actor. * The actor context - the view of the actor cell from the actor.
@ -395,7 +395,7 @@ private[akka] class ActorCell(
val recvtimeout = receiveTimeout val recvtimeout = receiveTimeout
if (recvtimeout.isDefined && dispatcher.mailboxIsEmpty(this)) { if (recvtimeout.isDefined && dispatcher.mailboxIsEmpty(this)) {
//Only reschedule if desired and there are currently no more messages to be processed //Only reschedule if desired and there are currently no more messages to be processed
futureTimeout = Some(system.scheduler.scheduleOnce(self, ReceiveTimeout, recvtimeout.get, TimeUnit.MILLISECONDS)) futureTimeout = Some(system.scheduler.scheduleOnce(self, ReceiveTimeout, Duration(recvtimeout.get, TimeUnit.MILLISECONDS)))
} }
} }

View file

@ -15,7 +15,7 @@ import akka.event.{ Logging, DeathWatch, ActorClassification, EventStream }
import akka.routing.{ ScatterGatherFirstCompletedRouter, Routing, RouterType, Router, RoutedProps, RoutedActorRef, RoundRobinRouter, RandomRouter, LocalConnectionManager, DirectRouter } import akka.routing.{ ScatterGatherFirstCompletedRouter, Routing, RouterType, Router, RoutedProps, RoutedActorRef, RoundRobinRouter, RandomRouter, LocalConnectionManager, DirectRouter }
import akka.AkkaException import akka.AkkaException
import com.eaio.uuid.UUID import com.eaio.uuid.UUID
import akka.util.{ Switch, Helpers } import akka.util.{ Duration, Switch, Helpers }
/** /**
* Interface for all ActorRef providers to implement. * Interface for all ActorRef providers to implement.
@ -390,20 +390,20 @@ class LocalDeathWatch extends DeathWatch with ActorClassification {
class DefaultScheduler(hashedWheelTimer: HashedWheelTimer) extends Scheduler { class DefaultScheduler(hashedWheelTimer: HashedWheelTimer) extends Scheduler {
def schedule(receiver: ActorRef, message: Any, initialDelay: Long, delay: Long, timeUnit: TimeUnit): Cancellable = def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, delay: Duration): Cancellable =
new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(receiver, message, delay, timeUnit), initialDelay, timeUnit)) new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(receiver, message, delay), initialDelay))
def scheduleOnce(runnable: Runnable, delay: Long, timeUnit: TimeUnit): Cancellable = def schedule(f: () Unit, initialDelay: Duration, delay: Duration): Cancellable =
new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(runnable), delay, timeUnit)) new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(f, delay), initialDelay))
def scheduleOnce(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit): Cancellable = def scheduleOnce(runnable: Runnable, delay: Duration): Cancellable =
new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(receiver, message), delay, timeUnit)) new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(runnable), delay))
def schedule(f: () Unit, initialDelay: Long, delay: Long, timeUnit: TimeUnit): Cancellable = def scheduleOnce(receiver: ActorRef, message: Any, delay: Duration): Cancellable =
new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(f, delay, timeUnit), initialDelay, timeUnit)) new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(receiver, message), delay))
def scheduleOnce(f: () Unit, delay: Long, timeUnit: TimeUnit): Cancellable = def scheduleOnce(f: () Unit, delay: Duration): Cancellable =
new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(f), delay, timeUnit)) new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(f), delay))
private def createSingleTask(runnable: Runnable): TimerTask = private def createSingleTask(runnable: Runnable): TimerTask =
new TimerTask() { def run(timeout: org.jboss.netty.akka.util.Timeout) { runnable.run() } } new TimerTask() { def run(timeout: org.jboss.netty.akka.util.Timeout) { runnable.run() } }
@ -411,11 +411,11 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer) extends Scheduler {
private def createSingleTask(receiver: ActorRef, message: Any): TimerTask = private def createSingleTask(receiver: ActorRef, message: Any): TimerTask =
new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { receiver ! message } } new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { receiver ! message } }
private def createContinuousTask(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit): TimerTask = { private def createContinuousTask(receiver: ActorRef, message: Any, delay: Duration): TimerTask = {
new TimerTask { new TimerTask {
def run(timeout: org.jboss.netty.akka.util.Timeout) { def run(timeout: org.jboss.netty.akka.util.Timeout) {
receiver ! message receiver ! message
timeout.getTimer.newTimeout(this, delay, timeUnit) timeout.getTimer.newTimeout(this, delay)
} }
} }
} }
@ -423,11 +423,11 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer) extends Scheduler {
private def createSingleTask(f: () Unit): TimerTask = private def createSingleTask(f: () Unit): TimerTask =
new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { f() } } new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { f() } }
private def createContinuousTask(f: () Unit, delay: Long, timeUnit: TimeUnit): TimerTask = { private def createContinuousTask(f: () Unit, delay: Duration): TimerTask = {
new TimerTask { new TimerTask {
def run(timeout: org.jboss.netty.akka.util.Timeout) { def run(timeout: org.jboss.netty.akka.util.Timeout) {
f() f()
timeout.getTimer.newTimeout(this, delay, timeUnit) timeout.getTimer.newTimeout(this, delay)
} }
} }
} }

View file

@ -13,7 +13,6 @@ import com.eaio.uuid.UUID
import akka.serialization.Serialization import akka.serialization.Serialization
import akka.remote.RemoteAddress import akka.remote.RemoteAddress
import org.jboss.netty.akka.util.HashedWheelTimer import org.jboss.netty.akka.util.HashedWheelTimer
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.concurrent.TimeUnit.NANOSECONDS import java.util.concurrent.TimeUnit.NANOSECONDS
import java.io.File import java.io.File
@ -22,9 +21,9 @@ import com.typesafe.config.ConfigParseOptions
import com.typesafe.config.ConfigRoot import com.typesafe.config.ConfigRoot
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import java.lang.reflect.InvocationTargetException import java.lang.reflect.InvocationTargetException
import java.util.concurrent.ConcurrentHashMap
import akka.util.{ Helpers, Duration, ReflectiveAccess } import akka.util.{ Helpers, Duration, ReflectiveAccess }
import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.{ TimeUnit, Executors, ConcurrentHashMap }
object ActorSystem { object ActorSystem {
@ -93,7 +92,7 @@ object ActorSystem {
val EnabledModules: Seq[String] = getStringList("akka.enabled-modules").asScala val EnabledModules: Seq[String] = getStringList("akka.enabled-modules").asScala
val SchedulerTickDuration = getInt("akka.scheduler.tickDuration") val SchedulerTickDuration = Duration(getMilliseconds("akka.scheduler.tickDuration"), TimeUnit.MILLISECONDS)
val SchedulerTicksPerWheel = getInt("akka.scheduler.ticksPerWheel") val SchedulerTicksPerWheel = getInt("akka.scheduler.ticksPerWheel")
// FIXME move to cluster extension // FIXME move to cluster extension
@ -337,7 +336,7 @@ class ActorSystemImpl(val name: String, _config: Config) extends ActorSystem {
override def numberOfMessages = 0 override def numberOfMessages = 0
} }
val scheduler = new DefaultScheduler(new HashedWheelTimer(log, Executors.defaultThreadFactory, settings.SchedulerTickDuration, MILLISECONDS, settings.SchedulerTicksPerWheel)) val scheduler = new DefaultScheduler(new HashedWheelTimer(log, Executors.defaultThreadFactory, settings.SchedulerTickDuration, settings.SchedulerTicksPerWheel))
// TODO correctly pull its config from the config // TODO correctly pull its config from the config
val dispatcherFactory = new Dispatchers(settings, DefaultDispatcherPrerequisites(eventStream, deadLetterMailbox, scheduler)) val dispatcherFactory = new Dispatchers(settings, DefaultDispatcherPrerequisites(eventStream, deadLetterMailbox, scheduler))

View file

@ -7,6 +7,7 @@ import akka.util._
import scala.collection.mutable import scala.collection.mutable
import akka.event.Logging import akka.event.Logging
import akka.util.Duration._
object FSM { object FSM {
@ -33,9 +34,9 @@ object FSM {
def schedule(actor: ActorRef, timeout: Duration) { def schedule(actor: ActorRef, timeout: Duration) {
if (repeat) { if (repeat) {
ref = Some(system.scheduler.schedule(actor, this, timeout.length, timeout.length, timeout.unit)) ref = Some(system.scheduler.schedule(actor, this, Duration(timeout.length, timeout.unit), Duration(timeout.length, timeout.unit)))
} else { } else {
ref = Some(system.scheduler.scheduleOnce(actor, this, timeout.length, timeout.unit)) ref = Some(system.scheduler.scheduleOnce(actor, this, Duration(timeout.length, timeout.unit)))
} }
} }
@ -522,7 +523,7 @@ trait FSM[S, D] extends ListenerManagement {
if (timeout.isDefined) { if (timeout.isDefined) {
val t = timeout.get val t = timeout.get
if (t.finite_? && t.length >= 0) { if (t.finite_? && t.length >= 0) {
timeoutFuture = Some(system.scheduler.scheduleOnce(self, TimeoutMarker(generation), t.length, t.unit)) timeoutFuture = Some(system.scheduler.scheduleOnce(self, TimeoutMarker(generation), Duration(t.length, t.unit)))
} }
} }
} }

View file

@ -9,46 +9,20 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*
* Rework of David Pollak's ActorPing class in the Lift Project
* which is licensed under the Apache 2 License.
*/ */
package akka.actor package akka.actor
import java.util.concurrent._
import akka.util.Duration import akka.util.Duration
import akka.AkkaException
case class SchedulerException(msg: String, e: Throwable) extends AkkaException(msg, e) { trait Scheduler {
def this(msg: String) = this(msg, null) def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, delay: Duration): Cancellable
} def schedule(f: () Unit, initialDelay: Duration, delay: Duration): Cancellable
def scheduleOnce(runnable: Runnable, delay: Duration): Cancellable
trait JScheduler { def scheduleOnce(receiver: ActorRef, message: Any, delay: Duration): Cancellable
def schedule(receiver: ActorRef, message: Any, initialDelay: Long, delay: Long, timeUnit: TimeUnit): Cancellable def scheduleOnce(f: () Unit, delay: Duration): Cancellable
def scheduleOnce(runnable: Runnable, delay: Long, timeUnit: TimeUnit): Cancellable
def scheduleOnce(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit): Cancellable
}
abstract class Scheduler extends JScheduler {
def schedule(f: () Unit, initialDelay: Long, delay: Long, timeUnit: TimeUnit): Cancellable
def scheduleOnce(f: () Unit, delay: Long, timeUnit: TimeUnit): Cancellable
def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, delay: Duration): Cancellable =
schedule(receiver, message, initialDelay.toNanos, delay.toNanos, TimeUnit.NANOSECONDS)
def schedule(f: () Unit, initialDelay: Duration, delay: Duration): Cancellable =
schedule(f, initialDelay.toNanos, delay.toNanos, TimeUnit.NANOSECONDS)
def scheduleOnce(receiver: ActorRef, message: Any, delay: Duration): Cancellable =
scheduleOnce(receiver, message, delay.length, delay.unit)
def scheduleOnce(f: () Unit, delay: Duration): Cancellable =
scheduleOnce(f, delay.length, delay.unit)
} }
trait Cancellable { trait Cancellable {
def cancel(): Unit def cancel(): Unit
def isCancelled: Boolean def isCancelled: Boolean
} }

View file

@ -135,7 +135,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
shutdownScheduleUpdater.get(this) match { shutdownScheduleUpdater.get(this) match {
case UNSCHEDULED case UNSCHEDULED
if (shutdownScheduleUpdater.compareAndSet(this, UNSCHEDULED, SCHEDULED)) { if (shutdownScheduleUpdater.compareAndSet(this, UNSCHEDULED, SCHEDULED)) {
scheduler.scheduleOnce(shutdownAction, shutdownTimeout.toMillis, TimeUnit.MILLISECONDS) scheduler.scheduleOnce(shutdownAction, Duration(shutdownTimeout.toMillis, TimeUnit.MILLISECONDS))
() ()
} else ifSensibleToDoSoThenScheduleShutdown() } else ifSensibleToDoSoThenScheduleShutdown()
case SCHEDULED case SCHEDULED
@ -210,7 +210,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
} }
case RESCHEDULED case RESCHEDULED
if (shutdownScheduleUpdater.compareAndSet(MessageDispatcher.this, RESCHEDULED, SCHEDULED)) if (shutdownScheduleUpdater.compareAndSet(MessageDispatcher.this, RESCHEDULED, SCHEDULED))
scheduler.scheduleOnce(this, shutdownTimeout.toMillis, TimeUnit.MILLISECONDS) scheduler.scheduleOnce(this, Duration(shutdownTimeout.toMillis, TimeUnit.MILLISECONDS))
else run() else run()
} }
} }

View file

@ -956,12 +956,12 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
val runnable = new Runnable { val runnable = new Runnable {
def run() { def run() {
if (!isCompleted) { if (!isCompleted) {
if (!isExpired) dispatcher.prerequisites.scheduler.scheduleOnce(this, timeLeftNoinline(), NANOS) if (!isExpired) dispatcher.prerequisites.scheduler.scheduleOnce(this, Duration(timeLeftNoinline(), TimeUnit.NANOSECONDS))
else func(DefaultPromise.this) else func(DefaultPromise.this)
} }
} }
} }
val timeoutFuture = dispatcher.prerequisites.scheduler.scheduleOnce(runnable, timeLeft(), NANOS) val timeoutFuture = dispatcher.prerequisites.scheduler.scheduleOnce(runnable, Duration(timeLeft(), TimeUnit.NANOSECONDS))
onComplete(_ timeoutFuture.cancel()) onComplete(_ timeoutFuture.cancel())
false false
} else true } else true
@ -983,12 +983,12 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
val runnable = new Runnable { val runnable = new Runnable {
def run() { def run() {
if (!isCompleted) { if (!isCompleted) {
if (!isExpired) dispatcher.prerequisites.scheduler.scheduleOnce(this, timeLeftNoinline(), NANOS) if (!isExpired) dispatcher.prerequisites.scheduler.scheduleOnce(this, Duration(timeLeftNoinline(), TimeUnit.NANOSECONDS))
else promise complete (try { Right(fallback) } catch { case e Left(e) }) else promise complete (try { Right(fallback) } catch { case e Left(e) })
} }
} }
} }
dispatcher.prerequisites.scheduler.scheduleOnce(runnable, timeLeft(), NANOS) dispatcher.prerequisites.scheduler.scheduleOnce(runnable, Duration(timeLeft(), TimeUnit.NANOSECONDS))
promise promise
} }
} else this } else this

View file

@ -19,6 +19,7 @@ import scala.collection.immutable.Map
import scala.annotation.tailrec import scala.annotation.tailrec
import com.google.protobuf.ByteString import com.google.protobuf.ByteString
import akka.util.Duration
/** /**
* Interface for node membership change listener. * Interface for node membership change listener.
@ -122,8 +123,8 @@ class Gossiper(remote: Remote) {
{ {
// start periodic gossip and cluster scrutinization - default is run them every second with 1/2 second in between // start periodic gossip and cluster scrutinization - default is run them every second with 1/2 second in between
system.scheduler schedule (() initateGossip(), initalDelayForGossip.toSeconds, gossipFrequency.toSeconds, timeUnit) system.scheduler schedule (() initateGossip(), Duration(initalDelayForGossip.toSeconds, timeUnit), Duration(gossipFrequency.toSeconds, timeUnit))
system.scheduler schedule (() scrutinize(), initalDelayForGossip.toSeconds, gossipFrequency.toSeconds, timeUnit) system.scheduler schedule (() scrutinize(), Duration(initalDelayForGossip.toSeconds, timeUnit), Duration(gossipFrequency.toSeconds, timeUnit))
} }
/** /**

View file

@ -5,6 +5,7 @@ package sample.fsm.dining.become
import akka.actor.{ ActorRef, Actor, ActorSystem } import akka.actor.{ ActorRef, Actor, ActorSystem }
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import akka.util.Duration
/* /*
* First we define our messages, they basically speak for themselves * First we define our messages, they basically speak for themselves
@ -77,7 +78,7 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor {
case Taken(`chopstickToWaitFor`) case Taken(`chopstickToWaitFor`)
println("%s has picked up %s and %s, and starts to eat", name, left.address, right.address) println("%s has picked up %s and %s, and starts to eat", name, left.address, right.address)
become(eating) become(eating)
system.scheduler.scheduleOnce(self, Think, 5, TimeUnit.SECONDS) system.scheduler.scheduleOnce(self, Think, Duration(5, TimeUnit.SECONDS))
case Busy(chopstick) case Busy(chopstick)
become(thinking) become(thinking)
@ -106,7 +107,7 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor {
left ! Put(self) left ! Put(self)
right ! Put(self) right ! Put(self)
println("%s puts down his chopsticks and starts to think", name) println("%s puts down his chopsticks and starts to think", name)
system.scheduler.scheduleOnce(self, Eat, 5, TimeUnit.SECONDS) system.scheduler.scheduleOnce(self, Eat, Duration(5, TimeUnit.SECONDS))
} }
//All hakkers start in a non-eating state //All hakkers start in a non-eating state
@ -114,7 +115,7 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor {
case Think case Think
println("%s starts to think", name) println("%s starts to think", name)
become(thinking) become(thinking)
system.scheduler.scheduleOnce(self, Eat, 5, TimeUnit.SECONDS) system.scheduler.scheduleOnce(self, Eat, Duration(5, TimeUnit.SECONDS))
} }
} }