diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 5ec6a34aee..c8404b3c02 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -398,7 +398,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa def compare(l: AnyRef, r: AnyRef) = (l, r) match { case (ll: ActorCell, rr: ActorCell) ⇒ ll.self.path compareTo rr.self.path } } foreach { case cell: ActorCell ⇒ - System.err.println(" - " + cell.self.path + " " + cell.isTerminated + " " + cell.mailbox.status + " " + System.err.println(" - " + cell.self.path + " " + cell.isTerminated + " " + cell.mailbox.currentStatus + " " + cell.mailbox.numberOfMessages + " " + cell.mailbox.systemDrain(SystemMessageList.LNil).size) } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index af28581891..9628b68970 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -790,7 +790,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, case _ ⇒ Logging.simpleName(cell) }) + (cell match { - case real: ActorCell ⇒ " status=" + real.mailbox.status + case real: ActorCell ⇒ " status=" + real.mailbox.currentStatus case _ ⇒ "" }) + " " + (cell.childrenRefs match { diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala index 26b7c2e626..e2f9e0f4d2 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala @@ -72,7 +72,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ clearActorFields(failedActor) } } - assert(mailbox.isSuspended, "mailbox must be suspended during restart, status=" + mailbox.status) + assert(mailbox.isSuspended, "mailbox must be suspended during restart, status=" + mailbox.currentStatus) if (!setChildrenTerminationReason(ChildrenContainer.Recreation(cause))) finishRecreate(cause, failedActor) } else { // need to keep that suspend counter balanced @@ -118,7 +118,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ * Do create the actor in response to a failure. */ protected def faultCreate(): Unit = { - assert(mailbox.isSuspended, "mailbox must be suspended during failed creation, status=" + mailbox.status) + assert(mailbox.isSuspended, "mailbox must be suspended during failed creation, status=" + mailbox.currentStatus) assert(perpetrator == self) setReceiveTimeout(Duration.Undefined) diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 2da28b7a3b..f30851954a 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -12,7 +12,7 @@ import akka.event.EventStream import com.typesafe.config.{ ConfigFactory, Config } import akka.util.{ Unsafe, Index } import scala.annotation.tailrec -import scala.concurrent.forkjoin.{ ForkJoinTask, ForkJoinPool } +import scala.concurrent.forkjoin.{ ForkJoinTask, ForkJoinPool, ForkJoinWorkerThread } import scala.concurrent.duration.Duration import scala.concurrent.ExecutionContext import scala.concurrent.ExecutionContextExecutor @@ -377,8 +377,16 @@ object ForkJoinExecutorConfigurator { threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, unhandledExceptionHandler: Thread.UncaughtExceptionHandler) extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, true) with LoadMetrics { - override def execute(r: Runnable): Unit = - if (r eq null) throw new NullPointerException else super.execute(new AkkaForkJoinTask(r)) + override def execute(r: Runnable): Unit = { + if (r eq null) throw new NullPointerException("The Runnable must not be null") + val task = + if (r.isInstanceOf[ForkJoinTask[_]]) r.asInstanceOf[ForkJoinTask[Any]] + else new AkkaForkJoinTask(r) + Thread.currentThread match { + case worker: ForkJoinWorkerThread if worker.getPool eq this ⇒ task.fork() + case _ ⇒ super.execute(task) + } + } def atFullThrottle(): Boolean = this.getActiveThreadCount() >= this.getParallelism() } @@ -391,6 +399,9 @@ object ForkJoinExecutorConfigurator { override def getRawResult(): Unit = () override def setRawResult(unit: Unit): Unit = () final override def exec(): Boolean = try { runnable.run(); true } catch { + case ie: InterruptedException ⇒ + Thread.currentThread.interrupt() + false case anything: Throwable ⇒ val t = Thread.currentThread t.getUncaughtExceptionHandler match { diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index d81a01012a..575e82b952 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -14,6 +14,7 @@ import akka.event.Logging.Error import scala.concurrent.duration.Duration import scala.concurrent.duration.FiniteDuration import scala.annotation.tailrec +import scala.concurrent.forkjoin.ForkJoinTask import scala.util.control.NonFatal import com.typesafe.config.Config import java.util.concurrent.atomic.AtomicInteger @@ -53,7 +54,7 @@ private[akka] object Mailbox { * INTERNAL API */ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) - extends SystemMessageQueue with Runnable { + extends ForkJoinTask[Unit] with SystemMessageQueue with Runnable { import Mailbox._ @@ -107,22 +108,22 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) protected var _systemQueueDoNotCallMeDirectly: SystemMessage = _ //null by default @inline - final def status: Mailbox.Status = Unsafe.instance.getIntVolatile(this, AbstractMailbox.mailboxStatusOffset) + final def currentStatus: Mailbox.Status = Unsafe.instance.getIntVolatile(this, AbstractMailbox.mailboxStatusOffset) @inline - final def shouldProcessMessage: Boolean = (status & shouldNotProcessMask) == 0 + final def shouldProcessMessage: Boolean = (currentStatus & shouldNotProcessMask) == 0 @inline - final def suspendCount: Int = status / suspendUnit + final def suspendCount: Int = currentStatus / suspendUnit @inline - final def isSuspended: Boolean = (status & suspendMask) != 0 + final def isSuspended: Boolean = (currentStatus & suspendMask) != 0 @inline - final def isClosed: Boolean = status == Closed + final def isClosed: Boolean = currentStatus == Closed @inline - final def isScheduled: Boolean = (status & Scheduled) != 0 + final def isScheduled: Boolean = (currentStatus & Scheduled) != 0 @inline protected final def updateStatus(oldStatus: Status, newStatus: Status): Boolean = @@ -139,7 +140,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) * @return true if the suspend count reached zero */ @tailrec - final def resume(): Boolean = status match { + final def resume(): Boolean = currentStatus match { case Closed ⇒ setStatus(Closed); false case s ⇒ @@ -155,7 +156,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) * @return true if the previous suspend count was zero */ @tailrec - final def suspend(): Boolean = status match { + final def suspend(): Boolean = currentStatus match { case Closed ⇒ setStatus(Closed); false case s ⇒ @@ -168,7 +169,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) * status was Scheduled or not. */ @tailrec - final def becomeClosed(): Boolean = status match { + final def becomeClosed(): Boolean = currentStatus match { case Closed ⇒ setStatus(Closed); false case s ⇒ updateStatus(s, Closed) || becomeClosed() @@ -179,7 +180,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) */ @tailrec final def setAsScheduled(): Boolean = { - val s = status + val s = currentStatus /* * Only try to add Scheduled bit if pure Open/Suspended, not Closed or with * Scheduled bit already set. @@ -193,7 +194,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) */ @tailrec final def setAsIdle(): Boolean = { - val s = status + val s = currentStatus updateStatus(s, s & ~Scheduled) || setAsIdle() } /* @@ -210,13 +211,13 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) // Without calling .head the parameters would be boxed in SystemMessageList wrapper. Unsafe.instance.compareAndSwapObject(this, AbstractMailbox.systemMessageOffset, _old.head, _new.head) - final def canBeScheduledForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = status match { + final def canBeScheduledForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = currentStatus match { case Open | Scheduled ⇒ hasMessageHint || hasSystemMessageHint || hasSystemMessages || hasMessages case Closed ⇒ false case _ ⇒ hasSystemMessageHint || hasSystemMessages } - final def run = { + override final def run(): Unit = { try { if (!isClosed) { //Volatile read, needed here processAllSystemMessages() //First, deal with any system messages @@ -228,6 +229,21 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) } } + override final def getRawResult(): Unit = () + override final def setRawResult(unit: Unit): Unit = () + final override def exec(): Boolean = try { run(); false } catch { + case ie: InterruptedException ⇒ + Thread.currentThread.interrupt() + false + case anything: Throwable ⇒ + val t = Thread.currentThread + t.getUncaughtExceptionHandler match { + case null ⇒ + case some ⇒ some.uncaughtException(t, anything) + } + throw anything + } + /** * Process the messages in the mailbox */ diff --git a/akka-bench-jmh/src/main/scala/akka/actor/ForkJoinActorBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/actor/ForkJoinActorBenchmark.scala new file mode 100644 index 0000000000..dcc62a2b9c --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/actor/ForkJoinActorBenchmark.scala @@ -0,0 +1,126 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.actor + +import akka.testkit.TestProbe +import com.typesafe.config.ConfigFactory +import org.openjdk.jmh.annotations._ + +import scala.concurrent.duration._ + +import java.util.concurrent.TimeUnit + +@State(Scope.Benchmark) +@BenchmarkMode(Array(Mode.Throughput)) +@Fork(1) +@Threads(1) +@Warmup(iterations = 10, time = 5, timeUnit = TimeUnit.SECONDS, batchSize = 1) +@Measurement(iterations = 20) +class ForkJoinActorBenchmark { + import ForkJoinActorBenchmark._ + + @Param(Array("1", "5")) + var tpt = 0 + + @Param(Array("1", "4")) + var threads = "" + + implicit var system: ActorSystem = _ + + @Setup(Level.Trial) + def setup() { + system = ActorSystem("ForkJoinActorBenchmark", ConfigFactory.parseString( + s"""| akka { + | log-dead-letters = off + | actor { + | default-dispatcher { + | executor = "fork-join-executor" + | fork-join-executor { + | parallelism-min = 1 + | parallelism-factor = $threads + | parallelism-max = 64 + | } + | throughput = $tpt + | } + | } + | } + """.stripMargin)) + } + + @TearDown(Level.Trial) + def shutdown() { + system.shutdown() + system.awaitTermination() + } + + @Benchmark + @Measurement(timeUnit = TimeUnit.MILLISECONDS) + @OperationsPerInvocation(messages) + def pingPong = { + val ping = system.actorOf(Props[ForkJoinActorBenchmark.PingPong]) + val pong = system.actorOf(Props[ForkJoinActorBenchmark.PingPong]) + + ping.tell(message, pong) + + val p = TestProbe() + p.watch(ping) + p.expectTerminated(ping, timeout) + p.watch(pong) + p.expectTerminated(pong, timeout) + } + + @Benchmark + @Measurement(timeUnit = TimeUnit.MILLISECONDS) + @OperationsPerInvocation(messages) + def floodPipe = { + + val end = system.actorOf(Props(classOf[ForkJoinActorBenchmark.Pipe], None)) + val middle = system.actorOf(Props(classOf[ForkJoinActorBenchmark.Pipe], Some(end))) + val penultimate = system.actorOf(Props(classOf[ForkJoinActorBenchmark.Pipe], Some(middle))) + val beginning = system.actorOf(Props(classOf[ForkJoinActorBenchmark.Pipe], Some(penultimate))) + + val p = TestProbe() + p.watch(end) + + def send(left: Int): Unit = + if (left > 0) { + beginning ! message + send(left - 1) + } + + send(messages / 4) // we have 4 actors in the pipeline + + beginning ! stop + + p.expectTerminated(end, timeout) + } +} + +object ForkJoinActorBenchmark { + final val stop = "stop" + final val message = "message" + final val timeout = 15.seconds + final val messages = 400000 + class Pipe(next: Option[ActorRef]) extends Actor { + def receive = { + case m @ `message` => + if(next.isDefined) next.get forward m + case s @ `stop` => + context stop self + if(next.isDefined) next.get forward s + } + } + class PingPong extends Actor { + var left = messages / 2 + def receive = { + case `message` => + + if (left <= 1) + context stop self + + sender() ! message + left -= 1 + } + } +} \ No newline at end of file