diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/ForkJoinPoolStarvationSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/ForkJoinPoolStarvationSpec.scala new file mode 100644 index 0000000000..e3e0d48880 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/dispatch/ForkJoinPoolStarvationSpec.scala @@ -0,0 +1,66 @@ +package akka.dispatch + +import akka.actor.{ ActorRef, Actor, Props } +import akka.testkit.{ ImplicitSender, AkkaSpec } +import com.typesafe.config.ConfigFactory + +object ForkJoinPoolStarvationSpec { + val config = ConfigFactory.parseString( + """ + |actorhang { + | + | task-dispatcher { + | mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox" + | throughput = 100 + | fork-join-executor { + | parallelism-factor = 2 + | parallelism-max = 2 + | parallelism-min = 2 + | } + | } + |} + """.stripMargin) + + class SelfBusyActor extends Actor { + self ! "tick" + + override def receive = { + case "tick" ⇒ + self ! "tick" + } + } + + class InnocentActor extends Actor { + + override def receive = { + case "ping" ⇒ + sender ! "All fine" + } + } + +} + +class ForkJoinPoolStarvationSpec extends AkkaSpec(ForkJoinPoolStarvationSpec.config) with ImplicitSender { + import ForkJoinPoolStarvationSpec._ + + val Iterations = 1000 + + "AkkaForkJoinPool" must { + + "not starve tasks arriving from external dispatchers under high internal traffic" in { + // Two busy actors that will occupy the threads of the dispatcher + // Since they submit to the local task queue via fork, they can starve external submissions + system.actorOf(Props(new SelfBusyActor).withDispatcher("actorhang.task-dispatcher")) + system.actorOf(Props(new SelfBusyActor).withDispatcher("actorhang.task-dispatcher")) + + val innocentActor = system.actorOf(Props(new InnocentActor).withDispatcher("actorhang.task-dispatcher")) + + for (_ ← 1 to Iterations) { + // External task submission via the default dispatcher + innocentActor ! "ping" + expectMsg("All fine") + } + } + + } +} diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index e8d0601175..095c4f2aac 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -12,7 +12,7 @@ import akka.event.EventStream import com.typesafe.config.{ ConfigFactory, Config } import akka.util.{ Unsafe, Index } import scala.annotation.tailrec -import scala.concurrent.forkjoin.{ ForkJoinTask, ForkJoinPool, ForkJoinWorkerThread } +import scala.concurrent.forkjoin.{ ForkJoinTask, ForkJoinPool } import scala.concurrent.duration.Duration import scala.concurrent.ExecutionContext import scala.concurrent.ExecutionContextExecutor @@ -377,16 +377,8 @@ object ForkJoinExecutorConfigurator { threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, unhandledExceptionHandler: Thread.UncaughtExceptionHandler) extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, true) with LoadMetrics { - override def execute(r: Runnable): Unit = { - if (r eq null) throw new NullPointerException("The Runnable must not be null") - val task = - if (r.isInstanceOf[ForkJoinTask[_]]) r.asInstanceOf[ForkJoinTask[Any]] - else new AkkaForkJoinTask(r) - Thread.currentThread match { - case worker: ForkJoinWorkerThread if worker.getPool eq this ⇒ task.fork() - case _ ⇒ super.execute(task) - } - } + override def execute(r: Runnable): Unit = + if (r eq null) throw new NullPointerException else super.execute(new AkkaForkJoinTask(r)) def atFullThrottle(): Boolean = this.getActiveThreadCount() >= this.getParallelism() } diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index f95c188ad6..b7c1d7104c 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -14,7 +14,6 @@ import akka.event.Logging.Error import scala.concurrent.duration.Duration import scala.concurrent.duration.FiniteDuration import scala.annotation.tailrec -import scala.concurrent.forkjoin.ForkJoinTask import scala.util.control.NonFatal import com.typesafe.config.Config import java.util.concurrent.atomic.AtomicInteger @@ -54,7 +53,7 @@ private[akka] object Mailbox { * INTERNAL API */ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) - extends ForkJoinTask[Unit] with SystemMessageQueue with Runnable { + extends SystemMessageQueue with Runnable { import Mailbox._ @@ -229,21 +228,6 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) } } - override final def getRawResult(): Unit = () - override final def setRawResult(unit: Unit): Unit = () - final override def exec(): Boolean = try { run(); false } catch { - case ie: InterruptedException ⇒ - Thread.currentThread.interrupt() - false - case anything: Throwable ⇒ - val t = Thread.currentThread - t.getUncaughtExceptionHandler match { - case null ⇒ - case some ⇒ some.uncaughtException(t, anything) - } - throw anything - } - /** * Process the messages in the mailbox */