From c48ca096d2d88c4a358435b744bef8fd9efccf7c Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Mon, 9 Nov 2020 13:04:44 +0100 Subject: [PATCH] tests: shorten probabilitic test in BoundedSourceQueueSpec Fixes #29800 --- .../stream/scaladsl/BoundedSourceQueueSpec.scala | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BoundedSourceQueueSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BoundedSourceQueueSpec.scala index c517ef7590..471c7c2a22 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BoundedSourceQueueSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BoundedSourceQueueSpec.scala @@ -10,10 +10,13 @@ import java.util.concurrent.{ CountDownLatch, ThreadLocalRandom } import akka.stream.{ QueueCompletionResult, QueueOfferResult } import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.{ StreamSpec, TestSubscriber } +import akka.testkit.WithLogCapturing import scala.concurrent.duration._ -class BoundedSourceQueueSpec extends StreamSpec { +class BoundedSourceQueueSpec extends StreamSpec("""akka.loglevel = debug + |akka.loggers = ["akka.testkit.SilenceAllTestEventListener"] + |""".stripMargin) with WithLogCapturing { override implicit def patienceConfig: PatienceConfig = PatienceConfig(5.seconds) @@ -111,15 +114,15 @@ class BoundedSourceQueueSpec extends StreamSpec { awaitAssert(queue.offer(1) should be(QueueOfferResult.Failure(ex))) } - "without cancellation only flag elements as enqueued that will also passed to downstream" in { + "without cancellation only flag elements as enqueued that will also be passed to downstream" in { val counter = new AtomicLong() val (queue, result) = Source.queue[Int](100000).toMat(Sink.fold(0L)(_ + _))(Keep.both).run() - val numThreads = 32 - val stopProb = 1000000 + val numThreads = Runtime.getRuntime.availableProcessors() * 4 + val stopProb = 10000 // specifies run time of test indirectly val expected = 1d / (1d - math.pow(1d - 1d / stopProb, numThreads)) - println(s"Expected elements per thread: $expected") // variance might be quite high depending on number of threads + log.debug(s"Expected elements per thread: $expected") // variance might be quite high depending on number of threads val barrier = new CountDownLatch(numThreads) class QueueingThread extends Thread { @@ -152,7 +155,8 @@ class BoundedSourceQueueSpec extends StreamSpec { barrier.countDown() barrier.await() // wait for all threads being in this state before starting race runLoop() - println(f"Thread $getName%-20s enqueued: $numElemsEnqueued%7d dropped: $numElemsDropped%7d before completion") + log.debug( + f"Thread $getName%-20s enqueued: $numElemsEnqueued%7d dropped: $numElemsDropped%7d before completion") } }