Flatten a Future[Graph[SourceShape[T], M]] as Source[T, Future[M]] #22359

This commit is contained in:
cchantep 2016-09-04 16:11:49 +02:00 committed by Johan Andrén
parent db0a473cd5
commit 5b542d99fa
10 changed files with 339 additions and 10 deletions

View file

@ -29,6 +29,7 @@ import scala.concurrent.duration.FiniteDuration;
import akka.testkit.AkkaJUnitActorSystemResource;
import java.util.*;
import java.util.function.Supplier;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
@ -381,6 +382,55 @@ public class FlowTest extends StreamTest {
assertEquals(new HashSet<Object>(Arrays.asList("a", "b", "c", "d", "e", "f")), new HashSet<String>(result));
}
@Test
public void mustBeAbleToUsefromSourceCompletionStage() throws Exception {
final Flow<String, String, NotUsed> f1 =
Flow.of(String.class).via(FlowTest.this.<String> op()).named("f1");
final Flow<String, String, NotUsed> f2 =
Flow.of(String.class).via(FlowTest.this.<String> op()).named("f2");
@SuppressWarnings("unused")
final Flow<String, String, NotUsed> f3 =
Flow.of(String.class).via(FlowTest.this.<String> op()).named("f3");
final Source<String, NotUsed> in1 = Source.from(Arrays.asList("a", "b", "c"));
final Source<String, NotUsed> in2 = Source.from(Arrays.asList("d", "e", "f"));
final Sink<String, Publisher<String>> publisher = Sink.asPublisher(AsPublisher.WITHOUT_FANOUT);
final Graph<SourceShape<String>, NotUsed> graph = Source.fromGraph(
GraphDSL.create(new Function<GraphDSL.Builder<NotUsed>, SourceShape<String>>() {
@Override
public SourceShape<String> apply(Builder<NotUsed> b)
throws Exception {
final UniformFanInShape<String, String> merge =
b.add(Merge.<String>create(2));
b.from(b.add(in1)).via(b.add(f1)).toInlet(merge.in(0));
b.from(b.add(in2)).via(b.add(f2)).toInlet(merge.in(1));
return new SourceShape<String>(merge.out());
}
}));
final Supplier<Graph<SourceShape<String>, NotUsed>> fn =
new Supplier<Graph<SourceShape<String>, NotUsed>>() {
public Graph<SourceShape<String>, NotUsed> get() { return graph; }
};
final CompletionStage<Graph<SourceShape<String>, NotUsed>> stage =
CompletableFuture.supplyAsync(fn);
final Source<String, CompletionStage<NotUsed>> source =
Source.fromSourceCompletionStage(stage);
// collecting
final Publisher<String> pub = source.runWith(publisher, materializer);
final CompletionStage<List<String>> all = Source.fromPublisher(pub).limit(100).runWith(Sink.<String>seq(), materializer);
final List<String> result = all.toCompletableFuture().get(200, TimeUnit.MILLISECONDS);
assertEquals(new HashSet<Object>(Arrays.asList("a", "b", "c", "d", "e", "f")), new HashSet<String>(result));
}
@Test
public void mustBeAbleToUseZip() {
final JavaTestKit probe = new JavaTestKit(system);