From 7b22aa499dd8e19c2d30c17ce32eb65021ea32d2 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 21 Oct 2015 14:47:49 +0200 Subject: [PATCH] =act #18613 Optimize RepointableActorRef for sys msg MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Before the cell is started the RepointableActorRef queue the messages (both system and non-system messages) into a queue. System messages are inserted right before the last non-system message by iterating the queue, i.e. becomes slow when there are many system messages already enqueued. * This is a problem for pools with many routees. * The fix is to keep system messages in a separate queue JMH benchmark before: [info] Benchmark (size) Mode Cnt Score Error Units [info] RouterPoolCreationBenchmark.testCreation 1000 ss 300 13204.048 ± 3081.576 us/op [info] RouterPoolCreationBenchmark.testCreation 2000 ss 300 41939.524 ± 6178.087 us/op [info] RouterPoolCreationBenchmark.testCreation 3000 ss 300 70752.881 ± 4344.992 us/op [info] RouterPoolCreationBenchmark.testCreation 4000 ss 300 120620.885 ± 3296.342 us/op JMH benchmark after: [info] Benchmark (size) Mode Cnt Score Error Units [info] RouterPoolCreationBenchmark.testCreation 1000 ss 300 7738.721 ± 1806.297 us/op [info] RouterPoolCreationBenchmark.testCreation 2000 ss 300 15497.588 ± 4532.852 us/op [info] RouterPoolCreationBenchmark.testCreation 3000 ss 300 28704.005 ± 6322.458 us/op [info] RouterPoolCreationBenchmark.testCreation 4000 ss 300 37783.516 ± 6778.437 us/op fully drain sysmsgQueue when swap cell --- .../akka/actor/RepointableActorRef.scala | 50 ++++++++++--------- .../actor/RouterPoolCreationBenchmark.scala | 43 ++++++++++++++++ 2 files changed, 70 insertions(+), 23 deletions(-) create mode 100644 akka-bench-jmh/src/main/scala/akka/actor/RouterPoolCreationBenchmark.scala diff --git a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala index c00b67f7b5..7b24c015e2 100644 --- a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala @@ -183,18 +183,36 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, private[this] final val lock = new ReentrantLock // use Envelope to keep on-send checks in the same place ACCESS MUST BE PROTECTED BY THE LOCK - private[this] final val queue = new JLinkedList[Any]() + private[this] final val queue = new JLinkedList[Envelope]() + + // ACCESS MUST BE PROTECTED BY THE LOCK + private[this] var sysmsgQueue: LatestFirstSystemMessageList = SystemMessageList.LNil import systemImpl.settings.UnstartedPushTimeout.{ duration ⇒ timeout } def replaceWith(cell: Cell): Unit = locked { try { - while (!queue.isEmpty) { - queue.poll() match { - case s: SystemMessage ⇒ cell.sendSystemMessage(s) - case e: Envelope ⇒ cell.sendMessage(e) + def drainSysmsgQueue(): Unit = { + // using while in case a sys msg enqueues another sys msg + while (sysmsgQueue.nonEmpty) { + var sysQ = sysmsgQueue.reverse + sysmsgQueue = SystemMessageList.LNil + while (sysQ.nonEmpty) { + val msg = sysQ.head + sysQ = sysQ.tail + msg.unlink() + cell.sendSystemMessage(msg) + } } } + + drainSysmsgQueue() + + while (!queue.isEmpty) { + cell.sendMessage(queue.poll()) + // drain sysmsgQueue in case a msg enqueues a sys msg + drainSysmsgQueue() + } } finally { self.swapCell(cell) } @@ -236,25 +254,11 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, lock.lock // we cannot lose system messages, ever, and we cannot throw an Error from here as well try { val cell = self.underlying - if (cellIsReady(cell)) { + if (cellIsReady(cell)) cell.sendSystemMessage(msg) - } else { - // systemMessages that are sent during replace need to jump to just after the last system message in the queue, so it's processed before other messages - val wasEnqueued = if ((self.lookup ne this) && (self.underlying eq this) && !queue.isEmpty()) { - @tailrec def tryEnqueue(i: JListIterator[Any] = queue.listIterator(), insertIntoIndex: Int = -1): Boolean = - if (i.hasNext()) - tryEnqueue(i, - if (i.next().isInstanceOf[SystemMessage]) i.nextIndex() // update last sysmsg seen so far - else insertIntoIndex) // or just keep the last seen one - else if (insertIntoIndex == -1) queue.offer(msg) - else Try(queue.add(insertIntoIndex, msg)).isSuccess - tryEnqueue() - } else queue.offer(msg) - - if (!wasEnqueued) { - system.eventStream.publish(Warning(self.path.toString, getClass, "dropping system message " + msg + " due to enqueue failure")) - system.deadLetters ! DeadLetter(msg, self, self) - } else if (Mailbox.debug) println(s"$self temp queueing system $msg") + else { + sysmsgQueue ::= msg + if (Mailbox.debug) println(s"$self temp queueing system $msg") } } finally lock.unlock() } diff --git a/akka-bench-jmh/src/main/scala/akka/actor/RouterPoolCreationBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/actor/RouterPoolCreationBenchmark.scala new file mode 100644 index 0000000000..191b2d01cc --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/actor/RouterPoolCreationBenchmark.scala @@ -0,0 +1,43 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.actor + +import scala.concurrent.Await +import scala.concurrent.duration._ +import akka.routing.RoundRobinPool +import akka.testkit.TestActors +import akka.testkit.TestProbe +import org.openjdk.jmh.annotations._ +import java.util.concurrent.TimeUnit + +@State(Scope.Benchmark) +@BenchmarkMode(Array(Mode.SingleShotTime)) +@Fork(3) +@Warmup(iterations = 20) +@Measurement(iterations = 100) +class RouterPoolCreationBenchmark { + implicit val system: ActorSystem = ActorSystem() + val probe = TestProbe() + + Props[TestActors.EchoActor] + + @Param(Array("1000", "2000", "3000", "4000")) + var size = 0 + + @TearDown(Level.Trial) + def shutdown() { + system.terminate() + Await.ready(system.whenTerminated, 15.seconds) + } + + @Benchmark + @OutputTimeUnit(TimeUnit.MICROSECONDS) + def testCreation: Boolean = { + val pool = system.actorOf(RoundRobinPool(size).props(TestActors.echoActorProps)) + pool.tell("hello", probe.ref) + probe.expectMsg(5.seconds, "hello") + true + } +} +