diff --git a/akka-docs/rst/java/stream/stages-overview.rst b/akka-docs/rst/java/stream/stages-overview.rst index 65d11c3b34..e51e934999 100644 --- a/akka-docs/rst/java/stream/stages-overview.rst +++ b/akka-docs/rst/java/stream/stages-overview.rst @@ -76,7 +76,6 @@ If the ``CompletionStage`` fails the stream is failed with that exception. **completes** after the ``CompletionStage`` has completed or when it fails - fromFuture ^^^^^^^^^^ Send the single value of the Scala ``Future`` when it completes and there is demand. @@ -86,6 +85,24 @@ If the future fails the stream is failed with that exception. **completes** after the future has completed +fromFutureSource +^^^^^^^^^^^^^^^^ +Streams the elements of the given future source once it successfully completes. +If the future fails the stream is failed. + +**emits** the next value from the `future` source, once it has completed + +**completes** after the `future` source completes + +fromSourceCompletionStage +^^^^^^^^^^^^^^^^^^^^^^^^^ +Streams the elements of an asynchronous source once its given `completion` stage completes. +If the `completion` fails the stream is failed with that exception. + +**emits** the next value from the asynchronous source, once its `completion stage` has completed + +**completes** after the asynchronous source completes + unfold ^^^^^^ Stream the result of a function as long as it returns a ``Optional``, the value inside the optional diff --git a/akka-docs/rst/scala/stream/stages-overview.rst b/akka-docs/rst/scala/stream/stages-overview.rst index 8b3ffcf882..d14bfb4440 100644 --- a/akka-docs/rst/scala/stream/stages-overview.rst +++ b/akka-docs/rst/scala/stream/stages-overview.rst @@ -84,6 +84,23 @@ If the future fails the stream is failed with that exception. **completes** after the future has completed +fromFutureSource +^^^^^^^^^^^^^^^^ +Streams the elements of the given future source once it successfully completes. +If the future fails the stream is failed. + +**emits** the next value from the `future` source, once it has completed + +**completes** after the `future` source completes + +fromSourceCompletionStage +^^^^^^^^^^^^^^^^^^^^^^^^^ +Streams the elements of an asynchronous source once its given `completion` stage completes. +If the `completion` fails the stream is failed with that exception. + +**emits** the next value from the asynchronous source, once its `completion stage` has completed + +**completes** after the asynchronous source completes unfold ^^^^^^ diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index f3b93911f2..b556b892a2 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -29,6 +29,7 @@ import scala.concurrent.duration.FiniteDuration; import akka.testkit.AkkaJUnitActorSystemResource; import java.util.*; +import java.util.function.Supplier; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; @@ -381,6 +382,55 @@ public class FlowTest extends StreamTest { assertEquals(new HashSet(Arrays.asList("a", "b", "c", "d", "e", "f")), new HashSet(result)); } + @Test + public void mustBeAbleToUsefromSourceCompletionStage() throws Exception { + final Flow f1 = + Flow.of(String.class).via(FlowTest.this. op()).named("f1"); + + final Flow f2 = + Flow.of(String.class).via(FlowTest.this. op()).named("f2"); + + @SuppressWarnings("unused") + final Flow f3 = + Flow.of(String.class).via(FlowTest.this. op()).named("f3"); + + final Source in1 = Source.from(Arrays.asList("a", "b", "c")); + final Source in2 = Source.from(Arrays.asList("d", "e", "f")); + + final Sink> publisher = Sink.asPublisher(AsPublisher.WITHOUT_FANOUT); + + final Graph, NotUsed> graph = Source.fromGraph( + GraphDSL.create(new Function, SourceShape>() { + @Override + public SourceShape apply(Builder b) + throws Exception { + final UniformFanInShape merge = + b.add(Merge.create(2)); + b.from(b.add(in1)).via(b.add(f1)).toInlet(merge.in(0)); + b.from(b.add(in2)).via(b.add(f2)).toInlet(merge.in(1)); + return new SourceShape(merge.out()); + } + })); + + final Supplier, NotUsed>> fn = + new Supplier, NotUsed>>() { + public Graph, NotUsed> get() { return graph; } + }; + + final CompletionStage, NotUsed>> stage = + CompletableFuture.supplyAsync(fn); + + final Source> source = + Source.fromSourceCompletionStage(stage); + + // collecting + final Publisher pub = source.runWith(publisher, materializer); + final CompletionStage> all = Source.fromPublisher(pub).limit(100).runWith(Sink.seq(), materializer); + + final List result = all.toCompletableFuture().get(200, TimeUnit.MILLISECONDS); + assertEquals(new HashSet(Arrays.asList("a", "b", "c", "d", "e", "f")), new HashSet(result)); + } + @Test public void mustBeAbleToUseZip() { final JavaTestKit probe = new JavaTestKit(system); diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/StreamLayoutSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/StreamLayoutSpec.scala new file mode 100644 index 0000000000..e69de29bb2 diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala index 975e51fc50..fafdf269fd 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala @@ -257,7 +257,6 @@ class ActorGraphInterpreterSpec extends StreamSpec { EventFilter[IllegalArgumentException](pattern = "Error in stage.*", occurrences = 1).intercept { Await.result(Source.fromGraph(failyStage).runWith(Sink.ignore), 3.seconds) } - } "be able to properly handle case where a stage fails before subscription happens" in assertAllStagesStopped { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FutureFlattenSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FutureFlattenSpec.scala new file mode 100644 index 0000000000..a9ce59dd56 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FutureFlattenSpec.scala @@ -0,0 +1,159 @@ +/** + * Copyright (C) 2015-2016 Lightbend Inc. + */ +package akka.stream.scaladsl + +import scala.concurrent.{ Await, ExecutionContext, Future, Promise } +import scala.concurrent.duration._ + +import java.util.concurrent.CompletionStage +import scala.compat.java8.FutureConverters._ + +import akka.NotUsed +import akka.stream._ +import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler } +import akka.stream.scaladsl._ +import akka.stream.impl.fusing.GraphStages.FutureFlattenSource + +import org.scalatest.concurrent.PatienceConfiguration.Timeout + +import akka.testkit.EventFilter +import akka.stream.testkit.{ StreamSpec, TestSubscriber } +import akka.stream.testkit.Utils.assertAllStagesStopped + +class FutureFlattenSpec extends StreamSpec { + implicit val materializer = ActorMaterializer() + + "Future source" must { + { + implicit def ec = materializer.executionContext + + "flatten elements" in assertAllStagesStopped { + val subSource: Source[Int, String] = + Source(List(1, 2, 3)).mapMaterializedValue(_ ⇒ "foo") + + val futureSource = new FutureFlattenSource(Future(subSource)) + val source: Source[Int, Future[String]] = Source.fromGraph(futureSource) + + val materialized = Promise[String]() + val watched: Source[Int, NotUsed] = source.watchTermination() { (m, d) ⇒ + materialized.completeWith(d.flatMap(_ ⇒ m)) + NotUsed + } + + val p = watched.runWith(Sink asPublisher false) + val c = TestSubscriber.manualProbe[Int]() + p.subscribe(c) + + val sub = c.expectSubscription() + sub.request(5) + + c.expectNext(1) + c.expectNext(2) + c.expectNext(3) + + c.expectComplete() + + materialized.future.futureValue(Timeout(3.seconds)) should ===("foo") + } + + "flatten elements from a completion stage" in assertAllStagesStopped { + val subSource: Graph[SourceShape[Int], Int] = + Source(List(1, 2, 3)).mapMaterializedValue(_ ⇒ 1) + + val future = Future(subSource) + val stage: CompletionStage[Graph[SourceShape[Int], Int]] = future.toJava + val g = Source.fromSourceCompletionStage(stage) + + val (mat, fut) = g.toMat(Sink.seq)(Keep.both).run() + mat.toScala.futureValue should ===(1) + fut.futureValue should ===(List(1, 2, 3)) + } + } + + "be cancelled before the underlying Future completes" in { + assertAllStagesStopped { + val promise = Promise[Source[Int, Int]]() + val aside = Promise[Int]() + val result = Promise[akka.Done]() + def futureSource = Source.fromFutureSource( + promise.future).map { i ⇒ + aside.success(i); i // should never occur + }.watchTermination[Unit]() { + case (_, res) ⇒ result.completeWith(res); () + } + + futureSource.runWith(Sink.cancelled) should ===(NotUsed) + result.future.futureValue should ===(akka.Done) + aside.future.isCompleted should ===(false) + } + } + + "fails as the underlying Future is failed" in { + assertAllStagesStopped { + val promise = Promise[Source[Int, Int]]() + val result = Promise[akka.Done]() + def futureSource = Source.fromFutureSource(promise.future) + def sink = Sink.fold[Int, Int](1)(_ * _) + + promise.failure(new Exception("Foo")) + + futureSource.runWith(sink).failed.map(_.getMessage)( + materializer.executionContext).futureValue should ===("Foo") + } + } + + "applies back-pressure according future completion" in { + assertAllStagesStopped { + val probe = TestSubscriber.probe[Int]() + val underlying = Iterator.iterate(1)(_ + 1).take(3) + val promise = Promise[Source[Int, NotUsed]]() + val first = Promise[Unit]() + lazy val futureSource = + Source.fromFutureSource(promise.future).map { + case 1 ⇒ + first.success({}); 11 + case f ⇒ (f * 10) + 1 + } + + futureSource.runWith(Sink asPublisher true).subscribe(probe) + promise.isCompleted should ===(false) + + val sub = probe.expectSubscription() + + sub.request(5) + + promise.success(Source.fromIterator(() ⇒ underlying)) + + // First value + probe.expectNext(11) + first.future.futureValue should ===({}) + + probe.expectNext(21) + probe.expectNext(31) + probe.expectComplete() + + first.isCompleted should ===(true) + } + } + + "fail when the future source materialization fails" in { + implicit def ec = materializer.executionContext + + assertAllStagesStopped { + def underlying = Future(Source.single(100L). + mapMaterializedValue[String](_ ⇒ sys.error("MatEx"))) + + val aside = Promise[Long]() + def futureSource: Source[Long, Future[String]] = + Source.fromFutureSource(underlying). + map { i ⇒ aside.success(i); i } + + def graph = futureSource.toMat(Sink.last) { (m, _) ⇒ m } + + graph.run().failed.map(_.getMessage).futureValue should ===("MatEx") + aside.future.futureValue should ===(100L) + } + } + } +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala index 04f29d4cbd..82985a91b5 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala @@ -6,7 +6,7 @@ package akka.stream.scaladsl import akka.testkit.DefaultTimeout import org.scalatest.concurrent.ScalaFutures import org.scalatest.time.{ Span, Millis } -import scala.concurrent.{ Future, Await } +import scala.concurrent.{ Await, Future, Promise } import scala.concurrent.duration._ import scala.util.Failure import scala.util.control.NoStackTrace @@ -48,7 +48,6 @@ class SourceSpec extends StreamSpec with DefaultTimeout { p.subscribe(c2) c2.expectSubscriptionAndError() } - } "Empty Source" must { @@ -417,5 +416,4 @@ class SourceSpec extends StreamSpec with DefaultTimeout { closed should ===(true) } } - } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala index e55a36d9a4..5441f7928c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala @@ -276,6 +276,71 @@ object GraphStages { override def toString: String = "SingleSource" } + final class FutureFlattenSource[T, M]( + val future: Future[Graph[SourceShape[T], M]]) + extends GraphStageWithMaterializedValue[SourceShape[T], Future[M]] { + + ReactiveStreamsCompliance.requireNonNullElement(future) + + val out = Outlet[T]("futureFlatten.out") + val shape = SourceShape(out) + + override def initialAttributes = DefaultAttributes.futureSource + + def createLogicAndMaterializedValue(attr: Attributes): (GraphStageLogic, Future[M]) = { + val materialized = Promise[M]() + + val logic = new GraphStageLogic(shape) with InHandler with OutHandler { + private val sinkIn = new SubSinkInlet[T]("FlattenMergeSink") + + private val cb = getAsyncCallback[Try[Graph[SourceShape[T], M]]] { + case scala.util.Success(graph) ⇒ { + setHandler(out, this) + sinkIn.setHandler(this) + + sinkIn.pull() + + val src = Source.fromGraph(graph) + val runnable = src.to(sinkIn.sink) + + try { + materialized.success(interpreter.subFusingMaterializer. + materialize(runnable, initialAttributes = attr)) + } catch { + case cause: Throwable ⇒ + materialized.failure(cause) + } + } + + case scala.util.Failure(t) ⇒ failStage(t) + }.invoke _ + + setHandler(out, new OutHandler { + def onPull(): Unit = + future.onComplete(cb)(ExecutionContexts.sameThreadExecutionContext) + }) + + def onPush(): Unit = { + if (isAvailable(out)) { + push(out, sinkIn.grab()) + sinkIn.pull() + } + } + + def onPull(): Unit = {} + + override def onUpstreamFinish(): Unit = + if (!sinkIn.isAvailable) completeStage() + + override def postStop(): Unit = sinkIn.cancel() + } + + (logic, materialized.future) + } + + override def toString: String = "FutureFlattenSource" + } + final class FutureSource[T](val future: Future[T]) extends GraphStage[SourceShape[T]] { ReactiveStreamsCompliance.requireNonNullElement(future) val shape = SourceShape(Outlet[T]("future.out")) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 97d1f9e013..d34a0998cf 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -171,7 +171,7 @@ object Source { new Source(scaladsl.Source.fromFuture(future)) /** - * Start a new `Source` from the given `CompletionStage`. The stream will consist of + * Starts a new `Source` from the given `CompletionStage`. The stream will consist of * one element when the `CompletionStage` is completed with a successful value, which * may happen before or after materializing the `Flow`. * The stream terminates with a failure if the `CompletionStage` is completed with a failure. @@ -179,6 +179,18 @@ object Source { def fromCompletionStage[O](future: CompletionStage[O]): javadsl.Source[O, NotUsed] = new Source(scaladsl.Source.fromCompletionStage(future)) + /** + * Streams the elements of the given future source once it successfully completes. + * If the future fails the stream is failed. + */ + def fromFutureSource[T, M](future: Future[Graph[SourceShape[T], M]]): javadsl.Source[T, Future[M]] = new Source(scaladsl.Source.fromFutureSource(future)) + + /** + * Streams the elements of an asynchronous source once its given `completion` stage completes. + * If the `completion` fails the stream is failed with that exception. + */ + def fromSourceCompletionStage[T, M](completion: CompletionStage[Graph[SourceShape[T], M]]): javadsl.Source[T, CompletionStage[M]] = new Source(scaladsl.Source.fromSourceCompletionStage(completion)) + /** * Elements are emitted periodically with the specified interval. * The tick element will be delivered to downstream consumers that has requested any elements. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 2e7a51a3c8..0f434810f9 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -212,9 +212,9 @@ object Source { }) /** - * Create [[Source]] that will continually produce given elements in specified order. + * Creates [[Source]] that will continually produce given elements in specified order. * - * Start a new 'cycled' `Source` from the given elements. The producer stream of elements + * Starts a new 'cycled' `Source` from the given elements. The producer stream of elements * will continue infinitely by repeating the sequence of elements provided by function parameter. */ def cycle[T](f: () ⇒ Iterator[T]): Source[T, NotUsed] = { @@ -247,7 +247,7 @@ object Source { single(iterable).mapConcat(ConstantFun.scalaIdentityFunction).withAttributes(DefaultAttributes.iterableSource) /** - * Start a new `Source` from the given `Future`. The stream will consist of + * Starts a new `Source` from the given `Future`. The stream will consist of * one element when the `Future` is completed with a successful value, which * may happen before or after materializing the `Flow`. * The stream terminates with a failure if the `Future` is completed with a failure. @@ -256,7 +256,7 @@ object Source { fromGraph(new FutureSource(future)) /** - * Start a new `Source` from the given `Future`. The stream will consist of + * Starts a new `Source` from the given `Future`. The stream will consist of * one element when the `Future` is completed with a successful value, which * may happen before or after materializing the `Flow`. * The stream terminates with a failure if the `Future` is completed with a failure. @@ -264,6 +264,18 @@ object Source { def fromCompletionStage[T](future: CompletionStage[T]): Source[T, NotUsed] = fromGraph(new FutureSource(future.toScala)) + /** + * Streams the elements of the given future source once it successfully completes. + * If the future fails the stream is failed. + */ + def fromFutureSource[T, M](future: Future[Graph[SourceShape[T], M]]): Source[T, Future[M]] = fromGraph(new FutureFlattenSource(future)) + + /** + * Streams the elements of an asynchronous source once its given `completion` stage completes. + * If the `completion` fails the stream is failed with that exception. + */ + def fromSourceCompletionStage[T, M](completion: CompletionStage[Graph[SourceShape[T], M]]): Source[T, CompletionStage[M]] = fromFutureSource(completion.toScala).mapMaterializedValue(_.toJava) + /** * Elements are emitted periodically with the specified interval. * The tick element will be delivered to downstream consumers that has requested any elements.