Re-enable javadsl Flow 'via' test #18863
This commit is contained in:
parent
80c7769767
commit
78b5ed8498
2 changed files with 60 additions and 8 deletions
|
|
@ -12,11 +12,11 @@ import akka.japi.function.*;
|
|||
import akka.japi.pf.PFBuilder;
|
||||
import akka.stream.*;
|
||||
import akka.stream.impl.ConstantFun;
|
||||
import akka.stream.stage.*;
|
||||
import akka.testkit.AkkaSpec;
|
||||
import akka.stream.testkit.TestPublisher;
|
||||
import akka.testkit.javadsl.TestKit;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import scala.concurrent.duration.Duration;
|
||||
import scala.concurrent.duration.FiniteDuration;
|
||||
|
|
@ -84,6 +84,63 @@ public class SourceTest extends StreamTest {
|
|||
probe.expectMsgEquals("Done");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mustBeAbleToUseVia() {
|
||||
final TestKit probe = new TestKit(system);
|
||||
final Iterable<Integer> input = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7);
|
||||
// duplicate each element, stop after 4 elements, and emit sum to the end
|
||||
Source.from(input).via(new GraphStage<FlowShape<Integer, Integer>>() {
|
||||
public final Inlet<Integer> in = Inlet.create("in");
|
||||
public final Outlet<Integer> out = Outlet.create("out");
|
||||
|
||||
@Override
|
||||
public GraphStageLogic createLogic(Attributes inheritedAttributes) throws Exception {
|
||||
return new GraphStageLogic(shape()) {
|
||||
int sum = 0;
|
||||
int count = 0;
|
||||
|
||||
{
|
||||
setHandler(in, new AbstractInHandler() {
|
||||
@Override
|
||||
public void onPush() {
|
||||
final Integer element = grab(in);
|
||||
sum += element;
|
||||
count += 1;
|
||||
if (count == 4) {
|
||||
emitMultiple(out, Arrays.asList(element, element, sum).iterator(), () -> completeStage());
|
||||
} else {
|
||||
emitMultiple(out, Arrays.asList(element, element).iterator());
|
||||
}
|
||||
}
|
||||
});
|
||||
setHandler(out, new AbstractOutHandler() {
|
||||
@Override
|
||||
public void onPull() throws Exception {
|
||||
pull(in);
|
||||
}
|
||||
});
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public FlowShape<Integer, Integer> shape() {
|
||||
return FlowShape.of(in, out);
|
||||
}
|
||||
}).runForeach((Procedure<Integer>) elem ->
|
||||
probe.getRef().tell(elem, ActorRef.noSender()), materializer);
|
||||
|
||||
probe.expectMsgEquals(0);
|
||||
probe.expectMsgEquals(0);
|
||||
probe.expectMsgEquals(1);
|
||||
probe.expectMsgEquals(1);
|
||||
probe.expectMsgEquals(2);
|
||||
probe.expectMsgEquals(2);
|
||||
probe.expectMsgEquals(3);
|
||||
probe.expectMsgEquals(3);
|
||||
probe.expectMsgEquals(6);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void mustBeAbleToUseGroupBy() throws Exception {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue