From f51dc373be34a68fd4b573353cfffd804516cf6c Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Wed, 28 Aug 2019 10:44:02 +0200 Subject: [PATCH] Handle rethrows in `recover` more gracefully (#27506) The idea is that `.recover { xyz => throw newException }` is common enough not to log an ERROR message just because we didn't catch it in the Recover stage. On the other hand, using `mapError` can be a better choice if you just want to map the error (but there might be other occurrences where a partial function is not enough to avoid throwing an error from recover). --- .../scala/akka/stream/scaladsl/FlowRecoverSpec.scala | 12 ++++++++++++ .../src/main/scala/akka/stream/impl/fusing/Ops.scala | 7 ++++--- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverSpec.scala index 3cd81a4077..b72ae91b1d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverSpec.scala @@ -8,6 +8,7 @@ import akka.stream.testkit.StreamSpec import akka.stream.testkit.scaladsl.TestSink import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } import akka.stream.testkit.scaladsl.StreamTestKit._ +import akka.testkit.EventFilter import scala.util.control.NoStackTrace @@ -64,5 +65,16 @@ class FlowRecoverSpec extends StreamSpec { .request(1) .expectComplete() } + + "not log error when exception is thrown from recover block" in assertAllStagesStopped { + val ex = new IndexOutOfBoundsException("quite intuitive") + EventFilter[IndexOutOfBoundsException](occurrences = 0).intercept { + Source + .failed(new IllegalStateException("expected illegal state")) + .recover { case _: IllegalStateException => throw ex } + .runWith(TestSink.probe[Int]) + .expectSubscriptionAndError(ex) + } + } } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 4d9a9e9d4d..bac398fd1d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -273,8 +273,8 @@ private[stream] object Collect { } } - override def onUpstreamFailure(ex: Throwable): Unit = { - pf.applyOrElse(ex, NotApplied) match { + override def onUpstreamFailure(ex: Throwable): Unit = + try pf.applyOrElse(ex, NotApplied) match { case NotApplied => failStage(ex) case result: T @unchecked => { if (isAvailable(out)) { @@ -284,8 +284,9 @@ private[stream] object Collect { recovered = Some(result) } } + } catch { + case NonFatal(ex) => failStage(ex) } - } setHandlers(in, out, this) }