Adding guards in FJDispatcher so that multiple FJDispatchers do not interact badly with one and another
This commit is contained in:
parent
6117e599d6
commit
9e4017be95
1 changed files with 10 additions and 6 deletions
|
|
@ -8,6 +8,7 @@ import akka.actor.ActorRef
|
|||
import concurrent.forkjoin.{ ForkJoinWorkerThread, ForkJoinPool, ForkJoinTask }
|
||||
import java.util.concurrent._
|
||||
import java.lang.UnsupportedOperationException
|
||||
import akka.event.EventHandler
|
||||
|
||||
/**
|
||||
* A Dispatcher that uses the ForkJoin library in scala.concurrent.forkjoin
|
||||
|
|
@ -53,8 +54,7 @@ class FJDispatcher(
|
|||
|
||||
override private[akka] def doneProcessingMailbox(mbox: MessageQueue with ExecutableMailbox): Unit = {
|
||||
super.doneProcessingMailbox(mbox)
|
||||
if (FJDispatcher.isCurrentThreadFJThread)
|
||||
ForkJoinTask.helpQuiesce()
|
||||
ForkJoinTask.helpQuiesce()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -73,9 +73,13 @@ case class ForkJoinPoolConfig(targetParallelism: Int = Runtime.getRuntime.availa
|
|||
r match {
|
||||
case fjmbox: FJMailbox ⇒
|
||||
fjmbox.fjTask.reinitialize()
|
||||
if (FJDispatcher.isCurrentThreadFJThread) fjmbox.fjTask.fork()
|
||||
else super.execute[Unit](fjmbox.fjTask)
|
||||
case _ ⇒ super.execute(r)
|
||||
Thread.currentThread match {
|
||||
case fjwt: ForkJoinWorkerThread if fjwt.getPool eq this ⇒
|
||||
fjmbox.fjTask.fork() //We should do fjwt.pushTask(fjmbox.fjTask) but it's package protected
|
||||
case _ ⇒ super.execute[Unit](fjmbox.fjTask)
|
||||
}
|
||||
case _ ⇒
|
||||
super.execute(r)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -100,7 +104,7 @@ trait FJMailbox { self: ExecutableMailbox ⇒
|
|||
def getRawResult() = result
|
||||
def setRawResult(v: Unit) { result = v }
|
||||
def exec() = {
|
||||
self.run()
|
||||
try { self.run() } catch { case t ⇒ EventHandler.error(t, self, "Exception in FJ Worker") }
|
||||
true
|
||||
}
|
||||
def run() { invoke() }
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue