diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index a958e15751..89158e1574 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -43,6 +43,7 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn "create a bounded mailbox with 10 capacity and with push timeout" in { val config = BoundedMailbox(10, 10 milliseconds) + config.capacity should be(10) val q = factory(config) ensureInitialMailboxState(config, q) @@ -240,8 +241,8 @@ class SingleConsumerOnlyMailboxSpec extends MailboxSpec { lazy val name = "The single-consumer-only mailbox implementation" override def maxConsumers = 1 def factory = { - case u: UnboundedMailbox ⇒ SingleConsumerOnlyUnboundedMailbox().create(None, None) - case b: BoundedMailbox ⇒ pending; null + case u: UnboundedMailbox ⇒ SingleConsumerOnlyUnboundedMailbox().create(None, None) + case b @ BoundedMailbox(capacity, _) ⇒ NonBlockingBoundedMailbox(capacity).create(None, None) } } @@ -249,37 +250,51 @@ object SingleConsumerOnlyMailboxVerificationSpec { case object Ping val mailboxConf = ConfigFactory.parseString(""" akka.actor.serialize-messages = off - test-dispatcher { + test-unbounded-dispatcher { mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox" throughput = 1 + } + test-bounded-dispatcher { + mailbox-type = "akka.dispatch.NonBlockingBoundedMailbox" + mailbox-capacity = 1 + throughput = 1 }""") } class SingleConsumerOnlyMailboxVerificationSpec extends AkkaSpec(SingleConsumerOnlyMailboxVerificationSpec.mailboxConf) { import SingleConsumerOnlyMailboxVerificationSpec.Ping + + def pathologicalPingPong(dispatcherId: String): Unit = { + val total = 2000000 + val runner = system.actorOf(Props(new Actor { + val a, b = context.watch( + context.actorOf(Props(new Actor { + var n = total / 2 + def receive = { + case Ping ⇒ + n -= 1 + sender() ! Ping + if (n == 0) + context stop self + } + }).withDispatcher(dispatcherId))) + def receive = { + case Ping ⇒ a.tell(Ping, b) + case Terminated(`a` | `b`) ⇒ if (context.children.isEmpty) context stop self + } + })) + watch(runner) + runner ! Ping + expectTerminated(runner) + } + "A SingleConsumerOnlyMailbox" should { - "support pathological ping-ponging" in within(30.seconds) { - val total = 2000000 - val runner = system.actorOf(Props(new Actor { - val a, b = context.watch( - context.actorOf(Props(new Actor { - var n = total / 2 - def receive = { - case Ping ⇒ - n -= 1 - sender() ! Ping - if (n == 0) - context stop self - } - }).withDispatcher("test-dispatcher"))) - def receive = { - case Ping ⇒ a.tell(Ping, b) - case Terminated(`a` | `b`) ⇒ if (context.children.isEmpty) context stop self - } - })) - watch(runner) - runner ! Ping - expectTerminated(runner) + "support pathological ping-ponging for the unbounded case" in within(30.seconds) { + pathologicalPingPong("test-unbounded-dispatcher") + } + + "support pathological ping-ponging for the bounded case" in within(30.seconds) { + pathologicalPingPong("test-bounded-dispatcher") } } } diff --git a/akka-actor/src/main/java/akka/dispatch/AbstractBoundedNodeQueue.java b/akka-actor/src/main/java/akka/dispatch/AbstractBoundedNodeQueue.java new file mode 100644 index 0000000000..65173c0027 --- /dev/null +++ b/akka-actor/src/main/java/akka/dispatch/AbstractBoundedNodeQueue.java @@ -0,0 +1,215 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.dispatch; + +import akka.util.Unsafe; + +import java.util.concurrent.atomic.AtomicReference; + +/** + * Lock-free bounded non-blocking multiple-producer multiple-consumer queue based on the works of: + * + * Andriy Plokhotnuyk (https://github.com/plokhotnyuk) + * - https://github.com/plokhotnyuk/actors/blob/2e65abb7ce4cbfcb1b29c98ee99303d6ced6b01f/src/test/scala/akka/dispatch/Mailboxes.scala + * (Apache V2: https://github.com/plokhotnyuk/actors/blob/master/LICENSE) + * + * Dmitriy Vyukov's non-intrusive MPSC queue: + * - http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue + * (Simplified BSD) + */ +@SuppressWarnings("serial") +public abstract class AbstractBoundedNodeQueue { + private final int capacity; + + @SuppressWarnings("unused") + private volatile Node _enqDoNotCallMeDirectly; + + @SuppressWarnings("unused") + private volatile Node _deqDoNotCallMeDirectly; + + protected AbstractBoundedNodeQueue(final int capacity) { + if (capacity < 0) throw new IllegalArgumentException("AbstractBoundedNodeQueue.capacity must be >= 0"); + this.capacity = capacity; + final Node n = new Node(); + setDeq(n); + setEnq(n); + } + + private final void setEnq(Node n) { + Unsafe.instance.putObjectVolatile(this, enqOffset, n); + } + + @SuppressWarnings("unchecked") + private final Node getEnq() { + return (Node)Unsafe.instance.getObjectVolatile(this, enqOffset); + } + + private final boolean casEnq(Node old, Node nju) { + return Unsafe.instance.compareAndSwapObject(this, enqOffset, old, nju); + } + + private final void setDeq(Node n) { + Unsafe.instance.putObjectVolatile(this, deqOffset, n); + } + + @SuppressWarnings("unchecked") + private final Node getDeq() { + return (Node)Unsafe.instance.getObjectVolatile(this, deqOffset); + } + + private final boolean casDeq(Node old, Node nju) { + return Unsafe.instance.compareAndSwapObject(this, deqOffset, old, nju); + } + + @SuppressWarnings("unchecked") + protected final Node peekNode() { + for(;;) { + final Node deq = getDeq(); + final Node next = deq.next(); + if (next != null || getEnq() == deq) + return next; + } + } + + /** + * + * @return the first value of this queue, null if empty + */ + public final T peek() { + final Node n = peekNode(); + return (n != null) ? n.value : null; + } + + /** + * @return the maximum capacity of this queue + */ + public final int capacity() { + return capacity; + } + // Possible TODO — impl. could be switched to addNode(new Node(value)) if we want to allocate even if full already + public final boolean add(final T value) { + for(Node n = null;;) { + final Node lastNode = getEnq(); + final int lastNodeCount = lastNode.count; + if (lastNodeCount - getDeq().count < capacity) { + // Trade a branch for avoiding to create a new node if full, + // and to avoid creating multiple nodes on write conflict á la Be Kind to Your GC + if (n == null) { + n = new Node(); + n.value = value; + } + + n.count = lastNodeCount + 1; // Piggyback on the HB-edge between getEnq() and casEnq() + + // Try to append the node to the end, if we fail we continue loopin' + if(casEnq(lastNode, n)) { + lastNode.setNext(n); + return true; + } + } else return false; // Over capacity—couldn't add the node + } + } + + public final boolean addNode(final Node n) { + n.setNext(null); // Make sure we're not corrupting the queue + for(;;) { + final Node lastNode = getEnq(); + final int lastNodeCount = lastNode.count; + if (lastNodeCount - getDeq().count < capacity) { + n.count = lastNodeCount + 1; // Piggyback on the HB-edge between getEnq() and casEnq() + // Try to append the node to the end, if we fail we continue loopin' + if(casEnq(lastNode, n)) { + lastNode.setNext(n); + return true; + } + } else return false; // Over capacity—couldn't add the node + } + } + + public final boolean isEmpty() { + return peekNode() == null; + } + + /** + * Returns an approximation of the queue's "current" size + */ + public final int size() { + //Order of operations is extremely important here + // If no item was dequeued between when we looked at the count of the enqueueing end, + // there should be no out-of-bounds + for(;;) { + final int deqCountBefore = getDeq().count; + final int enqCount = getEnq().count; + final int deqCountAfter = getDeq().count; + + if (deqCountAfter == deqCountBefore) + return enqCount - deqCountAfter; + } + } + + /** + * Removes the first element of this queue if any + * @return the value of the first element of the queue, null if empty + */ + public final T poll() { + final Node n = pollNode(); + return (n != null) ? n.value : null; + } + + /** + * Removes the first element of this queue if any + * @return the `Node` of the first element of the queue, null if empty + */ + public final Node pollNode() { + for(;;) { + final Node deq = getDeq(); + final Node next = deq.next(); + if (next != null) { + if (casDeq(deq, next)) { + deq.value = next.value; + next.value = null; + return deq; + } // else we retry (concurrent consumers) + } else if (getEnq() == deq) return null; // If we got a null and head meets tail, we are empty + } + } + + private final static long enqOffset, deqOffset; + + static { + try { + enqOffset = Unsafe.instance.objectFieldOffset(AbstractBoundedNodeQueue.class.getDeclaredField("_enqDoNotCallMeDirectly")); + deqOffset = Unsafe.instance.objectFieldOffset(AbstractBoundedNodeQueue.class.getDeclaredField("_deqDoNotCallMeDirectly")); + } catch(Throwable t){ + throw new ExceptionInInitializerError(t); + } + } + + public static class Node { + protected T value; + @SuppressWarnings("unused") + private volatile Node _nextDoNotCallMeDirectly; + protected int count; + + @SuppressWarnings("unchecked") + public final Node next() { + return (Node)Unsafe.instance.getObjectVolatile(this, nextOffset); + } + + protected final void setNext(final Node newNext) { + Unsafe.instance.putOrderedObject(this, nextOffset, newNext); + } + + private final static long nextOffset; + + static { + try { + nextOffset = Unsafe.instance.objectFieldOffset(Node.class.getDeclaredField("_nextDoNotCallMeDirectly")); + } catch(Throwable t){ + throw new ExceptionInInitializerError(t); + } + } + } +} diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index d1dfb1629e..ff4e0fffff 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -370,6 +370,31 @@ class NodeMessageQueue extends AbstractNodeQueue[Envelope] with MessageQueue wit } } +//Discards overflowing messages into DeadLetters +class BoundedNodeMessageQueue(capacity: Int) extends AbstractBoundedNodeQueue[Envelope](capacity) +with MessageQueue with BoundedMessageQueueSemantics with MultipleConsumerSemantics { + final def pushTimeOut: Duration = Duration.Undefined + + final def enqueue(receiver: ActorRef, handle: Envelope): Unit = + if (!add(handle)) + receiver.asInstanceOf[InternalActorRef].provider.deadLetters.tell( + DeadLetter(handle.message, handle.sender, receiver), handle.sender) + + final def dequeue(): Envelope = poll() + + final def numberOfMessages: Int = size() + + final def hasMessages: Boolean = !isEmpty() + + @tailrec final def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = { + val envelope = dequeue() + if (envelope ne null) { + deadLetters.enqueue(owner, envelope) + cleanUp(owner, deadLetters) + } + } +} + /** * INTERNAL API */ @@ -576,6 +601,22 @@ final case class SingleConsumerOnlyUnboundedMailbox() extends MailboxType with P final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = new NodeMessageQueue } +/** + * NonBlockingBoundedMailbox is a high-performance, multiple producer—multiple consumer, bounded MailboxType, + * Noteworthy is that it discards overflow as DeadLetters. + * + * NOTE: NonBlockingBoundedMailbox does not use `mailbox-push-timeout-time` as it is non-blocking. + */ +case class NonBlockingBoundedMailbox(val capacity: Int) extends MailboxType with ProducesMessageQueue[BoundedNodeMessageQueue] { + + def this(settings: ActorSystem.Settings, config: Config) = this(config.getInt("mailbox-capacity")) + + if (capacity < 0) throw new IllegalArgumentException("The capacity for NonBlockingBoundedMailbox can not be negative") + + final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = + new BoundedNodeMessageQueue(capacity) +} + /** * BoundedMailbox is the default bounded MailboxType used by Akka Actors. */ diff --git a/akka-docs/rst/java/mailboxes.rst b/akka-docs/rst/java/mailboxes.rst index fb0dc5ed33..60d18f5b7f 100644 --- a/akka-docs/rst/java/mailboxes.rst +++ b/akka-docs/rst/java/mailboxes.rst @@ -131,6 +131,16 @@ Akka comes shipped with a number of mailbox implementations: - Configuration name: "akka.dispatch.SingleConsumerOnlyUnboundedMailbox" +* NonBlockingBoundedMailbox + + - Backed by a very efficient MultiPle-Producer Multiple-Consumer queue + + - Blocking: No + + - Bounded: Yes + + - Configuration name: "akka.dispatch.NonBlockingBoundedMailbox" + * BoundedMailbox - Backed by a ``java.util.concurrent.LinkedBlockingQueue`` diff --git a/akka-docs/rst/scala/mailboxes.rst b/akka-docs/rst/scala/mailboxes.rst index 608c41ce82..3c85f4ad1e 100644 --- a/akka-docs/rst/scala/mailboxes.rst +++ b/akka-docs/rst/scala/mailboxes.rst @@ -123,7 +123,7 @@ Akka comes shipped with a number of mailbox implementations: * SingleConsumerOnlyUnboundedMailbox - - Backed by a very efficient Multiple Producer Single Consumer queue, cannot be used with BalancingDispatcher + - Backed by a very efficient Multiple-Producer Single-Consumer queue, cannot be used with BalancingDispatcher - Blocking: No @@ -141,6 +141,16 @@ Akka comes shipped with a number of mailbox implementations: - Configuration name: "bounded" or "akka.dispatch.BoundedMailbox" +* NonBlockingBoundedMailbox + + - Backed by a very efficient MultiPle-Producer Multiple-Consumer queue + + - Blocking: No + + - Bounded: Yes + + - Configuration name: "akka.dispatch.NonBlockingBoundedMailbox" + * UnboundedPriorityMailbox - Backed by a ``java.util.concurrent.PriorityBlockingQueue``