=act - Reinstates the Mailbox optimization reusing the FJT instance but doesn't use FJT.fork() due to unfairness

This commit is contained in:
Viktor Klang 2015-05-11 13:21:05 +02:00 committed by Patrik Nordwall
parent c39e41c45b
commit db5a32fa04
2 changed files with 22 additions and 4 deletions

View file

@ -11,8 +11,8 @@ import akka.actor.{ ActorCell, ActorRef, Cell, ActorSystem, InternalActorRef, De
import akka.util.{ BoundedBlockingQueue, StablePriorityBlockingQueue, StablePriorityQueue, Unsafe }
import akka.util.Helpers.ConfigOps
import akka.event.Logging.Error
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.concurrent.forkjoin.ForkJoinTask
import scala.annotation.tailrec
import scala.util.control.NonFatal
import com.typesafe.config.Config
@ -53,7 +53,7 @@ private[akka] object Mailbox {
* INTERNAL API
*/
private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
extends SystemMessageQueue with Runnable {
extends ForkJoinTask[Unit] with SystemMessageQueue with Runnable {
import Mailbox._
@ -228,6 +228,21 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
}
}
override final def getRawResult(): Unit = ()
override final def setRawResult(unit: Unit): Unit = ()
final override def exec(): Boolean = try { run(); false } catch {
case ie: InterruptedException
Thread.currentThread.interrupt()
false
case anything: Throwable
val t = Thread.currentThread
t.getUncaughtExceptionHandler match {
case null
case some some.uncaughtException(t, anything)
}
throw anything
}
/**
* Process the messages in the mailbox
*/