From f9d3789bfca6eb08fb2dcf9cc7b1dc1643e099e6 Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Tue, 15 Mar 2016 11:41:29 +0100 Subject: [PATCH] fix endless loop race condition in NodeQueue, #19949 - also fixed some forgotten copy-pasta between peekNode and pollNode - also added JavaDoc to all methods, explaining which can be used from what thread - did not fix the JDK8 improvement of using Unsafe instead of inheriting from AtomicReference since that inheritance is not a bad thing, actually --- .../java/akka/dispatch/AbstractNodeQueue.java | 92 ++++++++++++++++--- .../main/scala/akka/dispatch/Mailbox.scala | 49 +++++----- .../akka/dispatch/NodeQueueBenchmark.scala | 72 +++++++++++++++ akka-docs/rst/java/mailboxes.rst | 4 +- akka-docs/rst/scala/mailboxes.rst | 4 +- 5 files changed, 182 insertions(+), 39 deletions(-) create mode 100644 akka-bench-jmh/src/main/scala/akka/dispatch/NodeQueueBenchmark.scala diff --git a/akka-actor/src/main/java/akka/dispatch/AbstractNodeQueue.java b/akka-actor/src/main/java/akka/dispatch/AbstractNodeQueue.java index 7f8dee1d72..24f2228438 100644 --- a/akka-actor/src/main/java/akka/dispatch/AbstractNodeQueue.java +++ b/akka-actor/src/main/java/akka/dispatch/AbstractNodeQueue.java @@ -11,11 +11,20 @@ import java.util.concurrent.atomic.AtomicReference; /** * Lock-free MPSC linked queue implementation based on Dmitriy Vyukov's non-intrusive MPSC queue: * http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue + * + * This queue could be wait-free (i.e. without the spinning loops in peekNode and pollNode) if + * it were permitted to return null while the queue is not quite empty anymore but the enqueued + * element is not yet visible. This would break actor scheduling, though. */ @SuppressWarnings("serial") public abstract class AbstractNodeQueue extends AtomicReference> { - // Extends AtomicReference for the "head" slot (which is the one that is appended to) since - // Unsafe does not expose XCHG operation intrinsically before JDK 8 + + /* + * Extends AtomicReference for the "head" slot (which is the one that is appended to) since + * there is nothing to be gained by going to all-out Unsafe usage—we’d have to do + * cache-line padding ourselves. + */ + @SuppressWarnings("unused") private volatile Node _tailDoNotCallMeDirectly; @@ -25,10 +34,14 @@ public abstract class AbstractNodeQueue extends AtomicReference peekNode() { @@ -43,37 +56,83 @@ public abstract class AbstractNodeQueue extends AtomicReference n = peekNode(); return (n != null) ? n.value : null; } + /** + * Add an element to the head of the queue. + * + * This method can be used from any thread. + * + * @param value the element to be added; must not be null + */ public final void add(final T value) { final Node n = new Node(value); getAndSet(n).setNext(n); } + /** + * Add an element to the head of the queue, providing the queue node to be used. + * + * This method can be used from any thread. + * + * @param n the node containing the element to be added; both must not be null + */ public final void addNode(final Node n) { n.setNext(null); getAndSet(n).setNext(n); } + /** + * Query the queue whether it is empty right now. + * + * This method can be used from any thread. + * + * @return true if queue was empty at some point in the past + */ public final boolean isEmpty() { return Unsafe.instance.getObjectVolatile(this, tailOffset) == get(); } + /** + * This method returns an upper bound on the queue size at the time it + * starts executing. It may spuriously return smaller values (including + * zero) if the consumer pulls items out concurrently. + * + * This method can be used from any thread. + * + * @return an upper bound on queue length at some time in the past + */ + @SuppressWarnings("unchecked") public final int count() { int count = 0; - for(Node n = peekNode();n != null && count < Integer.MAX_VALUE; n = n.next()) + final Node head = get(); + for(Node n = ((Node) Unsafe.instance.getObjectVolatile(this, tailOffset)).next(); + n != null && count < Integer.MAX_VALUE; + n = n.next()) { ++count; + // only iterate up to the point where head was when starting: this is a moving queue! + if (n == head) break; + } return count; } - /* + /** + * Pull one item from the queue’s tail if there is one. + * * Use this method only from the consumer thread! + * + * @return element if there was one, or null if there was none */ public final T poll() { final Node next = pollNode(); @@ -85,18 +144,23 @@ public abstract class AbstractNodeQueue extends AtomicReference pollNode() { - Node tail; - Node next; - for(;;) { - tail = ((Node)Unsafe.instance.getObjectVolatile(this, tailOffset)); - next = tail.next(); - if (next != null || get() == tail) - break; + final Node tail = (Node) Unsafe.instance.getObjectVolatile(this, tailOffset); + Node next = tail.next(); + if (next == null && get() != tail) { + // if tail != head this is not going to change until producer makes progress + // we can avoid reading the head and just spin on next until it shows up + do { + next = tail.next(); + } while (next == null); } if (next == null) return null; else { diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 081a34a0c8..0e53f4f120 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -54,7 +54,7 @@ private[akka] object Mailbox { * INTERNAL API */ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) - extends ForkJoinTask[Unit] with SystemMessageQueue with Runnable { + extends ForkJoinTask[Unit] with SystemMessageQueue with Runnable { import Mailbox._ @@ -391,7 +391,7 @@ class NodeMessageQueue extends AbstractNodeQueue[Envelope] with MessageQueue wit * Discards overflowing messages into DeadLetters. */ class BoundedNodeMessageQueue(capacity: Int) extends AbstractBoundedNodeQueue[Envelope](capacity) - with MessageQueue with BoundedMessageQueueSemantics with MultipleConsumerSemantics { + with MessageQueue with BoundedMessageQueueSemantics with MultipleConsumerSemantics { final def pushTimeOut: Duration = Duration.Undefined final def enqueue(receiver: ActorRef, handle: Envelope): Unit = @@ -619,8 +619,11 @@ object UnboundedMailbox { /** * SingleConsumerOnlyUnboundedMailbox is a high-performance, multiple producer—single consumer, unbounded MailboxType, - * the only drawback is that you can't have multiple consumers, + * with the drawback that you can't have multiple consumers, * which rules out using it with BalancingPool (BalancingDispatcher) for instance. + * + * Currently this queue is slower for some benchmarks than the ConcurrentLinkedQueue from JDK 8 that is used by default, + * so be sure to measure the performance in your particular setting in order to determine which one to use. */ final case class SingleConsumerOnlyUnboundedMailbox() extends MailboxType with ProducesMessageQueue[NodeMessageQueue] { @@ -651,8 +654,8 @@ case class NonBlockingBoundedMailbox(val capacity: Int) extends MailboxType with * BoundedMailbox is the default bounded MailboxType used by Akka Actors. */ final case class BoundedMailbox(val capacity: Int, override val pushTimeOut: FiniteDuration) - extends MailboxType with ProducesMessageQueue[BoundedMailbox.MessageQueue] - with ProducesPushTimeoutSemanticsMailbox { + extends MailboxType with ProducesMessageQueue[BoundedMailbox.MessageQueue] + with ProducesPushTimeoutSemanticsMailbox { def this(settings: ActorSystem.Settings, config: Config) = this(config.getInt("mailbox-capacity"), config.getNanosDuration("mailbox-push-timeout-time")) @@ -666,7 +669,7 @@ final case class BoundedMailbox(val capacity: Int, override val pushTimeOut: Fin object BoundedMailbox { class MessageQueue(capacity: Int, final val pushTimeOut: FiniteDuration) - extends LinkedBlockingQueue[Envelope](capacity) with BoundedQueueBasedMessageQueue { + extends LinkedBlockingQueue[Envelope](capacity) with BoundedQueueBasedMessageQueue { final def queue: BlockingQueue[Envelope] = this } } @@ -676,7 +679,7 @@ object BoundedMailbox { * Extend this class and provide the Comparator in the constructor. */ class UnboundedPriorityMailbox(val cmp: Comparator[Envelope], val initialCapacity: Int) - extends MailboxType with ProducesMessageQueue[UnboundedPriorityMailbox.MessageQueue] { + extends MailboxType with ProducesMessageQueue[UnboundedPriorityMailbox.MessageQueue] { def this(cmp: Comparator[Envelope]) = this(cmp, 11) final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = new UnboundedPriorityMailbox.MessageQueue(initialCapacity, cmp) @@ -684,7 +687,7 @@ class UnboundedPriorityMailbox(val cmp: Comparator[Envelope], val initialCapacit object UnboundedPriorityMailbox { class MessageQueue(initialCapacity: Int, cmp: Comparator[Envelope]) - extends PriorityBlockingQueue[Envelope](initialCapacity, cmp) with UnboundedQueueBasedMessageQueue { + extends PriorityBlockingQueue[Envelope](initialCapacity, cmp) with UnboundedQueueBasedMessageQueue { final def queue: Queue[Envelope] = this } } @@ -694,8 +697,8 @@ object UnboundedPriorityMailbox { * Extend this class and provide the Comparator in the constructor. */ class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final val capacity: Int, override final val pushTimeOut: Duration) - extends MailboxType with ProducesMessageQueue[BoundedPriorityMailbox.MessageQueue] - with ProducesPushTimeoutSemanticsMailbox { + extends MailboxType with ProducesMessageQueue[BoundedPriorityMailbox.MessageQueue] + with ProducesPushTimeoutSemanticsMailbox { if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") @@ -706,8 +709,8 @@ class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final val cap object BoundedPriorityMailbox { class MessageQueue(capacity: Int, cmp: Comparator[Envelope], val pushTimeOut: Duration) - extends BoundedBlockingQueue[Envelope](capacity, new PriorityQueue[Envelope](11, cmp)) - with BoundedQueueBasedMessageQueue { + extends BoundedBlockingQueue[Envelope](capacity, new PriorityQueue[Envelope](11, cmp)) + with BoundedQueueBasedMessageQueue { final def queue: BlockingQueue[Envelope] = this } } @@ -718,7 +721,7 @@ object BoundedPriorityMailbox { * Extend this class and provide the Comparator in the constructor. */ class UnboundedStablePriorityMailbox(val cmp: Comparator[Envelope], val initialCapacity: Int) - extends MailboxType with ProducesMessageQueue[UnboundedStablePriorityMailbox.MessageQueue] { + extends MailboxType with ProducesMessageQueue[UnboundedStablePriorityMailbox.MessageQueue] { def this(cmp: Comparator[Envelope]) = this(cmp, 11) final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = new UnboundedStablePriorityMailbox.MessageQueue(initialCapacity, cmp) @@ -726,7 +729,7 @@ class UnboundedStablePriorityMailbox(val cmp: Comparator[Envelope], val initialC object UnboundedStablePriorityMailbox { class MessageQueue(initialCapacity: Int, cmp: Comparator[Envelope]) - extends StablePriorityBlockingQueue[Envelope](initialCapacity, cmp) with UnboundedQueueBasedMessageQueue { + extends StablePriorityBlockingQueue[Envelope](initialCapacity, cmp) with UnboundedQueueBasedMessageQueue { final def queue: Queue[Envelope] = this } } @@ -737,8 +740,8 @@ object UnboundedStablePriorityMailbox { * Extend this class and provide the Comparator in the constructor. */ class BoundedStablePriorityMailbox( final val cmp: Comparator[Envelope], final val capacity: Int, override final val pushTimeOut: Duration) - extends MailboxType with ProducesMessageQueue[BoundedStablePriorityMailbox.MessageQueue] - with ProducesPushTimeoutSemanticsMailbox { + extends MailboxType with ProducesMessageQueue[BoundedStablePriorityMailbox.MessageQueue] + with ProducesPushTimeoutSemanticsMailbox { if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") @@ -749,8 +752,8 @@ class BoundedStablePriorityMailbox( final val cmp: Comparator[Envelope], final v object BoundedStablePriorityMailbox { class MessageQueue(capacity: Int, cmp: Comparator[Envelope], val pushTimeOut: Duration) - extends BoundedBlockingQueue[Envelope](capacity, new StablePriorityQueue[Envelope](11, cmp)) - with BoundedQueueBasedMessageQueue { + extends BoundedBlockingQueue[Envelope](capacity, new StablePriorityQueue[Envelope](11, cmp)) + with BoundedQueueBasedMessageQueue { final def queue: BlockingQueue[Envelope] = this } } @@ -776,8 +779,8 @@ object UnboundedDequeBasedMailbox { * BoundedDequeBasedMailbox is an bounded MailboxType, backed by a Deque. */ case class BoundedDequeBasedMailbox( final val capacity: Int, override final val pushTimeOut: FiniteDuration) - extends MailboxType with ProducesMessageQueue[BoundedDequeBasedMailbox.MessageQueue] - with ProducesPushTimeoutSemanticsMailbox { + extends MailboxType with ProducesMessageQueue[BoundedDequeBasedMailbox.MessageQueue] + with ProducesPushTimeoutSemanticsMailbox { def this(settings: ActorSystem.Settings, config: Config) = this(config.getInt("mailbox-capacity"), config.getNanosDuration("mailbox-push-timeout-time")) @@ -791,7 +794,7 @@ case class BoundedDequeBasedMailbox( final val capacity: Int, override final val object BoundedDequeBasedMailbox { class MessageQueue(capacity: Int, val pushTimeOut: FiniteDuration) - extends LinkedBlockingDeque[Envelope](capacity) with BoundedDequeBasedMessageQueue { + extends LinkedBlockingDeque[Envelope](capacity) with BoundedDequeBasedMessageQueue { final val queue = this } } @@ -853,8 +856,8 @@ object UnboundedControlAwareMailbox { * to allow messages that extend [[akka.dispatch.ControlMessage]] to be delivered with priority. */ final case class BoundedControlAwareMailbox(capacity: Int, override final val pushTimeOut: FiniteDuration) extends MailboxType - with ProducesMessageQueue[BoundedControlAwareMailbox.MessageQueue] - with ProducesPushTimeoutSemanticsMailbox { + with ProducesMessageQueue[BoundedControlAwareMailbox.MessageQueue] + with ProducesPushTimeoutSemanticsMailbox { def this(settings: ActorSystem.Settings, config: Config) = this(config.getInt("mailbox-capacity"), config.getNanosDuration("mailbox-push-timeout-time")) diff --git a/akka-bench-jmh/src/main/scala/akka/dispatch/NodeQueueBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/dispatch/NodeQueueBenchmark.scala new file mode 100644 index 0000000000..0ca7640522 --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/dispatch/NodeQueueBenchmark.scala @@ -0,0 +1,72 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.dispatch + +import akka.actor._ +import org.openjdk.jmh.annotations._ +import com.typesafe.config.ConfigFactory +import java.util.concurrent.TimeUnit +import scala.concurrent.Await +import scala.concurrent.duration._ +import akka.testkit.TestProbe + +object NodeQueueBenchmark { + final val burst = 100000 + case object Stop +} + +@State(Scope.Benchmark) +@BenchmarkMode(Array(Mode.Throughput)) +@Fork(2) +@Warmup(iterations = 5) +@Measurement(iterations = 10) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +class NodeQueueBenchmark { + import NodeQueueBenchmark._ + + val config = ConfigFactory.parseString(""" +dispatcher { + executor = "thread-pool-executor" + throughput = 1000 + thread-pool-executor { + fixed-pool-size = 1 + } +} +mailbox { + mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox" + mailbox-capacity = 1000000 +} +""").withFallback(ConfigFactory.load()) + implicit val sys = ActorSystem("ANQ", config) + val ref = sys.actorOf(Props(new Actor { + def receive = { + case Stop => sender() ! Stop + case _ => + } + }).withDispatcher("dispatcher").withMailbox("mailbox"), "receiver") + + @TearDown + def teardown(): Unit = Await.result(sys.terminate(), 5.seconds) + + @TearDown(Level.Invocation) + def waitInBetween(): Unit = { + val probe = TestProbe() + probe.send(ref, Stop) + probe.expectMsg(Stop) + System.gc() + System.gc() + System.gc() + } + + @Benchmark + @OperationsPerInvocation(burst) + def send(): Unit = { + var todo = burst + while (todo > 0) { + ref ! "hello" + todo -= 1 + } + } + +} diff --git a/akka-docs/rst/java/mailboxes.rst b/akka-docs/rst/java/mailboxes.rst index 77e3af37ff..aaf257610d 100644 --- a/akka-docs/rst/java/mailboxes.rst +++ b/akka-docs/rst/java/mailboxes.rst @@ -124,7 +124,9 @@ Akka comes shipped with a number of mailbox implementations: * **SingleConsumerOnlyUnboundedMailbox** - - Backed by a very efficient Multiple-Producer Single-Consumer queue, cannot be used with ``BalancingDispatcher`` + This queue may or may not be faster than the default one depending on your use-case—be sure to benchmark properly! + + - Backed by a Multiple-Producer Single-Consumer queue, cannot be used with ``BalancingDispatcher`` - Blocking: No diff --git a/akka-docs/rst/scala/mailboxes.rst b/akka-docs/rst/scala/mailboxes.rst index 5c780c67ea..dd09d53a7e 100644 --- a/akka-docs/rst/scala/mailboxes.rst +++ b/akka-docs/rst/scala/mailboxes.rst @@ -124,7 +124,9 @@ Akka comes shipped with a number of mailbox implementations: * **SingleConsumerOnlyUnboundedMailbox** - - Backed by a very efficient Multiple-Producer Single-Consumer queue, cannot be used with ``BalancingDispatcher`` + This queue may or may not be faster than the default one depending on your use-case—be sure to benchmark properly! + + - Backed by a Multiple-Producer Single-Consumer queue, cannot be used with ``BalancingDispatcher`` - Blocking: No