Switching to LBD and minimizing window of failure

This commit is contained in:
Viktor Klang 2012-01-20 17:38:58 +01:00
parent daff1fd2a0
commit ef5287a6d8

View file

@ -4,7 +4,6 @@
package akka.actor
import akka.config.ConfigurationException
import akka.actor._
import akka.event._
import akka.dispatch._
import akka.util.duration._
@ -12,24 +11,15 @@ import akka.util.Timeout
import akka.util.Timeout._
import org.jboss.netty.akka.util.HashedWheelTimer
import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.concurrent.TimeUnit.NANOSECONDS
import java.io.File
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigParseOptions
import com.typesafe.config.ConfigResolveOptions
import com.typesafe.config.ConfigException
import akka.util.{ Helpers, Duration, ReflectiveAccess }
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.{ CountDownLatch, Executors, ConcurrentHashMap }
import scala.annotation.tailrec
import org.jboss.netty.akka.util.internal.ConcurrentIdentityHashMap
import java.io.Closeable
import java.util.concurrent.ConcurrentLinkedQueue
import akka.dispatch.Await.Awaitable
import akka.dispatch.Await.CanAwait
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.concurrent.{ CountDownLatch, LinkedBlockingDeque, TimeUnit, TimeoutException }
object ActorSystem {
@ -529,39 +519,31 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
override def toString = lookupRoot.path.root.address.toString
class TerminationCallbacks extends Runnable with Awaitable[Unit] {
import scala.collection.JavaConverters._
private val callbacks = new ConcurrentLinkedQueue[Runnable]
private val callbacks = new LinkedBlockingDeque[Runnable]
private val latch = new CountDownLatch(1)
def add(callback: Runnable) {
callbacks add callback
final def add(callback: Runnable): Unit =
if (latch.await(0, TimeUnit.NANOSECONDS)) callbacks.clear() else callbacks addFirst callback
final def run(): Unit = {
@tailrec def runNext(): Unit = callbacks.pollFirst() match {
case null ()
case some
try some.run() catch { case e log.error(e, "Failed to run termination callback, due to [{}]", e.getMessage) }
runNext()
}
try runNext() finally latch.countDown()
}
def run(): Unit = try {
for (c callbacks.asScala.toSeq.reverse) {
try {
c.run()
} catch {
case e
// TODO catching all and continue isn't good for OOME, ticket #1310
log.error(e, "Failed to run termination callback, due to [{}]", e.getMessage)
}
}
} finally {
latch.countDown()
}
final def ready(atMost: Duration)(implicit permit: CanAwait): this.type = {
if (atMost.isFinite()) {
if (!latch.await(atMost.length, atMost.unit))
throw new TimeoutException("Await termination timed out after [%s]" format (atMost.toString))
} else latch.await()
def ready(atMost: Duration)(implicit permit: CanAwait) = {
if (atMost == Duration.Inf) {
latch.await()
} else {
val opened = latch.await(atMost.toNanos, TimeUnit.NANOSECONDS)
if (!opened) throw new TimeoutException("Await termination timed out after [%s]" format (atMost.toString))
}
this
}
def result(atMost: Duration)(implicit permit: CanAwait): Unit = ready(atMost)
final def result(atMost: Duration)(implicit permit: CanAwait): Unit = ready(atMost)
}
}