From 66dd0123bc3856f8107b52b67818c6da3095bc6d Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 14 Nov 2011 19:19:44 +0100 Subject: [PATCH] Temporary fix for the throughput benchmark --- .../TellThroughputPerformanceSpec.scala | 48 ++++++++++++++----- .../main/scala/akka/dispatch/Dispatcher.scala | 22 ++++----- .../main/scala/akka/dispatch/Mailbox.scala | 29 +++-------- .../akka/dispatch/ThreadPoolBuilder.scala | 19 -------- 4 files changed, 54 insertions(+), 64 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala index d264b82c93..bf0a4dc0eb 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala @@ -1,28 +1,33 @@ package akka.performance.microbench import akka.performance.workbench.PerformanceSpec -import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit import org.apache.commons.math.stat.descriptive.DescriptiveStatistics -import akka.actor.Actor -import akka.actor.ActorRef -import akka.actor.PoisonPill -import akka.actor.Props +import akka.actor._ +import java.util.concurrent.{ ThreadPoolExecutor, CountDownLatch, TimeUnit } +import akka.dispatch._ +import java.util.concurrent.ThreadPoolExecutor.AbortPolicy // -server -Xms512M -Xmx1024M -XX:+UseParallelGC -Dbenchmark=true -Dbenchmark.repeatFactor=500 @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class TellThroughputPerformanceSpec extends PerformanceSpec { import TellThroughputPerformanceSpec._ - val clientDispatcher = app.dispatcherFactory.newDispatcher("client-dispatcher") + def createDispatcher(name: String) = ThreadPoolConfigDispatcherBuilder(config ⇒ new Dispatcher(app, name, 5, + 0, UnboundedMailbox(), config, 60000), ThreadPoolConfig(app)) .withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity + .setFlowHandler(Left(new AbortPolicy)) .setCorePoolSize(maxClients) .build - val destinationDispatcher = app.dispatcherFactory.newDispatcher("destination-dispatcher") - .withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity - .setCorePoolSize(maxClients) - .build + val clientDispatcher = createDispatcher("client-dispatcher") + val destinationDispatcher = createDispatcher("destination-dispatcher") + + override def atTermination { + super.atTermination() + System.out.println("Cleaning up after TellThroughputPerformanceSpec") + clientDispatcher.shutdown() + destinationDispatcher.shutdown() + } val repeat = 30000L * repeatFactor @@ -76,6 +81,27 @@ class TellThroughputPerformanceSpec extends PerformanceSpec { val ok = latch.await(((5000000 + 500 * repeat) * timeDilation) / 100, TimeUnit.MICROSECONDS) val durationNs = (System.nanoTime - start) + if (!ok) { + System.err.println("Destinations: ") + destinations.foreach { + case l: LocalActorRef ⇒ + val m = l.underlying.mailbox + System.err.println(" -" + l + " mbox(" + m.status + ")" + " containing [" + Stream.continually(m.dequeue()).takeWhile(_ != null).mkString(", ") + "] and has systemMsgs: " + m.hasSystemMessages) + } + System.err.println("") + System.err.println("Clients: ") + + clients.foreach { + case l: LocalActorRef ⇒ + val m = l.underlying.mailbox + System.err.println(" -" + l + " mbox(" + m.status + ")" + " containing [" + Stream.continually(m.dequeue()).takeWhile(_ != null).mkString(", ") + "] and has systemMsgs: " + m.hasSystemMessages) + } + + val e = clientDispatcher.asInstanceOf[Dispatcher].executorService.get().asInstanceOf[ExecutorServiceDelegate].executor.asInstanceOf[ThreadPoolExecutor] + val q = e.getQueue + System.err.println("Client Dispatcher: " + e.getActiveCount + " " + Stream.continually(q.poll()).takeWhile(_ != null).mkString(", ")) + } + if (!warmup) { ok must be(true) logMeasurement(numberOfClients, durationNs, repeat) diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index 89c1647c5c..f6386bef82 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -74,7 +74,9 @@ class Dispatcher( extends MessageDispatcher(_app) { protected[akka] val executorServiceFactory = executorServiceFactoryProvider.createExecutorServiceFactory(name) - protected[akka] val executorService = new AtomicReference[ExecutorService](new LazyExecutorServiceWrapper(executorServiceFactory.createExecutorService)) + protected[akka] val executorService = new AtomicReference[ExecutorService](new ExecutorServiceDelegate { + lazy val executor = executorServiceFactory.createExecutorService + }) protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = { val mbox = receiver.mailbox @@ -103,9 +105,11 @@ class Dispatcher( protected[akka] def start {} protected[akka] def shutdown { - val old = executorService.getAndSet(new LazyExecutorServiceWrapper(executorServiceFactory.createExecutorService)) - if (old ne null) { - old.shutdown() + executorService.getAndSet(new ExecutorServiceDelegate { + lazy val executor = executorServiceFactory.createExecutorService + }) match { + case null ⇒ + case some ⇒ some.shutdown() } } @@ -113,19 +117,13 @@ class Dispatcher( * Returns if it was registered */ protected[akka] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = { - if (mbox.shouldBeRegisteredForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races + if (mbox.shouldBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races if (mbox.setAsScheduled()) { try { executorService.get() execute mbox true } catch { - case e: RejectedExecutionException ⇒ - try { - app.eventStream.publish(Warning(this, e.toString)) - } finally { - mbox.setAsIdle() - } - throw e + case e: RejectedExecutionException ⇒ executorService.get() execute mbox; true //Retry once } } else false } else false diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 745bade6b8..07163d8a77 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -43,7 +43,7 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag final def status: Mailbox.Status = AbstractMailbox.updater.get(this) @inline - final def isActive: Boolean = (status & 3) == Open + final def shouldProcessMessage: Boolean = (status & 3) == Open @inline final def isSuspended: Boolean = (status & 3) == Suspended @@ -62,16 +62,6 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag protected final def setStatus(newStatus: Status): Unit = AbstractMailbox.updater.set(this, newStatus) - /** - * Internal method to enforce a volatile write of the status - */ - @tailrec - final def acknowledgeStatus() { - val s = status - if (updateStatus(s, s)) () - else acknowledgeStatus() - } - /** * set new primary status Open. Caller does not need to worry about whether * status was Scheduled or not. @@ -128,12 +118,8 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag * without Scheduled bit set (this is one of the reasons why the numbers * cannot be changed in object Mailbox above) */ - if (s >= Scheduled) { - updateStatus(s, s & ~Scheduled) || setAsIdle() - } else { - acknowledgeStatus() // this write is needed to make memory consistent after processMailbox() - false - } + + updateStatus(s, s & ~Scheduled) || setAsIdle() } /* @@ -142,15 +128,14 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag protected final def systemQueueGet: SystemMessage = AbstractMailbox.systemQueueUpdater.get(this) protected final def systemQueuePut(_old: SystemMessage, _new: SystemMessage): Boolean = AbstractMailbox.systemQueueUpdater.compareAndSet(this, _old, _new) - def shouldBeRegisteredForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = status match { + def shouldBeScheduledForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = status match { case Open | Scheduled ⇒ hasMessageHint || hasSystemMessageHint || hasSystemMessages || hasMessages case Closed ⇒ false case _ ⇒ hasSystemMessageHint || hasSystemMessages } final def run = { - try processMailbox() - finally { + try processMailbox() finally { setAsIdle() dispatcher.registerForExecution(this, false, false) } @@ -164,7 +149,7 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag final def processMailbox() { processAllSystemMessages() //First, process all system messages - if (isActive) { + if (shouldProcessMessage) { var nextMessage = dequeue() if (nextMessage ne null) { //If we have a message if (dispatcher.isThroughputDefined) { //If we're using throughput, we need to do some book-keeping @@ -175,7 +160,7 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag actor invoke nextMessage processAllSystemMessages() //After we're done, process all system messages - nextMessage = if (isActive) { // If we aren't suspended, we need to make sure we're not overstepping our boundaries + nextMessage = if (shouldProcessMessage) { // If we aren't suspended, we need to make sure we're not overstepping our boundaries processedMessages += 1 if ((processedMessages >= dispatcher.throughput) || (dispatcher.isThroughputDeadlineTimeDefined && System.nanoTime >= deadlineNs)) // If we're throttled, break out null //We reached our boundaries, abort diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala index b1a9547ccf..6a9c2c55c4 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala @@ -269,22 +269,3 @@ trait ExecutorServiceDelegate extends ExecutorService { def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit) } - -/** - * An ExecutorService that only creates the underlying Executor if any of the methods of the ExecutorService are called - */ -trait LazyExecutorService extends ExecutorServiceDelegate { - - def createExecutor: ExecutorService - - lazy val executor = { - createExecutor - } -} - -/** - * A concrete implementation of LazyExecutorService (Scala API) - */ -class LazyExecutorServiceWrapper(executorFactory: ⇒ ExecutorService) extends LazyExecutorService { - def createExecutor = executorFactory -}