Merge pull request #308 from jboner/wip-1751-1769-fj-uhe-√
Creating our own FJTask and escalate exceptions to UHE
This commit is contained in:
commit
4e886827bb
2 changed files with 40 additions and 6 deletions
|
|
@ -14,9 +14,9 @@ import akka.event.EventStream
|
|||
import com.typesafe.config.Config
|
||||
import akka.util.ReflectiveAccess
|
||||
import akka.serialization.SerializationExtension
|
||||
import akka.jsr166y.ForkJoinPool
|
||||
import akka.util.NonFatal
|
||||
import akka.event.Logging.LogEventException
|
||||
import akka.jsr166y.{ ForkJoinTask, ForkJoinPool }
|
||||
|
||||
final case class Envelope(val message: Any, val sender: ActorRef)(system: ActorSystem) {
|
||||
if (message.isInstanceOf[AnyRef]) {
|
||||
|
|
@ -424,7 +424,41 @@ class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPr
|
|||
threadPoolConfig.createExecutorServiceFactory(name, threadFactory)
|
||||
}
|
||||
|
||||
object ForkJoinExecutorConfigurator {
|
||||
|
||||
/**
|
||||
* INTERNAL AKKA USAGE ONLY
|
||||
*/
|
||||
final class AkkaForkJoinPool(parallelism: Int,
|
||||
threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
|
||||
unhandledExceptionHandler: Thread.UncaughtExceptionHandler)
|
||||
extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, true) {
|
||||
override def execute(r: Runnable): Unit = r match {
|
||||
case m: Mailbox ⇒ super.execute(new MailboxExecutionTask(m))
|
||||
case other ⇒ super.execute(other)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL AKKA USAGE ONLY
|
||||
*/
|
||||
final class MailboxExecutionTask(mailbox: Mailbox) extends ForkJoinTask[Unit] {
|
||||
final override def setRawResult(u: Unit): Unit = ()
|
||||
final override def getRawResult(): Unit = ()
|
||||
final override def exec(): Boolean = try { mailbox.run; true } catch {
|
||||
case anything ⇒
|
||||
val t = Thread.currentThread
|
||||
t.getUncaughtExceptionHandler match {
|
||||
case null ⇒
|
||||
case some ⇒ some.uncaughtException(t, anything)
|
||||
}
|
||||
throw anything
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends ExecutorServiceConfigurator(config, prerequisites) {
|
||||
import ForkJoinExecutorConfigurator._
|
||||
|
||||
def validate(t: ThreadFactory): ForkJoinPool.ForkJoinWorkerThreadFactory = prerequisites.threadFactory match {
|
||||
case correct: ForkJoinPool.ForkJoinWorkerThreadFactory ⇒ correct
|
||||
|
|
@ -433,7 +467,7 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
|
|||
|
||||
class ForkJoinExecutorServiceFactory(val threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
|
||||
val parallelism: Int) extends ExecutorServiceFactory {
|
||||
def createExecutorService: ExecutorService = new ForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing, true)
|
||||
def createExecutorService: ExecutorService = new AkkaForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing)
|
||||
}
|
||||
final def createExecutorServiceFactory(name: String, threadFactory: ThreadFactory): ExecutorServiceFactory =
|
||||
new ForkJoinExecutorServiceFactory(
|
||||
|
|
|
|||
|
|
@ -328,7 +328,7 @@ trait MailboxType {
|
|||
* It's a case class for Java (new UnboundedMailbox)
|
||||
*/
|
||||
case class UnboundedMailbox() extends MailboxType {
|
||||
override def create(receiver: ActorContext) =
|
||||
final override def create(receiver: ActorContext): Mailbox =
|
||||
new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue {
|
||||
final val queue = new ConcurrentLinkedQueue[Envelope]()
|
||||
}
|
||||
|
|
@ -339,7 +339,7 @@ case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Durat
|
|||
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
|
||||
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
|
||||
|
||||
override def create(receiver: ActorContext) =
|
||||
final override def create(receiver: ActorContext): Mailbox =
|
||||
new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue {
|
||||
final val queue = new LinkedBlockingQueue[Envelope](capacity)
|
||||
final val pushTimeOut = BoundedMailbox.this.pushTimeOut
|
||||
|
|
@ -347,7 +347,7 @@ case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Durat
|
|||
}
|
||||
|
||||
case class UnboundedPriorityMailbox( final val cmp: Comparator[Envelope]) extends MailboxType {
|
||||
override def create(receiver: ActorContext) =
|
||||
final override def create(receiver: ActorContext): Mailbox =
|
||||
new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue {
|
||||
final val queue = new PriorityBlockingQueue[Envelope](11, cmp)
|
||||
}
|
||||
|
|
@ -358,7 +358,7 @@ case class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final va
|
|||
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
|
||||
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
|
||||
|
||||
override def create(receiver: ActorContext) =
|
||||
final override def create(receiver: ActorContext): Mailbox =
|
||||
new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue {
|
||||
final val queue = new BoundedBlockingQueue[Envelope](capacity, new PriorityQueue[Envelope](11, cmp))
|
||||
final val pushTimeOut = BoundedPriorityMailbox.this.pushTimeOut
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue