Adding high-performance MPSC queue based mailbox to Akka
This commit is contained in:
parent
3f29adcee5
commit
fb2decbcda
3 changed files with 143 additions and 83 deletions
|
|
@ -0,0 +1,87 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.dispatch;
|
||||
|
||||
import akka.util.Unsafe;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
/**
|
||||
* Lock-free MPSC linked queue implementation based on Dmitriy Vyukov's non-intrusive MPSC queue:
|
||||
* http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
|
||||
*/
|
||||
public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQueue.Node<T>> {
|
||||
// Extends AtomicReference for the "head" slot (which is the one that is appended to) since Unsafe does not expose XCHG operation intrinsically
|
||||
private volatile Node<T> _tailDoNotCallMeDirectly;
|
||||
|
||||
protected AbstractNodeQueue() {
|
||||
final Node<T> n = new Node<T>();
|
||||
_tailDoNotCallMeDirectly = n;
|
||||
set(n);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public final Node<T> peek() {
|
||||
return ((Node<T>)Unsafe.instance.getObjectVolatile(this, tailOffset)).next();
|
||||
}
|
||||
|
||||
public final void add(final T value) {
|
||||
final Node<T> n = new Node<T>(value);
|
||||
getAndSet(n).setNext(n);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public final T poll() {
|
||||
final Node<T> next = peek();
|
||||
if (next == null) return null;
|
||||
else {
|
||||
final T ret = next.value;
|
||||
next.value = null; // Null out the value so that we can GC it early
|
||||
Unsafe.instance.putOrderedObject(this, tailOffset, next);
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
private final static long tailOffset;
|
||||
|
||||
static {
|
||||
try {
|
||||
tailOffset = Unsafe.instance.objectFieldOffset(AbstractNodeQueue.class.getDeclaredField("_tailDoNotCallMeDirectly"));
|
||||
} catch(Throwable t){
|
||||
throw new ExceptionInInitializerError(t);
|
||||
}
|
||||
}
|
||||
|
||||
public static class Node<T> {
|
||||
T value;
|
||||
private volatile Node<T> _nextDoNotCallMeDirectly;
|
||||
|
||||
Node() {
|
||||
this(null);
|
||||
}
|
||||
|
||||
Node(final T value) {
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public final Node<T> next() {
|
||||
return (Node<T>)Unsafe.instance.getObjectVolatile(this, nextOffset);
|
||||
}
|
||||
|
||||
protected final void setNext(final Node<T> newNext) {
|
||||
Unsafe.instance.putOrderedObject(this, nextOffset, newNext);
|
||||
}
|
||||
|
||||
private final static long nextOffset;
|
||||
|
||||
static {
|
||||
try {
|
||||
nextOffset = Unsafe.instance.objectFieldOffset(Node.class.getDeclaredField("_nextDoNotCallMeDirectly"));
|
||||
} catch(Throwable t){
|
||||
throw new ExceptionInInitializerError(t);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -10,10 +10,10 @@ import akka.actor.{ ActorCell, ActorRef, Cell, ActorSystem, InternalActorRef, De
|
|||
import akka.util.{ Unsafe, BoundedBlockingQueue }
|
||||
import akka.event.Logging.Error
|
||||
import scala.concurrent.duration.Duration
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
import scala.annotation.tailrec
|
||||
import scala.util.control.NonFatal
|
||||
import com.typesafe.config.Config
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -193,9 +193,6 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
|
|||
updateStatus(s, s & ~Scheduled) || setAsIdle()
|
||||
}
|
||||
|
||||
/*
|
||||
* AtomicReferenceFieldUpdater for system queue.
|
||||
*/
|
||||
protected final def systemQueueGet: SystemMessage =
|
||||
Unsafe.instance.getObjectVolatile(this, AbstractMailbox.systemMessageOffset).asInstanceOf[SystemMessage]
|
||||
|
||||
|
|
@ -343,6 +340,28 @@ trait MessageQueue {
|
|||
def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit
|
||||
}
|
||||
|
||||
class NodeMessageQueue extends AbstractNodeQueue[Envelope] with MessageQueue {
|
||||
|
||||
final def enqueue(receiver: ActorRef, handle: Envelope): Unit = add(handle)
|
||||
|
||||
final def dequeue(): Envelope = poll()
|
||||
|
||||
final def numberOfMessages: Int = {
|
||||
@tailrec def count(node: AbstractNodeQueue.Node[Envelope], c: Int): Int = if (node eq null) c else count(node.next(), c + 1)
|
||||
count(peek(), 0)
|
||||
}
|
||||
|
||||
final def hasMessages: Boolean = peek() ne null
|
||||
|
||||
@tailrec final def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = {
|
||||
val envelope = dequeue()
|
||||
if (envelope ne null) {
|
||||
deadLetters.enqueue(owner, envelope)
|
||||
cleanUp(owner, deadLetters)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
|
|
@ -518,6 +537,18 @@ case class UnboundedMailbox() extends MailboxType {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* MPSCUnboundedMailbox is a high-performance unbounded MailboxType,
|
||||
* the only drawback is that you can't have multiple consumers,
|
||||
* which rules out using it with BalancingDispatcher for instance.
|
||||
*/
|
||||
case class MPSCUnboundedMailbox() extends MailboxType {
|
||||
|
||||
def this(settings: ActorSystem.Settings, config: Config) = this()
|
||||
|
||||
final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = new NodeMessageQueue()
|
||||
}
|
||||
|
||||
/**
|
||||
* BoundedMailbox is the default bounded MailboxType used by Akka Actors.
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -42,9 +42,7 @@ class BoundedBlockingQueue[E <: AnyRef](
|
|||
notFull.await()
|
||||
require(backing.offer(e))
|
||||
notEmpty.signal()
|
||||
} finally {
|
||||
lock.unlock()
|
||||
}
|
||||
} finally lock.unlock()
|
||||
}
|
||||
|
||||
def take(): E = { //Blocks until not empty
|
||||
|
|
@ -56,9 +54,7 @@ class BoundedBlockingQueue[E <: AnyRef](
|
|||
require(e ne null)
|
||||
notFull.signal()
|
||||
e
|
||||
} finally {
|
||||
lock.unlock()
|
||||
}
|
||||
} finally lock.unlock()
|
||||
}
|
||||
|
||||
def offer(e: E): Boolean = { //Tries to do it immediately, if fail return false
|
||||
|
|
@ -71,9 +67,7 @@ class BoundedBlockingQueue[E <: AnyRef](
|
|||
notEmpty.signal()
|
||||
true
|
||||
}
|
||||
} finally {
|
||||
lock.unlock()
|
||||
}
|
||||
} finally lock.unlock()
|
||||
}
|
||||
|
||||
def offer(e: E, timeout: Long, unit: TimeUnit): Boolean = { //Tries to do it within the timeout, return false if fail
|
||||
|
|
@ -92,9 +86,7 @@ class BoundedBlockingQueue[E <: AnyRef](
|
|||
notEmpty.signal()
|
||||
true
|
||||
} else false
|
||||
} finally {
|
||||
lock.unlock()
|
||||
}
|
||||
} finally lock.unlock()
|
||||
}
|
||||
|
||||
def poll(timeout: Long, unit: TimeUnit): E = { //Tries to do it within the timeout, returns null if fail
|
||||
|
|
@ -124,9 +116,7 @@ class BoundedBlockingQueue[E <: AnyRef](
|
|||
}
|
||||
}
|
||||
result
|
||||
} finally {
|
||||
lock.unlock()
|
||||
}
|
||||
} finally lock.unlock()
|
||||
}
|
||||
|
||||
def poll(): E = { //Tries to remove the head of the queue immediately, if fail, return null
|
||||
|
|
@ -138,9 +128,7 @@ class BoundedBlockingQueue[E <: AnyRef](
|
|||
notFull.signal()
|
||||
e
|
||||
}
|
||||
} finally {
|
||||
lock.unlock
|
||||
}
|
||||
} finally lock.unlock()
|
||||
}
|
||||
|
||||
override def remove(e: AnyRef): Boolean = { //Tries to do it immediately, if fail, return false
|
||||
|
|
@ -151,55 +139,35 @@ class BoundedBlockingQueue[E <: AnyRef](
|
|||
notFull.signal()
|
||||
true
|
||||
} else false
|
||||
} finally {
|
||||
lock.unlock()
|
||||
}
|
||||
} finally lock.unlock()
|
||||
}
|
||||
|
||||
override def contains(e: AnyRef): Boolean = {
|
||||
if (e eq null) throw new NullPointerException
|
||||
lock.lock()
|
||||
try {
|
||||
backing contains e
|
||||
} finally {
|
||||
lock.unlock()
|
||||
}
|
||||
try backing.contains(e) finally lock.unlock()
|
||||
}
|
||||
|
||||
override def clear() {
|
||||
lock.lock()
|
||||
try {
|
||||
backing.clear
|
||||
} finally {
|
||||
lock.unlock()
|
||||
}
|
||||
try backing.clear() finally lock.unlock()
|
||||
}
|
||||
|
||||
def remainingCapacity(): Int = {
|
||||
lock.lock()
|
||||
try {
|
||||
maxCapacity - backing.size()
|
||||
} finally {
|
||||
lock.unlock()
|
||||
}
|
||||
} finally lock.unlock()
|
||||
}
|
||||
|
||||
def size(): Int = {
|
||||
lock.lock()
|
||||
try {
|
||||
backing.size()
|
||||
} finally {
|
||||
lock.unlock()
|
||||
}
|
||||
try backing.size() finally lock.unlock()
|
||||
}
|
||||
|
||||
def peek(): E = {
|
||||
lock.lock()
|
||||
try {
|
||||
backing.peek()
|
||||
} finally {
|
||||
lock.unlock()
|
||||
}
|
||||
try backing.peek() finally lock.unlock()
|
||||
}
|
||||
|
||||
def drainTo(c: Collection[_ >: E]): Int = drainTo(c, Int.MaxValue)
|
||||
|
|
@ -219,19 +187,13 @@ class BoundedBlockingQueue[E <: AnyRef](
|
|||
}
|
||||
} else n
|
||||
drainOne(0)
|
||||
} finally {
|
||||
lock.unlock()
|
||||
}
|
||||
} finally lock.unlock()
|
||||
}
|
||||
}
|
||||
|
||||
override def containsAll(c: Collection[_]): Boolean = {
|
||||
lock.lock()
|
||||
try {
|
||||
backing containsAll c
|
||||
} finally {
|
||||
lock.unlock()
|
||||
}
|
||||
try backing.containsAll(c) finally lock.unlock()
|
||||
}
|
||||
|
||||
override def removeAll(c: Collection[_]): Boolean = {
|
||||
|
|
@ -243,9 +205,7 @@ class BoundedBlockingQueue[E <: AnyRef](
|
|||
if (sz > 0) notEmpty.signal()
|
||||
true
|
||||
} else false
|
||||
} finally {
|
||||
lock.unlock()
|
||||
}
|
||||
} finally lock.unlock()
|
||||
}
|
||||
|
||||
override def retainAll(c: Collection[_]): Boolean = {
|
||||
|
|
@ -257,9 +217,7 @@ class BoundedBlockingQueue[E <: AnyRef](
|
|||
if (sz > 0) notEmpty.signal()
|
||||
true
|
||||
} else false
|
||||
} finally {
|
||||
lock.unlock()
|
||||
}
|
||||
} finally lock.unlock()
|
||||
}
|
||||
|
||||
def iterator(): Iterator[E] = {
|
||||
|
|
@ -294,40 +252,24 @@ class BoundedBlockingQueue[E <: AnyRef](
|
|||
}
|
||||
|
||||
removeTarget()
|
||||
} finally {
|
||||
lock.unlock()
|
||||
}
|
||||
} finally lock.unlock()
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
lock.unlock
|
||||
}
|
||||
} finally lock.unlock()
|
||||
}
|
||||
|
||||
override def toArray(): Array[AnyRef] = {
|
||||
lock.lock()
|
||||
try {
|
||||
backing.toArray
|
||||
} finally {
|
||||
lock.unlock()
|
||||
}
|
||||
try backing.toArray finally lock.unlock()
|
||||
}
|
||||
|
||||
override def isEmpty(): Boolean = {
|
||||
lock.lock()
|
||||
try {
|
||||
backing.isEmpty()
|
||||
} finally {
|
||||
lock.unlock()
|
||||
}
|
||||
try backing.isEmpty() finally lock.unlock()
|
||||
}
|
||||
|
||||
override def toArray[X](a: Array[X with AnyRef]) = {
|
||||
lock.lock()
|
||||
try {
|
||||
backing.toArray[X](a)
|
||||
} finally {
|
||||
lock.unlock()
|
||||
}
|
||||
try backing.toArray[X](a) finally lock.unlock()
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue