=str #18002 FlowTest#mustBeAbleToRecover timing issue

This commit is contained in:
Alexander Golubev 2015-08-16 00:04:04 -04:00
parent 343d64050d
commit dec87c2e56

View file

@ -8,27 +8,26 @@ import akka.dispatch.Foreach;
import akka.dispatch.Futures; import akka.dispatch.Futures;
import akka.japi.JavaPartialFunction; import akka.japi.JavaPartialFunction;
import akka.japi.Pair; import akka.japi.Pair;
import akka.stream.Outlet;
import akka.stream.OverflowStrategy;
import akka.stream.StreamTest;
import akka.stream.stage.*;
import akka.stream.javadsl.FlowGraph.Builder;
import akka.japi.function.*; import akka.japi.function.*;
import akka.stream.*; import akka.stream.*;
import akka.stream.javadsl.FlowGraph.Builder;
import akka.stream.stage.*;
import akka.stream.testkit.AkkaSpec; import akka.stream.testkit.AkkaSpec;
import akka.stream.testkit.TestPublisher;
import akka.testkit.JavaTestKit; import akka.testkit.JavaTestKit;
import org.reactivestreams.Publisher;
import scala.PartialFunction;
import scala.runtime.BoxedUnit;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Test; import org.junit.Test;
import org.reactivestreams.Publisher;
import scala.concurrent.Await; import scala.concurrent.Await;
import scala.concurrent.Future; import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.duration.Duration; import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import scala.runtime.BoxedUnit;
import java.util.*; import java.util.*;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static akka.stream.testkit.StreamTestKit.PublisherProbeSubscription;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@SuppressWarnings("serial") @SuppressWarnings("serial")
@ -522,8 +521,10 @@ public class FlowTest extends StreamTest {
@Test @Test
public void mustBeAbleToRecover() throws Exception { public void mustBeAbleToRecover() throws Exception {
final TestPublisher.ManualProbe<Integer> publisherProbe = TestPublisher.manualProbe(true,system);
final JavaTestKit probe = new JavaTestKit(system); final JavaTestKit probe = new JavaTestKit(system);
final Source<Integer, ?> source = Source.from(Arrays.asList(0, 1, 2, 3));
final Source<Integer, ?> source = Source.from(publisherProbe);
final Flow<Integer, Integer, ?> flow = Flow.of(Integer.class).map( final Flow<Integer, Integer, ?> flow = Flow.of(Integer.class).map(
new Function<Integer, Integer>() { new Function<Integer, Integer>() {
public Integer apply(Integer elem) { public Integer apply(Integer elem) {
@ -544,8 +545,13 @@ public class FlowTest extends StreamTest {
} }
}), materializer); }), materializer);
final PublisherProbeSubscription<Integer> s = publisherProbe.expectSubscription();
s.sendNext(0);
probe.expectMsgEquals(0); probe.expectMsgEquals(0);
s.sendNext(1);
probe.expectMsgEquals(1); probe.expectMsgEquals(1);
s.sendNext(2);
probe.expectMsgEquals(0); probe.expectMsgEquals(0);
Await.ready(future, Duration.apply(200, TimeUnit.MILLISECONDS)); Await.ready(future, Duration.apply(200, TimeUnit.MILLISECONDS));
} }