diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index abb5495b45..91a875a442 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -381,7 +381,10 @@ object ForkJoinExecutorConfigurator { unhandledExceptionHandler: Thread.UncaughtExceptionHandler) = this(parallelism, threadFactory, unhandledExceptionHandler, asyncMode = true) override def execute(r: Runnable): Unit = - if (r eq null) throw new NullPointerException else super.execute(new AkkaForkJoinTask(r)) + if (r ne null) + super.execute((if (r.isInstanceOf[ForkJoinTask[_]]) r else new AkkaForkJoinTask(r)).asInstanceOf[ForkJoinTask[Any]]) + else + throw new NullPointerException("Runnable was null") def atFullThrottle(): Boolean = this.getActiveThreadCount() >= this.getParallelism() } diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index b7c1d7104c..ef0d09fca3 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -11,8 +11,8 @@ import akka.actor.{ ActorCell, ActorRef, Cell, ActorSystem, InternalActorRef, De import akka.util.{ BoundedBlockingQueue, StablePriorityBlockingQueue, StablePriorityQueue, Unsafe } import akka.util.Helpers.ConfigOps import akka.event.Logging.Error -import scala.concurrent.duration.Duration -import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration.{ Duration, FiniteDuration } +import scala.concurrent.forkjoin.ForkJoinTask import scala.annotation.tailrec import scala.util.control.NonFatal import com.typesafe.config.Config @@ -53,7 +53,7 @@ private[akka] object Mailbox { * INTERNAL API */ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) - extends SystemMessageQueue with Runnable { + extends ForkJoinTask[Unit] with SystemMessageQueue with Runnable { import Mailbox._ @@ -228,6 +228,21 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) } } + override final def getRawResult(): Unit = () + override final def setRawResult(unit: Unit): Unit = () + final override def exec(): Boolean = try { run(); false } catch { + case ie: InterruptedException ⇒ + Thread.currentThread.interrupt() + false + case anything: Throwable ⇒ + val t = Thread.currentThread + t.getUncaughtExceptionHandler match { + case null ⇒ + case some ⇒ some.uncaughtException(t, anything) + } + throw anything + } + /** * Process the messages in the mailbox */