Merge pull request #15503 from akka/wip-15501-add-bounded-nonblocking-mpmc-mailbox-√
+act - #15501 - Adding support for a Non-blocking, bounded, MPMC mailbox...
This commit is contained in:
commit
296f5a7cab
5 changed files with 317 additions and 26 deletions
|
|
@ -43,6 +43,7 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn
|
||||||
|
|
||||||
"create a bounded mailbox with 10 capacity and with push timeout" in {
|
"create a bounded mailbox with 10 capacity and with push timeout" in {
|
||||||
val config = BoundedMailbox(10, 10 milliseconds)
|
val config = BoundedMailbox(10, 10 milliseconds)
|
||||||
|
config.capacity should be(10)
|
||||||
val q = factory(config)
|
val q = factory(config)
|
||||||
ensureInitialMailboxState(config, q)
|
ensureInitialMailboxState(config, q)
|
||||||
|
|
||||||
|
|
@ -241,7 +242,7 @@ class SingleConsumerOnlyMailboxSpec extends MailboxSpec {
|
||||||
override def maxConsumers = 1
|
override def maxConsumers = 1
|
||||||
def factory = {
|
def factory = {
|
||||||
case u: UnboundedMailbox ⇒ SingleConsumerOnlyUnboundedMailbox().create(None, None)
|
case u: UnboundedMailbox ⇒ SingleConsumerOnlyUnboundedMailbox().create(None, None)
|
||||||
case b: BoundedMailbox ⇒ pending; null
|
case b @ BoundedMailbox(capacity, _) ⇒ NonBlockingBoundedMailbox(capacity).create(None, None)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -249,16 +250,21 @@ object SingleConsumerOnlyMailboxVerificationSpec {
|
||||||
case object Ping
|
case object Ping
|
||||||
val mailboxConf = ConfigFactory.parseString("""
|
val mailboxConf = ConfigFactory.parseString("""
|
||||||
akka.actor.serialize-messages = off
|
akka.actor.serialize-messages = off
|
||||||
test-dispatcher {
|
test-unbounded-dispatcher {
|
||||||
mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
|
mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
|
||||||
throughput = 1
|
throughput = 1
|
||||||
|
}
|
||||||
|
test-bounded-dispatcher {
|
||||||
|
mailbox-type = "akka.dispatch.NonBlockingBoundedMailbox"
|
||||||
|
mailbox-capacity = 1
|
||||||
|
throughput = 1
|
||||||
}""")
|
}""")
|
||||||
}
|
}
|
||||||
|
|
||||||
class SingleConsumerOnlyMailboxVerificationSpec extends AkkaSpec(SingleConsumerOnlyMailboxVerificationSpec.mailboxConf) {
|
class SingleConsumerOnlyMailboxVerificationSpec extends AkkaSpec(SingleConsumerOnlyMailboxVerificationSpec.mailboxConf) {
|
||||||
import SingleConsumerOnlyMailboxVerificationSpec.Ping
|
import SingleConsumerOnlyMailboxVerificationSpec.Ping
|
||||||
"A SingleConsumerOnlyMailbox" should {
|
|
||||||
"support pathological ping-ponging" in within(30.seconds) {
|
def pathologicalPingPong(dispatcherId: String): Unit = {
|
||||||
val total = 2000000
|
val total = 2000000
|
||||||
val runner = system.actorOf(Props(new Actor {
|
val runner = system.actorOf(Props(new Actor {
|
||||||
val a, b = context.watch(
|
val a, b = context.watch(
|
||||||
|
|
@ -271,7 +277,7 @@ class SingleConsumerOnlyMailboxVerificationSpec extends AkkaSpec(SingleConsumerO
|
||||||
if (n == 0)
|
if (n == 0)
|
||||||
context stop self
|
context stop self
|
||||||
}
|
}
|
||||||
}).withDispatcher("test-dispatcher")))
|
}).withDispatcher(dispatcherId)))
|
||||||
def receive = {
|
def receive = {
|
||||||
case Ping ⇒ a.tell(Ping, b)
|
case Ping ⇒ a.tell(Ping, b)
|
||||||
case Terminated(`a` | `b`) ⇒ if (context.children.isEmpty) context stop self
|
case Terminated(`a` | `b`) ⇒ if (context.children.isEmpty) context stop self
|
||||||
|
|
@ -281,5 +287,14 @@ class SingleConsumerOnlyMailboxVerificationSpec extends AkkaSpec(SingleConsumerO
|
||||||
runner ! Ping
|
runner ! Ping
|
||||||
expectTerminated(runner)
|
expectTerminated(runner)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"A SingleConsumerOnlyMailbox" should {
|
||||||
|
"support pathological ping-ponging for the unbounded case" in within(30.seconds) {
|
||||||
|
pathologicalPingPong("test-unbounded-dispatcher")
|
||||||
|
}
|
||||||
|
|
||||||
|
"support pathological ping-ponging for the bounded case" in within(30.seconds) {
|
||||||
|
pathologicalPingPong("test-bounded-dispatcher")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,215 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package akka.dispatch;
|
||||||
|
|
||||||
|
import akka.util.Unsafe;
|
||||||
|
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Lock-free bounded non-blocking multiple-producer multiple-consumer queue based on the works of:
|
||||||
|
*
|
||||||
|
* Andriy Plokhotnuyk (https://github.com/plokhotnyuk)
|
||||||
|
* - https://github.com/plokhotnyuk/actors/blob/2e65abb7ce4cbfcb1b29c98ee99303d6ced6b01f/src/test/scala/akka/dispatch/Mailboxes.scala
|
||||||
|
* (Apache V2: https://github.com/plokhotnyuk/actors/blob/master/LICENSE)
|
||||||
|
*
|
||||||
|
* Dmitriy Vyukov's non-intrusive MPSC queue:
|
||||||
|
* - http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
|
||||||
|
* (Simplified BSD)
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("serial")
|
||||||
|
public abstract class AbstractBoundedNodeQueue<T> {
|
||||||
|
private final int capacity;
|
||||||
|
|
||||||
|
@SuppressWarnings("unused")
|
||||||
|
private volatile Node<T> _enqDoNotCallMeDirectly;
|
||||||
|
|
||||||
|
@SuppressWarnings("unused")
|
||||||
|
private volatile Node<T> _deqDoNotCallMeDirectly;
|
||||||
|
|
||||||
|
protected AbstractBoundedNodeQueue(final int capacity) {
|
||||||
|
if (capacity < 0) throw new IllegalArgumentException("AbstractBoundedNodeQueue.capacity must be >= 0");
|
||||||
|
this.capacity = capacity;
|
||||||
|
final Node<T> n = new Node<T>();
|
||||||
|
setDeq(n);
|
||||||
|
setEnq(n);
|
||||||
|
}
|
||||||
|
|
||||||
|
private final void setEnq(Node<T> n) {
|
||||||
|
Unsafe.instance.putObjectVolatile(this, enqOffset, n);
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
private final Node<T> getEnq() {
|
||||||
|
return (Node<T>)Unsafe.instance.getObjectVolatile(this, enqOffset);
|
||||||
|
}
|
||||||
|
|
||||||
|
private final boolean casEnq(Node<T> old, Node<T> nju) {
|
||||||
|
return Unsafe.instance.compareAndSwapObject(this, enqOffset, old, nju);
|
||||||
|
}
|
||||||
|
|
||||||
|
private final void setDeq(Node<T> n) {
|
||||||
|
Unsafe.instance.putObjectVolatile(this, deqOffset, n);
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
private final Node<T> getDeq() {
|
||||||
|
return (Node<T>)Unsafe.instance.getObjectVolatile(this, deqOffset);
|
||||||
|
}
|
||||||
|
|
||||||
|
private final boolean casDeq(Node<T> old, Node<T> nju) {
|
||||||
|
return Unsafe.instance.compareAndSwapObject(this, deqOffset, old, nju);
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
protected final Node<T> peekNode() {
|
||||||
|
for(;;) {
|
||||||
|
final Node<T> deq = getDeq();
|
||||||
|
final Node<T> next = deq.next();
|
||||||
|
if (next != null || getEnq() == deq)
|
||||||
|
return next;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @return the first value of this queue, null if empty
|
||||||
|
*/
|
||||||
|
public final T peek() {
|
||||||
|
final Node<T> n = peekNode();
|
||||||
|
return (n != null) ? n.value : null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the maximum capacity of this queue
|
||||||
|
*/
|
||||||
|
public final int capacity() {
|
||||||
|
return capacity;
|
||||||
|
}
|
||||||
|
// Possible TODO — impl. could be switched to addNode(new Node(value)) if we want to allocate even if full already
|
||||||
|
public final boolean add(final T value) {
|
||||||
|
for(Node<T> n = null;;) {
|
||||||
|
final Node<T> lastNode = getEnq();
|
||||||
|
final int lastNodeCount = lastNode.count;
|
||||||
|
if (lastNodeCount - getDeq().count < capacity) {
|
||||||
|
// Trade a branch for avoiding to create a new node if full,
|
||||||
|
// and to avoid creating multiple nodes on write conflict á la Be Kind to Your GC
|
||||||
|
if (n == null) {
|
||||||
|
n = new Node<T>();
|
||||||
|
n.value = value;
|
||||||
|
}
|
||||||
|
|
||||||
|
n.count = lastNodeCount + 1; // Piggyback on the HB-edge between getEnq() and casEnq()
|
||||||
|
|
||||||
|
// Try to append the node to the end, if we fail we continue loopin'
|
||||||
|
if(casEnq(lastNode, n)) {
|
||||||
|
lastNode.setNext(n);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
} else return false; // Over capacity—couldn't add the node
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public final boolean addNode(final Node<T> n) {
|
||||||
|
n.setNext(null); // Make sure we're not corrupting the queue
|
||||||
|
for(;;) {
|
||||||
|
final Node<T> lastNode = getEnq();
|
||||||
|
final int lastNodeCount = lastNode.count;
|
||||||
|
if (lastNodeCount - getDeq().count < capacity) {
|
||||||
|
n.count = lastNodeCount + 1; // Piggyback on the HB-edge between getEnq() and casEnq()
|
||||||
|
// Try to append the node to the end, if we fail we continue loopin'
|
||||||
|
if(casEnq(lastNode, n)) {
|
||||||
|
lastNode.setNext(n);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
} else return false; // Over capacity—couldn't add the node
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public final boolean isEmpty() {
|
||||||
|
return peekNode() == null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns an approximation of the queue's "current" size
|
||||||
|
*/
|
||||||
|
public final int size() {
|
||||||
|
//Order of operations is extremely important here
|
||||||
|
// If no item was dequeued between when we looked at the count of the enqueueing end,
|
||||||
|
// there should be no out-of-bounds
|
||||||
|
for(;;) {
|
||||||
|
final int deqCountBefore = getDeq().count;
|
||||||
|
final int enqCount = getEnq().count;
|
||||||
|
final int deqCountAfter = getDeq().count;
|
||||||
|
|
||||||
|
if (deqCountAfter == deqCountBefore)
|
||||||
|
return enqCount - deqCountAfter;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Removes the first element of this queue if any
|
||||||
|
* @return the value of the first element of the queue, null if empty
|
||||||
|
*/
|
||||||
|
public final T poll() {
|
||||||
|
final Node<T> n = pollNode();
|
||||||
|
return (n != null) ? n.value : null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Removes the first element of this queue if any
|
||||||
|
* @return the `Node` of the first element of the queue, null if empty
|
||||||
|
*/
|
||||||
|
public final Node<T> pollNode() {
|
||||||
|
for(;;) {
|
||||||
|
final Node<T> deq = getDeq();
|
||||||
|
final Node<T> next = deq.next();
|
||||||
|
if (next != null) {
|
||||||
|
if (casDeq(deq, next)) {
|
||||||
|
deq.value = next.value;
|
||||||
|
next.value = null;
|
||||||
|
return deq;
|
||||||
|
} // else we retry (concurrent consumers)
|
||||||
|
} else if (getEnq() == deq) return null; // If we got a null and head meets tail, we are empty
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final static long enqOffset, deqOffset;
|
||||||
|
|
||||||
|
static {
|
||||||
|
try {
|
||||||
|
enqOffset = Unsafe.instance.objectFieldOffset(AbstractBoundedNodeQueue.class.getDeclaredField("_enqDoNotCallMeDirectly"));
|
||||||
|
deqOffset = Unsafe.instance.objectFieldOffset(AbstractBoundedNodeQueue.class.getDeclaredField("_deqDoNotCallMeDirectly"));
|
||||||
|
} catch(Throwable t){
|
||||||
|
throw new ExceptionInInitializerError(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class Node<T> {
|
||||||
|
protected T value;
|
||||||
|
@SuppressWarnings("unused")
|
||||||
|
private volatile Node<T> _nextDoNotCallMeDirectly;
|
||||||
|
protected int count;
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public final Node<T> next() {
|
||||||
|
return (Node<T>)Unsafe.instance.getObjectVolatile(this, nextOffset);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected final void setNext(final Node<T> newNext) {
|
||||||
|
Unsafe.instance.putOrderedObject(this, nextOffset, newNext);
|
||||||
|
}
|
||||||
|
|
||||||
|
private final static long nextOffset;
|
||||||
|
|
||||||
|
static {
|
||||||
|
try {
|
||||||
|
nextOffset = Unsafe.instance.objectFieldOffset(Node.class.getDeclaredField("_nextDoNotCallMeDirectly"));
|
||||||
|
} catch(Throwable t){
|
||||||
|
throw new ExceptionInInitializerError(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -370,6 +370,31 @@ class NodeMessageQueue extends AbstractNodeQueue[Envelope] with MessageQueue wit
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//Discards overflowing messages into DeadLetters
|
||||||
|
class BoundedNodeMessageQueue(capacity: Int) extends AbstractBoundedNodeQueue[Envelope](capacity)
|
||||||
|
with MessageQueue with BoundedMessageQueueSemantics with MultipleConsumerSemantics {
|
||||||
|
final def pushTimeOut: Duration = Duration.Undefined
|
||||||
|
|
||||||
|
final def enqueue(receiver: ActorRef, handle: Envelope): Unit =
|
||||||
|
if (!add(handle))
|
||||||
|
receiver.asInstanceOf[InternalActorRef].provider.deadLetters.tell(
|
||||||
|
DeadLetter(handle.message, handle.sender, receiver), handle.sender)
|
||||||
|
|
||||||
|
final def dequeue(): Envelope = poll()
|
||||||
|
|
||||||
|
final def numberOfMessages: Int = size()
|
||||||
|
|
||||||
|
final def hasMessages: Boolean = !isEmpty()
|
||||||
|
|
||||||
|
@tailrec final def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = {
|
||||||
|
val envelope = dequeue()
|
||||||
|
if (envelope ne null) {
|
||||||
|
deadLetters.enqueue(owner, envelope)
|
||||||
|
cleanUp(owner, deadLetters)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
|
|
@ -576,6 +601,22 @@ final case class SingleConsumerOnlyUnboundedMailbox() extends MailboxType with P
|
||||||
final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = new NodeMessageQueue
|
final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = new NodeMessageQueue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* NonBlockingBoundedMailbox is a high-performance, multiple producer—multiple consumer, bounded MailboxType,
|
||||||
|
* Noteworthy is that it discards overflow as DeadLetters.
|
||||||
|
*
|
||||||
|
* NOTE: NonBlockingBoundedMailbox does not use `mailbox-push-timeout-time` as it is non-blocking.
|
||||||
|
*/
|
||||||
|
case class NonBlockingBoundedMailbox(val capacity: Int) extends MailboxType with ProducesMessageQueue[BoundedNodeMessageQueue] {
|
||||||
|
|
||||||
|
def this(settings: ActorSystem.Settings, config: Config) = this(config.getInt("mailbox-capacity"))
|
||||||
|
|
||||||
|
if (capacity < 0) throw new IllegalArgumentException("The capacity for NonBlockingBoundedMailbox can not be negative")
|
||||||
|
|
||||||
|
final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
|
||||||
|
new BoundedNodeMessageQueue(capacity)
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* BoundedMailbox is the default bounded MailboxType used by Akka Actors.
|
* BoundedMailbox is the default bounded MailboxType used by Akka Actors.
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
|
|
@ -131,6 +131,16 @@ Akka comes shipped with a number of mailbox implementations:
|
||||||
|
|
||||||
- Configuration name: "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
|
- Configuration name: "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
|
||||||
|
|
||||||
|
* NonBlockingBoundedMailbox
|
||||||
|
|
||||||
|
- Backed by a very efficient MultiPle-Producer Multiple-Consumer queue
|
||||||
|
|
||||||
|
- Blocking: No
|
||||||
|
|
||||||
|
- Bounded: Yes
|
||||||
|
|
||||||
|
- Configuration name: "akka.dispatch.NonBlockingBoundedMailbox"
|
||||||
|
|
||||||
* BoundedMailbox
|
* BoundedMailbox
|
||||||
|
|
||||||
- Backed by a ``java.util.concurrent.LinkedBlockingQueue``
|
- Backed by a ``java.util.concurrent.LinkedBlockingQueue``
|
||||||
|
|
|
||||||
|
|
@ -123,7 +123,7 @@ Akka comes shipped with a number of mailbox implementations:
|
||||||
|
|
||||||
* SingleConsumerOnlyUnboundedMailbox
|
* SingleConsumerOnlyUnboundedMailbox
|
||||||
|
|
||||||
- Backed by a very efficient Multiple Producer Single Consumer queue, cannot be used with BalancingDispatcher
|
- Backed by a very efficient Multiple-Producer Single-Consumer queue, cannot be used with BalancingDispatcher
|
||||||
|
|
||||||
- Blocking: No
|
- Blocking: No
|
||||||
|
|
||||||
|
|
@ -141,6 +141,16 @@ Akka comes shipped with a number of mailbox implementations:
|
||||||
|
|
||||||
- Configuration name: "bounded" or "akka.dispatch.BoundedMailbox"
|
- Configuration name: "bounded" or "akka.dispatch.BoundedMailbox"
|
||||||
|
|
||||||
|
* NonBlockingBoundedMailbox
|
||||||
|
|
||||||
|
- Backed by a very efficient MultiPle-Producer Multiple-Consumer queue
|
||||||
|
|
||||||
|
- Blocking: No
|
||||||
|
|
||||||
|
- Bounded: Yes
|
||||||
|
|
||||||
|
- Configuration name: "akka.dispatch.NonBlockingBoundedMailbox"
|
||||||
|
|
||||||
* UnboundedPriorityMailbox
|
* UnboundedPriorityMailbox
|
||||||
|
|
||||||
- Backed by a ``java.util.concurrent.PriorityBlockingQueue``
|
- Backed by a ``java.util.concurrent.PriorityBlockingQueue``
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue