tests: shorten probabilitic test in BoundedSourceQueueSpec

Fixes #29800
This commit is contained in:
Johannes Rudolph 2020-11-09 13:04:44 +01:00
parent 12110822df
commit c48ca096d2
No known key found for this signature in database
GPG key ID: 52AF1C9ABD77E6E5

View file

@ -10,10 +10,13 @@ import java.util.concurrent.{ CountDownLatch, ThreadLocalRandom }
import akka.stream.{ QueueCompletionResult, QueueOfferResult } import akka.stream.{ QueueCompletionResult, QueueOfferResult }
import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.scaladsl.TestSink
import akka.stream.testkit.{ StreamSpec, TestSubscriber } import akka.stream.testkit.{ StreamSpec, TestSubscriber }
import akka.testkit.WithLogCapturing
import scala.concurrent.duration._ import scala.concurrent.duration._
class BoundedSourceQueueSpec extends StreamSpec { class BoundedSourceQueueSpec extends StreamSpec("""akka.loglevel = debug
|akka.loggers = ["akka.testkit.SilenceAllTestEventListener"]
|""".stripMargin) with WithLogCapturing {
override implicit def patienceConfig: PatienceConfig = PatienceConfig(5.seconds) override implicit def patienceConfig: PatienceConfig = PatienceConfig(5.seconds)
@ -111,15 +114,15 @@ class BoundedSourceQueueSpec extends StreamSpec {
awaitAssert(queue.offer(1) should be(QueueOfferResult.Failure(ex))) awaitAssert(queue.offer(1) should be(QueueOfferResult.Failure(ex)))
} }
"without cancellation only flag elements as enqueued that will also passed to downstream" in { "without cancellation only flag elements as enqueued that will also be passed to downstream" in {
val counter = new AtomicLong() val counter = new AtomicLong()
val (queue, result) = val (queue, result) =
Source.queue[Int](100000).toMat(Sink.fold(0L)(_ + _))(Keep.both).run() Source.queue[Int](100000).toMat(Sink.fold(0L)(_ + _))(Keep.both).run()
val numThreads = 32 val numThreads = Runtime.getRuntime.availableProcessors() * 4
val stopProb = 1000000 val stopProb = 10000 // specifies run time of test indirectly
val expected = 1d / (1d - math.pow(1d - 1d / stopProb, numThreads)) val expected = 1d / (1d - math.pow(1d - 1d / stopProb, numThreads))
println(s"Expected elements per thread: $expected") // variance might be quite high depending on number of threads log.debug(s"Expected elements per thread: $expected") // variance might be quite high depending on number of threads
val barrier = new CountDownLatch(numThreads) val barrier = new CountDownLatch(numThreads)
class QueueingThread extends Thread { class QueueingThread extends Thread {
@ -152,7 +155,8 @@ class BoundedSourceQueueSpec extends StreamSpec {
barrier.countDown() barrier.countDown()
barrier.await() // wait for all threads being in this state before starting race barrier.await() // wait for all threads being in this state before starting race
runLoop() runLoop()
println(f"Thread $getName%-20s enqueued: $numElemsEnqueued%7d dropped: $numElemsDropped%7d before completion") log.debug(
f"Thread $getName%-20s enqueued: $numElemsEnqueued%7d dropped: $numElemsDropped%7d before completion")
} }
} }