diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 15886973b2..deafb9cdc1 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -547,7 +547,7 @@ object BalancingDispatcherModelSpec { mailboxType, configureExecutor(), Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS), - config.getInt("buddy-wakeup-threshold")) with MessageDispatcherInterceptor + config.getBoolean("attempt-teamwork")) with MessageDispatcherInterceptor override def dispatcher(): MessageDispatcher = instance } diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index b7e0563339..9e0ee70ff1 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -247,12 +247,10 @@ akka { # com.typesafe.config.Config parameter. mailbox-type = "" - # For BalancingDispatcher: if during message enqueuing the target actor is - # already busy and at least this number of messages is currently in the queue, - # then wake up another actor from the same dispatcher at random. - # Set to -1 to disable (which will also skip the possibly expensive check; - # obtaining the mailbox size is O(n) for the default mailboxes). - buddy-wakeup-threshold = 5 + # For BalancingDispatcher: If the balancing dispatcher should attempt to + # schedule idle actors using the same dispatcher when a message comes in, + # and the dispatchers ExecutorService is not fully busy already. + attempt-teamwork = on } debug { diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 22eadb55d5..6046e249af 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -156,7 +156,10 @@ trait ExecutionContext { * log the problem or whatever is appropriate for the implementation. */ def reportFailure(t: Throwable): Unit +} +private[akka] trait LoadMetrics { self: Executor ⇒ + def atFullThrottle(): Boolean } object MessageDispatcher { @@ -447,11 +450,13 @@ object ForkJoinExecutorConfigurator { final class AkkaForkJoinPool(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, unhandledExceptionHandler: Thread.UncaughtExceptionHandler) - extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, true) { + extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, true) with LoadMetrics { override def execute(r: Runnable): Unit = r match { case m: Mailbox ⇒ super.execute(new MailboxExecutionTask(m)) case other ⇒ super.execute(other) } + + def atFullThrottle(): Boolean = this.getActiveThreadCount() >= this.getParallelism() } /** diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index 61ac773aa0..d2d978341c 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -4,15 +4,11 @@ package akka.dispatch -import util.DynamicVariable import akka.actor.{ ActorCell, ActorRef } -import java.util.concurrent.{ LinkedBlockingQueue, ConcurrentLinkedQueue, ConcurrentSkipListSet } import annotation.tailrec -import java.util.concurrent.atomic.AtomicBoolean import akka.util.{ Duration, Helpers } import java.util.{ Comparator, Iterator } -import akka.util.Unsafe -import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.{ Executor, LinkedBlockingQueue, ConcurrentLinkedQueue, ConcurrentSkipListSet } /** * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed @@ -36,7 +32,7 @@ class BalancingDispatcher( mailboxType: MailboxType, _executorServiceFactoryProvider: ExecutorServiceFactoryProvider, _shutdownTimeout: Duration, - buddyWakeupThreshold: Int) + attemptTeamWork: Boolean) extends Dispatcher(_prerequisites, _id, throughput, throughputDeadlineTime, mailboxType, _executorServiceFactoryProvider, _shutdownTimeout) { val buddies = new ConcurrentSkipListSet[ActorCell]( @@ -48,34 +44,12 @@ class BalancingDispatcher( case UnboundedMailbox() ⇒ new QueueBasedMessageQueue with UnboundedMessageQueueSemantics { final val queue = new ConcurrentLinkedQueue[Envelope] - - override def enqueue(receiver: ActorRef, handle: Envelope) = { - super.enqueue(receiver, handle) - _pressure.getAndIncrement() - } - - override def dequeue(): Envelope = - super.dequeue() match { - case null ⇒ null - case x ⇒ _pressure.getAndDecrement(); x - } } case BoundedMailbox(cap, timeout) ⇒ new QueueBasedMessageQueue with BoundedMessageQueueSemantics { final val queue = new LinkedBlockingQueue[Envelope](cap) final val pushTimeOut = timeout - - override def enqueue(receiver: ActorRef, handle: Envelope) = { - super.enqueue(receiver, handle) - _pressure.getAndIncrement() - } - - override def dequeue(): Envelope = - super.dequeue() match { - case null ⇒ null - case x ⇒ _pressure.getAndDecrement(); x - } } case other ⇒ throw new IllegalArgumentException("Only handles BoundedMailbox and UnboundedMailbox, but you specified [" + other + "]") @@ -83,8 +57,6 @@ class BalancingDispatcher( protected[akka] override def createMailbox(actor: ActorCell): Mailbox = new SharingMailbox(actor) - private val _pressure = new AtomicLong - class SharingMailbox(_actor: ActorCell) extends Mailbox(_actor) with DefaultSystemMessageQueue { final def enqueue(receiver: ActorRef, handle: Envelope) = messageQueue.enqueue(receiver, handle) @@ -123,11 +95,15 @@ class BalancingDispatcher( override protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = { messageQueue.enqueue(receiver.self, invocation) - if (!registerForExecution(receiver.mailbox, false, false) && - buddyWakeupThreshold >= 0 && - _pressure.get >= buddyWakeupThreshold) scheduleOne() + if (!registerForExecution(receiver.mailbox, false, false) && doTeamWork) scheduleOne() } + protected def doTeamWork(): Boolean = + attemptTeamWork && (executorService.get().executor match { + case lm: LoadMetrics ⇒ lm.atFullThrottle == false + case other ⇒ true + }) + @tailrec private def scheduleOne(i: Iterator[ActorCell] = buddies.iterator): Unit = if (i.hasNext && !registerForExecution(i.next.mailbox, false, false)) scheduleOne(i) -} +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index a735ea367e..2046f02286 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -32,12 +32,11 @@ class Dispatcher( val shutdownTimeout: Duration) extends MessageDispatcher(_prerequisites) { - protected[akka] val executorServiceFactory: ExecutorServiceFactory = + protected val executorServiceFactory: ExecutorServiceFactory = executorServiceFactoryProvider.createExecutorServiceFactory(id, prerequisites.threadFactory) - protected[akka] val executorService = new AtomicReference[ExecutorService](new ExecutorServiceDelegate { - lazy val executor = executorServiceFactory.createExecutorService - }) + protected val executorService = new AtomicReference[ExecutorServiceDelegate]( + new ExecutorServiceDelegate { lazy val executor = executorServiceFactory.createExecutorService }) protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = { val mbox = receiver.mailbox diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index b9fd3f784b..5f4528146d 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -190,7 +190,7 @@ class BalancingDispatcherConfigurator(config: Config, prerequisites: DispatcherP Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), mailboxType, configureExecutor(), Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS), - config.getInt("buddy-wakeup-threshold")) + config.getBoolean("attempt-teamwork")) /** * Returns the same dispatcher instance for each invocation diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala index 1c63831013..b6fd432296 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala @@ -81,14 +81,16 @@ case class ThreadPoolConfig(allowCorePoolTimeout: Boolean = ThreadPoolConfig.def extends ExecutorServiceFactoryProvider { class ThreadPoolExecutorServiceFactory(val threadFactory: ThreadFactory) extends ExecutorServiceFactory { def createExecutorService: ExecutorService = { - val service = new ThreadPoolExecutor( + val service: ThreadPoolExecutor = new ThreadPoolExecutor( corePoolSize, maxPoolSize, threadTimeout.length, threadTimeout.unit, queueFactory(), threadFactory, - rejectionPolicy) + rejectionPolicy) with LoadMetrics { + def atFullThrottle(): Boolean = this.getActiveCount >= this.getPoolSize + } service.allowCoreThreadTimeOut(allowCorePoolTimeout) service } @@ -182,7 +184,7 @@ case class MonitorableThreadFactory(name: String, protected def wire[T <: Thread](t: T): T = { t.setUncaughtExceptionHandler(exceptionHandler) t.setDaemon(daemonic) - contextClassLoader foreach (t.setContextClassLoader(_)) + contextClassLoader foreach t.setContextClassLoader t } }