diff --git a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala index 8d12f58a46..04f75ce74c 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala @@ -7,7 +7,7 @@ package akka.actor import language.postfixOps import java.io.Closeable import java.util.concurrent._ -import java.util.concurrent.atomic.AtomicInteger +import atomic.{ AtomicReference, AtomicInteger } import scala.concurrent.{ future, Await, ExecutionContext } import scala.concurrent.duration._ import scala.concurrent.forkjoin.ThreadLocalRandom @@ -475,7 +475,7 @@ class LightArrayRevolverSchedulerSpec extends AkkaSpec(SchedulerSpec.testConfRev def withScheduler(start: Long = 0L, config: Config = ConfigFactory.empty)(thunk: (Scheduler with Closeable, Driver) ⇒ Unit): Unit = { import akka.actor.{ LightArrayRevolverScheduler ⇒ LARS } - val lbq = new LinkedBlockingQueue[Long] + val lbq = new AtomicReference[LinkedBlockingQueue[Long]](new LinkedBlockingQueue[Long]) val prb = TestProbe() val tf = system.asInstanceOf[ActorSystemImpl].threadFactory val sched = @@ -487,14 +487,30 @@ class LightArrayRevolverSchedulerSpec extends AkkaSpec(SchedulerSpec.testConfRev override protected def waitNanos(ns: Long): Unit = { // println(s"waiting $ns") prb.ref ! ns - try time += lbq.take() + try time += (lbq.get match { + case q: LinkedBlockingQueue[Long] ⇒ q.take() + case _ ⇒ + val start = System.nanoTime() + super.waitNanos(ns) + System.nanoTime() - start + }) catch { case _: InterruptedException ⇒ Thread.currentThread.interrupt() } } + override def close(): Unit = { + lbq.getAndSet(null) match { + case q: LinkedBlockingQueue[Long] ⇒ q.offer(0L) + case _ ⇒ + } + super.close() + } } val driver = new Driver { - def wakeUp(d: FiniteDuration) { lbq.offer(d.toNanos) } + def wakeUp(d: FiniteDuration) = lbq.get match { + case q: LinkedBlockingQueue[Long] ⇒ q.offer(d.toNanos) + case _ ⇒ + } def expectWait(): FiniteDuration = probe.expectMsgType[Long].nanos def probe = prb def step = sched.TickDuration diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 543bdb6c75..62bcadbf45 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -363,6 +363,9 @@ akka { # If you are scheduling a lot of tasks you should consider increasing the # ticks per wheel. # For more information see: http://www.jboss.org/netty/ + # Note that it might take up to 1 tick to stop the Timer, so setting the + # tick-duration to a high value will make shutting down the actor system + # take longer. tick-duration = 100ms # The timer uses a circular wheel of buckets to store the timer tasks. diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index fba646fc4d..b8f340d4b8 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -342,7 +342,10 @@ class LightArrayRevolverScheduler(config: Config, private val stopped = new AtomicReference[Promise[immutable.Seq[TimerTask]]] def stop(): Future[immutable.Seq[TimerTask]] = if (stopped.compareAndSet(null, Promise())) { - timerThread.interrupt() + // Interrupting the timer thread to make it shut down faster is not good since + // it could be in the middle of executing the scheduled tasks, which might not + // respond well to being interrupted. + // Instead we just wait one more tick for it to finish. stopped.get.future } else Future.successful(Nil)