fix endless loop race condition in NodeQueue, #19949

- also fixed some forgotten copy-pasta between peekNode and pollNode
- also added JavaDoc to all methods, explaining which can be used from
  what thread
- did not fix the JDK8 improvement of using Unsafe instead of inheriting
  from AtomicReference since that inheritance is not a bad thing,
  actually
This commit is contained in:
Roland Kuhn 2016-03-15 11:41:29 +01:00
parent ad8ab128c4
commit f9d3789bfc
5 changed files with 182 additions and 39 deletions

View file

@ -11,11 +11,20 @@ import java.util.concurrent.atomic.AtomicReference;
/** /**
* Lock-free MPSC linked queue implementation based on Dmitriy Vyukov's non-intrusive MPSC queue: * Lock-free MPSC linked queue implementation based on Dmitriy Vyukov's non-intrusive MPSC queue:
* http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue * http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
*
* This queue could be wait-free (i.e. without the spinning loops in peekNode and pollNode) if
* it were permitted to return null while the queue is not quite empty anymore but the enqueued
* element is not yet visible. This would break actor scheduling, though.
*/ */
@SuppressWarnings("serial") @SuppressWarnings("serial")
public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQueue.Node<T>> { public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQueue.Node<T>> {
// Extends AtomicReference for the "head" slot (which is the one that is appended to) since
// Unsafe does not expose XCHG operation intrinsically before JDK 8 /*
* Extends AtomicReference for the "head" slot (which is the one that is appended to) since
* there is nothing to be gained by going to all-out Unsafe usagewed have to do
* cache-line padding ourselves.
*/
@SuppressWarnings("unused") @SuppressWarnings("unused")
private volatile Node<T> _tailDoNotCallMeDirectly; private volatile Node<T> _tailDoNotCallMeDirectly;
@ -25,10 +34,14 @@ public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQ
set(n); set(n);
} }
/* /**
* Query the queue tail for the next element without dequeuing it.
*
* Use this method only from the consumer thread! * Use this method only from the consumer thread!
* *
* !!! There is a copy of this code in pollNode() !!! * !!! There is a copy of this code in pollNode() !!!
*
* @return queue node with element inside if there was one, or null if there was none
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
protected final Node<T> peekNode() { protected final Node<T> peekNode() {
@ -43,37 +56,83 @@ public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQ
} }
return next; return next;
} }
/*
/**
* Query the queue tail for the next element without dequeuing it.
*
* Use this method only from the consumer thread! * Use this method only from the consumer thread!
*
* @return element if there was one, or null if there was none
*/ */
public final T peek() { public final T peek() {
final Node<T> n = peekNode(); final Node<T> n = peekNode();
return (n != null) ? n.value : null; return (n != null) ? n.value : null;
} }
/**
* Add an element to the head of the queue.
*
* This method can be used from any thread.
*
* @param value the element to be added; must not be null
*/
public final void add(final T value) { public final void add(final T value) {
final Node<T> n = new Node<T>(value); final Node<T> n = new Node<T>(value);
getAndSet(n).setNext(n); getAndSet(n).setNext(n);
} }
/**
* Add an element to the head of the queue, providing the queue node to be used.
*
* This method can be used from any thread.
*
* @param n the node containing the element to be added; both must not be null
*/
public final void addNode(final Node<T> n) { public final void addNode(final Node<T> n) {
n.setNext(null); n.setNext(null);
getAndSet(n).setNext(n); getAndSet(n).setNext(n);
} }
/**
* Query the queue whether it is empty right now.
*
* This method can be used from any thread.
*
* @return true if queue was empty at some point in the past
*/
public final boolean isEmpty() { public final boolean isEmpty() {
return Unsafe.instance.getObjectVolatile(this, tailOffset) == get(); return Unsafe.instance.getObjectVolatile(this, tailOffset) == get();
} }
/**
* This method returns an upper bound on the queue size at the time it
* starts executing. It may spuriously return smaller values (including
* zero) if the consumer pulls items out concurrently.
*
* This method can be used from any thread.
*
* @return an upper bound on queue length at some time in the past
*/
@SuppressWarnings("unchecked")
public final int count() { public final int count() {
int count = 0; int count = 0;
for(Node<T> n = peekNode();n != null && count < Integer.MAX_VALUE; n = n.next()) final Node<T> head = get();
for(Node<T> n = ((Node<T>) Unsafe.instance.getObjectVolatile(this, tailOffset)).next();
n != null && count < Integer.MAX_VALUE;
n = n.next()) {
++count; ++count;
// only iterate up to the point where head was when starting: this is a moving queue!
if (n == head) break;
}
return count; return count;
} }
/* /**
* Pull one item from the queues tail if there is one.
*
* Use this method only from the consumer thread! * Use this method only from the consumer thread!
*
* @return element if there was one, or null if there was none
*/ */
public final T poll() { public final T poll() {
final Node<T> next = pollNode(); final Node<T> next = pollNode();
@ -85,18 +144,23 @@ public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQ
} }
} }
/* /**
* Pull one item from the queue, returning it within a queue node.
*
* Use this method only from the consumer thread! * Use this method only from the consumer thread!
*
* @return queue node with element inside if there was one, or null if there was none
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public final Node<T> pollNode() { public final Node<T> pollNode() {
Node<T> tail; final Node<T> tail = (Node<T>) Unsafe.instance.getObjectVolatile(this, tailOffset);
Node<T> next; Node<T> next = tail.next();
for(;;) { if (next == null && get() != tail) {
tail = ((Node<T>)Unsafe.instance.getObjectVolatile(this, tailOffset)); // if tail != head this is not going to change until producer makes progress
// we can avoid reading the head and just spin on next until it shows up
do {
next = tail.next(); next = tail.next();
if (next != null || get() == tail) } while (next == null);
break;
} }
if (next == null) return null; if (next == null) return null;
else { else {

View file

@ -619,8 +619,11 @@ object UnboundedMailbox {
/** /**
* SingleConsumerOnlyUnboundedMailbox is a high-performance, multiple producersingle consumer, unbounded MailboxType, * SingleConsumerOnlyUnboundedMailbox is a high-performance, multiple producersingle consumer, unbounded MailboxType,
* the only drawback is that you can't have multiple consumers, * with the drawback that you can't have multiple consumers,
* which rules out using it with BalancingPool (BalancingDispatcher) for instance. * which rules out using it with BalancingPool (BalancingDispatcher) for instance.
*
* Currently this queue is slower for some benchmarks than the ConcurrentLinkedQueue from JDK 8 that is used by default,
* so be sure to measure the performance in your particular setting in order to determine which one to use.
*/ */
final case class SingleConsumerOnlyUnboundedMailbox() extends MailboxType with ProducesMessageQueue[NodeMessageQueue] { final case class SingleConsumerOnlyUnboundedMailbox() extends MailboxType with ProducesMessageQueue[NodeMessageQueue] {

View file

@ -0,0 +1,72 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.dispatch
import akka.actor._
import org.openjdk.jmh.annotations._
import com.typesafe.config.ConfigFactory
import java.util.concurrent.TimeUnit
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.testkit.TestProbe
object NodeQueueBenchmark {
final val burst = 100000
case object Stop
}
@State(Scope.Benchmark)
@BenchmarkMode(Array(Mode.Throughput))
@Fork(2)
@Warmup(iterations = 5)
@Measurement(iterations = 10)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
class NodeQueueBenchmark {
import NodeQueueBenchmark._
val config = ConfigFactory.parseString("""
dispatcher {
executor = "thread-pool-executor"
throughput = 1000
thread-pool-executor {
fixed-pool-size = 1
}
}
mailbox {
mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
mailbox-capacity = 1000000
}
""").withFallback(ConfigFactory.load())
implicit val sys = ActorSystem("ANQ", config)
val ref = sys.actorOf(Props(new Actor {
def receive = {
case Stop => sender() ! Stop
case _ =>
}
}).withDispatcher("dispatcher").withMailbox("mailbox"), "receiver")
@TearDown
def teardown(): Unit = Await.result(sys.terminate(), 5.seconds)
@TearDown(Level.Invocation)
def waitInBetween(): Unit = {
val probe = TestProbe()
probe.send(ref, Stop)
probe.expectMsg(Stop)
System.gc()
System.gc()
System.gc()
}
@Benchmark
@OperationsPerInvocation(burst)
def send(): Unit = {
var todo = burst
while (todo > 0) {
ref ! "hello"
todo -= 1
}
}
}

View file

@ -124,7 +124,9 @@ Akka comes shipped with a number of mailbox implementations:
* **SingleConsumerOnlyUnboundedMailbox** * **SingleConsumerOnlyUnboundedMailbox**
- Backed by a very efficient Multiple-Producer Single-Consumer queue, cannot be used with ``BalancingDispatcher`` This queue may or may not be faster than the default one depending on your use-case—be sure to benchmark properly!
- Backed by a Multiple-Producer Single-Consumer queue, cannot be used with ``BalancingDispatcher``
- Blocking: No - Blocking: No

View file

@ -124,7 +124,9 @@ Akka comes shipped with a number of mailbox implementations:
* **SingleConsumerOnlyUnboundedMailbox** * **SingleConsumerOnlyUnboundedMailbox**
- Backed by a very efficient Multiple-Producer Single-Consumer queue, cannot be used with ``BalancingDispatcher`` This queue may or may not be faster than the default one depending on your use-case—be sure to benchmark properly!
- Backed by a Multiple-Producer Single-Consumer queue, cannot be used with ``BalancingDispatcher``
- Blocking: No - Blocking: No