Adding scheduling thats usable from TypedActor

This commit is contained in:
Viktor Klang 2010-08-18 19:01:34 +02:00
parent e9baf2b804
commit de79fa140b
3 changed files with 65 additions and 5 deletions

View file

@ -255,7 +255,7 @@ object Actor extends Logging {
case object Spawn
actorOf(new Actor() {
def receive = {
case Spawn => body; self.stop
case Spawn => try { body } finally { self.stop }
}
}).start ! Spawn
}

View file

@ -29,6 +29,9 @@ object Scheduler extends Logging {
log.info("Starting up Scheduler")
/**
* Schedules to send the specified message to the receiver after initialDelay and then repeated after delay
*/
def schedule(receiver: ActorRef, message: AnyRef, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = {
log.trace(
"Schedule scheduled event\n\tevent = [%s]\n\treceiver = [%s]\n\tinitialDelay = [%s]\n\tdelay = [%s]\n\ttimeUnit = [%s]",
@ -38,19 +41,68 @@ object Scheduler extends Logging {
new Runnable { def run = receiver ! message },
initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
} catch {
case e => throw SchedulerException(message + " could not be scheduled on " + receiver, e)
case e: Exception => throw SchedulerException(message + " could not be scheduled on " + receiver, e)
}
}
/**
* Schedules to run specified function to the receiver after initialDelay and then repeated after delay,
* avoid blocking operations since this is executed in the schedulers thread
*/
def schedule(f: () => Unit, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] =
schedule(new Runnable { def run = f() }, initialDelay, delay, timeUnit)
/**
* Schedules to run specified runnable to the receiver after initialDelay and then repeated after delay,
* avoid blocking operations since this is executed in the schedulers thread
*/
def schedule(runnable: Runnable, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = {
log.trace(
"Schedule scheduled event\n\trunnable = [%s]\n\tinitialDelay = [%s]\n\tdelay = [%s]\n\ttimeUnit = [%s]",
runnable, initialDelay, delay, timeUnit)
try {
service.scheduleAtFixedRate(runnable,initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
} catch {
case e: Exception => throw SchedulerException("Failed to schedule a Runnable", e)
}
}
/**
* Schedules to send the specified message to the receiver after delay
*/
def scheduleOnce(receiver: ActorRef, message: AnyRef, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = {
log.trace(
"Schedule one-time event\n\tevent = [%s]\n\treceiver = [%s]\n\tdelay = [%s]\n\ttimeUnit = [%s]",
message, receiver, delay, timeUnit)
try {
service.schedule(
new Runnable { def run = receiver ! message }, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
new Runnable { def run = receiver ! message },
delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
} catch {
case e => throw SchedulerException(message + " could not be scheduled on " + receiver, e)
case e: Exception => throw SchedulerException( message + " could not be scheduleOnce'd on " + receiver, e)
}
}
/**
* Schedules a function to be run after delay,
* avoid blocking operations since the runnable is executed in the schedulers thread
*/
def scheduleOnce(f: () => Unit, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] =
scheduleOnce(new Runnable { def run = f() }, delay, timeUnit)
/**
* Schedules a runnable to be run after delay,
* avoid blocking operations since the runnable is executed in the schedulers thread
*/
def scheduleOnce(runnable: Runnable, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = {
log.trace(
"Schedule one-time event\n\trunnable = [%s]\n\tdelay = [%s]\n\ttimeUnit = [%s]",
runnable, delay, timeUnit)
try {
service.schedule(runnable,delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
} catch {
case e: Exception => throw SchedulerException("Failed to scheduleOnce a Runnable", e)
}
}

View file

@ -28,16 +28,24 @@ class SchedulerSpec extends JUnitSuite {
// after max 1 second it should be executed at least the 3 times already
assert(countDownLatch.await(1, TimeUnit.SECONDS))
val countDownLatch2 = new CountDownLatch(3)
Scheduler.schedule( () => countDownLatch2.countDown, 0, 50, TimeUnit.MILLISECONDS)
// after max 1 second it should be executed at least the 3 times already
assert(countDownLatch2.await(1, TimeUnit.SECONDS))
}
@Test def schedulerShouldScheduleOnce = withCleanEndState {
case object Tick
val countDownLatch = new CountDownLatch(2)
val countDownLatch = new CountDownLatch(3)
val tickActor = actor {
case Tick => countDownLatch.countDown
}
// run every 50 millisec
Scheduler.scheduleOnce(tickActor, Tick, 50, TimeUnit.MILLISECONDS)
Scheduler.scheduleOnce( () => countDownLatch.countDown, 50, TimeUnit.MILLISECONDS)
// after 1 second the wait should fail
assert(countDownLatch.await(1, TimeUnit.SECONDS) == false)