Merge branch 'wip-1800-balancing-è'

This commit is contained in:
Roland 2012-02-13 20:44:32 +01:00
commit c9b8062fe7
14 changed files with 161 additions and 102 deletions

View file

@ -110,8 +110,9 @@ object ActorModelSpec {
val stops = new AtomicLong(0)
def getStats(actorRef: ActorRef) = {
stats.putIfAbsent(actorRef, new InterceptorStats) match {
case null stats.get(actorRef)
val is = new InterceptorStats
stats.putIfAbsent(actorRef, is) match {
case null is
case other other
}
}
@ -127,12 +128,12 @@ object ActorModelSpec {
}
protected[akka] abstract override def register(actor: ActorCell) {
getStats(actor.self).registers.incrementAndGet()
assert(getStats(actor.self).registers.incrementAndGet() == 1)
super.register(actor)
}
protected[akka] abstract override def unregister(actor: ActorCell) {
getStats(actor.self).unregisters.incrementAndGet()
assert(getStats(actor.self).unregisters.incrementAndGet() == 1)
super.unregister(actor)
}
@ -368,13 +369,18 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa
val buddies = dispatcher.buddies
val mq = dispatcher.messageQueue
System.err.println("Buddies left: ")
buddies.toArray foreach {
System.err.println("Buddies left: " + buddies.size + " stopLatch: " + stopLatch.getCount + " inhab:" + dispatcher.inhabitants)
buddies.toArray sorted new Ordering[AnyRef] {
def compare(l: AnyRef, r: AnyRef) = (l, r) match {
case (ll: ActorCell, rr: ActorCell) ll.self.path.toString.compareTo(rr.self.path.toString)
}
} foreach {
case cell: ActorCell
System.err.println(" - " + cell.self.path + " " + cell.isTerminated + " " + cell.mailbox.status + " " + cell.mailbox.numberOfMessages + " " + SystemMessage.size(cell.mailbox.systemDrain()))
}
System.err.println("Mailbox: " + mq.numberOfMessages + " " + mq.hasMessages + " ")
System.err.println("Mailbox: " + mq.numberOfMessages + " " + mq.hasMessages)
Iterator.continually(mq.dequeue) takeWhile (_ ne null) foreach System.err.println
case _
}
@ -540,7 +546,8 @@ object BalancingDispatcherModelSpec {
Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS),
mailboxType,
configureExecutor(),
Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) with MessageDispatcherInterceptor
Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS),
config.getBoolean("attempt-teamwork")) with MessageDispatcherInterceptor
override def dispatcher(): MessageDispatcher = instance
}

View file

@ -53,6 +53,7 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference) {
c.getMilliseconds("shutdown-timeout") must equal(1 * 1000)
c.getInt("throughput") must equal(5)
c.getMilliseconds("throughput-deadline-time") must equal(0)
c.getBoolean("attempt-teamwork") must equal(true)
}
//Fork join executor config

View file

