diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index d7bc385357..7bea86b20d 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -8,27 +8,26 @@ import akka.dispatch.Foreach; import akka.dispatch.Futures; import akka.japi.JavaPartialFunction; import akka.japi.Pair; -import akka.stream.Outlet; -import akka.stream.OverflowStrategy; -import akka.stream.StreamTest; -import akka.stream.stage.*; -import akka.stream.javadsl.FlowGraph.Builder; import akka.japi.function.*; import akka.stream.*; +import akka.stream.javadsl.FlowGraph.Builder; +import akka.stream.stage.*; import akka.stream.testkit.AkkaSpec; +import akka.stream.testkit.TestPublisher; import akka.testkit.JavaTestKit; - -import org.reactivestreams.Publisher; -import scala.PartialFunction; -import scala.runtime.BoxedUnit; import org.junit.ClassRule; import org.junit.Test; +import org.reactivestreams.Publisher; import scala.concurrent.Await; import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; +import scala.runtime.BoxedUnit; + import java.util.*; import java.util.concurrent.TimeUnit; + +import static akka.stream.testkit.StreamTestKit.PublisherProbeSubscription; import static org.junit.Assert.assertEquals; @SuppressWarnings("serial") @@ -522,8 +521,10 @@ public class FlowTest extends StreamTest { @Test public void mustBeAbleToRecover() throws Exception { + final TestPublisher.ManualProbe publisherProbe = TestPublisher.manualProbe(true,system); final JavaTestKit probe = new JavaTestKit(system); - final Source source = Source.from(Arrays.asList(0, 1, 2, 3)); + + final Source source = Source.from(publisherProbe); final Flow flow = Flow.of(Integer.class).map( new Function() { public Integer apply(Integer elem) { @@ -544,8 +545,13 @@ public class FlowTest extends StreamTest { } }), materializer); + final PublisherProbeSubscription s = publisherProbe.expectSubscription(); + + s.sendNext(0); probe.expectMsgEquals(0); + s.sendNext(1); probe.expectMsgEquals(1); + s.sendNext(2); probe.expectMsgEquals(0); Await.ready(future, Duration.apply(200, TimeUnit.MILLISECONDS)); }