=act #16152 - Elides the need to allocate an AkkaForkJoinTask for Mailbox-submission to registerForExecution,

by having Mailbox extend ForkJoinTask and using the fact that ForkJoinTask.exec when returning
false does not set completion on the task, so it is free to be resubmitted to the ForkJoinPool
without reinitialization.

Also adds the ability to use fork() when the currentThread is a worker thread of the pool that we want to execute on.

Adds a JMH benchmark for both the ping-pong performance and pipelined throughput.

Conflicts:
	project/AkkaBuild.scala
This commit is contained in:
Viktor Klang 2014-10-29 11:24:40 +01:00
parent b00a89d713
commit 439be7d99a
6 changed files with 174 additions and 21 deletions

View file

@ -398,7 +398,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa
def compare(l: AnyRef, r: AnyRef) = (l, r) match { case (ll: ActorCell, rr: ActorCell) ll.self.path compareTo rr.self.path } def compare(l: AnyRef, r: AnyRef) = (l, r) match { case (ll: ActorCell, rr: ActorCell) ll.self.path compareTo rr.self.path }
} foreach { } foreach {
case cell: ActorCell case cell: ActorCell
System.err.println(" - " + cell.self.path + " " + cell.isTerminated + " " + cell.mailbox.status + " " System.err.println(" - " + cell.self.path + " " + cell.isTerminated + " " + cell.mailbox.currentStatus + " "
+ cell.mailbox.numberOfMessages + " " + cell.mailbox.systemDrain(SystemMessageList.LNil).size) + cell.mailbox.numberOfMessages + " " + cell.mailbox.systemDrain(SystemMessageList.LNil).size)
} }

View file

