Concurrency fix for BoundedBlockingQueueSpec #24991

This commit is contained in:
Johan Andrén 2018-07-03 13:19:04 +02:00 committed by GitHub
parent 6e5efccbd6
commit 29cf96b90c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -5,23 +5,22 @@
package akka.util package akka.util
import java.util import java.util
import java.util.concurrent.locks.{ Condition, LockSupport, ReentrantLock }
import java.util.concurrent._ import java.util.concurrent._
import java.util.concurrent.locks.{ Condition, LockSupport, ReentrantLock }
import org.scalatest.concurrent.{ Signaler, ThreadSignaler } import akka.util.DefaultExecutionContext._
import org.scalatest.{ Matchers, WordSpec }
import org.scalatest.time.SpanSugar._
import org.scalatest.time.{ Span, SpanSugar }
import org.scalatest.exceptions.TestFailedDueToTimeoutException
import org.scalactic.source.Position import org.scalactic.source.Position
import org.scalatest.concurrent.{ Signaler, ThreadSignaler }
import org.scalatest.exceptions.TestFailedDueToTimeoutException
import org.scalatest.matchers.{ MatchResult, Matcher } import org.scalatest.matchers.{ MatchResult, Matcher }
import org.scalatest.time.Span
import org.scalatest.time.SpanSugar._
import org.scalatest.{ Matchers, WordSpec }
import scala.collection.JavaConverters._
import scala.collection.mutable import scala.collection.mutable
import scala.concurrent.{ Await, ExecutionContext, ExecutionContextExecutor, Future } import scala.concurrent.{ Await, ExecutionContext, ExecutionContextExecutor, Future }
import scala.util.control.Exception import scala.util.control.Exception
import DefaultExecutionContext._
import collection.JavaConverters._
class BoundedBlockingQueueSpec class BoundedBlockingQueueSpec
extends WordSpec extends WordSpec
@ -267,6 +266,7 @@ class BoundedBlockingQueueSpec
} }
"block for less than the timeout when the queue becomes not full" in { "block for less than the timeout when the queue becomes not full" in {
val TestContext(queue, events, _, notFull, _, _) = newBoundedBlockingQueue(1) val TestContext(queue, events, _, notFull, _, _) = newBoundedBlockingQueue(1)
queue.put("Hello") queue.put("Hello")
@ -642,8 +642,8 @@ trait CustomContainsMatcher {
* therefore we resort to checking that a call does not return within a set time. * therefore we resort to checking that a call does not return within a set time.
*/ */
trait BlockingHelpers { trait BlockingHelpers {
import org.scalatest.concurrent.TimeLimits._
import org.scalatest.Assertions._ import org.scalatest.Assertions._
import org.scalatest.concurrent.TimeLimits._
implicit val timeoutSignaler: Signaler = ThreadSignaler implicit val timeoutSignaler: Signaler = ThreadSignaler
@ -702,9 +702,10 @@ object QueueTestEvents {
* Helper for setting up a queue under test with injected lock, conditions and backing queue. * Helper for setting up a queue under test with injected lock, conditions and backing queue.
*/ */
trait QueueSetupHelper { trait QueueSetupHelper {
import akka.util.QueueTestEvents._
import java.util.Date import java.util.Date
import akka.util.QueueTestEvents._
case class TestContext(queue: BoundedBlockingQueue[String], events: mutable.MutableList[QueueEvent], notEmpty: TestCondition, notFull: TestCondition, lock: ReentrantLock, backingQueue: util.Queue[String]) case class TestContext(queue: BoundedBlockingQueue[String], events: mutable.MutableList[QueueEvent], notEmpty: TestCondition, notFull: TestCondition, lock: ReentrantLock, backingQueue: util.Queue[String])
/** /**
@ -735,31 +736,40 @@ trait QueueSetupHelper {
class TestCondition(events: mutable.MutableList[QueueEvent], condition: Condition, signalEvent: QueueEvent, awaitEvent: QueueEvent) class TestCondition(events: mutable.MutableList[QueueEvent], condition: Condition, signalEvent: QueueEvent, awaitEvent: QueueEvent)
extends Condition { extends Condition {
private var waitTime: Long = 0 case class Manual(waitTime: Long = 0, waitingThread: Option[Thread] = None)
private var manual = false
private var waitingThread: Option[Thread] = None @volatile private var waiting: Option[Manual] = None
def advanceTime(timespan: Span): Unit = { def advanceTime(timespan: Span): Unit = {
waitTime -= timespan.toNanos waiting match {
if (waitTime <= 0 && waitingThread.isDefined) { case Some(manual)
waitingThread.get.interrupt() val newWaitTime = manual.waitTime - timespan.toNanos
waitingThread = None waiting =
if (newWaitTime <= 0 && manual.waitingThread.isDefined) {
manual.waitingThread.get.interrupt()
Some(Manual(newWaitTime, None))
} else {
Some(manual.copy(waitTime = newWaitTime))
}
case None
sys.error("Called advance time but hasn't enabled manualTimeControl")
} }
} }
def manualTimeControl(on: Boolean): Unit = def manualTimeControl(on: Boolean): Unit =
manual = on waiting = Some(Manual())
override def signalAll(): Unit = condition.signalAll() override def signalAll(): Unit = condition.signalAll()
override def awaitNanos(nanosTimeout: Long): Long = { override def awaitNanos(nanosTimeout: Long): Long = {
if (!manual) { if (waiting.isEmpty) {
events += awaitEvent events += awaitEvent
condition.awaitNanos(nanosTimeout) condition.awaitNanos(nanosTimeout)
} else { } else {
waitTime = nanosTimeout val waitTime = nanosTimeout
waitingThread = Some(Thread.currentThread()) val waitingThread = Some(Thread.currentThread())
waiting = Some(Manual(waitTime, waitingThread))
try { try {
this.await() this.await()