From 9e4017be95df2a5e7e8af7cf3e73a7c5d2f5f7a7 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 6 Jul 2011 14:52:17 +0200 Subject: [PATCH] Adding guards in FJDispatcher so that multiple FJDispatchers do not interact badly with one and another --- .../main/scala/akka/dispatch/FJDispatcher.scala | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/FJDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/FJDispatcher.scala index a2eb391b07..6abfd19f8f 100644 --- a/akka-actor/src/main/scala/akka/dispatch/FJDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/FJDispatcher.scala @@ -8,6 +8,7 @@ import akka.actor.ActorRef import concurrent.forkjoin.{ ForkJoinWorkerThread, ForkJoinPool, ForkJoinTask } import java.util.concurrent._ import java.lang.UnsupportedOperationException +import akka.event.EventHandler /** * A Dispatcher that uses the ForkJoin library in scala.concurrent.forkjoin @@ -53,8 +54,7 @@ class FJDispatcher( override private[akka] def doneProcessingMailbox(mbox: MessageQueue with ExecutableMailbox): Unit = { super.doneProcessingMailbox(mbox) - if (FJDispatcher.isCurrentThreadFJThread) - ForkJoinTask.helpQuiesce() + ForkJoinTask.helpQuiesce() } } @@ -73,9 +73,13 @@ case class ForkJoinPoolConfig(targetParallelism: Int = Runtime.getRuntime.availa r match { case fjmbox: FJMailbox ⇒ fjmbox.fjTask.reinitialize() - if (FJDispatcher.isCurrentThreadFJThread) fjmbox.fjTask.fork() - else super.execute[Unit](fjmbox.fjTask) - case _ ⇒ super.execute(r) + Thread.currentThread match { + case fjwt: ForkJoinWorkerThread if fjwt.getPool eq this ⇒ + fjmbox.fjTask.fork() //We should do fjwt.pushTask(fjmbox.fjTask) but it's package protected + case _ ⇒ super.execute[Unit](fjmbox.fjTask) + } + case _ ⇒ + super.execute(r) } } @@ -100,7 +104,7 @@ trait FJMailbox { self: ExecutableMailbox ⇒ def getRawResult() = result def setRawResult(v: Unit) { result = v } def exec() = { - self.run() + try { self.run() } catch { case t ⇒ EventHandler.error(t, self, "Exception in FJ Worker") } true } def run() { invoke() }