diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverSpec.scala index 3cd81a4077..b72ae91b1d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverSpec.scala @@ -8,6 +8,7 @@ import akka.stream.testkit.StreamSpec import akka.stream.testkit.scaladsl.TestSink import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } import akka.stream.testkit.scaladsl.StreamTestKit._ +import akka.testkit.EventFilter import scala.util.control.NoStackTrace @@ -64,5 +65,16 @@ class FlowRecoverSpec extends StreamSpec { .request(1) .expectComplete() } + + "not log error when exception is thrown from recover block" in assertAllStagesStopped { + val ex = new IndexOutOfBoundsException("quite intuitive") + EventFilter[IndexOutOfBoundsException](occurrences = 0).intercept { + Source + .failed(new IllegalStateException("expected illegal state")) + .recover { case _: IllegalStateException => throw ex } + .runWith(TestSink.probe[Int]) + .expectSubscriptionAndError(ex) + } + } } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 4d9a9e9d4d..bac398fd1d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -273,8 +273,8 @@ private[stream] object Collect { } } - override def onUpstreamFailure(ex: Throwable): Unit = { - pf.applyOrElse(ex, NotApplied) match { + override def onUpstreamFailure(ex: Throwable): Unit = + try pf.applyOrElse(ex, NotApplied) match { case NotApplied => failStage(ex) case result: T @unchecked => { if (isAvailable(out)) { @@ -284,8 +284,9 @@ private[stream] object Collect { recovered = Some(result) } } + } catch { + case NonFatal(ex) => failStage(ex) } - } setHandlers(in, out, this) }