=str #18002 FlowTest#mustBeAbleToRecover timing issue

This commit is contained in:
Alexander Golubev 2015-07-23 22:01:05 -04:00
parent dfccdff6fb
commit 7d6fb67b12

View file

@ -15,6 +15,7 @@ import akka.stream.StreamTest;
import akka.stream.stage.*;
import akka.japi.function.*;
import akka.stream.testkit.AkkaSpec;
import akka.stream.testkit.TestPublisher;
import akka.testkit.JavaTestKit;
import org.junit.ClassRule;
@ -26,9 +27,10 @@ import scala.concurrent.duration.FiniteDuration;
import scala.runtime.BoxedUnit;
import scala.util.Try;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static akka.stream.testkit.StreamTestKit.PublisherProbeSubscription;
import static akka.stream.testkit.TestPublisher.ManualProbe;
@SuppressWarnings("serial")
public class SourceTest extends StreamTest {
@ -524,11 +526,13 @@ public class SourceTest extends StreamTest {
@Test
public void mustBeAbleToRecover() throws Exception {
final ManualProbe<Integer> publisherProbe = TestPublisher.manualProbe(true,system);
final JavaTestKit probe = new JavaTestKit(system);
final Source<Integer, ?> source = Source.from(Arrays.asList(0, 1, 2, 3)).map(
final Source<Integer, ?> source = Source.from(publisherProbe).map(
new Function<Integer, Integer>() {
public Integer apply(Integer elem) {
if (elem == 2) throw new RuntimeException("ex");
if (elem == 1) throw new RuntimeException("ex");
else return elem;
}
})
@ -545,9 +549,12 @@ public class SourceTest extends StreamTest {
}
}), materializer);
final PublisherProbeSubscription<Integer> s = publisherProbe.expectSubscription();
s.sendNext(0);
probe.expectMsgEquals(0);
probe.expectMsgEquals(1);
s.sendNext(1);
probe.expectMsgEquals(0);
Await.ready(future, Duration.apply(200, TimeUnit.MILLISECONDS));
}