Do not interrupt LARS at shutdown or tasks might fail. See #3005
This commit is contained in:
parent
a9cb1f9c19
commit
fb69f33dd1
3 changed files with 27 additions and 5 deletions
|
|
@ -7,7 +7,7 @@ package akka.actor
|
|||
import language.postfixOps
|
||||
import java.io.Closeable
|
||||
import java.util.concurrent._
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import atomic.{ AtomicReference, AtomicInteger }
|
||||
import scala.concurrent.{ future, Await, ExecutionContext }
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.forkjoin.ThreadLocalRandom
|
||||
|
|
@ -475,7 +475,7 @@ class LightArrayRevolverSchedulerSpec extends AkkaSpec(SchedulerSpec.testConfRev
|
|||
|
||||
def withScheduler(start: Long = 0L, config: Config = ConfigFactory.empty)(thunk: (Scheduler with Closeable, Driver) ⇒ Unit): Unit = {
|
||||
import akka.actor.{ LightArrayRevolverScheduler ⇒ LARS }
|
||||
val lbq = new LinkedBlockingQueue[Long]
|
||||
val lbq = new AtomicReference[LinkedBlockingQueue[Long]](new LinkedBlockingQueue[Long])
|
||||
val prb = TestProbe()
|
||||
val tf = system.asInstanceOf[ActorSystemImpl].threadFactory
|
||||
val sched =
|
||||
|
|
@ -487,14 +487,30 @@ class LightArrayRevolverSchedulerSpec extends AkkaSpec(SchedulerSpec.testConfRev
|
|||
override protected def waitNanos(ns: Long): Unit = {
|
||||
// println(s"waiting $ns")
|
||||
prb.ref ! ns
|
||||
try time += lbq.take()
|
||||
try time += (lbq.get match {
|
||||
case q: LinkedBlockingQueue[Long] ⇒ q.take()
|
||||
case _ ⇒
|
||||
val start = System.nanoTime()
|
||||
super.waitNanos(ns)
|
||||
System.nanoTime() - start
|
||||
})
|
||||
catch {
|
||||
case _: InterruptedException ⇒ Thread.currentThread.interrupt()
|
||||
}
|
||||
}
|
||||
override def close(): Unit = {
|
||||
lbq.getAndSet(null) match {
|
||||
case q: LinkedBlockingQueue[Long] ⇒ q.offer(0L)
|
||||
case _ ⇒
|
||||
}
|
||||
super.close()
|
||||
}
|
||||
}
|
||||
val driver = new Driver {
|
||||
def wakeUp(d: FiniteDuration) { lbq.offer(d.toNanos) }
|
||||
def wakeUp(d: FiniteDuration) = lbq.get match {
|
||||
case q: LinkedBlockingQueue[Long] ⇒ q.offer(d.toNanos)
|
||||
case _ ⇒
|
||||
}
|
||||
def expectWait(): FiniteDuration = probe.expectMsgType[Long].nanos
|
||||
def probe = prb
|
||||
def step = sched.TickDuration
|
||||
|
|
|
|||
|
|
@ -363,6 +363,9 @@ akka {
|
|||
# If you are scheduling a lot of tasks you should consider increasing the
|
||||
# ticks per wheel.
|
||||
# For more information see: http://www.jboss.org/netty/
|
||||
# Note that it might take up to 1 tick to stop the Timer, so setting the
|
||||
# tick-duration to a high value will make shutting down the actor system
|
||||
# take longer.
|
||||
tick-duration = 100ms
|
||||
|
||||
# The timer uses a circular wheel of buckets to store the timer tasks.
|
||||
|
|
|
|||
|
|
@ -342,7 +342,10 @@ class LightArrayRevolverScheduler(config: Config,
|
|||
private val stopped = new AtomicReference[Promise[immutable.Seq[TimerTask]]]
|
||||
def stop(): Future[immutable.Seq[TimerTask]] =
|
||||
if (stopped.compareAndSet(null, Promise())) {
|
||||
timerThread.interrupt()
|
||||
// Interrupting the timer thread to make it shut down faster is not good since
|
||||
// it could be in the middle of executing the scheduled tasks, which might not
|
||||
// respond well to being interrupted.
|
||||
// Instead we just wait one more tick for it to finish.
|
||||
stopped.get.future
|
||||
} else Future.successful(Nil)
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue