feat: Optimize recoverWith stream operator for single source.

This commit is contained in:
He-Pin 2024-01-10 00:50:27 +08:00 committed by kerr
parent 468aab0c63
commit ccce5c0426
2 changed files with 21 additions and 2 deletions

View file

@ -62,6 +62,20 @@ class FlowRecoverWithSpec extends StreamSpec {
.expectComplete()
}
"recover with single source" in {
Source(1 to 4)
.map { a =>
if (a == 3) throw ex else a
}
.recoverWith { case _: Throwable => Source.single(3) }
.runWith(TestSink[Int]())
.request(2)
.expectNextN(1 to 2)
.request(1)
.expectNext(3)
.expectComplete()
}
"cancel substream if parent is terminated when there is a handler" in {
Source(1 to 4)
.map { a =>

View file

@ -2174,8 +2174,13 @@ private[pekko] object TakeWithin {
case source: Graph[SourceShape[T] @unchecked, M @unchecked] if TraversalBuilder.isEmptySource(source) =>
completeStage()
case other: Graph[SourceShape[T] @unchecked, M @unchecked] =>
switchTo(other)
attempt += 1
TraversalBuilder.getSingleSource(other) match {
case OptionVal.Some(singleSource) =>
emit(out, singleSource.elem.asInstanceOf[T], () => completeStage())
case _ =>
switchTo(other)
attempt += 1
}
case _ => throw new IllegalStateException() // won't happen, compiler exhaustiveness check pleaser
}
} else