diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index 9ef93ef05d..827e511308 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -121,70 +121,70 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, def schedule(initialDelay: Duration, delay: Duration, receiver: ActorRef, message: Any): Cancellable = { val continuousCancellable = new ContinuousCancellable - val task = new TimerTask with ContinuousScheduling { - def run(timeout: HWTimeout) { - receiver ! message - // Check if the receiver is still alive and kicking before reschedule the task - if (receiver.isTerminated) { - log.debug("Could not reschedule message to be sent because receiving actor has been terminated.") - } else { - scheduleNext(timeout, delay, continuousCancellable) - } - } - } - continuousCancellable.init(hashedWheelTimer.newTimeout(task, initialDelay)) - continuousCancellable + continuousCancellable.init( + hashedWheelTimer.newTimeout( + new TimerTask with ContinuousScheduling { + def run(timeout: HWTimeout) { + receiver ! message + // Check if the receiver is still alive and kicking before reschedule the task + if (receiver.isTerminated) log.debug("Could not reschedule message to be sent because receiving actor has been terminated.") + else scheduleNext(timeout, delay, continuousCancellable) + } + }, + initialDelay)) } def schedule(initialDelay: Duration, delay: Duration)(f: ⇒ Unit): Cancellable = { val continuousCancellable = new ContinuousCancellable - val task = new TimerTask with ContinuousScheduling with Runnable { - def run = f - def run(timeout: HWTimeout) { - dispatcher execute this - scheduleNext(timeout, delay, continuousCancellable) - } - } - continuousCancellable.init(hashedWheelTimer.newTimeout(task, initialDelay)) - continuousCancellable + continuousCancellable.init( + hashedWheelTimer.newTimeout( + new TimerTask with ContinuousScheduling with Runnable { + def run = f + def run(timeout: HWTimeout) { + dispatcher.execute(this) + scheduleNext(timeout, delay, continuousCancellable) + } + }, + initialDelay)) } def schedule(initialDelay: Duration, delay: Duration, runnable: Runnable): Cancellable = { val continuousCancellable = new ContinuousCancellable - val task = new TimerTask with ContinuousScheduling { - def run(timeout: HWTimeout) { - dispatcher.execute(runnable) - scheduleNext(timeout, delay, continuousCancellable) - } - } - continuousCancellable.init(hashedWheelTimer.newTimeout(task, initialDelay)) - continuousCancellable + continuousCancellable.init( + hashedWheelTimer.newTimeout( + new TimerTask with ContinuousScheduling { + def run(timeout: HWTimeout) { + dispatcher.execute(runnable) + scheduleNext(timeout, delay, continuousCancellable) + } + }, + initialDelay)) } - def scheduleOnce(delay: Duration, runnable: Runnable): Cancellable = { - val task = new TimerTask() { - def run(timeout: HWTimeout) { dispatcher.execute(runnable) } - } - new DefaultCancellable(hashedWheelTimer.newTimeout(task, delay)) - } + def scheduleOnce(delay: Duration, runnable: Runnable): Cancellable = + new DefaultCancellable( + hashedWheelTimer.newTimeout( + new TimerTask() { + def run(timeout: HWTimeout): Unit = dispatcher.execute(runnable) + }, + delay)) - def scheduleOnce(delay: Duration, receiver: ActorRef, message: Any): Cancellable = { - val task = new TimerTask { - def run(timeout: HWTimeout) { - receiver ! message - } - } - new DefaultCancellable(hashedWheelTimer.newTimeout(task, delay)) - } + def scheduleOnce(delay: Duration, receiver: ActorRef, message: Any): Cancellable = + new DefaultCancellable( + hashedWheelTimer.newTimeout( + new TimerTask { + def run(timeout: HWTimeout): Unit = receiver ! message + }, + delay)) - def scheduleOnce(delay: Duration)(f: ⇒ Unit): Cancellable = { - val task = new TimerTask { - def run(timeout: HWTimeout) { - dispatcher.execute(new Runnable { def run = f }) - } - } - new DefaultCancellable(hashedWheelTimer.newTimeout(task, delay)) - } + def scheduleOnce(delay: Duration)(f: ⇒ Unit): Cancellable = + new DefaultCancellable( + hashedWheelTimer.newTimeout( + new TimerTask with Runnable { + def run = f + def run(timeout: HWTimeout): Unit = dispatcher.execute(this) + }, + delay)) private trait ContinuousScheduling { this: TimerTask ⇒ def scheduleNext(timeout: HWTimeout, delay: Duration, delegator: ContinuousCancellable) { @@ -220,8 +220,9 @@ private[akka] class ContinuousCancellable extends Cancellable { @volatile private var cancelled = false - private[akka] def init(initialTimeout: HWTimeout): Unit = { + private[akka] def init(initialTimeout: HWTimeout): this.type = { delegate = initialTimeout + this } private[akka] def swap(newTimeout: HWTimeout): Unit = {