diff --git a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala index c00b67f7b5..7b24c015e2 100644 --- a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala @@ -183,18 +183,36 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, private[this] final val lock = new ReentrantLock // use Envelope to keep on-send checks in the same place ACCESS MUST BE PROTECTED BY THE LOCK - private[this] final val queue = new JLinkedList[Any]() + private[this] final val queue = new JLinkedList[Envelope]() + + // ACCESS MUST BE PROTECTED BY THE LOCK + private[this] var sysmsgQueue: LatestFirstSystemMessageList = SystemMessageList.LNil import systemImpl.settings.UnstartedPushTimeout.{ duration ⇒ timeout } def replaceWith(cell: Cell): Unit = locked { try { - while (!queue.isEmpty) { - queue.poll() match { - case s: SystemMessage ⇒ cell.sendSystemMessage(s) - case e: Envelope ⇒ cell.sendMessage(e) + def drainSysmsgQueue(): Unit = { + // using while in case a sys msg enqueues another sys msg + while (sysmsgQueue.nonEmpty) { + var sysQ = sysmsgQueue.reverse + sysmsgQueue = SystemMessageList.LNil + while (sysQ.nonEmpty) { + val msg = sysQ.head + sysQ = sysQ.tail + msg.unlink() + cell.sendSystemMessage(msg) + } } } + + drainSysmsgQueue() + + while (!queue.isEmpty) { + cell.sendMessage(queue.poll()) + // drain sysmsgQueue in case a msg enqueues a sys msg + drainSysmsgQueue() + } } finally { self.swapCell(cell) } @@ -236,25 +254,11 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, lock.lock // we cannot lose system messages, ever, and we cannot throw an Error from here as well try { val cell = self.underlying - if (cellIsReady(cell)) { + if (cellIsReady(cell)) cell.sendSystemMessage(msg) - } else { - // systemMessages that are sent during replace need to jump to just after the last system message in the queue, so it's processed before other messages - val wasEnqueued = if ((self.lookup ne this) && (self.underlying eq this) && !queue.isEmpty()) { - @tailrec def tryEnqueue(i: JListIterator[Any] = queue.listIterator(), insertIntoIndex: Int = -1): Boolean = - if (i.hasNext()) - tryEnqueue(i, - if (i.next().isInstanceOf[SystemMessage]) i.nextIndex() // update last sysmsg seen so far - else insertIntoIndex) // or just keep the last seen one - else if (insertIntoIndex == -1) queue.offer(msg) - else Try(queue.add(insertIntoIndex, msg)).isSuccess - tryEnqueue() - } else queue.offer(msg) - - if (!wasEnqueued) { - system.eventStream.publish(Warning(self.path.toString, getClass, "dropping system message " + msg + " due to enqueue failure")) - system.deadLetters ! DeadLetter(msg, self, self) - } else if (Mailbox.debug) println(s"$self temp queueing system $msg") + else { + sysmsgQueue ::= msg + if (Mailbox.debug) println(s"$self temp queueing system $msg") } } finally lock.unlock() } diff --git a/akka-bench-jmh/src/main/scala/akka/actor/RouterPoolCreationBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/actor/RouterPoolCreationBenchmark.scala new file mode 100644 index 0000000000..191b2d01cc --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/actor/RouterPoolCreationBenchmark.scala @@ -0,0 +1,43 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.actor + +import scala.concurrent.Await +import scala.concurrent.duration._ +import akka.routing.RoundRobinPool +import akka.testkit.TestActors +import akka.testkit.TestProbe +import org.openjdk.jmh.annotations._ +import java.util.concurrent.TimeUnit + +@State(Scope.Benchmark) +@BenchmarkMode(Array(Mode.SingleShotTime)) +@Fork(3) +@Warmup(iterations = 20) +@Measurement(iterations = 100) +class RouterPoolCreationBenchmark { + implicit val system: ActorSystem = ActorSystem() + val probe = TestProbe() + + Props[TestActors.EchoActor] + + @Param(Array("1000", "2000", "3000", "4000")) + var size = 0 + + @TearDown(Level.Trial) + def shutdown() { + system.terminate() + Await.ready(system.whenTerminated, 15.seconds) + } + + @Benchmark + @OutputTimeUnit(TimeUnit.MICROSECONDS) + def testCreation: Boolean = { + val pool = system.actorOf(RoundRobinPool(size).props(TestActors.echoActorProps)) + pool.tell("hello", probe.ref) + probe.expectMsg(5.seconds, "hello") + true + } +} +