Making a Java API for Scheduler (JScheduler) and an abstract class Scheduler that extends it, to make the Scheduler pluggable, moving it into AkkaApplication and migrating the code.

This commit is contained in:
Viktor Klang 2011-10-17 18:34:34 +02:00
parent 2270395d3f
commit 050411bf3b
10 changed files with 65 additions and 46 deletions

View file

@ -33,14 +33,14 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach {
def receive = { case Tick countDownLatch.countDown() }
})
// run every 50 millisec
collectFuture(Scheduler.schedule(tickActor, Tick, 0, 50, TimeUnit.MILLISECONDS))
collectFuture(app.scheduler.schedule(tickActor, Tick, 0, 50, TimeUnit.MILLISECONDS))
// after max 1 second it should be executed at least the 3 times already
assert(countDownLatch.await(1, TimeUnit.SECONDS))
val countDownLatch2 = new CountDownLatch(3)
collectFuture(Scheduler.schedule(() countDownLatch2.countDown(), 0, 50, TimeUnit.MILLISECONDS))
collectFuture(app.scheduler.schedule(() countDownLatch2.countDown(), 0, 50, TimeUnit.MILLISECONDS))
// after max 1 second it should be executed at least the 3 times already
assert(countDownLatch2.await(2, TimeUnit.SECONDS))
@ -53,8 +53,8 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach {
def receive = { case Tick countDownLatch.countDown() }
})
// run every 50 millisec
collectFuture(Scheduler.scheduleOnce(tickActor, Tick, 50, TimeUnit.MILLISECONDS))
collectFuture(Scheduler.scheduleOnce(() countDownLatch.countDown(), 50, TimeUnit.MILLISECONDS))
collectFuture(app.scheduler.scheduleOnce(tickActor, Tick, 50, TimeUnit.MILLISECONDS))
collectFuture(app.scheduler.scheduleOnce(() countDownLatch.countDown(), 50, TimeUnit.MILLISECONDS))
// after 1 second the wait should fail
assert(countDownLatch.await(2, TimeUnit.SECONDS) == false)
@ -90,7 +90,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach {
})
(1 to 10).foreach { i
val future = collectFuture(Scheduler.scheduleOnce(actor, Ping, 1, TimeUnit.SECONDS))
val future = collectFuture(app.scheduler.scheduleOnce(actor, Ping, 1, TimeUnit.SECONDS))
future.cancel(true)
}
assert(ticks.await(3, TimeUnit.SECONDS) == false) //No counting down should've been made
@ -116,9 +116,9 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach {
override def postRestart(reason: Throwable) = restartLatch.open
}).withSupervisor(supervisor))
collectFuture(Scheduler.schedule(actor, Ping, 500, 500, TimeUnit.MILLISECONDS))
collectFuture(app.scheduler.schedule(actor, Ping, 500, 500, TimeUnit.MILLISECONDS))
// appx 2 pings before crash
collectFuture(Scheduler.scheduleOnce(actor, Crash, 1000, TimeUnit.MILLISECONDS))
collectFuture(app.scheduler.scheduleOnce(actor, Crash, 1000, TimeUnit.MILLISECONDS))
assert(restartLatch.tryAwait(2, TimeUnit.SECONDS))
// should be enough time for the ping countdown to recover and reach 6 pings

View file

@ -181,4 +181,6 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor
val typedActor = new TypedActor(this)
val serialization = new Serialization(this)
val scheduler = new DefaultScheduler
}

View file

@ -505,7 +505,7 @@ private[akka] class ActorCell(
val recvtimeout = receiveTimeout
if (recvtimeout.isDefined && dispatcher.mailboxIsEmpty(this)) {
//Only reschedule if desired and there are currently no more messages to be processed
futureTimeout = Some(Scheduler.scheduleOnce(self, ReceiveTimeout, recvtimeout.get, TimeUnit.MILLISECONDS))
futureTimeout = Some(app.scheduler.scheduleOnce(self, ReceiveTimeout, recvtimeout.get, TimeUnit.MILLISECONDS))
}
}

View file

