Merge pull request #18754 from akka/wip-18613-optimize-repointable-patriknw

=act #18613 Optimize RepointableActorRef for only sys msg
This commit is contained in:
drewhk 2015-11-12 10:56:13 +01:00
commit fc75eb361a
2 changed files with 70 additions and 23 deletions

View file

@ -183,18 +183,36 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl,
private[this] final val lock = new ReentrantLock
// use Envelope to keep on-send checks in the same place ACCESS MUST BE PROTECTED BY THE LOCK
private[this] final val queue = new JLinkedList[Any]()
private[this] final val queue = new JLinkedList[Envelope]()
// ACCESS MUST BE PROTECTED BY THE LOCK
private[this] var sysmsgQueue: LatestFirstSystemMessageList = SystemMessageList.LNil
import systemImpl.settings.UnstartedPushTimeout.{ duration timeout }
def replaceWith(cell: Cell): Unit = locked {
try {
while (!queue.isEmpty) {
queue.poll() match {
case s: SystemMessage cell.sendSystemMessage(s)
case e: Envelope cell.sendMessage(e)
def drainSysmsgQueue(): Unit = {
// using while in case a sys msg enqueues another sys msg
while (sysmsgQueue.nonEmpty) {
var sysQ = sysmsgQueue.reverse
sysmsgQueue = SystemMessageList.LNil
while (sysQ.nonEmpty) {
val msg = sysQ.head
sysQ = sysQ.tail
msg.unlink()
cell.sendSystemMessage(msg)
}
}
}
drainSysmsgQueue()
while (!queue.isEmpty) {
cell.sendMessage(queue.poll())
// drain sysmsgQueue in case a msg enqueues a sys msg
drainSysmsgQueue()
}
} finally {
self.swapCell(cell)
}
@ -236,25 +254,11 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl,
lock.lock // we cannot lose system messages, ever, and we cannot throw an Error from here as well
try {
val cell = self.underlying
if (cellIsReady(cell)) {
if (cellIsReady(cell))
cell.sendSystemMessage(msg)
} else {
// systemMessages that are sent during replace need to jump to just after the last system message in the queue, so it's processed before other messages
val wasEnqueued = if ((self.lookup ne this) && (self.underlying eq this) && !queue.isEmpty()) {
@tailrec def tryEnqueue(i: JListIterator[Any] = queue.listIterator(), insertIntoIndex: Int = -1): Boolean =
if (i.hasNext())
tryEnqueue(i,
if (i.next().isInstanceOf[SystemMessage]) i.nextIndex() // update last sysmsg seen so far
else insertIntoIndex) // or just keep the last seen one
else if (insertIntoIndex == -1) queue.offer(msg)
else Try(queue.add(insertIntoIndex, msg)).isSuccess
tryEnqueue()
} else queue.offer(msg)
if (!wasEnqueued) {
system.eventStream.publish(Warning(self.path.toString, getClass, "dropping system message " + msg + " due to enqueue failure"))
system.deadLetters ! DeadLetter(msg, self, self)
} else if (Mailbox.debug) println(s"$self temp queueing system $msg")
else {
sysmsgQueue ::= msg
if (Mailbox.debug) println(s"$self temp queueing system $msg")
}
} finally lock.unlock()
}

View file

@ -0,0 +1,43 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.routing.RoundRobinPool
import akka.testkit.TestActors
import akka.testkit.TestProbe
import org.openjdk.jmh.annotations._
import java.util.concurrent.TimeUnit
@State(Scope.Benchmark)
@BenchmarkMode(Array(Mode.SingleShotTime))
@Fork(3)
@Warmup(iterations = 20)
@Measurement(iterations = 100)
class RouterPoolCreationBenchmark {
implicit val system: ActorSystem = ActorSystem()
val probe = TestProbe()
Props[TestActors.EchoActor]
@Param(Array("1000", "2000", "3000", "4000"))
var size = 0
@TearDown(Level.Trial)
def shutdown() {
system.terminate()
Await.ready(system.whenTerminated, 15.seconds)
}
@Benchmark
@OutputTimeUnit(TimeUnit.MICROSECONDS)
def testCreation: Boolean = {
val pool = system.actorOf(RoundRobinPool(size).props(TestActors.echoActorProps))
pool.tell("hello", probe.ref)
probe.expectMsg(5.seconds, "hello")
true
}
}