diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala index 91a02beb81..b1fb9040e9 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala @@ -78,7 +78,7 @@ private[http] object OutgoingConnectionBlueprint { val terminationFanout = b.add(Broadcast[HttpResponse](2)) val terminationMerge = b.add(TerminationMerge) - val logger = b.add(ErrorHandling[ByteString]((t: Throwable) ⇒ log.error(t, "Outgoing request stream error")).named("errorLogger")) + val logger = b.add(MapError[ByteString] { case t ⇒ log.error(t, "Outgoing request stream error"); t }.named("errorLogger")) val wrapTls = b.add(Flow[ByteString].map(SendBytes)) terminationMerge.out ~> requestRendering ~> logger ~> wrapTls diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala index caff5e35ca..7f0a5c83d1 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala @@ -240,15 +240,15 @@ private[http] object HttpServerBluePrint { val responseRendererFactory = new HttpResponseRendererFactory(serverHeader, responseHeaderSizeHint, log) - val errorHandler: Throwable ⇒ Unit = { + val errorHandler: PartialFunction[Throwable, Throwable] = { // idle timeouts should not result in errors in the log. See 19058. - case timeout: HttpConnectionTimeoutException ⇒ log.debug(s"Closing HttpConnection due to timeout: ${timeout.getMessage}") - case t ⇒ log.error(t, "Outgoing response stream error") + case timeout: HttpConnectionTimeoutException ⇒ log.debug(s"Closing HttpConnection due to timeout: ${timeout.getMessage}"); timeout + case t ⇒ log.error(t, "Outgoing response stream error"); t } Flow[ResponseRenderingContext] .via(responseRendererFactory.renderer.named("renderer")) - .via(ErrorHandling[ResponseRenderingOutput](errorHandler).named("errorLogger")) + .via(MapError[ResponseRenderingOutput](errorHandler).named("errorLogger")) } class RequestTimeoutSupport(initialTimeout: FiniteDuration) diff --git a/akka-http-core/src/main/scala/akka/http/impl/util/package.scala b/akka-http-core/src/main/scala/akka/http/impl/util/package.scala index 76f108d695..f213ebd17b 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/util/package.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/util/package.scala @@ -106,15 +106,17 @@ package util { import akka.stream.{ Attributes, Outlet, Inlet, FlowShape } import scala.concurrent.duration.FiniteDuration - private[http] final case class ErrorHandling[T](handler: Throwable ⇒ Unit) extends SimpleLinearGraphStage[T] { + /** + * Maps error with the provided function if it is defined for an error or, otherwise, passes it on unchanged. + */ + private[http] final case class MapError[T](f: PartialFunction[Throwable, Throwable]) extends SimpleLinearGraphStage[T] { override def createLogic(attr: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler { override def onPush(): Unit = push(out, grab(in)) - override def onUpstreamFailure(ex: Throwable): Unit = { - handler(ex) - super.onUpstreamFailure(ex) - } + override def onUpstreamFailure(ex: Throwable): Unit = + if (f.isDefinedAt(ex)) super.onUpstreamFailure(f(ex)) + else super.onUpstreamFailure(ex) override def onPull(): Unit = pull(in) diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala index 4e71874794..fbc2a13f5d 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala @@ -15,7 +15,7 @@ import akka.http.impl.engine.client._ import akka.http.impl.engine.server._ import akka.http.impl.engine.ws.WebSocketClientBlueprint import akka.http.impl.settings.{ ConnectionPoolSetup, HostConnectionPoolSetup } -import akka.http.impl.util.StreamUtils +import akka.http.impl.util.{ MapError, StreamUtils } import akka.http.scaladsl.model._ import akka.http.scaladsl.model.headers.Host import akka.http.scaladsl.model.ws.{ Message, WebSocketRequest, WebSocketUpgradeResponse } @@ -85,7 +85,7 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte connections.map { case Tcp.IncomingConnection(localAddress, remoteAddress, flow) ⇒ val layer = serverLayer(settings, Some(remoteAddress), log) - val flowWithTimeoutRecovered = flow.recover { case t: TimeoutException ⇒ throw new HttpConnectionTimeoutException(t.getMessage) } + val flowWithTimeoutRecovered = flow.via(MapError { case t: TimeoutException ⇒ new HttpConnectionTimeoutException(t.getMessage) }) IncomingConnection(localAddress, remoteAddress, layer atop tlsStage join flowWithTimeoutRecovered) }.mapMaterializedValue { _.map(tcpBinding ⇒ ServerBinding(tcpBinding.localAddress)(() ⇒ tcpBinding.unbind()))(fm.executionContext)