Handle rethrows in recover more gracefully (#27506)
The idea is that `.recover { xyz => throw newException }` is common enough
not to log an ERROR message just because we didn't catch it in the Recover stage.
On the other hand, using `mapError` can be a better choice if you just want to
map the error (but there might be other occurrences where a partial function is not
enough to avoid throwing an error from recover).
This commit is contained in:
parent
c03cec2979
commit
f51dc373be
2 changed files with 16 additions and 3 deletions
|
|
@ -8,6 +8,7 @@ import akka.stream.testkit.StreamSpec
|
||||||
import akka.stream.testkit.scaladsl.TestSink
|
import akka.stream.testkit.scaladsl.TestSink
|
||||||
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
|
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
|
||||||
import akka.stream.testkit.scaladsl.StreamTestKit._
|
import akka.stream.testkit.scaladsl.StreamTestKit._
|
||||||
|
import akka.testkit.EventFilter
|
||||||
|
|
||||||
import scala.util.control.NoStackTrace
|
import scala.util.control.NoStackTrace
|
||||||
|
|
||||||
|
|
@ -64,5 +65,16 @@ class FlowRecoverSpec extends StreamSpec {
|
||||||
.request(1)
|
.request(1)
|
||||||
.expectComplete()
|
.expectComplete()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"not log error when exception is thrown from recover block" in assertAllStagesStopped {
|
||||||
|
val ex = new IndexOutOfBoundsException("quite intuitive")
|
||||||
|
EventFilter[IndexOutOfBoundsException](occurrences = 0).intercept {
|
||||||
|
Source
|
||||||
|
.failed(new IllegalStateException("expected illegal state"))
|
||||||
|
.recover { case _: IllegalStateException => throw ex }
|
||||||
|
.runWith(TestSink.probe[Int])
|
||||||
|
.expectSubscriptionAndError(ex)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -273,8 +273,8 @@ private[stream] object Collect {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override def onUpstreamFailure(ex: Throwable): Unit = {
|
override def onUpstreamFailure(ex: Throwable): Unit =
|
||||||
pf.applyOrElse(ex, NotApplied) match {
|
try pf.applyOrElse(ex, NotApplied) match {
|
||||||
case NotApplied => failStage(ex)
|
case NotApplied => failStage(ex)
|
||||||
case result: T @unchecked => {
|
case result: T @unchecked => {
|
||||||
if (isAvailable(out)) {
|
if (isAvailable(out)) {
|
||||||
|
|
@ -284,8 +284,9 @@ private[stream] object Collect {
|
||||||
recovered = Some(result)
|
recovered = Some(result)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} catch {
|
||||||
|
case NonFatal(ex) => failStage(ex)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
setHandlers(in, out, this)
|
setHandlers(in, out, this)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue