diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java deleted file mode 100644 index 39819ee528..0000000000 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java +++ /dev/null @@ -1,236 +0,0 @@ -/** - * Copyright (C) 2015-2016 Lightbend Inc. - */ -package akka.stream.javadsl; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.TimeUnit; - -import akka.NotUsed; -import org.junit.ClassRule; -import org.junit.Test; - -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.Duration; -import scala.concurrent.duration.FiniteDuration; -import akka.japi.Pair; -import akka.stream.*; -import akka.testkit.AkkaSpec; -import akka.stream.javadsl.GraphDSL.Builder; -import akka.japi.function.*; -import akka.util.ByteString; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertArrayEquals; -import akka.testkit.AkkaJUnitActorSystemResource; - -public class BidiFlowTest extends StreamTest { - public BidiFlowTest() { - super(actorSystemResource); - } - - @ClassRule - public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource( - "FlowTest", AkkaSpec.testConf()); - - private final BidiFlow bidi = BidiFlow - .fromGraph(GraphDSL.create( - new Function, BidiShape>() { - @Override - public BidiShape apply(Builder b) - throws Exception { - final FlowShape top = b.add(Flow - .of(Integer.class).map(new Function() { - @Override - public Long apply(Integer arg) { - return (long) ((int) arg) + 2; - } - })); - final FlowShape bottom = b.add(Flow - .of(ByteString.class).map(new Function() { - @Override - public String apply(ByteString arg) { - return arg.decodeString("UTF-8"); - } - })); - return new BidiShape(top - .in(), top.out(), bottom.in(), bottom.out()); - } - })); - - private final BidiFlow inverse = BidiFlow - .fromGraph( - GraphDSL.create( - new Function, BidiShape>() { - @Override - public BidiShape apply(Builder b) - throws Exception { - final FlowShape top = b.add(Flow.of(Long.class) - .map(new Function() { - @Override - public Integer apply(Long arg) { - return (int) ((long) arg) + 2; - } - })); - final FlowShape bottom = b.add(Flow - .of(String.class).map(new Function() { - @Override - public ByteString apply(String arg) { - return ByteString.fromString(arg); - } - })); - return new BidiShape(top - .in(), top.out(), bottom.in(), bottom.out()); - } - })); - - private final BidiFlow> bidiMat = - BidiFlow.fromGraph( - GraphDSL.create( - Sink.head(), - (b, sink) -> { - b.from(b.add(Source.single(42))).to(sink); - final FlowShape top = b.add(Flow - .of(Integer.class).map(i -> (long)(i + 2))); - final FlowShape bottom = b.add(Flow - .of(ByteString.class).map(bytes -> bytes.decodeString("UTF-8"))); - return new BidiShape(top - .in(), top.out(), bottom.in(), bottom.out()); - } - )); - - private final String str = "Hello World"; - private final ByteString bytes = ByteString.fromString(str); - private final List list = new ArrayList(); - { - list.add(1); - list.add(2); - list.add(3); - } - private final FiniteDuration oneSec = Duration.create(1, TimeUnit.SECONDS); - - @Test - public void mustWorkInIsolation() throws Exception { - final Pair, CompletionStage> p = - RunnableGraph.fromGraph(GraphDSL - .create(Sink. head(), Sink. head(), - Keep.both(), - (b, st, sb) -> { - final BidiShape s = - b.add(bidi); - b.from(b.add(Source.single(1))).toInlet(s.in1()); - b.from(s.out1()).to(st); - b.from(b.add(Source.single(bytes))).toInlet(s.in2()); - b.from(s.out2()).to(sb); - return ClosedShape.getInstance(); - })).run(materializer); - - final Long rt = p.first().toCompletableFuture().get(1, TimeUnit.SECONDS); - final String rb = p.second().toCompletableFuture().get(1, TimeUnit.SECONDS); - - assertEquals((Long) 3L, rt); - assertEquals(str, rb); - } - - @Test - public void mustWorkAsAFlowThatIsOpenOnTheLeft() throws Exception { - final Flow f = bidi.join(Flow.of(Long.class).map( - new Function() { - @Override public ByteString apply(Long arg) { - return ByteString.fromString("Hello " + arg); - } - })); - - final CompletionStage> result = Source.from(list).via(f).limit(10).runWith(Sink.seq(), materializer); - assertEquals(Arrays.asList("Hello 3", "Hello 4", "Hello 5"), result.toCompletableFuture().get(1, TimeUnit.SECONDS)); - } - - @Test - public void mustWorkAsAFlowThatIsOpenOnTheRight() throws Exception { - final Flow f = Flow.of(String.class).map( - new Function() { - @Override public Integer apply(String arg) { - return Integer.valueOf(arg); - } - }).join(bidi); - final List inputs = Arrays.asList(ByteString.fromString("1"), ByteString.fromString("2")); - final CompletionStage> result = Source.from(inputs).via(f).limit(10).runWith(Sink.seq(), materializer); - assertEquals(Arrays.asList(3L, 4L), result.toCompletableFuture().get(1, TimeUnit.SECONDS)); - } - - @Test - public void mustWorkWhenAtopItsInverse() throws Exception { - final Flow f = bidi.atop(inverse).join(Flow.of(Integer.class).map( - new Function() { - @Override public String apply(Integer arg) { - return arg.toString(); - } - })); - final CompletionStage> result = Source.from(list).via(f).limit(10).runWith(Sink.seq(), materializer); - assertEquals(Arrays.asList("5", "6", "7"), result.toCompletableFuture().get(1, TimeUnit.SECONDS)); - } - - @Test - public void mustWorkWhenReversed() throws Exception { - final Flow f = Flow.of(Integer.class).map( - new Function() { - @Override public String apply(Integer arg) { - return arg.toString(); - } - }).join(inverse.reversed()).join(bidi.reversed()); - final CompletionStage> result = Source.from(list).via(f).limit(10).runWith(Sink.seq(), materializer); - assertEquals(Arrays.asList("5", "6", "7"), result.toCompletableFuture().get(1, TimeUnit.SECONDS)); - } - - @Test - public void mustMaterializeToItsValue() throws Exception { - final CompletionStage f = RunnableGraph.fromGraph( - GraphDSL.create(bidiMat, (b, shape) -> { - final FlowShape left = b.add(Flow.of(String.class).map(Integer::valueOf)); - final FlowShape right = b.add(Flow.of(Long.class).map(s -> ByteString.fromString("Hello " + s))); - b.from(shape.out2()).via(left).toInlet(shape.in1()) - .from(shape.out1()).via(right).toInlet(shape.in2()); - return ClosedShape.getInstance(); - })).run(materializer); - assertEquals((Integer) 42, f.toCompletableFuture().get(1, TimeUnit.SECONDS)); - } - - @Test - public void mustCombineMaterializationValues() throws Exception { - final Flow> left = Flow.fromGraph(GraphDSL.create( - Sink.head(), (b, sink) -> { - final UniformFanOutShape bcast = b.add(Broadcast.create(2)); - final UniformFanInShape merge = b.add(Merge.create(2)); - final FlowShape flow = b.add(Flow.of(String.class).map(Integer::valueOf)); - b.from(bcast).to(sink) - .from(b.add(Source.single(1))).viaFanOut(bcast).toFanIn(merge) - .from(flow).toFanIn(merge); - return new FlowShape(flow.in(), merge.out()); - })); - final Flow>> right = Flow.fromGraph(GraphDSL.create( - Sink.>head(), (b, sink) -> { - final FlowShape> flow = b.add(Flow.of(Long.class).grouped(10)); - b.from(flow).to(sink); - return new FlowShape(flow.in(), b.add(Source.single(ByteString.fromString("10"))).out()); - })); - final Pair, CompletionStage>, CompletionStage>> result = - left.joinMat(bidiMat, Keep.both()).joinMat(right, Keep.both()).run(materializer); - final CompletionStage l = result.first().first(); - final CompletionStage m = result.first().second(); - final CompletionStage> r = result.second(); - assertEquals((Integer) 1, l.toCompletableFuture().get(1, TimeUnit.SECONDS)); - assertEquals((Integer) 42, m.toCompletableFuture().get(1, TimeUnit.SECONDS)); - final Long[] rr = r.toCompletableFuture().get(1, TimeUnit.SECONDS).toArray(new Long[2]); - Arrays.sort(rr); - assertArrayEquals(new Long[] { 3L, 12L }, rr); - } - - public void mustSuitablyOverrideAttributeHandlingMethods() { - @SuppressWarnings("unused") - final BidiFlow b = - bidi.withAttributes(Attributes.name("")).addAttributes(Attributes.asyncBoundary()).named(""); - } -} diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/GraphDSLTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/GraphDSLTest.java deleted file mode 100644 index 960c0a7fa7..0000000000 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/GraphDSLTest.java +++ /dev/null @@ -1,377 +0,0 @@ -/** - * Copyright (C) 2014-2016 Lightbend Inc. - */ -package akka.stream.javadsl; - -import akka.NotUsed; -import akka.japi.Pair; -import akka.pattern.PatternsCS; -import akka.japi.tuple.Tuple4; -import akka.stream.*; -import akka.stream.javadsl.GraphDSL.Builder; -import akka.stream.stage.*; -import akka.japi.function.*; -import akka.testkit.AkkaSpec; -import akka.testkit.JavaTestKit; -import akka.testkit.TestProbe; -import akka.testkit.AkkaJUnitActorSystemResource; - -import org.junit.ClassRule; -import org.junit.Test; -import org.reactivestreams.Publisher; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.Duration; - -import java.util.*; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.TimeUnit; -import static org.junit.Assert.assertEquals; - -public class GraphDSLTest extends StreamTest { - public GraphDSLTest() { - super(actorSystemResource); - } - - @ClassRule - public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("GraphDSLTest", - AkkaSpec.testConf()); - - @SuppressWarnings("serial") - public Creator> op() { - return new akka.japi.function.Creator>() { - @Override - public PushPullStage create() throws Exception { - return new PushPullStage() { - @Override - public SyncDirective onPush(T element, Context ctx) { - return ctx.push(element); - } - - @Override - public SyncDirective onPull(Context ctx) { - return ctx.pull(); - } - }; - } - }; - } - - @Test - public void mustBeAbleToUseMerge() throws Exception { - final Flow f1 = - Flow.of(String.class).transform(GraphDSLTest.this. op()).named("f1"); - final Flow f2 = - Flow.of(String.class).transform(GraphDSLTest.this. op()).named("f2"); - @SuppressWarnings("unused") - final Flow f3 = - Flow.of(String.class).transform(GraphDSLTest.this. op()).named("f3"); - - final Source in1 = Source.from(Arrays.asList("a", "b", "c")); - final Source in2 = Source.from(Arrays.asList("d", "e", "f")); - - final Sink> publisher = Sink.asPublisher(AsPublisher.WITHOUT_FANOUT); - - final Source source = Source.fromGraph( - GraphDSL.create(new Function, SourceShape>() { - @Override - public SourceShape apply(Builder b) throws Exception { - final UniformFanInShape merge = b.add(Merge.create(2)); - b.from(b.add(in1)).via(b.add(f1)).toInlet(merge.in(0)); - b.from(b.add(in2)).via(b.add(f2)).toInlet(merge.in(1)); - return new SourceShape(merge.out()); - } - })); - - // collecting - final Publisher pub = source.runWith(publisher, materializer); - final CompletionStage> all = Source.fromPublisher(pub).limit(100).runWith(Sink.seq(), materializer); - - final List result = all.toCompletableFuture().get(3, TimeUnit.SECONDS); - assertEquals(new HashSet(Arrays.asList("a", "b", "c", "d", "e", "f")), new HashSet(result)); - } - - @Test - public void mustBeAbleToUseZip() { - final JavaTestKit probe = new JavaTestKit(system); - final Iterable input1 = Arrays.asList("A", "B", "C"); - final Iterable input2 = Arrays.asList(1, 2, 3); - - RunnableGraph.fromGraph( GraphDSL.create( - new Function,ClosedShape>() { - @Override - public ClosedShape apply(final Builder b) throws Exception { - final Source in1 = Source.from(input1); - final Source in2 = Source.from(input2); - final FanInShape2> zip = b.add(Zip.create()); - final Sink, NotUsed> out = createSink(probe); - - b.from(b.add(in1)).toInlet(zip.in0()); - b.from(b.add(in2)).toInlet(zip.in1()); - b.from(zip.out()).to(b.add(out)); - return ClosedShape.getInstance(); - } - })).run(materializer); - - List output = Arrays.asList(probe.receiveN(3)); - @SuppressWarnings("unchecked") - List> expected = Arrays.asList(new Pair("A", 1), new Pair( - "B", 2), new Pair("C", 3)); - assertEquals(expected, output); - } - - @Test - public void mustBeAbleToUseUnzip() { - final JavaTestKit probe1 = new JavaTestKit(system); - final JavaTestKit probe2 = new JavaTestKit(system); - - @SuppressWarnings("unchecked") - final List> input = Arrays.asList(new Pair("A", 1), - new Pair("B", 2), new Pair("C", 3)); - - final Iterable expected1 = Arrays.asList("A", "B", "C"); - final Iterable expected2 = Arrays.asList(1, 2, 3); - - RunnableGraph.fromGraph(GraphDSL.create( - new Function, ClosedShape>() { - @Override - public ClosedShape apply(final Builder b) throws Exception { - final SourceShape> in = b.add(Source.from(input)); - final FanOutShape2, String, Integer> unzip = b.add(Unzip.create()); - - final SinkShape out1 = b.add(GraphDSLTest.createSink(probe1)); - final SinkShape out2 = b.add(GraphDSLTest.createSink(probe2)); - - b.from(in).toInlet(unzip.in()); - b.from(unzip.out0()).to(out1); - b.from(unzip.out1()).to(out2); - return ClosedShape.getInstance(); - } - })).run(materializer); - - List output1 = Arrays.asList(probe1.receiveN(3)); - List output2 = Arrays.asList(probe2.receiveN(3)); - assertEquals(expected1, output1); - assertEquals(expected2, output2); - } - - private static Sink createSink(final JavaTestKit probe){ - return Sink.actorRef(probe.getRef(), "onComplete"); - } - - @Test - public void mustBeAbleToUseUnzipWith() throws Exception { - final JavaTestKit probe1 = new JavaTestKit(system); - final JavaTestKit probe2 = new JavaTestKit(system); - - RunnableGraph.fromGraph(GraphDSL.create( - new Function, ClosedShape>() { - @Override - public ClosedShape apply(final Builder b) throws Exception { - final Source in = Source.single(1); - - final FanOutShape2 unzip = b.add(UnzipWith.create( - new Function>() { - @Override - public Pair apply(Integer l) throws Exception { - return new Pair(l + "!", l); - } - }) - ); - - final SinkShape out1 = b.add(GraphDSLTest.createSink(probe1)); - final SinkShape out2 = b.add(GraphDSLTest.createSink(probe2)); - - b.from(b.add(in)).toInlet(unzip.in()); - b.from(unzip.out0()).to(out1); - b.from(unzip.out1()).to(out2); - return ClosedShape.getInstance(); - } - } - )).run(materializer); - - Duration d = Duration.create(3, TimeUnit.SECONDS); - - Object output1 = probe1.receiveOne(d); - Object output2 = probe2.receiveOne(d); - - assertEquals("1!", output1); - assertEquals(1, output2); - - } - - @Test - public void mustBeAbleToUseUnzip4With() throws Exception { - final JavaTestKit probe1 = new JavaTestKit(system); - final JavaTestKit probe2 = new JavaTestKit(system); - final JavaTestKit probe3 = new JavaTestKit(system); - final JavaTestKit probe4 = new JavaTestKit(system); - - RunnableGraph.fromGraph(GraphDSL.create( - new Function, ClosedShape>() { - @Override - public ClosedShape apply(final Builder b) throws Exception { - final Source in = Source.single(1); - - final FanOutShape4 unzip = b.add(UnzipWith.create4( - new Function>() { - @Override - public Tuple4 apply(Integer l) throws Exception { - return new Tuple4(l.toString(), l, l + "+" + l, l + l); - } - }) - ); - - final SinkShape out1 = b.add(GraphDSLTest.createSink(probe1)); - final SinkShape out2 = b.add(GraphDSLTest.createSink(probe2)); - final SinkShape out3 = b.add(GraphDSLTest.createSink(probe3)); - final SinkShape out4 = b.add(GraphDSLTest.createSink(probe4)); - - b.from(b.add(in)).toInlet(unzip.in()); - b.from(unzip.out0()).to(out1); - b.from(unzip.out1()).to(out2); - b.from(unzip.out2()).to(out3); - b.from(unzip.out3()).to(out4); - return ClosedShape.getInstance(); - } - })).run(materializer); - - Duration d = Duration.create(3, TimeUnit.SECONDS); - - Object output1 = probe1.receiveOne(d); - Object output2 = probe2.receiveOne(d); - Object output3 = probe3.receiveOne(d); - Object output4 = probe4.receiveOne(d); - - assertEquals("1", output1); - assertEquals(1, output2); - assertEquals("1+1", output3); - assertEquals(2, output4); - } - - @Test - public void mustBeAbleToUseZipWith() throws Exception { - final Source in1 = Source.single(1); - final Source in2 = Source.single(10); - - final Graph, NotUsed> sumZip = ZipWith.create( - new Function2() { - @Override public Integer apply(Integer l, Integer r) throws Exception { - return l + r; - } - }); - - final CompletionStage future = RunnableGraph.fromGraph(GraphDSL.create(Sink.head(), - (b, out) -> { - final FanInShape2 zip = b.add(sumZip); - b.from(b.add(in1)).toInlet(zip.in0()); - b.from(b.add(in2)).toInlet(zip.in1()); - b.from(zip.out()).to(out); - return ClosedShape.getInstance(); - })).run(materializer); - - final Integer result = future.toCompletableFuture().get(3, TimeUnit.SECONDS); - assertEquals(11, (int) result); - } - - @Test - public void mustBeAbleToUseZipN() throws Exception { - final Source in1 = Source.single(1); - final Source in2 = Source.single(10); - - final Graph>, NotUsed> sumZip = ZipN.create(2); - - final CompletionStage> future = RunnableGraph.fromGraph(GraphDSL.create(Sink.>head(), - (b, out) -> { - final UniformFanInShape> zip = b.add(sumZip); - b.from(b.add(in1)).toInlet(zip.in(0)); - b.from(b.add(in2)).toInlet(zip.in(1)); - b.from(zip.out()).to(out); - return ClosedShape.getInstance(); - })).run(materializer); - - final List result = future.toCompletableFuture().get(3, TimeUnit.SECONDS); - - assertEquals(Arrays.asList(1, 10), result); - } - - @Test - public void mustBeAbleToUseZipWithN() throws Exception { - final Source in1 = Source.single(1); - final Source in2 = Source.single(10); - - final Graph, NotUsed> sumZip = ZipWithN.create( - new Function, Integer>() { - @Override public Integer apply(List list) throws Exception { - Integer sum = 0; - - for(Integer i : list) { - sum += i; - } - - return sum; - } - }, 2); - - final CompletionStage future = RunnableGraph.fromGraph(GraphDSL.create(Sink.head(), - (b, out) -> { - final UniformFanInShape zip = b.add(sumZip); - b.from(b.add(in1)).toInlet(zip.in(0)); - b.from(b.add(in2)).toInlet(zip.in(1)); - b.from(zip.out()).to(out); - return ClosedShape.getInstance(); - })).run(materializer); - - final Integer result = future.toCompletableFuture().get(3, TimeUnit.SECONDS); - assertEquals(11, (int) result); - } - - @Test - public void mustBeAbleToUseZip4With() throws Exception { - final Source in1 = Source.single(1); - final Source in2 = Source.single(10); - final Source in3 = Source.single(100); - final Source in4 = Source.single(1000); - - final Graph, NotUsed> sumZip = ZipWith.create4( - new Function4() { - @Override public Integer apply(Integer i1, Integer i2, Integer i3, Integer i4) throws Exception { - return i1 + i2 + i3 + i4; - } - }); - - final CompletionStage future = RunnableGraph.fromGraph( - GraphDSL.create(Sink.head(), (b, out) -> { - final FanInShape4 zip = b.add(sumZip); - b.from(b.add(in1)).toInlet(zip.in0()); - b.from(b.add(in2)).toInlet(zip.in1()); - b.from(b.add(in3)).toInlet(zip.in2()); - b.from(b.add(in4)).toInlet(zip.in3()); - b.from(zip.out()).to(out); - return ClosedShape.getInstance(); - })).run(materializer); - - final Integer result = future.toCompletableFuture().get(3, TimeUnit.SECONDS); - assertEquals(1111, (int) result); - } - - @Test - public void mustBeAbleToUseMatValue() throws Exception { - @SuppressWarnings("unused") - final Source in1 = Source.single(1); - final TestProbe probe = TestProbe.apply(system); - - final CompletionStage future = RunnableGraph.fromGraph( - GraphDSL.create(Sink. head(), (b, out) -> { - b.from(b.add(Source.single(1))).to(out); - b.from(b.materializedValue()).to(b.add(Sink.foreach(mat -> PatternsCS.pipe(mat, system.dispatcher()).to(probe.ref())))); - return ClosedShape.getInstance(); - })).run(materializer); - - final Integer result = future.toCompletableFuture().get(3, TimeUnit.SECONDS); - assertEquals(1, (int) result); - - probe.expectMsg(1); - } - -} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldAsyncSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldAsyncSpec.scala new file mode 100644 index 0000000000..411c692257 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldAsyncSpec.scala @@ -0,0 +1,271 @@ +/** + * Copyright (C) 2014-2016 Lightbend Inc. + */ +package akka.stream.scaladsl + +import scala.util.control.NoStackTrace + +import scala.concurrent.{ Await, Future } +import scala.concurrent.duration._ + +import akka.NotUsed +import akka.stream.ActorMaterializer +import akka.stream.ActorAttributes.supervisionStrategy +import akka.stream.Supervision.{ restartingDecider, resumingDecider } +import akka.stream.impl.ReactiveStreamsCompliance + +import akka.testkit.{ AkkaSpec, TestLatch } +import akka.stream.testkit._, Utils._ + +import org.scalatest.concurrent.PatienceConfiguration.Timeout + +class FlowFoldAsyncSpec extends StreamSpec { + implicit val materializer = ActorMaterializer() + implicit def ec = materializer.executionContext + val timeout = Timeout(3.seconds) + + "A FoldAsync" must { + val input = 1 to 100 + val expected = input.sum + val inputSource = Source(input) + val foldSource = inputSource.foldAsync[Int](0) { (a, b) ⇒ + Future(a + b) + } + val flowDelayMS = 100L + val foldFlow = Flow[Int].foldAsync(0) { + (a, b) ⇒ Future { Thread.sleep(flowDelayMS); a + b } + } + val foldSink = Sink.foldAsync[Int, Int](0) { (a, b) ⇒ Future(a + b) } + + "work when using Source.foldAsync" in assertAllStagesStopped { + foldSource.runWith(Sink.head).futureValue(timeout) should ===(expected) + } + + "work when using Sink.foldAsync" in assertAllStagesStopped { + inputSource.runWith(foldSink).futureValue(timeout) should ===(expected) + } + + "work when using Flow.foldAsync" in assertAllStagesStopped { + val flowTimeout = + Timeout((flowDelayMS * input.size).milliseconds + 3.seconds) + + inputSource.via(foldFlow).runWith(Sink.head). + futureValue(flowTimeout) should ===(expected) + } + + "work when using Source.foldAsync + Flow.foldAsync + Sink.foldAsync" in assertAllStagesStopped { + foldSource.via(foldFlow).runWith(foldSink). + futureValue(timeout) should ===(expected) + } + + "propagate an error" in assertAllStagesStopped { + val error = new Exception with NoStackTrace + val future = inputSource.map(x ⇒ if (x > 50) throw error else x).runFoldAsync[NotUsed](NotUsed)(noneAsync) + the[Exception] thrownBy Await.result(future, 3.seconds) should be(error) + } + + "complete future with failure when folding function throws" in assertAllStagesStopped { + val error = new Exception with NoStackTrace + val future = inputSource.runFoldAsync(0) { (x, y) ⇒ + if (x > 50) Future.failed(error) else Future(x + y) + } + + the[Exception] thrownBy Await.result(future, 3.seconds) should be(error) + } + + "not blow up with high request counts" in { + val probe = TestSubscriber.manualProbe[Long]() + var i = 0 + + Source.fromIterator(() ⇒ Iterator.fill[Int](10000) { i += 1; i }). + foldAsync(1L) { (a, b) ⇒ Future(a + b) }. + runWith(Sink.asPublisher(true)).subscribe(probe) + + val subscription = probe.expectSubscription() + subscription.request(Int.MaxValue) + + probe.expectNext(50005001L) + probe.expectComplete() + } + + "signal future failure" in assertAllStagesStopped { + val probe = TestSubscriber.probe[Int]() + implicit val ec = system.dispatcher + Source(1 to 5).foldAsync(0) { (_, n) ⇒ + Future(if (n == 3) throw TE("err1") else n) + }.to(Sink.fromSubscriber(probe)).run() + + val sub = probe.expectSubscription() + sub.request(10) + probe.expectError().getMessage should be("err1") + } + + "signal error from foldAsync" in assertAllStagesStopped { + val latch = TestLatch(1) + val c = TestSubscriber.manualProbe[Int]() + implicit val ec = system.dispatcher + val p = Source(1 to 5).mapAsync(4)(n ⇒ + if (n == 3) throw new RuntimeException("err2") with NoStackTrace + else { + Future { + Await.ready(latch, 10.seconds) + n + } + }). + to(Sink.fromSubscriber(c)).run() + val sub = c.expectSubscription() + sub.request(10) + c.expectError().getMessage should be("err2") + latch.countDown() + } + + "resume after future failure" in assertAllStagesStopped { + val probe = TestSubscriber.probe[(Int, Int)]() + implicit val ec = system.dispatcher + Source(1 to 5).foldAsync(0 → 1) { + case ((i, res), n) ⇒ + Future { + if (n == 3) throw new RuntimeException("err3") with NoStackTrace + else n → (i + (res * n)) + } + }.withAttributes(supervisionStrategy(resumingDecider)). + to(Sink.fromSubscriber(probe)).run() + + val sub = probe.expectSubscription() + sub.request(10) + probe.expectNext(5 → 74) + probe.expectComplete() + } + + "restart after future failure" in assertAllStagesStopped { + val probe = TestSubscriber.probe[(Int, Int)]() + implicit val ec = system.dispatcher + Source(1 to 5).foldAsync(0 → 1) { + case ((i, res), n) ⇒ + Future { + if (n == 3) throw new RuntimeException("err3") with NoStackTrace + else n → (i + (res * n)) + } + }.withAttributes(supervisionStrategy(restartingDecider)). + to(Sink.fromSubscriber(probe)).run() + + val sub = probe.expectSubscription() + sub.request(10) + probe.expectNext(5 → 24) + probe.expectComplete() + } + + "resume after multiple failures" in assertAllStagesStopped { + val futures: List[Future[String]] = List( + Future.failed(Utils.TE("failure1")), + Future.failed(Utils.TE("failure2")), + Future.failed(Utils.TE("failure3")), + Future.failed(Utils.TE("failure4")), + Future.failed(Utils.TE("failure5")), + Future.successful("happy!")) + + Source(futures).mapAsync(2)(identity). + withAttributes(supervisionStrategy(resumingDecider)).runWith(Sink.head). + futureValue(timeout) should ===("happy!") + } + + "finish after future failure" in assertAllStagesStopped { + Source(1 to 3).foldAsync(1) { (_, n) ⇒ + Future { + if (n == 3) throw new RuntimeException("err3b") with NoStackTrace + else n + } + }.withAttributes(supervisionStrategy(resumingDecider)) + .grouped(10).runWith(Sink.head). + futureValue(Timeout(1.second)) should ===(Seq(2)) + } + + "resume when foldAsync throws" in { + val c = TestSubscriber.manualProbe[(Int, Int)]() + implicit val ec = system.dispatcher + val p = Source(1 to 5).foldAsync(0 → 1) { + case ((i, res), n) ⇒ + if (n == 3) throw new RuntimeException("err4") with NoStackTrace + else Future(n → (i + (res * n))) + }.withAttributes(supervisionStrategy(resumingDecider)). + to(Sink.fromSubscriber(c)).run() + val sub = c.expectSubscription() + sub.request(10) + c.expectNext(5 → 74) + c.expectComplete() + } + + "restart when foldAsync throws" in { + val c = TestSubscriber.manualProbe[(Int, Int)]() + implicit val ec = system.dispatcher + val p = Source(1 to 5).foldAsync(0 → 1) { + case ((i, res), n) ⇒ + if (n == 3) throw new RuntimeException("err4") with NoStackTrace + else Future(n → (i + (res * n))) + }.withAttributes(supervisionStrategy(restartingDecider)). + to(Sink.fromSubscriber(c)).run() + val sub = c.expectSubscription() + sub.request(10) + c.expectNext(5 → 24) + c.expectComplete() + } + + "signal NPE when future is completed with null" in { + val c = TestSubscriber.manualProbe[String]() + val p = Source(List("a", "b")).foldAsync("") { (_, elem) ⇒ + Future.successful(null.asInstanceOf[String]) + }.to(Sink.fromSubscriber(c)).run() + val sub = c.expectSubscription() + sub.request(10) + c.expectError().getMessage should be(ReactiveStreamsCompliance.ElementMustNotBeNullMsg) + } + + "resume when future is completed with null" in { + val c = TestSubscriber.manualProbe[String]() + val p = Source(List("a", "b", "c")).foldAsync("") { (str, elem) ⇒ + if (elem == "b") Future.successful(null.asInstanceOf[String]) + else Future.successful(str + elem) + }.withAttributes(supervisionStrategy(resumingDecider)). + to(Sink.fromSubscriber(c)).run() + val sub = c.expectSubscription() + sub.request(10) + c.expectNext("ac") // 1: "" + "a"; 2: null => resume "a"; 3: "a" + "c" + c.expectComplete() + } + + "restart when future is completed with null" in { + val c = TestSubscriber.manualProbe[String]() + val p = Source(List("a", "b", "c")).foldAsync("") { (str, elem) ⇒ + if (elem == "b") Future.successful(null.asInstanceOf[String]) + else Future.successful(str + elem) + }.withAttributes(supervisionStrategy(restartingDecider)). + to(Sink.fromSubscriber(c)).run() + val sub = c.expectSubscription() + sub.request(10) + c.expectNext("c") // 1: "" + "a"; 2: null => restart ""; 3: "" + "c" + c.expectComplete() + } + + "should handle cancel properly" in assertAllStagesStopped { + val pub = TestPublisher.manualProbe[Int]() + val sub = TestSubscriber.manualProbe[Int]() + + Source.fromPublisher(pub). + foldAsync(0) { (_, n) ⇒ Future.successful(n) }. + runWith(Sink.fromSubscriber(sub)) + + val upstream = pub.expectSubscription() + upstream.expectRequest() + + sub.expectSubscription().cancel() + + upstream.expectCancellation() + } + } + + // Keep + def noneAsync[L, R]: (L, R) ⇒ Future[NotUsed] = { (_: Any, _: Any) ⇒ + Future.successful(NotUsed) + }.asInstanceOf[(L, R) ⇒ Future[NotUsed]] + +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala index 542822a161..a71eda4145 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala @@ -19,9 +19,9 @@ class FlowFoldSpec extends StreamSpec { "A Fold" must { val input = 1 to 100 val expected = input.sum - val inputSource = Source(input).filter(_ ⇒ true).map(identity) - val foldSource = inputSource.fold[Int](0)(_ + _).filter(_ ⇒ true).map(identity) - val foldFlow = Flow[Int].filter(_ ⇒ true).map(identity).fold(0)(_ + _).filter(_ ⇒ true).map(identity) + val inputSource = Source(input) + val foldSource = inputSource.fold[Int](0)(_ + _) + val foldFlow = Flow[Int].fold(0)(_ + _) val foldSink = Sink.fold[Int, Int](0)(_ + _) "work when using Source.runFold" in assertAllStagesStopped { diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 961a3b8a5a..e34ebc5871 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -45,6 +45,7 @@ object Stages { val dropWhile = name("dropWhile") val scan = name("scan") val fold = name("fold") + val foldAsync = name("foldAsync") val reduce = name("reduce") val intersperse = name("intersperse") val buffer = name("buffer") diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index da46440efd..e2bcbe76d9 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -161,6 +161,7 @@ final case class DropWhile[T](p: T ⇒ Boolean) extends GraphStage[FlowShape[T, */ abstract class SupervisedGraphStageLogic(inheritedAttributes: Attributes, shape: Shape) extends GraphStageLogic(shape) { private lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) + def withSupervision[T](f: () ⇒ T): Option[T] = try { Some(f()) } catch { case NonFatal(ex) ⇒ @@ -376,6 +377,8 @@ final case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out) extends GraphSta val out = Outlet[Out]("Fold.out") override val shape: FlowShape[In, Out] = FlowShape(in, out) + override def toString: String = "Fold" + override val initialAttributes = DefaultAttributes.fold override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = @@ -419,6 +422,98 @@ final case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out) extends GraphSta } } +/** + * INTERNAL API + */ +final class FoldAsync[In, Out](zero: Out, f: (Out, In) ⇒ Future[Out]) extends GraphStage[FlowShape[In, Out]] { + import akka.dispatch.ExecutionContexts + + val in = Inlet[In]("FoldAsync.in") + val out = Outlet[Out]("FoldAsync.out") + val shape = FlowShape.of(in, out) + + override def toString: String = "FoldAsync" + + override val initialAttributes = DefaultAttributes.foldAsync + + def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { + val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) + + private var aggregator: Out = zero + private var aggregating: Future[Out] = Future.successful(aggregator) + + private def onRestart(t: Throwable): Unit = { + aggregator = zero + } + + private def ec = ExecutionContexts.sameThreadExecutionContext + + private val futureCB = getAsyncCallback[Try[Out]]((result: Try[Out]) ⇒ { + result match { + case Success(update) if update != null ⇒ { + aggregator = update + + if (isClosed(in)) { + push(out, update) + completeStage() + } else if (isAvailable(out) && !hasBeenPulled(in)) tryPull(in) + } + + case other ⇒ { + val ex = other match { + case Failure(t) ⇒ t + case Success(s) if s == null ⇒ + ReactiveStreamsCompliance.elementMustNotBeNullException + } + val supervision = decider(ex) + + if (supervision == Supervision.Stop) failStage(ex) + else { + if (supervision == Supervision.Restart) onRestart(ex) + + if (isClosed(in)) { + push(out, aggregator) + completeStage() + } else if (isAvailable(out) && !hasBeenPulled(in)) tryPull(in) + } + } + } + }).invoke _ + + def onPush(): Unit = { + try { + aggregating = f(aggregator, grab(in)) + + aggregating.value match { + case Some(result) ⇒ futureCB(result) // already completed + case _ ⇒ aggregating.onComplete(futureCB)(ec) + } + } catch { + case NonFatal(ex) ⇒ decider(ex) match { + case Supervision.Stop ⇒ failStage(ex) + case supervision ⇒ { + supervision match { + case Supervision.Restart ⇒ onRestart(ex) + case _ ⇒ () // just ignore on Resume + } + + tryPull(in) + } + } + } + } + + override def onUpstreamFinish(): Unit = {} + + def onPull(): Unit = if (!hasBeenPulled(in)) tryPull(in) + + setHandlers(in, out, this) + + override def toString = + s"FoldAsync.Logic(completed=${aggregating.isCompleted})" + } +} + /** * INTERNAL API */ @@ -954,8 +1049,8 @@ final case class MapAsyncUnordered[In, Out](parallelism: Int, f: In ⇒ Future[O val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) - var inFlight = 0 - var buffer: BufferImpl[Out] = _ + private var inFlight = 0 + private var buffer: BufferImpl[Out] = _ private[this] def todo = inFlight + buffer.used override def preStart(): Unit = buffer = BufferImpl(parallelism, materializer) @@ -993,6 +1088,7 @@ final case class MapAsyncUnordered[In, Out](parallelism: Int, f: In ⇒ Future[O } if (todo < parallelism) tryPull(in) } + override def onUpstreamFinish(): Unit = { if (todo == 0) completeStage() } @@ -1000,6 +1096,7 @@ final case class MapAsyncUnordered[In, Out](parallelism: Int, f: In ⇒ Future[O override def onPull(): Unit = { if (!buffer.isEmpty) push(out, buffer.dequeue()) else if (isClosed(in) && todo == 0) completeStage() + if (todo < parallelism && !hasBeenPulled(in)) tryPull(in) } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 0d88054317..392d7badb4 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -568,6 +568,25 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends def fold[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Flow[In, T, Mat] = new Flow(delegate.fold(zero)(f.apply)) + /** + * Similar to `fold` but with an asynchronous function. + * Applies the given function towards its current and next value, + * yielding the next current value. + * + * If the function `f` returns a failure and the supervision decision is + * [[akka.stream.Supervision.Restart]] current value starts at `zero` again + * the stream will continue. + * + * '''Emits when''' upstream completes + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def foldAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): javadsl.Flow[In, T, Mat] = new Flow(delegate.foldAsync(zero) { (out, in) ⇒ f(out, in).toScala }) + /** * Similar to `fold` but uses first element as zero element. * Applies the given function towards its current and next value, diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 61ed2566bb..b9ad9d4e2a 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -29,6 +29,15 @@ object Sink { def fold[U, In](zero: U, f: function.Function2[U, In, U]): javadsl.Sink[In, CompletionStage[U]] = new Sink(scaladsl.Sink.fold[U, In](zero)(f.apply).toCompletionStage()) + /** + * A `Sink` that will invoke the given asynchronous function for every received element, giving it its previous + * output (or the given `zero` value) and the element as input. + * The returned [[java.util.concurrent.CompletionStage]] will be completed with value of the final + * function evaluation when the input stream ends, or completed with `Failure` + * if there is a failure is signaled in the stream. + */ + def foldAsync[U, In](zero: U, f: function.Function2[U, In, CompletionStage[U]]): javadsl.Sink[In, CompletionStage[U]] = new Sink(scaladsl.Sink.foldAsync[U, In](zero)(f(_, _).toScala).toCompletionStage()) + /** * A `Sink` that will invoke the given function for every received element, giving it its previous * output (from the second element) and the element as input. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 420b4edfe1..e5428cbfdb 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -531,6 +531,16 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap def runFold[U](zero: U, f: function.Function2[U, Out, U], materializer: Materializer): CompletionStage[U] = runWith(Sink.fold(zero, f), materializer) + /** + * Shortcut for running this `Source` with an asynchronous fold function. + * The given function is invoked for every received element, giving it its previous + * output (or the given `zero` value) and the element as input. + * The returned [[java.util.concurrent.CompletionStage]] will be completed with value of the final + * function evaluation when the input stream ends, or completed with `Failure` + * if there is a failure is signaled in the stream. + */ + def runFoldAsync[U](zero: U, f: function.Function2[U, Out, CompletionStage[U]], materializer: Materializer): CompletionStage[U] = runWith(Sink.foldAsync(zero, f), materializer) + /** * Shortcut for running this `Source` with a reduce function. * The given function is invoked for every received element, giving it its previous @@ -1201,6 +1211,25 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap def fold[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Source[T, Mat] = new Source(delegate.fold(zero)(f.apply)) + /** + * Similar to `fold` but with an asynchronous function. + * Applies the given function towards its current and next value, + * yielding the next current value. + * + * If the function `f` returns a failure and the supervision decision is + * [[akka.stream.Supervision.Restart]] current value starts at `zero` again + * the stream will continue. + * + * '''Emits when''' upstream completes + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def foldAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): javadsl.Source[T, Mat] = new Source(delegate.foldAsync(zero) { (out, in) ⇒ f(out, in).toScala }) + /** * Similar to `fold` but uses first element as zero element. * Applies the given function towards its current and next value, diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index 1e8e562253..cb315e5932 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -406,6 +406,25 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo def fold[T](zero: T)(f: function.Function2[T, Out, T]): SubFlow[In, T, Mat] = new SubFlow(delegate.fold(zero)(f.apply)) + /** + * Similar to `fold` but with an asynchronous function. + * Applies the given function towards its current and next value, + * yielding the next current value. + * + * If the function `f` returns a failure and the supervision decision is + * [[akka.stream.Supervision.Restart]] current value starts at `zero` again + * the stream will continue. + * + * '''Emits when''' upstream completes + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def foldAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): SubFlow[In, T, Mat] = new SubFlow(delegate.foldAsync(zero) { (out, in) ⇒ f(out, in).toScala }) + /** * Similar to `fold` but uses first element as zero element. * Applies the given function towards its current and next value, diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index da0061adc9..abe7defdad 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -404,6 +404,25 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source def fold[T](zero: T)(f: function.Function2[T, Out, T]): SubSource[T, Mat] = new SubSource(delegate.fold(zero)(f.apply)) + /** + * Similar to `fold` but with an asynchronous function. + * Applies the given function towards its current and next value, + * yielding the next current value. + * + * If the function `f` returns a failure and the supervision decision is + * [[akka.stream.Supervision.Restart]] current value starts at `zero` again + * the stream will continue. + * + * '''Emits when''' upstream completes + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def foldAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): SubSource[T, Mat] = new SubSource(delegate.foldAsync(zero) { (out, in) ⇒ f(out, in).toScala }) + /** * Similar to `fold` but uses first element as zero element. * Applies the given function towards its current and next value, diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 0db0efd21d..8b588e7e9b 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -786,6 +786,27 @@ trait FlowOps[+Out, +Mat] { */ def fold[T](zero: T)(f: (T, Out) ⇒ T): Repr[T] = via(Fold(zero, f)) + /** + * Similar to `fold` but with an asynchronous function. + * Applies the given function towards its current and next value, + * yielding the next current value. + * + * If the function `f` returns a failure and the supervision decision is + * [[akka.stream.Supervision.Restart]] current value starts at `zero` again + * the stream will continue. + * + * '''Emits when''' upstream completes + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * See also [[FlowOps.fold]] + */ + def foldAsync[T](zero: T)(f: (T, Out) ⇒ Future[T]): Repr[T] = via(new FoldAsync(zero, f)) + /** * Similar to `fold` but uses first element as zero element. * Applies the given function towards its current and next value, diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Materialization.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Materialization.scala index bd3170a483..160cd2e72f 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Materialization.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Materialization.scala @@ -3,6 +3,8 @@ */ package akka.stream.scaladsl +import scala.concurrent.Future + import akka.NotUsed /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index c82445586c..55cfb7d8cc 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -244,10 +244,23 @@ object Sink { * The returned [[scala.concurrent.Future]] will be completed with value of the final * function evaluation when the input stream ends, or completed with `Failure` * if there is a failure signaled in the stream. + * + * @see [[#foldAsync]] */ def fold[U, T](zero: U)(f: (U, T) ⇒ U): Sink[T, Future[U]] = Flow[T].fold(zero)(f).toMat(Sink.head)(Keep.right).named("foldSink") + /** + * A `Sink` that will invoke the given asynchronous function for every received element, giving it its previous + * output (or the given `zero` value) and the element as input. + * The returned [[scala.concurrent.Future]] will be completed with value of the final + * function evaluation when the input stream ends, or completed with `Failure` + * if there is a failure signaled in the stream. + * + * @see [[#fold]] + */ + def foldAsync[U, T](zero: U)(f: (U, T) ⇒ Future[U]): Sink[T, Future[U]] = Flow[T].foldAsync(zero)(f).toMat(Sink.head)(Keep.right).named("foldAsyncSink") + /** * A `Sink` that will invoke the given function for every received element, giving it its previous * output (from the second element) and the element as input. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index c0549efb71..56d1a836a4 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -88,8 +88,17 @@ final class Source[+Out, +Mat](override val module: Module) * function evaluation when the input stream ends, or completed with `Failure` * if there is a failure signaled in the stream. */ - def runFold[U](zero: U)(f: (U, Out) ⇒ U)(implicit materializer: Materializer): Future[U] = - runWith(Sink.fold(zero)(f)) + def runFold[U](zero: U)(f: (U, Out) ⇒ U)(implicit materializer: Materializer): Future[U] = runWith(Sink.fold(zero)(f)) + + /** + * Shortcut for running this `Source` with a foldAsync function. + * The given function is invoked for every received element, giving it its previous + * output (or the given `zero` value) and the element as input. + * The returned [[scala.concurrent.Future]] will be completed with value of the final + * function evaluation when the input stream ends, or completed with `Failure` + * if there is a failure signaled in the stream. + */ + def runFoldAsync[U](zero: U)(f: (U, Out) ⇒ Future[U])(implicit materializer: Materializer): Future[U] = runWith(Sink.foldAsync(zero)(f)) /** * Shortcut for running this `Source` with a reduce function. diff --git a/project/MiMa.scala b/project/MiMa.scala index 2dfed1cffe..e4e5a4c502 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -851,6 +851,9 @@ object MiMa extends AutoPlugin { // internal api FilterAnyProblemStartingWith("akka.stream.impl"), + // #20888 new FoldAsync op for Flow + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.foldAsync"), + // #20214 SNI disabling for single connections (AkkaSSLConfig being passed around) ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.javadsl.ConnectionContext.sslConfig"), // class meant only for internal extension