Making sure that the current Threads' UEH is called when using Akka FJP in Dispatcher as ExecutionContext

This commit is contained in:
Viktor Klang 2013-01-28 23:43:55 +01:00
parent f52129b7dc
commit ff540d76ec
2 changed files with 17 additions and 19 deletions

View file

@ -4,23 +4,22 @@
package akka.actor
import java.io.Closeable
import java.util.concurrent.{ ConcurrentHashMap, ThreadFactory, CountDownLatch, TimeoutException, RejectedExecutionException }
import java.util.concurrent.TimeUnit.MILLISECONDS
import com.typesafe.config.{ Config, ConfigFactory }
import akka.event._
import akka.dispatch._
import akka.japi.Util.immutableSeq
import com.typesafe.config.{ Config, ConfigFactory }
import akka.actor.dungeon.ChildrenContainer
import akka.util._
import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.duration.{ FiniteDuration, Duration }
import scala.concurrent.{ Await, Awaitable, CanAwait, Future }
import scala.concurrent.{ Await, Awaitable, CanAwait, Future, ExecutionContext }
import scala.util.{ Failure, Success }
import scala.util.control.NonFatal
import akka.util._
import java.io.Closeable
import akka.util.internal.{ HashedWheelTimer }
import java.util.concurrent.{ ConcurrentHashMap, ThreadFactory, CountDownLatch, TimeoutException, RejectedExecutionException }
import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor.dungeon.ChildrenContainer
import scala.concurrent.ExecutionContext
import scala.util.control.{ NonFatal, ControlThrowable }
object ActorSystem {
@ -465,7 +464,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
new Thread.UncaughtExceptionHandler() {
def uncaughtException(thread: Thread, cause: Throwable): Unit = {
cause match {
case NonFatal(_) | _: InterruptedException log.error(cause, "Uncaught error from thread [{}]", thread.getName)
case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable log.error(cause, "Uncaught error from thread [{}]", thread.getName)
case _
if (settings.JvmExitOnFatalError) {
try {

View file

@ -487,10 +487,8 @@ object ForkJoinExecutorConfigurator {
threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
unhandledExceptionHandler: Thread.UncaughtExceptionHandler)
extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, true) with LoadMetrics {
override def execute(r: Runnable): Unit = r match {
case m: Mailbox super.execute(new MailboxExecutionTask(m))
case other super.execute(other)
}
override def execute(r: Runnable): Unit =
if (r eq null) throw new NullPointerException else super.execute(new AkkaForkJoinTask(r))
def atFullThrottle(): Boolean = this.getActiveThreadCount() >= this.getParallelism()
}
@ -498,10 +496,11 @@ object ForkJoinExecutorConfigurator {
/**
* INTERNAL AKKA USAGE ONLY
*/
final class MailboxExecutionTask(mailbox: Mailbox) extends ForkJoinTask[Unit] {
final override def setRawResult(u: Unit): Unit = ()
final override def getRawResult(): Unit = ()
final override def exec(): Boolean = try { mailbox.run; true } catch {
@SerialVersionUID(1L)
final class AkkaForkJoinTask(runnable: Runnable) extends ForkJoinTask[Unit] {
override def getRawResult(): Unit = ()
override def setRawResult(unit: Unit): Unit = ()
final override def exec(): Boolean = try { runnable.run(); true } catch {
case anything: Throwable
val t = Thread.currentThread
t.getUncaughtExceptionHandler match {