Removing Scheduler.shutdown from public API and making the SchedulerSpec clean up after itself instead

This commit is contained in:
Viktor Klang 2011-07-12 12:17:32 +02:00
parent 5486d9fbb7
commit 321a9e064f
2 changed files with 19 additions and 24 deletions

View file

@ -2,16 +2,23 @@ package akka.actor
import org.scalatest.junit.JUnitSuite
import Actor._
import java.util.concurrent.{ CountDownLatch, TimeUnit }
import akka.config.Supervision._
import org.multiverse.api.latches.StandardLatch
import org.junit.Test
import java.util.concurrent.{ScheduledFuture, ConcurrentLinkedQueue, CountDownLatch, TimeUnit}
class SchedulerSpec extends JUnitSuite {
private val futures = new ConcurrentLinkedQueue[ScheduledFuture[AnyRef]]()
def collectFuture(f: => ScheduledFuture[AnyRef]): ScheduledFuture[AnyRef] = {
val future = f
futures.add(future)
future
}
def withCleanEndState(action: Unit) {
action
Scheduler.restart
while(futures.peek() ne null) { Option(futures.poll()).foreach(_.cancel(true)) }
Actor.registry.local.shutdownAll
}
@ -24,14 +31,14 @@ class SchedulerSpec extends JUnitSuite {
def receive = { case Tick countDownLatch.countDown() }
}).start()
// run every 50 millisec
Scheduler.schedule(tickActor, Tick, 0, 50, TimeUnit.MILLISECONDS)
collectFuture(Scheduler.schedule(tickActor, Tick, 0, 50, TimeUnit.MILLISECONDS))
// after max 1 second it should be executed at least the 3 times already
assert(countDownLatch.await(1, TimeUnit.SECONDS))
val countDownLatch2 = new CountDownLatch(3)
Scheduler.schedule(() countDownLatch2.countDown(), 0, 50, TimeUnit.MILLISECONDS)
collectFuture(Scheduler.schedule(() countDownLatch2.countDown(), 0, 50, TimeUnit.MILLISECONDS))
// after max 1 second it should be executed at least the 3 times already
assert(countDownLatch2.await(1, TimeUnit.SECONDS))
@ -45,8 +52,8 @@ class SchedulerSpec extends JUnitSuite {
def receive = { case Tick countDownLatch.countDown() }
}).start()
// run every 50 millisec
Scheduler.scheduleOnce(tickActor, Tick, 50, TimeUnit.MILLISECONDS)
Scheduler.scheduleOnce(() countDownLatch.countDown(), 50, TimeUnit.MILLISECONDS)
collectFuture(Scheduler.scheduleOnce(tickActor, Tick, 50, TimeUnit.MILLISECONDS))
collectFuture(Scheduler.scheduleOnce(() countDownLatch.countDown(), 50, TimeUnit.MILLISECONDS))
// after 1 second the wait should fail
assert(countDownLatch.await(1, TimeUnit.SECONDS) == false)
@ -65,7 +72,7 @@ class SchedulerSpec extends JUnitSuite {
def receive = { case Ping ticks.countDown }
}).start
val numActors = Actor.registry.local.actors.length
(1 to 1000).foreach(_ Scheduler.scheduleOnce(actor, Ping, 1, TimeUnit.MILLISECONDS))
(1 to 1000).foreach(_ collectFuture(Scheduler.scheduleOnce(actor, Ping, 1, TimeUnit.MILLISECONDS)))
assert(ticks.await(10, TimeUnit.SECONDS))
assert(Actor.registry.local.actors.length === numActors)
}
@ -83,7 +90,7 @@ class SchedulerSpec extends JUnitSuite {
}).start()
(1 to 10).foreach { i
val future = Scheduler.scheduleOnce(actor, Ping, 1, TimeUnit.SECONDS)
val future = collectFuture(Scheduler.scheduleOnce(actor, Ping, 1, TimeUnit.SECONDS))
future.cancel(true)
}
assert(ticks.await(3, TimeUnit.SECONDS) == false) //No counting down should've been made
@ -120,9 +127,9 @@ class SchedulerSpec extends JUnitSuite {
Permanent)
:: Nil)).start
Scheduler.schedule(actor, Ping, 500, 500, TimeUnit.MILLISECONDS)
collectFuture(Scheduler.schedule(actor, Ping, 500, 500, TimeUnit.MILLISECONDS))
// appx 2 pings before crash
Scheduler.scheduleOnce(actor, Crash, 1000, TimeUnit.MILLISECONDS)
collectFuture(Scheduler.scheduleOnce(actor, Crash, 1000, TimeUnit.MILLISECONDS))
assert(restartLatch.tryAwait(2, TimeUnit.SECONDS))
// should be enough time for the ping countdown to recover and reach 6 pings

View file

@ -26,8 +26,7 @@ object Scheduler {
case class SchedulerException(msg: String, e: Throwable) extends AkkaException(msg, e)
@volatile
private var service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory)
private[akka] val service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory)
private def createSendRunnable(receiver: ActorRef, message: Any, throwWhenReceiverExpired: Boolean): Runnable = {
receiver match {
@ -127,18 +126,7 @@ object Scheduler {
}
}
def shutdown() {
synchronized {
service.shutdown()
}
}
def restart() {
synchronized {
shutdown()
service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory)
}
}
private[akka] def shutdown() { service.shutdown() }
}
private object SchedulerThreadFactory extends ThreadFactory {