From 8e152fe745725a2db72efb4fd94e92f634e0dceb Mon Sep 17 00:00:00 2001 From: Alexander Golubev Date: Fri, 24 Jul 2015 12:14:43 -0400 Subject: [PATCH] =str #18074 UnzipWith#sad case scenario timing issue --- .../stream/scaladsl/GraphUnzipWithSpec.scala | 24 +++++++++++++------ 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphUnzipWithSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphUnzipWithSpec.scala index fab8fe5c7c..6f9f0094af 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphUnzipWithSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphUnzipWithSpec.scala @@ -138,6 +138,9 @@ class GraphUnzipWithSpec extends AkkaSpec { } "work in the sad case" in { + val settings = ActorMaterializerSettings(system) + .withInputBuffer(initialSize = 1, maxSize = 1) + val leftProbe = TestSubscriber.manualProbe[LeftOutput]() val rightProbe = TestSubscriber.manualProbe[RightOutput]() @@ -146,21 +149,28 @@ class GraphUnzipWithSpec extends AkkaSpec { Source(-2 to 2) ~> unzip.in - unzip.out0 ~> Flow[LeftOutput].buffer(4, OverflowStrategy.backpressure) ~> Sink(leftProbe) - unzip.out1 ~> Flow[RightOutput].buffer(4, OverflowStrategy.backpressure) ~> Sink(rightProbe) + unzip.out0 ~> Sink(leftProbe) + unzip.out1 ~> Sink(rightProbe) }.run() val leftSubscription = leftProbe.expectSubscription() val rightSubscription = rightProbe.expectSubscription() - leftSubscription.request(2) - leftProbe.expectNext(1 / -2) - leftProbe.expectNext(1 / -1) + def requestFromBoth(): Unit = { + leftSubscription.request(1) + rightSubscription.request(1) + } - rightSubscription.request(1) + requestFromBoth() + leftProbe.expectNext(1 / -2) rightProbe.expectNext("1/-2") - leftSubscription.request(2) + requestFromBoth() + leftProbe.expectNext(1 / -1) + rightProbe.expectNext("1/-1") + + requestFromBoth() + leftProbe.expectError() match { case a: java.lang.ArithmeticException ⇒ a.getMessage should be("/ by zero") }