!str Eagerly fails flow if the future is already failed.

This commit is contained in:
kerr 2022-11-14 13:12:46 +08:00
parent b95fab2b3b
commit eeaec22bd5
2 changed files with 19 additions and 13 deletions

View file

@ -20,6 +20,7 @@ import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import org.apache.pekko.stream.testkit._
import org.apache.pekko.stream.testkit.scaladsl.TestSink
@nowarn("msg=deprecated") // testing deprecated API
class FlowFromFutureSpec extends StreamSpec {
@ -42,6 +43,13 @@ class FlowFromFutureSpec extends StreamSpec {
c.expectSubscriptionAndError(ex)
}
"fails flow from already failed Future even no demands" in {
val ex = new RuntimeException("test") with NoStackTrace
val sub = Source.fromFuture(Future.failed[Int](ex))
.runWith(TestSink.probe)
sub.expectSubscriptionAndError(ex)
}
"produce one element when Future is completed" in {
val promise = Promise[Int]()
val c = TestSubscriber.manualProbe[Int]()

View file

@ -390,27 +390,25 @@ import pekko.stream.stage._
override def initialAttributes: Attributes = DefaultAttributes.futureSource
override def createLogic(attr: Attributes) =
new GraphStageLogic(shape) with OutHandler {
def onPull(): Unit = {
override def preStart(): Unit = {
future.value match {
case Some(completed) =>
// optimization if the future is already completed
onFutureCompleted(completed)
handle(completed)
case None =>
val cb = getAsyncCallback[Try[T]](onFutureCompleted).invoke _
val cb = getAsyncCallback[Try[T]](handle).invoke _
future.onComplete(cb)(ExecutionContexts.parasitic)
}
def onFutureCompleted(result: Try[T]): Unit = {
result match {
case scala.util.Success(null) => completeStage()
case scala.util.Success(v) => emit(out, v, () => completeStage())
case scala.util.Failure(t) => failStage(t)
}
}
setHandler(out, eagerTerminateOutput) // After first pull we won't produce anything more
}
private def handle(result: Try[T]): Unit = result match {
case scala.util.Success(null) => completeStage()
case scala.util.Success(v) => emit(out, v, () => completeStage())
case scala.util.Failure(t) => failStage(t)
}
def onPull(): Unit = ()
setHandler(out, this)
}