diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index a79a3c9ab8..65ca5cfb6f 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -19,6 +19,10 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn def factory: MailboxType ⇒ MessageQueue + def supportsBeingBounded = true + + def maxConsumers = 4 + name should { "create an unbounded mailbox" in { val config = UnboundedMailbox() @@ -122,7 +126,7 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn r } - val consumers = for (i ← (1 to 4).toList) yield createConsumer + val consumers = List.fill(maxConsumers)(createConsumer) val ps = producers.map(Await.result(_, within)) val cs = consumers.map(Await.result(_, within)) @@ -187,3 +191,12 @@ class CustomMailboxSpec extends AkkaSpec(CustomMailboxSpec.config) { } } } + +class SingleConsumerOnlyMailboxSpec extends MailboxSpec { + lazy val name = "The single-consumer-only mailbox implementation" + override def maxConsumers = 1 + def factory = { + case u: UnboundedMailbox ⇒ SingleConsumerOnlyUnboundedMailbox().create(None, None) + case b: BoundedMailbox ⇒ pending; null + } +} diff --git a/akka-actor/src/main/java/akka/dispatch/AbstractNodeQueue.java b/akka-actor/src/main/java/akka/dispatch/AbstractNodeQueue.java new file mode 100644 index 0000000000..e72a6b2e18 --- /dev/null +++ b/akka-actor/src/main/java/akka/dispatch/AbstractNodeQueue.java @@ -0,0 +1,103 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package akka.dispatch; + +import akka.util.Unsafe; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Lock-free MPSC linked queue implementation based on Dmitriy Vyukov's non-intrusive MPSC queue: + * http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue + */ +public abstract class AbstractNodeQueue extends AtomicReference> { + // Extends AtomicReference for the "head" slot (which is the one that is appended to) since Unsafe does not expose XCHG operation intrinsically + private volatile Node _tailDoNotCallMeDirectly; + + protected AbstractNodeQueue() { + final Node n = new Node(); + _tailDoNotCallMeDirectly = n; + set(n); + } + + @SuppressWarnings("unchecked") + protected final Node peekNode() { + return ((Node)Unsafe.instance.getObjectVolatile(this, tailOffset)).next(); + } + + public final T peek() { + final Node n = peekNode(); + return (n != null) ? n.value : null; + } + + public final void add(final T value) { + final Node n = new Node(value); + getAndSet(n).setNext(n); + } + + public final boolean isEmpty() { + return peek() == null; + } + + public final int count() { + int count = 0; + for(Node n = peekNode();n != null; n = n.next()) + ++count; + return count; + } + + @SuppressWarnings("unchecked") + public final T poll() { + final Node next = peekNode(); + if (next == null) return null; + else { + final T ret = next.value; + next.value = null; // Null out the value so that we can GC it early + Unsafe.instance.putOrderedObject(this, tailOffset, next); + return ret; + } + } + + private final static long tailOffset; + + static { + try { + tailOffset = Unsafe.instance.objectFieldOffset(AbstractNodeQueue.class.getDeclaredField("_tailDoNotCallMeDirectly")); + } catch(Throwable t){ + throw new ExceptionInInitializerError(t); + } + } + + public static class Node { + T value; + private volatile Node _nextDoNotCallMeDirectly; + + Node() { + this(null); + } + + Node(final T value) { + this.value = value; + } + + @SuppressWarnings("unchecked") + public final Node next() { + return (Node)Unsafe.instance.getObjectVolatile(this, nextOffset); + } + + protected final void setNext(final Node newNext) { + Unsafe.instance.putOrderedObject(this, nextOffset, newNext); + } + + private final static long nextOffset; + + static { + try { + nextOffset = Unsafe.instance.objectFieldOffset(Node.class.getDeclaredField("_nextDoNotCallMeDirectly")); + } catch(Throwable t){ + throw new ExceptionInInitializerError(t); + } + } + } +} diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 022a00c465..fc665dc86b 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -11,12 +11,10 @@ import akka.actor.{ ActorCell, ActorRef, Cell, ActorSystem, InternalActorRef, De import akka.util.{ Unsafe, BoundedBlockingQueue } import akka.event.Logging.Error import scala.concurrent.duration.Duration +import scala.concurrent.duration.FiniteDuration import scala.annotation.tailrec import scala.util.control.NonFatal import com.typesafe.config.Config -import scala.concurrent.duration.FiniteDuration -import akka.actor.DeadLetter - /** * INTERNAL API */ @@ -194,7 +192,6 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) val s = status updateStatus(s, s & ~Scheduled) || setAsIdle() } - /* * AtomicReferenceFieldUpdater for system queue. */ @@ -350,6 +347,25 @@ trait MessageQueue { def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit } +class NodeMessageQueue extends AbstractNodeQueue[Envelope] with MessageQueue { + + final def enqueue(receiver: ActorRef, handle: Envelope): Unit = add(handle) + + final def dequeue(): Envelope = poll() + + final def numberOfMessages: Int = count() + + final def hasMessages: Boolean = !isEmpty() + + @tailrec final def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = { + val envelope = dequeue() + if (envelope ne null) { + deadLetters.enqueue(owner, envelope) + cleanUp(owner, deadLetters) + } + } +} + /** * INTERNAL API */ @@ -520,6 +536,18 @@ case class UnboundedMailbox() extends MailboxType { } } +/** + * SingleConsumerOnlyUnboundedMailbox is a high-performance, multiple producer—single consumer, unbounded MailboxType, + * the only drawback is that you can't have multiple consumers, + * which rules out using it with BalancingDispatcher for instance. + */ +case class SingleConsumerOnlyUnboundedMailbox() extends MailboxType { + + def this(settings: ActorSystem.Settings, config: Config) = this() + + final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = new NodeMessageQueue() +} + /** * BoundedMailbox is the default bounded MailboxType used by Akka Actors. */ diff --git a/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala b/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala index 24fc20b824..a4f0d1f1f8 100644 --- a/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala +++ b/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala @@ -42,9 +42,7 @@ class BoundedBlockingQueue[E <: AnyRef]( notFull.await() require(backing.offer(e)) notEmpty.signal() - } finally { - lock.unlock() - } + } finally lock.unlock() } def take(): E = { //Blocks until not empty @@ -56,9 +54,7 @@ class BoundedBlockingQueue[E <: AnyRef]( require(e ne null) notFull.signal() e - } finally { - lock.unlock() - } + } finally lock.unlock() } def offer(e: E): Boolean = { //Tries to do it immediately, if fail return false @@ -71,9 +67,7 @@ class BoundedBlockingQueue[E <: AnyRef]( notEmpty.signal() true } - } finally { - lock.unlock() - } + } finally lock.unlock() } def offer(e: E, timeout: Long, unit: TimeUnit): Boolean = { //Tries to do it within the timeout, return false if fail @@ -92,9 +86,7 @@ class BoundedBlockingQueue[E <: AnyRef]( notEmpty.signal() true } else false - } finally { - lock.unlock() - } + } finally lock.unlock() } def poll(timeout: Long, unit: TimeUnit): E = { //Tries to do it within the timeout, returns null if fail @@ -124,9 +116,7 @@ class BoundedBlockingQueue[E <: AnyRef]( } } result - } finally { - lock.unlock() - } + } finally lock.unlock() } def poll(): E = { //Tries to remove the head of the queue immediately, if fail, return null @@ -138,9 +128,7 @@ class BoundedBlockingQueue[E <: AnyRef]( notFull.signal() e } - } finally { - lock.unlock - } + } finally lock.unlock() } override def remove(e: AnyRef): Boolean = { //Tries to do it immediately, if fail, return false @@ -151,55 +139,35 @@ class BoundedBlockingQueue[E <: AnyRef]( notFull.signal() true } else false - } finally { - lock.unlock() - } + } finally lock.unlock() } override def contains(e: AnyRef): Boolean = { if (e eq null) throw new NullPointerException lock.lock() - try { - backing contains e - } finally { - lock.unlock() - } + try backing.contains(e) finally lock.unlock() } override def clear() { lock.lock() - try { - backing.clear - } finally { - lock.unlock() - } + try backing.clear() finally lock.unlock() } def remainingCapacity(): Int = { lock.lock() try { maxCapacity - backing.size() - } finally { - lock.unlock() - } + } finally lock.unlock() } def size(): Int = { lock.lock() - try { - backing.size() - } finally { - lock.unlock() - } + try backing.size() finally lock.unlock() } def peek(): E = { lock.lock() - try { - backing.peek() - } finally { - lock.unlock() - } + try backing.peek() finally lock.unlock() } def drainTo(c: Collection[_ >: E]): Int = drainTo(c, Int.MaxValue) @@ -219,19 +187,13 @@ class BoundedBlockingQueue[E <: AnyRef]( } } else n drainOne(0) - } finally { - lock.unlock() - } + } finally lock.unlock() } } override def containsAll(c: Collection[_]): Boolean = { lock.lock() - try { - backing containsAll c - } finally { - lock.unlock() - } + try backing.containsAll(c) finally lock.unlock() } override def removeAll(c: Collection[_]): Boolean = { @@ -243,9 +205,7 @@ class BoundedBlockingQueue[E <: AnyRef]( if (sz > 0) notEmpty.signal() true } else false - } finally { - lock.unlock() - } + } finally lock.unlock() } override def retainAll(c: Collection[_]): Boolean = { @@ -257,9 +217,7 @@ class BoundedBlockingQueue[E <: AnyRef]( if (sz > 0) notEmpty.signal() true } else false - } finally { - lock.unlock() - } + } finally lock.unlock() } def iterator(): Iterator[E] = { @@ -294,40 +252,24 @@ class BoundedBlockingQueue[E <: AnyRef]( } removeTarget() - } finally { - lock.unlock() - } + } finally lock.unlock() } } - } finally { - lock.unlock - } + } finally lock.unlock() } override def toArray(): Array[AnyRef] = { lock.lock() - try { - backing.toArray - } finally { - lock.unlock() - } + try backing.toArray finally lock.unlock() } override def isEmpty(): Boolean = { lock.lock() - try { - backing.isEmpty() - } finally { - lock.unlock() - } + try backing.isEmpty() finally lock.unlock() } override def toArray[X](a: Array[X with AnyRef]) = { lock.lock() - try { - backing.toArray[X](a) - } finally { - lock.unlock() - } + try backing.toArray[X](a) finally lock.unlock() } } diff --git a/akka-actor/src/main/scala/akka/util/SerializedSuspendableExecutionContext.scala b/akka-actor/src/main/scala/akka/util/SerializedSuspendableExecutionContext.scala index e11afab11b..b6e7f4c09f 100644 --- a/akka-actor/src/main/scala/akka/util/SerializedSuspendableExecutionContext.scala +++ b/akka-actor/src/main/scala/akka/util/SerializedSuspendableExecutionContext.scala @@ -8,6 +8,7 @@ import java.util.concurrent.atomic.AtomicInteger import scala.concurrent.ExecutionContext import scala.util.control.NonFatal import scala.annotation.{ tailrec, switch } +import akka.dispatch.AbstractNodeQueue private[akka] object SerializedSuspendableExecutionContext { final val Off = 0 @@ -31,7 +32,7 @@ private[akka] object SerializedSuspendableExecutionContext { * @param context the underlying context which will be used to actually execute the submitted tasks */ private[akka] final class SerializedSuspendableExecutionContext(throughput: Int)(val context: ExecutionContext) - extends ConcurrentLinkedQueue[Runnable] with Runnable with ExecutionContext { + extends AbstractNodeQueue[Runnable] with Runnable with ExecutionContext { import SerializedSuspendableExecutionContext._ require(throughput > 0, s"SerializedSuspendableExecutionContext.throughput must be greater than 0 but was $throughput") @@ -71,10 +72,16 @@ private[akka] final class SerializedSuspendableExecutionContext(throughput: Int) try run(0) finally remState(On) } - final def attach(): Unit = if (!isEmpty && state.compareAndSet(Off, On)) context execute this + final def attach(): Unit = if (!isEmpty() && state.compareAndSet(Off, On)) context execute this override final def execute(task: Runnable): Unit = try add(task) finally attach() override final def reportFailure(t: Throwable): Unit = context reportFailure t + /** + * O(N) + * @return the number of Runnable's currently enqueued + */ + final def size(): Int = count() + override final def toString: String = (state.get: @switch) match { case 0 ⇒ "Off" case 1 ⇒ "On"