diff --git a/akka-actor-tests/src/test/scala/akka/util/BoundedBlockingQueueSpec.scala b/akka-actor-tests/src/test/scala/akka/util/BoundedBlockingQueueSpec.scala index b8439b10b6..ea4acd162e 100644 --- a/akka-actor-tests/src/test/scala/akka/util/BoundedBlockingQueueSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/util/BoundedBlockingQueueSpec.scala @@ -5,23 +5,22 @@ package akka.util import java.util -import java.util.concurrent.locks.{ Condition, LockSupport, ReentrantLock } import java.util.concurrent._ +import java.util.concurrent.locks.{ Condition, LockSupport, ReentrantLock } -import org.scalatest.concurrent.{ Signaler, ThreadSignaler } -import org.scalatest.{ Matchers, WordSpec } -import org.scalatest.time.SpanSugar._ -import org.scalatest.time.{ Span, SpanSugar } -import org.scalatest.exceptions.TestFailedDueToTimeoutException +import akka.util.DefaultExecutionContext._ import org.scalactic.source.Position +import org.scalatest.concurrent.{ Signaler, ThreadSignaler } +import org.scalatest.exceptions.TestFailedDueToTimeoutException import org.scalatest.matchers.{ MatchResult, Matcher } +import org.scalatest.time.Span +import org.scalatest.time.SpanSugar._ +import org.scalatest.{ Matchers, WordSpec } +import scala.collection.JavaConverters._ import scala.collection.mutable import scala.concurrent.{ Await, ExecutionContext, ExecutionContextExecutor, Future } import scala.util.control.Exception -import DefaultExecutionContext._ - -import collection.JavaConverters._ class BoundedBlockingQueueSpec extends WordSpec @@ -267,6 +266,7 @@ class BoundedBlockingQueueSpec } "block for less than the timeout when the queue becomes not full" in { + val TestContext(queue, events, _, notFull, _, _) = newBoundedBlockingQueue(1) queue.put("Hello") @@ -642,8 +642,8 @@ trait CustomContainsMatcher { * therefore we resort to checking that a call does not return within a set time. */ trait BlockingHelpers { - import org.scalatest.concurrent.TimeLimits._ import org.scalatest.Assertions._ + import org.scalatest.concurrent.TimeLimits._ implicit val timeoutSignaler: Signaler = ThreadSignaler @@ -702,9 +702,10 @@ object QueueTestEvents { * Helper for setting up a queue under test with injected lock, conditions and backing queue. */ trait QueueSetupHelper { - import akka.util.QueueTestEvents._ import java.util.Date + import akka.util.QueueTestEvents._ + case class TestContext(queue: BoundedBlockingQueue[String], events: mutable.MutableList[QueueEvent], notEmpty: TestCondition, notFull: TestCondition, lock: ReentrantLock, backingQueue: util.Queue[String]) /** @@ -735,31 +736,40 @@ trait QueueSetupHelper { class TestCondition(events: mutable.MutableList[QueueEvent], condition: Condition, signalEvent: QueueEvent, awaitEvent: QueueEvent) extends Condition { - private var waitTime: Long = 0 - private var manual = false + case class Manual(waitTime: Long = 0, waitingThread: Option[Thread] = None) - private var waitingThread: Option[Thread] = None + @volatile private var waiting: Option[Manual] = None def advanceTime(timespan: Span): Unit = { - waitTime -= timespan.toNanos - if (waitTime <= 0 && waitingThread.isDefined) { - waitingThread.get.interrupt() - waitingThread = None + waiting match { + case Some(manual) ⇒ + val newWaitTime = manual.waitTime - timespan.toNanos + waiting = + if (newWaitTime <= 0 && manual.waitingThread.isDefined) { + manual.waitingThread.get.interrupt() + Some(Manual(newWaitTime, None)) + } else { + Some(manual.copy(waitTime = newWaitTime)) + } + + case None ⇒ + sys.error("Called advance time but hasn't enabled manualTimeControl") } } def manualTimeControl(on: Boolean): Unit = - manual = on + waiting = Some(Manual()) override def signalAll(): Unit = condition.signalAll() override def awaitNanos(nanosTimeout: Long): Long = { - if (!manual) { + if (waiting.isEmpty) { events += awaitEvent condition.awaitNanos(nanosTimeout) } else { - waitTime = nanosTimeout - waitingThread = Some(Thread.currentThread()) + val waitTime = nanosTimeout + val waitingThread = Some(Thread.currentThread()) + waiting = Some(Manual(waitTime, waitingThread)) try { this.await()