Temporary fix for the throughput benchmark

This commit is contained in:
Viktor Klang 2011-11-14 19:19:44 +01:00
parent d14e524485
commit 66dd0123bc
4 changed files with 54 additions and 64 deletions

View file

@ -1,28 +1,33 @@
package akka.performance.microbench package akka.performance.microbench
import akka.performance.workbench.PerformanceSpec import akka.performance.workbench.PerformanceSpec
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import org.apache.commons.math.stat.descriptive.DescriptiveStatistics import org.apache.commons.math.stat.descriptive.DescriptiveStatistics
import akka.actor.Actor import akka.actor._
import akka.actor.ActorRef import java.util.concurrent.{ ThreadPoolExecutor, CountDownLatch, TimeUnit }
import akka.actor.PoisonPill import akka.dispatch._
import akka.actor.Props import java.util.concurrent.ThreadPoolExecutor.AbortPolicy
// -server -Xms512M -Xmx1024M -XX:+UseParallelGC -Dbenchmark=true -Dbenchmark.repeatFactor=500 // -server -Xms512M -Xmx1024M -XX:+UseParallelGC -Dbenchmark=true -Dbenchmark.repeatFactor=500
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class TellThroughputPerformanceSpec extends PerformanceSpec { class TellThroughputPerformanceSpec extends PerformanceSpec {
import TellThroughputPerformanceSpec._ import TellThroughputPerformanceSpec._
val clientDispatcher = app.dispatcherFactory.newDispatcher("client-dispatcher") def createDispatcher(name: String) = ThreadPoolConfigDispatcherBuilder(config new Dispatcher(app, name, 5,
0, UnboundedMailbox(), config, 60000), ThreadPoolConfig(app))
.withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity .withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
.setFlowHandler(Left(new AbortPolicy))
.setCorePoolSize(maxClients) .setCorePoolSize(maxClients)
.build .build
val destinationDispatcher = app.dispatcherFactory.newDispatcher("destination-dispatcher") val clientDispatcher = createDispatcher("client-dispatcher")
.withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity val destinationDispatcher = createDispatcher("destination-dispatcher")
.setCorePoolSize(maxClients)
.build override def atTermination {
super.atTermination()
System.out.println("Cleaning up after TellThroughputPerformanceSpec")
clientDispatcher.shutdown()
destinationDispatcher.shutdown()
}
val repeat = 30000L * repeatFactor val repeat = 30000L * repeatFactor
@ -76,6 +81,27 @@ class TellThroughputPerformanceSpec extends PerformanceSpec {
val ok = latch.await(((5000000 + 500 * repeat) * timeDilation) / 100, TimeUnit.MICROSECONDS) val ok = latch.await(((5000000 + 500 * repeat) * timeDilation) / 100, TimeUnit.MICROSECONDS)
val durationNs = (System.nanoTime - start) val durationNs = (System.nanoTime - start)
if (!ok) {
System.err.println("Destinations: ")
destinations.foreach {
case l: LocalActorRef
val m = l.underlying.mailbox
System.err.println(" -" + l + " mbox(" + m.status + ")" + " containing [" + Stream.continually(m.dequeue()).takeWhile(_ != null).mkString(", ") + "] and has systemMsgs: " + m.hasSystemMessages)
}
System.err.println("")
System.err.println("Clients: ")
clients.foreach {
case l: LocalActorRef
val m = l.underlying.mailbox
System.err.println(" -" + l + " mbox(" + m.status + ")" + " containing [" + Stream.continually(m.dequeue()).takeWhile(_ != null).mkString(", ") + "] and has systemMsgs: " + m.hasSystemMessages)
}
val e = clientDispatcher.asInstanceOf[Dispatcher].executorService.get().asInstanceOf[ExecutorServiceDelegate].executor.asInstanceOf[ThreadPoolExecutor]
val q = e.getQueue
System.err.println("Client Dispatcher: " + e.getActiveCount + " " + Stream.continually(q.poll()).takeWhile(_ != null).mkString(", "))
}
if (!warmup) { if (!warmup) {
ok must be(true) ok must be(true)
logMeasurement(numberOfClients, durationNs, repeat) logMeasurement(numberOfClients, durationNs, repeat)

View file

@ -74,7 +74,9 @@ class Dispatcher(
extends MessageDispatcher(_app) { extends MessageDispatcher(_app) {
protected[akka] val executorServiceFactory = executorServiceFactoryProvider.createExecutorServiceFactory(name) protected[akka] val executorServiceFactory = executorServiceFactoryProvider.createExecutorServiceFactory(name)
protected[akka] val executorService = new AtomicReference[ExecutorService](new LazyExecutorServiceWrapper(executorServiceFactory.createExecutorService)) protected[akka] val executorService = new AtomicReference[ExecutorService](new ExecutorServiceDelegate {
lazy val executor = executorServiceFactory.createExecutorService
})
protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = { protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = {
val mbox = receiver.mailbox val mbox = receiver.mailbox
@ -103,9 +105,11 @@ class Dispatcher(
protected[akka] def start {} protected[akka] def start {}
protected[akka] def shutdown { protected[akka] def shutdown {
val old = executorService.getAndSet(new LazyExecutorServiceWrapper(executorServiceFactory.createExecutorService)) executorService.getAndSet(new ExecutorServiceDelegate {
if (old ne null) { lazy val executor = executorServiceFactory.createExecutorService
old.shutdown() }) match {
case null
case some some.shutdown()
} }
} }
@ -113,19 +117,13 @@ class Dispatcher(
* Returns if it was registered * Returns if it was registered
*/ */
protected[akka] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = { protected[akka] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = {
if (mbox.shouldBeRegisteredForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races if (mbox.shouldBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races
if (mbox.setAsScheduled()) { if (mbox.setAsScheduled()) {
try { try {
executorService.get() execute mbox executorService.get() execute mbox
true true
} catch { } catch {
case e: RejectedExecutionException case e: RejectedExecutionException executorService.get() execute mbox; true //Retry once
try {
app.eventStream.publish(Warning(this, e.toString))
} finally {
mbox.setAsIdle()
}
throw e
} }
} else false } else false
} else false } else false

View file

@ -43,7 +43,7 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag
final def status: Mailbox.Status = AbstractMailbox.updater.get(this) final def status: Mailbox.Status = AbstractMailbox.updater.get(this)
@inline @inline
final def isActive: Boolean = (status & 3) == Open final def shouldProcessMessage: Boolean = (status & 3) == Open
@inline @inline
final def isSuspended: Boolean = (status & 3) == Suspended final def isSuspended: Boolean = (status & 3) == Suspended
@ -62,16 +62,6 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag
protected final def setStatus(newStatus: Status): Unit = protected final def setStatus(newStatus: Status): Unit =
AbstractMailbox.updater.set(this, newStatus) AbstractMailbox.updater.set(this, newStatus)
/**
* Internal method to enforce a volatile write of the status
*/
@tailrec
final def acknowledgeStatus() {
val s = status
if (updateStatus(s, s)) ()
else acknowledgeStatus()
}
/** /**
* set new primary status Open. Caller does not need to worry about whether * set new primary status Open. Caller does not need to worry about whether
* status was Scheduled or not. * status was Scheduled or not.
@ -128,12 +118,8 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag
* without Scheduled bit set (this is one of the reasons why the numbers * without Scheduled bit set (this is one of the reasons why the numbers
* cannot be changed in object Mailbox above) * cannot be changed in object Mailbox above)
*/ */
if (s >= Scheduled) {
updateStatus(s, s & ~Scheduled) || setAsIdle() updateStatus(s, s & ~Scheduled) || setAsIdle()
} else {
acknowledgeStatus() // this write is needed to make memory consistent after processMailbox()
false
}
} }
/* /*
@ -142,15 +128,14 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag
protected final def systemQueueGet: SystemMessage = AbstractMailbox.systemQueueUpdater.get(this) protected final def systemQueueGet: SystemMessage = AbstractMailbox.systemQueueUpdater.get(this)
protected final def systemQueuePut(_old: SystemMessage, _new: SystemMessage): Boolean = AbstractMailbox.systemQueueUpdater.compareAndSet(this, _old, _new) protected final def systemQueuePut(_old: SystemMessage, _new: SystemMessage): Boolean = AbstractMailbox.systemQueueUpdater.compareAndSet(this, _old, _new)
def shouldBeRegisteredForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = status match { def shouldBeScheduledForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = status match {
case Open | Scheduled hasMessageHint || hasSystemMessageHint || hasSystemMessages || hasMessages case Open | Scheduled hasMessageHint || hasSystemMessageHint || hasSystemMessages || hasMessages
case Closed false case Closed false
case _ hasSystemMessageHint || hasSystemMessages case _ hasSystemMessageHint || hasSystemMessages
} }
final def run = { final def run = {
try processMailbox() try processMailbox() finally {
finally {
setAsIdle() setAsIdle()
dispatcher.registerForExecution(this, false, false) dispatcher.registerForExecution(this, false, false)
} }
@ -164,7 +149,7 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag
final def processMailbox() { final def processMailbox() {
processAllSystemMessages() //First, process all system messages processAllSystemMessages() //First, process all system messages
if (isActive) { if (shouldProcessMessage) {
var nextMessage = dequeue() var nextMessage = dequeue()
if (nextMessage ne null) { //If we have a message if (nextMessage ne null) { //If we have a message
if (dispatcher.isThroughputDefined) { //If we're using throughput, we need to do some book-keeping if (dispatcher.isThroughputDefined) { //If we're using throughput, we need to do some book-keeping
@ -175,7 +160,7 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag
actor invoke nextMessage actor invoke nextMessage
processAllSystemMessages() //After we're done, process all system messages processAllSystemMessages() //After we're done, process all system messages
nextMessage = if (isActive) { // If we aren't suspended, we need to make sure we're not overstepping our boundaries nextMessage = if (shouldProcessMessage) { // If we aren't suspended, we need to make sure we're not overstepping our boundaries
processedMessages += 1 processedMessages += 1
if ((processedMessages >= dispatcher.throughput) || (dispatcher.isThroughputDeadlineTimeDefined && System.nanoTime >= deadlineNs)) // If we're throttled, break out if ((processedMessages >= dispatcher.throughput) || (dispatcher.isThroughputDeadlineTimeDefined && System.nanoTime >= deadlineNs)) // If we're throttled, break out
null //We reached our boundaries, abort null //We reached our boundaries, abort

View file

@ -269,22 +269,3 @@ trait ExecutorServiceDelegate extends ExecutorService {
def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit) def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit)
} }
/**
* An ExecutorService that only creates the underlying Executor if any of the methods of the ExecutorService are called
*/
trait LazyExecutorService extends ExecutorServiceDelegate {
def createExecutor: ExecutorService
lazy val executor = {
createExecutor
}
}
/**
* A concrete implementation of LazyExecutorService (Scala API)
*/
class LazyExecutorServiceWrapper(executorFactory: ExecutorService) extends LazyExecutorService {
def createExecutor = executorFactory
}