Changed signatures of Scheduler for better api of by-name blocks

This commit is contained in:
Patrik Nordwall 2011-12-02 17:13:46 +01:00
parent af1ee4fb5a
commit 1f665ab4c6
10 changed files with 57 additions and 50 deletions

View file

@ -29,14 +29,14 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach {
def receive = { case Tick countDownLatch.countDown() } def receive = { case Tick countDownLatch.countDown() }
}) })
// run every 50 milliseconds // run every 50 milliseconds
collectCancellable(system.scheduler.schedule(tickActor, Tick, 0 milliseconds, 50 milliseconds)) collectCancellable(system.scheduler.schedule(0 milliseconds, 50 milliseconds, tickActor, Tick))
// after max 1 second it should be executed at least the 3 times already // after max 1 second it should be executed at least the 3 times already
assert(countDownLatch.await(1, TimeUnit.SECONDS)) assert(countDownLatch.await(1, TimeUnit.SECONDS))
val countDownLatch2 = new CountDownLatch(3) val countDownLatch2 = new CountDownLatch(3)
collectCancellable(system.scheduler.schedule(() countDownLatch2.countDown(), 0 milliseconds, 50 milliseconds)) collectCancellable(system.scheduler.schedule(0 milliseconds, 50 milliseconds)(countDownLatch2.countDown()))
// after max 1 second it should be executed at least the 3 times already // after max 1 second it should be executed at least the 3 times already
assert(countDownLatch2.await(2, TimeUnit.SECONDS)) assert(countDownLatch2.await(2, TimeUnit.SECONDS))
@ -44,7 +44,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach {
"should stop continuous scheduling if the receiving actor has been terminated" in { "should stop continuous scheduling if the receiving actor has been terminated" in {
// run immediately and then every 100 milliseconds // run immediately and then every 100 milliseconds
collectCancellable(system.scheduler.schedule(testActor, "msg", 0 milliseconds, 100 milliseconds)) collectCancellable(system.scheduler.schedule(0 milliseconds, 100 milliseconds, testActor, "msg"))
// stop the actor and, hence, the continuous messaging from happening // stop the actor and, hence, the continuous messaging from happening
testActor ! PoisonPill testActor ! PoisonPill
@ -59,14 +59,18 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach {
def receive = { case Tick countDownLatch.countDown() } def receive = { case Tick countDownLatch.countDown() }
}) })
// run every 50 millisec // run after 300 millisec
collectCancellable(system.scheduler.scheduleOnce(tickActor, Tick, 50 milliseconds)) collectCancellable(system.scheduler.scheduleOnce(300 milliseconds, tickActor, Tick))
collectCancellable(system.scheduler.scheduleOnce(() countDownLatch.countDown(), 50 milliseconds)) collectCancellable(system.scheduler.scheduleOnce(300 milliseconds)(countDownLatch.countDown()))
// should not be run immediately
assert(countDownLatch.await(100, TimeUnit.MILLISECONDS) == false)
countDownLatch.getCount must be(3)
// after 1 second the wait should fail // after 1 second the wait should fail
assert(countDownLatch.await(2, TimeUnit.SECONDS) == false) assert(countDownLatch.await(1, TimeUnit.SECONDS) == false)
// should still be 1 left // should still be 1 left
assert(countDownLatch.getCount == 1) countDownLatch.getCount must be(1)
} }
/** /**
@ -81,7 +85,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach {
}) })
(1 to 10).foreach { i (1 to 10).foreach { i
val timeout = collectCancellable(system.scheduler.scheduleOnce(actor, Ping, 1 second)) val timeout = collectCancellable(system.scheduler.scheduleOnce(1 second, actor, Ping))
timeout.cancel() timeout.cancel()
} }
@ -109,10 +113,10 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach {
}) })
val actor = (supervisor ? props).as[ActorRef].get val actor = (supervisor ? props).as[ActorRef].get
collectCancellable(system.scheduler.schedule(actor, Ping, 500 milliseconds, 500 milliseconds)) collectCancellable(system.scheduler.schedule(500 milliseconds, 500 milliseconds, actor, Ping))
// appx 2 pings before crash // appx 2 pings before crash
EventFilter[Exception]("CRASH", occurrences = 1) intercept { EventFilter[Exception]("CRASH", occurrences = 1) intercept {
collectCancellable(system.scheduler.scheduleOnce(actor, Crash, 1000 milliseconds)) collectCancellable(system.scheduler.scheduleOnce(1000 milliseconds, actor, Crash))
} }
assert(restartLatch.tryAwait(2, TimeUnit.SECONDS)) assert(restartLatch.tryAwait(2, TimeUnit.SECONDS))
@ -136,7 +140,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach {
}) })
(1 to 300).foreach { i (1 to 300).foreach { i
collectCancellable(system.scheduler.scheduleOnce(actor, Msg(System.nanoTime), 10 milliseconds)) collectCancellable(system.scheduler.scheduleOnce(10 milliseconds, actor, Msg(System.nanoTime)))
Thread.sleep(5) Thread.sleep(5)
} }
@ -155,7 +159,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach {
}) })
val startTime = System.nanoTime() val startTime = System.nanoTime()
val cancellable = system.scheduler.schedule(actor, Msg, 1 second, 100 milliseconds) val cancellable = system.scheduler.schedule(1 second, 100 milliseconds, actor, Msg)
ticks.await(3, TimeUnit.SECONDS) ticks.await(3, TimeUnit.SECONDS)
val elapsedTimeMs = (System.nanoTime() - startTime) / 1000000 val elapsedTimeMs = (System.nanoTime() - startTime) / 1000000

View file

@ -404,7 +404,7 @@ private[akka] class ActorCell(
if (recvtimeout._1 > 0 && dispatcher.mailboxIsEmpty(this)) { if (recvtimeout._1 > 0 && dispatcher.mailboxIsEmpty(this)) {
recvtimeout._2.cancel() //Cancel any ongoing future recvtimeout._2.cancel() //Cancel any ongoing future
//Only reschedule if desired and there are currently no more messages to be processed //Only reschedule if desired and there are currently no more messages to be processed
receiveTimeoutData = (recvtimeout._1, system.scheduler.scheduleOnce(self, ReceiveTimeout, Duration(recvtimeout._1, TimeUnit.MILLISECONDS))) receiveTimeoutData = (recvtimeout._1, system.scheduler.scheduleOnce(Duration(recvtimeout._1, TimeUnit.MILLISECONDS), self, ReceiveTimeout))
} else cancelReceiveTimeout() } else cancelReceiveTimeout()
} }

View file

@ -417,19 +417,19 @@ class LocalDeathWatch extends DeathWatch with ActorClassification {
*/ */
class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter, dispatcher: MessageDispatcher) extends Scheduler with Closeable { class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter, dispatcher: MessageDispatcher) extends Scheduler with Closeable {
def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, delay: Duration): Cancellable = def schedule(initialDelay: Duration, delay: Duration, receiver: ActorRef, message: Any): Cancellable =
new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(receiver, message, delay), initialDelay)) new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(delay, receiver, message), initialDelay))
def schedule(f: () Unit, initialDelay: Duration, delay: Duration): Cancellable = def schedule(initialDelay: Duration, delay: Duration)(f: Unit): Cancellable =
new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(f, delay), initialDelay)) new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(delay, f), initialDelay))
def scheduleOnce(runnable: Runnable, delay: Duration): Cancellable = def scheduleOnce(delay: Duration, runnable: Runnable): Cancellable =
new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(runnable), delay)) new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(runnable), delay))
def scheduleOnce(receiver: ActorRef, message: Any, delay: Duration): Cancellable = def scheduleOnce(delay: Duration, receiver: ActorRef, message: Any): Cancellable =
new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(receiver, message), delay)) new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(receiver, message), delay))
def scheduleOnce(f: () Unit, delay: Duration): Cancellable = def scheduleOnce(delay: Duration)(f: Unit): Cancellable =
new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(f), delay)) new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(f), delay))
private def createSingleTask(runnable: Runnable): TimerTask = private def createSingleTask(runnable: Runnable): TimerTask =
@ -446,14 +446,14 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter,
} }
} }
private def createSingleTask(f: () Unit): TimerTask = private def createSingleTask(f: Unit): TimerTask =
new TimerTask { new TimerTask {
def run(timeout: org.jboss.netty.akka.util.Timeout) { def run(timeout: org.jboss.netty.akka.util.Timeout) {
dispatcher.dispatchTask(f) dispatcher.dispatchTask(() f)
} }
} }
private def createContinuousTask(receiver: ActorRef, message: Any, delay: Duration): TimerTask = { private def createContinuousTask(delay: Duration, receiver: ActorRef, message: Any): TimerTask = {
new TimerTask { new TimerTask {
def run(timeout: org.jboss.netty.akka.util.Timeout) { def run(timeout: org.jboss.netty.akka.util.Timeout) {
// Check if the receiver is still alive and kicking before sending it a message and reschedule the task // Check if the receiver is still alive and kicking before sending it a message and reschedule the task
@ -467,10 +467,10 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter,
} }
} }
private def createContinuousTask(f: () Unit, delay: Duration): TimerTask = { private def createContinuousTask(delay: Duration, f: Unit): TimerTask = {
new TimerTask { new TimerTask {
def run(timeout: org.jboss.netty.akka.util.Timeout) { def run(timeout: org.jboss.netty.akka.util.Timeout) {
dispatcher.dispatchTask(f) dispatcher.dispatchTask(() f)
timeout.getTimer.newTimeout(this, delay) timeout.getTimer.newTimeout(this, delay)
} }
} }

View file

@ -398,7 +398,10 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
case x: Closeable case x: Closeable
// Let dispatchers shutdown first. // Let dispatchers shutdown first.
// Dispatchers schedule shutdown and may also reschedule, therefore wait 4 times the shutdown delay. // Dispatchers schedule shutdown and may also reschedule, therefore wait 4 times the shutdown delay.
x.scheduleOnce(() { x.close(); dispatcher.shutdown() }, settings.DispatcherDefaultShutdown * 4) x.scheduleOnce(settings.DispatcherDefaultShutdown * 4) {
x.close()
dispatcher.shutdown()
}
case _ case _
} }

View file

@ -34,9 +34,9 @@ object FSM {
def schedule(actor: ActorRef, timeout: Duration) { def schedule(actor: ActorRef, timeout: Duration) {
if (repeat) { if (repeat) {
ref = Some(system.scheduler.schedule(actor, this, timeout, timeout)) ref = Some(system.scheduler.schedule(timeout, timeout, actor, this))
} else { } else {
ref = Some(system.scheduler.scheduleOnce(actor, this, timeout)) ref = Some(system.scheduler.scheduleOnce(timeout, actor, this))
} }
} }
@ -523,7 +523,7 @@ trait FSM[S, D] extends ListenerManagement {
if (timeout.isDefined) { if (timeout.isDefined) {
val t = timeout.get val t = timeout.get
if (t.finite_? && t.length >= 0) { if (t.finite_? && t.length >= 0) {
timeoutFuture = Some(system.scheduler.scheduleOnce(self, TimeoutMarker(generation), t)) timeoutFuture = Some(system.scheduler.scheduleOnce(t, self, TimeoutMarker(generation)))
} }
} }
} }

View file

@ -20,29 +20,29 @@ trait Scheduler {
* E.g. if you would like a message to be sent immediately and thereafter every 500ms you would set * E.g. if you would like a message to be sent immediately and thereafter every 500ms you would set
* delay = Duration.Zero and frequency = Duration(500, TimeUnit.MILLISECONDS) * delay = Duration.Zero and frequency = Duration(500, TimeUnit.MILLISECONDS)
*/ */
def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, frequency: Duration): Cancellable def schedule(initialDelay: Duration, frequency: Duration, receiver: ActorRef, message: Any): Cancellable
/** /**
* Schedules a function to be run repeatedly with an initial delay and a frequency. * Schedules a function to be run repeatedly with an initial delay and a frequency.
* E.g. if you would like the function to be run after 2 seconds and thereafter every 100ms you would set * E.g. if you would like the function to be run after 2 seconds and thereafter every 100ms you would set
* delay = Duration(2, TimeUnit.SECONDS) and frequency = Duration(100, TimeUnit.MILLISECONDS) * delay = Duration(2, TimeUnit.SECONDS) and frequency = Duration(100, TimeUnit.MILLISECONDS)
*/ */
def schedule(f: () Unit, initialDelay: Duration, frequency: Duration): Cancellable def schedule(initialDelay: Duration, frequency: Duration)(f: Unit): Cancellable
/** /**
* Schedules a Runnable to be run once with a delay, i.e. a time period that has to pass before the runnable is executed. * Schedules a Runnable to be run once with a delay, i.e. a time period that has to pass before the runnable is executed.
*/ */
def scheduleOnce(runnable: Runnable, delay: Duration): Cancellable def scheduleOnce(delay: Duration, runnable: Runnable): Cancellable
/** /**
* Schedules a message to be sent once with a delay, i.e. a time period that has to pass before the message is sent. * Schedules a message to be sent once with a delay, i.e. a time period that has to pass before the message is sent.
*/ */
def scheduleOnce(receiver: ActorRef, message: Any, delay: Duration): Cancellable def scheduleOnce(delay: Duration, receiver: ActorRef, message: Any): Cancellable
/** /**
* Schedules a function to be run once with a delay, i.e. a time period that has to pass before the function is run. * Schedules a function to be run once with a delay, i.e. a time period that has to pass before the function is run.
*/ */
def scheduleOnce(f: () Unit, delay: Duration): Cancellable def scheduleOnce(delay: Duration)(f: Unit): Cancellable
} }
trait Cancellable { trait Cancellable {

View file

@ -136,7 +136,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
shutdownScheduleUpdater.get(this) match { shutdownScheduleUpdater.get(this) match {
case UNSCHEDULED case UNSCHEDULED
if (shutdownScheduleUpdater.compareAndSet(this, UNSCHEDULED, SCHEDULED)) { if (shutdownScheduleUpdater.compareAndSet(this, UNSCHEDULED, SCHEDULED)) {
scheduler.scheduleOnce(shutdownAction, shutdownTimeout) scheduler.scheduleOnce(shutdownTimeout, shutdownAction)
() ()
} else ifSensibleToDoSoThenScheduleShutdown() } else ifSensibleToDoSoThenScheduleShutdown()
case SCHEDULED case SCHEDULED
@ -211,7 +211,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
} }
case RESCHEDULED case RESCHEDULED
if (shutdownScheduleUpdater.compareAndSet(MessageDispatcher.this, RESCHEDULED, SCHEDULED)) if (shutdownScheduleUpdater.compareAndSet(MessageDispatcher.this, RESCHEDULED, SCHEDULED))
scheduler.scheduleOnce(this, shutdownTimeout) scheduler.scheduleOnce(shutdownTimeout, this)
else run() else run()
} }
} }