@ -11,6 +11,7 @@ import akka.util.duration._
import akka.actor.ActorRef
import java.util.concurrent.atomic.AtomicInteger
import akka.pattern.ask
import akka.util.Duration
object ResizerSpec {
@ -160,53 +161,48 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with
// as influenced by the backlog of blocking pooled actors
val resizer = DefaultResizer(
lowerBound = 2,
upperBound = 4,
lowerBound = 3,
upperBound = 5,
rampupRate = 0.1,
backoffRate = 0.0,
pressureThreshold = 1,
messagesPerResize = 1,
backoffThreshold = 0.0)
val router = system.actorOf(Props(new Actor {
def receive = {
case (n: Int, latch: TestLatch, count: AtomicInteger)
(n millis).dilated.sleep
count.incrementAndGet
latch.countDown()
case d: Duration d.dilated.sleep; sender ! "done"
case "echo" sender ! "reply"
}
}).withRouter(RoundRobinRouter(resizer = Some(resizer))))
// first message should create the minimum number of routees
router ! 1
router ! "echo"
expectMsg("reply")
Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(2)
def routees(r: ActorRef): Int = {
r ! CurrentRoutees
expectMsgType[RouterRoutees].routees.size
}
def loop(loops: Int, t: Int, latch: TestLatch, count: AtomicInteger) = {
(100 millis).dilated.sleep
for (m 0 until loops) {
router.!((t, latch, count))
(100 millis).dilated.sleep
}
routees(router) must be(3)
def loop(loops: Int, d: Duration) = {
for (m 0 until loops) router ! d
for (m 0 until loops) expectMsg(d * 3, "done")
}
// 2 more should go thru without triggering more
val count1 = new AtomicInteger
val latch1 = TestLatch(2)
loop(2, 200, latch1, count1)
Await.ready(latch1, TestLatch.DefaultTimeout)
count1.get must be(2)
loop(2, 200 millis)
Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(2)
routees(router) must be(3)
// a whole bunch should max it out
val count2 = new AtomicInteger
val latch2 = TestLatch(10)
loop(10, 500, latch2, count2)
Await.ready(latch2, TestLatch.DefaultTimeout)
count2.get must be(10)
Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(4)
loop(10, 500 millis)
awaitCond(routees(router) > 3)
loop(10, 500 millis)
awaitCond(routees(router) == 5)
}
"backoff" in {
@ -239,7 +235,7 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with
(300 millis).dilated.sleep
// let it cool down
for (m 0 to 3) {
for (m 0 to 5) {
router ! 1
(500 millis).dilated.sleep
}

View file

@ -245,6 +245,11 @@ akka {
# mailbox is used. The Class of the FQCN must have a constructor with a
# com.typesafe.config.Config parameter.
mailbox-type = ""
# For BalancingDispatcher: If the balancing dispatcher should attempt to
# schedule idle actors using the same dispatcher when a message comes in,
# and the dispatchers ExecutorService is not fully busy already.
attempt-teamwork = on
}
debug {

View file

@ -285,14 +285,18 @@ private[akka] class ActorCell(
final def isTerminated: Boolean = mailbox.isClosed
final def start(): Unit = {
/*
* Create the mailbox and enqueue the Create() message to ensure that
* this is processed before anything else.
*/
mailbox = dispatcher.createMailbox(this)
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
mailbox.systemEnqueue(self, Create())
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
parent.sendSystemMessage(akka.dispatch.Supervise(self))
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
dispatcher.systemDispatch(this, Create())
// This call is expected to start off the actor by scheduling its mailbox.
dispatcher.attach(this)
}

View file

@ -156,7 +156,10 @@ trait ExecutionContext {
* log the problem or whatever is appropriate for the implementation.
*/
def reportFailure(t: Throwable): Unit
}
private[akka] trait LoadMetrics { self: Executor
def atFullThrottle(): Boolean
}
object MessageDispatcher {
@ -185,9 +188,14 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
def id: String
/**
* Attaches the specified actor instance to this dispatcher
* Attaches the specified actor instance to this dispatcher, which includes
* scheduling it to run for the first time (Create() is expected to have
* been enqueued by the ActorCell upon mailbox creation).
*/
final def attach(actor: ActorCell): Unit = register(actor)
final def attach(actor: ActorCell): Unit = {
register(actor)
registerForExecution(actor.mailbox, false, true)
}
/**
* Detaches the specified actor instance from this dispatcher
@ -243,7 +251,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
() if (inhabitantsUpdater.decrementAndGet(this) == 0) ifSensibleToDoSoThenScheduleShutdown()
/**
* If you override it, you must call it. But only ever once. See "attach" for only invocation
* If you override it, you must call it. But only ever once. See "attach" for only invocation.
*/
protected[akka] def register(actor: ActorCell) {
inhabitantsUpdater.incrementAndGet(this)
@ -260,6 +268,8 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
mailBox.cleanUp()
}
def inhabitants: Long = inhabitantsUpdater.get(this)
private val shutdownAction = new Runnable {
@tailrec
final def run() {
@ -440,11 +450,13 @@ object ForkJoinExecutorConfigurator {
final class AkkaForkJoinPool(parallelism: Int,
threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
unhandledExceptionHandler: Thread.UncaughtExceptionHandler)
extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, true) {
extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, true) with LoadMetrics {
override def execute(r: Runnable): Unit = r match {
case m: Mailbox super.execute(new MailboxExecutionTask(m))
case other super.execute(other)
}
def atFullThrottle(): Boolean = this.getActiveThreadCount() >= this.getParallelism()
}
/**

View file

@ -4,12 +4,11 @@
package akka.dispatch
import util.DynamicVariable
import akka.actor.{ ActorCell, ActorRef }
import java.util.concurrent.{ LinkedBlockingQueue, ConcurrentLinkedQueue, ConcurrentSkipListSet }
import annotation.tailrec
import java.util.concurrent.atomic.AtomicBoolean
import akka.util.Duration
import akka.util.{ Duration, Helpers }
import java.util.{ Comparator, Iterator }
import java.util.concurrent.{ Executor, LinkedBlockingQueue, ConcurrentLinkedQueue, ConcurrentSkipListSet }
/**
* An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed
@ -32,20 +31,27 @@ class BalancingDispatcher(
throughputDeadlineTime: Duration,
mailboxType: MailboxType,
_executorServiceFactoryProvider: ExecutorServiceFactoryProvider,
_shutdownTimeout: Duration)
_shutdownTimeout: Duration,
attemptTeamWork: Boolean)
extends Dispatcher(_prerequisites, _id, throughput, throughputDeadlineTime, mailboxType, _executorServiceFactoryProvider, _shutdownTimeout) {
val buddies = new ConcurrentSkipListSet[ActorCell](akka.util.Helpers.IdentityHashComparator)
val rebalance = new AtomicBoolean(false)
val buddies = new ConcurrentSkipListSet[ActorCell](
Helpers.identityHashComparator(new Comparator[ActorCell] {
def compare(l: ActorCell, r: ActorCell) = l.self.path compareTo r.self.path
}))
val messageQueue: MessageQueue = mailboxType match {
case u: UnboundedMailbox new QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
final val queue = new ConcurrentLinkedQueue[Envelope]
}
case BoundedMailbox(cap, timeout) new QueueBasedMessageQueue with BoundedMessageQueueSemantics {
final val queue = new LinkedBlockingQueue[Envelope](cap)
final val pushTimeOut = timeout
}
case UnboundedMailbox()
new QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
final val queue = new ConcurrentLinkedQueue[Envelope]
}
case BoundedMailbox(cap, timeout)
new QueueBasedMessageQueue with BoundedMessageQueueSemantics {
final val queue = new LinkedBlockingQueue[Envelope](cap)
final val pushTimeOut = timeout
}
case other throw new IllegalArgumentException("Only handles BoundedMailbox and UnboundedMailbox, but you specified [" + other + "]")
}
@ -84,30 +90,20 @@ class BalancingDispatcher(
protected[akka] override def unregister(actor: ActorCell) = {
buddies.remove(actor)
super.unregister(actor)
intoTheFray(except = actor) //When someone leaves, he tosses a friend into the fray
if (messageQueue.hasMessages) scheduleOne()
}
def intoTheFray(except: ActorCell): Unit =
if (rebalance.compareAndSet(false, true)) {
try {
val i = buddies.iterator()
@tailrec
def throwIn(): Unit = {
val n = if (i.hasNext) i.next() else null
if (n eq null) ()
else if ((n ne except) && registerForExecution(n.mailbox, false, false)) ()
else throwIn()
}
throwIn()
} finally {
rebalance.set(false)
}
}
override protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = {
messageQueue.enqueue(receiver.self, invocation)
registerForExecution(receiver.mailbox, false, false)
intoTheFray(except = receiver)
if (!registerForExecution(receiver.mailbox, false, false) && doTeamWork) scheduleOne()
}
}
protected def doTeamWork(): Boolean =
attemptTeamWork && (executorService.get().executor match {
case lm: LoadMetrics lm.atFullThrottle == false
case other true
})
@tailrec private def scheduleOne(i: Iterator[ActorCell] = buddies.iterator): Unit =
if (i.hasNext && !registerForExecution(i.next.mailbox, false, false)) scheduleOne(i)
}

View file

@ -32,12 +32,11 @@ class Dispatcher(
val shutdownTimeout: Duration)
extends MessageDispatcher(_prerequisites) {
protected[akka] val executorServiceFactory: ExecutorServiceFactory =
protected val executorServiceFactory: ExecutorServiceFactory =
executorServiceFactoryProvider.createExecutorServiceFactory(id, prerequisites.threadFactory)
protected[akka] val executorService = new AtomicReference[ExecutorService](new ExecutorServiceDelegate {
lazy val executor = executorServiceFactory.createExecutorService
})
protected val executorService = new AtomicReference[ExecutorServiceDelegate](
new ExecutorServiceDelegate { lazy val executor = executorServiceFactory.createExecutorService })
protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = {
val mbox = receiver.mailbox

View file

@ -189,7 +189,8 @@ class BalancingDispatcherConfigurator(config: Config, prerequisites: DispatcherP
config.getInt("throughput"),
Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS),
mailboxType, configureExecutor(),
Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS))
Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS),
config.getBoolean("attempt-teamwork"))
/**
* Returns the same dispatcher instance for each invocation

View file

@ -190,7 +190,10 @@ private[akka] abstract class Mailbox(val actor: ActorCell) extends MessageQueue
var nextMessage = systemDrain()
try {
while ((nextMessage ne null) && !isClosed) {
if (debug) println(actor.self + " processing system message " + nextMessage + " with children " + actor.childrenRefs)
if (debug) println(actor.self + " processing system message " + nextMessage + " with " +
(if (actor.childrenRefs.isEmpty) "no children"
else if (actor.childrenRefs.size > 20) actor.childrenRefs.size + " children"
else actor.childrenRefs.mkString("children:\n ", "\n ", "")))
actor systemInvoke nextMessage
nextMessage = nextMessage.next
// dont ever execute normal message when system message present!
@ -236,15 +239,26 @@ private[akka] abstract class Mailbox(val actor: ActorCell) extends MessageQueue
}
trait MessageQueue {
/*
* These method need to be implemented in subclasses; they should not rely on the internal stuff above.
/**
* Try to enqueue the message to this queue, or throw an exception.
*/
def enqueue(receiver: ActorRef, handle: Envelope)
def enqueue(receiver: ActorRef, handle: Envelope): Unit // NOTE: receiver is used only in two places, but cannot be removed
/**
* Try to dequeue the next message from this queue, return null failing that.
*/
def dequeue(): Envelope
/**
* Should return the current number of messages held in this queue; may
* always return 0 if no other value is available efficiently. Do not use
* this for testing for presence of messages, use `hasMessages` instead.
*/
def numberOfMessages: Int
/**
* Indicates whether this queue is non-empty.
*/
def hasMessages: Boolean
}
@ -292,15 +306,15 @@ trait DefaultSystemMessageQueue { self: Mailbox ⇒
}
trait UnboundedMessageQueueSemantics extends QueueBasedMessageQueue {
final def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue add handle
final def dequeue(): Envelope = queue.poll()
def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue add handle
def dequeue(): Envelope = queue.poll()
}
trait BoundedMessageQueueSemantics extends QueueBasedMessageQueue {
def pushTimeOut: Duration
override def queue: BlockingQueue[Envelope]
final def enqueue(receiver: ActorRef, handle: Envelope) {
def enqueue(receiver: ActorRef, handle: Envelope) {
if (pushTimeOut.length > 0) {
queue.offer(handle, pushTimeOut.length, pushTimeOut.unit) || {
throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + receiver)
@ -308,13 +322,13 @@ trait BoundedMessageQueueSemantics extends QueueBasedMessageQueue {
} else queue put handle
}
final def dequeue(): Envelope = queue.poll()
def dequeue(): Envelope = queue.poll()
}
trait QueueBasedMessageQueue extends MessageQueue {
def queue: Queue[Envelope]
final def numberOfMessages = queue.size
final def hasMessages = !queue.isEmpty
def numberOfMessages = queue.size
def hasMessages = !queue.isEmpty
}
/**

View file

@ -81,14 +81,16 @@ case class ThreadPoolConfig(allowCorePoolTimeout: Boolean = ThreadPoolConfig.def
extends ExecutorServiceFactoryProvider {
class ThreadPoolExecutorServiceFactory(val threadFactory: ThreadFactory) extends ExecutorServiceFactory {
def createExecutorService: ExecutorService = {
val service = new ThreadPoolExecutor(
val service: ThreadPoolExecutor = new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
threadTimeout.length,
threadTimeout.unit,
queueFactory(),
threadFactory,
rejectionPolicy)
rejectionPolicy) with LoadMetrics {
def atFullThrottle(): Boolean = this.getActiveCount >= this.getPoolSize
}
service.allowCoreThreadTimeOut(allowCorePoolTimeout)
service
}
@ -182,7 +184,7 @@ case class MonitorableThreadFactory(name: String,
protected def wire[T <: Thread](t: T): T = {
t.setUncaughtExceptionHandler(exceptionHandler)
t.setDaemon(daemonic)
contextClassLoader foreach (t.setContextClassLoader(_))
contextClassLoader foreach t.setContextClassLoader
t
}
}

View file

@ -1028,7 +1028,8 @@ case class DefaultResizer(
*/
def capacity(routees: IndexedSeq[ActorRef]): Int = {
val currentSize = routees.size
val delta = filter(pressure(routees), currentSize)
val press = pressure(routees)
val delta = filter(press, currentSize)
val proposed = currentSize + delta
if (proposed < lowerBound) delta + (lowerBound - proposed)
@ -1058,7 +1059,7 @@ case class DefaultResizer(
case a: LocalActorRef
val cell = a.underlying
pressureThreshold match {
case 1 cell.mailbox.isScheduled && cell.currentMessage != null
case 1 cell.mailbox.isScheduled && cell.mailbox.hasMessages
case i if i < 1 cell.mailbox.isScheduled && cell.currentMessage != null
case threshold cell.mailbox.numberOfMessages >= threshold
}

View file

@ -21,8 +21,18 @@ object Helpers {
if (diff > 0) 1 else if (diff < 0) -1 else 0
}
val IdentityHashComparator = new Comparator[AnyRef] {
def compare(a: AnyRef, b: AnyRef): Int = compareIdentityHash(a, b)
/**
* Create a comparator which will efficiently use `System.identityHashCode`,
* unless that happens to be the same for two non-equals objects, in which
* case the supplied real comparator is used; the comparator must be
* consistent with equals, otherwise it would not be an enhancement over
* the identityHashCode.
*/
def identityHashComparator[T <: AnyRef](comp: Comparator[T]): Comparator[T] = new Comparator[T] {
def compare(a: T, b: T): Int = compareIdentityHash(a, b) match {
case 0 if a != b comp.compare(a, b)
case x x
}
}
final val base64chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+~"

View file

@ -8,7 +8,7 @@ import java.util.concurrent.locks.ReentrantLock
import java.util.LinkedList
import scala.annotation.tailrec
import com.typesafe.config.Config
import akka.actor.{ ExtensionIdProvider, ExtensionId, Extension, ExtendedActorSystem, ActorRef, ActorCell }
import akka.actor.{ ActorInitializationException, ExtensionIdProvider, ExtensionId, Extension, ExtendedActorSystem, ActorRef, ActorCell }
import akka.dispatch.{ TaskInvocation, SystemMessage, Suspend, Resume, MessageDispatcherConfigurator, MessageDispatcher, Mailbox, Envelope, DispatcherPrerequisites, DefaultSystemMessageQueue }
import akka.util.duration.intToDurationInt
import akka.util.{ Switch, Duration }
@ -132,6 +132,17 @@ class CallingThreadDispatcher(
protected[akka] override def shutdownTimeout = 1 second
protected[akka] override def register(actor: ActorCell): Unit = {
super.register(actor)
actor.mailbox match {
case mbox: CallingThreadMailbox
val queue = mbox.queue
queue.enter
runQueue(mbox, queue)
case x throw new ActorInitializationException("expected CallingThreadMailbox, got " + x.getClass)
}
}
override def suspend(actor: ActorCell) {
actor.mailbox match {
case m: CallingThreadMailbox m.suspendSwitch.switchOn