From 80eff855275579c8cf3c30a2329c6653de491f9e Mon Sep 17 00:00:00 2001 From: Thomas Smith Date: Tue, 24 Apr 2018 14:30:34 +0200 Subject: [PATCH] Add tests for BoundedBlockingQueue (#24850) --- .../akka/util/BoundedBlockingQueueSpec.scala | 818 ++++++++++++++++++ .../akka/util/BoundedBlockingQueue.scala | 13 +- 2 files changed, 827 insertions(+), 4 deletions(-) create mode 100644 akka-actor-tests/src/test/scala/akka/util/BoundedBlockingQueueSpec.scala diff --git a/akka-actor-tests/src/test/scala/akka/util/BoundedBlockingQueueSpec.scala b/akka-actor-tests/src/test/scala/akka/util/BoundedBlockingQueueSpec.scala new file mode 100644 index 0000000000..0055d9c30d --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/util/BoundedBlockingQueueSpec.scala @@ -0,0 +1,818 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.util + +import java.util +import java.util.concurrent.locks.{ Condition, LockSupport, ReentrantLock } +import java.util.concurrent._ + +import org.scalatest.concurrent.{ Signaler, ThreadSignaler } +import org.scalatest.{ Matchers, WordSpec } +import org.scalatest.time.SpanSugar._ +import org.scalatest.time.{ Span, SpanSugar } +import org.scalatest.exceptions.TestFailedDueToTimeoutException +import org.scalactic.source.Position +import org.scalatest.matchers.{ MatchResult, Matcher } + +import scala.collection.mutable +import scala.concurrent.{ Await, ExecutionContext, ExecutionContextExecutor, Future } +import scala.util.control.Exception +import DefaultExecutionContext._ + +import collection.JavaConverters._ + +class BoundedBlockingQueueSpec + extends WordSpec + with Matchers + with QueueSetupHelper + with CustomContainsMatcher + with BlockingHelpers { + + import QueueTestEvents._ + + "Constructor" must { + "not accept a null backing Queue" in { + intercept[IllegalArgumentException] { + new BoundedBlockingQueue(10, null) + } + } + + "not accept a non-empty backing Queue" in { + intercept[IllegalArgumentException] { + val listQ = new util.LinkedList[String]() + listQ.add("Hello") + new BoundedBlockingQueue[String](10, listQ) + } + } + + "accept and empty backing Queue" in { + new BoundedBlockingQueue(10, new util.LinkedList()).size() should equal(0) + } + + "get a positive maxCapacity" in { + intercept[IllegalArgumentException] { + new BoundedBlockingQueue(0, new util.LinkedList()) + } + } + + "check remainingCapacity for BlockingQueue" in { + intercept[IllegalArgumentException] { + new BoundedBlockingQueue(0, new LinkedBlockingQueue(1)) + } + new BoundedBlockingQueue(1, new LinkedBlockingQueue(1)).remainingCapacity() should equal(1) + } + + } + + "remainingCapacity" must { + "check the backing queue size" in { + val TestContext(queue, events, _, _, _, backingQueue) = newBoundedBlockingQueue(1) + + backingQueue.offer("1") + queue.remainingCapacity() shouldBe 0 + events should contain(getSize) + } + } + + "put" must { + "reject null elements" in { + val queue = newBoundedBlockingQueue(1).queue + intercept[NullPointerException] { + queue.put(null) + } + } + + "call the backing queue if not full" in { + val TestContext(queue, events, _, _, _, _) = newBoundedBlockingQueue(2) + queue.put("Hello") + queue.put("World") + events should contain inOrder (offer("Hello"), offer("World")) + } + + "signal notEmpty when an element is inserted" in { + val TestContext(queue, events, _, _, _, _) = newBoundedBlockingQueue(1) + queue.put("Hello") + events should contain(signalNotEmpty) + } + + "block when the queue is full" in { + val TestContext(queue, events, _, _, _, _) = newBoundedBlockingQueue(1) + queue.offer("1") + + mustBlockFor(100 milliseconds) { + queue.put("2") + } + events should contain inOrder (offer("1"), awaitNotFull) + events should not contain offer("2") + } + + "block until the backing queue has space" in { + val TestContext(queue, events, _, _, _, _) = newBoundedBlockingQueue(1) + queue.offer("a") + + val f = Future(queue.put("b")) + + after(10 milliseconds) { + queue.take() + } + + Await.result(f, 100 milliseconds) + events should contain inOrder (offer("a"), poll, offer("b")) + } + + "check the backing queue size before offering" in { + val TestContext(queue, events, _, notFull, lock, _) = newBoundedBlockingQueue(1) + queue.offer("a") + + // Blocks until another thread signals `notFull` + val f = Future(queue.put("b")) + + after(10 milliseconds) { + lock.lockInterruptibly() + notFull.signal() + lock.unlock() + } + + mustBlockFor(100 milliseconds, f) + events should containInSequence(offer("a"), awaitNotFull, signalNotFull, getSize, awaitNotFull) + events shouldNot contain(offer("b")) + } + } + + "take" must { + "call the backing queue if not empty" in { + val TestContext(queue, events, _, _, _, backingQueue) = newBoundedBlockingQueue(1) + queue.put("Hello") + queue.take() + + events should contain inOrder (offer("Hello"), poll) + } + + "signal notFull when taking an element" in { + val TestContext(queue, events, _, _, _, backingQueue) = newBoundedBlockingQueue(1) + queue.put("Hello") + queue.take() + events should contain(signalNotFull) + } + + "block when the queue is empty" in { + val TestContext(queue, events, _, _, _, _) = newBoundedBlockingQueue(1) + + mustBlockFor(100 milliseconds) { + queue.take() + } + events should contain(awaitNotEmpty) + events should not contain (poll) + } + + "block until the backing queue is non-empty" in { + val TestContext(queue, events, _, _, _, _) = newBoundedBlockingQueue(1) + + val f = Future(queue.take()) + after(10 milliseconds) { + queue.put("a") + } + + Await.ready(f, 100 milliseconds) + events should contain inOrder (awaitNotEmpty, offer("a"), poll) + } + + "check the backing queue size before polling" in { + val TestContext(queue, events, notEmpty, _, lock, _) = newBoundedBlockingQueue(1) + + // Blocks until another thread signals `notEmpty` + val f = Future(queue.take()) + + // Cause `notFull` signal, but don't fill the queue + after(10 milliseconds) { + lock.lockInterruptibly() + notEmpty.signal() + lock.unlock() + } + + // `f` should still block since the queue is still empty + mustBlockFor(100 milliseconds, f) + events should containInSequence(getSize, awaitNotEmpty, signalNotEmpty, getSize, awaitNotEmpty) + events shouldNot contain(poll) + } + } + + "offer" must { + "reject null elements" in { + val TestContext(queue, _, _, _, _, _) = newBoundedBlockingQueue(1) + intercept[NullPointerException] { + queue.offer(null) + } + } + + "not block if the queue is full" in { + val TestContext(queue, events, _, _, _, _) = newBoundedBlockingQueue(1) + queue.offer("Hello") should equal(true) + queue.offer("World") should equal(false) + events should not contain offer("World") + } + + "call the backing queue if not full" in { + val TestContext(queue, events, _, _, _, _) = newBoundedBlockingQueue(2) + queue.offer("Hello") should equal(true) + queue.offer("World") should equal(true) + events should contain inOrder (offer("Hello"), offer("World")) + } + + "signal notEmpty when the call succeeds" in { + val TestContext(queue, events, _, _, _, _) = newBoundedBlockingQueue(1) + queue.offer("Hello") + events should contain(signalNotEmpty) + } + } + + "offer with timeout" must { + "not block if the queue is not full" in { + val TestContext(queue, events, _, _, _, _) = newBoundedBlockingQueue(1) + queue.offer("Hello", 100, TimeUnit.MILLISECONDS) should equal(true) + events should contain(offer("Hello")) + } + + "signal notEmpty if the queue is not full" in { + val TestContext(queue, events, _, _, _, _) = newBoundedBlockingQueue(1) + queue.offer("Hello", 100, TimeUnit.MILLISECONDS) should equal(true) + events should contain(signalNotEmpty) + } + + "block for at least the timeout if the queue is full" in { + val TestContext(queue, events, _, notFull, _, _) = newBoundedBlockingQueue(1) + queue.put("Hello") + + notFull.manualTimeControl(true) + + val f = Future(queue.offer("World", 100, TimeUnit.MILLISECONDS)) + after(10 milliseconds) { + notFull.advanceTime(99 milliseconds) + } + mustBlockFor(100 milliseconds, f) + events shouldNot contain(offer("World")) + } + + "return false if the timeout is exceeded" in { + val TestContext(queue, events, _, notFull, _, _) = newBoundedBlockingQueue(1) + queue.put("Hello") + queue.offer("World", 100, TimeUnit.MILLISECONDS) should equal(false) + events shouldNot contain(offer("World")) + } + + "block for less than the timeout when the queue becomes not full" in { + val TestContext(queue, events, _, notFull, _, _) = newBoundedBlockingQueue(1) + queue.put("Hello") + + notFull.manualTimeControl(true) + val f = Future(queue.offer("World", 100, TimeUnit.MILLISECONDS)) + notFull.advanceTime(99 milliseconds) + after(50 milliseconds) { + queue.take() + } + Await.result(f, 100 milliseconds) should equal(true) + events should contain inOrder (awaitNotFull, signalNotFull, offer("World")) + } + + "check the backing queue size before offering" in { + val TestContext(queue, events, _, notFull, lock, _) = newBoundedBlockingQueue(1) + queue.put("Hello") + // Blocks until another thread signals `notFull` + val f = Future(queue.offer("World", 1000, TimeUnit.DAYS)) + + // Cause `notFull` signal, but don't fill the queue + after(10 milliseconds) { + lock.lockInterruptibly() + notFull.signal() + lock.unlock() + } + + // `f` should still block since the queue is still empty + mustBlockFor(100 milliseconds, f) + events should containInSequence(getSize, awaitNotFull, signalNotFull, getSize, awaitNotFull) + events shouldNot contain(offer("World")) + } + } + + "poll" must { + "return null when the queue is empty" in { + val TestContext(queue, _, _, _, _, _) = newBoundedBlockingQueue(1) + queue.poll() should equal(null) + } + + "poll the backing queue" in { + val TestContext(queue, events, _, _, _, _) = newBoundedBlockingQueue(1) + queue.poll() + events should contain(poll) + } + + "return the first element inserted" in { + val TestContext(queue, events, _, _, _, _) = newBoundedBlockingQueue(2) + queue.put("Hello") + queue.put("World") + queue.poll() should equal("Hello") + } + + "signal notEmpty when an element is removed" in { + val TestContext(queue, events, _, _, _, _) = newBoundedBlockingQueue(1) + queue.put("Hello") + queue.poll() + events should contain(signalNotEmpty) + } + } + + "poll with timeout" must { + "not block if the queue is not empty" in { + val TestContext(queue, events, _, _, _, backingQueue) = newBoundedBlockingQueue(1) + backingQueue.offer("Hello") + queue.poll(1, TimeUnit.DAYS) should equal("Hello") + events should contain(poll) + } + + "signal notFull when an element is removed" in { + val TestContext(queue, events, _, _, _, backingQueue) = newBoundedBlockingQueue(1) + backingQueue.offer("Hello") + + queue.poll(1, TimeUnit.DAYS) + events should contain(signalNotFull) + } + + "block for at least the timeout if the queue is empty" in { + val TestContext(queue, events, notEmpty, _, _, _) = newBoundedBlockingQueue(1) + notEmpty.manualTimeControl(true) + + val f = Future(queue.poll(100, TimeUnit.MILLISECONDS)) + + after(10 milliseconds) { + notEmpty.advanceTime(99 milliseconds) + } + mustBlockFor(100 milliseconds, f) + events should contain(awaitNotEmpty) + } + + "immediately poll the backing queue" in { + val TestContext(queue, events, notEmpty, _, _, _) = newBoundedBlockingQueue(1) + + queue.poll(100, TimeUnit.MILLISECONDS) + events should contain(poll) + } + + "return null if the timeout is exceeded" in { + val TestContext(queue, events, notEmpty, _, _, _) = newBoundedBlockingQueue(1) + + queue.poll(100, TimeUnit.MILLISECONDS) should equal(null) + } + + "block for less than the timeout when the queue becomes non-empty" in { + val TestContext(queue, events, notEmpty, _, _, _) = newBoundedBlockingQueue(1) + + notEmpty.manualTimeControl(true) + + val f = Future(queue.poll(100, TimeUnit.MILLISECONDS)) + + notEmpty.advanceTime(99 milliseconds) + after(50 milliseconds) { + queue.put("Hello") + } + Await.result(f, 100 milliseconds) should equal("Hello") + events should contain inOrder (awaitNotEmpty, signalNotEmpty, poll) + } + } + + "remove" must { + "return false when the queue is empty" in { + val TestContext(queue, _, _, _, _, _) = newBoundedBlockingQueue(1) + queue.remove("Hello") should equal(false) + } + + "return false when the queue does not contain the element" in { + val TestContext(queue, _, _, _, _, _) = newBoundedBlockingQueue(1) + queue.put("H3llo") + queue.remove("Hello") should equal(false) + } + + "return true if the element was removed" in { + val TestContext(queue, events, _, _, _, _) = newBoundedBlockingQueue(2) + queue.put("Hello") + queue.remove("Hello") should equal(true) + } + + "signal notFull when an element is removed" in { + val TestContext(queue, events, _, _, _, _) = newBoundedBlockingQueue(1) + queue.put("Hello") + queue.remove("Hello") + events should contain(signalNotFull) + } + } + + "contains" must { + "return false when the queue is empty" in { + val TestContext(queue, _, _, _, _, _) = newBoundedBlockingQueue(1) + queue.contains("Hello") should equal(false) + } + + "return false when the queue does not contain the element" in { + val TestContext(queue, _, _, _, _, backingQueue) = newBoundedBlockingQueue(1) + backingQueue.offer("H3llo") + queue.contains("Hello") should equal(false) + } + + "return true if the backing queue contains the element" in { + val TestContext(queue, events, _, _, _, backingQueue) = newBoundedBlockingQueue(2) + backingQueue.offer("Hello") + queue.contains("Hello") should equal(true) + } + } + + "clear" must { + "remove all elements from the backing queue" in { + val TestContext(queue, _, _, _, _, backingQueue) = newBoundedBlockingQueue(2) + queue.put("Hello") + queue.clear() + backingQueue.size() should equal(0) + } + } + + "peek" must { + "return the first element in the queue" in { + val TestContext(queue, _, _, _, _, backingQueue) = newBoundedBlockingQueue(2) + backingQueue.offer("Hello") + queue.peek() should equal("Hello") + } + + "not remove from the queue" in { + val TestContext(queue, _, _, _, _, backingQueue) = newBoundedBlockingQueue(1) + backingQueue.offer("Hello") + queue.peek() + backingQueue.contains("Hello") should equal(true) + } + } + + "drainTo" must { + "not accept a null argument" in { + val TestContext(queue, _, _, _, _, _) = newBoundedBlockingQueue(1) + intercept[NullPointerException] { + queue.drainTo(null) + } + } + + "not accept the source queue as argument" in { + val TestContext(queue, _, _, _, _, _) = newBoundedBlockingQueue(1) + intercept[IllegalArgumentException] { + queue.drainTo(queue) + } + } + + "not accept the backing queue as argument" in { + val TestContext(queue, _, _, _, _, backingQueue) = newBoundedBlockingQueue(1) + intercept[IllegalArgumentException] { + queue.drainTo(backingQueue) + } + } + + "remove all elements from the queue" in { + val TestContext(queue, _, _, _, _, _) = newBoundedBlockingQueue(2) + queue.put("Hello") + queue.put("World") + queue.drainTo(new util.LinkedList[String]()) + queue.remainingCapacity() should equal(2) + } + + "add all elements to the target collection" in { + val TestContext(queue, _, _, _, _, _) = newBoundedBlockingQueue(2) + val elems = List("Hello", "World") + val target = mutable.Buffer[String]() + elems.foreach(queue.put) + queue.drainTo(target.asJava) + elems should contain theSameElementsAs (target) + } + } + + "containsAll" must { + "return false if the queue contains a subset" in { + val TestContext(queue, _, _, _, _, _) = newBoundedBlockingQueue(2) + val elems = List("Hello", "World") + elems.foreach(queue.put) + queue.containsAll(("Akka" :: elems).asJava) should equal(false) + } + + "returns true if the queue contains exactly all elements" in { + val TestContext(queue, _, _, _, _, _) = newBoundedBlockingQueue(2) + val elems = List("Hello", "World") + elems.foreach(queue.put) + queue.containsAll(elems.asJava) should equal(true) + } + + "returns true if the queue contains more elements" in { + val TestContext(queue, _, _, _, _, _) = newBoundedBlockingQueue(3) + val elems = List("Hello", "World") + elems.foreach(queue.put) + queue.put("Akka") + queue.containsAll(elems.asJava) should equal(true) + } + } + + "removeAll" must { + "remove all elements from the argument collection" in { + val TestContext(queue, _, _, _, _, _) = newBoundedBlockingQueue(3) + val elems = List("Hello", "World") + elems.foreach(queue.put) + queue.put("Akka") + + queue.removeAll(elems.asJava) should equal(true) + queue.remainingCapacity() should equal(2) + queue.poll() should equal("Akka") + } + + "return false if no elements were removed" in { + val TestContext(queue, _, _, _, _, _) = newBoundedBlockingQueue(1) + val elems = List("Hello", "World") + queue.put("Akka") + + queue.removeAll(elems.asJava) should equal(false) + queue.poll() should equal("Akka") + } + } + + "retainAll" must { + "remove all elements not in the argument collection" in { + val TestContext(queue, _, _, _, _, _) = newBoundedBlockingQueue(3) + val elems = List("Hello", "World") + elems.foreach(queue.put) + queue.put("Akka") + + queue.retainAll(elems.asJava) should equal(true) + queue.remainingCapacity() should equal(1) + queue.toArray() shouldNot contain("Akka") + queue.toArray() should contain theSameElementsAs (elems) + } + + "return false if no elements were removed" in { + val TestContext(queue, _, _, _, _, _) = newBoundedBlockingQueue(2) + val elems = List("Hello", "World", "Akka") + queue.put("Hello") + queue.put("World") + + queue.retainAll(elems.asJava) should equal(false) + queue.toArray() should contain allOf ("Hello", "World") + } + } + + "iterator" must { + "not any elements if the queue is empty" in { + val TestContext(queue, _, _, _, _, _) = newBoundedBlockingQueue(1) + queue.iterator().hasNext() should equal(false) + } + + "disallow remove() before calling next()" in { + val TestContext(queue, _, _, _, _, _) = newBoundedBlockingQueue(2) + queue.put("Hello") + queue.put("Akka") + + val iter = queue.iterator() + intercept[IllegalStateException] { + iter.remove() + } + } + + "disallow two subsequent remove()s" in { + val TestContext(queue, _, _, _, _, _) = newBoundedBlockingQueue(2) + queue.put("Hello") + queue.put("Akka") + + val iter = queue.iterator() + iter.next() + iter.remove() + intercept[IllegalStateException] { + iter.remove() + } + } + } + +} + +/** + * Custom trait for matching sequences. + * + * The following will succeed: + * `List(0,1,2,3,1) should containInSequence(1,2,3)` + * + * Note that this fails with `... should contain inOrder ...` + */ +trait CustomContainsMatcher { + + class ContainsInSequenceMatcher[A](right: List[A]) extends Matcher[Seq[A]] { + + def apply(left: Seq[A]): MatchResult = { + + def attemptMatch(remainingTruth: List[A], remainingSequence: List[A]): MatchResult = + (remainingTruth, remainingSequence) match { + case (_, Nil) ⇒ matchResult(true) + case (Nil, _) ⇒ matchResult(false) + case (x :: xs, y :: ys) if x.equals(y) ⇒ attemptMatch(xs, ys) + case (x :: xs, ys) ⇒ attemptMatch(xs, ys) + } + + def matchResult(success: Boolean): MatchResult = + MatchResult( + success, + s"""$left did not contain all of $right in sequence""", + s"""$left contains all of $right in sequence""" + ) + + attemptMatch(left.toList, right) + } + } + def containInSequence(one: Any, two: Any, rest: Any*) = new ContainsInSequenceMatcher[Any](one :: two :: rest.toList) +} + +/** + * Some helper methods for dealing with things that should be blocking. + * Whether or not a call blocks is impossible to determine precisely, + * therefore we resort to checking that a call does not return within a set time. + */ +trait BlockingHelpers { + import org.scalatest.concurrent.TimeLimits._ + import org.scalatest.Assertions._ + + implicit val timeoutSignaler: Signaler = ThreadSignaler + + /** + * Delay execution of some code until the timespan has passed. + * The action is wrapped in a Future that initially calls `Thread.sleep()` + */ + def after(timespan: Span)(action: ⇒ Unit): Future[Unit] = Future { + LockSupport.parkNanos(timespan.toNanos) + action + } + + /** + * Check that a call does not return within a set timespan. + */ + def mustBlockFor(timeout: Span)(action: ⇒ Unit)(implicit pos: Position): Unit = + Exception.ignoring(classOf[TestFailedDueToTimeoutException]) { + failAfter(timeout)(action) + fail("Expected action to block for at least " + timeout.prettyString + " but it completed.") + } + + /** + * Check that a Future does not complete within a set timespan. + */ + def mustBlockFor(timeout: Span, action: Future[_])(implicit pos: Position): Unit = + Exception.ignoring(classOf[TimeoutException]) { + Await.ready(action, timeout) + fail("Expected action to block for at least " + timeout.prettyString + " but it completed.") + } + +} + +/** + * All events that can be recorded and asserted during a test. + */ +object QueueTestEvents { + sealed abstract class QueueEvent + case class Poll() extends QueueEvent + case class Offer(value: String) extends QueueEvent + case class GetSize() extends QueueEvent + case class AwaitNotEmpty() extends QueueEvent + case class AwaitNotFull() extends QueueEvent + case class SignalNotEmpty() extends QueueEvent + case class SignalNotFull() extends QueueEvent + + val poll = Poll() + val getSize = GetSize() + def offer(value: String) = Offer(value) + val signalNotEmpty = SignalNotEmpty() + val signalNotFull = SignalNotFull() + val awaitNotEmpty = AwaitNotEmpty() + val awaitNotFull = AwaitNotFull() +} + +/** + * Helper for setting up a queue under test with injected lock, conditions and backing queue. + */ +trait QueueSetupHelper { + import akka.util.QueueTestEvents._ + import java.util.Date + + case class TestContext(queue: BoundedBlockingQueue[String], events: mutable.MutableList[QueueEvent], notEmpty: TestCondition, notFull: TestCondition, lock: ReentrantLock, backingQueue: util.Queue[String]) + + /** + * Backing queue that records all poll and offer calls in `events` + */ + class TestBackingQueue(events: mutable.MutableList[QueueEvent]) + extends util.LinkedList[String] { + + override def poll(): String = { + events += Poll() + super.poll() + } + + override def offer(e: String): Boolean = { + events += Offer(e) + super.offer(e) + } + + override def size(): Int = { + events += GetSize() + super.size() + } + } + + /** + * Reentrant lock condition that records when the condition is signaled or `await`ed. + */ + class TestCondition(events: mutable.MutableList[QueueEvent], condition: Condition, signalEvent: QueueEvent, awaitEvent: QueueEvent) + extends Condition { + + private var waitTime: Long = 0 + private var manual = false + + private var waitingThread: Option[Thread] = None + + def advanceTime(timespan: Span): Unit = { + waitTime -= timespan.toNanos + if (waitTime <= 0 && waitingThread.isDefined) { + waitingThread.get.interrupt() + waitingThread = None + } + } + + def manualTimeControl(on: Boolean): Unit = + manual = on + + override def signalAll(): Unit = condition.signalAll() + + override def awaitNanos(nanosTimeout: Long): Long = { + if (!manual) { + events += awaitEvent + condition.awaitNanos(nanosTimeout) + } else { + waitTime = nanosTimeout + waitingThread = Some(Thread.currentThread()) + + try { + this.await() + } catch { + case e: InterruptedException ⇒ + } + 0 + } + } + + override def awaitUninterruptibly(): Unit = condition.awaitUninterruptibly() + + override def await(): Unit = { + events += awaitEvent + condition.await() + } + + override def await(time: Long, unit: TimeUnit): Boolean = condition.await(time, unit) + + override def signal(): Unit = { + events += signalEvent + condition.signal() + } + + override def awaitUntil(deadline: Date): Boolean = condition.awaitUntil(deadline) + } + + def newBoundedBlockingQueue(maxCapacity: Int): TestContext = { + val events: mutable.MutableList[QueueEvent] = new mutable.MutableList() + + val realLock = new ReentrantLock(false) + val wrappedNotEmpty = new TestCondition(events, realLock.newCondition(), signalNotEmpty, awaitNotEmpty) + val wrappedNotFull = new TestCondition(events, realLock.newCondition(), signalNotFull, awaitNotFull) + + val backingQueue = new TestBackingQueue(events) + + /** + * Class under test with the necessary backing queue, lock and conditions injected. + */ + class TestBoundedBlockingQueue() + extends BoundedBlockingQueue[String](maxCapacity, backingQueue) { + + override def createLock(): ReentrantLock = realLock + override def createNotEmptyCondition(): Condition = wrappedNotEmpty + override def createNotFullCondition(): Condition = wrappedNotFull + } + + TestContext(new TestBoundedBlockingQueue(), events, + wrappedNotEmpty, wrappedNotFull, realLock, + backingQueue) + } +} + +/** + * Schedule every Future on a new thread. + * Ensures that we don't accidentally run things on one thread that should be concurrent. + */ +object DefaultExecutionContext { + implicit val ec: ExecutionContextExecutor = ExecutionContext.fromExecutor(new Executor { + override def execute(command: Runnable): Unit = new Thread() { + override def run(): Unit = command.run() + }.start() + }) +} diff --git a/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala b/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala index c7f58e7ca4..ec87c88fd6 100644 --- a/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala +++ b/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala @@ -4,9 +4,10 @@ package akka.util -import java.util.concurrent.locks.ReentrantLock +import java.util.concurrent.locks.{ Condition, ReentrantLock } import java.util.concurrent.{ TimeUnit, BlockingQueue } import java.util.{ AbstractQueue, Queue, Collection, Iterator } + import annotation.tailrec /** @@ -28,14 +29,18 @@ class BoundedBlockingQueue[E <: AnyRef]( require(maxCapacity > 0) } - protected val lock = new ReentrantLock(false) + protected val lock = createLock() + protected val notEmpty = createNotEmptyCondition() + protected val notFull = createNotFullCondition() - private val notEmpty = lock.newCondition() - private val notFull = lock.newCondition() + protected def createLock(): ReentrantLock = new ReentrantLock(false) + protected def createNotEmptyCondition(): Condition = lock.newCondition() + protected def createNotFullCondition(): Condition = lock.newCondition() def put(e: E) { //Blocks until not full if (e eq null) throw new NullPointerException lock.lockInterruptibly() + try { @tailrec def putElement() { if (backing.size() < maxCapacity) {