View file

@ -14,7 +14,7 @@ import akka.japi.{ Procedure, Function ⇒ JFunc, Option ⇒ JOption }
import scala.util.continuations._ import scala.util.continuations._
import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit, Callable } import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit, Callable }
import java.util.concurrent.TimeUnit.{ NANOSECONDS NANOS, MILLISECONDS MILLIS } import java.util.concurrent.TimeUnit.{ NANOSECONDS, MILLISECONDS }
import java.lang.{ Iterable JIterable } import java.lang.{ Iterable JIterable }
import java.util.{ LinkedList JLinkedList } import java.util.{ LinkedList JLinkedList }
@ -853,7 +853,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
@tailrec @tailrec
private def awaitUnsafe(waitTimeNanos: Long): Boolean = { private def awaitUnsafe(waitTimeNanos: Long): Boolean = {
if (value.isEmpty && waitTimeNanos > 0) { if (value.isEmpty && waitTimeNanos > 0) {
val ms = NANOS.toMillis(waitTimeNanos) val ms = NANOSECONDS.toMillis(waitTimeNanos)
val ns = (waitTimeNanos % 1000000l).toInt //As per object.wait spec val ns = (waitTimeNanos % 1000000l).toInt //As per object.wait spec
val start = currentTimeInNanos val start = currentTimeInNanos
try { synchronized { if (value.isEmpty) wait(ms, ns) } } catch { case e: InterruptedException } try { synchronized { if (value.isEmpty) wait(ms, ns) } } catch { case e: InterruptedException }
@ -877,7 +877,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
else Long.MaxValue //If both are infinite, use Long.MaxValue else Long.MaxValue //If both are infinite, use Long.MaxValue
if (awaitUnsafe(waitNanos)) this if (awaitUnsafe(waitNanos)) this
else throw new FutureTimeoutException("Futures timed out after [" + NANOS.toMillis(waitNanos) + "] milliseconds") else throw new FutureTimeoutException("Futures timed out after [" + NANOSECONDS.toMillis(waitNanos) + "] milliseconds")
} }
def await = await(timeout.duration) def await = await(timeout.duration)
@ -956,12 +956,12 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
val runnable = new Runnable { val runnable = new Runnable {
def run() { def run() {
if (!isCompleted) { if (!isCompleted) {
if (!isExpired) dispatcher.prerequisites.scheduler.scheduleOnce(this, Duration(timeLeftNoinline(), TimeUnit.NANOSECONDS)) if (!isExpired) dispatcher.prerequisites.scheduler.scheduleOnce(Duration(timeLeftNoinline(), NANOSECONDS), this)
else func(DefaultPromise.this) else func(DefaultPromise.this)
} }
} }
} }
val timeoutFuture = dispatcher.prerequisites.scheduler.scheduleOnce(runnable, Duration(timeLeft(), TimeUnit.NANOSECONDS)) val timeoutFuture = dispatcher.prerequisites.scheduler.scheduleOnce(Duration(timeLeft(), NANOSECONDS), runnable)
onComplete(_ timeoutFuture.cancel()) onComplete(_ timeoutFuture.cancel())
false false
} else true } else true
@ -983,12 +983,12 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
val runnable = new Runnable { val runnable = new Runnable {
def run() { def run() {
if (!isCompleted) { if (!isCompleted) {
if (!isExpired) dispatcher.prerequisites.scheduler.scheduleOnce(this, Duration(timeLeftNoinline(), TimeUnit.NANOSECONDS)) if (!isExpired) dispatcher.prerequisites.scheduler.scheduleOnce(Duration(timeLeftNoinline(), NANOSECONDS), this)
else promise complete (try { Right(fallback) } catch { case e Left(e) }) // FIXME catching all and continue isn't good for OOME, ticket #1418 else promise complete (try { Right(fallback) } catch { case e Left(e) }) // FIXME catching all and continue isn't good for OOME, ticket #1418
} }
} }
} }
dispatcher.prerequisites.scheduler.scheduleOnce(runnable, Duration(timeLeft(), TimeUnit.NANOSECONDS)) dispatcher.prerequisites.scheduler.scheduleOnce(Duration(timeLeft(), NANOSECONDS), runnable)
promise promise
} }
} else this } else this
@ -999,7 +999,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
} }
@inline @inline
private def currentTimeInNanos: Long = MILLIS.toNanos(System.currentTimeMillis) //TODO Switch to math.abs(System.nanoTime)? private def currentTimeInNanos: Long = MILLISECONDS.toNanos(System.currentTimeMillis) //TODO Switch to math.abs(System.nanoTime)?
//TODO: the danger of Math.abs is that it could break the ordering of time. So I would not recommend an abs. //TODO: the danger of Math.abs is that it could break the ordering of time. So I would not recommend an abs.
@inline @inline
private def timeLeft(): Long = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos) private def timeLeft(): Long = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos)

