Moving in isEmpty and count into AbstractNodeQueue

This commit is contained in:
Viktor Klang 2013-04-03 20:05:20 +02:00
parent f7e0cdad9f
commit 68dfada8bc
3 changed files with 20 additions and 13 deletions

View file

@ -31,6 +31,17 @@ public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQ
getAndSet(n).setNext(n); getAndSet(n).setNext(n);
} }
public final boolean isEmpty() {
return peek() == null;
}
public final int count() {
int count = 0;
for(Node<T> n = peek();n != null; n = n.next())
++count;
return count;
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public final T poll() { public final T poll() {
final Node<T> next = peek(); final Node<T> next = peek();

View file

@ -353,12 +353,9 @@ class NodeMessageQueue extends AbstractNodeQueue[Envelope] with MessageQueue {
final def dequeue(): Envelope = poll() final def dequeue(): Envelope = poll()
final def numberOfMessages: Int = { final def numberOfMessages: Int = count()
@tailrec def count(node: AbstractNodeQueue.Node[Envelope], c: Int): Int = if (node eq null) c else count(node.next(), c + 1)
count(peek(), 0)
}
final def hasMessages: Boolean = peek() ne null final def hasMessages: Boolean = !isEmpty()
@tailrec final def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = { @tailrec final def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = {
val envelope = dequeue() val envelope = dequeue()
@ -535,11 +532,11 @@ case class UnboundedMailbox() extends MailboxType {
} }
/** /**
* MPSCUnboundedMailbox is a high-performance unbounded MailboxType, * SingleConsumerOnlyUnboundedMailbox is a high-performance, multiple producersingle consumer, unbounded MailboxType,
* the only drawback is that you can't have multiple consumers, * the only drawback is that you can't have multiple consumers,
* which rules out using it with BalancingDispatcher for instance. * which rules out using it with BalancingDispatcher for instance.
*/ */
case class MPSCUnboundedMailbox() extends MailboxType { case class SingleConsumerOnlyUnboundedMailbox() extends MailboxType {
def this(settings: ActorSystem.Settings, config: Config) = this() def this(settings: ActorSystem.Settings, config: Config) = this()

View file

@ -76,12 +76,11 @@ private[akka] final class SerializedSuspendableExecutionContext(throughput: Int)
override final def execute(task: Runnable): Unit = try add(task) finally attach() override final def execute(task: Runnable): Unit = try add(task) finally attach()
override final def reportFailure(t: Throwable): Unit = context reportFailure t override final def reportFailure(t: Throwable): Unit = context reportFailure t
final def size(): Int = { /**
@tailrec def count(node: AbstractNodeQueue.Node[Runnable], c: Int): Int = if (node eq null) c else count(node.next(), c + 1) * O(N)
count(peek(), 0) * @return the number of Runnable's currently enqueued
} */
final def size(): Int = count()
final def isEmpty(): Boolean = peek() eq null
override final def toString: String = (state.get: @switch) match { override final def toString: String = (state.get: @switch) match {
case 0 "Off" case 0 "Off"