diff --git a/akka-actor-tests/src/test/scala/akka/misc/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/misc/SchedulerSpec.scala index 6b5dbd684c..3c08d0983a 100644 --- a/akka-actor-tests/src/test/scala/akka/misc/SchedulerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/misc/SchedulerSpec.scala @@ -2,16 +2,23 @@ package akka.actor import org.scalatest.junit.JUnitSuite import Actor._ -import java.util.concurrent.{ CountDownLatch, TimeUnit } import akka.config.Supervision._ import org.multiverse.api.latches.StandardLatch import org.junit.Test +import java.util.concurrent.{ScheduledFuture, ConcurrentLinkedQueue, CountDownLatch, TimeUnit} class SchedulerSpec extends JUnitSuite { + private val futures = new ConcurrentLinkedQueue[ScheduledFuture[AnyRef]]() + + def collectFuture(f: => ScheduledFuture[AnyRef]): ScheduledFuture[AnyRef] = { + val future = f + futures.add(future) + future + } def withCleanEndState(action: ⇒ Unit) { action - Scheduler.restart + while(futures.peek() ne null) { Option(futures.poll()).foreach(_.cancel(true)) } Actor.registry.local.shutdownAll } @@ -24,14 +31,14 @@ class SchedulerSpec extends JUnitSuite { def receive = { case Tick ⇒ countDownLatch.countDown() } }).start() // run every 50 millisec - Scheduler.schedule(tickActor, Tick, 0, 50, TimeUnit.MILLISECONDS) + collectFuture(Scheduler.schedule(tickActor, Tick, 0, 50, TimeUnit.MILLISECONDS)) // after max 1 second it should be executed at least the 3 times already assert(countDownLatch.await(1, TimeUnit.SECONDS)) val countDownLatch2 = new CountDownLatch(3) - Scheduler.schedule(() ⇒ countDownLatch2.countDown(), 0, 50, TimeUnit.MILLISECONDS) + collectFuture(Scheduler.schedule(() ⇒ countDownLatch2.countDown(), 0, 50, TimeUnit.MILLISECONDS)) // after max 1 second it should be executed at least the 3 times already assert(countDownLatch2.await(1, TimeUnit.SECONDS)) @@ -45,8 +52,8 @@ class SchedulerSpec extends JUnitSuite { def receive = { case Tick ⇒ countDownLatch.countDown() } }).start() // run every 50 millisec - Scheduler.scheduleOnce(tickActor, Tick, 50, TimeUnit.MILLISECONDS) - Scheduler.scheduleOnce(() ⇒ countDownLatch.countDown(), 50, TimeUnit.MILLISECONDS) + collectFuture(Scheduler.scheduleOnce(tickActor, Tick, 50, TimeUnit.MILLISECONDS)) + collectFuture(Scheduler.scheduleOnce(() ⇒ countDownLatch.countDown(), 50, TimeUnit.MILLISECONDS)) // after 1 second the wait should fail assert(countDownLatch.await(1, TimeUnit.SECONDS) == false) @@ -65,7 +72,7 @@ class SchedulerSpec extends JUnitSuite { def receive = { case Ping ⇒ ticks.countDown } }).start val numActors = Actor.registry.local.actors.length - (1 to 1000).foreach(_ ⇒ Scheduler.scheduleOnce(actor, Ping, 1, TimeUnit.MILLISECONDS)) + (1 to 1000).foreach(_ ⇒ collectFuture(Scheduler.scheduleOnce(actor, Ping, 1, TimeUnit.MILLISECONDS))) assert(ticks.await(10, TimeUnit.SECONDS)) assert(Actor.registry.local.actors.length === numActors) } @@ -83,7 +90,7 @@ class SchedulerSpec extends JUnitSuite { }).start() (1 to 10).foreach { i ⇒ - val future = Scheduler.scheduleOnce(actor, Ping, 1, TimeUnit.SECONDS) + val future = collectFuture(Scheduler.scheduleOnce(actor, Ping, 1, TimeUnit.SECONDS)) future.cancel(true) } assert(ticks.await(3, TimeUnit.SECONDS) == false) //No counting down should've been made @@ -120,9 +127,9 @@ class SchedulerSpec extends JUnitSuite { Permanent) :: Nil)).start - Scheduler.schedule(actor, Ping, 500, 500, TimeUnit.MILLISECONDS) + collectFuture(Scheduler.schedule(actor, Ping, 500, 500, TimeUnit.MILLISECONDS)) // appx 2 pings before crash - Scheduler.scheduleOnce(actor, Crash, 1000, TimeUnit.MILLISECONDS) + collectFuture(Scheduler.scheduleOnce(actor, Crash, 1000, TimeUnit.MILLISECONDS)) assert(restartLatch.tryAwait(2, TimeUnit.SECONDS)) // should be enough time for the ping countdown to recover and reach 6 pings diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index 1451a34ada..efbec15239 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -26,8 +26,7 @@ object Scheduler { case class SchedulerException(msg: String, e: Throwable) extends AkkaException(msg, e) - @volatile - private var service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory) + private[akka] val service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory) private def createSendRunnable(receiver: ActorRef, message: Any, throwWhenReceiverExpired: Boolean): Runnable = { receiver match { @@ -127,18 +126,7 @@ object Scheduler { } } - def shutdown() { - synchronized { - service.shutdown() - } - } - - def restart() { - synchronized { - shutdown() - service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory) - } - } + private[akka] def shutdown() { service.shutdown() } } private object SchedulerThreadFactory extends ThreadFactory {