clean up BalancingDispatcher:

- change from messageQueue.numberOfMessages to maintaining an AtomicLong
  for performance reasons
- add comments/scaladoc where missing
- remove some assert()s
- fix ResiserSpec to employ buddy-wakeup-threshold
This commit is contained in:
Roland 2012-02-13 12:38:59 +01:00
parent 5a9ec45d01
commit 251a7cc7e3
6 changed files with 74 additions and 22 deletions

View file

@ -352,7 +352,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa
def flood(num: Int) {
val cachedMessage = CountDownNStop(new CountDownLatch(num))
val stopLatch = new CountDownLatch(num)
val waitTime = (20 seconds).dilated.toMillis
val waitTime = (30 seconds).dilated.toMillis
val boss = system.actorOf(Props(new Actor {
def receive = {
case "run" for (_ 1 to num) (context.watch(context.actorOf(props))) ! cachedMessage
@ -369,7 +369,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa
val buddies = dispatcher.buddies
val mq = dispatcher.messageQueue
System.err.println("Buddies left: " + buddies.size + " stopLatch: " + stopLatch.getCount + " inhab:" + dispatcher.inhab)
System.err.println("Buddies left: " + buddies.size + " stopLatch: " + stopLatch.getCount + " inhab:" + dispatcher.inhabitants)
buddies.toArray sorted new Ordering[AnyRef] {
def compare(l: AnyRef, r: AnyRef) = (l, r) match {
case (ll: ActorCell, rr: ActorCell) ll.self.path.toString.compareTo(rr.self.path.toString)

View file

@ -26,6 +26,7 @@ object ResizerSpec {
}
bal-disp {
type = BalancingDispatcher
buddy-wakeup-threshold = 1
}
"""

View file

@ -290,6 +290,11 @@ private[akka] class ActorCell(
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
parent.sendSystemMessage(akka.dispatch.Supervise(self))
/*
* attach before submitting the mailbox for the first time, because
* otherwise the actor could already be dead before the dispatcher is
* informed of its existence (with reversed attach/detach sequence).
*/
dispatcher.attach(this)
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅

View file

@ -260,7 +260,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
mailBox.cleanUp()
}
def inhab = inhabitantsUpdater.get(this)
def inhabitants: Long = inhabitantsUpdater.get(this)
private val shutdownAction = new Runnable {
@tailrec

View file

@ -11,6 +11,8 @@ import annotation.tailrec
import java.util.concurrent.atomic.AtomicBoolean
import akka.util.{ Duration, Helpers }
import java.util.{ Comparator, Iterator }
import akka.util.Unsafe
import java.util.concurrent.atomic.AtomicLong
/**
* An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed
@ -43,18 +45,46 @@ class BalancingDispatcher(
}))
val messageQueue: MessageQueue = mailboxType match {
case _: UnboundedMailbox new QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
final val queue = new ConcurrentLinkedQueue[Envelope]
}
case BoundedMailbox(cap, timeout) new QueueBasedMessageQueue with BoundedMessageQueueSemantics {
final val queue = new LinkedBlockingQueue[Envelope](cap)
final val pushTimeOut = timeout
}
case UnboundedMailbox()
new QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
final val queue = new ConcurrentLinkedQueue[Envelope]
override def enqueue(receiver: ActorRef, handle: Envelope) = {
super.enqueue(receiver, handle)
_pressure.getAndIncrement()
}
override def dequeue(): Envelope =
super.dequeue() match {
case null null
case x _pressure.getAndDecrement(); x
}
}
case BoundedMailbox(cap, timeout)
new QueueBasedMessageQueue with BoundedMessageQueueSemantics {
final val queue = new LinkedBlockingQueue[Envelope](cap)
final val pushTimeOut = timeout
override def enqueue(receiver: ActorRef, handle: Envelope) = {
super.enqueue(receiver, handle)
_pressure.getAndIncrement()
}
override def dequeue(): Envelope =
super.dequeue() match {
case null null
case x _pressure.getAndDecrement(); x
}
}
case other throw new IllegalArgumentException("Only handles BoundedMailbox and UnboundedMailbox, but you specified [" + other + "]")
}
protected[akka] override def createMailbox(actor: ActorCell): Mailbox = new SharingMailbox(actor)
private val _pressure = new AtomicLong
class SharingMailbox(_actor: ActorCell) extends Mailbox(_actor) with DefaultSystemMessageQueue {
final def enqueue(receiver: ActorRef, handle: Envelope) = messageQueue.enqueue(receiver, handle)
@ -81,6 +111,11 @@ class BalancingDispatcher(
}
protected[akka] override def systemDispatch(receiver: ActorCell, invocation: SystemMessage): Unit =
/*
* need to filter out Create() messages here because BalancingDispatcher
* already enqueues this within register(), which is called first by the
* ActorCell.
*/
invocation match {
case Create()
case x super.systemDispatch(receiver, invocation)
@ -91,13 +126,13 @@ class BalancingDispatcher(
mbox.systemEnqueue(actor.self, Create())
// must make sure that Create() is the first message enqueued in this mailbox
super.register(actor)
assert(buddies.add(actor))
buddies.add(actor)
// must make sure that buddy-add is executed before the actor has had a chance to die
registerForExecution(mbox, false, true)
}
protected[akka] override def unregister(actor: ActorCell) = {
assert(buddies.remove(actor))
buddies.remove(actor)
super.unregister(actor)
if (messageQueue.hasMessages) registerOne()
}
@ -106,7 +141,7 @@ class BalancingDispatcher(
messageQueue.enqueue(receiver.self, invocation)
if (!registerForExecution(receiver.mailbox, false, false) &&
buddyWakeupThreshold >= 0 &&
messageQueue.numberOfMessages >= buddyWakeupThreshold) registerOne()
_pressure.get >= buddyWakeupThreshold) registerOne()
}
@tailrec private def registerOne(i: Iterator[ActorCell] = buddies.iterator): Unit =

View file

@ -239,15 +239,26 @@ private[akka] abstract class Mailbox(val actor: ActorCell) extends MessageQueue
}
trait MessageQueue {
/*
* These method need to be implemented in subclasses; they should not rely on the internal stuff above.
/**
* Try to enqueue the message to this queue, or throw an exception.
*/
def enqueue(receiver: ActorRef, handle: Envelope)
def enqueue(receiver: ActorRef, handle: Envelope): Unit // NOTE: receiver is used only in two places, but cannot be removed
/**
* Try to dequeue the next message from this queue, return null failing that.
*/
def dequeue(): Envelope
/**
* Should return the current number of messages held in this queue; may
* always return 0 if no other value is available efficiently. Do not use
* this for testing for presence of messages, use `hasMessages` instead.
*/
def numberOfMessages: Int
/**
* Indicates whether this queue is non-empty.
*/
def hasMessages: Boolean
}
@ -295,15 +306,15 @@ trait DefaultSystemMessageQueue { self: Mailbox ⇒
}
trait UnboundedMessageQueueSemantics extends QueueBasedMessageQueue {
final def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue add handle
final def dequeue(): Envelope = queue.poll()
def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue add handle
def dequeue(): Envelope = queue.poll()
}
trait BoundedMessageQueueSemantics extends QueueBasedMessageQueue {
def pushTimeOut: Duration
override def queue: BlockingQueue[Envelope]
final def enqueue(receiver: ActorRef, handle: Envelope) {
def enqueue(receiver: ActorRef, handle: Envelope) {
if (pushTimeOut.length > 0) {
queue.offer(handle, pushTimeOut.length, pushTimeOut.unit) || {
throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + receiver)
@ -311,13 +322,13 @@ trait BoundedMessageQueueSemantics extends QueueBasedMessageQueue {
} else queue put handle
}
final def dequeue(): Envelope = queue.poll()
def dequeue(): Envelope = queue.poll()
}
trait QueueBasedMessageQueue extends MessageQueue {
def queue: Queue[Envelope]
final def numberOfMessages = queue.size
final def hasMessages = !queue.isEmpty
def numberOfMessages = queue.size
def hasMessages = !queue.isEmpty
}
/**