From ef5287a6d87f974fc2267507abfcc2cad3100ee7 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 20 Jan 2012 17:38:58 +0100 Subject: [PATCH] Switching to LBD and minimizing window of failure --- .../main/scala/akka/actor/ActorSystem.scala | 56 +++++++------------ 1 file changed, 19 insertions(+), 37 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index b607bc6a7e..14d44106bb 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -4,7 +4,6 @@ package akka.actor import akka.config.ConfigurationException -import akka.actor._ import akka.event._ import akka.dispatch._ import akka.util.duration._ @@ -12,24 +11,15 @@ import akka.util.Timeout import akka.util.Timeout._ import org.jboss.netty.akka.util.HashedWheelTimer import java.util.concurrent.TimeUnit.MILLISECONDS -import java.util.concurrent.TimeUnit.NANOSECONDS -import java.io.File import com.typesafe.config.Config import com.typesafe.config.ConfigFactory -import com.typesafe.config.ConfigParseOptions -import com.typesafe.config.ConfigResolveOptions -import com.typesafe.config.ConfigException import akka.util.{ Helpers, Duration, ReflectiveAccess } -import java.util.concurrent.atomic.AtomicLong -import java.util.concurrent.{ CountDownLatch, Executors, ConcurrentHashMap } import scala.annotation.tailrec import org.jboss.netty.akka.util.internal.ConcurrentIdentityHashMap import java.io.Closeable -import java.util.concurrent.ConcurrentLinkedQueue import akka.dispatch.Await.Awaitable import akka.dispatch.Await.CanAwait -import java.util.concurrent.TimeUnit -import java.util.concurrent.TimeoutException +import java.util.concurrent.{ CountDownLatch, LinkedBlockingDeque, TimeUnit, TimeoutException } object ActorSystem { @@ -529,39 +519,31 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor override def toString = lookupRoot.path.root.address.toString class TerminationCallbacks extends Runnable with Awaitable[Unit] { - import scala.collection.JavaConverters._ - private val callbacks = new ConcurrentLinkedQueue[Runnable] + private val callbacks = new LinkedBlockingDeque[Runnable] private val latch = new CountDownLatch(1) - def add(callback: Runnable) { - callbacks add callback + final def add(callback: Runnable): Unit = + if (latch.await(0, TimeUnit.NANOSECONDS)) callbacks.clear() else callbacks addFirst callback + + final def run(): Unit = { + @tailrec def runNext(): Unit = callbacks.pollFirst() match { + case null ⇒ () + case some ⇒ + try some.run() catch { case e ⇒ log.error(e, "Failed to run termination callback, due to [{}]", e.getMessage) } + runNext() + } + try runNext() finally latch.countDown() } - def run(): Unit = try { - for (c ← callbacks.asScala.toSeq.reverse) { - try { - c.run() - } catch { - case e ⇒ - // TODO catching all and continue isn't good for OOME, ticket #1310 - log.error(e, "Failed to run termination callback, due to [{}]", e.getMessage) - } - } - } finally { - latch.countDown() - } + final def ready(atMost: Duration)(implicit permit: CanAwait): this.type = { + if (atMost.isFinite()) { + if (!latch.await(atMost.length, atMost.unit)) + throw new TimeoutException("Await termination timed out after [%s]" format (atMost.toString)) + } else latch.await() - def ready(atMost: Duration)(implicit permit: CanAwait) = { - if (atMost == Duration.Inf) { - latch.await() - } else { - val opened = latch.await(atMost.toNanos, TimeUnit.NANOSECONDS) - if (!opened) throw new TimeoutException("Await termination timed out after [%s]" format (atMost.toString)) - } this } - def result(atMost: Duration)(implicit permit: CanAwait): Unit = ready(atMost) - + final def result(atMost: Duration)(implicit permit: CanAwait): Unit = ready(atMost) } }