From 7d6fb67b122af0e1bc8236660e2f646ad7287170 Mon Sep 17 00:00:00 2001 From: Alexander Golubev Date: Thu, 23 Jul 2015 22:01:05 -0400 Subject: [PATCH] =str #18002 FlowTest#mustBeAbleToRecover timing issue --- .../test/java/akka/stream/javadsl/SourceTest.java | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index e7cb7e6e56..8146260114 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -15,6 +15,7 @@ import akka.stream.StreamTest; import akka.stream.stage.*; import akka.japi.function.*; import akka.stream.testkit.AkkaSpec; +import akka.stream.testkit.TestPublisher; import akka.testkit.JavaTestKit; import org.junit.ClassRule; @@ -26,9 +27,10 @@ import scala.concurrent.duration.FiniteDuration; import scala.runtime.BoxedUnit; import scala.util.Try; import java.util.*; -import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; +import static akka.stream.testkit.StreamTestKit.PublisherProbeSubscription; +import static akka.stream.testkit.TestPublisher.ManualProbe; @SuppressWarnings("serial") public class SourceTest extends StreamTest { @@ -524,11 +526,13 @@ public class SourceTest extends StreamTest { @Test public void mustBeAbleToRecover() throws Exception { + final ManualProbe publisherProbe = TestPublisher.manualProbe(true,system); final JavaTestKit probe = new JavaTestKit(system); - final Source source = Source.from(Arrays.asList(0, 1, 2, 3)).map( + + final Source source = Source.from(publisherProbe).map( new Function() { public Integer apply(Integer elem) { - if (elem == 2) throw new RuntimeException("ex"); + if (elem == 1) throw new RuntimeException("ex"); else return elem; } }) @@ -545,9 +549,12 @@ public class SourceTest extends StreamTest { } }), materializer); + final PublisherProbeSubscription s = publisherProbe.expectSubscription(); + s.sendNext(0); probe.expectMsgEquals(0); - probe.expectMsgEquals(1); + s.sendNext(1); probe.expectMsgEquals(0); + Await.ready(future, Duration.apply(200, TimeUnit.MILLISECONDS)); }