Merge pull request #17413 from drewhk/wip-17341-fjp-revert-fwdport-drewhk
=act #17341: Revert starvation prone optimization (for validation)
This commit is contained in:
commit
2d70599d22
3 changed files with 70 additions and 28 deletions
|
|
@ -0,0 +1,66 @@
|
|||
package akka.dispatch
|
||||
|
||||
import akka.actor.{ ActorRef, Actor, Props }
|
||||
import akka.testkit.{ ImplicitSender, AkkaSpec }
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
||||
object ForkJoinPoolStarvationSpec {
|
||||
val config = ConfigFactory.parseString(
|
||||
"""
|
||||
|actorhang {
|
||||
|
|
||||
| task-dispatcher {
|
||||
| mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
|
||||
| throughput = 100
|
||||
| fork-join-executor {
|
||||
| parallelism-factor = 2
|
||||
| parallelism-max = 2
|
||||
| parallelism-min = 2
|
||||
| }
|
||||
| }
|
||||
|}
|
||||
""".stripMargin)
|
||||
|
||||
class SelfBusyActor extends Actor {
|
||||
self ! "tick"
|
||||
|
||||
override def receive = {
|
||||
case "tick" ⇒
|
||||
self ! "tick"
|
||||
}
|
||||
}
|
||||
|
||||
class InnocentActor extends Actor {
|
||||
|
||||
override def receive = {
|
||||
case "ping" ⇒
|
||||
sender ! "All fine"
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class ForkJoinPoolStarvationSpec extends AkkaSpec(ForkJoinPoolStarvationSpec.config) with ImplicitSender {
|
||||
import ForkJoinPoolStarvationSpec._
|
||||
|
||||
val Iterations = 1000
|
||||
|
||||
"AkkaForkJoinPool" must {
|
||||
|
||||
"not starve tasks arriving from external dispatchers under high internal traffic" in {
|
||||
// Two busy actors that will occupy the threads of the dispatcher
|
||||
// Since they submit to the local task queue via fork, they can starve external submissions
|
||||
system.actorOf(Props(new SelfBusyActor).withDispatcher("actorhang.task-dispatcher"))
|
||||
system.actorOf(Props(new SelfBusyActor).withDispatcher("actorhang.task-dispatcher"))
|
||||
|
||||
val innocentActor = system.actorOf(Props(new InnocentActor).withDispatcher("actorhang.task-dispatcher"))
|
||||
|
||||
for (_ ← 1 to Iterations) {
|
||||
// External task submission via the default dispatcher
|
||||
innocentActor ! "ping"
|
||||
expectMsg("All fine")
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
@ -12,7 +12,7 @@ import akka.event.EventStream
|
|||
import com.typesafe.config.{ ConfigFactory, Config }
|
||||
import akka.util.{ Unsafe, Index }
|
||||
import scala.annotation.tailrec
|
||||
import scala.concurrent.forkjoin.{ ForkJoinTask, ForkJoinPool, ForkJoinWorkerThread }
|
||||
import scala.concurrent.forkjoin.{ ForkJoinTask, ForkJoinPool }
|
||||
import scala.concurrent.duration.Duration
|
||||
import scala.concurrent.ExecutionContext
|
||||
import scala.concurrent.ExecutionContextExecutor
|
||||
|
|
@ -377,16 +377,8 @@ object ForkJoinExecutorConfigurator {
|
|||
threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
|
||||
unhandledExceptionHandler: Thread.UncaughtExceptionHandler)
|
||||
extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, true) with LoadMetrics {
|
||||
override def execute(r: Runnable): Unit = {
|
||||
if (r eq null) throw new NullPointerException("The Runnable must not be null")
|
||||
val task =
|
||||
if (r.isInstanceOf[ForkJoinTask[_]]) r.asInstanceOf[ForkJoinTask[Any]]
|
||||
else new AkkaForkJoinTask(r)
|
||||
Thread.currentThread match {
|
||||
case worker: ForkJoinWorkerThread if worker.getPool eq this ⇒ task.fork()
|
||||
case _ ⇒ super.execute(task)
|
||||
}
|
||||
}
|
||||
override def execute(r: Runnable): Unit =
|
||||
if (r eq null) throw new NullPointerException else super.execute(new AkkaForkJoinTask(r))
|
||||
|
||||
def atFullThrottle(): Boolean = this.getActiveThreadCount() >= this.getParallelism()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,7 +14,6 @@ import akka.event.Logging.Error
|
|||
import scala.concurrent.duration.Duration
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
import scala.annotation.tailrec
|
||||
import scala.concurrent.forkjoin.ForkJoinTask
|
||||
import scala.util.control.NonFatal
|
||||
import com.typesafe.config.Config
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
|
@ -54,7 +53,7 @@ private[akka] object Mailbox {
|
|||
* INTERNAL API
|
||||
*/
|
||||
private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
|
||||
extends ForkJoinTask[Unit] with SystemMessageQueue with Runnable {
|
||||
extends SystemMessageQueue with Runnable {
|
||||
|
||||
import Mailbox._
|
||||
|
||||
|
|
@ -229,21 +228,6 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
|
|||
}
|
||||
}
|
||||
|
||||
override final def getRawResult(): Unit = ()
|
||||
override final def setRawResult(unit: Unit): Unit = ()
|
||||
final override def exec(): Boolean = try { run(); false } catch {
|
||||
case ie: InterruptedException ⇒
|
||||
Thread.currentThread.interrupt()
|
||||
false
|
||||
case anything: Throwable ⇒
|
||||
val t = Thread.currentThread
|
||||
t.getUncaughtExceptionHandler match {
|
||||
case null ⇒
|
||||
case some ⇒ some.uncaughtException(t, anything)
|
||||
}
|
||||
throw anything
|
||||
}
|
||||
|
||||
/**
|
||||
* Process the messages in the mailbox
|
||||
*/
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue