diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FutureFlattenSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FutureFlattenSourceSpec.scala new file mode 100644 index 0000000000..ddb15501d1 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FutureFlattenSourceSpec.scala @@ -0,0 +1,165 @@ +/** + * Copyright (C) 2015-2016 Lightbend Inc. + */ +package akka.stream.scaladsl + +import java.util.concurrent.{ CompletableFuture, TimeUnit } + +import akka.stream._ +import akka.stream.stage.{ GraphStage, GraphStageLogic, GraphStageWithMaterializedValue } +import akka.stream.testkit.Utils.{ TE, assertAllStagesStopped } +import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber } +import akka.testkit.TestLatch +import akka.{ Done, NotUsed } + +import scala.concurrent.{ Await, Future, Promise } + +class FutureFlattenSourceSpec extends StreamSpec { + + implicit val materializer = ActorMaterializer() + implicit def ec = system.dispatcher + + "Future source" must { + + val underlying: Source[Int, String] = + Source(List(1, 2, 3)).mapMaterializedValue(_ ⇒ "foo") + + "emit the elements of the future source" in assertAllStagesStopped { + + val sourcePromise = Promise[Source[Int, String]]() + val (sourceMatVal, sinkMatVal) = + Source.fromFutureSource(sourcePromise.future) + .toMat(Sink.seq)(Keep.both) + .run() + + sourcePromise.success(underlying) + // should complete as soon as inner source has been materialized + sourceMatVal.futureValue should ===("foo") + sinkMatVal.futureValue should ===(List(1, 2, 3)) + } + + "emit the elements from a source in a completion stage" in assertAllStagesStopped { + val (sourceMatVal, sinkMatVal) = + Source.fromSourceCompletionStage( + // can't be inferred + CompletableFuture.completedFuture[Graph[SourceShape[Int], String]](underlying) + ).toMat(Sink.seq)(Keep.both) + .run() + + sourceMatVal.toCompletableFuture.get(remainingOrDefault.toMillis, TimeUnit.MILLISECONDS) should ===("foo") + sinkMatVal.futureValue should ===(List(1, 2, 3)) + } + + "handle downstream cancelling before the underlying Future completes" in assertAllStagesStopped { + val sourcePromise = Promise[Source[Int, Int]]() + + val (sourceMatVal, termination) = + Source.fromFutureSource(sourcePromise.future) + .watchTermination()(Keep.both) + .to(Sink.cancelled) + .run() + + // wait for cancellation to occur + termination.futureValue should ===(Done) + sourceMatVal.failed.futureValue.getMessage should ===("Downstream cancelled before future source completed") + } + + "fail if the underlying Future is failed" in assertAllStagesStopped { + val failure = TE("foo") + val underlying = Future.failed[Source[Int, String]](failure) + val (sourceMatVal, sinkMatVal) = Source.fromFutureSource(underlying).toMat(Sink.seq)(Keep.both).run() + sourceMatVal.failed.futureValue should ===(failure) + sinkMatVal.failed.futureValue should ===(failure) + } + + "fail as the underlying Future fails after outer source materialization" in assertAllStagesStopped { + val failure = TE("foo") + val sourcePromise = Promise[Source[Int, String]]() + val materializationLatch = TestLatch(1) + val (sourceMatVal, sinkMatVal) = + Source.fromFutureSource(sourcePromise.future) + .mapMaterializedValue { value ⇒ + materializationLatch.countDown() + value + } + .toMat(Sink.seq)(Keep.both) + .run() + + // we don't know that materialization completed yet (this is still a bit racy) + Await.ready(materializationLatch, remainingOrDefault) + Thread.sleep(100) + sourcePromise.failure(failure) + + sourceMatVal.failed.futureValue should ===(failure) + sinkMatVal.failed.futureValue should ===(failure) + } + + "fail as the underlying Future fails after outer source materialization with no demand" in assertAllStagesStopped { + val failure = TE("foo") + val sourcePromise = Promise[Source[Int, String]]() + val testProbe = TestSubscriber.probe[Int]() + val sourceMatVal = + Source.fromFutureSource(sourcePromise.future) + .to(Sink.fromSubscriber(testProbe)) + .run() + + testProbe.expectSubscription() + sourcePromise.failure(failure) + + sourceMatVal.failed.futureValue should ===(failure) + } + + "handle back-pressure when the future completes" in assertAllStagesStopped { + val subscriber = TestSubscriber.probe[Int]() + val publisher = TestPublisher.probe[Int]() + + val sourcePromise = Promise[Source[Int, String]]() + + val matVal = Source.fromFutureSource(sourcePromise.future) + .to(Sink.fromSubscriber(subscriber)) + .run() + + subscriber.ensureSubscription() + + sourcePromise.success(Source.fromPublisher(publisher).mapMaterializedValue(_ ⇒ "woho")) + + // materialized value completes but still no demand + matVal.futureValue should ===("woho") + + // then demand and let an element through to see it works + subscriber.ensureSubscription() + subscriber.request(1) + publisher.expectRequest() + publisher.sendNext(1) + subscriber.expectNext(1) + publisher.sendComplete() + subscriber.expectComplete() + } + + class FailingMatGraphStage extends GraphStageWithMaterializedValue[SourceShape[Int], String] { + val out = Outlet[Int]("whatever") + override val shape: SourceShape[Int] = SourceShape(out) + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, String) = { + throw TE("argh, materialization failed") + } + + } + + // Behaviour when inner source throws during materialization is undefined (leaks ActorGraphInterpreters) + // until ticket #22358 has been fixed, this test fails because of it + "fail when the future source materialization fails" in pendingUntilFixed(assertAllStagesStopped { + val failure = TE("MatEx") + + val (sourceMatVal, sinkMatVal) = + Source.fromFutureSource( + Future.successful(Source.fromGraph(new FailingMatGraphStage)) + ).toMat(Sink.seq)(Keep.both) + .run() + + sinkMatVal.failed.futureValue should ===(failure) + println(sourceMatVal.futureValue) + sourceMatVal.failed.futureValue should ===(failure) + + }) + } +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FutureFlattenSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FutureFlattenSpec.scala deleted file mode 100644 index a9ce59dd56..0000000000 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FutureFlattenSpec.scala +++ /dev/null @@ -1,159 +0,0 @@ -/** - * Copyright (C) 2015-2016 Lightbend Inc. - */ -package akka.stream.scaladsl - -import scala.concurrent.{ Await, ExecutionContext, Future, Promise } -import scala.concurrent.duration._ - -import java.util.concurrent.CompletionStage -import scala.compat.java8.FutureConverters._ - -import akka.NotUsed -import akka.stream._ -import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler } -import akka.stream.scaladsl._ -import akka.stream.impl.fusing.GraphStages.FutureFlattenSource - -import org.scalatest.concurrent.PatienceConfiguration.Timeout - -import akka.testkit.EventFilter -import akka.stream.testkit.{ StreamSpec, TestSubscriber } -import akka.stream.testkit.Utils.assertAllStagesStopped - -class FutureFlattenSpec extends StreamSpec { - implicit val materializer = ActorMaterializer() - - "Future source" must { - { - implicit def ec = materializer.executionContext - - "flatten elements" in assertAllStagesStopped { - val subSource: Source[Int, String] = - Source(List(1, 2, 3)).mapMaterializedValue(_ ⇒ "foo") - - val futureSource = new FutureFlattenSource(Future(subSource)) - val source: Source[Int, Future[String]] = Source.fromGraph(futureSource) - - val materialized = Promise[String]() - val watched: Source[Int, NotUsed] = source.watchTermination() { (m, d) ⇒ - materialized.completeWith(d.flatMap(_ ⇒ m)) - NotUsed - } - - val p = watched.runWith(Sink asPublisher false) - val c = TestSubscriber.manualProbe[Int]() - p.subscribe(c) - - val sub = c.expectSubscription() - sub.request(5) - - c.expectNext(1) - c.expectNext(2) - c.expectNext(3) - - c.expectComplete() - - materialized.future.futureValue(Timeout(3.seconds)) should ===("foo") - } - - "flatten elements from a completion stage" in assertAllStagesStopped { - val subSource: Graph[SourceShape[Int], Int] = - Source(List(1, 2, 3)).mapMaterializedValue(_ ⇒ 1) - - val future = Future(subSource) - val stage: CompletionStage[Graph[SourceShape[Int], Int]] = future.toJava - val g = Source.fromSourceCompletionStage(stage) - - val (mat, fut) = g.toMat(Sink.seq)(Keep.both).run() - mat.toScala.futureValue should ===(1) - fut.futureValue should ===(List(1, 2, 3)) - } - } - - "be cancelled before the underlying Future completes" in { - assertAllStagesStopped { - val promise = Promise[Source[Int, Int]]() - val aside = Promise[Int]() - val result = Promise[akka.Done]() - def futureSource = Source.fromFutureSource( - promise.future).map { i ⇒ - aside.success(i); i // should never occur - }.watchTermination[Unit]() { - case (_, res) ⇒ result.completeWith(res); () - } - - futureSource.runWith(Sink.cancelled) should ===(NotUsed) - result.future.futureValue should ===(akka.Done) - aside.future.isCompleted should ===(false) - } - } - - "fails as the underlying Future is failed" in { - assertAllStagesStopped { - val promise = Promise[Source[Int, Int]]() - val result = Promise[akka.Done]() - def futureSource = Source.fromFutureSource(promise.future) - def sink = Sink.fold[Int, Int](1)(_ * _) - - promise.failure(new Exception("Foo")) - - futureSource.runWith(sink).failed.map(_.getMessage)( - materializer.executionContext).futureValue should ===("Foo") - } - } - - "applies back-pressure according future completion" in { - assertAllStagesStopped { - val probe = TestSubscriber.probe[Int]() - val underlying = Iterator.iterate(1)(_ + 1).take(3) - val promise = Promise[Source[Int, NotUsed]]() - val first = Promise[Unit]() - lazy val futureSource = - Source.fromFutureSource(promise.future).map { - case 1 ⇒ - first.success({}); 11 - case f ⇒ (f * 10) + 1 - } - - futureSource.runWith(Sink asPublisher true).subscribe(probe) - promise.isCompleted should ===(false) - - val sub = probe.expectSubscription() - - sub.request(5) - - promise.success(Source.fromIterator(() ⇒ underlying)) - - // First value - probe.expectNext(11) - first.future.futureValue should ===({}) - - probe.expectNext(21) - probe.expectNext(31) - probe.expectComplete() - - first.isCompleted should ===(true) - } - } - - "fail when the future source materialization fails" in { - implicit def ec = materializer.executionContext - - assertAllStagesStopped { - def underlying = Future(Source.single(100L). - mapMaterializedValue[String](_ ⇒ sys.error("MatEx"))) - - val aside = Promise[Long]() - def futureSource: Source[Long, Future[String]] = - Source.fromFutureSource(underlying). - map { i ⇒ aside.success(i); i } - - def graph = futureSource.toMat(Sink.last) { (m, _) ⇒ m } - - graph.run().failed.map(_.getMessage).futureValue should ===("MatEx") - aside.future.futureValue should ===(100L) - } - } - } -} diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala index 5441f7928c..d52279fe81 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala @@ -282,57 +282,69 @@ object GraphStages { ReactiveStreamsCompliance.requireNonNullElement(future) - val out = Outlet[T]("futureFlatten.out") - val shape = SourceShape(out) + val out = Outlet[T]("FutureFlattenSource.out") + override val shape = SourceShape(out) override def initialAttributes = DefaultAttributes.futureSource - def createLogicAndMaterializedValue(attr: Attributes): (GraphStageLogic, Future[M]) = { + override def createLogicAndMaterializedValue(attr: Attributes): (GraphStageLogic, Future[M]) = { val materialized = Promise[M]() val logic = new GraphStageLogic(shape) with InHandler with OutHandler { - private val sinkIn = new SubSinkInlet[T]("FlattenMergeSink") + private val sinkIn = new SubSinkInlet[T]("FutureFlattenSource.in") - private val cb = getAsyncCallback[Try[Graph[SourceShape[T], M]]] { - case scala.util.Success(graph) ⇒ { - setHandler(out, this) - sinkIn.setHandler(this) - - sinkIn.pull() - - val src = Source.fromGraph(graph) - val runnable = src.to(sinkIn.sink) - - try { - materialized.success(interpreter.subFusingMaterializer. - materialize(runnable, initialAttributes = attr)) - } catch { - case cause: Throwable ⇒ - materialized.failure(cause) - } - } - - case scala.util.Failure(t) ⇒ failStage(t) - }.invoke _ + override def preStart(): Unit = { + val cb = getAsyncCallback[Try[Graph[SourceShape[T], M]]](onFutureSourceCompleted).invoke _ + future.onComplete(cb)(ExecutionContexts.sameThreadExecutionContext) + } + // initial handler (until future completes) setHandler(out, new OutHandler { - def onPull(): Unit = - future.onComplete(cb)(ExecutionContexts.sameThreadExecutionContext) + def onPull(): Unit = {} + + override def onDownstreamFinish(): Unit = { + materialized.tryFailure(new RuntimeException("Downstream cancelled before future source completed")) + super.onDownstreamFinish() + } }) def onPush(): Unit = { - if (isAvailable(out)) { - push(out, sinkIn.grab()) - sinkIn.pull() - } + push(out, sinkIn.grab()) } - def onPull(): Unit = {} + def onPull(): Unit = { + sinkIn.pull() + } override def onUpstreamFinish(): Unit = - if (!sinkIn.isAvailable) completeStage() + completeStage() - override def postStop(): Unit = sinkIn.cancel() + override def postStop(): Unit = { + // I don't think this can happen, but just to be sure we don't leave the matval promise unfulfilled + materialized.tryFailure(new RuntimeException("FutureFlattenSource stage stopped without materialization of inner source completing")) + if (!sinkIn.isClosed) sinkIn.cancel() + } + + def onFutureSourceCompleted(result: Try[Graph[SourceShape[T], M]]): Unit = { + result.map { graph ⇒ + val runnable = Source.fromGraph(graph).toMat(sinkIn.sink)(Keep.left) + val matVal = interpreter.subFusingMaterializer.materialize(runnable, initialAttributes = attr) + materialized.success(matVal) + + setHandler(out, this) + sinkIn.setHandler(this) + + if (isAvailable(out)) { + sinkIn.pull() + } + + }.recover { + case t ⇒ + sinkIn.cancel() + materialized.failure(t) + failStage(t) + } + } } (logic, materialized.future)