From eeaec22bd516e45db7536db7dc26567918c8dd91 Mon Sep 17 00:00:00 2001 From: kerr Date: Mon, 14 Nov 2022 13:12:46 +0800 Subject: [PATCH] !str Eagerly fails flow if the future is already failed. --- .../stream/scaladsl/FlowFromFutureSpec.scala | 8 +++++++ .../stream/impl/fusing/GraphStages.scala | 24 +++++++++---------- 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFromFutureSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFromFutureSpec.scala index 48cde853b2..54709e4b07 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFromFutureSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFromFutureSpec.scala @@ -20,6 +20,7 @@ import scala.concurrent.duration._ import scala.util.control.NoStackTrace import org.apache.pekko.stream.testkit._ +import org.apache.pekko.stream.testkit.scaladsl.TestSink @nowarn("msg=deprecated") // testing deprecated API class FlowFromFutureSpec extends StreamSpec { @@ -42,6 +43,13 @@ class FlowFromFutureSpec extends StreamSpec { c.expectSubscriptionAndError(ex) } + "fails flow from already failed Future even no demands" in { + val ex = new RuntimeException("test") with NoStackTrace + val sub = Source.fromFuture(Future.failed[Int](ex)) + .runWith(TestSink.probe) + sub.expectSubscriptionAndError(ex) + } + "produce one element when Future is completed" in { val promise = Promise[Int]() val c = TestSubscriber.manualProbe[Int]() diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala index 036976b238..f3319b0a7e 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala @@ -390,27 +390,25 @@ import pekko.stream.stage._ override def initialAttributes: Attributes = DefaultAttributes.futureSource override def createLogic(attr: Attributes) = new GraphStageLogic(shape) with OutHandler { - def onPull(): Unit = { + override def preStart(): Unit = { future.value match { case Some(completed) => // optimization if the future is already completed - onFutureCompleted(completed) + handle(completed) case None => - val cb = getAsyncCallback[Try[T]](onFutureCompleted).invoke _ + val cb = getAsyncCallback[Try[T]](handle).invoke _ future.onComplete(cb)(ExecutionContexts.parasitic) } - - def onFutureCompleted(result: Try[T]): Unit = { - result match { - case scala.util.Success(null) => completeStage() - case scala.util.Success(v) => emit(out, v, () => completeStage()) - case scala.util.Failure(t) => failStage(t) - } - } - - setHandler(out, eagerTerminateOutput) // After first pull we won't produce anything more } + private def handle(result: Try[T]): Unit = result match { + case scala.util.Success(null) => completeStage() + case scala.util.Success(v) => emit(out, v, () => completeStage()) + case scala.util.Failure(t) => failStage(t) + } + + def onPull(): Unit = () + setHandler(out, this) }