Add tests for BoundedBlockingQueue (#24850)
This commit is contained in:
parent
fad74150ee
commit
80eff85527
2 changed files with 827 additions and 4 deletions
|
|
@ -0,0 +1,818 @@
|
|||
/*
|
||||
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.util
|
||||
|
||||
import java.util
|
||||
import java.util.concurrent.locks.{ Condition, LockSupport, ReentrantLock }
|
||||
import java.util.concurrent._
|
||||
|
||||
import org.scalatest.concurrent.{ Signaler, ThreadSignaler }
|
||||
import org.scalatest.{ Matchers, WordSpec }
|
||||
import org.scalatest.time.SpanSugar._
|
||||
import org.scalatest.time.{ Span, SpanSugar }
|
||||
import org.scalatest.exceptions.TestFailedDueToTimeoutException
|
||||
import org.scalactic.source.Position
|
||||
import org.scalatest.matchers.{ MatchResult, Matcher }
|
||||
|
||||
import scala.collection.mutable
|
||||
import scala.concurrent.{ Await, ExecutionContext, ExecutionContextExecutor, Future }
|
||||
import scala.util.control.Exception
|
||||
import DefaultExecutionContext._
|
||||
|
||||
import collection.JavaConverters._
|
||||
|
||||
class BoundedBlockingQueueSpec
|
||||
extends WordSpec
|
||||
with Matchers
|
||||
with QueueSetupHelper
|
||||
with CustomContainsMatcher
|
||||
with BlockingHelpers {
|
||||
|
||||
import QueueTestEvents._
|
||||
|
||||
"Constructor" must {
|
||||
"not accept a null backing Queue" in {
|
||||
intercept[IllegalArgumentException] {
|
||||
new BoundedBlockingQueue(10, null)
|
||||
}
|
||||
}
|
||||
|
||||
"not accept a non-empty backing Queue" in {
|
||||
intercept[IllegalArgumentException] {
|
||||
val listQ = new util.LinkedList[String]()
|
||||
listQ.add("Hello")
|
||||
new BoundedBlockingQueue[String](10, listQ)
|
||||
}
|
||||
}
|
||||
|
||||
"accept and empty backing Queue" in {
|
||||
new BoundedBlockingQueue(10, new util.LinkedList()).size() should equal(0)
|
||||
}
|
||||
|
||||
"get a positive maxCapacity" in {
|
||||
intercept[IllegalArgumentException] {
|
||||
new BoundedBlockingQueue(0, new util.LinkedList())
|
||||
}
|
||||
}
|
||||
|
||||
"check remainingCapacity for BlockingQueue" in {
|
||||
intercept[IllegalArgumentException] {
|
||||
new BoundedBlockingQueue(0, new LinkedBlockingQueue(1))
|
||||
}
|
||||
new BoundedBlockingQueue(1, new LinkedBlockingQueue(1)).remainingCapacity() should equal(1)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
"remainingCapacity" must {
|
||||
"check the backing queue size" in {
|
||||
val TestContext(queue, events, _, _, _, backingQueue) = newBoundedBlockingQueue(1)
|
||||
|
||||
backingQueue.offer("1")
|
||||
queue.remainingCapacity() shouldBe 0
|
||||
events should contain(getSize)
|
||||
}
|
||||
}
|
||||
|
||||
"put" must {
|
||||
"reject null elements" in {
|
||||
val queue = newBoundedBlockingQueue(1).queue
|
||||
intercept[NullPointerException] {
|
||||
queue.put(null)
|
||||
}
|
||||
}
|
||||
|
||||
"call the backing queue if not full" in {
|
||||
val TestContext(queue, events, _, _, _, _) = newBoundedBlockingQueue(2)
|
||||
queue.put("Hello")
|
||||
queue.put("World")
|
||||
events should contain inOrder (offer("Hello"), offer("World"))
|
||||
}
|
||||
|
||||
"signal notEmpty when an element is inserted" in {
|
||||
val TestContext(queue, events, _, _, _, _) = newBoundedBlockingQueue(1)
|
||||
queue.put("Hello")
|
||||
events should contain(signalNotEmpty)
|
||||
}
|
||||
|
||||
"block when the queue is full" in {
|
||||
val TestContext(queue, events, _, _, _, _) = newBoundedBlockingQueue(1)
|
||||
queue.offer("1")
|
||||
|
||||
mustBlockFor(100 milliseconds) {
|
||||
queue.put("2")
|
||||
}
|
||||
events should contain inOrder (offer("1"), awaitNotFull)
|
||||
events should not contain offer("2")
|
||||
}
|
||||
|
||||
"block until the backing queue has space" in {
|
||||
val TestContext(queue, events, _, _, _, _) = newBoundedBlockingQueue(1)
|
||||
queue.offer("a")
|
||||
|
||||
val f = Future(queue.put("b"))
|
||||
|
||||
after(10 milliseconds) {
|
||||
queue.take()
|
||||
}
|
||||
|
||||
Await.result(f, 100 milliseconds)
|
||||
events should contain inOrder (offer("a"), poll, offer("b"))
|
||||
}
|
||||
|
||||
"check the backing queue size before offering" in {
|
||||
val TestContext(queue, events, _, notFull, lock, _) = newBoundedBlockingQueue(1)
|
||||
queue.offer("a")
|
||||
|
||||
// Blocks until another thread signals `notFull`
|
||||
val f = Future(queue.put("b"))
|
||||
|
||||
after(10 milliseconds) {
|
||||
lock.lockInterruptibly()
|
||||
notFull.signal()
|
||||
lock.unlock()
|
||||
}
|
||||
|
||||
mustBlockFor(100 milliseconds, f)
|
||||
events should containInSequence(offer("a"), awaitNotFull, signalNotFull, getSize, awaitNotFull)
|
||||
events shouldNot contain(offer("b"))
|
||||
}
|
||||
}
|
||||
|
||||
"take" must {
|
||||
"call the backing queue if not empty" in {
|
||||
val TestContext(queue, events, _, _, _, backingQueue) = newBoundedBlockingQueue(1)
|
||||
queue.put("Hello")
|
||||
queue.take()
|
||||
|
||||
events should contain inOrder (offer("Hello"), poll)
|
||||
}
|
||||
|
||||
"signal notFull when taking an element" in {
|
||||
val TestContext(queue, events, _, _, _, backingQueue) = newBoundedBlockingQueue(1)
|
||||
queue.put("Hello")
|
||||
queue.take()
|
||||
events should contain(signalNotFull)
|
||||
}
|
||||
|
||||
"block when the queue is empty" in {
|
||||
val TestContext(queue, events, _, _, _, _) = newBoundedBlockingQueue(1)
|
||||
|
||||
mustBlockFor(100 milliseconds) {
|
||||
queue.take()
|
||||
}
|
||||
events should contain(awaitNotEmpty)
|
||||
events should not contain (poll)
|
||||
}
|
||||
|
||||
"block until the backing queue is non-empty" in {
|
||||
val TestContext(queue, events, _, _, _, _) = newBoundedBlockingQueue(1)
|
||||
|
||||
val f = Future(queue.take())
|
||||
after(10 milliseconds) {
|
||||
queue.put("a")
|
||||
}
|
||||
|
||||
Await.ready(f, 100 milliseconds)
|
||||
events should contain inOrder (awaitNotEmpty, offer("a"), poll)
|
||||
}
|
||||
|
||||
"check the backing queue size before polling" in {
|
||||
val TestContext(queue, events, notEmpty, _, lock, _) = newBoundedBlockingQueue(1)
|
||||
|
||||
// Blocks until another thread signals `notEmpty`
|
||||
val f = Future(queue.take())
|
||||
|
||||
// Cause `notFull` signal, but don't fill the queue
|
||||
after(10 milliseconds) {
|
||||
lock.lockInterruptibly()
|
||||
notEmpty.signal()
|
||||
lock.unlock()
|
||||
}
|
||||
|
||||
// `f` should still block since the queue is still empty
|
||||
mustBlockFor(100 milliseconds, f)
|
||||
events should containInSequence(getSize, awaitNotEmpty, signalNotEmpty, getSize, awaitNotEmpty)
|
||||
events shouldNot contain(poll)
|
||||
}
|
||||
}
|
||||
|
||||
"offer" must {
|
||||
"reject null elements" in {
|
||||
val TestContext(queue, _, _, _, _, _) = newBoundedBlockingQueue(1)
|
||||
intercept[NullPointerException] {
|
||||
queue.offer(null)
|
||||
}
|
||||
}
|
||||
|
||||
"not block if the queue is full" in {
|
||||
val TestContext(queue, events, _, _, _, _) = newBoundedBlockingQueue(1)
|
||||
queue.offer("Hello") should equal(true)
|
||||
queue.offer("World") should equal(false)
|
||||
events should not contain offer("World")
|
||||
}
|
||||
|
||||
"call the backing queue if not full" in {
|
||||
val TestContext(queue, events, _, _, _, _) = newBoundedBlockingQueue(2)
|
||||
queue.offer("Hello") should equal(true)
|
||||
queue.offer("World") should equal(true)
|
||||
events should contain inOrder (offer("Hello"), offer("World"))
|
||||
}
|
||||
|
||||
"signal notEmpty when the call succeeds" in {
|
||||
val TestContext(queue, events, _, _, _, _) = newBoundedBlockingQueue(1)
|
||||
queue.offer("Hello")
|
||||
events should contain(signalNotEmpty)
|
||||
}
|
||||
}
|
||||
|
||||
"offer with timeout" must {
|
||||
"not block if the queue is not full" in {
|
||||
val TestContext(queue, events, _, _, _, _) = newBoundedBlockingQueue(1)
|
||||
queue.offer("Hello", 100, TimeUnit.MILLISECONDS) should equal(true)
|
||||
events should contain(offer("Hello"))
|
||||
}
|
||||
|
||||
"signal notEmpty if the queue is not full" in {
|
||||
val TestContext(queue, events, _, _, _, _) = newBoundedBlockingQueue(1)
|
||||
queue.offer("Hello", 100, TimeUnit.MILLISECONDS) should equal(true)
|
||||
events should contain(signalNotEmpty)
|
||||
}
|
||||
|
||||
"block for at least the timeout if the queue is full" in {
|
||||
val TestContext(queue, events, _, notFull, _, _) = newBoundedBlockingQueue(1)
|
||||
queue.put("Hello")
|
||||
|
||||
notFull.manualTimeControl(true)
|
||||
|
||||
val f = Future(queue.offer("World", 100, TimeUnit.MILLISECONDS))
|
||||
after(10 milliseconds) {
|
||||
notFull.advanceTime(99 milliseconds)
|
||||
}
|
||||
mustBlockFor(100 milliseconds, f)
|
||||
events shouldNot contain(offer("World"))
|
||||
}
|
||||
|
||||
"return false if the timeout is exceeded" in {
|
||||
val TestContext(queue, events, _, notFull, _, _) = newBoundedBlockingQueue(1)
|
||||
queue.put("Hello")
|
||||
queue.offer("World", 100, TimeUnit.MILLISECONDS) should equal(false)
|
||||
events shouldNot contain(offer("World"))
|
||||
}
|
||||
|
||||
"block for less than the timeout when the queue becomes not full" in {
|
||||
val TestContext(queue, events, _, notFull, _, _) = newBoundedBlockingQueue(1)
|
||||
queue.put("Hello")
|
||||
|
||||
notFull.manualTimeControl(true)
|
||||
val f = Future(queue.offer("World", 100, TimeUnit.MILLISECONDS))
|
||||
notFull.advanceTime(99 milliseconds)
|
||||
after(50 milliseconds) {
|
||||
queue.take()
|
||||
}
|
||||
Await.result(f, 100 milliseconds) should equal(true)
|
||||
events should contain inOrder (awaitNotFull, signalNotFull, offer("World"))
|
||||
}
|
||||
|
||||
"check the backing queue size before offering" in {
|
||||
val TestContext(queue, events, _, notFull, lock, _) = newBoundedBlockingQueue(1)
|
||||
queue.put("Hello")
|
||||
// Blocks until another thread signals `notFull`
|
||||
val f = Future(queue.offer("World", 1000, TimeUnit.DAYS))
|
||||
|
||||
// Cause `notFull` signal, but don't fill the queue
|
||||
after(10 milliseconds) {
|
||||
lock.lockInterruptibly()
|
||||
notFull.signal()
|
||||
lock.unlock()
|
||||
}
|
||||
|
||||
// `f` should still block since the queue is still empty
|
||||
mustBlockFor(100 milliseconds, f)
|
||||
events should containInSequence(getSize, awaitNotFull, signalNotFull, getSize, awaitNotFull)
|
||||
events shouldNot contain(offer("World"))
|
||||
}
|
||||
}
|
||||
|
||||
"poll" must {
|
||||
"return null when the queue is empty" in {
|
||||
val TestContext(queue, _, _, _, _, _) = newBoundedBlockingQueue(1)
|
||||
queue.poll() should equal(null)
|
||||
}
|
||||
|
||||
"poll the backing queue" in {
|
||||
val TestContext(queue, events, _, _, _, _) = newBoundedBlockingQueue(1)
|
||||
queue.poll()
|
||||
events should contain(poll)
|
||||
}
|
||||
|
||||
"return the first element inserted" in {
|
||||
val TestContext(queue, events, _, _, _, _) = newBoundedBlockingQueue(2)
|
||||
queue.put("Hello")
|
||||
queue.put("World")
|
||||
queue.poll() should equal("Hello")
|
||||
}
|
||||
|
||||
"signal notEmpty when an element is removed" in {
|
||||
val TestContext(queue, events, _, _, _, _) = newBoundedBlockingQueue(1)
|
||||
queue.put("Hello")
|
||||
queue.poll()
|
||||
events should contain(signalNotEmpty)
|
||||
}
|
||||
}
|
||||
|
||||
"poll with timeout" must {
|
||||
"not block if the queue is not empty" in {
|
||||
val TestContext(queue, events, _, _, _, backingQueue) = newBoundedBlockingQueue(1)
|
||||
backingQueue.offer("Hello")
|
||||
queue.poll(1, TimeUnit.DAYS) should equal("Hello")
|
||||
events should contain(poll)
|
||||
}
|
||||
|
||||
"signal notFull when an element is removed" in {
|
||||
val TestContext(queue, events, _, _, _, backingQueue) = newBoundedBlockingQueue(1)
|
||||
backingQueue.offer("Hello")
|
||||
|
||||
queue.poll(1, TimeUnit.DAYS)
|
||||
events should contain(signalNotFull)
|
||||
}
|
||||
|
||||
"block for at least the timeout if the queue is empty" in {
|
||||
val TestContext(queue, events, notEmpty, _, _, _) = newBoundedBlockingQueue(1)
|
||||
notEmpty.manualTimeControl(true)
|
||||
|
||||
val f = Future(queue.poll(100, TimeUnit.MILLISECONDS))
|
||||
|
||||
after(10 milliseconds) {
|
||||
notEmpty.advanceTime(99 milliseconds)
|
||||
}
|
||||
mustBlockFor(100 milliseconds, f)
|
||||
events should contain(awaitNotEmpty)
|
||||
}
|
||||
|
||||
"immediately poll the backing queue" in {
|
||||
val TestContext(queue, events, notEmpty, _, _, _) = newBoundedBlockingQueue(1)
|
||||
|
||||
queue.poll(100, TimeUnit.MILLISECONDS)
|
||||
events should contain(poll)
|
||||
}
|
||||
|
||||
"return null if the timeout is exceeded" in {
|
||||
val TestContext(queue, events, notEmpty, _, _, _) = newBoundedBlockingQueue(1)
|
||||
|
||||
queue.poll(100, TimeUnit.MILLISECONDS) should equal(null)
|
||||
}
|
||||
|
||||
"block for less than the timeout when the queue becomes non-empty" in {
|
||||
val TestContext(queue, events, notEmpty, _, _, _) = newBoundedBlockingQueue(1)
|
||||
|
||||
notEmpty.manualTimeControl(true)
|
||||
|
||||
val f = Future(queue.poll(100, TimeUnit.MILLISECONDS))
|
||||
|
||||
notEmpty.advanceTime(99 milliseconds)
|
||||
after(50 milliseconds) {
|
||||
queue.put("Hello")
|
||||
}
|
||||
Await.result(f, 100 milliseconds) should equal("Hello")
|
||||
events should contain inOrder (awaitNotEmpty, signalNotEmpty, poll)
|
||||
}
|
||||
}
|
||||
|
||||
"remove" must {
|
||||
"return false when the queue is empty" in {
|
||||
val TestContext(queue, _, _, _, _, _) = newBoundedBlockingQueue(1)
|
||||
queue.remove("Hello") should equal(false)
|
||||
}
|
||||
|
||||
"return false when the queue does not contain the element" in {
|
||||
val TestContext(queue, _, _, _, _, _) = newBoundedBlockingQueue(1)
|
||||
queue.put("H3llo")
|
||||
queue.remove("Hello") should equal(false)
|
||||
}
|
||||
|
||||
"return true if the element was removed" in {
|
||||
val TestContext(queue, events, _, _, _, _) = newBoundedBlockingQueue(2)
|
||||
queue.put("Hello")
|
||||
queue.remove("Hello") should equal(true)
|
||||
}
|
||||
|
||||
"signal notFull when an element is removed" in {
|
||||
val TestContext(queue, events, _, _, _, _) = newBoundedBlockingQueue(1)
|
||||
queue.put("Hello")
|
||||
queue.remove("Hello")
|
||||
events should contain(signalNotFull)
|
||||
}
|
||||
}
|
||||
|
||||
"contains" must {
|
||||
"return false when the queue is empty" in {
|
||||
val TestContext(queue, _, _, _, _, _) = newBoundedBlockingQueue(1)
|
||||
queue.contains("Hello") should equal(false)
|
||||
}
|
||||
|
||||
"return false when the queue does not contain the element" in {
|
||||
val TestContext(queue, _, _, _, _, backingQueue) = newBoundedBlockingQueue(1)
|
||||
backingQueue.offer("H3llo")
|
||||
queue.contains("Hello") should equal(false)
|
||||
}
|
||||
|
||||
"return true if the backing queue contains the element" in {
|
||||
val TestContext(queue, events, _, _, _, backingQueue) = newBoundedBlockingQueue(2)
|
||||
backingQueue.offer("Hello")
|
||||
queue.contains("Hello") should equal(true)
|
||||
}
|
||||
}
|
||||
|
||||
"clear" must {
|
||||
"remove all elements from the backing queue" in {
|
||||
val TestContext(queue, _, _, _, _, backingQueue) = newBoundedBlockingQueue(2)
|
||||
queue.put("Hello")
|
||||
queue.clear()
|
||||
backingQueue.size() should equal(0)
|
||||
}
|
||||
}
|
||||
|
||||
"peek" must {
|
||||
"return the first element in the queue" in {
|
||||
val TestContext(queue, _, _, _, _, backingQueue) = newBoundedBlockingQueue(2)
|
||||
backingQueue.offer("Hello")
|
||||
queue.peek() should equal("Hello")
|
||||
}
|
||||
|
||||
"not remove from the queue" in {
|
||||
val TestContext(queue, _, _, _, _, backingQueue) = newBoundedBlockingQueue(1)
|
||||
backingQueue.offer("Hello")
|
||||
queue.peek()
|
||||
backingQueue.contains("Hello") should equal(true)
|
||||
}
|
||||
}
|
||||
|
||||
"drainTo" must {
|
||||
"not accept a null argument" in {
|
||||
val TestContext(queue, _, _, _, _, _) = newBoundedBlockingQueue(1)
|
||||
intercept[NullPointerException] {
|
||||
queue.drainTo(null)
|
||||
}
|
||||
}
|
||||
|
||||
"not accept the source queue as argument" in {
|
||||
val TestContext(queue, _, _, _, _, _) = newBoundedBlockingQueue(1)
|
||||
intercept[IllegalArgumentException] {
|
||||
queue.drainTo(queue)
|
||||
}
|
||||
}
|
||||
|
||||
"not accept the backing queue as argument" in {
|
||||
val TestContext(queue, _, _, _, _, backingQueue) = newBoundedBlockingQueue(1)
|
||||
intercept[IllegalArgumentException] {
|
||||
queue.drainTo(backingQueue)
|
||||
}
|
||||
}
|
||||
|
||||
"remove all elements from the queue" in {
|
||||
val TestContext(queue, _, _, _, _, _) = newBoundedBlockingQueue(2)
|
||||
queue.put("Hello")
|
||||
queue.put("World")
|
||||
queue.drainTo(new util.LinkedList[String]())
|
||||
queue.remainingCapacity() should equal(2)
|
||||
}
|
||||
|
||||
"add all elements to the target collection" in {
|
||||
val TestContext(queue, _, _, _, _, _) = newBoundedBlockingQueue(2)
|
||||
val elems = List("Hello", "World")
|
||||
val target = mutable.Buffer[String]()
|
||||
elems.foreach(queue.put)
|
||||
queue.drainTo(target.asJava)
|
||||
elems should contain theSameElementsAs (target)
|
||||
}
|
||||
}
|
||||
|
||||
"containsAll" must {
|
||||
"return false if the queue contains a subset" in {
|
||||
val TestContext(queue, _, _, _, _, _) = newBoundedBlockingQueue(2)
|
||||
val elems = List("Hello", "World")
|
||||
elems.foreach(queue.put)
|
||||
queue.containsAll(("Akka" :: elems).asJava) should equal(false)
|
||||
}
|
||||
|
||||
"returns true if the queue contains exactly all elements" in {
|
||||
val TestContext(queue, _, _, _, _, _) = newBoundedBlockingQueue(2)
|
||||
val elems = List("Hello", "World")
|
||||
elems.foreach(queue.put)
|
||||
queue.containsAll(elems.asJava) should equal(true)
|
||||
}
|
||||
|
||||
"returns true if the queue contains more elements" in {
|
||||
val TestContext(queue, _, _, _, _, _) = newBoundedBlockingQueue(3)
|
||||
val elems = List("Hello", "World")
|
||||
elems.foreach(queue.put)
|
||||
queue.put("Akka")
|
||||
queue.containsAll(elems.asJava) should equal(true)
|
||||
}
|
||||
}
|
||||
|
||||
"removeAll" must {
|
||||
"remove all elements from the argument collection" in {
|
||||
val TestContext(queue, _, _, _, _, _) = newBoundedBlockingQueue(3)
|
||||
val elems = List("Hello", "World")
|
||||
elems.foreach(queue.put)
|
||||
queue.put("Akka")
|
||||
|
||||
queue.removeAll(elems.asJava) should equal(true)
|
||||
queue.remainingCapacity() should equal(2)
|
||||
queue.poll() should equal("Akka")
|
||||
}
|
||||
|
||||
"return false if no elements were removed" in {
|
||||
val TestContext(queue, _, _, _, _, _) = newBoundedBlockingQueue(1)
|
||||
val elems = List("Hello", "World")
|
||||
queue.put("Akka")
|
||||
|
||||
queue.removeAll(elems.asJava) should equal(false)
|
||||
queue.poll() should equal("Akka")
|
||||
}
|
||||
}
|
||||
|
||||
"retainAll" must {
|
||||
"remove all elements not in the argument collection" in {
|
||||
val TestContext(queue, _, _, _, _, _) = newBoundedBlockingQueue(3)
|
||||
val elems = List("Hello", "World")
|
||||
elems.foreach(queue.put)
|
||||
queue.put("Akka")
|
||||
|
||||
queue.retainAll(elems.asJava) should equal(true)
|
||||
queue.remainingCapacity() should equal(1)
|
||||
queue.toArray() shouldNot contain("Akka")
|
||||
queue.toArray() should contain theSameElementsAs (elems)
|
||||
}
|
||||
|
||||
"return false if no elements were removed" in {
|
||||
val TestContext(queue, _, _, _, _, _) = newBoundedBlockingQueue(2)
|
||||
val elems = List("Hello", "World", "Akka")
|
||||
queue.put("Hello")
|
||||
queue.put("World")
|
||||
|
||||
queue.retainAll(elems.asJava) should equal(false)
|
||||
queue.toArray() should contain allOf ("Hello", "World")
|
||||
}
|
||||
}
|
||||
|
||||
"iterator" must {
|
||||
"not any elements if the queue is empty" in {
|
||||
val TestContext(queue, _, _, _, _, _) = newBoundedBlockingQueue(1)
|
||||
queue.iterator().hasNext() should equal(false)
|
||||
}
|
||||
|
||||
"disallow remove() before calling next()" in {
|
||||
val TestContext(queue, _, _, _, _, _) = newBoundedBlockingQueue(2)
|
||||
queue.put("Hello")
|
||||
queue.put("Akka")
|
||||
|
||||
val iter = queue.iterator()
|
||||
intercept[IllegalStateException] {
|
||||
iter.remove()
|
||||
}
|
||||
}
|
||||
|
||||
"disallow two subsequent remove()s" in {
|
||||
val TestContext(queue, _, _, _, _, _) = newBoundedBlockingQueue(2)
|
||||
queue.put("Hello")
|
||||
queue.put("Akka")
|
||||
|
||||
val iter = queue.iterator()
|
||||
iter.next()
|
||||
iter.remove()
|
||||
intercept[IllegalStateException] {
|
||||
iter.remove()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Custom trait for matching sequences.
|
||||
*
|
||||
* The following will succeed:
|
||||
* `List(0,1,2,3,1) should containInSequence(1,2,3)`
|
||||
*
|
||||
* Note that this fails with `... should contain inOrder ...`
|
||||
*/
|
||||
trait CustomContainsMatcher {
|
||||
|
||||
class ContainsInSequenceMatcher[A](right: List[A]) extends Matcher[Seq[A]] {
|
||||
|
||||
def apply(left: Seq[A]): MatchResult = {
|
||||
|
||||
def attemptMatch(remainingTruth: List[A], remainingSequence: List[A]): MatchResult =
|
||||
(remainingTruth, remainingSequence) match {
|
||||
case (_, Nil) ⇒ matchResult(true)
|
||||
case (Nil, _) ⇒ matchResult(false)
|
||||
case (x :: xs, y :: ys) if x.equals(y) ⇒ attemptMatch(xs, ys)
|
||||
case (x :: xs, ys) ⇒ attemptMatch(xs, ys)
|
||||
}
|
||||
|
||||
def matchResult(success: Boolean): MatchResult =
|
||||
MatchResult(
|
||||
success,
|
||||
s"""$left did not contain all of $right in sequence""",
|
||||
s"""$left contains all of $right in sequence"""
|
||||
)
|
||||
|
||||
attemptMatch(left.toList, right)
|
||||
}
|
||||
}
|
||||
def containInSequence(one: Any, two: Any, rest: Any*) = new ContainsInSequenceMatcher[Any](one :: two :: rest.toList)
|
||||
}
|
||||
|
||||
/**
|
||||
* Some helper methods for dealing with things that should be blocking.
|
||||
* Whether or not a call blocks is impossible to determine precisely,
|
||||
* therefore we resort to checking that a call does not return within a set time.
|
||||
*/
|
||||
trait BlockingHelpers {
|
||||
import org.scalatest.concurrent.TimeLimits._
|
||||
import org.scalatest.Assertions._
|
||||
|
||||
implicit val timeoutSignaler: Signaler = ThreadSignaler
|
||||
|
||||
/**
|
||||
* Delay execution of some code until the timespan has passed.
|
||||
* The action is wrapped in a Future that initially calls `Thread.sleep()`
|
||||
*/
|
||||
def after(timespan: Span)(action: ⇒ Unit): Future[Unit] = Future {
|
||||
LockSupport.parkNanos(timespan.toNanos)
|
||||
action
|
||||
}
|
||||
|
||||
/**
|
||||
* Check that a call does not return within a set timespan.
|
||||
*/
|
||||
def mustBlockFor(timeout: Span)(action: ⇒ Unit)(implicit pos: Position): Unit =
|
||||
Exception.ignoring(classOf[TestFailedDueToTimeoutException]) {
|
||||
failAfter(timeout)(action)
|
||||
fail("Expected action to block for at least " + timeout.prettyString + " but it completed.")
|
||||
}
|
||||
|
||||
/**
|
||||
* Check that a Future does not complete within a set timespan.
|
||||
*/
|
||||
def mustBlockFor(timeout: Span, action: Future[_])(implicit pos: Position): Unit =
|
||||
Exception.ignoring(classOf[TimeoutException]) {
|
||||
Await.ready(action, timeout)
|
||||
fail("Expected action to block for at least " + timeout.prettyString + " but it completed.")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* All events that can be recorded and asserted during a test.
|
||||
*/
|
||||
object QueueTestEvents {
|
||||
sealed abstract class QueueEvent
|
||||
case class Poll() extends QueueEvent
|
||||
case class Offer(value: String) extends QueueEvent
|
||||
case class GetSize() extends QueueEvent
|
||||
case class AwaitNotEmpty() extends QueueEvent
|
||||
case class AwaitNotFull() extends QueueEvent
|
||||
case class SignalNotEmpty() extends QueueEvent
|
||||
case class SignalNotFull() extends QueueEvent
|
||||
|
||||
val poll = Poll()
|
||||
val getSize = GetSize()
|
||||
def offer(value: String) = Offer(value)
|
||||
val signalNotEmpty = SignalNotEmpty()
|
||||
val signalNotFull = SignalNotFull()
|
||||
val awaitNotEmpty = AwaitNotEmpty()
|
||||
val awaitNotFull = AwaitNotFull()
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper for setting up a queue under test with injected lock, conditions and backing queue.
|
||||
*/
|
||||
trait QueueSetupHelper {
|
||||
import akka.util.QueueTestEvents._
|
||||
import java.util.Date
|
||||
|
||||
case class TestContext(queue: BoundedBlockingQueue[String], events: mutable.MutableList[QueueEvent], notEmpty: TestCondition, notFull: TestCondition, lock: ReentrantLock, backingQueue: util.Queue[String])
|
||||
|
||||
/**
|
||||
* Backing queue that records all poll and offer calls in `events`
|
||||
*/
|
||||
class TestBackingQueue(events: mutable.MutableList[QueueEvent])
|
||||
extends util.LinkedList[String] {
|
||||
|
||||
override def poll(): String = {
|
||||
events += Poll()
|
||||
super.poll()
|
||||
}
|
||||
|
||||
override def offer(e: String): Boolean = {
|
||||
events += Offer(e)
|
||||
super.offer(e)
|
||||
}
|
||||
|
||||
override def size(): Int = {
|
||||
events += GetSize()
|
||||
super.size()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reentrant lock condition that records when the condition is signaled or `await`ed.
|
||||
*/
|
||||
class TestCondition(events: mutable.MutableList[QueueEvent], condition: Condition, signalEvent: QueueEvent, awaitEvent: QueueEvent)
|
||||
extends Condition {
|
||||
|
||||
private var waitTime: Long = 0
|
||||
private var manual = false
|
||||
|
||||
private var waitingThread: Option[Thread] = None
|
||||
|
||||
def advanceTime(timespan: Span): Unit = {
|
||||
waitTime -= timespan.toNanos
|
||||
if (waitTime <= 0 && waitingThread.isDefined) {
|
||||
waitingThread.get.interrupt()
|
||||
waitingThread = None
|
||||
}
|
||||
}
|
||||
|
||||
def manualTimeControl(on: Boolean): Unit =
|
||||
manual = on
|
||||
|
||||
override def signalAll(): Unit = condition.signalAll()
|
||||
|
||||
override def awaitNanos(nanosTimeout: Long): Long = {
|
||||
if (!manual) {
|
||||
events += awaitEvent
|
||||
condition.awaitNanos(nanosTimeout)
|
||||
} else {
|
||||
waitTime = nanosTimeout
|
||||
waitingThread = Some(Thread.currentThread())
|
||||
|
||||
try {
|
||||
this.await()
|
||||
} catch {
|
||||
case e: InterruptedException ⇒
|
||||
}
|
||||
0
|
||||
}
|
||||
}
|
||||
|
||||
override def awaitUninterruptibly(): Unit = condition.awaitUninterruptibly()
|
||||
|
||||
override def await(): Unit = {
|
||||
events += awaitEvent
|
||||
condition.await()
|
||||
}
|
||||
|
||||
override def await(time: Long, unit: TimeUnit): Boolean = condition.await(time, unit)
|
||||
|
||||
override def signal(): Unit = {
|
||||
events += signalEvent
|
||||
condition.signal()
|
||||
}
|
||||
|
||||
override def awaitUntil(deadline: Date): Boolean = condition.awaitUntil(deadline)
|
||||
}
|
||||
|
||||
def newBoundedBlockingQueue(maxCapacity: Int): TestContext = {
|
||||
val events: mutable.MutableList[QueueEvent] = new mutable.MutableList()
|
||||
|
||||
val realLock = new ReentrantLock(false)
|
||||
val wrappedNotEmpty = new TestCondition(events, realLock.newCondition(), signalNotEmpty, awaitNotEmpty)
|
||||
val wrappedNotFull = new TestCondition(events, realLock.newCondition(), signalNotFull, awaitNotFull)
|
||||
|
||||
val backingQueue = new TestBackingQueue(events)
|
||||
|
||||
/**
|
||||
* Class under test with the necessary backing queue, lock and conditions injected.
|
||||
*/
|
||||
class TestBoundedBlockingQueue()
|
||||
extends BoundedBlockingQueue[String](maxCapacity, backingQueue) {
|
||||
|
||||
override def createLock(): ReentrantLock = realLock
|
||||
override def createNotEmptyCondition(): Condition = wrappedNotEmpty
|
||||
override def createNotFullCondition(): Condition = wrappedNotFull
|
||||
}
|
||||
|
||||
TestContext(new TestBoundedBlockingQueue(), events,
|
||||
wrappedNotEmpty, wrappedNotFull, realLock,
|
||||
backingQueue)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Schedule every Future on a new thread.
|
||||
* Ensures that we don't accidentally run things on one thread that should be concurrent.
|
||||
*/
|
||||
object DefaultExecutionContext {
|
||||
implicit val ec: ExecutionContextExecutor = ExecutionContext.fromExecutor(new Executor {
|
||||
override def execute(command: Runnable): Unit = new Thread() {
|
||||
override def run(): Unit = command.run()
|
||||
}.start()
|
||||
})
|
||||
}
|
||||
|
|
@ -4,9 +4,10 @@
|
|||
|
||||
package akka.util
|
||||
|
||||
import java.util.concurrent.locks.ReentrantLock
|
||||
import java.util.concurrent.locks.{ Condition, ReentrantLock }
|
||||
import java.util.concurrent.{ TimeUnit, BlockingQueue }
|
||||
import java.util.{ AbstractQueue, Queue, Collection, Iterator }
|
||||
|
||||
import annotation.tailrec
|
||||
|
||||
/**
|
||||
|
|
@ -28,14 +29,18 @@ class BoundedBlockingQueue[E <: AnyRef](
|
|||
require(maxCapacity > 0)
|
||||
}
|
||||
|
||||
protected val lock = new ReentrantLock(false)
|
||||
protected val lock = createLock()
|
||||
protected val notEmpty = createNotEmptyCondition()
|
||||
protected val notFull = createNotFullCondition()
|
||||
|
||||
private val notEmpty = lock.newCondition()
|
||||
private val notFull = lock.newCondition()
|
||||
protected def createLock(): ReentrantLock = new ReentrantLock(false)
|
||||
protected def createNotEmptyCondition(): Condition = lock.newCondition()
|
||||
protected def createNotFullCondition(): Condition = lock.newCondition()
|
||||
|
||||
def put(e: E) { //Blocks until not full
|
||||
if (e eq null) throw new NullPointerException
|
||||
lock.lockInterruptibly()
|
||||
|
||||
try {
|
||||
@tailrec def putElement() {
|
||||
if (backing.size() < maxCapacity) {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue