diff --git a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala index 4640951322..4f70bc52f5 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala @@ -29,14 +29,14 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { def receive = { case Tick ⇒ countDownLatch.countDown() } }) // run every 50 milliseconds - collectCancellable(system.scheduler.schedule(tickActor, Tick, 0 milliseconds, 50 milliseconds)) + collectCancellable(system.scheduler.schedule(0 milliseconds, 50 milliseconds, tickActor, Tick)) // after max 1 second it should be executed at least the 3 times already assert(countDownLatch.await(1, TimeUnit.SECONDS)) val countDownLatch2 = new CountDownLatch(3) - collectCancellable(system.scheduler.schedule(() ⇒ countDownLatch2.countDown(), 0 milliseconds, 50 milliseconds)) + collectCancellable(system.scheduler.schedule(0 milliseconds, 50 milliseconds)(countDownLatch2.countDown())) // after max 1 second it should be executed at least the 3 times already assert(countDownLatch2.await(2, TimeUnit.SECONDS)) @@ -44,7 +44,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { "should stop continuous scheduling if the receiving actor has been terminated" in { // run immediately and then every 100 milliseconds - collectCancellable(system.scheduler.schedule(testActor, "msg", 0 milliseconds, 100 milliseconds)) + collectCancellable(system.scheduler.schedule(0 milliseconds, 100 milliseconds, testActor, "msg")) // stop the actor and, hence, the continuous messaging from happening testActor ! PoisonPill @@ -59,14 +59,18 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { def receive = { case Tick ⇒ countDownLatch.countDown() } }) - // run every 50 millisec - collectCancellable(system.scheduler.scheduleOnce(tickActor, Tick, 50 milliseconds)) - collectCancellable(system.scheduler.scheduleOnce(() ⇒ countDownLatch.countDown(), 50 milliseconds)) + // run after 300 millisec + collectCancellable(system.scheduler.scheduleOnce(300 milliseconds, tickActor, Tick)) + collectCancellable(system.scheduler.scheduleOnce(300 milliseconds)(countDownLatch.countDown())) + + // should not be run immediately + assert(countDownLatch.await(100, TimeUnit.MILLISECONDS) == false) + countDownLatch.getCount must be(3) // after 1 second the wait should fail - assert(countDownLatch.await(2, TimeUnit.SECONDS) == false) + assert(countDownLatch.await(1, TimeUnit.SECONDS) == false) // should still be 1 left - assert(countDownLatch.getCount == 1) + countDownLatch.getCount must be(1) } /** @@ -81,7 +85,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { }) (1 to 10).foreach { i ⇒ - val timeout = collectCancellable(system.scheduler.scheduleOnce(actor, Ping, 1 second)) + val timeout = collectCancellable(system.scheduler.scheduleOnce(1 second, actor, Ping)) timeout.cancel() } @@ -109,10 +113,10 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { }) val actor = (supervisor ? props).as[ActorRef].get - collectCancellable(system.scheduler.schedule(actor, Ping, 500 milliseconds, 500 milliseconds)) + collectCancellable(system.scheduler.schedule(500 milliseconds, 500 milliseconds, actor, Ping)) // appx 2 pings before crash EventFilter[Exception]("CRASH", occurrences = 1) intercept { - collectCancellable(system.scheduler.scheduleOnce(actor, Crash, 1000 milliseconds)) + collectCancellable(system.scheduler.scheduleOnce(1000 milliseconds, actor, Crash)) } assert(restartLatch.tryAwait(2, TimeUnit.SECONDS)) @@ -136,7 +140,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { }) (1 to 300).foreach { i ⇒ - collectCancellable(system.scheduler.scheduleOnce(actor, Msg(System.nanoTime), 10 milliseconds)) + collectCancellable(system.scheduler.scheduleOnce(10 milliseconds, actor, Msg(System.nanoTime))) Thread.sleep(5) } @@ -155,7 +159,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { }) val startTime = System.nanoTime() - val cancellable = system.scheduler.schedule(actor, Msg, 1 second, 100 milliseconds) + val cancellable = system.scheduler.schedule(1 second, 100 milliseconds, actor, Msg) ticks.await(3, TimeUnit.SECONDS) val elapsedTimeMs = (System.nanoTime() - startTime) / 1000000 diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 6d41aafc5b..c7a37de589 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -404,7 +404,7 @@ private[akka] class ActorCell( if (recvtimeout._1 > 0 && dispatcher.mailboxIsEmpty(this)) { recvtimeout._2.cancel() //Cancel any ongoing future //Only reschedule if desired and there are currently no more messages to be processed - receiveTimeoutData = (recvtimeout._1, system.scheduler.scheduleOnce(self, ReceiveTimeout, Duration(recvtimeout._1, TimeUnit.MILLISECONDS))) + receiveTimeoutData = (recvtimeout._1, system.scheduler.scheduleOnce(Duration(recvtimeout._1, TimeUnit.MILLISECONDS), self, ReceiveTimeout)) } else cancelReceiveTimeout() } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index c397d67be6..f6e0c19adc 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -417,19 +417,19 @@ class LocalDeathWatch extends DeathWatch with ActorClassification { */ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter, dispatcher: ⇒ MessageDispatcher) extends Scheduler with Closeable { - def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, delay: Duration): Cancellable = - new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(receiver, message, delay), initialDelay)) + def schedule(initialDelay: Duration, delay: Duration, receiver: ActorRef, message: Any): Cancellable = + new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(delay, receiver, message), initialDelay)) - def schedule(f: () ⇒ Unit, initialDelay: Duration, delay: Duration): Cancellable = - new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(f, delay), initialDelay)) + def schedule(initialDelay: Duration, delay: Duration)(f: ⇒ Unit): Cancellable = + new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(delay, f), initialDelay)) - def scheduleOnce(runnable: Runnable, delay: Duration): Cancellable = + def scheduleOnce(delay: Duration, runnable: Runnable): Cancellable = new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(runnable), delay)) - def scheduleOnce(receiver: ActorRef, message: Any, delay: Duration): Cancellable = + def scheduleOnce(delay: Duration, receiver: ActorRef, message: Any): Cancellable = new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(receiver, message), delay)) - def scheduleOnce(f: () ⇒ Unit, delay: Duration): Cancellable = + def scheduleOnce(delay: Duration)(f: ⇒ Unit): Cancellable = new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(f), delay)) private def createSingleTask(runnable: Runnable): TimerTask = @@ -446,14 +446,14 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter, } } - private def createSingleTask(f: () ⇒ Unit): TimerTask = + private def createSingleTask(f: ⇒ Unit): TimerTask = new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { - dispatcher.dispatchTask(f) + dispatcher.dispatchTask(() ⇒ f) } } - private def createContinuousTask(receiver: ActorRef, message: Any, delay: Duration): TimerTask = { + private def createContinuousTask(delay: Duration, receiver: ActorRef, message: Any): TimerTask = { new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { // Check if the receiver is still alive and kicking before sending it a message and reschedule the task @@ -467,10 +467,10 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter, } } - private def createContinuousTask(f: () ⇒ Unit, delay: Duration): TimerTask = { + private def createContinuousTask(delay: Duration, f: ⇒ Unit): TimerTask = { new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { - dispatcher.dispatchTask(f) + dispatcher.dispatchTask(() ⇒ f) timeout.getTimer.newTimeout(this, delay) } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 9d064834bd..fef236d5fd 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -398,7 +398,10 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor case x: Closeable ⇒ // Let dispatchers shutdown first. // Dispatchers schedule shutdown and may also reschedule, therefore wait 4 times the shutdown delay. - x.scheduleOnce(() ⇒ { x.close(); dispatcher.shutdown() }, settings.DispatcherDefaultShutdown * 4) + x.scheduleOnce(settings.DispatcherDefaultShutdown * 4) { + x.close() + dispatcher.shutdown() + } case _ ⇒ } diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala index 76495843fc..0dc961ab0b 100644 --- a/akka-actor/src/main/scala/akka/actor/FSM.scala +++ b/akka-actor/src/main/scala/akka/actor/FSM.scala @@ -34,9 +34,9 @@ object FSM { def schedule(actor: ActorRef, timeout: Duration) { if (repeat) { - ref = Some(system.scheduler.schedule(actor, this, timeout, timeout)) + ref = Some(system.scheduler.schedule(timeout, timeout, actor, this)) } else { - ref = Some(system.scheduler.scheduleOnce(actor, this, timeout)) + ref = Some(system.scheduler.scheduleOnce(timeout, actor, this)) } } @@ -523,7 +523,7 @@ trait FSM[S, D] extends ListenerManagement { if (timeout.isDefined) { val t = timeout.get if (t.finite_? && t.length >= 0) { - timeoutFuture = Some(system.scheduler.scheduleOnce(self, TimeoutMarker(generation), t)) + timeoutFuture = Some(system.scheduler.scheduleOnce(t, self, TimeoutMarker(generation))) } } } diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index 52a63f1730..5b32a86a60 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -20,29 +20,29 @@ trait Scheduler { * E.g. if you would like a message to be sent immediately and thereafter every 500ms you would set * delay = Duration.Zero and frequency = Duration(500, TimeUnit.MILLISECONDS) */ - def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, frequency: Duration): Cancellable + def schedule(initialDelay: Duration, frequency: Duration, receiver: ActorRef, message: Any): Cancellable /** * Schedules a function to be run repeatedly with an initial delay and a frequency. * E.g. if you would like the function to be run after 2 seconds and thereafter every 100ms you would set * delay = Duration(2, TimeUnit.SECONDS) and frequency = Duration(100, TimeUnit.MILLISECONDS) */ - def schedule(f: () ⇒ Unit, initialDelay: Duration, frequency: Duration): Cancellable + def schedule(initialDelay: Duration, frequency: Duration)(f: ⇒ Unit): Cancellable /** * Schedules a Runnable to be run once with a delay, i.e. a time period that has to pass before the runnable is executed. */ - def scheduleOnce(runnable: Runnable, delay: Duration): Cancellable + def scheduleOnce(delay: Duration, runnable: Runnable): Cancellable /** * Schedules a message to be sent once with a delay, i.e. a time period that has to pass before the message is sent. */ - def scheduleOnce(receiver: ActorRef, message: Any, delay: Duration): Cancellable + def scheduleOnce(delay: Duration, receiver: ActorRef, message: Any): Cancellable /** * Schedules a function to be run once with a delay, i.e. a time period that has to pass before the function is run. */ - def scheduleOnce(f: () ⇒ Unit, delay: Duration): Cancellable + def scheduleOnce(delay: Duration)(f: ⇒ Unit): Cancellable } trait Cancellable { diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 99557e33c8..203520853a 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -136,7 +136,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext shutdownScheduleUpdater.get(this) match { case UNSCHEDULED ⇒ if (shutdownScheduleUpdater.compareAndSet(this, UNSCHEDULED, SCHEDULED)) { - scheduler.scheduleOnce(shutdownAction, shutdownTimeout) + scheduler.scheduleOnce(shutdownTimeout, shutdownAction) () } else ifSensibleToDoSoThenScheduleShutdown() case SCHEDULED ⇒ @@ -211,7 +211,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext } case RESCHEDULED ⇒ if (shutdownScheduleUpdater.compareAndSet(MessageDispatcher.this, RESCHEDULED, SCHEDULED)) - scheduler.scheduleOnce(this, shutdownTimeout) + scheduler.scheduleOnce(shutdownTimeout, this) else run() } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index b9def99301..1cd0f575cd 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -14,7 +14,7 @@ import akka.japi.{ Procedure, Function ⇒ JFunc, Option ⇒ JOption } import scala.util.continuations._ import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit, Callable } -import java.util.concurrent.TimeUnit.{ NANOSECONDS ⇒ NANOS, MILLISECONDS ⇒ MILLIS } +import java.util.concurrent.TimeUnit.{ NANOSECONDS, MILLISECONDS } import java.lang.{ Iterable ⇒ JIterable } import java.util.{ LinkedList ⇒ JLinkedList } @@ -853,7 +853,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi @tailrec private def awaitUnsafe(waitTimeNanos: Long): Boolean = { if (value.isEmpty && waitTimeNanos > 0) { - val ms = NANOS.toMillis(waitTimeNanos) + val ms = NANOSECONDS.toMillis(waitTimeNanos) val ns = (waitTimeNanos % 1000000l).toInt //As per object.wait spec val start = currentTimeInNanos try { synchronized { if (value.isEmpty) wait(ms, ns) } } catch { case e: InterruptedException ⇒ } @@ -877,7 +877,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi else Long.MaxValue //If both are infinite, use Long.MaxValue if (awaitUnsafe(waitNanos)) this - else throw new FutureTimeoutException("Futures timed out after [" + NANOS.toMillis(waitNanos) + "] milliseconds") + else throw new FutureTimeoutException("Futures timed out after [" + NANOSECONDS.toMillis(waitNanos) + "] milliseconds") } def await = await(timeout.duration) @@ -956,12 +956,12 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi val runnable = new Runnable { def run() { if (!isCompleted) { - if (!isExpired) dispatcher.prerequisites.scheduler.scheduleOnce(this, Duration(timeLeftNoinline(), TimeUnit.NANOSECONDS)) + if (!isExpired) dispatcher.prerequisites.scheduler.scheduleOnce(Duration(timeLeftNoinline(), NANOSECONDS), this) else func(DefaultPromise.this) } } } - val timeoutFuture = dispatcher.prerequisites.scheduler.scheduleOnce(runnable, Duration(timeLeft(), TimeUnit.NANOSECONDS)) + val timeoutFuture = dispatcher.prerequisites.scheduler.scheduleOnce(Duration(timeLeft(), NANOSECONDS), runnable) onComplete(_ ⇒ timeoutFuture.cancel()) false } else true @@ -983,12 +983,12 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi val runnable = new Runnable { def run() { if (!isCompleted) { - if (!isExpired) dispatcher.prerequisites.scheduler.scheduleOnce(this, Duration(timeLeftNoinline(), TimeUnit.NANOSECONDS)) + if (!isExpired) dispatcher.prerequisites.scheduler.scheduleOnce(Duration(timeLeftNoinline(), NANOSECONDS), this) else promise complete (try { Right(fallback) } catch { case e ⇒ Left(e) }) // FIXME catching all and continue isn't good for OOME, ticket #1418 } } } - dispatcher.prerequisites.scheduler.scheduleOnce(runnable, Duration(timeLeft(), TimeUnit.NANOSECONDS)) + dispatcher.prerequisites.scheduler.scheduleOnce(Duration(timeLeft(), NANOSECONDS), runnable) promise } } else this @@ -999,7 +999,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi } @inline - private def currentTimeInNanos: Long = MILLIS.toNanos(System.currentTimeMillis) //TODO Switch to math.abs(System.nanoTime)? + private def currentTimeInNanos: Long = MILLISECONDS.toNanos(System.currentTimeMillis) //TODO Switch to math.abs(System.nanoTime)? //TODO: the danger of Math.abs is that it could break the ordering of time. So I would not recommend an abs. @inline private def timeLeft(): Long = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos) diff --git a/akka-remote/src/main/scala/akka/remote/Gossiper.scala b/akka-remote/src/main/scala/akka/remote/Gossiper.scala index 119a4a43a6..40f4cee2dd 100644 --- a/akka-remote/src/main/scala/akka/remote/Gossiper.scala +++ b/akka-remote/src/main/scala/akka/remote/Gossiper.scala @@ -130,8 +130,8 @@ class Gossiper(remote: Remote) { { // start periodic gossip and cluster scrutinization - default is run them every second with 1/2 second in between - system.scheduler schedule (() ⇒ initateGossip(), Duration(initalDelayForGossip.toSeconds, SECONDS), Duration(gossipFrequency.toSeconds, SECONDS)) - system.scheduler schedule (() ⇒ scrutinize(), Duration(initalDelayForGossip.toSeconds, SECONDS), Duration(gossipFrequency.toSeconds, SECONDS)) + system.scheduler.schedule(Duration(initalDelayForGossip.toSeconds, SECONDS), Duration(gossipFrequency.toSeconds, SECONDS))(initateGossip()) + system.scheduler.schedule(Duration(initalDelayForGossip.toSeconds, SECONDS), Duration(gossipFrequency.toSeconds, SECONDS))(scrutinize()) } /** diff --git a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala index 78449edc1b..b574cd8888 100644 --- a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala +++ b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala @@ -80,7 +80,7 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor { case Taken(`chopstickToWaitFor`) ⇒ println("%s has picked up %s and %s and starts to eat".format(name, left.name, right.name)) become(eating) - system.scheduler.scheduleOnce(self, Think, 5 seconds) + system.scheduler.scheduleOnce(5 seconds, self, Think) case Busy(chopstick) ⇒ become(thinking) @@ -109,7 +109,7 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor { left ! Put(self) right ! Put(self) println("%s puts down his chopsticks and starts to think".format(name)) - system.scheduler.scheduleOnce(self, Eat, 5 seconds) + system.scheduler.scheduleOnce(5 seconds, self, Eat) } //All hakkers start in a non-eating state @@ -117,7 +117,7 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor { case Think ⇒ println("%s starts to think".format(name)) become(thinking) - system.scheduler.scheduleOnce(self, Eat, 5 seconds) + system.scheduler.scheduleOnce(5 seconds, self, Eat) } }