Merge branch 'master' of git@github.com:jboner/akka

This commit is contained in:
Jonas Bonér 2010-07-01 15:28:05 +02:00
commit b0bdccfc24
2 changed files with 47 additions and 23 deletions

View file

@ -15,10 +15,9 @@
*/
package se.scalablesolutions.akka.actor
import _root_.scala.collection.JavaConversions
import java.util.concurrent._
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, FaultHandlingStrategy}
import se.scalablesolutions.akka.util.Logging
object Scheduler {
@ -35,6 +34,7 @@ object Scheduler {
val future = service.scheduleAtFixedRate(
new Runnable { def run = receiver ! message },
initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
createAndStoreScheduleActorForFuture(future)
val scheduler = actorOf(new ScheduleActor(future)).start
schedulers.put(scheduler, scheduler)
scheduler
@ -43,6 +43,22 @@ object Scheduler {
}
}
def scheduleOnce(receiver: ActorRef, message: AnyRef, delay: Long, timeUnit: TimeUnit): ActorRef = {
try {
val future = service.schedule(
new Runnable { def run = receiver ! message }, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
createAndStoreScheduleActorForFuture(future)
} catch {
case e => throw SchedulerException(message + " could not be scheduled on " + receiver, e)
}
}
private def createAndStoreScheduleActorForFuture(future: ScheduledFuture[AnyRef]): ActorRef = {
val scheduler = actorOf(new ScheduleActor(future)).start
schedulers.put(scheduler, scheduler)
scheduler
}
def unschedule(scheduleActor: ActorRef) = {
scheduleActor ! UnSchedule
schedulers.remove(scheduleActor)
@ -65,7 +81,7 @@ private class ScheduleActor(future: ScheduledFuture[AnyRef]) extends Actor with
def receive = {
case Scheduler.UnSchedule =>
future.cancel(true)
exit
self.stop
}
}
@ -79,4 +95,4 @@ private object SchedulerThreadFactory extends ThreadFactory {
thread.setDaemon(true)
thread
}
}
}

View file

@ -1,30 +1,38 @@
package se.scalablesolutions.akka.actor
import java.util.concurrent.TimeUnit
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import Actor._
import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.junit.{After, Test}
class SchedulerSpec extends JUnitSuite {
@Test def schedulerShouldScheduleMoreThanOnce = {
@Test def schedulerShouldSchedule = {
/*
var count = 0
case object Tick
val actor = new Actor() {
def receive = {
case Tick => count += 1
}}
actor.start
Thread.sleep(1000)
Scheduler.schedule(actor, Tick, 0L, 1L, TimeUnit.SECONDS)
Thread.sleep(5000)
Scheduler.stop
assert(count > 0)
val countDownLatch = new CountDownLatch(3)
val tickActor = actor {
case Tick => countDownLatch.countDown
}
// run every 50 millisec
Scheduler.schedule(tickActor, Tick, 0, 50, TimeUnit.MILLISECONDS)
*/
assert(true)
// after max 1 second it should be executed at least the 3 times already
assert(countDownLatch.await(1, TimeUnit.SECONDS))
}
@Test def schedulerShouldScheduleOnce = {
case object Tick
val countDownLatch = new CountDownLatch(2)
val tickActor = actor {
case Tick => countDownLatch.countDown
}
// run every 50 millisec
Scheduler.scheduleOnce(tickActor, Tick, 50, TimeUnit.MILLISECONDS)
// after 1 second the wait should fail
assert(countDownLatch.await(1, TimeUnit.SECONDS) == false)
// should still be 1 left
assert(countDownLatch.getCount == 1)
}
}