From ff540d76ecdfbb2aa311d2c471f5ec583c8f7531 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 28 Jan 2013 23:43:55 +0100 Subject: [PATCH] Making sure that the current Threads' UEH is called when using Akka FJP in Dispatcher as ExecutionContext --- .../main/scala/akka/actor/ActorSystem.scala | 21 +++++++++---------- .../akka/dispatch/AbstractDispatcher.scala | 15 +++++++------ 2 files changed, 17 insertions(+), 19 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index f4398a1180..0e9705fb65 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -4,23 +4,22 @@ package akka.actor +import java.io.Closeable +import java.util.concurrent.{ ConcurrentHashMap, ThreadFactory, CountDownLatch, TimeoutException, RejectedExecutionException } +import java.util.concurrent.TimeUnit.MILLISECONDS +import com.typesafe.config.{ Config, ConfigFactory } import akka.event._ import akka.dispatch._ import akka.japi.Util.immutableSeq -import com.typesafe.config.{ Config, ConfigFactory } +import akka.actor.dungeon.ChildrenContainer +import akka.util._ import scala.annotation.tailrec import scala.collection.immutable import scala.concurrent.duration.{ FiniteDuration, Duration } -import scala.concurrent.{ Await, Awaitable, CanAwait, Future } +import scala.concurrent.{ Await, Awaitable, CanAwait, Future, ExecutionContext } import scala.util.{ Failure, Success } -import scala.util.control.NonFatal -import akka.util._ -import java.io.Closeable -import akka.util.internal.{ HashedWheelTimer } -import java.util.concurrent.{ ConcurrentHashMap, ThreadFactory, CountDownLatch, TimeoutException, RejectedExecutionException } -import java.util.concurrent.TimeUnit.MILLISECONDS -import akka.actor.dungeon.ChildrenContainer -import scala.concurrent.ExecutionContext +import scala.util.control.{ NonFatal, ControlThrowable } + object ActorSystem { @@ -465,7 +464,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, new Thread.UncaughtExceptionHandler() { def uncaughtException(thread: Thread, cause: Throwable): Unit = { cause match { - case NonFatal(_) | _: InterruptedException ⇒ log.error(cause, "Uncaught error from thread [{}]", thread.getName) + case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable ⇒ log.error(cause, "Uncaught error from thread [{}]", thread.getName) case _ ⇒ if (settings.JvmExitOnFatalError) { try { diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index e6f7dccd37..9739a7284a 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -487,10 +487,8 @@ object ForkJoinExecutorConfigurator { threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, unhandledExceptionHandler: Thread.UncaughtExceptionHandler) extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, true) with LoadMetrics { - override def execute(r: Runnable): Unit = r match { - case m: Mailbox ⇒ super.execute(new MailboxExecutionTask(m)) - case other ⇒ super.execute(other) - } + override def execute(r: Runnable): Unit = + if (r eq null) throw new NullPointerException else super.execute(new AkkaForkJoinTask(r)) def atFullThrottle(): Boolean = this.getActiveThreadCount() >= this.getParallelism() } @@ -498,10 +496,11 @@ object ForkJoinExecutorConfigurator { /** * INTERNAL AKKA USAGE ONLY */ - final class MailboxExecutionTask(mailbox: Mailbox) extends ForkJoinTask[Unit] { - final override def setRawResult(u: Unit): Unit = () - final override def getRawResult(): Unit = () - final override def exec(): Boolean = try { mailbox.run; true } catch { + @SerialVersionUID(1L) + final class AkkaForkJoinTask(runnable: Runnable) extends ForkJoinTask[Unit] { + override def getRawResult(): Unit = () + override def setRawResult(unit: Unit): Unit = () + final override def exec(): Boolean = try { runnable.run(); true } catch { case anything: Throwable ⇒ val t = Thread.currentThread t.getUncaughtExceptionHandler match {