View file

@ -130,8 +130,8 @@ class Gossiper(remote: Remote) {
{ {
// start periodic gossip and cluster scrutinization - default is run them every second with 1/2 second in between // start periodic gossip and cluster scrutinization - default is run them every second with 1/2 second in between
system.scheduler schedule (() initateGossip(), Duration(initalDelayForGossip.toSeconds, SECONDS), Duration(gossipFrequency.toSeconds, SECONDS)) system.scheduler.schedule(Duration(initalDelayForGossip.toSeconds, SECONDS), Duration(gossipFrequency.toSeconds, SECONDS))(initateGossip())
system.scheduler schedule (() scrutinize(), Duration(initalDelayForGossip.toSeconds, SECONDS), Duration(gossipFrequency.toSeconds, SECONDS)) system.scheduler.schedule(Duration(initalDelayForGossip.toSeconds, SECONDS), Duration(gossipFrequency.toSeconds, SECONDS))(scrutinize())
} }
/** /**

View file

@ -80,7 +80,7 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor {
case Taken(`chopstickToWaitFor`) case Taken(`chopstickToWaitFor`)
println("%s has picked up %s and %s and starts to eat".format(name, left.name, right.name)) println("%s has picked up %s and %s and starts to eat".format(name, left.name, right.name))
become(eating) become(eating)
system.scheduler.scheduleOnce(self, Think, 5 seconds) system.scheduler.scheduleOnce(5 seconds, self, Think)
case Busy(chopstick) case Busy(chopstick)
become(thinking) become(thinking)
@ -109,7 +109,7 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor {
left ! Put(self) left ! Put(self)
right ! Put(self) right ! Put(self)
println("%s puts down his chopsticks and starts to think".format(name)) println("%s puts down his chopsticks and starts to think".format(name))
system.scheduler.scheduleOnce(self, Eat, 5 seconds) system.scheduler.scheduleOnce(5 seconds, self, Eat)
} }
//All hakkers start in a non-eating state //All hakkers start in a non-eating state
@ -117,7 +117,7 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor {
case Think case Think
println("%s starts to think".format(name)) println("%s starts to think".format(name))
become(thinking) become(thinking)
system.scheduler.scheduleOnce(self, Eat, 5 seconds) system.scheduler.scheduleOnce(5 seconds, self, Eat)
} }
} }