diff --git a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala index 9089929ce6..eb5865f5c2 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala @@ -33,14 +33,14 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { def receive = { case Tick ⇒ countDownLatch.countDown() } }) // run every 50 millisec - collectFuture(Scheduler.schedule(tickActor, Tick, 0, 50, TimeUnit.MILLISECONDS)) + collectFuture(app.scheduler.schedule(tickActor, Tick, 0, 50, TimeUnit.MILLISECONDS)) // after max 1 second it should be executed at least the 3 times already assert(countDownLatch.await(1, TimeUnit.SECONDS)) val countDownLatch2 = new CountDownLatch(3) - collectFuture(Scheduler.schedule(() ⇒ countDownLatch2.countDown(), 0, 50, TimeUnit.MILLISECONDS)) + collectFuture(app.scheduler.schedule(() ⇒ countDownLatch2.countDown(), 0, 50, TimeUnit.MILLISECONDS)) // after max 1 second it should be executed at least the 3 times already assert(countDownLatch2.await(2, TimeUnit.SECONDS)) @@ -53,8 +53,8 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { def receive = { case Tick ⇒ countDownLatch.countDown() } }) // run every 50 millisec - collectFuture(Scheduler.scheduleOnce(tickActor, Tick, 50, TimeUnit.MILLISECONDS)) - collectFuture(Scheduler.scheduleOnce(() ⇒ countDownLatch.countDown(), 50, TimeUnit.MILLISECONDS)) + collectFuture(app.scheduler.scheduleOnce(tickActor, Tick, 50, TimeUnit.MILLISECONDS)) + collectFuture(app.scheduler.scheduleOnce(() ⇒ countDownLatch.countDown(), 50, TimeUnit.MILLISECONDS)) // after 1 second the wait should fail assert(countDownLatch.await(2, TimeUnit.SECONDS) == false) @@ -90,7 +90,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { }) (1 to 10).foreach { i ⇒ - val future = collectFuture(Scheduler.scheduleOnce(actor, Ping, 1, TimeUnit.SECONDS)) + val future = collectFuture(app.scheduler.scheduleOnce(actor, Ping, 1, TimeUnit.SECONDS)) future.cancel(true) } assert(ticks.await(3, TimeUnit.SECONDS) == false) //No counting down should've been made @@ -116,9 +116,9 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { override def postRestart(reason: Throwable) = restartLatch.open }).withSupervisor(supervisor)) - collectFuture(Scheduler.schedule(actor, Ping, 500, 500, TimeUnit.MILLISECONDS)) + collectFuture(app.scheduler.schedule(actor, Ping, 500, 500, TimeUnit.MILLISECONDS)) // appx 2 pings before crash - collectFuture(Scheduler.scheduleOnce(actor, Crash, 1000, TimeUnit.MILLISECONDS)) + collectFuture(app.scheduler.scheduleOnce(actor, Crash, 1000, TimeUnit.MILLISECONDS)) assert(restartLatch.tryAwait(2, TimeUnit.SECONDS)) // should be enough time for the ping countdown to recover and reach 6 pings diff --git a/akka-actor/src/main/scala/akka/AkkaApplication.scala b/akka-actor/src/main/scala/akka/AkkaApplication.scala index 60e7d78df2..ce67f66745 100644 --- a/akka-actor/src/main/scala/akka/AkkaApplication.scala +++ b/akka-actor/src/main/scala/akka/AkkaApplication.scala @@ -181,4 +181,6 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor val typedActor = new TypedActor(this) val serialization = new Serialization(this) + + val scheduler = new DefaultScheduler } diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 84a82351ec..1d46a48623 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -505,7 +505,7 @@ private[akka] class ActorCell( val recvtimeout = receiveTimeout if (recvtimeout.isDefined && dispatcher.mailboxIsEmpty(this)) { //Only reschedule if desired and there are currently no more messages to be processed - futureTimeout = Some(Scheduler.scheduleOnce(self, ReceiveTimeout, recvtimeout.get, TimeUnit.MILLISECONDS)) + futureTimeout = Some(app.scheduler.scheduleOnce(self, ReceiveTimeout, recvtimeout.get, TimeUnit.MILLISECONDS)) } } diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala index 5cbeec8b09..4beff7229d 100644 --- a/akka-actor/src/main/scala/akka/actor/FSM.scala +++ b/akka-actor/src/main/scala/akka/actor/FSM.scala @@ -8,6 +8,7 @@ import akka.event.EventHandler import scala.collection.mutable import java.util.concurrent.ScheduledFuture +import akka.AkkaApplication object FSM { @@ -29,14 +30,14 @@ object FSM { case object StateTimeout case class TimeoutMarker(generation: Long) - case class Timer(name: String, msg: Any, repeat: Boolean, generation: Int) { + case class Timer(name: String, msg: Any, repeat: Boolean, generation: Int)(implicit app: AkkaApplication) { private var ref: Option[ScheduledFuture[AnyRef]] = _ def schedule(actor: ActorRef, timeout: Duration) { if (repeat) { - ref = Some(Scheduler.schedule(actor, this, timeout.length, timeout.length, timeout.unit)) + ref = Some(app.scheduler.schedule(actor, this, timeout.length, timeout.length, timeout.unit)) } else { - ref = Some(Scheduler.scheduleOnce(actor, this, timeout.length, timeout.unit)) + ref = Some(app.scheduler.scheduleOnce(actor, this, timeout.length, timeout.unit)) } } @@ -525,7 +526,7 @@ trait FSM[S, D] extends ListenerManagement { if (timeout.isDefined) { val t = timeout.get if (t.finite_? && t.length >= 0) { - timeoutFuture = Some(Scheduler.scheduleOnce(self, TimeoutMarker(generation), t.length, t.unit)) + timeoutFuture = Some(app.scheduler.scheduleOnce(self, TimeoutMarker(generation), t.length, t.unit)) } } } diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index d68b262085..57a7ccbc8d 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -15,22 +15,49 @@ */ package akka.actor -import akka.event.EventHandler import akka.AkkaException import java.util.concurrent.atomic.AtomicLong import java.util.concurrent._ -import java.lang.RuntimeException +import akka.util.Duration -object Scheduler { +case class SchedulerException(msg: String, e: Throwable) extends AkkaException(msg, e) { + def this(msg: String) = this(msg, null) +} - case class SchedulerException(msg: String, e: Throwable) extends AkkaException(msg, e) +trait JScheduler { + def schedule(receiver: ActorRef, message: Any, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] + def scheduleOnce(runnable: Runnable, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] + def scheduleOnce(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] +} + +abstract class Scheduler extends JScheduler { + + def schedule(f: () ⇒ Unit, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] + def scheduleOnce(f: () ⇒ Unit, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] + + def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, delay: Duration): ScheduledFuture[AnyRef] = + schedule(receiver, message, initialDelay.toNanos, delay.toNanos, TimeUnit.NANOSECONDS) + + def schedule(f: () ⇒ Unit, initialDelay: Duration, delay: Duration): ScheduledFuture[AnyRef] = + schedule(f, initialDelay.toNanos, delay.toNanos, TimeUnit.NANOSECONDS) + + def scheduleOnce(receiver: ActorRef, message: Any, delay: Duration): ScheduledFuture[AnyRef] = + scheduleOnce(receiver, message, delay.length, delay.unit) + + def scheduleOnce(f: () ⇒ Unit, delay: Duration): ScheduledFuture[AnyRef] = + scheduleOnce(f, delay.length, delay.unit) +} + +class DefaultScheduler extends Scheduler { + private def createSendRunnable(receiver: ActorRef, message: Any, throwWhenReceiverExpired: Boolean): Runnable = new Runnable { + def run = { + receiver ! message + if (throwWhenReceiverExpired && receiver.isShutdown) throw new ActorKilledException("Receiver was terminated") + } + } private[akka] val service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory) - private def createSendRunnable(receiver: ActorRef, message: Any, throwWhenReceiverExpired: Boolean): Runnable = { - new Runnable { def run = receiver ! message } - } - /** * Schedules to send the specified message to the receiver after initialDelay and then repeated after delay. * The returned java.util.concurrent.ScheduledFuture can be used to cancel the diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index aef413579f..c7e7fab35a 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -127,7 +127,7 @@ abstract class MessageDispatcher(val app: AkkaApplication) extends Serializable shutdownSchedule match { case UNSCHEDULED ⇒ shutdownSchedule = SCHEDULED - Scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS) + app.scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS) case SCHEDULED ⇒ shutdownSchedule = RESCHEDULED case RESCHEDULED ⇒ //Already marked for reschedule @@ -159,7 +159,7 @@ abstract class MessageDispatcher(val app: AkkaApplication) extends Serializable shutdownSchedule match { case UNSCHEDULED ⇒ shutdownSchedule = SCHEDULED - Scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS) + app.scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS) case SCHEDULED ⇒ shutdownSchedule = RESCHEDULED case RESCHEDULED ⇒ //Already marked for reschedule @@ -220,7 +220,7 @@ abstract class MessageDispatcher(val app: AkkaApplication) extends Serializable shutdownSchedule match { case RESCHEDULED ⇒ shutdownSchedule = SCHEDULED - Scheduler.scheduleOnce(this, timeoutMs, TimeUnit.MILLISECONDS) + app.scheduler.scheduleOnce(this, timeoutMs, TimeUnit.MILLISECONDS) case SCHEDULED ⇒ if (uuids.isEmpty && _tasks.get == 0) { active switchOff { diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index cfda93760f..dc302c5af9 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -7,7 +7,7 @@ package akka.dispatch import akka.AkkaException import akka.event.EventHandler -import akka.actor.{ Actor, UntypedChannel, Scheduler, Timeout, ExceptionChannel } +import akka.actor.{ Actor, UntypedChannel, Timeout, ExceptionChannel } import scala.Option import akka.japi.{ Procedure, Function ⇒ JFunc, Option ⇒ JOption } @@ -947,12 +947,12 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi val runnable = new Runnable { def run() { if (!isCompleted) { - if (!isExpired) Scheduler.scheduleOnce(this, timeLeftNoinline(), NANOS) + if (!isExpired) dispatcher.app.scheduler.scheduleOnce(this, timeLeftNoinline(), NANOS) else func(DefaultPromise.this) } } } - Scheduler.scheduleOnce(runnable, timeLeft(), NANOS) + dispatcher.app.scheduler.scheduleOnce(runnable, timeLeft(), NANOS) false } else true } else false @@ -973,12 +973,12 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi val runnable = new Runnable { def run() { if (!isCompleted) { - if (!isExpired) Scheduler.scheduleOnce(this, timeLeftNoinline(), NANOS) + if (!isExpired) dispatcher.app.scheduler.scheduleOnce(this, timeLeftNoinline(), NANOS) else promise complete (try { Right(fallback) } catch { case e ⇒ Left(e) }) } } } - Scheduler.scheduleOnce(runnable, timeLeft(), NANOS) + dispatcher.app.scheduler.scheduleOnce(runnable, timeLeft(), NANOS) promise } } else this diff --git a/akka-docs/common/scheduler.rst b/akka-docs/common/scheduler.rst index bf2b813d2e..7fd28d37ee 100644 --- a/akka-docs/common/scheduler.rst +++ b/akka-docs/common/scheduler.rst @@ -1,23 +1,12 @@ Scheduler ========= -Module stability: **SOLID** - -``Akka`` has a little scheduler written using actors. -This can be convenient if you want to schedule some periodic task for maintenance or similar. - -It allows you to register a message that you want to be sent to a specific actor at a periodic interval. +//FIXME Here is an example: ------------------- .. code-block:: scala - import akka.actor.Scheduler - - //Sends messageToBeSent to receiverActor after initialDelayBeforeSending and then after each delayBetweenMessages - Scheduler.schedule(receiverActor, messageToBeSent, initialDelayBeforeSending, delayBetweenMessages, timeUnit) - - //Sends messageToBeSent to receiverActor after delayUntilSend - Scheduler.scheduleOnce(receiverActor, messageToBeSent, delayUntilSend, timeUnit) + //TODO FIXME diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 5fcf8b8fee..87a65d21bf 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -138,7 +138,7 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider actor } else { // we lost the race -- wait for future to complete - oldFuture.await.resultOrException.get + oldFuture.get } } diff --git a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala index 4bd6baafb2..d3326d85d6 100644 --- a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala +++ b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala @@ -3,7 +3,7 @@ package sample.fsm.dining.become //Akka adaptation of //http://www.dalnefre.com/wp/2010/08/dining-philosophers-in-humus/ -import akka.actor.{ Scheduler, ActorRef, Actor } +import akka.actor.{ ActorRef, Actor } import java.util.concurrent.TimeUnit import akka.AkkaApplication @@ -78,7 +78,7 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor { case Taken(`chopstickToWaitFor`) ⇒ println("%s has picked up %s and %s, and starts to eat", name, left.address, right.address) become(eating) - Scheduler.scheduleOnce(self, Think, 5, TimeUnit.SECONDS) + app.scheduler.scheduleOnce(self, Think, 5, TimeUnit.SECONDS) case Busy(chopstick) ⇒ become(thinking) @@ -107,7 +107,7 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor { left ! Put(self) right ! Put(self) println("%s puts down his chopsticks and starts to think", name) - Scheduler.scheduleOnce(self, Eat, 5, TimeUnit.SECONDS) + app.scheduler.scheduleOnce(self, Eat, 5, TimeUnit.SECONDS) } //All hakkers start in a non-eating state @@ -115,7 +115,7 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor { case Think ⇒ println("%s starts to think", name) become(thinking) - Scheduler.scheduleOnce(self, Eat, 5, TimeUnit.SECONDS) + app.scheduler.scheduleOnce(self, Eat, 5, TimeUnit.SECONDS) } }