/** * Copyright (C) 2009-2015 Typesafe Inc. */ package akka.dispatch; import akka.util.Unsafe; import java.util.concurrent.atomic.AtomicReference; /** * Lock-free bounded non-blocking multiple-producer multiple-consumer queue based on the works of: * * Andriy Plokhotnuyk (https://github.com/plokhotnyuk) * - https://github.com/plokhotnyuk/actors/blob/2e65abb7ce4cbfcb1b29c98ee99303d6ced6b01f/src/test/scala/akka/dispatch/Mailboxes.scala * (Apache V2: https://github.com/plokhotnyuk/actors/blob/master/LICENSE) * * Dmitriy Vyukov's non-intrusive MPSC queue: * - http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue * (Simplified BSD) */ @SuppressWarnings("serial") public abstract class AbstractBoundedNodeQueue { private final int capacity; @SuppressWarnings("unused") private volatile Node _enqDoNotCallMeDirectly; @SuppressWarnings("unused") private volatile Node _deqDoNotCallMeDirectly; protected AbstractBoundedNodeQueue(final int capacity) { if (capacity < 0) throw new IllegalArgumentException("AbstractBoundedNodeQueue.capacity must be >= 0"); this.capacity = capacity; final Node n = new Node(); setDeq(n); setEnq(n); } private final void setEnq(Node n) { Unsafe.instance.putObjectVolatile(this, enqOffset, n); } @SuppressWarnings("unchecked") private final Node getEnq() { return (Node)Unsafe.instance.getObjectVolatile(this, enqOffset); } private final boolean casEnq(Node old, Node nju) { return Unsafe.instance.compareAndSwapObject(this, enqOffset, old, nju); } private final void setDeq(Node n) { Unsafe.instance.putObjectVolatile(this, deqOffset, n); } @SuppressWarnings("unchecked") private final Node getDeq() { return (Node)Unsafe.instance.getObjectVolatile(this, deqOffset); } private final boolean casDeq(Node old, Node nju) { return Unsafe.instance.compareAndSwapObject(this, deqOffset, old, nju); } @SuppressWarnings("unchecked") protected final Node peekNode() { for(;;) { final Node deq = getDeq(); final Node next = deq.next(); if (next != null || getEnq() == deq) return next; } } /** * * @return the first value of this queue, null if empty */ public final T peek() { final Node n = peekNode(); return (n != null) ? n.value : null; } /** * @return the maximum capacity of this queue */ public final int capacity() { return capacity; } // Possible TODO — impl. could be switched to addNode(new Node(value)) if we want to allocate even if full already public final boolean add(final T value) { for(Node n = null;;) { final Node lastNode = getEnq(); final int lastNodeCount = lastNode.count; if (lastNodeCount - getDeq().count < capacity) { // Trade a branch for avoiding to create a new node if full, // and to avoid creating multiple nodes on write conflict á la Be Kind to Your GC if (n == null) { n = new Node(); n.value = value; } n.count = lastNodeCount + 1; // Piggyback on the HB-edge between getEnq() and casEnq() // Try to append the node to the end, if we fail we continue loopin' if(casEnq(lastNode, n)) { lastNode.setNext(n); return true; } } else return false; // Over capacity—couldn't add the node } } public final boolean addNode(final Node n) { n.setNext(null); // Make sure we're not corrupting the queue for(;;) { final Node lastNode = getEnq(); final int lastNodeCount = lastNode.count; if (lastNodeCount - getDeq().count < capacity) { n.count = lastNodeCount + 1; // Piggyback on the HB-edge between getEnq() and casEnq() // Try to append the node to the end, if we fail we continue loopin' if(casEnq(lastNode, n)) { lastNode.setNext(n); return true; } } else return false; // Over capacity—couldn't add the node } } public final boolean isEmpty() { return peekNode() == null; } /** * Returns an approximation of the queue's "current" size */ public final int size() { //Order of operations is extremely important here // If no item was dequeued between when we looked at the count of the enqueuing end, // there should be no out-of-bounds for(;;) { final int deqCountBefore = getDeq().count; final int enqCount = getEnq().count; final int deqCountAfter = getDeq().count; if (deqCountAfter == deqCountBefore) return enqCount - deqCountAfter; } } /** * Removes the first element of this queue if any * @return the value of the first element of the queue, null if empty */ public final T poll() { final Node n = pollNode(); return (n != null) ? n.value : null; } /** * Removes the first element of this queue if any * @return the `Node` of the first element of the queue, null if empty */ public final Node pollNode() { for(;;) { final Node deq = getDeq(); final Node next = deq.next(); if (next != null) { if (casDeq(deq, next)) { deq.value = next.value; next.value = null; return deq; } // else we retry (concurrent consumers) } else if (getEnq() == deq) return null; // If we got a null and head meets tail, we are empty } } private final static long enqOffset, deqOffset; static { try { enqOffset = Unsafe.instance.objectFieldOffset(AbstractBoundedNodeQueue.class.getDeclaredField("_enqDoNotCallMeDirectly")); deqOffset = Unsafe.instance.objectFieldOffset(AbstractBoundedNodeQueue.class.getDeclaredField("_deqDoNotCallMeDirectly")); } catch(Throwable t){ throw new ExceptionInInitializerError(t); } } public static class Node { protected T value; @SuppressWarnings("unused") private volatile Node _nextDoNotCallMeDirectly; protected int count; @SuppressWarnings("unchecked") public final Node next() { return (Node)Unsafe.instance.getObjectVolatile(this, nextOffset); } protected final void setNext(final Node newNext) { Unsafe.instance.putOrderedObject(this, nextOffset, newNext); } private final static long nextOffset; static { try { nextOffset = Unsafe.instance.objectFieldOffset(Node.class.getDeclaredField("_nextDoNotCallMeDirectly")); } catch(Throwable t){ throw new ExceptionInInitializerError(t); } } } }