=htc #19058 prevent errors to be logged for http timeouts
This commit is contained in:
parent
cabd445a59
commit
78b1cd7de4
4 changed files with 14 additions and 12 deletions
|
|
@ -78,7 +78,7 @@ private[http] object OutgoingConnectionBlueprint {
|
|||
val terminationFanout = b.add(Broadcast[HttpResponse](2))
|
||||
val terminationMerge = b.add(TerminationMerge)
|
||||
|
||||
val logger = b.add(ErrorHandling[ByteString]((t: Throwable) ⇒ log.error(t, "Outgoing request stream error")).named("errorLogger"))
|
||||
val logger = b.add(MapError[ByteString] { case t ⇒ log.error(t, "Outgoing request stream error"); t }.named("errorLogger"))
|
||||
val wrapTls = b.add(Flow[ByteString].map(SendBytes))
|
||||
terminationMerge.out ~> requestRendering ~> logger ~> wrapTls
|
||||
|
||||
|
|
|
|||
|
|
@ -240,15 +240,15 @@ private[http] object HttpServerBluePrint {
|
|||
|
||||
val responseRendererFactory = new HttpResponseRendererFactory(serverHeader, responseHeaderSizeHint, log)
|
||||
|
||||
val errorHandler: Throwable ⇒ Unit = {
|
||||
val errorHandler: PartialFunction[Throwable, Throwable] = {
|
||||
// idle timeouts should not result in errors in the log. See 19058.
|
||||
case timeout: HttpConnectionTimeoutException ⇒ log.debug(s"Closing HttpConnection due to timeout: ${timeout.getMessage}")
|
||||
case t ⇒ log.error(t, "Outgoing response stream error")
|
||||
case timeout: HttpConnectionTimeoutException ⇒ log.debug(s"Closing HttpConnection due to timeout: ${timeout.getMessage}"); timeout
|
||||
case t ⇒ log.error(t, "Outgoing response stream error"); t
|
||||
}
|
||||
|
||||
Flow[ResponseRenderingContext]
|
||||
.via(responseRendererFactory.renderer.named("renderer"))
|
||||
.via(ErrorHandling[ResponseRenderingOutput](errorHandler).named("errorLogger"))
|
||||
.via(MapError[ResponseRenderingOutput](errorHandler).named("errorLogger"))
|
||||
}
|
||||
|
||||
class RequestTimeoutSupport(initialTimeout: FiniteDuration)
|
||||
|
|
|
|||
|
|
@ -106,15 +106,17 @@ package util {
|
|||
import akka.stream.{ Attributes, Outlet, Inlet, FlowShape }
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
|
||||
private[http] final case class ErrorHandling[T](handler: Throwable ⇒ Unit) extends SimpleLinearGraphStage[T] {
|
||||
/**
|
||||
* Maps error with the provided function if it is defined for an error or, otherwise, passes it on unchanged.
|
||||
*/
|
||||
private[http] final case class MapError[T](f: PartialFunction[Throwable, Throwable]) extends SimpleLinearGraphStage[T] {
|
||||
override def createLogic(attr: Attributes) =
|
||||
new GraphStageLogic(shape) with InHandler with OutHandler {
|
||||
override def onPush(): Unit = push(out, grab(in))
|
||||
|
||||
override def onUpstreamFailure(ex: Throwable): Unit = {
|
||||
handler(ex)
|
||||
super.onUpstreamFailure(ex)
|
||||
}
|
||||
override def onUpstreamFailure(ex: Throwable): Unit =
|
||||
if (f.isDefinedAt(ex)) super.onUpstreamFailure(f(ex))
|
||||
else super.onUpstreamFailure(ex)
|
||||
|
||||
override def onPull(): Unit = pull(in)
|
||||
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ import akka.http.impl.engine.client._
|
|||
import akka.http.impl.engine.server._
|
||||
import akka.http.impl.engine.ws.WebSocketClientBlueprint
|
||||
import akka.http.impl.settings.{ ConnectionPoolSetup, HostConnectionPoolSetup }
|
||||
import akka.http.impl.util.StreamUtils
|
||||
import akka.http.impl.util.{ MapError, StreamUtils }
|
||||
import akka.http.scaladsl.model._
|
||||
import akka.http.scaladsl.model.headers.Host
|
||||
import akka.http.scaladsl.model.ws.{ Message, WebSocketRequest, WebSocketUpgradeResponse }
|
||||
|
|
@ -85,7 +85,7 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte
|
|||
connections.map {
|
||||
case Tcp.IncomingConnection(localAddress, remoteAddress, flow) ⇒
|
||||
val layer = serverLayer(settings, Some(remoteAddress), log)
|
||||
val flowWithTimeoutRecovered = flow.recover { case t: TimeoutException ⇒ throw new HttpConnectionTimeoutException(t.getMessage) }
|
||||
val flowWithTimeoutRecovered = flow.via(MapError { case t: TimeoutException ⇒ new HttpConnectionTimeoutException(t.getMessage) })
|
||||
IncomingConnection(localAddress, remoteAddress, layer atop tlsStage join flowWithTimeoutRecovered)
|
||||
}.mapMaterializedValue {
|
||||
_.map(tcpBinding ⇒ ServerBinding(tcpBinding.localAddress)(() ⇒ tcpBinding.unbind()))(fm.executionContext)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue