diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index 2876ab34cc..136f9dd8b5 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -7,11 +7,12 @@ import language.postfixOps import java.util.concurrent.{ ConcurrentLinkedQueue, BlockingQueue } import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll } -import com.typesafe.config.Config -import akka.actor.{ RepointableRef, Props, DeadLetter, ActorSystem, ActorRefWithCell, ActorRef, ActorCell } +import com.typesafe.config.{ Config, ConfigFactory } +import akka.actor._ import akka.testkit.{ EventFilter, AkkaSpec } -import scala.concurrent.{ Future, Promise, Await, ExecutionContext } +import scala.concurrent.{ Future, Await, ExecutionContext } import scala.concurrent.duration._ +import akka.dispatch.{ UnboundedMailbox, BoundedMailbox, SingleConsumerOnlyUnboundedMailbox } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAndAfterEach { @@ -236,3 +237,41 @@ class SingleConsumerOnlyMailboxSpec extends MailboxSpec { case b: BoundedMailbox ⇒ pending; null } } + +object SingleConsumerOnlyMailboxVerificationSpec { + case object Ping + val mailboxConf = ConfigFactory.parseString(""" + test-dispatcher { + mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox" + throughput = 1 + }""") +} + +class SingleConsumerOnlyMailboxVerificationSpec extends AkkaSpec(SingleConsumerOnlyMailboxVerificationSpec.mailboxConf) { + import SingleConsumerOnlyMailboxVerificationSpec.Ping + "A SingleConsumerOnlyMailbox" should { + "support pathological ping-ponging" in within(30.seconds) { + val total = 2000000 + val runner = system.actorOf(Props(new Actor { + val a, b = context.watch( + context.actorOf(Props(new Actor { + var n = total / 2 + def receive = { + case Ping ⇒ + n -= 1 + sender ! Ping + if (n == 0) + context stop self + } + }).withDispatcher("test-dispatcher"))) + def receive = { + case Ping ⇒ a.tell(Ping, b) + case Terminated(`a` | `b`) ⇒ if (context.children.isEmpty) context stop self + } + })) + watch(runner) + runner ! Ping + expectTerminated(runner) + } + } +} diff --git a/akka-actor/src/main/java/akka/dispatch/AbstractNodeQueue.java b/akka-actor/src/main/java/akka/dispatch/AbstractNodeQueue.java index e72a6b2e18..33954553be 100644 --- a/akka-actor/src/main/java/akka/dispatch/AbstractNodeQueue.java +++ b/akka-actor/src/main/java/akka/dispatch/AbstractNodeQueue.java @@ -23,7 +23,12 @@ public abstract class AbstractNodeQueue extends AtomicReference peekNode() { - return ((Node)Unsafe.instance.getObjectVolatile(this, tailOffset)).next(); + for(;;) { + final Node tail = ((Node)Unsafe.instance.getObjectVolatile(this, tailOffset)); + final Node next = tail.next(); + if (next != null || get() == tail) + return next; + } } public final T peek() { @@ -53,7 +58,7 @@ public abstract class AbstractNodeQueue extends AtomicReference extends AtomicReference newNext) { - Unsafe.instance.putOrderedObject(this, nextOffset, newNext); + Unsafe.instance.putOrderedObject(this, nextOffset, newNext); } private final static long nextOffset;