@ -8,6 +8,7 @@ import akka.event.EventHandler
import scala.collection.mutable
import java.util.concurrent.ScheduledFuture
import akka.AkkaApplication
object FSM {
@ -29,14 +30,14 @@ object FSM {
case object StateTimeout
case class TimeoutMarker(generation: Long)
case class Timer(name: String, msg: Any, repeat: Boolean, generation: Int) {
case class Timer(name: String, msg: Any, repeat: Boolean, generation: Int)(implicit app: AkkaApplication) {
private var ref: Option[ScheduledFuture[AnyRef]] = _
def schedule(actor: ActorRef, timeout: Duration) {
if (repeat) {
ref = Some(Scheduler.schedule(actor, this, timeout.length, timeout.length, timeout.unit))
ref = Some(app.scheduler.schedule(actor, this, timeout.length, timeout.length, timeout.unit))
} else {
ref = Some(Scheduler.scheduleOnce(actor, this, timeout.length, timeout.unit))
ref = Some(app.scheduler.scheduleOnce(actor, this, timeout.length, timeout.unit))
}
}
@ -525,7 +526,7 @@ trait FSM[S, D] extends ListenerManagement {
if (timeout.isDefined) {
val t = timeout.get
if (t.finite_? && t.length >= 0) {
timeoutFuture = Some(Scheduler.scheduleOnce(self, TimeoutMarker(generation), t.length, t.unit))
timeoutFuture = Some(app.scheduler.scheduleOnce(self, TimeoutMarker(generation), t.length, t.unit))
}
}
}

View file

@ -15,22 +15,49 @@
*/
package akka.actor
import akka.event.EventHandler
import akka.AkkaException
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent._
import java.lang.RuntimeException
import akka.util.Duration
object Scheduler {
case class SchedulerException(msg: String, e: Throwable) extends AkkaException(msg, e) {
def this(msg: String) = this(msg, null)
}
case class SchedulerException(msg: String, e: Throwable) extends AkkaException(msg, e)
trait JScheduler {
def schedule(receiver: ActorRef, message: Any, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef]
def scheduleOnce(runnable: Runnable, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef]
def scheduleOnce(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef]
}
abstract class Scheduler extends JScheduler {
def schedule(f: () Unit, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef]
def scheduleOnce(f: () Unit, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef]
def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, delay: Duration): ScheduledFuture[AnyRef] =
schedule(receiver, message, initialDelay.toNanos, delay.toNanos, TimeUnit.NANOSECONDS)
def schedule(f: () Unit, initialDelay: Duration, delay: Duration): ScheduledFuture[AnyRef] =
schedule(f, initialDelay.toNanos, delay.toNanos, TimeUnit.NANOSECONDS)
def scheduleOnce(receiver: ActorRef, message: Any, delay: Duration): ScheduledFuture[AnyRef] =
scheduleOnce(receiver, message, delay.length, delay.unit)
def scheduleOnce(f: () Unit, delay: Duration): ScheduledFuture[AnyRef] =
scheduleOnce(f, delay.length, delay.unit)
}
class DefaultScheduler extends Scheduler {
private def createSendRunnable(receiver: ActorRef, message: Any, throwWhenReceiverExpired: Boolean): Runnable = new Runnable {
def run = {
receiver ! message
if (throwWhenReceiverExpired && receiver.isShutdown) throw new ActorKilledException("Receiver was terminated")
}
}
private[akka] val service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory)
private def createSendRunnable(receiver: ActorRef, message: Any, throwWhenReceiverExpired: Boolean): Runnable = {
new Runnable { def run = receiver ! message }
}
/**
* Schedules to send the specified message to the receiver after initialDelay and then repeated after delay.
* The returned java.util.concurrent.ScheduledFuture can be used to cancel the

View file

@ -127,7 +127,7 @@ abstract class MessageDispatcher(val app: AkkaApplication) extends Serializable
shutdownSchedule match {
case UNSCHEDULED
shutdownSchedule = SCHEDULED
Scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS)
app.scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS)
case SCHEDULED
shutdownSchedule = RESCHEDULED
case RESCHEDULED //Already marked for reschedule
@ -159,7 +159,7 @@ abstract class MessageDispatcher(val app: AkkaApplication) extends Serializable
shutdownSchedule match {
case UNSCHEDULED
shutdownSchedule = SCHEDULED
Scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS)
app.scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS)
case SCHEDULED
shutdownSchedule = RESCHEDULED
case RESCHEDULED //Already marked for reschedule
@ -220,7 +220,7 @@ abstract class MessageDispatcher(val app: AkkaApplication) extends Serializable
shutdownSchedule match {
case RESCHEDULED
shutdownSchedule = SCHEDULED
Scheduler.scheduleOnce(this, timeoutMs, TimeUnit.MILLISECONDS)
app.scheduler.scheduleOnce(this, timeoutMs, TimeUnit.MILLISECONDS)
case SCHEDULED
if (uuids.isEmpty && _tasks.get == 0) {
active switchOff {

View file

@ -7,7 +7,7 @@ package akka.dispatch
import akka.AkkaException
import akka.event.EventHandler
import akka.actor.{ Actor, UntypedChannel, Scheduler, Timeout, ExceptionChannel }
import akka.actor.{ Actor, UntypedChannel, Timeout, ExceptionChannel }
import scala.Option
import akka.japi.{ Procedure, Function JFunc, Option JOption }
@ -947,12 +947,12 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
val runnable = new Runnable {
def run() {
if (!isCompleted) {
if (!isExpired) Scheduler.scheduleOnce(this, timeLeftNoinline(), NANOS)
if (!isExpired) dispatcher.app.scheduler.scheduleOnce(this, timeLeftNoinline(), NANOS)
else func(DefaultPromise.this)
}
}
}
Scheduler.scheduleOnce(runnable, timeLeft(), NANOS)
dispatcher.app.scheduler.scheduleOnce(runnable, timeLeft(), NANOS)
false
} else true
} else false
@ -973,12 +973,12 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
val runnable = new Runnable {
def run() {
if (!isCompleted) {
if (!isExpired) Scheduler.scheduleOnce(this, timeLeftNoinline(), NANOS)
if (!isExpired) dispatcher.app.scheduler.scheduleOnce(this, timeLeftNoinline(), NANOS)
else promise complete (try { Right(fallback) } catch { case e Left(e) })
}
}
}
Scheduler.scheduleOnce(runnable, timeLeft(), NANOS)
dispatcher.app.scheduler.scheduleOnce(runnable, timeLeft(), NANOS)
promise
}
} else this

View file

@ -1,23 +1,12 @@
Scheduler
=========
Module stability: **SOLID**
``Akka`` has a little scheduler written using actors.
This can be convenient if you want to schedule some periodic task for maintenance or similar.
It allows you to register a message that you want to be sent to a specific actor at a periodic interval.
//FIXME
Here is an example:
-------------------
.. code-block:: scala
import akka.actor.Scheduler
//Sends messageToBeSent to receiverActor after initialDelayBeforeSending and then after each delayBetweenMessages
Scheduler.schedule(receiverActor, messageToBeSent, initialDelayBeforeSending, delayBetweenMessages, timeUnit)
//Sends messageToBeSent to receiverActor after delayUntilSend
Scheduler.scheduleOnce(receiverActor, messageToBeSent, delayUntilSend, timeUnit)
//TODO FIXME

View file

@ -138,7 +138,7 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider
actor
} else { // we lost the race -- wait for future to complete
oldFuture.await.resultOrException.get
oldFuture.get
}
}

View file

@ -3,7 +3,7 @@ package sample.fsm.dining.become
//Akka adaptation of
//http://www.dalnefre.com/wp/2010/08/dining-philosophers-in-humus/
import akka.actor.{ Scheduler, ActorRef, Actor }
import akka.actor.{ ActorRef, Actor }
import java.util.concurrent.TimeUnit
import akka.AkkaApplication
@ -78,7 +78,7 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor {
case Taken(`chopstickToWaitFor`)
println("%s has picked up %s and %s, and starts to eat", name, left.address, right.address)
become(eating)
Scheduler.scheduleOnce(self, Think, 5, TimeUnit.SECONDS)
app.scheduler.scheduleOnce(self, Think, 5, TimeUnit.SECONDS)
case Busy(chopstick)
become(thinking)
@ -107,7 +107,7 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor {
left ! Put(self)
right ! Put(self)
println("%s puts down his chopsticks and starts to think", name)
Scheduler.scheduleOnce(self, Eat, 5, TimeUnit.SECONDS)
app.scheduler.scheduleOnce(self, Eat, 5, TimeUnit.SECONDS)
}
//All hakkers start in a non-eating state
@ -115,7 +115,7 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor {
case Think
println("%s starts to think", name)
become(thinking)
Scheduler.scheduleOnce(self, Eat, 5, TimeUnit.SECONDS)
app.scheduler.scheduleOnce(self, Eat, 5, TimeUnit.SECONDS)
}
}