diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 8a5bcfa385..0cf6c4a77b 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -14,9 +14,9 @@ import akka.event.EventStream import com.typesafe.config.Config import akka.util.ReflectiveAccess import akka.serialization.SerializationExtension -import akka.jsr166y.ForkJoinPool import akka.util.NonFatal import akka.event.Logging.LogEventException +import akka.jsr166y.{ ForkJoinTask, ForkJoinPool } final case class Envelope(val message: Any, val sender: ActorRef)(system: ActorSystem) { if (message.isInstanceOf[AnyRef]) { @@ -424,7 +424,41 @@ class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPr threadPoolConfig.createExecutorServiceFactory(name, threadFactory) } +object ForkJoinExecutorConfigurator { + + /** + * INTERNAL AKKA USAGE ONLY + */ + final class AkkaForkJoinPool(parallelism: Int, + threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, + unhandledExceptionHandler: Thread.UncaughtExceptionHandler) + extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, true) { + override def execute(r: Runnable): Unit = r match { + case m: Mailbox ⇒ super.execute(new MailboxExecutionTask(m)) + case other ⇒ super.execute(other) + } + } + + /** + * INTERNAL AKKA USAGE ONLY + */ + final class MailboxExecutionTask(mailbox: Mailbox) extends ForkJoinTask[Unit] { + final override def setRawResult(u: Unit): Unit = () + final override def getRawResult(): Unit = () + final override def exec(): Boolean = try { mailbox.run; true } catch { + case anything ⇒ + val t = Thread.currentThread + t.getUncaughtExceptionHandler match { + case null ⇒ + case some ⇒ some.uncaughtException(t, anything) + } + throw anything + } + } +} + class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends ExecutorServiceConfigurator(config, prerequisites) { + import ForkJoinExecutorConfigurator._ def validate(t: ThreadFactory): ForkJoinPool.ForkJoinWorkerThreadFactory = prerequisites.threadFactory match { case correct: ForkJoinPool.ForkJoinWorkerThreadFactory ⇒ correct @@ -433,7 +467,7 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer class ForkJoinExecutorServiceFactory(val threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, val parallelism: Int) extends ExecutorServiceFactory { - def createExecutorService: ExecutorService = new ForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing, true) + def createExecutorService: ExecutorService = new AkkaForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing) } final def createExecutorServiceFactory(name: String, threadFactory: ThreadFactory): ExecutorServiceFactory = new ForkJoinExecutorServiceFactory( diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 3aa6103b22..27853b49db 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -328,7 +328,7 @@ trait MailboxType { * It's a case class for Java (new UnboundedMailbox) */ case class UnboundedMailbox() extends MailboxType { - override def create(receiver: ActorContext) = + final override def create(receiver: ActorContext): Mailbox = new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { final val queue = new ConcurrentLinkedQueue[Envelope]() } @@ -339,7 +339,7 @@ case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Durat if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") - override def create(receiver: ActorContext) = + final override def create(receiver: ActorContext): Mailbox = new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue { final val queue = new LinkedBlockingQueue[Envelope](capacity) final val pushTimeOut = BoundedMailbox.this.pushTimeOut @@ -347,7 +347,7 @@ case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Durat } case class UnboundedPriorityMailbox( final val cmp: Comparator[Envelope]) extends MailboxType { - override def create(receiver: ActorContext) = + final override def create(receiver: ActorContext): Mailbox = new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { final val queue = new PriorityBlockingQueue[Envelope](11, cmp) } @@ -358,7 +358,7 @@ case class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final va if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") - override def create(receiver: ActorContext) = + final override def create(receiver: ActorContext): Mailbox = new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue { final val queue = new BoundedBlockingQueue[Envelope](capacity, new PriorityQueue[Envelope](11, cmp)) final val pushTimeOut = BoundedPriorityMailbox.this.pushTimeOut