@ -790,7 +790,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
case _ Logging.simpleName(cell) case _ Logging.simpleName(cell)
}) + }) +
(cell match { (cell match {
case real: ActorCell " status=" + real.mailbox.status case real: ActorCell " status=" + real.mailbox.currentStatus
case _ "" case _ ""
}) + }) +
" " + (cell.childrenRefs match { " " + (cell.childrenRefs match {

View file

@ -72,7 +72,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
clearActorFields(failedActor) clearActorFields(failedActor)
} }
} }
assert(mailbox.isSuspended, "mailbox must be suspended during restart, status=" + mailbox.status) assert(mailbox.isSuspended, "mailbox must be suspended during restart, status=" + mailbox.currentStatus)
if (!setChildrenTerminationReason(ChildrenContainer.Recreation(cause))) finishRecreate(cause, failedActor) if (!setChildrenTerminationReason(ChildrenContainer.Recreation(cause))) finishRecreate(cause, failedActor)
} else { } else {
// need to keep that suspend counter balanced // need to keep that suspend counter balanced
@ -118,7 +118,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
* Do create the actor in response to a failure. * Do create the actor in response to a failure.
*/ */
protected def faultCreate(): Unit = { protected def faultCreate(): Unit = {
assert(mailbox.isSuspended, "mailbox must be suspended during failed creation, status=" + mailbox.status) assert(mailbox.isSuspended, "mailbox must be suspended during failed creation, status=" + mailbox.currentStatus)
assert(perpetrator == self) assert(perpetrator == self)
setReceiveTimeout(Duration.Undefined) setReceiveTimeout(Duration.Undefined)

View file

@ -12,7 +12,7 @@ import akka.event.EventStream
import com.typesafe.config.{ ConfigFactory, Config } import com.typesafe.config.{ ConfigFactory, Config }
import akka.util.{ Unsafe, Index } import akka.util.{ Unsafe, Index }
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.concurrent.forkjoin.{ ForkJoinTask, ForkJoinPool } import scala.concurrent.forkjoin.{ ForkJoinTask, ForkJoinPool, ForkJoinWorkerThread }
import scala.concurrent.duration.Duration import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContextExecutor import scala.concurrent.ExecutionContextExecutor
@ -377,8 +377,16 @@ object ForkJoinExecutorConfigurator {
threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
unhandledExceptionHandler: Thread.UncaughtExceptionHandler) unhandledExceptionHandler: Thread.UncaughtExceptionHandler)
extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, true) with LoadMetrics { extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, true) with LoadMetrics {
override def execute(r: Runnable): Unit = override def execute(r: Runnable): Unit = {
if (r eq null) throw new NullPointerException else super.execute(new AkkaForkJoinTask(r)) if (r eq null) throw new NullPointerException("The Runnable must not be null")
val task =
if (r.isInstanceOf[ForkJoinTask[_]]) r.asInstanceOf[ForkJoinTask[Any]]
else new AkkaForkJoinTask(r)
Thread.currentThread match {
case worker: ForkJoinWorkerThread if worker.getPool eq this task.fork()
case _ super.execute(task)
}
}
def atFullThrottle(): Boolean = this.getActiveThreadCount() >= this.getParallelism() def atFullThrottle(): Boolean = this.getActiveThreadCount() >= this.getParallelism()
} }
@ -391,6 +399,9 @@ object ForkJoinExecutorConfigurator {
override def getRawResult(): Unit = () override def getRawResult(): Unit = ()
override def setRawResult(unit: Unit): Unit = () override def setRawResult(unit: Unit): Unit = ()
final override def exec(): Boolean = try { runnable.run(); true } catch { final override def exec(): Boolean = try { runnable.run(); true } catch {
case ie: InterruptedException
Thread.currentThread.interrupt()
false
case anything: Throwable case anything: Throwable
val t = Thread.currentThread val t = Thread.currentThread
t.getUncaughtExceptionHandler match { t.getUncaughtExceptionHandler match {

View file

@ -14,6 +14,7 @@ import akka.event.Logging.Error
import scala.concurrent.duration.Duration import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.concurrent.forkjoin.ForkJoinTask
import scala.util.control.NonFatal import scala.util.control.NonFatal
import com.typesafe.config.Config import com.typesafe.config.Config
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
@ -53,7 +54,7 @@ private[akka] object Mailbox {
* INTERNAL API * INTERNAL API
*/ */
private[akka] abstract class Mailbox(val messageQueue: MessageQueue) private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
extends SystemMessageQueue with Runnable { extends ForkJoinTask[Unit] with SystemMessageQueue with Runnable {
import Mailbox._ import Mailbox._
@ -107,22 +108,22 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
protected var _systemQueueDoNotCallMeDirectly: SystemMessage = _ //null by default protected var _systemQueueDoNotCallMeDirectly: SystemMessage = _ //null by default
@inline @inline
final def status: Mailbox.Status = Unsafe.instance.getIntVolatile(this, AbstractMailbox.mailboxStatusOffset) final def currentStatus: Mailbox.Status = Unsafe.instance.getIntVolatile(this, AbstractMailbox.mailboxStatusOffset)
@inline @inline
final def shouldProcessMessage: Boolean = (status & shouldNotProcessMask) == 0 final def shouldProcessMessage: Boolean = (currentStatus & shouldNotProcessMask) == 0
@inline @inline
final def suspendCount: Int = status / suspendUnit final def suspendCount: Int = currentStatus / suspendUnit
@inline @inline
final def isSuspended: Boolean = (status & suspendMask) != 0 final def isSuspended: Boolean = (currentStatus & suspendMask) != 0
@inline @inline
final def isClosed: Boolean = status == Closed final def isClosed: Boolean = currentStatus == Closed
@inline @inline
final def isScheduled: Boolean = (status & Scheduled) != 0 final def isScheduled: Boolean = (currentStatus & Scheduled) != 0
@inline @inline
protected final def updateStatus(oldStatus: Status, newStatus: Status): Boolean = protected final def updateStatus(oldStatus: Status, newStatus: Status): Boolean =
@ -139,7 +140,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
* @return true if the suspend count reached zero * @return true if the suspend count reached zero
*/ */
@tailrec @tailrec
final def resume(): Boolean = status match { final def resume(): Boolean = currentStatus match {
case Closed case Closed
setStatus(Closed); false setStatus(Closed); false
case s case s
@ -155,7 +156,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
* @return true if the previous suspend count was zero * @return true if the previous suspend count was zero
*/ */
@tailrec @tailrec
final def suspend(): Boolean = status match { final def suspend(): Boolean = currentStatus match {
case Closed case Closed
setStatus(Closed); false setStatus(Closed); false
case s case s
@ -168,7 +169,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
* status was Scheduled or not. * status was Scheduled or not.
*/ */
@tailrec @tailrec
final def becomeClosed(): Boolean = status match { final def becomeClosed(): Boolean = currentStatus match {
case Closed case Closed
setStatus(Closed); false setStatus(Closed); false
case s updateStatus(s, Closed) || becomeClosed() case s updateStatus(s, Closed) || becomeClosed()
@ -179,7 +180,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
*/ */
@tailrec @tailrec
final def setAsScheduled(): Boolean = { final def setAsScheduled(): Boolean = {
val s = status val s = currentStatus
/* /*
* Only try to add Scheduled bit if pure Open/Suspended, not Closed or with * Only try to add Scheduled bit if pure Open/Suspended, not Closed or with
* Scheduled bit already set. * Scheduled bit already set.
@ -193,7 +194,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
*/ */
@tailrec @tailrec
final def setAsIdle(): Boolean = { final def setAsIdle(): Boolean = {
val s = status val s = currentStatus
updateStatus(s, s & ~Scheduled) || setAsIdle() updateStatus(s, s & ~Scheduled) || setAsIdle()
} }
/* /*
@ -210,13 +211,13 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
// Without calling .head the parameters would be boxed in SystemMessageList wrapper. // Without calling .head the parameters would be boxed in SystemMessageList wrapper.
Unsafe.instance.compareAndSwapObject(this, AbstractMailbox.systemMessageOffset, _old.head, _new.head) Unsafe.instance.compareAndSwapObject(this, AbstractMailbox.systemMessageOffset, _old.head, _new.head)
final def canBeScheduledForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = status match { final def canBeScheduledForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = currentStatus match {
case Open | Scheduled hasMessageHint || hasSystemMessageHint || hasSystemMessages || hasMessages case Open | Scheduled hasMessageHint || hasSystemMessageHint || hasSystemMessages || hasMessages
case Closed false case Closed false
case _ hasSystemMessageHint || hasSystemMessages case _ hasSystemMessageHint || hasSystemMessages
} }
final def run = { override final def run(): Unit = {
try { try {
if (!isClosed) { //Volatile read, needed here if (!isClosed) { //Volatile read, needed here
processAllSystemMessages() //First, deal with any system messages processAllSystemMessages() //First, deal with any system messages
@ -228,6 +229,21 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
} }
} }
override final def getRawResult(): Unit = ()
override final def setRawResult(unit: Unit): Unit = ()
final override def exec(): Boolean = try { run(); false } catch {
case ie: InterruptedException
Thread.currentThread.interrupt()
false
case anything: Throwable
val t = Thread.currentThread
t.getUncaughtExceptionHandler match {
case null
case some some.uncaughtException(t, anything)
}
throw anything
}
/** /**
* Process the messages in the mailbox * Process the messages in the mailbox
*/ */

View file

@ -0,0 +1,126 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
import akka.testkit.TestProbe
import com.typesafe.config.ConfigFactory
import org.openjdk.jmh.annotations._
import scala.concurrent.duration._
import java.util.concurrent.TimeUnit
@State(Scope.Benchmark)
@BenchmarkMode(Array(Mode.Throughput))
@Fork(1)
@Threads(1)
@Warmup(iterations = 10, time = 5, timeUnit = TimeUnit.SECONDS, batchSize = 1)
@Measurement(iterations = 20)
class ForkJoinActorBenchmark {
import ForkJoinActorBenchmark._
@Param(Array("1", "5"))
var tpt = 0
@Param(Array("1", "4"))
var threads = ""
implicit var system: ActorSystem = _
@Setup(Level.Trial)
def setup() {
system = ActorSystem("ForkJoinActorBenchmark", ConfigFactory.parseString(
s"""| akka {
| log-dead-letters = off
| actor {
| default-dispatcher {
| executor = "fork-join-executor"
| fork-join-executor {
| parallelism-min = 1
| parallelism-factor = $threads
| parallelism-max = 64
| }
| throughput = $tpt
| }
| }
| }
""".stripMargin))
}
@TearDown(Level.Trial)
def shutdown() {
system.shutdown()
system.awaitTermination()
}
@Benchmark
@Measurement(timeUnit = TimeUnit.MILLISECONDS)
@OperationsPerInvocation(messages)
def pingPong = {
val ping = system.actorOf(Props[ForkJoinActorBenchmark.PingPong])
val pong = system.actorOf(Props[ForkJoinActorBenchmark.PingPong])
ping.tell(message, pong)
val p = TestProbe()
p.watch(ping)
p.expectTerminated(ping, timeout)
p.watch(pong)
p.expectTerminated(pong, timeout)
}
@Benchmark
@Measurement(timeUnit = TimeUnit.MILLISECONDS)
@OperationsPerInvocation(messages)
def floodPipe = {
val end = system.actorOf(Props(classOf[ForkJoinActorBenchmark.Pipe], None))
val middle = system.actorOf(Props(classOf[ForkJoinActorBenchmark.Pipe], Some(end)))
val penultimate = system.actorOf(Props(classOf[ForkJoinActorBenchmark.Pipe], Some(middle)))
val beginning = system.actorOf(Props(classOf[ForkJoinActorBenchmark.Pipe], Some(penultimate)))
val p = TestProbe()
p.watch(end)
def send(left: Int): Unit =
if (left > 0) {
beginning ! message
send(left - 1)
}
send(messages / 4) // we have 4 actors in the pipeline
beginning ! stop
p.expectTerminated(end, timeout)
}
}
object ForkJoinActorBenchmark {
final val stop = "stop"
final val message = "message"
final val timeout = 15.seconds
final val messages = 400000
class Pipe(next: Option[ActorRef]) extends Actor {
def receive = {
case m @ `message` =>
if(next.isDefined) next.get forward m
case s @ `stop` =>
context stop self
if(next.isDefined) next.get forward s
}
}
class PingPong extends Actor {
var left = messages / 2
def receive = {
case `message` =>
if (left <= 1)
context stop self
sender() ! message
left -= 1
}
}
}